engula_journal/file/
stream.rs1use std::{path::PathBuf, sync::Arc};
16
17use futures::{future, stream, StreamExt};
18use tokio::{fs, sync::Mutex};
19
20use super::{segment::Segment, segment_reader::SegmentReader};
21use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};
22
23#[derive(Clone)]
24pub struct Stream {
25 inner: Arc<Mutex<Inner>>,
26}
27
28#[derive(Default)]
29pub struct Inner {
30 path: PathBuf,
31 segment_size: usize,
32 active_segment: Option<Segment>,
33 sealed_segments: Vec<SegmentReader>,
34}
35
/// File name, relative to the stream directory, of the segment that is
/// currently accepting appends.
const ACTIVE_SEGMENT: &str = "CURRENT";
46
47impl Inner {
48 fn active_segment_path(&self) -> PathBuf {
49 self.path.join(ACTIVE_SEGMENT)
50 }
51
52 fn sealed_segment_path(&self, ts: Timestamp) -> PathBuf {
53 self.path.join(format!("{:?}", ts))
54 }
55}
56
57impl Stream {
58 pub async fn open(path: PathBuf, segment_size: usize) -> Result<Stream> {
59 let mut inner = Inner {
60 path,
61 segment_size,
62 active_segment: None,
63 sealed_segments: Vec::new(),
64 };
65
66 let mut entries = fs::read_dir(&inner.path).await?;
68 while let Some(ent) = entries.next_entry().await? {
69 if ent.file_name() != ACTIVE_SEGMENT {
70 let segment = SegmentReader::open(ent.path()).await?;
71 inner.sealed_segments.push(segment);
72 }
73 }
74 inner.sealed_segments.sort_by_key(|x| x.max_timestamp());
75
76 let last_timestamp = inner.sealed_segments.last().map(|x| x.max_timestamp());
77 let active = Segment::open(inner.active_segment_path(), last_timestamp).await?;
78 inner.active_segment = Some(active);
79
80 Ok(Stream {
81 inner: Arc::new(Mutex::new(inner)),
82 })
83 }
84
85 async fn read_segments(&self, ts: Timestamp) -> Result<Vec<ResultStream<Vec<Event>>>> {
86 let inner = self.inner.lock().await;
87 let index = inner
88 .sealed_segments
89 .partition_point(|x| x.max_timestamp() < ts);
90 let mut streams = Vec::new();
91 for segment in &inner.sealed_segments[index..] {
92 streams.push(segment.read_events(ts).await?);
93 }
94 if let Some(segment) = &inner.active_segment {
95 streams.push(segment.read_events(ts).await?);
96 }
97 Ok(streams)
98 }
99}
100
101#[async_trait]
102impl crate::Stream for Stream {
103 async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
104 match self.read_segments(ts).await {
105 Ok(streams) => Box::pin(stream::iter(streams).flatten()),
106 Err(err) => Box::pin(stream::once(future::err(err))),
107 }
108 }
109
110 async fn append_event(&self, event: Event) -> Result<()> {
111 let mut inner = self.inner.lock().await;
112 let size = if let Some(active) = inner.active_segment.as_mut() {
113 active.append_event(event).await?
114 } else {
115 return Err(Error::Unknown(
116 "active segment is closed due to previous errors".to_owned(),
117 ));
118 };
119
120 if size >= inner.segment_size {
121 let active = inner.active_segment.take().unwrap();
123 let last_timestamp = active.seal().await?;
124
125 let active_segment_path = inner.active_segment_path();
127 let sealed_segment_path = inner.sealed_segment_path(last_timestamp);
128 fs::rename(&active_segment_path, &sealed_segment_path).await?;
129 let sealed = SegmentReader::open(sealed_segment_path).await?;
130 inner.sealed_segments.push(sealed);
131
132 let active = Segment::open(&active_segment_path, Some(last_timestamp)).await?;
134 inner.active_segment = Some(active);
135 }
136 Ok(())
137 }
138
139 async fn release_events(&self, ts: Timestamp) -> Result<()> {
140 let mut inner = self.inner.lock().await;
141 let index = inner
142 .sealed_segments
143 .partition_point(|x| x.max_timestamp() < ts);
144 for segment in inner.sealed_segments.drain(..index) {
145 fs::remove_file(segment.path()).await?;
146 }
147 Ok(())
148 }
149}