medea_control_api_proto/grpc/
server.rs

1//! [`ControlApi`] server and [`CallbackApi`] client [gRPC] implementations.
2//!
3//! [gRPC]: https://grpc.io
4
5use std::collections::HashMap;
6
7use async_trait::async_trait;
8use derive_more::with_trait::{Display, Error, From};
9use tonic::codegen::{Body, Bytes};
10
11use crate::{
12    CallbackApi, ControlApi,
13    callback::Request as CallbackRequest,
14    control::Request as ControlRequest,
15    grpc::{
16        CallbackApiClient, ProtobufError,
17        api::{
18            self as control_proto,
19            control_api_server::ControlApi as GrpcControlApiService,
20        },
21        callback as callback_proto,
22    },
23};
24
/// Type-erased, heap-allocated [`Error`] that is [`Send`], [`Sync`] and
/// `'static`.
///
/// Used below as the conversion target required from the error of a [gRPC]
/// response body.
///
/// [gRPC]: https://grpc.io
type StdError = Box<dyn Error + Send + Sync + 'static>;
27
28#[async_trait]
29impl<T: ?Sized> GrpcControlApiService for T
30where
31    T: ControlApi + Send + Sync + 'static,
32    T::Error: From<ProtobufError>,
33    control_proto::Error: From<T::Error>,
34{
35    async fn create(
36        &self,
37        request: tonic::Request<control_proto::CreateRequest>,
38    ) -> Result<tonic::Response<control_proto::CreateResponse>, tonic::Status>
39    {
40        let fut = async {
41            self.create(ControlRequest::try_from(request.into_inner())?).await
42        };
43
44        Ok(tonic::Response::new(match fut.await {
45            Ok(sids) => control_proto::CreateResponse {
46                sid: sids
47                    .into_iter()
48                    .map(|(id, sid)| (id.to_string(), sid.to_uri_string()))
49                    .collect(),
50                error: None,
51            },
52            Err(e) => control_proto::CreateResponse {
53                sid: HashMap::new(),
54                error: Some(e.into()),
55            },
56        }))
57    }
58
59    async fn delete(
60        &self,
61        request: tonic::Request<control_proto::IdRequest>,
62    ) -> Result<tonic::Response<control_proto::Response>, tonic::Status> {
63        let ids = request
64            .into_inner()
65            .fid
66            .into_iter()
67            .map(|fid| fid.parse().map_err(ProtobufError::from))
68            .collect::<Result<Vec<_>, _>>();
69
70        let result = match ids {
71            Ok(ids) => self.delete(&ids).await,
72            Err(e) => Err(e.into()),
73        };
74
75        Ok(tonic::Response::new(match result {
76            Ok(()) => control_proto::Response { error: None },
77            Err(e) => control_proto::Response { error: Some(e.into()) },
78        }))
79    }
80
81    async fn get(
82        &self,
83        request: tonic::Request<control_proto::IdRequest>,
84    ) -> Result<tonic::Response<control_proto::GetResponse>, tonic::Status>
85    {
86        let ids = request
87            .into_inner()
88            .fid
89            .into_iter()
90            .map(|fid| fid.parse().map_err(ProtobufError::from))
91            .collect::<Result<Vec<_>, _>>();
92
93        let result = match ids {
94            Ok(ids) => self.get(&ids).await,
95            Err(e) => Err(e.into()),
96        };
97
98        Ok(tonic::Response::new(match result {
99            Ok(elements) => control_proto::GetResponse {
100                elements: elements
101                    .into_iter()
102                    .map(|(id, el)| {
103                        let s = id.to_string();
104                        (id, el).try_into().map(|proto| (s, proto))
105                    })
106                    .collect::<Result<_, _>>()?,
107                error: None,
108            },
109            Err(e) => control_proto::GetResponse {
110                elements: HashMap::new(),
111                error: Some(e.into()),
112            },
113        }))
114    }
115
116    async fn apply(
117        &self,
118        request: tonic::Request<control_proto::ApplyRequest>,
119    ) -> Result<tonic::Response<control_proto::CreateResponse>, tonic::Status>
120    {
121        let result = async {
122            let req = ControlRequest::try_from(request.into_inner())?;
123            self.apply(req).await
124        };
125
126        Ok(tonic::Response::new(match result.await {
127            Ok(sids) => control_proto::CreateResponse {
128                sid: sids
129                    .into_iter()
130                    .map(|(id, sid)| (id.to_string(), sid.to_uri_string()))
131                    .collect(),
132                error: None,
133            },
134            Err(e) => control_proto::CreateResponse {
135                sid: HashMap::new(),
136                error: Some(e.into()),
137            },
138        }))
139    }
140
141    async fn healthz(
142        &self,
143        request: tonic::Request<control_proto::Ping>,
144    ) -> Result<tonic::Response<control_proto::Pong>, tonic::Status> {
145        self.healthz(request.into_inner().into())
146            .await
147            .map(|pong| tonic::Response::new(pong.into()))
148            .map_err(|e| {
149                let e = control_proto::Error::from(e);
150                let message = [&e.doc, &e.element, &e.text].into_iter().fold(
151                    e.code.to_string(),
152                    |mut acc, s| {
153                        if !s.is_empty() {
154                            acc.push_str(": ");
155                            acc.push_str(s);
156                        }
157                        acc
158                    },
159                );
160                tonic::Status::unknown(message)
161            })
162    }
163}
164
165#[async_trait]
166impl<T> CallbackApi for CallbackApiClient<T>
167where
168    T: tonic::client::GrpcService<
169            tonic::body::Body,
170            Future: Send,
171            ResponseBody: Body<Data = Bytes, Error: Into<StdError> + Send>
172                              + Send
173                              + 'static,
174        > + Clone
175        + Send
176        + Sync,
177{
178    type Error = CallbackApiClientError;
179
180    async fn on_event(
181        &self,
182        request: CallbackRequest,
183    ) -> Result<(), Self::Error> {
184        // It's OK to `.clone()` `tonic::client`:
185        // https://docs.rs/tonic/latest/tonic/client/index.html#concurrent-usage
186        let mut this = self.clone();
187
188        Self::on_event(&mut this, callback_proto::Request::from(request))
189            .await
190            .map(drop)
191            .map_err(Into::into)
192    }
193}
194
195/// Possible errors of [`CallbackApiClient`].
196#[derive(Debug, Display, From, Error)]
197pub enum CallbackApiClientError {
198    /// [gRPC] server errored.
199    ///
200    /// [gRPC]: https://grpc.io
201    #[display("gRPC server errored: {_0}")]
202    Tonic(tonic::Status),
203
204    /// Failed to convert from [gRPC] response.
205    ///
206    /// [gRPC]: https://grpc.io
207    #[display("Failed to convert from gRPC response: {_0}")]
208    InvalidProtobuf(ProtobufError),
209}