engula_journal/file/stream.rs

1// Copyright 2021 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{path::PathBuf, sync::Arc};
16
17use futures::{future, stream, StreamExt};
18use tokio::{fs, sync::Mutex};
19
20use super::{segment::Segment, segment_reader::SegmentReader};
21use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};
22
23#[derive(Clone)]
24pub struct Stream {
25    inner: Arc<Mutex<Inner>>,
26}
27
28#[derive(Default)]
29pub struct Inner {
30    path: PathBuf,
31    segment_size: usize,
32    active_segment: Option<Segment>,
33    sealed_segments: Vec<SegmentReader>,
34}
35
// Journal file layout:
//
// - journal
//   - stream_1
//     - CURRENT       (the active segment, still being appended to)
//     - <timestamp>   (a sealed segment, named by the `{:?}` form of the timestamp
//     - <timestamp>    returned when it was sealed)
//     - ...
//   - stream_2
//
// `Stream::open` treats every entry of a stream directory other than `CURRENT` as a
// sealed segment, so no other files may be placed there.

/// File name of the active segment inside a stream directory.
const ACTIVE_SEGMENT: &str = "CURRENT";
46
47impl Inner {
48    fn active_segment_path(&self) -> PathBuf {
49        self.path.join(ACTIVE_SEGMENT)
50    }
51
52    fn sealed_segment_path(&self, ts: Timestamp) -> PathBuf {
53        self.path.join(format!("{:?}", ts))
54    }
55}
56
57impl Stream {
58    pub async fn open(path: PathBuf, segment_size: usize) -> Result<Stream> {
59        let mut inner = Inner {
60            path,
61            segment_size,
62            active_segment: None,
63            sealed_segments: Vec::new(),
64        };
65
66        // Opens all sealed segments and sorts them by timestamp.
67        let mut entries = fs::read_dir(&inner.path).await?;
68        while let Some(ent) = entries.next_entry().await? {
69            if ent.file_name() != ACTIVE_SEGMENT {
70                let segment = SegmentReader::open(ent.path()).await?;
71                inner.sealed_segments.push(segment);
72            }
73        }
74        inner.sealed_segments.sort_by_key(|x| x.max_timestamp());
75
76        let last_timestamp = inner.sealed_segments.last().map(|x| x.max_timestamp());
77        let active = Segment::open(inner.active_segment_path(), last_timestamp).await?;
78        inner.active_segment = Some(active);
79
80        Ok(Stream {
81            inner: Arc::new(Mutex::new(inner)),
82        })
83    }
84
85    async fn read_segments(&self, ts: Timestamp) -> Result<Vec<ResultStream<Vec<Event>>>> {
86        let inner = self.inner.lock().await;
87        let index = inner
88            .sealed_segments
89            .partition_point(|x| x.max_timestamp() < ts);
90        let mut streams = Vec::new();
91        for segment in &inner.sealed_segments[index..] {
92            streams.push(segment.read_events(ts).await?);
93        }
94        if let Some(segment) = &inner.active_segment {
95            streams.push(segment.read_events(ts).await?);
96        }
97        Ok(streams)
98    }
99}
100
101#[async_trait]
102impl crate::Stream for Stream {
103    async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
104        match self.read_segments(ts).await {
105            Ok(streams) => Box::pin(stream::iter(streams).flatten()),
106            Err(err) => Box::pin(stream::once(future::err(err))),
107        }
108    }
109
110    async fn append_event(&self, event: Event) -> Result<()> {
111        let mut inner = self.inner.lock().await;
112        let size = if let Some(active) = inner.active_segment.as_mut() {
113            active.append_event(event).await?
114        } else {
115            return Err(Error::Unknown(
116                "active segment is closed due to previous errors".to_owned(),
117            ));
118        };
119
120        if size >= inner.segment_size {
121            // Seals the active segment.
122            let active = inner.active_segment.take().unwrap();
123            let last_timestamp = active.seal().await?;
124
125            // Renames the active segment to a sealed segment.
126            let active_segment_path = inner.active_segment_path();
127            let sealed_segment_path = inner.sealed_segment_path(last_timestamp);
128            fs::rename(&active_segment_path, &sealed_segment_path).await?;
129            let sealed = SegmentReader::open(sealed_segment_path).await?;
130            inner.sealed_segments.push(sealed);
131
132            // Opens a new active segment.
133            let active = Segment::open(&active_segment_path, Some(last_timestamp)).await?;
134            inner.active_segment = Some(active);
135        }
136        Ok(())
137    }
138
139    async fn release_events(&self, ts: Timestamp) -> Result<()> {
140        let mut inner = self.inner.lock().await;
141        let index = inner
142            .sealed_segments
143            .partition_point(|x| x.max_timestamp() < ts);
144        for segment in inner.sealed_segments.drain(..index) {
145            fs::remove_file(segment.path()).await?;
146        }
147        Ok(())
148    }
149}