sea_streamer_file/
messages.rs

1use std::{cmp::Ordering, collections::BTreeMap, num::NonZeroU32, path::Path};
2
3use sea_streamer_types::{
4    export::futures::{future::BoxFuture, FutureExt},
5    Buffer, Message as MessageTrait, MessageHeader, OwnedMessage, SeqNo, SeqPos, ShardId,
6    SharedMessage, StreamKey, Timestamp, SEA_STREAMER_INTERNAL,
7};
8
9use crate::{
10    format::{Beacon, Checksum, FormatErr, Header, Marker, Message, RunningChecksum},
11    AsyncFile, BeaconReader, ByteBuffer, ByteSource, Bytes, DynFileSource, FileErr, FileId,
12    FileReader, FileSink, FileSourceType, SeekErr, StreamMode, SurveyResult, Surveyor,
13    SEA_STREAMER_WILDCARD,
14};
15
/// Payload of the internal message that marks the graceful end of a stream
/// (produced by [`end_of_stream`], recognised by [`is_end_of_stream`]).
pub const END_OF_STREAM: &str = "EOS";
/// Payload of the internal pulse message, which should never be written on file.
pub const PULSE_MESSAGE: &str = "PULSE";
18
19/// A high level file reader that demux messages and beacon
20pub struct MessageSource {
21    header: Header,
22    source: DynFileSource,
23    buffer: ByteBuffer,
24    offset: u64,
25    beacon: (u32, Vec<Marker>),
26    pending: Option<Message>,
27}
28
29/// A high level file writer that mux messages and beacon
30pub struct MessageSink {
31    sink: FileSinkState,
32    offset: u64,
33    beacon_interval: u32,
34    beacon: BTreeMap<(StreamKey, ShardId), BeaconState>,
35    beacon_count: u32,
36    message_count: u32,
37    started_from: u64,
38}
39
40enum FileSinkState {
41    Alive(FileSink),
42    Dead,
43}
44
45impl Default for FileSinkState {
46    fn default() -> Self {
47        Self::Dead
48    }
49}
50
51pub enum SeekTarget {
52    SeqNo(SeqNo),
53    Timestamp(Timestamp),
54    Beginning,
55    End,
56}
57
58pub(crate) struct BeaconState {
59    pub(crate) seq_no: SeqNo,
60    pub(crate) ts: Timestamp,
61    pub(crate) running_checksum: RunningChecksum,
62}
63
64impl MessageSource {
65    /// Creates a new message source. First, the stream Header is read
66    /// from the file's beginning.
67    ///
68    /// If StreamMode is `Live`, it will fast forward to the file's end.
69    pub async fn new(file_id: FileId, mode: StreamMode) -> Result<Self, FileErr> {
70        let source = DynFileSource::new(
71            file_id,
72            match mode {
73                StreamMode::Live | StreamMode::LiveReplay => FileSourceType::FileSource,
74                StreamMode::Replay => FileSourceType::FileReader,
75            },
76        )
77        .await?;
78        Self::new_with(source, mode).await
79    }
80
81    pub(crate) async fn new_with(
82        mut source: DynFileSource,
83        mode: StreamMode,
84    ) -> Result<Self, FileErr> {
85        let header = Header::read_from(&mut source).await?;
86        assert!(Header::size() <= header.beacon_interval as usize);
87        let mut stream = Self {
88            header,
89            source,
90            buffer: ByteBuffer::new(),
91            offset: Header::size() as u64,
92            beacon: (0, Vec::new()),
93            pending: None,
94        };
95        if mode == StreamMode::Live {
96            stream.rewind(SeqPos::End).await?;
97        }
98        Ok(stream)
99    }
100
101    pub fn file_header(&self) -> &Header {
102        &self.header
103    }
104
105    /// Rewind the message stream to a coarse position.
106    /// SeqNo is regarded as the N-th beacon.
107    /// Returns the current location in terms of N-th beacon.
108    ///
109    /// Warning: This future must not be canceled.
110    pub async fn rewind(&mut self, target: SeqPos) -> Result<u32, FileErr> {
111        let pos = match target {
112            SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as u64),
113            SeqPos::End => SeqPos::End,
114            SeqPos::At(nth) => {
115                let at = nth * self.beacon_interval();
116                if at < self.known_size() {
117                    SeqPos::At(at)
118                } else {
119                    SeqPos::End
120                }
121            }
122        };
123        self.offset = self.source.seek(pos).await?;
124
125        // Align at a beacon
126        if pos == SeqPos::End {
127            let max = self.known_size() - (self.known_size() % self.beacon_interval());
128            let max = std::cmp::max(max, Header::size() as u64);
129            let pos = match target {
130                SeqPos::Beginning | SeqPos::At(0) => unreachable!(),
131                SeqPos::End => max,
132                SeqPos::At(nth) => {
133                    let at = nth * self.beacon_interval();
134                    if at < self.known_size() {
135                        at
136                    } else {
137                        max
138                    }
139                }
140            };
141            if self.offset != pos {
142                self.offset = self.source.seek(SeqPos::At(pos)).await?;
143            }
144        }
145
146        self.buffer.clear();
147        self.clear_beacon();
148
149        // Read until the start of the next message
150        while let Some(i) = self.has_beacon(self.offset) {
151            let beacon = Beacon::read_from(&mut self.source).await?;
152            let beacon_size = beacon.size();
153            self.offset += beacon_size as u64;
154            self.beacon = (i, beacon.items);
155
156            let bytes = self
157                .source
158                .request_bytes(std::cmp::min(
159                    beacon.remaining_messages_bytes as usize,
160                    self.beacon_interval() as usize - beacon_size,
161                ))
162                .await?;
163            self.offset += bytes.len() as u64;
164        }
165
166        // Now we are at the first message after the last beacon,
167        // we want to consume all messages up to known size
168        if matches!(target, SeqPos::End) && self.offset < self.known_size() {
169            let mut next = self.offset;
170            let bytes = self
171                .source
172                .request_bytes((self.known_size() - self.offset) as usize)
173                .await?;
174            let mut buffer = ByteBuffer::one(bytes);
175            while let Ok(message) = Message::read_from(&mut buffer).await {
176                next += message.size() as u64;
177            }
178            self.offset = self.source.seek(SeqPos::At(next)).await?;
179        }
180
181        Ok((self.offset / self.beacon_interval()) as u32)
182    }
183
184    /// Warning: This future must not be canceled.
185    pub async fn seek(
186        &mut self,
187        stream_key: &StreamKey,
188        shard_id: &ShardId,
189        to: SeekTarget,
190    ) -> Result<(), FileErr> {
191        // a short cut
192        match to {
193            SeekTarget::Beginning => return self.rewind(SeqPos::Beginning).await.map(|_| ()),
194            SeekTarget::End => return self.rewind(SeqPos::End).await.map(|_| ()),
195            _ => (),
196        }
197        let savepoint = self.offset;
198        let source_type = self.source.source_type();
199        let source = std::mem::replace(&mut self.source, DynFileSource::Dead);
200        self.source = source.switch_to(FileSourceType::FileReader).await?;
201        self.source.resize().await?;
202        #[allow(clippy::never_loop)]
203        let res = 'outer: loop {
204            // survey the beacons to narrow down the scope of search
205            let surveyor = Surveyor::new(self, |b: &Beacon| {
206                for item in b.items.iter() {
207                    if (stream_key, shard_id) == (item.header.stream_key(), item.header.shard_id())
208                    {
209                        return compare(&to, &item.header);
210                    }
211                }
212                SurveyResult::Undecided
213            })
214            .await;
215
216            let surveyor = match surveyor {
217                Ok(s) => s,
218                Err(e) => {
219                    break Err(e);
220                }
221            };
222            let (pos, _) = match surveyor.run().await {
223                Ok(s) => s,
224                Err(e) => {
225                    break Err(e);
226                }
227            };
228            // now we know roughly where's the message
229            match self.rewind(SeqPos::At(pos as u64)).await {
230                Ok(_) => (),
231                Err(e) => {
232                    break 'outer match e {
233                        FileErr::NotEnoughBytes => Err(FileErr::SeekErr(SeekErr::OutOfBound)),
234                        e => Err(e),
235                    }
236                }
237            };
238            // read until we found what we want
239            loop {
240                let mess = match self.next().await {
241                    Ok(m) => m,
242                    Err(e) => {
243                        break 'outer match e {
244                            FileErr::NotEnoughBytes => Err(FileErr::SeekErr(SeekErr::OutOfBound)),
245                            e => Err(e),
246                        }
247                    }
248                };
249                if let SurveyResult::Right = compare(&to, mess.message.header()) {
250                    // This is a wanted message!
251                    self.pending = Some(mess);
252                    break;
253                }
254            }
255            break Ok(());
256        };
257
258        // Restore file source to original state
259        let source = std::mem::replace(&mut self.source, DynFileSource::Dead);
260        self.source = source.switch_to(source_type).await?;
261
262        if res.is_err() {
263            self.source.seek(SeqPos::At(savepoint)).await?;
264            self.buffer.clear();
265            self.pending.take();
266        }
267
268        /// In the nutshell, for SeqNo the condition is >= N.
269        /// While for Timestamp, the condition is > N.
270        ///
271        /// Reason being, SeqNo is a discrete time thus precise;
272        /// Timestamp is a continuous time, thus, should be treated as a real number.
273        fn compare(to: &SeekTarget, header: &MessageHeader) -> SurveyResult {
274            match to {
275                SeekTarget::Beginning | SeekTarget::End => panic!("Should not appear here"),
276                SeekTarget::SeqNo(no) => match header.sequence().cmp(no) {
277                    Ordering::Less => SurveyResult::Left,
278                    Ordering::Greater | Ordering::Equal => SurveyResult::Right,
279                },
280                SeekTarget::Timestamp(ts) => match header.timestamp().cmp(ts) {
281                    Ordering::Less | Ordering::Equal => SurveyResult::Left,
282                    Ordering::Greater => SurveyResult::Right,
283                },
284            }
285        }
286
287        res
288    }
289
290    #[inline]
291    fn beacon_interval(&self) -> u64 {
292        self.header.beacon_interval as u64
293    }
294
295    fn has_beacon(&self, offset: u64) -> Option<u32> {
296        if offset > 0 && offset % self.beacon_interval() == 0 {
297            Some((offset / self.beacon_interval()) as u32)
298        } else {
299            None
300        }
301    }
302
303    async fn request_bytes(&mut self, size: usize) -> Result<Bytes, FileErr> {
304        loop {
305            if let Some(i) = self.has_beacon(self.offset) {
306                let beacon = Beacon::read_from(&mut self.source).await?;
307                self.offset += beacon.size() as u64;
308                self.beacon = (i, beacon.items);
309            }
310
311            let chunk = std::cmp::min(
312                size - self.buffer.size(), // remaining size
313                self.beacon_interval() as usize - (self.offset % self.beacon_interval()) as usize, // should not read past the next beacon
314            );
315            let bytes = self.source.request_bytes(chunk).await?;
316            self.offset += chunk as u64;
317            self.buffer.append(bytes); // these are message bytes
318
319            debug_assert!(self.buffer.size() <= size, "we should never over-read");
320            if self.buffer.size() == size {
321                return Ok(self.buffer.consume(size));
322            }
323        }
324    }
325
326    /// Switch the file source type.
327    ///
328    /// Warning: This future must not be canceled.
329    pub async fn switch_to(&mut self, stype: FileSourceType) -> Result<(), FileErr> {
330        let source = std::mem::replace(&mut self.source, DynFileSource::Dead);
331        self.source = source.switch_to(stype).await?;
332        Ok(())
333    }
334
335    /// Read the next message.
336    pub async fn next(&mut self) -> Result<Message, FileErr> {
337        let message = match self.pending.take() {
338            Some(m) => m,
339            None => Message::read_from(self).await?,
340        };
341        let computed = message.compute_checksum();
342        if message.checksum != computed {
343            Err(FileErr::FormatErr(FormatErr::ChecksumErr {
344                received: message.checksum,
345                computed,
346            }))
347        } else {
348            Ok(message)
349        }
350    }
351
352    /// Get the most recent Beacon and it's index. Note that it is cleared (rather than carry-over)
353    /// on each Beacon point.
354    ///
355    /// Beacon index starts from 1 (don't wary, because 0 is the header), and we have the following
356    /// equation:
357    ///
358    /// ```ignore
359    /// file offset = beacon index * beacon interval
360    /// ```
361    pub fn beacon(&self) -> (u32, &[Marker]) {
362        (self.beacon.0, &self.beacon.1)
363    }
364
365    fn clear_beacon(&mut self) {
366        self.beacon.0 = 0;
367        self.beacon.1.clear();
368    }
369
370    #[inline]
371    pub fn offset(&self) -> u64 {
372        self.offset
373    }
374
375    #[inline]
376    fn known_size(&self) -> u64 {
377        self.source.file_size()
378    }
379
380    pub(crate) fn take_source(self) -> DynFileSource {
381        self.source
382    }
383}
384
385impl ByteSource for MessageSource {
386    /// Too complex to unroll by hand. Let's just box it.
387    type Future<'a> = BoxFuture<'a, Result<Bytes, FileErr>>;
388
389    /// Although this is exposed as public. Do not call this directly,
390    /// this will interfere the Message Stream.
391    fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
392        self.request_bytes(size).boxed()
393    }
394}
395
396impl BeaconReader for MessageSource {
397    type Future<'a> = BoxFuture<'a, Result<Beacon, FileErr>>;
398
399    fn survey(&mut self, at: NonZeroU32) -> Self::Future<'_> {
400        async move {
401            let at = at.get() as u64 * self.beacon_interval();
402            let offset = self.source.seek(SeqPos::At(at)).await?;
403            if at == offset {
404                let beacon = Beacon::read_from(&mut self.source).await?;
405                Ok(beacon)
406            } else {
407                Err(FileErr::NotEnoughBytes)
408            }
409        }
410        .boxed()
411    }
412
413    fn max_beacons(&self) -> u32 {
414        (self.source.file_size() / self.beacon_interval()) as u32
415    }
416}
417
418impl MessageSink {
419    /// Create a fresh sink. Overwrite if file already exists.
420    pub async fn new(file_id: FileId, beacon_interval: u32, limit: u64) -> Result<Self, FileErr> {
421        let file = AsyncFile::new_ow(file_id).await?;
422        Self::new_with(file, beacon_interval, limit).await
423    }
424
425    /// Create a sink. Append if file already exists, and follow its beacon interval.
426    pub async fn append(
427        file_id: FileId,
428        beacon_interval: u32,
429        limit: u64,
430    ) -> Result<Self, FileErr> {
431        let file = AsyncFile::new_rw(file_id.clone()).await?;
432        if file.size() == 0 {
433            Self::new_with(file, beacon_interval, limit).await
434        } else {
435            let source =
436                DynFileSource::FileReader(FileReader::new_with(file, 0, Default::default())?);
437            let mut source = MessageSource::new_with(source, StreamMode::Replay).await?;
438            let mut offset = 0;
439            match source.rewind(SeqPos::End).await {
440                Ok(mut nth) => {
441                    offset = source.offset;
442                    // we must read the last message, and truncate the EOS
443                    let mut read = false;
444                    loop {
445                        match source.next().await {
446                            Ok(m) => {
447                                if is_end_of_stream(&m.message) {
448                                    if read {
449                                        // the file ends with a EOS
450                                        break;
451                                    } else {
452                                        // the next iteration will be NotEnoughBytes
453                                    }
454                                } else {
455                                    // got a normal message
456                                    offset = source.offset;
457                                    read = true;
458                                }
459                            }
460                            Err(FileErr::NotEnoughBytes) => {
461                                if !read {
462                                    if nth > 0 {
463                                        // we need to rewind further backwards
464                                        nth -= 1;
465                                        source.rewind(SeqPos::At(nth as u64)).await?;
466                                    } else {
467                                        // we reached the start now
468                                        break;
469                                    }
470                                } else {
471                                    // the file ended without an EOS
472                                    break;
473                                }
474                            }
475                            Err(e) => return Err(e),
476                        }
477                    }
478                }
479                Err(FileErr::NotEnoughBytes) => {
480                    // the file has no messages
481                }
482                Err(e) => return Err(e),
483            }
484            if beacon_interval != source.header.beacon_interval {
485                log::warn!(
486                    "Beacon interval mismatch: expected {}, got {}",
487                    beacon_interval,
488                    source.header.beacon_interval
489                );
490            }
491            let beacon_interval = source.header.beacon_interval;
492            let has_beacon = source.has_beacon(offset).is_some();
493            if let DynFileSource::FileReader(reader) = source.source {
494                let (mut file, _, _) = reader.end();
495                assert_eq!(offset, file.seek(SeqPos::At(offset)).await?);
496                let mut sink = FileSink::new(file, limit)?;
497
498                if has_beacon {
499                    // if coincidentally we are at a beacon location
500                    offset += Beacon {
501                        remaining_messages_bytes: 0,
502                        items: Default::default(),
503                    }
504                    .write_to(&mut sink)? as u64;
505                    sink.flush(0).await?;
506                }
507
508                Ok(Self {
509                    sink: FileSinkState::Alive(sink),
510                    offset,
511                    beacon_interval,
512                    beacon: Default::default(),
513                    beacon_count: 0,
514                    message_count: 0,
515                    started_from: offset,
516                })
517            } else {
518                unreachable!()
519            }
520        }
521    }
522
523    async fn new_with(file: AsyncFile, beacon_interval: u32, limit: u64) -> Result<Self, FileErr> {
524        assert!(Header::size() <= beacon_interval as usize);
525        let header = Self::new_header(&file, beacon_interval);
526        let mut sink = FileSink::new(file, limit)?;
527        let mut offset = header.write_to(&mut sink)?;
528        if offset == beacon_interval as usize {
529            // a very special case
530            offset += Beacon {
531                remaining_messages_bytes: 0,
532                items: Default::default(),
533            }
534            .write_to(&mut sink)?;
535        }
536        sink.flush(0).await?;
537
538        Ok(Self {
539            sink: FileSinkState::Alive(sink),
540            offset: offset as u64,
541            beacon_interval,
542            beacon: Default::default(),
543            beacon_count: 0,
544            message_count: 0,
545            started_from: offset as u64,
546        })
547    }
548
549    fn new_header(file: &AsyncFile, beacon_interval: u32) -> Header {
550        let path = file.id();
551        let path = path.path();
552        let path: &Path = path.as_ref();
553        let file_name: String = path.file_name().unwrap().to_str().unwrap().to_owned();
554        Header {
555            file_name,
556            created_at: Timestamp::now_utc(),
557            beacon_interval,
558        }
559    }
560
561    /// This method does not block. To make sure messages have been written, call [`MessageSink::flush`].
562    pub fn write(&mut self, message: OwnedMessage) -> Result<Checksum, FileErr> {
563        let key = (message.stream_key(), message.shard_id());
564        let (seq_no, ts) = (message.sequence(), message.timestamp());
565        let message = Message {
566            message,
567            checksum: 0,
568        };
569        let mut buffer = ByteBuffer::new();
570        let (_, checksum) = message.write_to(&mut buffer)?;
571        let entry = self.beacon.entry(key).or_insert(BeaconState {
572            seq_no,
573            ts,
574            running_checksum: RunningChecksum::new(),
575        });
576        entry.seq_no = std::cmp::max(seq_no, entry.seq_no);
577        entry.ts = std::cmp::max(ts, entry.ts);
578        entry.running_checksum.update(checksum);
579
580        while !buffer.is_empty() {
581            let chunk = self.beacon_interval as usize
582                - (self.offset % self.beacon_interval as u64) as usize;
583            let chunk: ByteBuffer = buffer.consume(std::cmp::min(chunk, buffer.size()));
584            self.offset += chunk.write_to(self.sink())? as u64;
585
586            if self.offset > 0 && self.offset % self.beacon_interval as u64 == 0 {
587                let num_markers = Beacon::num_markers(self.beacon_interval as usize);
588                let mut items = Vec::new();
589                // We may not have enough space to fit in all beacon for every stream.
590                // In which case, we'll round-robin among them.
591                for ((key, sid), beacon) in self
592                    .beacon
593                    .iter()
594                    .skip(self.beacon_count as usize % self.beacon.len())
595                    .chain(self.beacon.iter())
596                    .take(std::cmp::min(self.beacon.len(), num_markers))
597                {
598                    items.push(Marker {
599                        header: MessageHeader::new(key.to_owned(), *sid, beacon.seq_no, beacon.ts),
600                        running_checksum: beacon.running_checksum.crc(),
601                    });
602                }
603                let beacon_count = items.len();
604                let beacon = Beacon {
605                    remaining_messages_bytes: buffer.size() as u32,
606                    items,
607                };
608                self.offset += beacon.write_to(self.sink())? as u64;
609                self.beacon_count += beacon_count as u32;
610            }
611        }
612
613        self.message_count += 1;
614
615        Ok(checksum)
616    }
617
618    #[inline]
619    pub fn offset(&self) -> u64 {
620        self.offset
621    }
622
623    /// Where this sink was started
624    #[inline]
625    pub fn started_from(&self) -> u64 {
626        self.started_from
627    }
628
629    pub async fn flush(&mut self) -> Result<(), FileErr> {
630        let c = self.message_count;
631        self.sink().flush(c).await
632    }
633
634    /// End this stream gracefully, with an optional EOS message
635    pub async fn end(mut self, eos: bool) -> Result<(), FileErr> {
636        if eos {
637            self.write(end_of_stream())?;
638        }
639        self.flush().await?;
640        self.sink().sync_all().await
641    }
642
643    fn sink(&mut self) -> &mut FileSink {
644        match &mut self.sink {
645            FileSinkState::Alive(sink) => sink,
646            FileSinkState::Dead => panic!("FileSinkState::Dead"),
647        }
648    }
649
650    /// Take ownership of the file sink
651    pub(crate) async fn take_file(&mut self) -> Result<AsyncFile, FileErr> {
652        let sink = std::mem::take(&mut self.sink);
653        match sink {
654            FileSinkState::Alive(sink) => sink.end().await,
655            FileSinkState::Dead => panic!("FileSinkState::Dead"),
656        }
657    }
658
659    /// Return the file sink
660    pub(crate) fn use_file(&mut self, sink: FileSink) {
661        self.sink = FileSinkState::Alive(sink);
662    }
663
664    pub(crate) fn update_stream_state(&mut self, key: (StreamKey, ShardId), state: BeaconState) {
665        self.beacon.insert(key, state);
666    }
667}
668
669pub trait HasMessageHeader: MessageTrait {
670    fn header(&self) -> &MessageHeader;
671}
672impl HasMessageHeader for SharedMessage {
673    fn header(&self) -> &MessageHeader {
674        self.header()
675    }
676}
677impl HasMessageHeader for OwnedMessage {
678    fn header(&self) -> &MessageHeader {
679        self.header()
680    }
681}
682
683/// This can be written to a file to properly end the stream
684pub fn end_of_stream() -> OwnedMessage {
685    let header = MessageHeader::new(
686        StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
687        ShardId::new(0),
688        0,
689        Timestamp::now_utc(),
690    );
691    OwnedMessage::new(header, END_OF_STREAM.into_bytes())
692}
693
694pub fn is_end_of_stream<M: HasMessageHeader>(mess: &M) -> bool {
695    mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
696        && mess.message().as_bytes() == END_OF_STREAM.as_bytes()
697}
698
699/// This should never be written on file
700pub(crate) fn pulse_message() -> OwnedMessage {
701    let header = MessageHeader::new(
702        StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
703        ShardId::new(0),
704        0,
705        Timestamp::now_utc(),
706    );
707    OwnedMessage::new(header, PULSE_MESSAGE.into_bytes())
708}
709
710pub(crate) fn is_pulse(mess: &SharedMessage) -> bool {
711    mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
712        && mess.message().as_bytes() == PULSE_MESSAGE.as_bytes()
713}
714
715pub(crate) fn is_internal(mess: &SharedMessage) -> bool {
716    mess.header().stream_key().name() == SEA_STREAMER_INTERNAL
717}
718
719pub(crate) fn is_wildcard(key: &StreamKey) -> bool {
720    key.name() == SEA_STREAMER_WILDCARD
721}