engula_journal/mem/
stream.rs

1// Copyright 2021 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::VecDeque, sync::Arc};
16
17use futures::{future, stream};
18use tokio::sync::Mutex;
19
20use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};
21
22#[derive(Clone)]
23pub struct Stream {
24    events: Arc<Mutex<VecDeque<Event>>>,
25}
26
27impl Default for Stream {
28    fn default() -> Self {
29        Self {
30            events: Arc::new(Mutex::new(VecDeque::new())),
31        }
32    }
33}
34
35#[async_trait]
36impl crate::Stream for Stream {
37    async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
38        let events = self.events.lock().await;
39        let offset = events.partition_point(|x| x.ts < ts);
40        Box::pin(stream::once(future::ok(
41            events.range(offset..).cloned().collect(),
42        )))
43    }
44
45    async fn append_event(&self, event: Event) -> Result<()> {
46        let mut events = self.events.lock().await;
47        if let Some(last_ts) = events.back().map(|x| x.ts) {
48            if event.ts <= last_ts {
49                return Err(Error::InvalidArgument(format!(
50                    "timestamp {:?} <= last timestamp {:?}",
51                    event.ts, last_ts
52                )));
53            }
54        }
55        events.push_back(event);
56        Ok(())
57    }
58
59    async fn release_events(&self, ts: Timestamp) -> Result<()> {
60        let mut events = self.events.lock().await;
61        let index = events.partition_point(|x| x.ts < ts);
62        events.drain(..index);
63        Ok(())
64    }
65}