engula_journal/mem/
stream.rs1use std::{collections::VecDeque, sync::Arc};
16
17use futures::{future, stream};
18use tokio::sync::Mutex;
19
20use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};
21
22#[derive(Clone)]
23pub struct Stream {
24 events: Arc<Mutex<VecDeque<Event>>>,
25}
26
27impl Default for Stream {
28 fn default() -> Self {
29 Self {
30 events: Arc::new(Mutex::new(VecDeque::new())),
31 }
32 }
33}
34
35#[async_trait]
36impl crate::Stream for Stream {
37 async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
38 let events = self.events.lock().await;
39 let offset = events.partition_point(|x| x.ts < ts);
40 Box::pin(stream::once(future::ok(
41 events.range(offset..).cloned().collect(),
42 )))
43 }
44
45 async fn append_event(&self, event: Event) -> Result<()> {
46 let mut events = self.events.lock().await;
47 if let Some(last_ts) = events.back().map(|x| x.ts) {
48 if event.ts <= last_ts {
49 return Err(Error::InvalidArgument(format!(
50 "timestamp {:?} <= last timestamp {:?}",
51 event.ts, last_ts
52 )));
53 }
54 }
55 events.push_back(event);
56 Ok(())
57 }
58
59 async fn release_events(&self, ts: Timestamp) -> Result<()> {
60 let mut events = self.events.lock().await;
61 let index = events.partition_point(|x| x.ts < ts);
62 events.drain(..index);
63 Ok(())
64 }
65}