engula_journal/file/
segment_stream.rs1use std::path::Path;
16
17use futures::stream;
18use tokio::fs::File;
19
20use super::codec;
21use crate::{Event, Result, ResultStream, Timestamp};
22
23pub struct SegmentStream {
24 file: File,
25 offset: usize,
26 max_offset: usize,
27 start_event: Option<Event>,
28}
29
30impl SegmentStream {
31 pub async fn open(
32 path: impl AsRef<Path>,
33 limit: usize,
34 start_ts: Option<Timestamp>,
35 ) -> Result<ResultStream<Vec<Event>>> {
36 let file = File::open(path).await?;
37 let mut stream = Self {
38 file,
39 offset: 0,
40 max_offset: limit,
41 start_event: None,
42 };
43
44 if let Some(ts) = start_ts {
46 while let Some(event) = stream.read_event().await? {
47 if event.ts >= ts {
48 stream.start_event = Some(event);
49 break;
50 }
51 }
52 }
53
54 let stream = stream::unfold(stream, |mut stream| async move {
55 stream.next_events().await.map(|events| (events, stream))
56 });
57 Ok(Box::pin(stream))
58 }
59
60 async fn read_event(&mut self) -> Result<Option<Event>> {
61 if let Some((event, offset)) =
62 codec::read_event_at(&mut self.file, self.offset, self.max_offset).await?
63 {
64 self.offset = offset;
65 Ok(Some(event))
66 } else {
67 Ok(None)
68 }
69 }
70
71 async fn next_event(&mut self) -> Result<Option<Event>> {
72 if let Some(event) = self.start_event.take() {
73 Ok(Some(event))
74 } else {
75 self.read_event().await
76 }
77 }
78
79 async fn next_events(&mut self) -> Option<Result<Vec<Event>>> {
80 match self.next_event().await {
81 Ok(Some(event)) => Some(Ok(vec![event])),
82 Ok(None) => None,
83 Err(err) => Some(Err(err)),
84 }
85 }
86}