1use std::{cmp::Ordering, collections::BTreeMap, num::NonZeroU32, path::Path};
2
3use sea_streamer_types::{
4 export::futures::{future::BoxFuture, FutureExt},
5 Buffer, Message as MessageTrait, MessageHeader, OwnedMessage, SeqNo, SeqPos, ShardId,
6 SharedMessage, StreamKey, Timestamp, SEA_STREAMER_INTERNAL,
7};
8
9use crate::{
10 format::{Beacon, Checksum, FormatErr, Header, Marker, Message, RunningChecksum},
11 AsyncFile, BeaconReader, ByteBuffer, ByteSource, Bytes, DynFileSource, FileErr, FileId,
12 FileReader, FileSink, FileSourceType, SeekErr, StreamMode, SurveyResult, Surveyor,
13 SEA_STREAMER_WILDCARD,
14};
15
/// Payload of the internal message that marks the end of a stream (see `end_of_stream`).
pub const END_OF_STREAM: &str = "EOS";
/// Payload of the internal pulse message (see `pulse_message`).
pub const PULSE_MESSAGE: &str = "PULSE";
18
/// Reads messages sequentially from a file, consuming the beacons that are interleaved
/// with the message bytes at every beacon-interval boundary.
pub struct MessageSource {
    /// Header read from the start of the file.
    header: Header,
    /// Underlying byte source; can be switched to another source type (see `switch_to`).
    source: DynFileSource,
    /// Bytes already read from `source` but not yet handed out by `request_bytes`.
    buffer: ByteBuffer,
    /// Position in the file of the next byte to be read from `source`.
    offset: u64,
    /// Index and markers of the last beacon read; `(0, [])` when none has been read since
    /// the last rewind.
    beacon: (u32, Vec<Marker>),
    /// A message already read ahead by `seek`, returned by the next call to `next`.
    pending: Option<Message>,
}
28
/// Writes messages to a file, inserting a beacon at every beacon-interval boundary.
pub struct MessageSink {
    /// The file sink being written to; `Dead` once it has been taken by `take_file`.
    sink: FileSinkState,
    /// Position in the file of the next byte to be written.
    offset: u64,
    /// Distance in bytes between consecutive beacons.
    beacon_interval: u32,
    /// Per (stream, shard) state from which beacon markers are built.
    beacon: BTreeMap<(StreamKey, ShardId), BeaconState>,
    /// Total number of markers written so far; used to rotate which streams are included
    /// in the next beacon.
    beacon_count: u32,
    /// Number of messages written; passed on to `FileSink::flush`.
    message_count: u32,
    /// Value of `offset` when this sink was created.
    started_from: u64,
}
39
/// Whether a `MessageSink` currently owns its `FileSink`.
enum FileSinkState {
    /// The sink messages are written to.
    Alive(FileSink),
    /// The sink has been taken away (see `MessageSink::take_file`); using it panics.
    Dead,
}
44
45impl Default for FileSinkState {
46 fn default() -> Self {
47 Self::Dead
48 }
49}
50
/// Where `MessageSource::seek` should position a stream.
pub enum SeekTarget {
    /// The first message whose sequence number is at or above this one.
    SeqNo(SeqNo),
    /// The first message whose timestamp is strictly after this one.
    Timestamp(Timestamp),
    /// Just after the file header.
    Beginning,
    /// Just after the last complete message in the file.
    End,
}
57
/// Per (stream, shard) state tracked by `MessageSink` and written into beacon markers.
pub(crate) struct BeaconState {
    /// Highest sequence number seen (as maintained by `MessageSink::write`).
    pub(crate) seq_no: SeqNo,
    /// Latest timestamp seen (as maintained by `MessageSink::write`).
    pub(crate) ts: Timestamp,
    /// Running checksum, updated with the checksum of every message written.
    pub(crate) running_checksum: RunningChecksum,
}
63
64impl MessageSource {
65 pub async fn new(file_id: FileId, mode: StreamMode) -> Result<Self, FileErr> {
70 let source = DynFileSource::new(
71 file_id,
72 match mode {
73 StreamMode::Live | StreamMode::LiveReplay => FileSourceType::FileSource,
74 StreamMode::Replay => FileSourceType::FileReader,
75 },
76 )
77 .await?;
78 Self::new_with(source, mode).await
79 }
80
81 pub(crate) async fn new_with(
82 mut source: DynFileSource,
83 mode: StreamMode,
84 ) -> Result<Self, FileErr> {
85 let header = Header::read_from(&mut source).await?;
86 assert!(Header::size() <= header.beacon_interval as usize);
87 let mut stream = Self {
88 header,
89 source,
90 buffer: ByteBuffer::new(),
91 offset: Header::size() as u64,
92 beacon: (0, Vec::new()),
93 pending: None,
94 };
95 if mode == StreamMode::Live {
96 stream.rewind(SeqPos::End).await?;
97 }
98 Ok(stream)
99 }
100
101 pub fn file_header(&self) -> &Header {
102 &self.header
103 }
104
105 pub async fn rewind(&mut self, target: SeqPos) -> Result<u32, FileErr> {
111 let pos = match target {
112 SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as u64),
113 SeqPos::End => SeqPos::End,
114 SeqPos::At(nth) => {
115 let at = nth * self.beacon_interval();
116 if at < self.known_size() {
117 SeqPos::At(at)
118 } else {
119 SeqPos::End
120 }
121 }
122 };
123 self.offset = self.source.seek(pos).await?;
124
125 if pos == SeqPos::End {
127 let max = self.known_size() - (self.known_size() % self.beacon_interval());
128 let max = std::cmp::max(max, Header::size() as u64);
129 let pos = match target {
130 SeqPos::Beginning | SeqPos::At(0) => unreachable!(),
131 SeqPos::End => max,
132 SeqPos::At(nth) => {
133 let at = nth * self.beacon_interval();
134 if at < self.known_size() {
135 at
136 } else {
137 max
138 }
139 }
140 };
141 if self.offset != pos {
142 self.offset = self.source.seek(SeqPos::At(pos)).await?;
143 }
144 }
145
146 self.buffer.clear();
147 self.clear_beacon();
148
149 while let Some(i) = self.has_beacon(self.offset) {
151 let beacon = Beacon::read_from(&mut self.source).await?;
152 let beacon_size = beacon.size();
153 self.offset += beacon_size as u64;
154 self.beacon = (i, beacon.items);
155
156 let bytes = self
157 .source
158 .request_bytes(std::cmp::min(
159 beacon.remaining_messages_bytes as usize,
160 self.beacon_interval() as usize - beacon_size,
161 ))
162 .await?;
163 self.offset += bytes.len() as u64;
164 }
165
166 if matches!(target, SeqPos::End) && self.offset < self.known_size() {
169 let mut next = self.offset;
170 let bytes = self
171 .source
172 .request_bytes((self.known_size() - self.offset) as usize)
173 .await?;
174 let mut buffer = ByteBuffer::one(bytes);
175 while let Ok(message) = Message::read_from(&mut buffer).await {
176 next += message.size() as u64;
177 }
178 self.offset = self.source.seek(SeqPos::At(next)).await?;
179 }
180
181 Ok((self.offset / self.beacon_interval()) as u32)
182 }
183
184 pub async fn seek(
186 &mut self,
187 stream_key: &StreamKey,
188 shard_id: &ShardId,
189 to: SeekTarget,
190 ) -> Result<(), FileErr> {
191 match to {
193 SeekTarget::Beginning => return self.rewind(SeqPos::Beginning).await.map(|_| ()),
194 SeekTarget::End => return self.rewind(SeqPos::End).await.map(|_| ()),
195 _ => (),
196 }
197 let savepoint = self.offset;
198 let source_type = self.source.source_type();
199 let source = std::mem::replace(&mut self.source, DynFileSource::Dead);
200 self.source = source.switch_to(FileSourceType::FileReader).await?;
201 self.source.resize().await?;
202 #[allow(clippy::never_loop)]
203 let res = 'outer: loop {
204 let surveyor = Surveyor::new(self, |b: &Beacon| {
206 for item in b.items.iter() {
207 if (stream_key, shard_id) == (item.header.stream_key(), item.header.shard_id())
208 {
209 return compare(&to, &item.header);
210 }
211 }
212 SurveyResult::Undecided
213 })
214 .await;
215
216 let surveyor = match surveyor {
217 Ok(s) => s,
218 Err(e) => {
219 break Err(e);
220 }
221 };
222 let (pos, _) = match surveyor.run().await {
223 Ok(s) => s,
224 Err(e) => {
225 break Err(e);
226 }
227 };
228 match self.rewind(SeqPos::At(pos as u64)).await {
230 Ok(_) => (),
231 Err(e) => {
232 break 'outer match e {
233 FileErr::NotEnoughBytes => Err(FileErr::SeekErr(SeekErr::OutOfBound)),
234 e => Err(e),
235 }
236 }
237 };
238 loop {
240 let mess = match self.next().await {
241 Ok(m) => m,
242 Err(e) => {
243 break 'outer match e {
244 FileErr::NotEnoughBytes => Err(FileErr::SeekErr(SeekErr::OutOfBound)),
245 e => Err(e),
246 }
247 }
248 };
249 if let SurveyResult::Right = compare(&to, mess.message.header()) {
250 self.pending = Some(mess);
252 break;
253 }
254 }
255 break Ok(());
256 };
257
258 let source = std::mem::replace(&mut self.source, DynFileSource::Dead);
260 self.source = source.switch_to(source_type).await?;
261
262 if res.is_err() {
263 self.source.seek(SeqPos::At(savepoint)).await?;
264 self.buffer.clear();
265 self.pending.take();
266 }
267
268 fn compare(to: &SeekTarget, header: &MessageHeader) -> SurveyResult {
274 match to {
275 SeekTarget::Beginning | SeekTarget::End => panic!("Should not appear here"),
276 SeekTarget::SeqNo(no) => match header.sequence().cmp(no) {
277 Ordering::Less => SurveyResult::Left,
278 Ordering::Greater | Ordering::Equal => SurveyResult::Right,
279 },
280 SeekTarget::Timestamp(ts) => match header.timestamp().cmp(ts) {
281 Ordering::Less | Ordering::Equal => SurveyResult::Left,
282 Ordering::Greater => SurveyResult::Right,
283 },
284 }
285 }
286
287 res
288 }
289
290 #[inline]
291 fn beacon_interval(&self) -> u64 {
292 self.header.beacon_interval as u64
293 }
294
295 fn has_beacon(&self, offset: u64) -> Option<u32> {
296 if offset > 0 && offset % self.beacon_interval() == 0 {
297 Some((offset / self.beacon_interval()) as u32)
298 } else {
299 None
300 }
301 }
302
303 async fn request_bytes(&mut self, size: usize) -> Result<Bytes, FileErr> {
304 loop {
305 if let Some(i) = self.has_beacon(self.offset) {
306 let beacon = Beacon::read_from(&mut self.source).await?;
307 self.offset += beacon.size() as u64;
308 self.beacon = (i, beacon.items);
309 }
310
311 let chunk = std::cmp::min(
312 size - self.buffer.size(), self.beacon_interval() as usize - (self.offset % self.beacon_interval()) as usize, );
315 let bytes = self.source.request_bytes(chunk).await?;
316 self.offset += chunk as u64;
317 self.buffer.append(bytes); debug_assert!(self.buffer.size() <= size, "we should never over-read");
320 if self.buffer.size() == size {
321 return Ok(self.buffer.consume(size));
322 }
323 }
324 }
325
326 pub async fn switch_to(&mut self, stype: FileSourceType) -> Result<(), FileErr> {
330 let source = std::mem::replace(&mut self.source, DynFileSource::Dead);
331 self.source = source.switch_to(stype).await?;
332 Ok(())
333 }
334
335 pub async fn next(&mut self) -> Result<Message, FileErr> {
337 let message = match self.pending.take() {
338 Some(m) => m,
339 None => Message::read_from(self).await?,
340 };
341 let computed = message.compute_checksum();
342 if message.checksum != computed {
343 Err(FileErr::FormatErr(FormatErr::ChecksumErr {
344 received: message.checksum,
345 computed,
346 }))
347 } else {
348 Ok(message)
349 }
350 }
351
352 pub fn beacon(&self) -> (u32, &[Marker]) {
362 (self.beacon.0, &self.beacon.1)
363 }
364
365 fn clear_beacon(&mut self) {
366 self.beacon.0 = 0;
367 self.beacon.1.clear();
368 }
369
370 #[inline]
371 pub fn offset(&self) -> u64 {
372 self.offset
373 }
374
375 #[inline]
376 fn known_size(&self) -> u64 {
377 self.source.file_size()
378 }
379
380 pub(crate) fn take_source(self) -> DynFileSource {
381 self.source
382 }
383}
384
385impl ByteSource for MessageSource {
386 type Future<'a> = BoxFuture<'a, Result<Bytes, FileErr>>;
388
389 fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
392 self.request_bytes(size).boxed()
393 }
394}
395
396impl BeaconReader for MessageSource {
397 type Future<'a> = BoxFuture<'a, Result<Beacon, FileErr>>;
398
399 fn survey(&mut self, at: NonZeroU32) -> Self::Future<'_> {
400 async move {
401 let at = at.get() as u64 * self.beacon_interval();
402 let offset = self.source.seek(SeqPos::At(at)).await?;
403 if at == offset {
404 let beacon = Beacon::read_from(&mut self.source).await?;
405 Ok(beacon)
406 } else {
407 Err(FileErr::NotEnoughBytes)
408 }
409 }
410 .boxed()
411 }
412
413 fn max_beacons(&self) -> u32 {
414 (self.source.file_size() / self.beacon_interval()) as u32
415 }
416}
417
418impl MessageSink {
419 pub async fn new(file_id: FileId, beacon_interval: u32, limit: u64) -> Result<Self, FileErr> {
421 let file = AsyncFile::new_ow(file_id).await?;
422 Self::new_with(file, beacon_interval, limit).await
423 }
424
425 pub async fn append(
427 file_id: FileId,
428 beacon_interval: u32,
429 limit: u64,
430 ) -> Result<Self, FileErr> {
431 let file = AsyncFile::new_rw(file_id.clone()).await?;
432 if file.size() == 0 {
433 Self::new_with(file, beacon_interval, limit).await
434 } else {
435 let source =
436 DynFileSource::FileReader(FileReader::new_with(file, 0, Default::default())?);
437 let mut source = MessageSource::new_with(source, StreamMode::Replay).await?;
438 let mut offset = 0;
439 match source.rewind(SeqPos::End).await {
440 Ok(mut nth) => {
441 offset = source.offset;
442 let mut read = false;
444 loop {
445 match source.next().await {
446 Ok(m) => {
447 if is_end_of_stream(&m.message) {
448 if read {
449 break;
451 } else {
452 }
454 } else {
455 offset = source.offset;
457 read = true;
458 }
459 }
460 Err(FileErr::NotEnoughBytes) => {
461 if !read {
462 if nth > 0 {
463 nth -= 1;
465 source.rewind(SeqPos::At(nth as u64)).await?;
466 } else {
467 break;
469 }
470 } else {
471 break;
473 }
474 }
475 Err(e) => return Err(e),
476 }
477 }
478 }
479 Err(FileErr::NotEnoughBytes) => {
480 }
482 Err(e) => return Err(e),
483 }
484 if beacon_interval != source.header.beacon_interval {
485 log::warn!(
486 "Beacon interval mismatch: expected {}, got {}",
487 beacon_interval,
488 source.header.beacon_interval
489 );
490 }
491 let beacon_interval = source.header.beacon_interval;
492 let has_beacon = source.has_beacon(offset).is_some();
493 if let DynFileSource::FileReader(reader) = source.source {
494 let (mut file, _, _) = reader.end();
495 assert_eq!(offset, file.seek(SeqPos::At(offset)).await?);
496 let mut sink = FileSink::new(file, limit)?;
497
498 if has_beacon {
499 offset += Beacon {
501 remaining_messages_bytes: 0,
502 items: Default::default(),
503 }
504 .write_to(&mut sink)? as u64;
505 sink.flush(0).await?;
506 }
507
508 Ok(Self {
509 sink: FileSinkState::Alive(sink),
510 offset,
511 beacon_interval,
512 beacon: Default::default(),
513 beacon_count: 0,
514 message_count: 0,
515 started_from: offset,
516 })
517 } else {
518 unreachable!()
519 }
520 }
521 }
522
523 async fn new_with(file: AsyncFile, beacon_interval: u32, limit: u64) -> Result<Self, FileErr> {
524 assert!(Header::size() <= beacon_interval as usize);
525 let header = Self::new_header(&file, beacon_interval);
526 let mut sink = FileSink::new(file, limit)?;
527 let mut offset = header.write_to(&mut sink)?;
528 if offset == beacon_interval as usize {
529 offset += Beacon {
531 remaining_messages_bytes: 0,
532 items: Default::default(),
533 }
534 .write_to(&mut sink)?;
535 }
536 sink.flush(0).await?;
537
538 Ok(Self {
539 sink: FileSinkState::Alive(sink),
540 offset: offset as u64,
541 beacon_interval,
542 beacon: Default::default(),
543 beacon_count: 0,
544 message_count: 0,
545 started_from: offset as u64,
546 })
547 }
548
549 fn new_header(file: &AsyncFile, beacon_interval: u32) -> Header {
550 let path = file.id();
551 let path = path.path();
552 let path: &Path = path.as_ref();
553 let file_name: String = path.file_name().unwrap().to_str().unwrap().to_owned();
554 Header {
555 file_name,
556 created_at: Timestamp::now_utc(),
557 beacon_interval,
558 }
559 }
560
561 pub fn write(&mut self, message: OwnedMessage) -> Result<Checksum, FileErr> {
563 let key = (message.stream_key(), message.shard_id());
564 let (seq_no, ts) = (message.sequence(), message.timestamp());
565 let message = Message {
566 message,
567 checksum: 0,
568 };
569 let mut buffer = ByteBuffer::new();
570 let (_, checksum) = message.write_to(&mut buffer)?;
571 let entry = self.beacon.entry(key).or_insert(BeaconState {
572 seq_no,
573 ts,
574 running_checksum: RunningChecksum::new(),
575 });
576 entry.seq_no = std::cmp::max(seq_no, entry.seq_no);
577 entry.ts = std::cmp::max(ts, entry.ts);
578 entry.running_checksum.update(checksum);
579
580 while !buffer.is_empty() {
581 let chunk = self.beacon_interval as usize
582 - (self.offset % self.beacon_interval as u64) as usize;
583 let chunk: ByteBuffer = buffer.consume(std::cmp::min(chunk, buffer.size()));
584 self.offset += chunk.write_to(self.sink())? as u64;
585
586 if self.offset > 0 && self.offset % self.beacon_interval as u64 == 0 {
587 let num_markers = Beacon::num_markers(self.beacon_interval as usize);
588 let mut items = Vec::new();
589 for ((key, sid), beacon) in self
592 .beacon
593 .iter()
594 .skip(self.beacon_count as usize % self.beacon.len())
595 .chain(self.beacon.iter())
596 .take(std::cmp::min(self.beacon.len(), num_markers))
597 {
598 items.push(Marker {
599 header: MessageHeader::new(key.to_owned(), *sid, beacon.seq_no, beacon.ts),
600 running_checksum: beacon.running_checksum.crc(),
601 });
602 }
603 let beacon_count = items.len();
604 let beacon = Beacon {
605 remaining_messages_bytes: buffer.size() as u32,
606 items,
607 };
608 self.offset += beacon.write_to(self.sink())? as u64;
609 self.beacon_count += beacon_count as u32;
610 }
611 }
612
613 self.message_count += 1;
614
615 Ok(checksum)
616 }
617
618 #[inline]
619 pub fn offset(&self) -> u64 {
620 self.offset
621 }
622
623 #[inline]
625 pub fn started_from(&self) -> u64 {
626 self.started_from
627 }
628
629 pub async fn flush(&mut self) -> Result<(), FileErr> {
630 let c = self.message_count;
631 self.sink().flush(c).await
632 }
633
634 pub async fn end(mut self, eos: bool) -> Result<(), FileErr> {
636 if eos {
637 self.write(end_of_stream())?;
638 }
639 self.flush().await?;
640 self.sink().sync_all().await
641 }
642
643 fn sink(&mut self) -> &mut FileSink {
644 match &mut self.sink {
645 FileSinkState::Alive(sink) => sink,
646 FileSinkState::Dead => panic!("FileSinkState::Dead"),
647 }
648 }
649
650 pub(crate) async fn take_file(&mut self) -> Result<AsyncFile, FileErr> {
652 let sink = std::mem::take(&mut self.sink);
653 match sink {
654 FileSinkState::Alive(sink) => sink.end().await,
655 FileSinkState::Dead => panic!("FileSinkState::Dead"),
656 }
657 }
658
659 pub(crate) fn use_file(&mut self, sink: FileSink) {
661 self.sink = FileSinkState::Alive(sink);
662 }
663
664 pub(crate) fn update_stream_state(&mut self, key: (StreamKey, ShardId), state: BeaconState) {
665 self.beacon.insert(key, state);
666 }
667}
668
/// A message type that can lend out its `MessageHeader`.
pub trait HasMessageHeader: MessageTrait {
    /// The message's header.
    fn header(&self) -> &MessageHeader;
}
672impl HasMessageHeader for SharedMessage {
673 fn header(&self) -> &MessageHeader {
674 self.header()
675 }
676}
677impl HasMessageHeader for OwnedMessage {
678 fn header(&self) -> &MessageHeader {
679 self.header()
680 }
681}
682
683pub fn end_of_stream() -> OwnedMessage {
685 let header = MessageHeader::new(
686 StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
687 ShardId::new(0),
688 0,
689 Timestamp::now_utc(),
690 );
691 OwnedMessage::new(header, END_OF_STREAM.into_bytes())
692}
693
694pub fn is_end_of_stream<M: HasMessageHeader>(mess: &M) -> bool {
695 mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
696 && mess.message().as_bytes() == END_OF_STREAM.as_bytes()
697}
698
699pub(crate) fn pulse_message() -> OwnedMessage {
701 let header = MessageHeader::new(
702 StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
703 ShardId::new(0),
704 0,
705 Timestamp::now_utc(),
706 );
707 OwnedMessage::new(header, PULSE_MESSAGE.into_bytes())
708}
709
710pub(crate) fn is_pulse(mess: &SharedMessage) -> bool {
711 mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
712 && mess.message().as_bytes() == PULSE_MESSAGE.as_bytes()
713}
714
715pub(crate) fn is_internal(mess: &SharedMessage) -> bool {
716 mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
717}
718
719pub(crate) fn is_wildcard(key: &StreamKey) -> bool {
720 key.name() == SEA_STREAMER_WILDCARD
721}