wasmrs/
handlers.rs

1use bytes::Bytes;
2use wasmrs_frames::{Metadata, PayloadError, RawPayload};
3use wasmrs_runtime::RtRc;
4
5use crate::operations::OperationList;
6use crate::{BoxFlux, BoxMono};
7
/// An alias for `Box<dyn std::error::Error + Send + Sync + 'static>`.
pub type GenericError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// An alias for `Vec<(String, String, RtRc<T>)>`.
///
/// [`Handlers`] fills each tuple with the `ns` and `op` strings given at
/// registration time, followed by the handler wrapped in an [`RtRc`].
pub type OperationMap<T> = Vec<(String, String, RtRc<T>)>;
/// An alias for the boxed function that creates the output for a task.
///
/// The function receives an input `I` and returns either the output `O` or a
/// [`GenericError`].
pub type OperationHandler<I, O> = Box<dyn Fn(I) -> Result<O, GenericError> + Send + Sync>;
14
/// An alias for `BoxMono<Payload, PayloadError>`, carrying the parsed [`Payload`].
pub type IncomingMono = BoxMono<Payload, PayloadError>;
/// An alias for `BoxMono<RawPayload, PayloadError>`, carrying a [`RawPayload`].
pub type OutgoingMono = BoxMono<RawPayload, PayloadError>;
/// An alias for `BoxFlux<Payload, PayloadError>`, carrying the parsed [`Payload`].
pub type IncomingStream = BoxFlux<Payload, PayloadError>;
/// An alias for `BoxFlux<RawPayload, PayloadError>`, carrying a [`RawPayload`].
pub type OutgoingStream = BoxFlux<RawPayload, PayloadError>;
23
24#[allow(missing_debug_implementations)]
25#[derive(Debug)]
26/// A [Payload] with pre-parsed [Metadata].
27pub struct Payload {
28  /// The parsed [Metadata].
29  pub metadata: Metadata,
30  /// The raw data bytes.
31  pub data: Bytes,
32}
33
34impl Payload {
35  /// Create a new [ParsedPayload] from the given [Metadata] and [Bytes].
36  pub fn new(metadata: Metadata, data: Bytes) -> Self {
37    Self { metadata, data }
38  }
39}
40
41impl TryFrom<RawPayload> for Payload {
42  type Error = crate::Error;
43
44  fn try_from(mut value: RawPayload) -> Result<Self, Self::Error> {
45    Ok(Payload {
46      metadata: value.parse_metadata()?,
47      data: value.data.unwrap_or_default(),
48    })
49  }
50}
51
#[derive(Default)]
/// A list of all the operations exported by a wasmrs implementer.
///
/// Handlers are kept in one list per request style; the position of a handler
/// in its list is the index returned at registration and accepted by the
/// matching getter.
pub struct Handlers {
  /// The operation list that every `register_*` call adds an export to.
  op_list: OperationList,
  /// Handlers added through `register_request_response`.
  request_response_handlers: OperationMap<OperationHandler<IncomingMono, OutgoingMono>>,
  /// Handlers added through `register_request_stream`.
  request_stream_handlers: OperationMap<OperationHandler<IncomingMono, OutgoingStream>>,
  /// Handlers added through `register_request_channel`.
  request_channel_handlers: OperationMap<OperationHandler<IncomingStream, OutgoingStream>>,
  /// Handlers added through `register_fire_and_forget`.
  request_fnf_handlers: OperationMap<OperationHandler<IncomingMono, ()>>,
}
61
62impl std::fmt::Debug for Handlers {
63  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64    f.debug_struct("Handlers").field("op_list", &self.op_list).finish()
65  }
66}
67
68impl Handlers {
69  /// Get the operation list.
70  pub fn op_list(&self) -> &OperationList {
71    &self.op_list
72  }
73
74  /// Register a Request/Response style handler on the host.
75  pub fn register_request_response(
76    &mut self,
77    ns: impl AsRef<str>,
78    op: impl AsRef<str>,
79    handler: OperationHandler<IncomingMono, OutgoingMono>,
80  ) -> usize {
81    let list = &mut self.request_response_handlers;
82    list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
83    let index = list.len() - 1;
84    self
85      .op_list
86      .add_export(index as _, crate::OperationType::RequestResponse, ns, op);
87    index
88  }
89
90  /// Register a Request/Response style handler on the host.
91  pub fn register_request_stream(
92    &mut self,
93    ns: impl AsRef<str>,
94    op: impl AsRef<str>,
95    handler: OperationHandler<IncomingMono, OutgoingStream>,
96  ) -> usize {
97    let list = &mut self.request_stream_handlers;
98    list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
99    let index = list.len() - 1;
100    self
101      .op_list
102      .add_export(index as _, crate::OperationType::RequestStream, ns, op);
103    index
104  }
105
106  /// Register a Request/Response style handler on the host.
107  pub fn register_request_channel(
108    &mut self,
109    ns: impl AsRef<str>,
110    op: impl AsRef<str>,
111    handler: OperationHandler<IncomingStream, OutgoingStream>,
112  ) -> usize {
113    let list = &mut self.request_channel_handlers;
114    list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
115    let index = list.len() - 1;
116    self
117      .op_list
118      .add_export(index as _, crate::OperationType::RequestChannel, ns, op);
119    index
120  }
121
122  /// Register a Request/Response style handler on the host.
123  pub fn register_fire_and_forget(
124    &mut self,
125    ns: impl AsRef<str>,
126    op: impl AsRef<str>,
127    handler: OperationHandler<IncomingMono, ()>,
128  ) -> usize {
129    let list = &mut self.request_fnf_handlers;
130    list.push((ns.as_ref().to_owned(), op.as_ref().to_owned(), RtRc::new(handler)));
131    let index = list.len() - 1;
132    self
133      .op_list
134      .add_export(index as _, crate::OperationType::RequestFnF, ns, op);
135    index
136  }
137
138  #[must_use]
139  /// Get a Request/Response handler by id.
140  pub fn get_request_response_handler(&self, index: u32) -> Option<RtRc<OperationHandler<IncomingMono, OutgoingMono>>> {
141    let a = self
142      .request_response_handlers
143      .get(index as usize)
144      .map(|(_, _, h)| h.clone());
145    a
146  }
147  #[must_use]
148  /// Get a Request/Response handler by id.
149  pub fn get_request_stream_handler(&self, index: u32) -> Option<RtRc<OperationHandler<IncomingMono, OutgoingStream>>> {
150    let a = self
151      .request_stream_handlers
152      .get(index as usize)
153      .map(|(_, _, h)| h.clone());
154    a
155  }
156  #[must_use]
157  /// Get a Request/Response handler by id.
158  pub fn get_request_channel_handler(
159    &self,
160    index: u32,
161  ) -> Option<RtRc<OperationHandler<IncomingStream, OutgoingStream>>> {
162    let a = self
163      .request_channel_handlers
164      .get(index as usize)
165      .map(|(_, _, h)| h.clone());
166    a
167  }
168  #[must_use]
169  /// Get a Request/Response handler by id.
170  pub fn get_fnf_handler(&self, index: u32) -> Option<RtRc<OperationHandler<IncomingMono, ()>>> {
171    let a = self.request_fnf_handlers.get(index as usize).map(|(_, _, h)| h.clone());
172    a
173  }
174}