orchestra_toolkit/session/api/
atapi_adapt.rs

1/* Copyright 2024-2025 LEDR Technologies Inc.
2* This file is part of the Orchestra library, which helps developers use our Orchestra technology, which is based on AvesTerra, owned and developed by Georgetown University, under license agreement with LEDR Technologies Inc.
3*
4* The Orchestra library is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or any later version.
5*
6* The Orchestra library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
7*
8* You should have received a copy of the GNU Lesser General Public License along with the Orchestra library. If not, see <https://www.gnu.org/licenses/>.
9*
10* If you have any questions, feedback or issues about the Orchestra library, you can contact us at support@ledr.io.
11*/
12
13#![allow(private_bounds)]
14use std::{
15    future::{Future, IntoFuture},
16    pin::Pin,
17};
18
19use bb8::PooledConnection;
20use time::OffsetDateTime;
21
22use crate::{
23    hgtp::*, taxonomy::*, AvesterraError, CallError, Entity, Session, SessionAsync, SessionTrait,
24    String255, Token, Value,
25};
26
/// Associates an `AdaptOutlet` specialization with the callback type it stores.
///
/// It is implemented separately for `AdaptOutlet<Session>` and
/// `AdaptOutlet<SessionAsync>`, so each session flavour declares its own
/// callback signature.
pub trait AdaptOutletTrait {
    /// Type of the callback held by the outlet and invoked with the decoded
    /// adapter arguments.
    type CallbackType;
}
30impl AdaptOutletTrait for AdaptOutlet<Session> {
31    type CallbackType = Box<dyn FnOnce(AdapterArgs) -> Result<Value, AvesterraError> + Send>;
32}
33#[rustfmt::skip]
34impl AdaptOutletTrait for AdaptOutlet<SessionAsync> {
35    type CallbackType = 
36        Box<
37            dyn FnOnce(
38                AdapterArgs,
39            )
40            -> Pin<Box<dyn Future<Output = Result<Value, AvesterraError>> + Send>>
41            + Send,
42        >;
43}
44
45pub struct AdaptOutlet<T: SessionTrait>
46where
47    AdaptOutlet<T>: AdaptOutletTrait,
48{
49    pub(crate) session: T,
50    cb: <Self as AdaptOutletTrait>::CallbackType,
51    outlet: Entity,
52    authorization: Token,
53    timeout: i64,
54}
55
56impl Session {
57    #[must_use]
58    pub fn adapt_outlet(
59        &self,
60        outlet: Entity,
61        authorization: Token,
62        callback: <AdaptOutlet<Session> as AdaptOutletTrait>::CallbackType,
63    ) -> AdaptOutlet<Session> {
64        AdaptOutlet {
65            session: self.clone(),
66            cb: callback,
67            outlet,
68            authorization,
69            timeout: i64::default(),
70        }
71    }
72}
73impl SessionAsync {
74    /// Example:
75    /// ```
76    /// use orchestra_toolkit::*;
77    ///
78    /// async fn callback(s: SessionAsync, args: AdapterArgs) -> Result<Value, AvesterraError> {
79    ///     println!("Adapting {:?}", args);
80    ///     Ok(Value::NULL)
81    /// }
82    ///
83    /// let outlet = Entity::new(0, 0, 4200);
84    /// let auth = Token::NULL;
85    /// let session = Session::initialize(SessionConfig::default()).unwrap();
86    /// session.run_async(|s1| async {
87    ///     let s2 = s1.clone();
88    ///     s1.
89    ///       adapt_outlet(
90    ///           outlet,
91    ///           auth,
92    ///           Box::new(|args: AdapterArgs| Box::pin(callback(s2, args))),
93    ///     )
94    /// });
95    /// ```
96    #[must_use]
97    pub fn adapt_outlet(
98        &self,
99        outlet: Entity,
100        authorization: Token,
101        callback: <AdaptOutlet<SessionAsync> as AdaptOutletTrait>::CallbackType,
102    ) -> AdaptOutlet<SessionAsync> {
103        AdaptOutlet {
104            session: self.clone(),
105            cb: callback,
106            outlet,
107            authorization,
108            timeout: i64::default(),
109        }
110    }
111}
112
113impl IntoFuture for AdaptOutlet<SessionAsync> {
114    type IntoFuture = Pin<Box<dyn Future<Output = Result<(), CallError>> + Send>>;
115    type Output = <Self::IntoFuture as Future>::Output;
116    fn into_future(self) -> Self::IntoFuture {
117        Box::pin(async move {
118            let mut conn = self
119                .session
120                .get_socket_pool()
121                .get()
122                .await
123                .expect("socket_pool.get()"); // TODO: first handle connection error
124            let args =
125                call_async_1(&mut conn, self.outlet, self.timeout, self.authorization).await?;
126            match args {
127                Some(args) => {
128                    let res = (self.cb)(args).await;
129                    call_async_2(&mut conn, res).await
130                }
131                None => Ok(()),
132            }
133        })
134    }
135}
136impl AdaptOutlet<Session> {
137    pub fn call(self) -> Result<(), CallError> {
138        let mut conn = self
139            .session
140            .run_async(|s| s.get_socket_pool().get())
141            .expect("socket_pool.get()"); // TODO: first handle connection error
142
143        let args = self.session.run_async(|_| {
144            call_async_1(&mut conn, self.outlet, self.timeout, self.authorization)
145        })?;
146
147        let args = match args {
148            Some(args) => args,
149            None => return Ok(()),
150        };
151
152        // The callback shall not becalled from a session.run_async because
153        // nesting session.run_async calls is not allowed, and the callback
154        // should be free to do session.run_async calls itself.
155        let res = (self.cb)(args);
156        self.session.run_async(|_| call_async_2(&mut conn, res))
157    }
158}
159
160impl<T, Callback> AdaptOutlet<T>
161where
162    T: SessionTrait,
163    AdaptOutlet<T>: AdaptOutletTrait<CallbackType = Callback>,
164{
165    #[must_use]
166    pub fn with_timeout(mut self, timeout: i64) -> Self {
167        self.timeout = timeout;
168        self
169    }
170}
171
172async fn call_async_1(
173    conn: &mut PooledConnection<'_, PoolManager>,
174    outlet: Entity,
175    timeout: i64,
176    authorization: Token,
177) -> Result<Option<AdapterArgs>, CallError> {
178    let mut msg = HGTPMessage::default();
179
180    msg.pack_command(Command::Adapt);
181    msg.pack_outlet(outlet);
182    msg.pack_timeout(timeout);
183    msg.pack_authorization(authorization);
184
185    conn.send(&msg).await?;
186    conn.recv(&mut msg).await?;
187
188    match AdapterArgs::try_from(&msg) {
189        Ok(args) => Ok(Some(args)),
190        Err(e) => {
191            let mut msg = HGTPMessage::default();
192            msg.pack_error_code(HGTPError::Adapter);
193            msg.pack_error(&AvesterraError {
194                errcode: HGTPError::Adapter,
195                message: String255::from_str_truncate(&e.to_string()),
196            });
197            conn.send(&msg).await?;
198            Ok(None)
199        }
200    }
201}
202
203async fn call_async_2(
204    conn: &mut PooledConnection<'_, PoolManager>,
205    res: Result<Value, AvesterraError>,
206) -> Result<(), CallError> {
207    let mut msg = HGTPMessage::default();
208    match res {
209        Ok(value) => {
210            msg.pack_value(&value);
211        }
212        Err(e) => msg.pack_error(&e),
213    }
214    conn.send(&msg).await?;
215
216    Ok(())
217}
218
219#[derive(Debug)]
220pub struct AdapterArgs {
221    pub entity: Entity,
222    pub auxiliary: Entity,
223    pub ancillary: Entity,
224    /// If call was InquireEntity, the method is always null method (`0`)
225    pub method: Method,
226    // The outlet the rendezvous has taken place in
227    pub outlet: Entity,
228    pub attribute: Attribute,
229    pub instance: i32,
230    pub offset: i32,
231    pub name: String255,
232    pub key: String255,
233    pub value: Value,
234    pub parameter: i64,
235    pub resultant: i64,
236    pub index: i64,
237    pub count: i64,
238    pub aspect: Aspect,
239    pub context: Context,
240    pub category: Category,
241    pub class: Class,
242    pub event: Event,
243    pub mode: Mode,
244    pub state: State,
245    pub condition: Condition,
246    pub precedence: u16,
247    pub time: OffsetDateTime,
248    pub timeout: i64,
249    pub authority: Token,
250    pub authorization: Token,
251}
252
253impl TryFrom<&HGTPMessage> for AdapterArgs {
254    type Error = UnpackError;
255
256    #[rustfmt::skip]
257    fn try_from(msg: &HGTPMessage) -> Result<Self, Self::Error> {
258        Ok(Self {
259            entity       : msg.unpack_entity(),
260            auxiliary    : msg.unpack_auxiliary(),
261            ancillary    : msg.unpack_ancillary(),
262            method       : msg.unpack_method()?,
263            outlet       : msg.unpack_outlet(),
264            attribute    : msg.unpack_attribute()?,
265            instance     : msg.unpack_instance(),
266            offset       : msg.unpack_offset(),
267            name         : msg.unpack_name()?,
268            key          : msg.unpack_key()?,
269            value        : msg.unpack_value()?,
270            parameter    : msg.unpack_parameter(),
271            resultant    : msg.unpack_resultant(),
272            index        : msg.unpack_index(),
273            count        : msg.unpack_count(),
274            aspect       : msg.unpack_aspect()?,
275            context      : msg.unpack_context()?,
276            category     : msg.unpack_category()?,
277            class        : msg.unpack_class()?,
278            event        : msg.unpack_event()?,
279            mode         : msg.unpack_mode()?,
280            state        : msg.unpack_state()?,
281            condition    : msg.unpack_condition()?,
282            precedence   : msg.unpack_precedence(),
283            time         : msg.unpack_time()?,
284            timeout      : msg.unpack_timeout(),
285            authority    : msg.unpack_authority(),
286            authorization: msg.unpack_authorization(),
287        })
288    }
289}