#![allow(private_bounds)]
use std::{
future::{Future, IntoFuture},
pin::Pin,
};
use bb8::PooledConnection;
use time::OffsetDateTime;
use crate::{
hgtp::*, taxonomy::*, AvesterraError, CallError, Entity, Session, SessionAsync, SessionTrait,
String255, Token, Value,
};
pub trait AdaptOutletTrait {
type CallbackType;
}
impl AdaptOutletTrait for AdaptOutlet<Session> {
type CallbackType = Box<dyn FnOnce(AdapterArgs) -> Result<Value, AvesterraError> + Send>;
}
#[rustfmt::skip]
impl AdaptOutletTrait for AdaptOutlet<SessionAsync> {
type CallbackType =
Box<
dyn FnOnce(
AdapterArgs,
)
-> Pin<Box<dyn Future<Output = Result<Value, AvesterraError>> + Send>>
+ Send,
>;
}
pub struct AdaptOutlet<T: SessionTrait>
where
AdaptOutlet<T>: AdaptOutletTrait,
{
pub(crate) session: T,
cb: <Self as AdaptOutletTrait>::CallbackType,
outlet: Entity,
authorization: Token,
timeout: i64,
}
impl Session {
#[must_use]
pub fn adapt_outlet(
&self,
outlet: Entity,
authorization: Token,
callback: <AdaptOutlet<Session> as AdaptOutletTrait>::CallbackType,
) -> AdaptOutlet<Session> {
AdaptOutlet {
session: self.clone(),
cb: callback,
outlet,
authorization,
timeout: i64::default(),
}
}
}
impl SessionAsync {
#[must_use]
pub fn adapt_outlet(
&self,
outlet: Entity,
authorization: Token,
callback: <AdaptOutlet<SessionAsync> as AdaptOutletTrait>::CallbackType,
) -> AdaptOutlet<SessionAsync> {
AdaptOutlet {
session: self.clone(),
cb: callback,
outlet,
authorization,
timeout: i64::default(),
}
}
}
impl IntoFuture for AdaptOutlet<SessionAsync> {
type IntoFuture = Pin<Box<dyn Future<Output = Result<(), CallError>> + Send>>;
type Output = <Self::IntoFuture as Future>::Output;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut conn = self
.session
.get_socket_pool()
.get()
.await
.expect("socket_pool.get()"); let args =
call_async_1(&mut conn, self.outlet, self.timeout, self.authorization).await?;
match args {
Some(args) => {
let res = (self.cb)(args).await;
call_async_2(&mut conn, res).await
}
None => Ok(()),
}
})
}
}
impl AdaptOutlet<Session> {
pub fn call(self) -> Result<(), CallError> {
let mut conn = self
.session
.run_async(|s| s.get_socket_pool().get())
.expect("socket_pool.get()");
let args = self.session.run_async(|_| {
call_async_1(&mut conn, self.outlet, self.timeout, self.authorization)
})?;
let args = match args {
Some(args) => args,
None => return Ok(()),
};
let res = (self.cb)(args);
self.session.run_async(|_| call_async_2(&mut conn, res))
}
}
impl<T, Callback> AdaptOutlet<T>
where
T: SessionTrait,
AdaptOutlet<T>: AdaptOutletTrait<CallbackType = Callback>,
{
#[must_use]
pub fn with_timeout(mut self, timeout: i64) -> Self {
self.timeout = timeout;
self
}
}
async fn call_async_1(
conn: &mut PooledConnection<'_, PoolManager>,
outlet: Entity,
timeout: i64,
authorization: Token,
) -> Result<Option<AdapterArgs>, CallError> {
let mut msg = HGTPMessage::default();
msg.pack_command(Command::Adapt);
msg.pack_outlet(outlet);
msg.pack_timeout(timeout);
msg.pack_authorization(authorization);
conn.send(&msg).await?;
conn.recv(&mut msg).await?;
match AdapterArgs::try_from(&msg) {
Ok(args) => Ok(Some(args)),
Err(e) => {
let mut msg = HGTPMessage::default();
msg.pack_error_code(HGTPError::Adapter);
msg.pack_error(&AvesterraError {
errcode: HGTPError::Adapter,
message: String255::from_str_truncate(&e.to_string()),
});
conn.send(&msg).await?;
Ok(None)
}
}
}
async fn call_async_2(
conn: &mut PooledConnection<'_, PoolManager>,
res: Result<Value, AvesterraError>,
) -> Result<(), CallError> {
let mut msg = HGTPMessage::default();
match res {
Ok(value) => {
msg.pack_value(&value);
}
Err(e) => msg.pack_error(&e),
}
conn.send(&msg).await?;
Ok(())
}
#[derive(Debug)]
pub struct AdapterArgs {
pub entity: Entity,
pub auxiliary: Entity,
pub ancillary: Entity,
pub method: Method,
pub outlet: Entity,
pub attribute: Attribute,
pub instance: i32,
pub offset: i32,
pub name: String255,
pub key: String255,
pub value: Value,
pub parameter: i64,
pub resultant: i64,
pub index: i64,
pub count: i64,
pub aspect: Aspect,
pub context: Context,
pub category: Category,
pub class: Class,
pub event: Event,
pub mode: Mode,
pub state: State,
pub condition: Condition,
pub precedence: u16,
pub time: OffsetDateTime,
pub timeout: i64,
pub authority: Token,
pub authorization: Token,
}
impl TryFrom<&HGTPMessage> for AdapterArgs {
type Error = UnpackError;
#[rustfmt::skip]
fn try_from(msg: &HGTPMessage) -> Result<Self, Self::Error> {
Ok(Self {
entity : msg.unpack_entity(),
auxiliary : msg.unpack_auxiliary(),
ancillary : msg.unpack_ancillary(),
method : msg.unpack_method()?,
outlet : msg.unpack_outlet(),
attribute : msg.unpack_attribute()?,
instance : msg.unpack_instance(),
offset : msg.unpack_offset(),
name : msg.unpack_name()?,
key : msg.unpack_key()?,
value : msg.unpack_value()?,
parameter : msg.unpack_parameter(),
resultant : msg.unpack_resultant(),
index : msg.unpack_index(),
count : msg.unpack_count(),
aspect : msg.unpack_aspect()?,
context : msg.unpack_context()?,
category : msg.unpack_category()?,
class : msg.unpack_class()?,
event : msg.unpack_event()?,
mode : msg.unpack_mode()?,
state : msg.unpack_state()?,
condition : msg.unpack_condition()?,
precedence : msg.unpack_precedence(),
time : msg.unpack_time()?,
timeout : msg.unpack_timeout(),
authority : msg.unpack_authority(),
authorization: msg.unpack_authorization(),
})
}
}