1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use std::{collections::VecDeque, sync::Arc};
use futures::{future, stream};
use tokio::sync::Mutex;
use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};
/// An in-memory event stream.
///
/// The events live in a queue behind an `Arc<Mutex<..>>`, so cloning a
/// `Stream` produces another handle to the same underlying queue rather
/// than an independent copy of the events.
#[derive(Clone)]
pub struct Stream {
    /// Buffered events; new events are pushed at the back and released
    /// events are drained from the front.
    events: Arc<Mutex<VecDeque<Event>>>,
}
impl Default for Stream {
    /// Creates a stream that holds no events.
    fn default() -> Self {
        let empty_queue = VecDeque::new();
        let events = Arc::new(Mutex::new(empty_queue));
        Self { events }
    }
}
#[async_trait]
impl crate::Stream for Stream {
async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
let events = self.events.lock().await;
let offset = events.partition_point(|x| x.ts < ts);
Box::pin(stream::once(future::ok(
events.range(offset..).cloned().collect(),
)))
}
async fn append_event(&self, event: Event) -> Result<()> {
let mut events = self.events.lock().await;
if let Some(last_ts) = events.back().map(|x| x.ts) {
if event.ts <= last_ts {
return Err(Error::InvalidArgument(format!(
"timestamp {:?} <= last timestamp {:?}",
event.ts, last_ts
)));
}
}
events.push_back(event);
Ok(())
}
async fn release_events(&self, ts: Timestamp) -> Result<()> {
let mut events = self.events.lock().await;
let index = events.partition_point(|x| x.ts < ts);
events.drain(..index);
Ok(())
}
}