engula_journal/grpc/
stream.rs

1// Copyright 2021 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use futures::StreamExt;
16use tonic::Streaming;
17
18use super::{client::Client, proto::*};
19use crate::{async_trait, Error, Event, Result, ResultStream, Timestamp};
20
21#[derive(Clone)]
22pub struct Stream {
23    client: Client,
24    stream: String,
25}
26
27impl Stream {
28    pub fn new(client: Client, stream: String) -> Stream {
29        Stream { client, stream }
30    }
31
32    async fn read_events_internal(&self, ts: Timestamp) -> Result<Streaming<ReadEventsResponse>> {
33        let input = ReadEventsRequest {
34            stream: self.stream.clone(),
35            ts: ts.serialize(),
36        };
37        self.client.read_events(input).await
38    }
39}
40
41#[async_trait]
42impl crate::Stream for Stream {
43    async fn read_events(&self, ts: Timestamp) -> ResultStream<Vec<Event>> {
44        let output = self.read_events_internal(ts).await;
45        match output {
46            Ok(output) => Box::pin(output.map(|result| match result {
47                Ok(resp) => {
48                    let events: Result<Vec<Event>> = resp
49                        .events
50                        .into_iter()
51                        .map(|e| {
52                            Ok(Event {
53                                ts: Timestamp::deserialize(e.ts)?,
54                                data: e.data,
55                            })
56                        })
57                        .collect();
58                    Ok(events?)
59                }
60                Err(status) => Err(Error::from(status)),
61            })),
62            Err(e) => Box::pin(futures::stream::once(futures::future::err(e))),
63        }
64    }
65
66    async fn append_event(&self, event: Event) -> Result<()> {
67        let input = AppendEventRequest {
68            stream: self.stream.clone(),
69            ts: event.ts.serialize(),
70            data: event.data,
71        };
72        self.client.append_event(input).await?;
73        Ok(())
74    }
75
76    async fn release_events(&self, ts: Timestamp) -> Result<()> {
77        let input = ReleaseEventsRequest {
78            stream: self.stream.clone(),
79            ts: ts.serialize(),
80        };
81        self.client.release_events(input).await?;
82        Ok(())
83    }
84}