medea_control_api_proto/grpc/
client.rs1use async_trait::async_trait;
6use derive_more::with_trait::{Display, Error, From};
7use tonic::codegen::{Body, Bytes};
8
9use crate::{
10 CallbackApi, ControlApi, Elements, Fid, Ping, Pong,
11 callback::Request as CallbackRequest,
12 control::{ParseFidError, Request as ControlRequest},
13 grpc::{
14 ControlApiClient, ProtobufError,
15 api::{self as control_proto},
16 callback::{
17 self as callback_proto,
18 callback_server::Callback as GrpcCallbackService,
19 },
20 },
21 member,
22 member::ParseSidError,
23};
24
25type StdError = Box<dyn Error + Send + Sync + 'static>;
27
28#[async_trait]
29impl<T: ?Sized> GrpcCallbackService for T
30where
31 T: CallbackApi + Send + Sync + 'static,
32 T::Error: From<ProtobufError>,
33 tonic::Status: From<T::Error>,
34{
35 async fn on_event(
36 &self,
37 request: tonic::Request<callback_proto::Request>,
38 ) -> Result<tonic::Response<callback_proto::Response>, tonic::Status> {
39 let req = CallbackRequest::try_from(request.into_inner())
40 .map_err(T::Error::from)?;
41 self.on_event(req)
42 .await
43 .map(|()| tonic::Response::new(callback_proto::Response {}))
44 .map_err(Into::into)
45 }
46}
47
48#[async_trait]
49impl<T> ControlApi for ControlApiClient<T>
50where
51 T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone + Send + Sync,
52 T::Future: Send,
53 T::ResponseBody: Body<Data = Bytes> + Send + 'static,
54 <T::ResponseBody as Body>::Error: Send,
55 StdError: From<<T::ResponseBody as Body>::Error>,
56{
57 type Error = ControlApiClientError;
58
59 async fn create(
60 &self,
61 request: ControlRequest,
62 ) -> Result<member::Sids, Self::Error> {
63 let mut this = self.clone();
66
67 let resp = Self::create(
68 &mut this,
69 control_proto::CreateRequest::from(request),
70 )
71 .await?
72 .into_inner();
73 if let Some(e) = resp.error {
74 return Err(e.into());
75 }
76
77 resp.sid
78 .into_iter()
79 .map(|(id, sid)| {
80 Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
81 })
82 .collect()
83 }
84
85 async fn apply(
86 &self,
87 request: ControlRequest,
88 ) -> Result<member::Sids, Self::Error> {
89 let mut this = self.clone();
92
93 let resp =
94 Self::apply(&mut this, control_proto::ApplyRequest::from(request))
95 .await?
96 .into_inner();
97 if let Some(e) = resp.error {
98 return Err(e.into());
99 }
100
101 resp.sid
102 .into_iter()
103 .map(|(id, sid)| {
104 Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
105 })
106 .collect()
107 }
108
109 async fn delete(&self, fids: &[Fid]) -> Result<(), Self::Error> {
110 let mut this = self.clone();
113
114 let resp = Self::delete(
115 &mut this,
116 control_proto::IdRequest {
117 fid: fids.iter().map(ToString::to_string).collect(),
118 },
119 )
120 .await?
121 .into_inner();
122 if let Some(e) = resp.error {
123 return Err(e.into());
124 }
125
126 Ok(())
127 }
128
129 async fn get(&self, fids: &[Fid]) -> Result<Elements, Self::Error> {
130 let mut this = self.clone();
133
134 let resp = Self::get(
135 &mut this,
136 control_proto::IdRequest {
137 fid: fids.iter().map(ToString::to_string).collect(),
138 },
139 )
140 .await?
141 .into_inner();
142 if let Some(e) = resp.error {
143 return Err(e.into());
144 }
145
146 resp.elements
147 .into_iter()
148 .map(|(fid, el)| Ok((fid.parse()?, el.try_into()?)))
149 .collect()
150 }
151
152 async fn healthz(&self, ping: Ping) -> Result<Pong, Self::Error> {
153 let mut this = self.clone();
156
157 Ok(Self::healthz(&mut this, control_proto::Ping::from(ping))
158 .await?
159 .into_inner()
160 .into())
161 }
162}
163
164#[derive(Debug, Display, From, Error)]
166pub enum ControlApiClientError {
167 #[display("Invalid SID: {_0}")]
169 InvalidSid(ParseSidError),
170
171 #[display("Invalid FID: {_0}")]
173 InvalidFid(ParseFidError),
174
175 #[display("gRPC server errored: {_0}")]
179 Tonic(tonic::Status),
180
181 #[display("Failed to convert from gRPC response: {_0}")]
185 InvalidProtobuf(ProtobufError),
186
187 #[display("Control API server errored: {_0:?}")]
189 ControlError(#[error(not(source))] control_proto::Error),
190}