1use std::{future::Future, pin::Pin, sync::Arc};
2
3use crate::{MaybeSend, MaybeSync, Metadata, RequestCall, RequestResponse, RoamError, SelfRef};
4
5pub trait Call<'wire, T, E>: MaybeSend
77where
78 T: facet::Facet<'wire> + MaybeSend,
79 E: facet::Facet<'wire> + MaybeSend,
80{
81 fn reply(self, result: Result<T, E>) -> impl std::future::Future<Output = ()> + MaybeSend;
83
84 fn ok(self, value: T) -> impl std::future::Future<Output = ()> + MaybeSend
88 where
89 Self: Sized,
90 {
91 self.reply(Ok(value))
92 }
93
94 fn err(self, error: E) -> impl std::future::Future<Output = ()> + MaybeSend
98 where
99 Self: Sized,
100 {
101 self.reply(Err(error))
102 }
103}
104
105pub trait ReplySink: MaybeSend + MaybeSync + 'static {
115 fn send_reply(
123 self,
124 response: RequestResponse<'_>,
125 ) -> impl std::future::Future<Output = ()> + MaybeSend;
126
127 fn send_error<E: for<'a> facet::Facet<'a> + MaybeSend>(
132 self,
133 error: RoamError<E>,
134 ) -> impl std::future::Future<Output = ()> + MaybeSend
135 where
136 Self: Sized,
137 {
138 use crate::{Payload, RequestResponse};
139 async move {
143 let wire: Result<(), RoamError<E>> = Err(error);
144 self.send_reply(RequestResponse {
145 ret: Payload::outgoing(&wire),
146 channels: vec![],
147 metadata: Default::default(),
148 })
149 .await;
150 }
151 }
152
153 #[cfg(not(target_arch = "wasm32"))]
158 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
159 None
160 }
161}
162
163pub trait Caller: Clone + MaybeSend + MaybeSync + 'static {
177 fn call<'a>(
179 &'a self,
180 call: RequestCall<'a>,
181 ) -> impl Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + MaybeSend + 'a;
182
183 #[cfg(not(target_arch = "wasm32"))]
188 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
189 None
190 }
191}
192
193trait ErasedCallerDyn: MaybeSend + MaybeSync + 'static {
194 #[cfg(not(target_arch = "wasm32"))]
195 fn call<'a>(
196 &'a self,
197 call: RequestCall<'a>,
198 ) -> Pin<
199 Box<dyn Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + Send + 'a>,
200 >;
201 #[cfg(target_arch = "wasm32")]
202 fn call<'a>(
203 &'a self,
204 call: RequestCall<'a>,
205 ) -> Pin<Box<dyn Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + 'a>>;
206
207 #[cfg(not(target_arch = "wasm32"))]
208 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder>;
209}
210
211impl<C: Caller> ErasedCallerDyn for C {
212 #[cfg(not(target_arch = "wasm32"))]
213 fn call<'a>(
214 &'a self,
215 call: RequestCall<'a>,
216 ) -> Pin<
217 Box<dyn Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + Send + 'a>,
218 > {
219 Box::pin(Caller::call(self, call))
220 }
221 #[cfg(target_arch = "wasm32")]
222 fn call<'a>(
223 &'a self,
224 call: RequestCall<'a>,
225 ) -> Pin<Box<dyn Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + 'a>>
226 {
227 Box::pin(Caller::call(self, call))
228 }
229
230 #[cfg(not(target_arch = "wasm32"))]
231 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
232 Caller::channel_binder(self)
233 }
234}
235
236#[derive(Clone)]
238pub struct ErasedCaller {
239 inner: Arc<dyn ErasedCallerDyn>,
240}
241
242impl ErasedCaller {
243 pub fn new<C: Caller>(caller: C) -> Self {
244 Self {
245 inner: Arc::new(caller),
246 }
247 }
248}
249
250impl Caller for ErasedCaller {
251 fn call<'a>(
252 &'a self,
253 call: RequestCall<'a>,
254 ) -> impl Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + MaybeSend + 'a
255 {
256 self.inner.call(call)
257 }
258
259 #[cfg(not(target_arch = "wasm32"))]
260 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
261 self.inner.channel_binder()
262 }
263}
264
265pub trait Handler<R: ReplySink>: MaybeSend + MaybeSync + 'static {
266 fn handle(
268 &self,
269 call: SelfRef<crate::RequestCall<'static>>,
270 reply: R,
271 ) -> impl std::future::Future<Output = ()> + MaybeSend + '_;
272}
273
274impl<R: ReplySink> Handler<R> for () {
275 async fn handle(&self, _call: SelfRef<crate::RequestCall<'static>>, _reply: R) {}
276}
277
278pub struct ResponseParts<'a, T> {
284 pub ret: T,
286 pub metadata: Metadata<'a>,
288}
289
290impl<'a, T> std::ops::Deref for ResponseParts<'a, T> {
291 type Target = T;
292 fn deref(&self) -> &T {
293 &self.ret
294 }
295}
296
297pub struct SinkCall<R: ReplySink> {
303 reply: R,
304}
305
306impl<R: ReplySink> SinkCall<R> {
307 pub fn new(reply: R) -> Self {
308 Self { reply }
309 }
310}
311
312impl<'wire, T, E, R> Call<'wire, T, E> for SinkCall<R>
313where
314 T: facet::Facet<'wire> + MaybeSend,
315 E: facet::Facet<'wire> + MaybeSend,
316 R: ReplySink,
317{
318 async fn reply(self, result: Result<T, E>) {
319 use crate::{Payload, RequestResponse};
320 let wire: Result<T, crate::RoamError<E>> = result.map_err(crate::RoamError::User);
321 let ptr =
322 facet::PtrConst::new((&wire as *const Result<T, crate::RoamError<E>>).cast::<u8>());
323 let shape = <Result<T, crate::RoamError<E>> as facet::Facet<'wire>>::SHAPE;
324 let ret = unsafe { Payload::outgoing_unchecked(ptr, shape) };
327 self.reply
328 .send_reply(RequestResponse {
329 ret,
330 channels: vec![],
331 metadata: Default::default(),
332 })
333 .await;
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use std::sync::{Arc, Mutex};
340
341 use crate::{MaybeSend, Metadata, Payload, RequestCall, RequestResponse};
342
343 use super::{Call, Caller, Handler, ReplySink, ResponseParts};
344
345 struct RecordingCall<T, E> {
346 observed: Arc<Mutex<Option<Result<T, E>>>>,
347 }
348
349 impl<'wire, T, E> Call<'wire, T, E> for RecordingCall<T, E>
350 where
351 T: facet::Facet<'wire> + MaybeSend + Send + 'static,
352 E: facet::Facet<'wire> + MaybeSend + Send + 'static,
353 {
354 async fn reply(self, result: Result<T, E>) {
355 let mut guard = self.observed.lock().expect("recording mutex poisoned");
356 *guard = Some(result);
357 }
358 }
359
360 struct RecordingReplySink {
361 saw_send_reply: Arc<Mutex<bool>>,
362 saw_outgoing_payload: Arc<Mutex<bool>>,
363 }
364
365 impl ReplySink for RecordingReplySink {
366 async fn send_reply(self, response: RequestResponse<'_>) {
367 let mut saw_send_reply = self
368 .saw_send_reply
369 .lock()
370 .expect("send-reply mutex poisoned");
371 *saw_send_reply = true;
372
373 let mut saw_outgoing = self
374 .saw_outgoing_payload
375 .lock()
376 .expect("payload-kind mutex poisoned");
377 *saw_outgoing = matches!(response.ret, Payload::Outgoing { .. });
378 }
379 }
380
381 #[derive(Clone)]
382 struct NoopCaller;
383
384 impl Caller for NoopCaller {
385 fn call<'a>(
386 &'a self,
387 _call: RequestCall<'a>,
388 ) -> impl Future<
389 Output = Result<crate::SelfRef<RequestResponse<'static>>, crate::RoamError>,
390 > + MaybeSend
391 + 'a {
392 async move { unreachable!("NoopCaller::call is not used by this test") }
393 }
394 }
395
396 #[tokio::test]
397 async fn call_ok_and_err_route_through_reply() {
398 let observed_ok: Arc<Mutex<Option<Result<u32, &'static str>>>> = Arc::new(Mutex::new(None));
399 RecordingCall {
400 observed: Arc::clone(&observed_ok),
401 }
402 .ok(7)
403 .await;
404 assert!(matches!(
405 *observed_ok.lock().expect("ok mutex poisoned"),
406 Some(Ok(7))
407 ));
408
409 let observed_err: Arc<Mutex<Option<Result<u32, &'static str>>>> =
410 Arc::new(Mutex::new(None));
411 RecordingCall {
412 observed: Arc::clone(&observed_err),
413 }
414 .err("boom")
415 .await;
416 assert!(matches!(
417 *observed_err.lock().expect("err mutex poisoned"),
418 Some(Err("boom"))
419 ));
420 }
421
422 #[tokio::test]
423 async fn reply_sink_send_error_uses_outgoing_payload_and_reply_path() {
424 let saw_send_reply = Arc::new(Mutex::new(false));
425 let saw_outgoing_payload = Arc::new(Mutex::new(false));
426 let sink = RecordingReplySink {
427 saw_send_reply: Arc::clone(&saw_send_reply),
428 saw_outgoing_payload: Arc::clone(&saw_outgoing_payload),
429 };
430
431 sink.send_error(crate::RoamError::<String>::Cancelled).await;
432
433 assert!(*saw_send_reply.lock().expect("send-reply mutex poisoned"));
434 assert!(
435 *saw_outgoing_payload
436 .lock()
437 .expect("payload-kind mutex poisoned")
438 );
439 }
440
441 #[tokio::test]
442 async fn unit_handler_is_noop() {
443 let req = crate::SelfRef::owning(
444 crate::Backing::Boxed(Box::<[u8]>::default()),
445 RequestCall {
446 method_id: crate::MethodId(1),
447 channels: vec![],
448 metadata: Metadata::default(),
449 args: Payload::Incoming(&[]),
450 },
451 );
452 ().handle(
453 req,
454 RecordingReplySink {
455 saw_send_reply: Arc::new(Mutex::new(false)),
456 saw_outgoing_payload: Arc::new(Mutex::new(false)),
457 },
458 )
459 .await;
460 }
461
462 #[test]
463 fn response_parts_deref_exposes_ret() {
464 let parts = ResponseParts {
465 ret: 42_u32,
466 metadata: Metadata::default(),
467 };
468 assert_eq!(*parts, 42);
469 }
470
471 #[test]
472 fn default_channel_binder_accessor_for_caller_returns_none() {
473 let caller = NoopCaller;
474 assert!(caller.channel_binder().is_none());
475 }
476
477 #[test]
478 fn default_channel_binder_accessor_for_reply_sink_returns_none() {
479 let sink = RecordingReplySink {
480 saw_send_reply: Arc::new(Mutex::new(false)),
481 saw_outgoing_payload: Arc::new(Mutex::new(false)),
482 };
483 assert!(sink.channel_binder().is_none());
484 }
485}