engula_journal/file/
segment_stream.rs

1// Copyright 2021 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16
17use futures::stream;
18use tokio::fs::File;
19
20use super::codec;
21use crate::{Event, Result, ResultStream, Timestamp};
22
23pub struct SegmentStream {
24    file: File,
25    offset: usize,
26    max_offset: usize,
27    start_event: Option<Event>,
28}
29
30impl SegmentStream {
31    pub async fn open(
32        path: impl AsRef<Path>,
33        limit: usize,
34        start_ts: Option<Timestamp>,
35    ) -> Result<ResultStream<Vec<Event>>> {
36        let file = File::open(path).await?;
37        let mut stream = Self {
38            file,
39            offset: 0,
40            max_offset: limit,
41            start_event: None,
42        };
43
44        // Seeks to the start event.
45        if let Some(ts) = start_ts {
46            while let Some(event) = stream.read_event().await? {
47                if event.ts >= ts {
48                    stream.start_event = Some(event);
49                    break;
50                }
51            }
52        }
53
54        let stream = stream::unfold(stream, |mut stream| async move {
55            stream.next_events().await.map(|events| (events, stream))
56        });
57        Ok(Box::pin(stream))
58    }
59
60    async fn read_event(&mut self) -> Result<Option<Event>> {
61        if let Some((event, offset)) =
62            codec::read_event_at(&mut self.file, self.offset, self.max_offset).await?
63        {
64            self.offset = offset;
65            Ok(Some(event))
66        } else {
67            Ok(None)
68        }
69    }
70
71    async fn next_event(&mut self) -> Result<Option<Event>> {
72        if let Some(event) = self.start_event.take() {
73            Ok(Some(event))
74        } else {
75            self.read_event().await
76        }
77    }
78
79    async fn next_events(&mut self) -> Option<Result<Vec<Event>>> {
80        match self.next_event().await {
81            Ok(Some(event)) => Some(Ok(vec![event])),
82            Ok(None) => None,
83            Err(err) => Some(Err(err)),
84        }
85    }
86}