1use bytes::Bytes;
2use wasmrs_frames::{Metadata, PayloadError, RawPayload};
3use wasmrs_runtime::RtRc;
4
5use crate::operations::OperationList;
6use crate::{BoxFlux, BoxMono};
7
8pub type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
10pub type OperationMap<T> = Vec<(String, String, RtRc<T>)>;
12pub type OperationHandler<I, O> = Box<dyn Fn(I) -> Result<O, GenericError> + Send + Sync>;
14
15pub type IncomingMono = BoxMono<Payload, PayloadError>;
17pub type OutgoingMono = BoxMono<RawPayload, PayloadError>;
19pub type IncomingStream = BoxFlux<Payload, PayloadError>;
21pub type OutgoingStream = BoxFlux<RawPayload, PayloadError>;
23
24#[allow(missing_debug_implementations)]
25#[derive(Debug)]
26pub struct Payload {
28 pub metadata: Metadata,
30 pub data: Bytes,
32}
33
34impl Payload {
35 pub fn new(metadata: Metadata, data: Bytes) -> Self {
37 Self { metadata, data }
38 }
39}
40
41impl TryFrom<RawPayload> for Payload {
42 type Error = crate::Error;
43
44 fn try_from(mut value: RawPayload) -> Result<Self, Self::Error> {
45 Ok(Payload {
46 metadata: value.parse_metadata()?,
47 data: value.data.unwrap_or_default(),
48 })
49 }
50}
51
52#[derive(Default)]
53pub struct Handlers {
55 op_list: OperationList,
56 request_response_handlers: OperationMap<OperationHandler<IncomingMono, OutgoingMono>>,
57 request_stream_handlers: OperationMap<OperationHandler<IncomingMono, OutgoingStream>>,
58 request_channel_handlers: OperationMap<OperationHandler<IncomingStream, OutgoingStream>>,
59 request_fnf_handlers: OperationMap<OperationHandler<IncomingMono, ()>>,
60}
61
62impl std::fmt::Debug for Handlers {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("Handlers").field("op_list", &self.op_list).finish()
65 }
66}
67
68impl Handlers {
69 pub fn op_list(&self) -> &OperationList {
71 &self.op_list
72 }
73
74 pub fn register_request_response(
76 &mut self,
77 ns: impl AsRef<str>,
78 op: impl AsRef<str>,
79 handler: OperationHandler<IncomingMono, OutgoingMono>,
80 ) -> usize {
81 let list = &mut self.request_response_handlers;
82 list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
83 let index = list.len() - 1;
84 self
85 .op_list
86 .add_export(index as _, crate::OperationType::RequestResponse, ns, op);
87 index
88 }
89
90 pub fn register_request_stream(
92 &mut self,
93 ns: impl AsRef<str>,
94 op: impl AsRef<str>,
95 handler: OperationHandler<IncomingMono, OutgoingStream>,
96 ) -> usize {
97 let list = &mut self.request_stream_handlers;
98 list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
99 let index = list.len() - 1;
100 self
101 .op_list
102 .add_export(index as _, crate::OperationType::RequestStream, ns, op);
103 index
104 }
105
106 pub fn register_request_channel(
108 &mut self,
109 ns: impl AsRef<str>,
110 op: impl AsRef<str>,
111 handler: OperationHandler<IncomingStream, OutgoingStream>,
112 ) -> usize {
113 let list = &mut self.request_channel_handlers;
114 list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
115 let index = list.len() - 1;
116 self
117 .op_list
118 .add_export(index as _, crate::OperationType::RequestChannel, ns, op);
119 index
120 }
121
122 pub fn register_fire_and_forget(
124 &mut self,
125 ns: impl AsRef<str>,
126 op: impl AsRef<str>,
127 handler: OperationHandler<IncomingMono, ()>,
128 ) -> usize {
129 let list = &mut self.request_fnf_handlers;
130 list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
131 let index = list.len() - 1;
132 self
133 .op_list
134 .add_export(index as _, crate::OperationType::RequestFnF, ns, op);
135 index
136 }
137
138 #[must_use]
139 pub fn get_request_response_handler(&self, index: u32) -> Option<RtRc<OperationHandler<IncomingMono, OutgoingMono>>> {
141 let a = self
142 .request_response_handlers
143 .get(index as usize)
144 .map(|(_, _, h)| h.clone());
145 a
146 }
147 #[must_use]
148 pub fn get_request_stream_handler(&self, index: u32) -> Option<RtRc<OperationHandler<IncomingMono, OutgoingStream>>> {
150 let a = self
151 .request_stream_handlers
152 .get(index as usize)
153 .map(|(_, _, h)| h.clone());
154 a
155 }
156 #[must_use]
157 pub fn get_request_channel_handler(
159 &self,
160 index: u32,
161 ) -> Option<RtRc<OperationHandler<IncomingStream, OutgoingStream>>> {
162 let a = self
163 .request_channel_handlers
164 .get(index as usize)
165 .map(|(_, _, h)| h.clone());
166 a
167 }
168 #[must_use]
169 pub fn get_fnf_handler(&self, index: u32) -> Option<RtRc<OperationHandler<IncomingMono, ()>>> {
171 let a = self.request_fnf_handlers.get(index as usize).map(|(_, _, h)| h.clone());
172 a
173 }
174}