engula_journal/file/
segment.rs

1// Copyright 2021 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::PathBuf;
16
17use futures::TryStreamExt;
18use tokio::{
19    fs::{File, OpenOptions},
20    io::AsyncWriteExt,
21};
22
23use super::{codec, segment_stream::SegmentStream};
24use crate::{Error, Event, Result, ResultStream, Timestamp};
25
26pub struct Segment {
27    path: PathBuf,
28    file: File,
29    offset: usize,
30    last_timestamp: Option<Timestamp>,
31}
32
33impl Segment {
34    pub async fn open(
35        path: impl Into<PathBuf>,
36        mut last_timestamp: Option<Timestamp>,
37    ) -> Result<Self> {
38        let path = path.into();
39        let file = OpenOptions::new()
40            .create(true)
41            .append(true)
42            .open(&path)
43            .await?;
44        let offset = file.metadata().await?.len() as usize;
45
46        // Recovers the last timestamp.
47        let mut stream = SegmentStream::open(&path, offset, None).await?;
48        while let Some(events) = stream.try_next().await? {
49            for event in events {
50                last_timestamp = Some(event.ts);
51            }
52        }
53
54        Ok(Self {
55            path,
56            file,
57            offset,
58            last_timestamp,
59        })
60    }
61
62    pub async fn seal(mut self) -> Result<Timestamp> {
63        let ts = self.last_timestamp.ok_or_else(|| {
64            Error::Unknown("should not seal a segment with no timestamp".to_owned())
65        })?;
66        // Records the last timestamp at the file footer.
67        codec::write_footer(&mut self.file, ts).await?;
68        self.file.sync_data().await?;
69        Ok(ts)
70    }
71
72    pub async fn read_events(&self, ts: Timestamp) -> Result<ResultStream<Vec<Event>>> {
73        SegmentStream::open(&self.path, self.offset, Some(ts)).await
74    }
75
76    pub async fn append_event(&mut self, event: Event) -> Result<usize> {
77        if let Some(last_ts) = self.last_timestamp {
78            if event.ts <= last_ts {
79                return Err(Error::InvalidArgument(format!(
80                    "event timestamp {:?} <= last event timestamp {:?}",
81                    event.ts, last_ts,
82                )));
83            }
84        }
85        let size = codec::write_event(&mut self.file, &event).await?;
86        self.file.flush().await?;
87        self.offset += size;
88        self.last_timestamp = Some(event.ts);
89        Ok(self.offset)
90    }
91}