orchestra_toolkit/session/api/
atapi_adapt.rs1#![allow(private_bounds)]
14use std::{
15 future::{Future, IntoFuture},
16 pin::Pin,
17};
18
19use bb8::PooledConnection;
20use time::OffsetDateTime;
21
22use crate::{
23 hgtp::*, taxonomy::*, AvesterraError, CallError, Entity, Session, SessionAsync, SessionTrait,
24 String255, Token, Value,
25};
26
27pub trait AdaptOutletTrait {
28 type CallbackType;
29}
30impl AdaptOutletTrait for AdaptOutlet<Session> {
31 type CallbackType = Box<dyn FnOnce(AdapterArgs) -> Result<Value, AvesterraError> + Send>;
32}
33#[rustfmt::skip]
34impl AdaptOutletTrait for AdaptOutlet<SessionAsync> {
35 type CallbackType =
36 Box<
37 dyn FnOnce(
38 AdapterArgs,
39 )
40 -> Pin<Box<dyn Future<Output = Result<Value, AvesterraError>> + Send>>
41 + Send,
42 >;
43}
44
45pub struct AdaptOutlet<T: SessionTrait>
46where
47 AdaptOutlet<T>: AdaptOutletTrait,
48{
49 pub(crate) session: T,
50 cb: <Self as AdaptOutletTrait>::CallbackType,
51 outlet: Entity,
52 authorization: Token,
53 timeout: i64,
54}
55
56impl Session {
57 #[must_use]
58 pub fn adapt_outlet(
59 &self,
60 outlet: Entity,
61 authorization: Token,
62 callback: <AdaptOutlet<Session> as AdaptOutletTrait>::CallbackType,
63 ) -> AdaptOutlet<Session> {
64 AdaptOutlet {
65 session: self.clone(),
66 cb: callback,
67 outlet,
68 authorization,
69 timeout: i64::default(),
70 }
71 }
72}
73impl SessionAsync {
74 #[must_use]
97 pub fn adapt_outlet(
98 &self,
99 outlet: Entity,
100 authorization: Token,
101 callback: <AdaptOutlet<SessionAsync> as AdaptOutletTrait>::CallbackType,
102 ) -> AdaptOutlet<SessionAsync> {
103 AdaptOutlet {
104 session: self.clone(),
105 cb: callback,
106 outlet,
107 authorization,
108 timeout: i64::default(),
109 }
110 }
111}
112
113impl IntoFuture for AdaptOutlet<SessionAsync> {
114 type IntoFuture = Pin<Box<dyn Future<Output = Result<(), CallError>> + Send>>;
115 type Output = <Self::IntoFuture as Future>::Output;
116 fn into_future(self) -> Self::IntoFuture {
117 Box::pin(async move {
118 let mut conn = self
119 .session
120 .get_socket_pool()
121 .get()
122 .await
123 .expect("socket_pool.get()"); let args =
125 call_async_1(&mut conn, self.outlet, self.timeout, self.authorization).await?;
126 match args {
127 Some(args) => {
128 let res = (self.cb)(args).await;
129 call_async_2(&mut conn, res).await
130 }
131 None => Ok(()),
132 }
133 })
134 }
135}
136impl AdaptOutlet<Session> {
137 pub fn call(self) -> Result<(), CallError> {
138 let mut conn = self
139 .session
140 .run_async(|s| s.get_socket_pool().get())
141 .expect("socket_pool.get()"); let args = self.session.run_async(|_| {
144 call_async_1(&mut conn, self.outlet, self.timeout, self.authorization)
145 })?;
146
147 let args = match args {
148 Some(args) => args,
149 None => return Ok(()),
150 };
151
152 let res = (self.cb)(args);
156 self.session.run_async(|_| call_async_2(&mut conn, res))
157 }
158}
159
160impl<T, Callback> AdaptOutlet<T>
161where
162 T: SessionTrait,
163 AdaptOutlet<T>: AdaptOutletTrait<CallbackType = Callback>,
164{
165 #[must_use]
166 pub fn with_timeout(mut self, timeout: i64) -> Self {
167 self.timeout = timeout;
168 self
169 }
170}
171
172async fn call_async_1(
173 conn: &mut PooledConnection<'_, PoolManager>,
174 outlet: Entity,
175 timeout: i64,
176 authorization: Token,
177) -> Result<Option<AdapterArgs>, CallError> {
178 let mut msg = HGTPMessage::default();
179
180 msg.pack_command(Command::Adapt);
181 msg.pack_outlet(outlet);
182 msg.pack_timeout(timeout);
183 msg.pack_authorization(authorization);
184
185 conn.send(&msg).await?;
186 conn.recv(&mut msg).await?;
187
188 match AdapterArgs::try_from(&msg) {
189 Ok(args) => Ok(Some(args)),
190 Err(e) => {
191 let mut msg = HGTPMessage::default();
192 msg.pack_error_code(HGTPError::Adapter);
193 msg.pack_error(&AvesterraError {
194 errcode: HGTPError::Adapter,
195 message: String255::from_str_truncate(&e.to_string()),
196 });
197 conn.send(&msg).await?;
198 Ok(None)
199 }
200 }
201}
202
203async fn call_async_2(
204 conn: &mut PooledConnection<'_, PoolManager>,
205 res: Result<Value, AvesterraError>,
206) -> Result<(), CallError> {
207 let mut msg = HGTPMessage::default();
208 match res {
209 Ok(value) => {
210 msg.pack_value(&value);
211 }
212 Err(e) => msg.pack_error(&e),
213 }
214 conn.send(&msg).await?;
215
216 Ok(())
217}
218
219#[derive(Debug)]
220pub struct AdapterArgs {
221 pub entity: Entity,
222 pub auxiliary: Entity,
223 pub ancillary: Entity,
224 pub method: Method,
226 pub outlet: Entity,
228 pub attribute: Attribute,
229 pub instance: i32,
230 pub offset: i32,
231 pub name: String255,
232 pub key: String255,
233 pub value: Value,
234 pub parameter: i64,
235 pub resultant: i64,
236 pub index: i64,
237 pub count: i64,
238 pub aspect: Aspect,
239 pub context: Context,
240 pub category: Category,
241 pub class: Class,
242 pub event: Event,
243 pub mode: Mode,
244 pub state: State,
245 pub condition: Condition,
246 pub precedence: u16,
247 pub time: OffsetDateTime,
248 pub timeout: i64,
249 pub authority: Token,
250 pub authorization: Token,
251}
252
253impl TryFrom<&HGTPMessage> for AdapterArgs {
254 type Error = UnpackError;
255
256 #[rustfmt::skip]
257 fn try_from(msg: &HGTPMessage) -> Result<Self, Self::Error> {
258 Ok(Self {
259 entity : msg.unpack_entity(),
260 auxiliary : msg.unpack_auxiliary(),
261 ancillary : msg.unpack_ancillary(),
262 method : msg.unpack_method()?,
263 outlet : msg.unpack_outlet(),
264 attribute : msg.unpack_attribute()?,
265 instance : msg.unpack_instance(),
266 offset : msg.unpack_offset(),
267 name : msg.unpack_name()?,
268 key : msg.unpack_key()?,
269 value : msg.unpack_value()?,
270 parameter : msg.unpack_parameter(),
271 resultant : msg.unpack_resultant(),
272 index : msg.unpack_index(),
273 count : msg.unpack_count(),
274 aspect : msg.unpack_aspect()?,
275 context : msg.unpack_context()?,
276 category : msg.unpack_category()?,
277 class : msg.unpack_class()?,
278 event : msg.unpack_event()?,
279 mode : msg.unpack_mode()?,
280 state : msg.unpack_state()?,
281 condition : msg.unpack_condition()?,
282 precedence : msg.unpack_precedence(),
283 time : msg.unpack_time()?,
284 timeout : msg.unpack_timeout(),
285 authority : msg.unpack_authority(),
286 authorization: msg.unpack_authorization(),
287 })
288 }
289}