Skip to main content

vox_types/
calls.rs

1use std::{future::Future, pin::Pin, sync::Arc};
2
3use crate::{
4    ConnectionId, MaybeSend, MaybeSendFuture, MaybeSync, Metadata, MethodId, RequestCall,
5    RequestId, RequestResponse, SchemaRecvTracker, SelfRef, VoxError,
6};
7
8/// A boxed future that is `Send` on native targets and `!Send` on wasm32.
9pub type BoxFut<'a, T> = Pin<Box<dyn MaybeSendFuture<Output = T> + 'a>>;
10
11/// Result type for one caller-visible RPC call: either a tracked response or an error.
12///
13/// The tracked value is the wire-level [`RequestResponse`] that resolved the
14/// current request attempt for that call.
15pub type CallResult = Result<crate::WithTracker<SelfRef<RequestResponse<'static>>>, VoxError>;
16
17// As a recap, a service defined like so:
18//
19// #[vox::service]
20// trait Hash {
21//   async fn hash(&self, payload: &[u8]) -> Result<&[u8], E>;
22// }
23//
24// Would expand to the following caller:
25//
26// impl HashClient {
27//   async fn hash(&self, payload: &[u8]) -> Result<SelfRef<&[u8]>, VoxError<E>>;
28// }
29//
30// Would expand to a service trait (what users implement):
31//
32// trait Hash {
33//   async fn hash(&self, call: impl Call<&[u8], E>, payload: &[u8]);
34// }
35//
36// And a HashDispatcher<S: Hash> that implements Handler<R: ReplySink>:
37// it deserializes args, constructs an ErasedCall<T, E> from the ReplySink,
38// and routes to the appropriate method by method ID.
39//
40// For owned success returns, generated methods return values directly and
41// the dispatcher sends replies on their behalf.
42//
43// HashDispatcher<S> implements Handler<R>, and can be stored as
44// Box<dyn Handler<R>> to erase both S and the service type.
45//
// Why does the service trait (`Hash` above) take `impl Call`? So that the
// handler can reply with something _borrowed_ from its own stack frame.
48//
49// For example:
50//
51// impl Hash for MyHasher {
52//   async fn hash(&self, call: impl Call<&[u8], E>, payload: &[u8]) {
53//     let result: [u8; 16] = compute_hash(payload);
54//     call.ok(&result).await;
55//   }
56// }
57//
58// Call's public API is:
59//
60// trait Call<T, E> {
61//   async fn reply(self, result: Result<T, E>);
62//   async fn ok(self, value: T) { self.reply(Ok(value)).await }
63//   async fn err(self, error: E) { self.reply(Err(error)).await }
64// }
65//
66// If a Call is dropped before reply/ok/err is called, the caller will
67// receive a VoxError::Cancelled error. This is to ensure that the caller
68// is always notified, even if the handler panics or otherwise fails to
69// reply.
70
71/// Represents an in-progress API-level call as seen by a server handler.
72///
73/// A `Call` is handed to a [`Handler`] implementation for one incoming
74/// request attempt. It provides the mechanism for sending the terminal
75/// response for that attempt back to the caller. The response can be sent
76/// via [`Call::reply`], [`Call::ok`], or [`Call::err`].
77///
78/// In the retry model, one logical operation may span multiple request
79/// attempts over time, but each `Call` value corresponds to exactly one
80/// request attempt currently being handled.
81///
82/// # Cancellation
83///
84/// If a `Call` is dropped without a reply being sent, the caller will
85/// automatically receive a [`VoxError::Cancelled`] error. This guarantees
86/// that the caller is always notified, even if the handler panics or
87/// otherwise fails to produce a reply.
88///
89/// # Type Parameters
90///
91/// - `T`: The success value type of the response.
92/// - `E`: The error value type of the response.
93pub trait Call<'wire, T, E>: MaybeSend
94where
95    T: facet::Facet<'wire> + MaybeSend,
96    E: facet::Facet<'wire> + MaybeSend,
97{
98    /// Send the terminal response for this request attempt, consuming this `Call`.
99    fn reply(self, result: Result<T, E>) -> impl Future<Output = ()> + MaybeSend;
100
101    /// Send a successful response for this request attempt, consuming this `Call`.
102    ///
103    /// Equivalent to `self.reply(Ok(value)).await`.
104    fn ok(self, value: T) -> impl Future<Output = ()> + MaybeSend
105    where
106        Self: Sized,
107    {
108        self.reply(Ok(value))
109    }
110
111    /// Send an error response for this request attempt, consuming this `Call`.
112    ///
113    /// Equivalent to `self.reply(Err(error)).await`.
114    fn err(self, error: E) -> impl Future<Output = ()> + MaybeSend
115    where
116        Self: Sized,
117    {
118        self.reply(Err(error))
119    }
120}
121
122/// Sink for sending the terminal response for one request attempt.
123///
124/// Implemented by the session driver. Provides backpressure: `send_reply`
125/// awaits until the transport can accept the response before serializing it.
126///
127/// # Cancellation
128///
129/// If the `ReplySink` is dropped without `send_reply` being called, the caller
130/// will automatically receive a [`crate::VoxError::Cancelled`] error.
131pub trait ReplySink: MaybeSend + MaybeSync + 'static {
132    /// Send the terminal response for this request attempt, consuming the sink.
133    /// Any error that happens during `send_reply` must set a flag in the driver
134    /// for it to resolve the attempt as failed.
135    ///
136    /// This cannot return a `Result` because we cannot trust callers to deal
137    /// with it, and they cannot try sending a second response anyway.
138    ///
139    /// Do not spawn a task to send the error because it too, might fail.
140    fn send_reply(self, response: RequestResponse<'_>) -> impl Future<Output = ()> + MaybeSend;
141
142    /// Send an error response for this request attempt, consuming the sink.
143    ///
144    /// This is a convenience method used by generated dispatchers when
145    /// deserialization fails or the method ID is unknown.
146    fn send_error<E: for<'a> facet::Facet<'a> + MaybeSend>(
147        self,
148        error: VoxError<E>,
149    ) -> impl Future<Output = ()> + MaybeSend
150    where
151        Self: Sized,
152    {
153        use crate::{Payload, RequestResponse};
154        // Wire format is always Result<T, VoxError<E>>. We don't know T here,
155        // but postcard encodes () as zero bytes, so Result<(), VoxError<E>>
156        // produces the same Err variant encoding as any Result<T, VoxError<E>>.
157        async move {
158            let wire: Result<(), VoxError<E>> = Err(error);
159            self.send_reply(RequestResponse {
160                ret: Payload::outgoing(&wire),
161                metadata: Default::default(),
162                schemas: Default::default(),
163            })
164            .await;
165        }
166    }
167
168    /// Send an error response using the full wire shape `Result<T, VoxError<E>>`.
169    ///
170    /// This preserves the method's real `Ok` type for schema extraction.
171    fn send_typed_error<'wire, T, E>(
172        self,
173        error: VoxError<E>,
174    ) -> impl Future<Output = ()> + MaybeSend
175    where
176        Self: Sized,
177        T: facet::Facet<'wire> + MaybeSend,
178        E: facet::Facet<'wire> + MaybeSend,
179    {
180        use crate::{Payload, RequestResponse};
181        async move {
182            let wire: Result<T, VoxError<E>> = Err(error);
183            let ptr = facet::PtrConst::new((&wire as *const Result<T, VoxError<E>>).cast::<u8>());
184            let shape = <Result<T, VoxError<E>> as facet::Facet<'wire>>::SHAPE;
185            let ret = unsafe { Payload::outgoing_unchecked(ptr, shape) };
186            self.send_reply(RequestResponse {
187                ret,
188                metadata: Default::default(),
189                schemas: Default::default(),
190            })
191            .await;
192        }
193    }
194
195    /// Return a channel binder for binding Tx/Rx handles in deserialized args.
196    ///
197    /// Returns `None` by default. The driver's `ReplySink` implementation
198    /// overrides this to provide actual channel binding.
199    fn channel_binder(&self) -> Option<&dyn crate::ChannelBinder> {
200        None
201    }
202
203    /// Return the wire-level request identifier for this reply sink when available.
204    fn request_id(&self) -> Option<RequestId> {
205        None
206    }
207
208    /// Return the virtual connection identifier for this reply sink when available.
209    fn connection_id(&self) -> Option<ConnectionId> {
210        None
211    }
212}
213
214/// Type-erased handler for incoming service calls.
215///
216/// Implemented (by the macro-generated dispatch code) for server-side types.
217/// Takes a fully decoded [`RequestCall`](crate::RequestCall) — one wire-level
218/// request attempt already parsed from the connection — and a [`ReplySink`]
219/// through which the terminal response for that attempt is sent.
220///
221/// The dispatch impl decodes the args, routes by [`crate::MethodId`], and
222/// invokes the appropriate typed [`Call`]-based method on the concrete server type.
223///
224/// Generated clients hold a `Caller` (from `vox-core`) as a public field
225/// and use it to start API-level calls.
226pub trait Handler<R: ReplySink>: MaybeSend + MaybeSync + 'static {
227    /// Return the static retry policy for a method ID served by this handler.
228    fn retry_policy(&self, _method_id: MethodId) -> crate::RetryPolicy {
229        crate::RetryPolicy::VOLATILE
230    }
231
232    /// Return whether the method's argument shape contains any channels.
233    fn args_have_channels(&self, _method_id: MethodId) -> bool {
234        false
235    }
236
237    /// Return the canonical wire response shape for a method, if known.
238    ///
239    /// This is the full wire type `Result<T, VoxError<E>>`, not the
240    /// user-facing return type `T` or `Result<T, E>`.
241    fn response_wire_shape(&self, _method_id: MethodId) -> Option<&'static facet::Shape> {
242        None
243    }
244
245    /// Dispatch an incoming call to the appropriate method implementation.
246    fn handle(
247        &self,
248        call: SelfRef<RequestCall<'static>>,
249        reply: R,
250        schemas: Arc<SchemaRecvTracker>,
251    ) -> impl Future<Output = ()> + MaybeSend + '_;
252}
253
254impl<R: ReplySink> Handler<R> for () {
255    async fn handle(
256        &self,
257        _call: SelfRef<RequestCall<'static>>,
258        _reply: R,
259        _schemas: Arc<SchemaRecvTracker>,
260    ) {
261    }
262}
263
264/// A decoded response value paired with response metadata.
265///
266/// This helper is available for lower-level callers that need both the
267/// decoded value and metadata together. Generated Rust client methods do
268/// not expose response metadata in their return types.
269pub struct ResponseParts<'a, T> {
270    /// The decoded return value.
271    pub ret: T,
272    /// Metadata attached to the response by the server.
273    pub metadata: Metadata<'a>,
274}
275
276impl<'a, T> std::ops::Deref for ResponseParts<'a, T> {
277    type Target = T;
278    fn deref(&self) -> &T {
279        &self.ret
280    }
281}
282
283/// Concrete [`Call`] implementation backed by a [`ReplySink`].
284///
285/// Constructed by the dispatcher and handed to the server method.
286/// When the server calls [`Call::reply`], the result is serialized and
287/// sent through the sink.
288pub struct SinkCall<R: ReplySink> {
289    reply: R,
290}
291
292impl<R: ReplySink> SinkCall<R> {
293    pub fn new(reply: R) -> Self {
294        Self { reply }
295    }
296}
297
298impl<'wire, T, E, R> Call<'wire, T, E> for SinkCall<R>
299where
300    T: facet::Facet<'wire> + MaybeSend,
301    E: facet::Facet<'wire> + MaybeSend,
302    R: ReplySink,
303{
304    async fn reply(self, result: Result<T, E>) {
305        use crate::{Payload, RequestResponse};
306        let wire: Result<T, VoxError<E>> = result.map_err(VoxError::User);
307        let ptr = facet::PtrConst::new((&wire as *const Result<T, VoxError<E>>).cast::<u8>());
308        let shape = <Result<T, VoxError<E>> as facet::Facet<'wire>>::SHAPE;
309        // SAFETY: `wire` lives until `send_reply(...).await` completes in this function,
310        // and `shape` matches the pointed value exactly.
311        let ret = unsafe { Payload::outgoing_unchecked(ptr, shape) };
312
313        self.reply
314            .send_reply(RequestResponse {
315                ret,
316                metadata: Default::default(),
317                schemas: Default::default(),
318            })
319            .await;
320    }
321}
322
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use crate::{MaybeSend, Metadata, Payload, RequestCall, RequestResponse};

    use super::{Call, Handler, ReplySink, ResponseParts};

    /// `Call` test double that stores whatever result is passed to `reply`.
    struct RecordingCall<T, E> {
        observed: Arc<Mutex<Option<Result<T, E>>>>,
    }

    impl<'wire, T, E> Call<'wire, T, E> for RecordingCall<T, E>
    where
        T: facet::Facet<'wire> + MaybeSend + Send + 'static,
        E: facet::Facet<'wire> + MaybeSend + Send + 'static,
    {
        async fn reply(self, result: Result<T, E>) {
            let mut guard = self.observed.lock().expect("recording mutex poisoned");
            *guard = Some(result);
        }
    }

    /// Shared boolean flag set by [`RecordingReplySink`].
    type Flag = Arc<Mutex<bool>>;

    /// `ReplySink` test double that records whether `send_reply` ran and
    /// whether the response it saw carried a `Payload::Value`.
    struct RecordingReplySink {
        saw_send_reply: Flag,
        saw_outgoing_payload: Flag,
    }

    impl ReplySink for RecordingReplySink {
        async fn send_reply(self, response: RequestResponse<'_>) {
            let mut saw_send_reply = self
                .saw_send_reply
                .lock()
                .expect("send-reply mutex poisoned");
            *saw_send_reply = true;

            let mut saw_outgoing = self
                .saw_outgoing_payload
                .lock()
                .expect("payload-kind mutex poisoned");
            *saw_outgoing = matches!(response.ret, Payload::Value { .. });
        }
    }

    /// Build a sink with both flags cleared, returning the sink together with
    /// handles to `(saw_send_reply, saw_outgoing_payload)`.
    fn recording_sink() -> (RecordingReplySink, Flag, Flag) {
        let saw_send_reply: Flag = Arc::new(Mutex::new(false));
        let saw_outgoing_payload: Flag = Arc::new(Mutex::new(false));
        let sink = RecordingReplySink {
            saw_send_reply: Arc::clone(&saw_send_reply),
            saw_outgoing_payload: Arc::clone(&saw_outgoing_payload),
        };
        (sink, saw_send_reply, saw_outgoing_payload)
    }

    #[tokio::test]
    async fn call_ok_and_err_route_through_reply() {
        let observed_ok: Arc<Mutex<Option<Result<u32, &'static str>>>> = Arc::new(Mutex::new(None));
        RecordingCall {
            observed: Arc::clone(&observed_ok),
        }
        .ok(7)
        .await;
        assert!(matches!(
            *observed_ok.lock().expect("ok mutex poisoned"),
            Some(Ok(7))
        ));

        let observed_err: Arc<Mutex<Option<Result<u32, &'static str>>>> =
            Arc::new(Mutex::new(None));
        RecordingCall {
            observed: Arc::clone(&observed_err),
        }
        .err("boom")
        .await;
        assert!(matches!(
            *observed_err.lock().expect("err mutex poisoned"),
            Some(Err("boom"))
        ));
    }

    #[tokio::test]
    async fn reply_sink_send_error_uses_outgoing_payload_and_reply_path() {
        let (sink, saw_send_reply, saw_outgoing_payload) = recording_sink();

        sink.send_error(crate::VoxError::<String>::Cancelled).await;

        assert!(*saw_send_reply.lock().expect("send-reply mutex poisoned"));
        assert!(
            *saw_outgoing_payload
                .lock()
                .expect("payload-kind mutex poisoned")
        );
    }

    #[tokio::test]
    async fn reply_sink_send_typed_error_preserves_ok_shape() {
        use crate::{
            SchemaKind, TypeRef, VariantPayload, VoxError, build_registry, extract_schemas,
        };

        /// Sink that records the schema root extracted from the response shape.
        struct ShapeReplySink {
            observed_root: Arc<Mutex<Option<TypeRef>>>,
        }

        impl ReplySink for ShapeReplySink {
            async fn send_reply(self, response: RequestResponse<'_>) {
                let Payload::Value { shape, .. } = response.ret else {
                    panic!("typed error should use outgoing payload");
                };
                let extracted = extract_schemas(shape).expect("response shape should extract");
                *self
                    .observed_root
                    .lock()
                    .expect("observed-root mutex poisoned") = Some(extracted.root.clone());
            }
        }

        let observed_root = Arc::new(Mutex::new(None));
        ShapeReplySink {
            observed_root: Arc::clone(&observed_root),
        }
        .send_typed_error::<(String, i32), String>(VoxError::Cancelled)
        .await;

        let root = observed_root
            .lock()
            .expect("observed-root mutex poisoned")
            .clone()
            .expect("typed error should record a root");
        let extracted =
            extract_schemas(<Result<(String, i32), VoxError<String>> as facet::Facet>::SHAPE)
                .expect("expected result shape should extract");
        let registry = build_registry(&extracted.schemas);
        let root_kind = root.resolve_kind(&registry).expect("root should resolve");
        let SchemaKind::Enum { variants, .. } = root_kind else {
            panic!("expected result enum root");
        };
        let ok_variant = variants
            .iter()
            .find(|variant| variant.name == "Ok")
            .expect("Result should have Ok variant");
        let VariantPayload::Newtype { type_ref } = &ok_variant.payload else {
            panic!("Ok variant should be newtype");
        };
        match type_ref
            .resolve_kind(&registry)
            .expect("Ok payload should resolve")
        {
            SchemaKind::Tuple { elements } => {
                assert_eq!(elements.len(), 2, "Ok tuple should have two elements");
            }
            other => panic!("expected Ok payload to be tuple, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn unit_handler_is_noop() {
        let req = crate::SelfRef::owning(
            crate::Backing::Boxed(Box::<[u8]>::default()),
            RequestCall {
                method_id: crate::MethodId(1),
                metadata: Metadata::default(),
                args: Payload::PostcardBytes(&[]),
                schemas: Default::default(),
            },
        );
        let (sink, saw_send_reply, _saw_outgoing_payload) = recording_sink();

        ().handle(req, sink, Arc::new(crate::SchemaRecvTracker::new()))
            .await;

        // "No-op" means the unit handler never touches the reply path.
        assert!(
            !*saw_send_reply.lock().expect("send-reply mutex poisoned"),
            "unit handler must not send a reply"
        );
    }

    #[test]
    fn response_parts_deref_exposes_ret() {
        let parts = ResponseParts {
            ret: 42_u32,
            metadata: Metadata::default(),
        };
        assert_eq!(*parts, 42);
    }

    #[test]
    fn default_channel_binder_accessor_for_reply_sink_returns_none() {
        let (sink, _, _) = recording_sink();
        assert!(sink.channel_binder().is_none());
    }

    #[test]
    fn default_request_and_connection_ids_for_reply_sink_return_none() {
        let (sink, _, _) = recording_sink();
        assert!(sink.request_id().is_none());
        assert!(sink.connection_id().is_none());
    }
}