// taut_rpc/procedure.rs

//! Type-erased procedure contract used by `#[rpc]`-emitted code to register
//! a procedure with the [`crate::router::Router`].
//!
//! The `#[rpc]` proc-macro emits a [`ProcedureDescriptor`] per annotated
//! function: a static name, a runtime kind tag, the IR fragment and reachable
//! [`crate::ir::TypeDef`]s for codegen, and a type-erased async body in the
//! form of a [`ProcedureBody`].
//!
//! The body is one of two shapes:
//!
//! - [`ProcedureBody::Unary`] — for queries and mutations. A future-returning
//!   closure shaped like SPEC §4.1's request/response cycle: take the JSON
//!   `input`, return a single [`ProcedureResult`].
//! - [`ProcedureBody::Stream`] — for subscriptions (Phase 3). A stream-returning
//!   closure: take the JSON `input`, yield a sequence of [`StreamFrame`]s,
//!   each mapping to one SSE frame per SPEC §4.2 (`event: data` or
//!   `event: error`). End-of-stream is implicit when the stream finishes —
//!   the router emits the closing `event: end\ndata:\n\n` frame itself.
//!
//! Both shapes are wrapped in `Arc<dyn Fn>` so descriptors are cheap to clone
//! and trivially `Send + Sync` — exactly what a shared `Router` needs to
//! dispatch concurrent requests across procedures. The deserialize → call user
//! fn → serialize cycle is owned entirely by the macro emission: the body
//! closure already accepts `serde_json::Value` for the input and produces
//! pre-serialized payloads. The [`crate::router::Router`] knows nothing about
//! input/output types — its job is purely HTTP framing.
27
28use std::sync::Arc;
29
30use futures::future::BoxFuture;
31use futures::stream::BoxStream;
32
/// Type-erased async unary handler — used for queries and mutations.
///
/// Takes a JSON `Value` (the already-extracted `input` field of the §4.1
/// request envelope) and returns a future resolving to a single
/// [`ProcedureResult`]. Wrapped in `Arc<dyn Fn>` so the descriptor is
/// `Clone + Send + Sync` while keeping the closure type erased.
///
/// The returned future is a `BoxFuture<'static, _>`, so it cannot borrow from
/// the closure's captures: anything the future needs must be moved or cloned
/// into it on every call.
pub type UnaryHandler =
    Arc<dyn Fn(serde_json::Value) -> BoxFuture<'static, ProcedureResult> + Send + Sync>;
41
/// Type-erased async streaming handler — used for subscriptions (SPEC §4.2).
///
/// Takes a JSON `Value` (the request input) and returns a `BoxStream` of
/// [`StreamFrame`]s. Each yielded frame maps to one SSE event per SPEC §4.2:
/// [`StreamFrame::Data`] becomes `event: data`, [`StreamFrame::Error`]
/// becomes `event: error`. End-of-stream is implicit — when the stream
/// finishes, the router emits the closing `event: end\ndata:\n\n` frame.
///
/// The returned stream is a `BoxStream<'static, _>`, so it cannot borrow from
/// the closure's captures: state the stream needs must be moved or cloned
/// into it on every call.
pub type StreamHandler =
    Arc<dyn Fn(serde_json::Value) -> BoxStream<'static, StreamFrame> + Send + Sync>;
51
/// Backwards-compatible alias. Older code (and the Phase 1/2 macro emission)
/// referred to a single `ProcedureHandler` type that was implicitly unary;
/// keep the name pointed at [`UnaryHandler`] so unrelated call sites compile
/// unchanged across the Phase 3 split.
///
/// This is a plain type alias, so `ProcedureHandler` and [`UnaryHandler`]
/// are the same type and fully interchangeable.
pub type ProcedureHandler = UnaryHandler;
57
58/// Outcome of invoking a [`UnaryHandler`].
59///
60/// Maps directly to the SPEC §4.1 wire envelope: [`Self::Ok`] becomes
61/// `200 { "ok": <payload> }`; [`Self::Err`] becomes
62/// `<http_status> { "err": { "code", "payload" } }`.
63pub enum ProcedureResult {
64    /// Successful response — the JSON value sent back as `{"ok": ...}`.
65    Ok(serde_json::Value),
66    /// Failure response — sent back as `{"err": {"code", "payload"}}` with
67    /// the given HTTP status.
68    Err {
69        /// HTTP status code returned to the caller.
70        http_status: u16,
71        /// Stable, machine-readable error code.
72        code: String,
73        /// Error payload serialized into the wire envelope.
74        payload: serde_json::Value,
75    },
76}
77
78impl ProcedureResult {
79    /// Serialize a value into [`ProcedureResult::Ok`].
80    ///
81    /// On serialization failure, falls back to a 500 `serialization_error`
82    /// with a `null` payload — there's no useful structured payload to emit
83    /// when serde itself failed, and surfacing the raw `serde_json::Error`
84    /// would leak Rust-internal type names to the wire.
85    pub fn ok(value: impl serde::Serialize) -> Self {
86        match serde_json::to_value(&value) {
87            Ok(v) => ProcedureResult::Ok(v),
88            Err(_) => ProcedureResult::Err {
89                http_status: 500,
90                code: "serialization_error".to_string(),
91                payload: serde_json::Value::Null,
92            },
93        }
94    }
95
96    /// Build [`ProcedureResult::Err`] from a status, stable code, and
97    /// serializable payload. Same fallback semantics as [`Self::ok`] when the
98    /// payload fails to serialize.
99    pub fn err(http_status: u16, code: impl Into<String>, payload: impl serde::Serialize) -> Self {
100        match serde_json::to_value(&payload) {
101            Ok(payload) => ProcedureResult::Err {
102                http_status,
103                code: code.into(),
104                payload,
105            },
106            Err(_) => ProcedureResult::Err {
107                http_status: 500,
108                code: "serialization_error".to_string(),
109                payload: serde_json::Value::Null,
110            },
111        }
112    }
113
114    /// Build [`ProcedureResult::Err`] from a [`crate::TautError`]. The payload
115    /// is `serde_json::to_value(&e)`; if that fails the payload becomes
116    /// `null` but `code` and `http_status` are still taken from the error.
117    #[allow(clippy::needless_pass_by_value)] // owned `e` matches macro-emitted call sites
118    pub fn from_taut_error<E: crate::TautError>(e: E) -> Self {
119        let code = e.code().to_string();
120        let http_status = e.http_status();
121        let payload = serde_json::to_value(&e).unwrap_or(serde_json::Value::Null);
122        ProcedureResult::Err {
123            http_status,
124            code,
125            payload,
126        }
127    }
128
129    /// Convenience helper for macro-emitted code: maps a `serde_json::Error`
130    /// (typically from output serialization in the handler wrapper) to a
131    /// uniform 500 `serialization_error` response.
132    #[must_use]
133    #[allow(clippy::needless_pass_by_value)] // owned arg matches macro-emitted call sites
134    pub fn from_serialization(_e: serde_json::Error) -> Self {
135        ProcedureResult::Err {
136            http_status: 500,
137            code: "serialization_error".to_string(),
138            payload: serde_json::Value::Null,
139        }
140    }
141}
142
/// One frame yielded by a [`StreamHandler`].
///
/// Mirrors the SPEC §4.2 SSE event shapes:
///
/// - [`Self::Data`] → `event: data\ndata: <json>\n\n`
/// - [`Self::Error`] → `event: error\ndata: <{code,payload}>\n\n`
///
/// The terminal `event: end\ndata:\n\n` frame is implicit — when the
/// underlying stream finishes, the router emits it. Stream handlers should
/// just stop yielding rather than try to encode the end frame themselves.
///
/// `StreamFrame` is intentionally runtime-only: it carries pre-serialized
/// `serde_json::Value`s so the router can splat them into SSE bodies without
/// re-running user `Serialize` impls. It does **not** implement
/// `serde::Serialize`/`Deserialize` itself — there's no wire shape to round
/// trip.
// `Debug` for logging; `Clone` duplicates the contained JSON value(s).
#[derive(Debug, Clone)]
pub enum StreamFrame {
    /// A successful payload frame. Becomes `event: data\ndata: <json>\n\n`
    /// on the SSE wire.
    Data(serde_json::Value),
    /// An error frame. Becomes `event: error\ndata: {"code","payload"}\n\n`
    /// on the SSE wire. Streaming errors do **not** terminate the connection
    /// at the SPEC level — the user's stream chooses whether to keep yielding
    /// after an `Error` frame or stop. (The HTTP response is already
    /// committed by the time SSE frames flow, so there's no status code to
    /// flip.)
    Error {
        /// Stable error code emitted with the SSE error frame.
        code: String,
        /// Error payload serialized into the SSE error frame.
        payload: serde_json::Value,
    },
}
177
178impl StreamFrame {
179    /// Serialize a value into [`StreamFrame::Data`].
180    ///
181    /// On serialization failure, falls back to a [`StreamFrame::Error`] with
182    /// `code = "serialization_error"` and a `null` payload — same fallback
183    /// shape as [`ProcedureResult::ok`], for consistency between the unary
184    /// and streaming paths.
185    pub fn data(value: impl serde::Serialize) -> Self {
186        match serde_json::to_value(&value) {
187            Ok(v) => StreamFrame::Data(v),
188            Err(_) => StreamFrame::Error {
189                code: "serialization_error".to_string(),
190                payload: serde_json::Value::Null,
191            },
192        }
193    }
194
195    /// Build [`StreamFrame::Error`] from a stable code and serializable
196    /// payload. Same fallback semantics as [`Self::data`] when the payload
197    /// fails to serialize.
198    pub fn err(code: impl Into<String>, payload: impl serde::Serialize) -> Self {
199        match serde_json::to_value(&payload) {
200            Ok(payload) => StreamFrame::Error {
201                code: code.into(),
202                payload,
203            },
204            Err(_) => StreamFrame::Error {
205                code: "serialization_error".to_string(),
206                payload: serde_json::Value::Null,
207            },
208        }
209    }
210
211    /// Build [`StreamFrame::Error`] from a [`crate::TautError`]. The payload
212    /// is `serde_json::to_value(&e)`; if that fails the payload becomes
213    /// `null` but `code` is still taken from the error.
214    ///
215    /// Note that, unlike the unary [`ProcedureResult::from_taut_error`], the
216    /// `http_status` of the error is intentionally dropped: SSE frames flow
217    /// after the HTTP status line is already committed, so per-frame status
218    /// codes don't fit. Callers wanting status-mapping semantics should use a
219    /// unary procedure instead.
220    #[allow(clippy::needless_pass_by_value)] // owned `e` matches macro-emitted call sites
221    pub fn from_taut_error<E: crate::TautError>(e: E) -> Self {
222        let code = e.code().to_string();
223        let payload = serde_json::to_value(&e).unwrap_or(serde_json::Value::Null);
224        StreamFrame::Error { code, payload }
225    }
226}
227
228/// Body of a [`ProcedureDescriptor`] — either a unary handler (queries and
229/// mutations, SPEC §4.1) or a streaming handler (subscriptions, SPEC §4.2).
230///
231/// `Clone` because [`UnaryHandler`] / [`StreamHandler`] are themselves `Arc`s
232/// — cloning a `ProcedureBody` just bumps refcounts.
233///
234/// # Examples
235///
236/// Pattern-match a descriptor's body to dispatch on the procedure flavor.
237/// This is the same shape the router itself uses internally:
238///
239/// ```rust,ignore
240/// use taut_rpc::procedure::{ProcedureBody, ProcedureDescriptor};
241///
242/// fn describe(desc: &ProcedureDescriptor) -> &'static str {
243///     match &desc.body {
244///         ProcedureBody::Unary(_) => "query or mutation",
245///         ProcedureBody::Stream(_) => "subscription",
246///     }
247/// }
248/// ```
249#[derive(Clone)]
250pub enum ProcedureBody {
251    /// Unary handler — used by queries and mutations (SPEC §4.1).
252    Unary(UnaryHandler),
253    /// Streaming handler — used by subscriptions (SPEC §4.2).
254    Stream(StreamHandler),
255}
256
/// Runtime descriptor for a single `#[rpc]` procedure.
///
/// Built by the `#[rpc]` macro at compile time and registered with
/// [`crate::router::Router`] at startup. Carries everything the router needs
/// to dispatch a request (`name`, `kind`, `body`) plus everything the IR
/// document needs to describe this procedure to the TypeScript codegen
/// (`ir`, `type_defs`).
///
/// `Clone` is derived, so cloning a descriptor clones `ir` and `type_defs`
/// and clones the `Arc`-backed handler inside `body`. `Debug` is implemented
/// by hand elsewhere in this file because the handler closure itself cannot
/// be printed.
#[derive(Clone)]
pub struct ProcedureDescriptor {
    /// Procedure name. Matches the underlying Rust function name and is the
    /// path segment in `/rpc/<name>`.
    pub name: &'static str,
    /// Runtime tag distinguishing query / mutation / subscription dispatch.
    pub kind: crate::router::ProcKindRuntime,
    /// IR fragment (input/output types, HTTP method, doc) for this procedure.
    pub ir: crate::ir::Procedure,
    /// All [`crate::ir::TypeDef`]s reachable from this procedure's signature.
    /// Router-level IR assembly deduplicates across procedures.
    pub type_defs: Vec<crate::ir::TypeDef>,
    /// Type-erased async body — unary for query/mutation, streaming for
    /// subscriptions. Phase 3 replaces the Phase 1/2 single `handler` field
    /// with this two-variant body.
    pub body: ProcedureBody,
}
281
282impl std::fmt::Debug for ProcedureDescriptor {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        // Skip the actual handler closure (no useful Debug for `dyn Fn`); just
285        // print which variant of `ProcedureBody` we're holding so logs show
286        // the unary-vs-stream split. IR input/output refs round out the
287        // procedure shape.
288        let body_kind = match &self.body {
289            ProcedureBody::Unary(_) => "Unary",
290            ProcedureBody::Stream(_) => "Stream",
291        };
292        f.debug_struct("ProcedureDescriptor")
293            .field("name", &self.name)
294            .field("kind", &self.kind)
295            .field("body", &body_kind)
296            .field("input", &self.ir.input)
297            .field("output", &self.ir.output)
298            .finish_non_exhaustive()
299    }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{json, Value};

    // ---- Shared helpers -----------------------------------------------------

    /// Extract the success payload of a unary result, failing the test if the
    /// result is an error.
    fn expect_ok(result: ProcedureResult) -> Value {
        match result {
            ProcedureResult::Ok(payload) => payload,
            ProcedureResult::Err { .. } => panic!("expected Ok"),
        }
    }

    /// Extract `(http_status, code, payload)` from a unary error result,
    /// failing the test if the result is a success.
    fn expect_err(result: ProcedureResult) -> (u16, String, Value) {
        match result {
            ProcedureResult::Err {
                http_status,
                code,
                payload,
            } => (http_status, code, payload),
            ProcedureResult::Ok(_) => panic!("expected Err"),
        }
    }

    /// Extract `(code, payload)` from an error frame, failing the test if the
    /// frame carries data instead.
    fn expect_error_frame(frame: StreamFrame) -> (String, Value) {
        match frame {
            StreamFrame::Error { code, payload } => (code, payload),
            StreamFrame::Data(_) => panic!("expected Error variant"),
        }
    }

    /// Minimal IR fragment. Its contents are irrelevant to the closure
    /// dispatch tests, but a `ProcedureDescriptor` cannot be built without
    /// one.
    fn dummy_procedure_ir(name: &str) -> crate::ir::Procedure {
        let unit = || crate::ir::TypeRef::Primitive(crate::ir::Primitive::Unit);
        crate::ir::Procedure {
            name: name.to_string(),
            kind: crate::ir::ProcKind::Query,
            input: unit(),
            output: unit(),
            errors: Vec::new(),
            http_method: crate::ir::HttpMethod::Post,
            doc: None,
        }
    }

    /// Assemble a descriptor around `body` with placeholder IR and no
    /// reachable type definitions.
    fn descriptor(
        name: &'static str,
        kind: crate::router::ProcKindRuntime,
        body: ProcedureBody,
    ) -> ProcedureDescriptor {
        ProcedureDescriptor {
            name,
            kind,
            ir: dummy_procedure_ir(name),
            type_defs: Vec::new(),
            body,
        }
    }

    // ---- ProcedureResult ----------------------------------------------------

    #[test]
    fn ok_serializes_to_expected_json_value() {
        let payload = expect_ok(ProcedureResult::ok(42u32));
        assert_eq!(payload, json!(42));
    }

    #[test]
    fn err_builds_envelope_with_supplied_fields() {
        let (http_status, code, payload) =
            expect_err(ProcedureResult::err(404, "not_found", Value::Null));
        assert_eq!(http_status, 404);
        assert_eq!(code, "not_found");
        assert_eq!(payload, Value::Null);
    }

    #[test]
    fn from_taut_error_preserves_code_and_status() {
        let result = ProcedureResult::from_taut_error(crate::error::StandardError::Unauthenticated);
        let (http_status, code, _payload) = expect_err(result);
        assert_eq!(code, "unauthenticated");
        assert_eq!(http_status, 401);
    }

    #[test]
    fn ok_payload_roundtrips_through_serde_json_string() {
        let original = json!({ "id": 7, "name": "ada" });
        let payload = expect_ok(ProcedureResult::Ok(original.clone()));
        let encoded = serde_json::to_string(&payload).expect("encode");
        let decoded: Value = serde_json::from_str(&encoded).expect("decode");
        assert_eq!(decoded, original);
    }

    // ---- Phase 3: ProcedureBody / StreamFrame -------------------------------

    #[tokio::test]
    async fn unary_body_dispatches_through_handler() {
        // Build a `ProcedureBody::Unary` by hand (bypassing the macro
        // emission), invoke the handler with a JSON value, and check that the
        // value comes back unchanged.
        let echo: UnaryHandler =
            Arc::new(|input: Value| -> BoxFuture<'static, ProcedureResult> {
                Box::pin(async move { ProcedureResult::Ok(input) })
            });
        let desc = descriptor(
            "echo",
            crate::router::ProcKindRuntime::Query,
            ProcedureBody::Unary(echo),
        );

        let call = match &desc.body {
            ProcedureBody::Unary(handler) => handler.clone(),
            ProcedureBody::Stream(_) => panic!("expected Unary body"),
        };
        let echoed = expect_ok(call(json!({"hello": "world"})).await);
        assert_eq!(echoed, json!({"hello": "world"}));
    }

    #[tokio::test]
    async fn stream_body_emits_collected_frames() {
        use futures::stream::{self, StreamExt};

        // Three `StreamFrame::Data` items in a row: shows that the streaming
        // side of the descriptor compiles, runs, and yields frames in order.
        let counter: StreamHandler =
            Arc::new(|_input: Value| -> BoxStream<'static, StreamFrame> {
                stream::iter((1..=3).map(|n| StreamFrame::Data(json!(n)))).boxed()
            });
        let desc = descriptor(
            "counter",
            crate::router::ProcKindRuntime::Subscription,
            ProcedureBody::Stream(counter),
        );

        let open = match &desc.body {
            ProcedureBody::Stream(handler) => handler.clone(),
            ProcedureBody::Unary(_) => panic!("expected Stream body"),
        };
        let frames: Vec<StreamFrame> = open(Value::Null).collect().await;
        assert_eq!(frames.len(), 3);

        let mut values: Vec<Value> = Vec::with_capacity(frames.len());
        for frame in frames {
            match frame {
                StreamFrame::Data(payload) => values.push(payload),
                StreamFrame::Error { .. } => panic!("expected Data frame"),
            }
        }
        assert_eq!(values, vec![json!(1), json!(2), json!(3)]);
    }

    #[test]
    fn stream_frame_data_serializes_payload_in_place() {
        // `StreamFrame` has no Serialize / Deserialize impl, so there is no
        // JSON round-trip to check. What matters is that `StreamFrame::data`
        // stores the already-serialized argument inside the variant, sparing
        // the router a second serialization pass.
        match StreamFrame::data(42u32) {
            StreamFrame::Data(payload) => assert_eq!(payload, json!(42)),
            StreamFrame::Error { .. } => panic!("expected Data variant"),
        }
    }

    #[test]
    fn stream_frame_err_builds_error_variant() {
        let frame = StreamFrame::err("rate_limited", json!({"retry_after": 5}));
        let (code, payload) = expect_error_frame(frame);
        assert_eq!(code, "rate_limited");
        assert_eq!(payload, json!({"retry_after": 5}));
    }

    #[test]
    fn stream_frame_from_taut_error_preserves_code() {
        let frame = StreamFrame::from_taut_error(crate::error::StandardError::Unauthenticated);
        let (code, _payload) = expect_error_frame(frame);
        assert_eq!(code, "unauthenticated");
    }
}