engula_journal/file/
segment.rs1use std::path::PathBuf;
16
17use futures::TryStreamExt;
18use tokio::{
19 fs::{File, OpenOptions},
20 io::AsyncWriteExt,
21};
22
23use super::{codec, segment_stream::SegmentStream};
24use crate::{Error, Event, Result, ResultStream, Timestamp};
25
26pub struct Segment {
27 path: PathBuf,
28 file: File,
29 offset: usize,
30 last_timestamp: Option<Timestamp>,
31}
32
33impl Segment {
34 pub async fn open(
35 path: impl Into<PathBuf>,
36 mut last_timestamp: Option<Timestamp>,
37 ) -> Result<Self> {
38 let path = path.into();
39 let file = OpenOptions::new()
40 .create(true)
41 .append(true)
42 .open(&path)
43 .await?;
44 let offset = file.metadata().await?.len() as usize;
45
46 let mut stream = SegmentStream::open(&path, offset, None).await?;
48 while let Some(events) = stream.try_next().await? {
49 for event in events {
50 last_timestamp = Some(event.ts);
51 }
52 }
53
54 Ok(Self {
55 path,
56 file,
57 offset,
58 last_timestamp,
59 })
60 }
61
62 pub async fn seal(mut self) -> Result<Timestamp> {
63 let ts = self.last_timestamp.ok_or_else(|| {
64 Error::Unknown("should not seal a segment with no timestamp".to_owned())
65 })?;
66 codec::write_footer(&mut self.file, ts).await?;
68 self.file.sync_data().await?;
69 Ok(ts)
70 }
71
72 pub async fn read_events(&self, ts: Timestamp) -> Result<ResultStream<Vec<Event>>> {
73 SegmentStream::open(&self.path, self.offset, Some(ts)).await
74 }
75
76 pub async fn append_event(&mut self, event: Event) -> Result<usize> {
77 if let Some(last_ts) = self.last_timestamp {
78 if event.ts <= last_ts {
79 return Err(Error::InvalidArgument(format!(
80 "event timestamp {:?} <= last event timestamp {:?}",
81 event.ts, last_ts,
82 )));
83 }
84 }
85 let size = codec::write_event(&mut self.file, &event).await?;
86 self.file.flush().await?;
87 self.offset += size;
88 self.last_timestamp = Some(event.ts);
89 Ok(self.offset)
90 }
91}