1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use futures::StreamExt;
use tonic::Streaming;
use super::{client::Client, proto::*};
use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};
/// Handle that pairs a `Client` with the identifier of one stream.
///
/// Cloning the handle clones both the client and the stream identifier.
#[derive(Clone)]
pub struct Stream {
    /// Client through which every request of this handle is issued.
    client: Client,
    /// Value copied into the `stream` field of each outgoing request.
    stream: String,
}
impl Stream {
    /// Builds a handle that addresses `stream` through `client`.
    ///
    /// The arguments are only stored; no request is sent here.
    pub fn new(client: Client, stream: String) -> Self {
        Self { client, stream }
    }

    /// Issues a `ReadEventsRequest` for this stream, carrying the serialized
    /// `ts`, and hands back the client's response stream unchanged.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the client's `read_events` call returns.
    async fn read_events_internal(&self, ts: Timestamp) -> Result<Streaming<ReadEventsResponse>> {
        let stream = self.stream.clone();
        let ts = ts.serialize();
        self.client.read_events(ReadEventsRequest { stream, ts }).await
    }
}
#[async_trait]
impl crate::Stream for Stream {
async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
let output = self.read_events_internal(ts).await;
match output {
Ok(output) => Box::pin(output.map(|result| match result {
Ok(resp) => {
let events: Result<Vec<Event>> = resp
.events
.into_iter()
.map(|e| {
Ok(Event {
ts: Timestamp::deserialize(e.ts)?,
data: e.data,
})
})
.collect();
Ok(events?)
}
Err(status) => Err(Error::from(status)),
})),
Err(e) => Box::pin(futures::stream::once(futures::future::err(e))),
}
}
async fn append_event(&self, event: Event) -> Result<()> {
let input = AppendEventRequest {
stream: self.stream.clone(),
ts: event.ts.serialize(),
data: event.data,
};
self.client.append_event(input).await?;
Ok(())
}
async fn release_events(&self, ts: Timestamp) -> Result<()> {
let input = ReleaseEventsRequest {
stream: self.stream.clone(),
ts: ts.serialize(),
};
self.client.release_events(input).await?;
Ok(())
}
}