// engula_journal/grpc/stream.rs

use futures::StreamExt;
use tonic::Streaming;

use super::{client::Client, proto::*};
use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};

21#[derive(Clone)]
22pub struct Stream {
23 client: Client,
24 stream: String,
25}
26
27impl Stream {
28 pub fn new(client: Client, stream: String) -> Stream {
29 Stream { client, stream }
30 }
31
32 async fn read_events_internal(&self, ts: Timestamp) -> Result<Streaming<ReadEventsResponse>> {
33 let input = ReadEventsRequest {
34 stream: self.stream.clone(),
35 ts: ts.serialize(),
36 };
37 self.client.read_events(input).await
38 }
39}
40
41#[async_trait]
42impl crate::Stream for Stream {
43 async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
44 let output = self.read_events_internal(ts).await;
45 match output {
46 Ok(output) => Box::pin(output.map(|result| match result {
47 Ok(resp) => {
48 let events: Result<Vec<Event>> = resp
49 .events
50 .into_iter()
51 .map(|e| {
52 Ok(Event {
53 ts: Timestamp::deserialize(e.ts)?,
54 data: e.data,
55 })
56 })
57 .collect();
58 Ok(events?)
59 }
60 Err(status) => Err(Error::from(status)),
61 })),
62 Err(e) => Box::pin(futures::stream::once(futures::future::err(e))),
63 }
64 }
65
66 async fn append_event(&self, event: Event) -> Result<()> {
67 let input = AppendEventRequest {
68 stream: self.stream.clone(),
69 ts: event.ts.serialize(),
70 data: event.data,
71 };
72 self.client.append_event(input).await?;
73 Ok(())
74 }
75
76 async fn release_events(&self, ts: Timestamp) -> Result<()> {
77 let input = ReleaseEventsRequest {
78 stream: self.stream.clone(),
79 ts: ts.serialize(),
80 };
81 self.client.release_events(input).await?;
82 Ok(())
83 }
84}