use bytes::Bytes;
use wasmrs_frames::{Metadata, PayloadError, RawPayload};
use wasmrs_runtime::RtRc;
use crate::operations::OperationList;
use crate::{BoxFlux, BoxMono};
/// Boxed error trait object that is `Send + Sync + 'static`; the error type returned by an
/// [`OperationHandler`].
pub type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Ordered table of registered handlers. Each entry is
/// `(namespace, operation name, reference-counted handler)`; an entry's position in the `Vec`
/// is the index handed back by the `register_*` methods on `Handlers`.
pub type OperationMap<T> = Vec<(String, String, RtRc<T>)>;
/// Boxed `Send + Sync` callable that takes an input `I` and returns either an output `O` or a
/// [`GenericError`].
pub type OperationHandler<I, O> = Box<dyn Fn(I) -> Result<O, GenericError> + Send + Sync>;
/// `BoxMono` over [`Payload`] and `PayloadError`; used as handler input.
pub type IncomingMono = BoxMono<Payload, PayloadError>;
/// `BoxMono` over `RawPayload` and `PayloadError`; used as handler output.
pub type OutgoingMono = BoxMono<RawPayload, PayloadError>;
/// `BoxFlux` over [`Payload`] and `PayloadError`; used as handler input.
pub type IncomingStream = BoxFlux<Payload, PayloadError>;
/// `BoxFlux` over `RawPayload` and `PayloadError`; used as handler output.
pub type OutgoingStream = BoxFlux<RawPayload, PayloadError>;
/// A payload whose metadata has already been parsed into a [`Metadata`] value, paired with
/// its data bytes.
///
/// Can be built directly with `Payload::new` or converted from a `RawPayload` via `TryFrom`.
#[derive(Debug)]
pub struct Payload {
    /// The parsed metadata.
    pub metadata: Metadata,
    /// The data bytes carried alongside the metadata.
    pub data: Bytes,
}
impl Payload {
pub fn new(metadata: Metadata, data: Bytes) -> Self {
Self { metadata, data }
}
}
impl TryFrom<RawPayload> for Payload {
type Error = crate::Error;
fn try_from(mut value: RawPayload) -> Result<Self, Self::Error> {
Ok(Payload {
metadata: value.parse_metadata()?,
data: value.data.unwrap_or_default(),
})
}
}
/// Registry of operation handlers, kept in one table per request kind, together with the
/// [`OperationList`] that records each registered operation as an export.
#[derive(Default)]
pub struct Handlers {
/// Export list; every `register_*` method adds one entry to it via `add_export`.
op_list: OperationList,
/// Handlers added by `register_request_response` (mono in, mono out).
request_response_handlers: OperationMap<OperationHandler<IncomingMono, OutgoingMono>>,
/// Handlers added by `register_request_stream` (mono in, stream out).
request_stream_handlers: OperationMap<OperationHandler<IncomingMono, OutgoingStream>>,
/// Handlers added by `register_request_channel` (stream in, stream out).
request_channel_handlers: OperationMap<OperationHandler<IncomingStream, OutgoingStream>>,
/// Handlers added by `register_fire_and_forget` (mono in, no output value).
request_fnf_handlers: OperationMap<OperationHandler<IncomingMono, ()>>,
}
impl std::fmt::Debug for Handlers {
    /// Formats the registry as `Handlers { op_list: .. }`.
    ///
    /// Only `op_list` is printed; the handler tables are left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Handlers");
        builder.field("op_list", &self.op_list);
        builder.finish()
    }
}
impl Handlers {
    /// Returns the [`OperationList`] holding an export entry for every registered handler.
    pub fn op_list(&self) -> &OperationList {
        &self.op_list
    }

    /// Registers a request-response handler under namespace `ns` and operation `op`.
    ///
    /// The handler is appended to the request-response table and recorded in the operation
    /// list as a `RequestResponse` export. Returns the handler's index in that table, which
    /// is the value to pass to [`Handlers::get_request_response_handler`].
    pub fn register_request_response(
        &mut self,
        ns: impl AsRef<str>,
        op: impl AsRef<str>,
        handler: OperationHandler<IncomingMono, OutgoingMono>,
    ) -> usize {
        let list = &mut self.request_response_handlers;
        // The new entry lands at the current length, which is therefore its index.
        let index = list.len();
        list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
        // NOTE(review): `as _` casts to whatever integer type `add_export` expects; if that
        // type is narrower than `usize` the cast truncates silently — confirm.
        self.op_list.add_export(index as _, crate::OperationType::RequestResponse, ns, op);
        index
    }

    /// Registers a request-stream handler under namespace `ns` and operation `op`.
    ///
    /// The handler is appended to the request-stream table and recorded in the operation
    /// list as a `RequestStream` export. Returns the handler's index in that table, which
    /// is the value to pass to [`Handlers::get_request_stream_handler`].
    pub fn register_request_stream(
        &mut self,
        ns: impl AsRef<str>,
        op: impl AsRef<str>,
        handler: OperationHandler<IncomingMono, OutgoingStream>,
    ) -> usize {
        let list = &mut self.request_stream_handlers;
        // The new entry lands at the current length, which is therefore its index.
        let index = list.len();
        list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
        // NOTE(review): `as _` casts to whatever integer type `add_export` expects — confirm
        // the cast cannot truncate.
        self.op_list.add_export(index as _, crate::OperationType::RequestStream, ns, op);
        index
    }

    /// Registers a request-channel handler under namespace `ns` and operation `op`.
    ///
    /// The handler is appended to the request-channel table and recorded in the operation
    /// list as a `RequestChannel` export. Returns the handler's index in that table, which
    /// is the value to pass to [`Handlers::get_request_channel_handler`].
    pub fn register_request_channel(
        &mut self,
        ns: impl AsRef<str>,
        op: impl AsRef<str>,
        handler: OperationHandler<IncomingStream, OutgoingStream>,
    ) -> usize {
        let list = &mut self.request_channel_handlers;
        // The new entry lands at the current length, which is therefore its index.
        let index = list.len();
        list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
        // NOTE(review): `as _` casts to whatever integer type `add_export` expects — confirm
        // the cast cannot truncate.
        self.op_list.add_export(index as _, crate::OperationType::RequestChannel, ns, op);
        index
    }

    /// Registers a fire-and-forget handler under namespace `ns` and operation `op`.
    ///
    /// The handler is appended to the fire-and-forget table and recorded in the operation
    /// list as a `RequestFnF` export. Returns the handler's index in that table, which is
    /// the value to pass to [`Handlers::get_fnf_handler`].
    pub fn register_fire_and_forget(
        &mut self,
        ns: impl AsRef<str>,
        op: impl AsRef<str>,
        handler: OperationHandler<IncomingMono, ()>,
    ) -> usize {
        let list = &mut self.request_fnf_handlers;
        // The new entry lands at the current length, which is therefore its index.
        let index = list.len();
        list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
        // NOTE(review): `as _` casts to whatever integer type `add_export` expects — confirm
        // the cast cannot truncate.
        self.op_list.add_export(index as _, crate::OperationType::RequestFnF, ns, op);
        index
    }

    /// Returns a cloned `RtRc` handle to the request-response handler stored at `index`, or
    /// `None` when the request-response table has no entry at that position.
    #[must_use]
    pub fn get_request_response_handler(
        &self,
        index: u32,
    ) -> Option<RtRc<OperationHandler<IncomingMono, OutgoingMono>>> {
        self.request_response_handlers
            .get(index as usize)
            .map(|(_, _, handler)| handler.clone())
    }

    /// Returns a cloned `RtRc` handle to the request-stream handler stored at `index`, or
    /// `None` when the request-stream table has no entry at that position.
    #[must_use]
    pub fn get_request_stream_handler(
        &self,
        index: u32,
    ) -> Option<RtRc<OperationHandler<IncomingMono, OutgoingStream>>> {
        self.request_stream_handlers
            .get(index as usize)
            .map(|(_, _, handler)| handler.clone())
    }

    /// Returns a cloned `RtRc` handle to the request-channel handler stored at `index`, or
    /// `None` when the request-channel table has no entry at that position.
    #[must_use]
    pub fn get_request_channel_handler(
        &self,
        index: u32,
    ) -> Option<RtRc<OperationHandler<IncomingStream, OutgoingStream>>> {
        self.request_channel_handlers
            .get(index as usize)
            .map(|(_, _, handler)| handler.clone())
    }

    /// Returns a cloned `RtRc` handle to the fire-and-forget handler stored at `index`, or
    /// `None` when the fire-and-forget table has no entry at that position.
    #[must_use]
    pub fn get_fnf_handler(&self, index: u32) -> Option<RtRc<OperationHandler<IncomingMono, ()>>> {
        self.request_fnf_handlers
            .get(index as usize)
            .map(|(_, _, handler)| handler.clone())
    }
}