1use std::{future::Future, pin::Pin, sync::Arc};
2
3use crate::{MaybeSend, MaybeSync, Metadata, RequestCall, RequestResponse, RoamError, SelfRef};
4
5pub trait Call<'wire, T, E>: MaybeSend
77where
78 T: facet::Facet<'wire> + MaybeSend,
79 E: facet::Facet<'wire> + MaybeSend,
80{
81 fn reply(self, result: Result<T, E>) -> impl std::future::Future<Output = ()> + MaybeSend;
83
84 fn ok(self, value: T) -> impl std::future::Future<Output = ()> + MaybeSend
88 where
89 Self: Sized,
90 {
91 self.reply(Ok(value))
92 }
93
94 fn err(self, error: E) -> impl std::future::Future<Output = ()> + MaybeSend
98 where
99 Self: Sized,
100 {
101 self.reply(Err(error))
102 }
103}
104
105pub trait ReplySink: MaybeSend + MaybeSync + 'static {
115 fn send_reply(
123 self,
124 response: RequestResponse<'_>,
125 ) -> impl std::future::Future<Output = ()> + MaybeSend;
126
127 fn send_error<E: for<'a> facet::Facet<'a> + MaybeSend>(
132 self,
133 error: RoamError<E>,
134 ) -> impl std::future::Future<Output = ()> + MaybeSend
135 where
136 Self: Sized,
137 {
138 use crate::{Payload, RequestResponse};
139 async move {
143 let wire: Result<(), RoamError<E>> = Err(error);
144 self.send_reply(RequestResponse {
145 ret: Payload::outgoing(&wire),
146 channels: vec![],
147 metadata: Default::default(),
148 })
149 .await;
150 }
151 }
152
153 #[cfg(not(target_arch = "wasm32"))]
158 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
159 None
160 }
161}
162
163pub trait Caller: Clone + MaybeSend + MaybeSync + 'static {
177 #[allow(async_fn_in_trait)]
179 async fn call<'a>(
180 &self,
181 call: RequestCall<'a>,
182 ) -> Result<SelfRef<RequestResponse<'static>>, RoamError>;
183
184 #[cfg(not(target_arch = "wasm32"))]
189 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
190 None
191 }
192}
193
194trait ErasedCallerDyn: MaybeSend + MaybeSync + 'static {
195 fn call<'a>(
196 &'a self,
197 call: RequestCall<'a>,
198 ) -> Pin<Box<dyn Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + 'a>>;
199
200 #[cfg(not(target_arch = "wasm32"))]
201 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder>;
202}
203
204impl<C: Caller> ErasedCallerDyn for C {
205 fn call<'a>(
206 &'a self,
207 call: RequestCall<'a>,
208 ) -> Pin<Box<dyn Future<Output = Result<SelfRef<RequestResponse<'static>>, RoamError>> + 'a>>
209 {
210 Box::pin(Caller::call(self, call))
211 }
212
213 #[cfg(not(target_arch = "wasm32"))]
214 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
215 Caller::channel_binder(self)
216 }
217}
218
219#[derive(Clone)]
221pub struct ErasedCaller {
222 inner: Arc<dyn ErasedCallerDyn>,
223}
224
225impl ErasedCaller {
226 pub fn new<C: Caller>(caller: C) -> Self {
227 Self {
228 inner: Arc::new(caller),
229 }
230 }
231}
232
233impl Caller for ErasedCaller {
234 async fn call<'a>(
235 &self,
236 call: RequestCall<'a>,
237 ) -> Result<SelfRef<RequestResponse<'static>>, RoamError> {
238 self.inner.call(call).await
239 }
240
241 #[cfg(not(target_arch = "wasm32"))]
242 fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
243 self.inner.channel_binder()
244 }
245}
246
247pub trait Handler<R: ReplySink>: MaybeSend + MaybeSync + 'static {
248 fn handle(
250 &self,
251 call: SelfRef<crate::RequestCall<'static>>,
252 reply: R,
253 ) -> impl std::future::Future<Output = ()> + MaybeSend + '_;
254}
255
256impl<R: ReplySink> Handler<R> for () {
257 async fn handle(&self, _call: SelfRef<crate::RequestCall<'static>>, _reply: R) {}
258}
259
260pub struct ResponseParts<'a, T> {
266 pub ret: T,
268 pub metadata: Metadata<'a>,
270}
271
272impl<'a, T> std::ops::Deref for ResponseParts<'a, T> {
273 type Target = T;
274 fn deref(&self) -> &T {
275 &self.ret
276 }
277}
278
279pub struct SinkCall<R: ReplySink> {
285 reply: R,
286}
287
288impl<R: ReplySink> SinkCall<R> {
289 pub fn new(reply: R) -> Self {
290 Self { reply }
291 }
292}
293
294impl<'wire, T, E, R> Call<'wire, T, E> for SinkCall<R>
295where
296 T: facet::Facet<'wire> + MaybeSend,
297 E: facet::Facet<'wire> + MaybeSend,
298 R: ReplySink,
299{
300 async fn reply(self, result: Result<T, E>) {
301 use crate::{Payload, RequestResponse};
302 let wire: Result<T, crate::RoamError<E>> = result.map_err(crate::RoamError::User);
303 let ptr =
304 facet::PtrConst::new((&wire as *const Result<T, crate::RoamError<E>>).cast::<u8>());
305 let shape = <Result<T, crate::RoamError<E>> as facet::Facet<'wire>>::SHAPE;
306 let ret = unsafe { Payload::outgoing_unchecked(ptr, shape) };
309 self.reply
310 .send_reply(RequestResponse {
311 ret,
312 channels: vec![],
313 metadata: Default::default(),
314 })
315 .await;
316 }
317}
318
319#[cfg(test)]
320mod tests {
321 use std::sync::{Arc, Mutex};
322
323 use crate::{MaybeSend, Metadata, Payload, RequestCall, RequestResponse};
324
325 use super::{Call, Caller, Handler, ReplySink, ResponseParts};
326
327 struct RecordingCall<T, E> {
328 observed: Arc<Mutex<Option<Result<T, E>>>>,
329 }
330
331 impl<'wire, T, E> Call<'wire, T, E> for RecordingCall<T, E>
332 where
333 T: facet::Facet<'wire> + MaybeSend + Send + 'static,
334 E: facet::Facet<'wire> + MaybeSend + Send + 'static,
335 {
336 async fn reply(self, result: Result<T, E>) {
337 let mut guard = self.observed.lock().expect("recording mutex poisoned");
338 *guard = Some(result);
339 }
340 }
341
342 struct RecordingReplySink {
343 saw_send_reply: Arc<Mutex<bool>>,
344 saw_outgoing_payload: Arc<Mutex<bool>>,
345 }
346
347 impl ReplySink for RecordingReplySink {
348 async fn send_reply(self, response: RequestResponse<'_>) {
349 let mut saw_send_reply = self
350 .saw_send_reply
351 .lock()
352 .expect("send-reply mutex poisoned");
353 *saw_send_reply = true;
354
355 let mut saw_outgoing = self
356 .saw_outgoing_payload
357 .lock()
358 .expect("payload-kind mutex poisoned");
359 *saw_outgoing = matches!(response.ret, Payload::Outgoing { .. });
360 }
361 }
362
363 #[derive(Clone)]
364 struct NoopCaller;
365
366 impl Caller for NoopCaller {
367 async fn call<'a>(
368 &self,
369 _call: RequestCall<'a>,
370 ) -> Result<crate::SelfRef<RequestResponse<'static>>, crate::RoamError> {
371 unreachable!("NoopCaller::call is not used by this test")
372 }
373 }
374
375 #[tokio::test]
376 async fn call_ok_and_err_route_through_reply() {
377 let observed_ok: Arc<Mutex<Option<Result<u32, &'static str>>>> = Arc::new(Mutex::new(None));
378 RecordingCall {
379 observed: Arc::clone(&observed_ok),
380 }
381 .ok(7)
382 .await;
383 assert!(matches!(
384 *observed_ok.lock().expect("ok mutex poisoned"),
385 Some(Ok(7))
386 ));
387
388 let observed_err: Arc<Mutex<Option<Result<u32, &'static str>>>> =
389 Arc::new(Mutex::new(None));
390 RecordingCall {
391 observed: Arc::clone(&observed_err),
392 }
393 .err("boom")
394 .await;
395 assert!(matches!(
396 *observed_err.lock().expect("err mutex poisoned"),
397 Some(Err("boom"))
398 ));
399 }
400
401 #[tokio::test]
402 async fn reply_sink_send_error_uses_outgoing_payload_and_reply_path() {
403 let saw_send_reply = Arc::new(Mutex::new(false));
404 let saw_outgoing_payload = Arc::new(Mutex::new(false));
405 let sink = RecordingReplySink {
406 saw_send_reply: Arc::clone(&saw_send_reply),
407 saw_outgoing_payload: Arc::clone(&saw_outgoing_payload),
408 };
409
410 sink.send_error(crate::RoamError::<String>::Cancelled).await;
411
412 assert!(*saw_send_reply.lock().expect("send-reply mutex poisoned"));
413 assert!(
414 *saw_outgoing_payload
415 .lock()
416 .expect("payload-kind mutex poisoned")
417 );
418 }
419
420 #[tokio::test]
421 async fn unit_handler_is_noop() {
422 let req = crate::SelfRef::owning(
423 crate::Backing::Boxed(Box::<[u8]>::default()),
424 RequestCall {
425 method_id: crate::MethodId(1),
426 channels: vec![],
427 metadata: Metadata::default(),
428 args: Payload::Incoming(&[]),
429 },
430 );
431 ().handle(
432 req,
433 RecordingReplySink {
434 saw_send_reply: Arc::new(Mutex::new(false)),
435 saw_outgoing_payload: Arc::new(Mutex::new(false)),
436 },
437 )
438 .await;
439 }
440
441 #[test]
442 fn response_parts_deref_exposes_ret() {
443 let parts = ResponseParts {
444 ret: 42_u32,
445 metadata: Metadata::default(),
446 };
447 assert_eq!(*parts, 42);
448 }
449
450 #[test]
451 fn default_channel_binder_accessor_for_caller_returns_none() {
452 let caller = NoopCaller;
453 assert!(caller.channel_binder().is_none());
454 }
455
456 #[test]
457 fn default_channel_binder_accessor_for_reply_sink_returns_none() {
458 let sink = RecordingReplySink {
459 saw_send_reply: Arc::new(Mutex::new(false)),
460 saw_outgoing_payload: Arc::new(Mutex::new(false)),
461 };
462 assert!(sink.channel_binder().is_none());
463 }
464}