orchestra-toolkit 0.6.1

Client to interact with the Orchestra system, using the HGTP protocol
Documentation
/* Copyright 2024-2025 LEDR Technologies Inc.
* This file is part of the Orchestra library, which helps developers use our Orchestra technology which is based on AvesTerra, owned and developed by Georgetown University, under license agreement with LEDR Technologies Inc.
*
* The Orchestra library is a free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or any later version.
*
* The Orchestra library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with the Orchestra library. If not, see <https://www.gnu.org/licenses/>.
*
* If you have any questions, feedback or issues about the Orchestra library, you can contact us at support@ledr.io.
*/

#![allow(private_bounds)]
use std::{
    future::{Future, IntoFuture},
    pin::Pin,
};

use bb8::PooledConnection;
use time::OffsetDateTime;

use crate::{
    hgtp::*, taxonomy::*, AvesterraError, CallError, Entity, Session, SessionAsync, SessionTrait,
    String255, Token, Value,
};

/// Maps an [`AdaptOutlet`] specialization to the type of callback it stores.
///
/// Implemented in this file for `AdaptOutlet<Session>` and
/// `AdaptOutlet<SessionAsync>`.
pub trait AdaptOutletTrait {
    /// Type of the user callback stored in the `cb` field of [`AdaptOutlet`].
    type CallbackType;
}
/// Synchronous flavor: the callback returns its result directly.
impl AdaptOutletTrait for AdaptOutlet<Session> {
    /// Boxed one-shot (`FnOnce`) closure taking the decoded [`AdapterArgs`];
    /// the closure itself must be `Send`.
    type CallbackType = Box<dyn FnOnce(AdapterArgs) -> Result<Value, AvesterraError> + Send>;
}
/// Asynchronous flavor: the callback returns a future that yields the result.
#[rustfmt::skip]
impl AdaptOutletTrait for AdaptOutlet<SessionAsync> {
    /// Boxed one-shot (`FnOnce`) closure taking the decoded [`AdapterArgs`]
    /// and returning a pinned, boxed, `Send` future; the closure itself must
    /// also be `Send`.
    type CallbackType = 
        Box<
            dyn FnOnce(
                AdapterArgs,
            )
            -> Pin<Box<dyn Future<Output = Result<Value, AvesterraError>> + Send>>
            + Send,
        >;
}

/// A prepared, not-yet-executed adapt request.
///
/// Built by `Session::adapt_outlet` or `SessionAsync::adapt_outlet`,
/// optionally tuned with `with_timeout`, then executed with `call` (for
/// `Session`) or by awaiting it (for `SessionAsync`).
pub struct AdaptOutlet<T: SessionTrait>
where
    AdaptOutlet<T>: AdaptOutletTrait,
{
    /// Session clone whose socket pool provides the connection.
    pub(crate) session: T,
    /// User callback invoked with the decoded [`AdapterArgs`].
    cb: <Self as AdaptOutletTrait>::CallbackType,
    /// Outlet packed into the outgoing `Adapt` message.
    outlet: Entity,
    /// Authorization token packed into the outgoing `Adapt` message.
    authorization: Token,
    /// Timeout packed into the outgoing `Adapt` message (unit not visible
    /// here — TODO confirm).
    timeout: i64,
}

impl Session {
    /// Prepares an adapt request on `outlet`; nothing is sent yet.
    ///
    /// The returned [`AdaptOutlet`] holds a clone of this session and starts
    /// with a timeout of `0`. Adjust it with `with_timeout`, then run the
    /// request with `call`.
    ///
    /// `callback` is invoked with the decoded [`AdapterArgs`]; the result it
    /// returns is sent back over the same connection.
    #[must_use]
    pub fn adapt_outlet(
        &self,
        outlet: Entity,
        authorization: Token,
        callback: <AdaptOutlet<Session> as AdaptOutletTrait>::CallbackType,
    ) -> AdaptOutlet<Session> {
        let session = self.clone();
        AdaptOutlet {
            session,
            cb: callback,
            outlet,
            authorization,
            // Same value as `i64::default()`.
            timeout: 0,
        }
    }
}
impl SessionAsync {
    /// Prepares an adapt request on `outlet`; nothing is sent yet.
    ///
    /// The returned [`AdaptOutlet`] holds a clone of this session and starts
    /// with a timeout of `0`. It performs the exchange only once it is
    /// awaited (see its `IntoFuture` implementation). `callback` is invoked
    /// with the decoded [`AdapterArgs`] and must return a pinned, boxed future.
    ///
    /// Example:
    /// ```
    /// use orchestra_toolkit::*;
    ///
    /// async fn callback(s: SessionAsync, args: AdapterArgs) -> Result<Value, AvesterraError> {
    ///     println!("Adapting {:?}", args);
    ///     Ok(Value::NULL)
    /// }
    ///
    /// let outlet = Entity::new(0, 0, 4200);
    /// let auth = Token::NULL;
    /// let session = Session::initialize(SessionConfig::default()).unwrap();
    /// session.run_async(|s1| async {
    ///     let s2 = s1.clone();
    ///     s1.
    ///       adapt_outlet(
    ///           outlet,
    ///           auth,
    ///           Box::new(|args: AdapterArgs| Box::pin(callback(s2, args))),
    ///     )
    /// });
    /// ```
    #[must_use]
    pub fn adapt_outlet(
        &self,
        outlet: Entity,
        authorization: Token,
        callback: <AdaptOutlet<SessionAsync> as AdaptOutletTrait>::CallbackType,
    ) -> AdaptOutlet<SessionAsync> {
        let session = self.clone();
        AdaptOutlet {
            session,
            cb: callback,
            outlet,
            authorization,
            // Same value as `i64::default()`.
            timeout: 0,
        }
    }
}

/// Awaiting an `AdaptOutlet<SessionAsync>` performs the adapt exchange.
impl IntoFuture for AdaptOutlet<SessionAsync> {
    type IntoFuture = Pin<Box<dyn Future<Output = Result<(), CallError>> + Send>>;
    type Output = <Self::IntoFuture as Future>::Output;

    /// Builds the future that runs the whole exchange on one pooled
    /// connection: send the adapt request, decode the incoming arguments,
    /// run the callback, and send its result back.
    ///
    /// Resolves to `Ok(())` without running the callback when
    /// `call_async_1` yields no arguments.
    ///
    /// # Panics
    ///
    /// Panics if a connection cannot be obtained from the socket pool.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            let mut conn = self
                .session
                .get_socket_pool()
                .get()
                .await
                .expect("socket_pool.get()"); // TODO: first handle connection error

            let received =
                call_async_1(&mut conn, self.outlet, self.timeout, self.authorization).await?;

            // No decoded arguments means there is nothing to hand to the callback.
            let Some(args) = received else {
                return Ok(());
            };

            let outcome = (self.cb)(args).await;
            call_async_2(&mut conn, outcome).await
        })
    }
}
impl AdaptOutlet<Session> {
    /// Runs the adapt exchange synchronously on one pooled connection.
    ///
    /// Sends the adapt request, decodes the incoming arguments, invokes the
    /// callback on the calling thread, and sends the callback's result back.
    /// Returns `Ok(())` without running the callback when `call_async_1`
    /// yields no arguments.
    ///
    /// # Panics
    ///
    /// Panics if a connection cannot be obtained from the socket pool.
    pub fn call(self) -> Result<(), CallError> {
        let mut conn = self
            .session
            .run_async(|s| s.get_socket_pool().get())
            .expect("socket_pool.get()"); // TODO: first handle connection error

        let received = self.session.run_async(|_| {
            call_async_1(&mut conn, self.outlet, self.timeout, self.authorization)
        })?;
        let Some(args) = received else {
            return Ok(());
        };

        // The callback must not be called from inside session.run_async:
        // nesting session.run_async calls is not allowed, and the callback
        // should be free to make session.run_async calls itself.
        let outcome = (self.cb)(args);
        self.session.run_async(|_| call_async_2(&mut conn, outcome))
    }
}

impl<T, Callback> AdaptOutlet<T>
where
    T: SessionTrait,
    AdaptOutlet<T>: AdaptOutletTrait<CallbackType = Callback>,
{
    /// Returns the request with its timeout replaced by `timeout`.
    ///
    /// The value is packed as-is into the outgoing `Adapt` message; its unit
    /// is not visible from this file (TODO confirm against the HGTP docs).
    #[must_use]
    pub fn with_timeout(self, timeout: i64) -> Self {
        Self { timeout, ..self }
    }
}

/// First half of the adapt exchange: send the `Adapt` request and decode the
/// message received in response.
///
/// Returns `Ok(Some(args))` when the received message decodes into
/// [`AdapterArgs`]. When decoding fails, an error message carrying
/// `HGTPError::Adapter` and the decode error's text is sent back on `conn`
/// and `Ok(None)` is returned. Send/receive failures are propagated.
async fn call_async_1(
    conn: &mut PooledConnection<'_, PoolManager>,
    outlet: Entity,
    timeout: i64,
    authorization: Token,
) -> Result<Option<AdapterArgs>, CallError> {
    let mut frame = HGTPMessage::default();
    frame.pack_command(Command::Adapt);
    frame.pack_outlet(outlet);
    frame.pack_timeout(timeout);
    frame.pack_authorization(authorization);

    // The same buffer is reused for the incoming message.
    conn.send(&frame).await?;
    conn.recv(&mut frame).await?;

    let decode_failure = match AdapterArgs::try_from(&frame) {
        Ok(args) => return Ok(Some(args)),
        Err(failure) => failure,
    };

    // Report the malformed message to the peer instead of failing the call.
    let mut rejection = HGTPMessage::default();
    rejection.pack_error_code(HGTPError::Adapter);
    rejection.pack_error(&AvesterraError {
        errcode: HGTPError::Adapter,
        message: String255::from_str_truncate(&decode_failure.to_string()),
    });
    conn.send(&rejection).await?;

    Ok(None)
}

/// Second half of the adapt exchange: send the callback's outcome on `conn`.
///
/// A successful `Value` is packed with `pack_value`, an `AvesterraError` with
/// `pack_error`. Send failures are propagated.
// NOTE(review): unlike the decode-failure path in `call_async_1`, no separate
// `pack_error_code` call is made here — presumably `pack_error` carries the
// code itself; confirm.
async fn call_async_2(
    conn: &mut PooledConnection<'_, PoolManager>,
    res: Result<Value, AvesterraError>,
) -> Result<(), CallError> {
    let reply = {
        let mut reply = HGTPMessage::default();
        match res {
            Err(failure) => reply.pack_error(&failure),
            Ok(value) => {
                reply.pack_value(&value);
            }
        }
        reply
    };

    conn.send(&reply).await?;
    Ok(())
}

/// Arguments handed to an adapter callback.
///
/// Every field is decoded from an incoming `HGTPMessage` by the
/// `TryFrom<&HGTPMessage>` implementation in this file.
#[derive(Debug)]
pub struct AdapterArgs {
    pub entity: Entity,
    pub auxiliary: Entity,
    pub ancillary: Entity,
    /// If call was InquireEntity, the method is always null method (`0`)
    pub method: Method,
    /// The outlet the rendezvous has taken place in
    pub outlet: Entity,
    pub attribute: Attribute,
    pub instance: i32,
    pub offset: i32,
    pub name: String255,
    pub key: String255,
    pub value: Value,
    pub parameter: i64,
    pub resultant: i64,
    pub index: i64,
    pub count: i64,
    pub aspect: Aspect,
    pub context: Context,
    pub category: Category,
    pub class: Class,
    pub event: Event,
    pub mode: Mode,
    pub state: State,
    pub condition: Condition,
    pub precedence: u16,
    pub time: OffsetDateTime,
    pub timeout: i64,
    pub authority: Token,
    pub authorization: Token,
}

/// Decodes the adapter-call arguments carried by an HGTP message.
impl TryFrom<&HGTPMessage> for AdapterArgs {
    type Error = UnpackError;

    /// Unpacks every field of `msg`, in the order written below.
    ///
    /// # Errors
    ///
    /// Returns the `UnpackError` of the first fallible unpack (those followed
    /// by `?`) that fails; later fields are then not unpacked.
    #[rustfmt::skip]
    fn try_from(msg: &HGTPMessage) -> Result<Self, Self::Error> {
        Ok(Self {
            entity       : msg.unpack_entity(),
            auxiliary    : msg.unpack_auxiliary(),
            ancillary    : msg.unpack_ancillary(),
            method       : msg.unpack_method()?,
            outlet       : msg.unpack_outlet(),
            attribute    : msg.unpack_attribute()?,
            instance     : msg.unpack_instance(),
            offset       : msg.unpack_offset(),
            name         : msg.unpack_name()?,
            key          : msg.unpack_key()?,
            value        : msg.unpack_value()?,
            parameter    : msg.unpack_parameter(),
            resultant    : msg.unpack_resultant(),
            index        : msg.unpack_index(),
            count        : msg.unpack_count(),
            aspect       : msg.unpack_aspect()?,
            context      : msg.unpack_context()?,
            category     : msg.unpack_category()?,
            class        : msg.unpack_class()?,
            event        : msg.unpack_event()?,
            mode         : msg.unpack_mode()?,
            state        : msg.unpack_state()?,
            condition    : msg.unpack_condition()?,
            precedence   : msg.unpack_precedence(),
            time         : msg.unpack_time()?,
            timeout      : msg.unpack_timeout(),
            authority    : msg.unpack_authority(),
            authorization: msg.unpack_authorization(),
        })
    }
}