medea_control_api_proto/grpc/
client.rs1use async_trait::async_trait;
6use derive_more::with_trait::{Display, Error, From};
7use tonic::codegen::{Body, Bytes};
8
9use crate::{
10 CallbackApi, ControlApi, Elements, Fid, Ping, Pong,
11 callback::Request as CallbackRequest,
12 control::{ParseFidError, Request as ControlRequest},
13 grpc::{
14 ControlApiClient, ProtobufError,
15 api::{self as control_proto},
16 callback::{
17 self as callback_proto,
18 callback_server::Callback as GrpcCallbackService,
19 },
20 },
21 member,
22 member::ParseSidError,
23};
24
/// Boxed, thread-safe, `'static` error trait object.
///
/// Used in this file as the conversion target required of the transport's
/// response body error (`Error: Into<StdError>`) in the [`ControlApi`]
/// implementation for [`ControlApiClient`].
type StdError = Box<dyn Error + Send + Sync + 'static>;
27
28#[async_trait]
29impl<T: ?Sized> GrpcCallbackService for T
30where
31 T: CallbackApi + Send + Sync + 'static,
32 T::Error: From<ProtobufError>,
33 tonic::Status: From<T::Error>,
34{
35 async fn on_event(
36 &self,
37 request: tonic::Request<callback_proto::Request>,
38 ) -> Result<tonic::Response<callback_proto::Response>, tonic::Status> {
39 let req = CallbackRequest::try_from(request.into_inner())
40 .map_err(T::Error::from)?;
41 self.on_event(req)
42 .await
43 .map(|()| tonic::Response::new(callback_proto::Response {}))
44 .map_err(Into::into)
45 }
46}
47
48#[async_trait]
49impl<T> ControlApi for ControlApiClient<T>
50where
51 T: tonic::client::GrpcService<
52 tonic::body::Body,
53 Future: Send,
54 ResponseBody: Body<Data = Bytes, Error: Into<StdError> + Send>
55 + Send
56 + 'static,
57 > + Clone
58 + Send
59 + Sync,
60{
61 type Error = ControlApiClientError;
62
63 async fn create(
64 &self,
65 request: ControlRequest,
66 ) -> Result<member::Sids, Self::Error> {
67 let mut this = self.clone();
70
71 let resp = Self::create(
72 &mut this,
73 control_proto::CreateRequest::from(request),
74 )
75 .await?
76 .into_inner();
77 if let Some(e) = resp.error {
78 return Err(e.into());
79 }
80
81 resp.sid
82 .into_iter()
83 .map(|(id, sid)| {
84 Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
85 })
86 .collect()
87 }
88
89 async fn apply(
90 &self,
91 request: ControlRequest,
92 ) -> Result<member::Sids, Self::Error> {
93 let mut this = self.clone();
96
97 let resp =
98 Self::apply(&mut this, control_proto::ApplyRequest::from(request))
99 .await?
100 .into_inner();
101 if let Some(e) = resp.error {
102 return Err(e.into());
103 }
104
105 resp.sid
106 .into_iter()
107 .map(|(id, sid)| {
108 Ok((member::Id::from(id), sid.parse::<member::Sid>()?))
109 })
110 .collect()
111 }
112
113 async fn delete(&self, fids: &[Fid]) -> Result<(), Self::Error> {
114 let mut this = self.clone();
117
118 let resp = Self::delete(
119 &mut this,
120 control_proto::IdRequest {
121 fid: fids.iter().map(ToString::to_string).collect(),
122 },
123 )
124 .await?
125 .into_inner();
126 if let Some(e) = resp.error {
127 return Err(e.into());
128 }
129
130 Ok(())
131 }
132
133 async fn get(&self, fids: &[Fid]) -> Result<Elements, Self::Error> {
134 let mut this = self.clone();
137
138 let resp = Self::get(
139 &mut this,
140 control_proto::IdRequest {
141 fid: fids.iter().map(ToString::to_string).collect(),
142 },
143 )
144 .await?
145 .into_inner();
146 if let Some(e) = resp.error {
147 return Err(e.into());
148 }
149
150 resp.elements
151 .into_iter()
152 .map(|(fid, el)| Ok((fid.parse()?, el.try_into()?)))
153 .collect()
154 }
155
156 async fn healthz(&self, ping: Ping) -> Result<Pong, Self::Error> {
157 let mut this = self.clone();
160
161 Ok(Self::healthz(&mut this, control_proto::Ping::from(ping))
162 .await?
163 .into_inner()
164 .into())
165 }
166}
167
168#[derive(Debug, Display, From, Error)]
170pub enum ControlApiClientError {
171 #[display("Invalid SID: {_0}")]
173 InvalidSid(ParseSidError),
174
175 #[display("Invalid FID: {_0}")]
177 InvalidFid(ParseFidError),
178
179 #[display("gRPC server errored: {_0}")]
183 Tonic(tonic::Status),
184
185 #[display("Failed to convert from gRPC response: {_0}")]
189 InvalidProtobuf(ProtobufError),
190
191 #[display("Control API server errored: {_0:?}")]
193 ControlError(#[error(not(source))] control_proto::Error),
194}