medea_control_api_proto/grpc/
client.rs

1//! [`ControlApi`] client and [`CallbackApi`] server [gRPC] implementations.
2//!
3//! [gRPC]: https://grpc.io
4
5use async_trait::async_trait;
6use derive_more::with_trait::{Display, Error, From};
7use tonic::codegen::{Body, Bytes};
8
9use crate::{
10    CallbackApi, ControlApi, Elements, Fid, Ping, Pong,
11    callback::Request as CallbackRequest,
12    control::{ParseFidError, Request as ControlRequest},
13    grpc::{
14        ControlApiClient, ProtobufError,
15        api::{self as control_proto},
16        callback::{
17            self as callback_proto,
18            callback_server::Callback as GrpcCallbackService,
19        },
20    },
21    member,
22    member::ParseSidError,
23};
24
25/// [`Box`]ed [`Error`] with [`Send`] and [`Sync`].
26type StdError = Box<dyn Error + Send + Sync + 'static>;
27
28#[async_trait]
29impl<T: ?Sized> GrpcCallbackService for T
30where
31    T: CallbackApi + Send + Sync + 'static,
32    T::Error: From<ProtobufError>,
33    tonic::Status: From<T::Error>,
34{
35    async fn on_event(
36        &self,
37        request: tonic::Request<callback_proto::Request>,
38    ) -> Result<tonic::Response<callback_proto::Response>, tonic::Status> {
39        let req = CallbackRequest::try_from(request.into_inner())
40            .map_err(T::Error::from)?;
41        self.on_event(req)
42            .await
43            .map(|()| tonic::Response::new(callback_proto::Response {}))
44            .map_err(Into::into)
45    }
46}
47
48#[async_trait]
49impl<T> ControlApi for ControlApiClient<T>
50where
51    T: tonic::client::GrpcService<
52            tonic::body::Body,
53            Future: Send,
54            ResponseBody: Body<Data = Bytes, Error: Into<StdError> + Send>
55                              + Send
56                              + 'static,
57        > + Clone
58        + Send
59        + Sync,
60{
61    type Error = ControlApiClientError;
62
63    async fn create(
64        &self,
65        request: ControlRequest,
66    ) -> Result<member::Sids, Self::Error> {
67        // It's OK to `.clone()` `tonic::client`:
68        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
69        let mut this = self.clone();
70
71        let resp = Self::create(
72            &mut this,
73            control_proto::CreateRequest::from(request),
74        )
75        .await?
76        .into_inner();
77        if let Some(e) = resp.error {
78            return Err(e.into());
79        }
80
81        resp.sid
82            .into_iter()
83            .map(|(id, sid)| {
84                Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
85            })
86            .collect()
87    }
88
89    async fn apply(
90        &self,
91        request: ControlRequest,
92    ) -> Result<member::Sids, Self::Error> {
93        // It's OK to `.clone()` `tonic::client`:
94        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
95        let mut this = self.clone();
96
97        let resp =
98            Self::apply(&mut this, control_proto::ApplyRequest::from(request))
99                .await?
100                .into_inner();
101        if let Some(e) = resp.error {
102            return Err(e.into());
103        }
104
105        resp.sid
106            .into_iter()
107            .map(|(id, sid)| {
108                Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
109            })
110            .collect()
111    }
112
113    async fn delete(&self, fids: &[Fid]) -> Result<(), Self::Error> {
114        // It's OK to `.clone()` `tonic::client`:
115        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
116        let mut this = self.clone();
117
118        let resp = Self::delete(
119            &mut this,
120            control_proto::IdRequest {
121                fid: fids.iter().map(ToString::to_string).collect(),
122            },
123        )
124        .await?
125        .into_inner();
126        if let Some(e) = resp.error {
127            return Err(e.into());
128        }
129
130        Ok(())
131    }
132
133    async fn get(&self, fids: &[Fid]) -> Result<Elements, Self::Error> {
134        // It's OK to `.clone()` `tonic::client`:
135        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
136        let mut this = self.clone();
137
138        let resp = Self::get(
139            &mut this,
140            control_proto::IdRequest {
141                fid: fids.iter().map(ToString::to_string).collect(),
142            },
143        )
144        .await?
145        .into_inner();
146        if let Some(e) = resp.error {
147            return Err(e.into());
148        }
149
150        resp.elements
151            .into_iter()
152            .map(|(fid, el)| Ok((fid.parse()?, el.try_into()?)))
153            .collect()
154    }
155
156    async fn healthz(&self, ping: Ping) -> Result<Pong, Self::Error> {
157        // It's OK to `.clone()` `tonic::client`:
158        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
159        let mut this = self.clone();
160
161        Ok(Self::healthz(&mut this, control_proto::Ping::from(ping))
162            .await?
163            .into_inner()
164            .into())
165    }
166}
167
168/// Possible errors of [`ControlApiClient`].
169#[derive(Debug, Display, From, Error)]
170pub enum ControlApiClientError {
171    /// Failed to parse [`member::Sid`].
172    #[display("Invalid SID: {_0}")]
173    InvalidSid(ParseSidError),
174
175    /// Failed to parse [`Fid`].
176    #[display("Invalid FID: {_0}")]
177    InvalidFid(ParseFidError),
178
179    /// [gRPC] server errored.
180    ///
181    /// [gRPC]: https://grpc.io
182    #[display("gRPC server errored: {_0}")]
183    Tonic(tonic::Status),
184
185    /// Failed to convert from [gRPC] response.
186    ///
187    /// [gRPC]: https://grpc.io
188    #[display("Failed to convert from gRPC response: {_0}")]
189    InvalidProtobuf(ProtobufError),
190
191    /// [`ControlApi`] server implementation errored.
192    #[display("Control API server errored: {_0:?}")]
193    ControlError(#[error(not(source))] control_proto::Error),
194}