medea_control_api_proto/grpc/
server.rs1use std::collections::HashMap;
6
7use async_trait::async_trait;
8use derive_more::with_trait::{Display, Error, From};
9use tonic::codegen::{Body, Bytes};
10
11use crate::{
12 CallbackApi, ControlApi,
13 callback::Request as CallbackRequest,
14 control::Request as ControlRequest,
15 grpc::{
16 CallbackApiClient, ProtobufError,
17 api::{
18 self as control_proto,
19 control_api_server::ControlApi as GrpcControlApiService,
20 },
21 callback as callback_proto,
22 },
23};
24
25type StdError = Box<dyn Error + Send + Sync + 'static>;
27
28#[async_trait]
29impl<T: ?Sized> GrpcControlApiService for T
30where
31 T: ControlApi + Send + Sync + 'static,
32 T::Error: From<ProtobufError>,
33 control_proto::Error: From<T::Error>,
34{
35 async fn create(
36 &self,
37 request: tonic::Request<control_proto::CreateRequest>,
38 ) -> Result<tonic::Response<control_proto::CreateResponse>, tonic::Status>
39 {
40 let fut = async {
41 self.create(ControlRequest::try_from(request.into_inner())?).await
42 };
43
44 Ok(tonic::Response::new(match fut.await {
45 Ok(sids) => control_proto::CreateResponse {
46 sid: sids
47 .into_iter()
48 .map(|(id, sid)| (id.to_string(), sid.to_uri_string()))
49 .collect(),
50 error: None,
51 },
52 Err(e) => control_proto::CreateResponse {
53 sid: HashMap::new(),
54 error: Some(e.into()),
55 },
56 }))
57 }
58
59 async fn delete(
60 &self,
61 request: tonic::Request<control_proto::IdRequest>,
62 ) -> Result<tonic::Response<control_proto::Response>, tonic::Status> {
63 let ids = request
64 .into_inner()
65 .fid
66 .into_iter()
67 .map(|fid| fid.parse().map_err(ProtobufError::from))
68 .collect::<Result<Vec<_>, _>>();
69
70 let result = match ids {
71 Ok(ids) => self.delete(&ids).await,
72 Err(e) => Err(e.into()),
73 };
74
75 Ok(tonic::Response::new(match result {
76 Ok(()) => control_proto::Response { error: None },
77 Err(e) => control_proto::Response { error: Some(e.into()) },
78 }))
79 }
80
81 async fn get(
82 &self,
83 request: tonic::Request<control_proto::IdRequest>,
84 ) -> Result<tonic::Response<control_proto::GetResponse>, tonic::Status>
85 {
86 let ids = request
87 .into_inner()
88 .fid
89 .into_iter()
90 .map(|fid| fid.parse().map_err(ProtobufError::from))
91 .collect::<Result<Vec<_>, _>>();
92
93 let result = match ids {
94 Ok(ids) => self.get(&ids).await,
95 Err(e) => Err(e.into()),
96 };
97
98 Ok(tonic::Response::new(match result {
99 Ok(elements) => control_proto::GetResponse {
100 elements: elements
101 .into_iter()
102 .map(|(id, el)| {
103 let s = id.to_string();
104 (id, el).try_into().map(|proto| (s, proto))
105 })
106 .collect::<Result<_, _>>()?,
107 error: None,
108 },
109 Err(e) => control_proto::GetResponse {
110 elements: HashMap::new(),
111 error: Some(e.into()),
112 },
113 }))
114 }
115
116 async fn apply(
117 &self,
118 request: tonic::Request<control_proto::ApplyRequest>,
119 ) -> Result<tonic::Response<control_proto::CreateResponse>, tonic::Status>
120 {
121 let result = async {
122 let req = ControlRequest::try_from(request.into_inner())?;
123 self.apply(req).await
124 };
125
126 Ok(tonic::Response::new(match result.await {
127 Ok(sids) => control_proto::CreateResponse {
128 sid: sids
129 .into_iter()
130 .map(|(id, sid)| (id.to_string(), sid.to_uri_string()))
131 .collect(),
132 error: None,
133 },
134 Err(e) => control_proto::CreateResponse {
135 sid: HashMap::new(),
136 error: Some(e.into()),
137 },
138 }))
139 }
140
141 async fn healthz(
142 &self,
143 request: tonic::Request<control_proto::Ping>,
144 ) -> Result<tonic::Response<control_proto::Pong>, tonic::Status> {
145 self.healthz(request.into_inner().into())
146 .await
147 .map(|pong| tonic::Response::new(pong.into()))
148 .map_err(|e| {
149 let e = control_proto::Error::from(e);
150 let message = [&e.doc, &e.element, &e.text].into_iter().fold(
151 e.code.to_string(),
152 |mut acc, s| {
153 if !s.is_empty() {
154 acc.push_str(": ");
155 acc.push_str(s);
156 }
157 acc
158 },
159 );
160 tonic::Status::unknown(message)
161 })
162 }
163}
164
165#[async_trait]
166impl<T> CallbackApi for CallbackApiClient<T>
167where
168 T: tonic::client::GrpcService<
169 tonic::body::Body,
170 Future: Send,
171 ResponseBody: Body<Data = Bytes, Error: Into<StdError> + Send>
172 + Send
173 + 'static,
174 > + Clone
175 + Send
176 + Sync,
177{
178 type Error = CallbackApiClientError;
179
180 async fn on_event(
181 &self,
182 request: CallbackRequest,
183 ) -> Result<(), Self::Error> {
184 let mut this = self.clone();
187
188 Self::on_event(&mut this, callback_proto::Request::from(request))
189 .await
190 .map(drop)
191 .map_err(Into::into)
192 }
193}
194
195#[derive(Debug, Display, From, Error)]
197pub enum CallbackApiClientError {
198 #[display("gRPC server errored: {_0}")]
202 Tonic(tonic::Status),
203
204 #[display("Failed to convert from gRPC response: {_0}")]
208 InvalidProtobuf(ProtobufError),
209}