medea_control_api_proto/grpc/
client.rs

1//! [`ControlApi`] client and [`CallbackApi`] server [gRPC] implementations.
2//!
3//! [gRPC]: https://grpc.io
4
5use async_trait::async_trait;
6use derive_more::with_trait::{Display, Error, From};
7use tonic::codegen::{Body, Bytes};
8
9use crate::{
10    CallbackApi, ControlApi, Elements, Fid, Ping, Pong,
11    callback::Request as CallbackRequest,
12    control::{ParseFidError, Request as ControlRequest},
13    grpc::{
14        ControlApiClient, ProtobufError,
15        api::{self as control_proto},
16        callback::{
17            self as callback_proto,
18            callback_server::Callback as GrpcCallbackService,
19        },
20    },
21    member,
22    member::ParseSidError,
23};
24
25/// [`Box`]ed [`Error`] with [`Send`] and [`Sync`].
26type StdError = Box<dyn Error + Send + Sync + 'static>;
27
28#[async_trait]
29impl<T: ?Sized> GrpcCallbackService for T
30where
31    T: CallbackApi + Send + Sync + 'static,
32    T::Error: From<ProtobufError>,
33    tonic::Status: From<T::Error>,
34{
35    async fn on_event(
36        &self,
37        request: tonic::Request<callback_proto::Request>,
38    ) -> Result<tonic::Response<callback_proto::Response>, tonic::Status> {
39        let req = CallbackRequest::try_from(request.into_inner())
40            .map_err(T::Error::from)?;
41        self.on_event(req)
42            .await
43            .map(|()| tonic::Response::new(callback_proto::Response {}))
44            .map_err(Into::into)
45    }
46}
47
48#[async_trait]
49impl<T> ControlApi for ControlApiClient<T>
50where
51    T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync,
52    T::Future: Send,
53    T::ResponseBody: Body<Data = Bytes> + Send + 'static,
54    <T::ResponseBody as Body>::Error: Send,
55    StdError: From<<T::ResponseBody as Body>::Error>,
56{
57    type Error = ControlApiClientError;
58
59    async fn create(
60        &self,
61        request: ControlRequest,
62    ) -> Result<member::Sids, Self::Error> {
63        // It's OK to `.clone()` `tonic::client`:
64        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
65        let mut this = self.clone();
66
67        let resp = Self::create(
68            &mut this,
69            control_proto::CreateRequest::from(request),
70        )
71        .await?
72        .into_inner();
73        if let Some(e) = resp.error {
74            return Err(e.into());
75        }
76
77        resp.sid
78            .into_iter()
79            .map(|(id, sid)| {
80                Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
81            })
82            .collect()
83    }
84
85    async fn apply(
86        &self,
87        request: ControlRequest,
88    ) -> Result<member::Sids, Self::Error> {
89        // It's OK to `.clone()` `tonic::client`:
90        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
91        let mut this = self.clone();
92
93        let resp =
94            Self::apply(&mut this, control_proto::ApplyRequest::from(request))
95                .await?
96                .into_inner();
97        if let Some(e) = resp.error {
98            return Err(e.into());
99        }
100
101        resp.sid
102            .into_iter()
103            .map(|(id, sid)| {
104                Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
105            })
106            .collect()
107    }
108
109    async fn delete(&self, fids: &[Fid]) -> Result<(), Self::Error> {
110        // It's OK to `.clone()` `tonic::client`:
111        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
112        let mut this = self.clone();
113
114        let resp = Self::delete(
115            &mut this,
116            control_proto::IdRequest {
117                fid: fids.iter().map(ToString::to_string).collect(),
118            },
119        )
120        .await?
121        .into_inner();
122        if let Some(e) = resp.error {
123            return Err(e.into());
124        }
125
126        Ok(())
127    }
128
129    async fn get(&self, fids: &[Fid]) -> Result<Elements, Self::Error> {
130        // It's OK to `.clone()` `tonic::client`:
131        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
132        let mut this = self.clone();
133
134        let resp = Self::get(
135            &mut this,
136            control_proto::IdRequest {
137                fid: fids.iter().map(ToString::to_string).collect(),
138            },
139        )
140        .await?
141        .into_inner();
142        if let Some(e) = resp.error {
143            return Err(e.into());
144        }
145
146        resp.elements
147            .into_iter()
148            .map(|(fid, el)| Ok((fid.parse()?, el.try_into()?)))
149            .collect()
150    }
151
152    async fn healthz(&self, ping: Ping) -> Result<Pong, Self::Error> {
153        // It's OK to `.clone()` `tonic::client`:
154        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
155        let mut this = self.clone();
156
157        Ok(Self::healthz(&mut this, control_proto::Ping::from(ping))
158            .await?
159            .into_inner()
160            .into())
161    }
162}
163
164/// Possible errors of [`ControlApiClient`].
165#[derive(Debug, Display, From, Error)]
166pub enum ControlApiClientError {
167    /// Failed to parse [`member::Sid`].
168    #[display("Invalid SID: {_0}")]
169    InvalidSid(ParseSidError),
170
171    /// Failed to parse [`Fid`].
172    #[display("Invalid FID: {_0}")]
173    InvalidFid(ParseFidError),
174
175    /// [gRPC] server errored.
176    ///
177    /// [gRPC]: https://grpc.io
178    #[display("gRPC server errored: {_0}")]
179    Tonic(tonic::Status),
180
181    /// Failed to convert from [gRPC] response.
182    ///
183    /// [gRPC]: https://grpc.io
184    #[display("Failed to convert from gRPC response: {_0}")]
185    InvalidProtobuf(ProtobufError),
186
187    /// [`ControlApi`] server implementation errored.
188    #[display("Control API server errored: {_0:?}")]
189    ControlError(#[error(not(source))] control_proto::Error),
190}