taut_rpc/procedure.rs
1//! Type-erased procedure contract used by `#[rpc]`-emitted code to register
2//! a procedure with the [`crate::router::Router`].
3//!
4//! The `#[rpc]` proc-macro emits a [`ProcedureDescriptor`] per annotated
5//! function: a static name, a runtime kind tag, the IR fragment and reachable
6//! [`crate::ir::TypeDef`]s for codegen, and a type-erased async body in the
7//! form of a [`ProcedureBody`].
8//!
9//! The body is one of two shapes:
10//!
11//! - [`ProcedureBody::Unary`] — for queries and mutations. A future-returning
12//! closure shaped like SPEC §4.1's request/response cycle: take the JSON
13//! `input`, return a single [`ProcedureResult`].
14//! - [`ProcedureBody::Stream`] — for subscriptions (Phase 3). A stream-returning
15//! closure: take the JSON `input`, yield a sequence of [`StreamFrame`]s,
16//! each mapping to one SSE frame per SPEC §4.2 (`event: data` or
17//! `event: error`). End-of-stream is implicit when the stream finishes —
18//! the router emits the closing `event: end\ndata:\n\n` frame itself.
19//!
20//! Both shapes are wrapped in `Arc<dyn Fn>` so descriptors are cheap to clone
21//! and trivially `Send + Sync` — exactly what a shared `Router` needs to
22//! dispatch concurrent requests across procedures. The deserialize → call user
23//! fn → serialize cycle is owned entirely by the macro emission: the body
24//! closure already accepts `serde_json::Value` for the input and produces
25//! pre-serialized payloads. The [`crate::router::Router`] knows nothing about
26//! input/output types — its job is purely HTTP framing.
27
28use std::sync::Arc;
29
30use futures::future::BoxFuture;
31use futures::stream::BoxStream;
32
33/// Type-erased async unary handler — used for queries and mutations.
34///
35/// Takes a JSON `Value` (the already-extracted `input` field of the §4.1
36/// request envelope) and returns a future resolving to a single
37/// [`ProcedureResult`]. Wrapped in `Arc<dyn Fn>` so the descriptor is
38/// `Clone + Send + Sync` while keeping the closure type erased.
39pub type UnaryHandler =
40 Arc<dyn Fn(serde_json::Value) -> BoxFuture<'static, ProcedureResult> + Send + Sync>;
41
42/// Type-erased async streaming handler — used for subscriptions (SPEC §4.2).
43///
44/// Takes a JSON `Value` (the request input) and returns a `BoxStream` of
45/// [`StreamFrame`]s. Each yielded frame maps to one SSE event per SPEC §4.2:
46/// [`StreamFrame::Data`] becomes `event: data`, [`StreamFrame::Error`]
47/// becomes `event: error`. End-of-stream is implicit — when the stream
48/// finishes, the router emits the closing `event: end\ndata:\n\n` frame.
49pub type StreamHandler =
50 Arc<dyn Fn(serde_json::Value) -> BoxStream<'static, StreamFrame> + Send + Sync>;
51
52/// Backwards-compatible alias. Older code (and the Phase 1/2 macro emission)
53/// referred to a single `ProcedureHandler` type that was implicitly unary;
54/// keep the name pointed at [`UnaryHandler`] so unrelated call sites compile
55/// unchanged across the Phase 3 split.
56pub type ProcedureHandler = UnaryHandler;
57
58/// Outcome of invoking a [`UnaryHandler`].
59///
60/// Maps directly to the SPEC §4.1 wire envelope: [`Self::Ok`] becomes
61/// `200 { "ok": <payload> }`; [`Self::Err`] becomes
62/// `<http_status> { "err": { "code", "payload" } }`.
63pub enum ProcedureResult {
64 /// Successful response — the JSON value sent back as `{"ok": ...}`.
65 Ok(serde_json::Value),
66 /// Failure response — sent back as `{"err": {"code", "payload"}}` with
67 /// the given HTTP status.
68 Err {
69 /// HTTP status code returned to the caller.
70 http_status: u16,
71 /// Stable, machine-readable error code.
72 code: String,
73 /// Error payload serialized into the wire envelope.
74 payload: serde_json::Value,
75 },
76}
77
78impl ProcedureResult {
79 /// Serialize a value into [`ProcedureResult::Ok`].
80 ///
81 /// On serialization failure, falls back to a 500 `serialization_error`
82 /// with a `null` payload — there's no useful structured payload to emit
83 /// when serde itself failed, and surfacing the raw `serde_json::Error`
84 /// would leak Rust-internal type names to the wire.
85 pub fn ok(value: impl serde::Serialize) -> Self {
86 match serde_json::to_value(&value) {
87 Ok(v) => ProcedureResult::Ok(v),
88 Err(_) => ProcedureResult::Err {
89 http_status: 500,
90 code: "serialization_error".to_string(),
91 payload: serde_json::Value::Null,
92 },
93 }
94 }
95
96 /// Build [`ProcedureResult::Err`] from a status, stable code, and
97 /// serializable payload. Same fallback semantics as [`Self::ok`] when the
98 /// payload fails to serialize.
99 pub fn err(http_status: u16, code: impl Into<String>, payload: impl serde::Serialize) -> Self {
100 match serde_json::to_value(&payload) {
101 Ok(payload) => ProcedureResult::Err {
102 http_status,
103 code: code.into(),
104 payload,
105 },
106 Err(_) => ProcedureResult::Err {
107 http_status: 500,
108 code: "serialization_error".to_string(),
109 payload: serde_json::Value::Null,
110 },
111 }
112 }
113
114 /// Build [`ProcedureResult::Err`] from a [`crate::TautError`]. The payload
115 /// is `serde_json::to_value(&e)`; if that fails the payload becomes
116 /// `null` but `code` and `http_status` are still taken from the error.
117 #[allow(clippy::needless_pass_by_value)] // owned `e` matches macro-emitted call sites
118 pub fn from_taut_error<E: crate::TautError>(e: E) -> Self {
119 let code = e.code().to_string();
120 let http_status = e.http_status();
121 let payload = serde_json::to_value(&e).unwrap_or(serde_json::Value::Null);
122 ProcedureResult::Err {
123 http_status,
124 code,
125 payload,
126 }
127 }
128
129 /// Convenience helper for macro-emitted code: maps a `serde_json::Error`
130 /// (typically from output serialization in the handler wrapper) to a
131 /// uniform 500 `serialization_error` response.
132 #[must_use]
133 #[allow(clippy::needless_pass_by_value)] // owned arg matches macro-emitted call sites
134 pub fn from_serialization(_e: serde_json::Error) -> Self {
135 ProcedureResult::Err {
136 http_status: 500,
137 code: "serialization_error".to_string(),
138 payload: serde_json::Value::Null,
139 }
140 }
141}
142
143/// One frame yielded by a [`StreamHandler`].
144///
145/// Mirrors the SPEC §4.2 SSE event shapes:
146///
147/// - [`Self::Data`] → `event: data\ndata: <json>\n\n`
148/// - [`Self::Error`] → `event: error\ndata: <{code,payload}>\n\n`
149///
150/// The terminal `event: end\ndata:\n\n` frame is implicit — when the
151/// underlying stream finishes, the router emits it. Stream handlers should
152/// just stop yielding rather than try to encode the end frame themselves.
153///
154/// `StreamFrame` is intentionally runtime-only: it carries pre-serialized
155/// `serde_json::Value`s so the router can splat them into SSE bodies without
156/// re-running user `Serialize` impls. It does **not** implement
157/// `serde::Serialize`/`Deserialize` itself — there's no wire shape to round
158/// trip.
159#[derive(Debug, Clone)]
160pub enum StreamFrame {
161 /// A successful payload frame. Becomes `event: data\ndata: <json>\n\n`
162 /// on the SSE wire.
163 Data(serde_json::Value),
164 /// An error frame. Becomes `event: error\ndata: {"code","payload"}\n\n`
165 /// on the SSE wire. Streaming errors do **not** terminate the connection
166 /// at the SPEC level — the user's stream chooses whether to keep yielding
167 /// after an `Error` frame or stop. (The HTTP response is already
168 /// committed by the time SSE frames flow, so there's no status code to
169 /// flip.)
170 Error {
171 /// Stable error code emitted with the SSE error frame.
172 code: String,
173 /// Error payload serialized into the SSE error frame.
174 payload: serde_json::Value,
175 },
176}
177
178impl StreamFrame {
179 /// Serialize a value into [`StreamFrame::Data`].
180 ///
181 /// On serialization failure, falls back to a [`StreamFrame::Error`] with
182 /// `code = "serialization_error"` and a `null` payload — same fallback
183 /// shape as [`ProcedureResult::ok`], for consistency between the unary
184 /// and streaming paths.
185 pub fn data(value: impl serde::Serialize) -> Self {
186 match serde_json::to_value(&value) {
187 Ok(v) => StreamFrame::Data(v),
188 Err(_) => StreamFrame::Error {
189 code: "serialization_error".to_string(),
190 payload: serde_json::Value::Null,
191 },
192 }
193 }
194
195 /// Build [`StreamFrame::Error`] from a stable code and serializable
196 /// payload. Same fallback semantics as [`Self::data`] when the payload
197 /// fails to serialize.
198 pub fn err(code: impl Into<String>, payload: impl serde::Serialize) -> Self {
199 match serde_json::to_value(&payload) {
200 Ok(payload) => StreamFrame::Error {
201 code: code.into(),
202 payload,
203 },
204 Err(_) => StreamFrame::Error {
205 code: "serialization_error".to_string(),
206 payload: serde_json::Value::Null,
207 },
208 }
209 }
210
211 /// Build [`StreamFrame::Error`] from a [`crate::TautError`]. The payload
212 /// is `serde_json::to_value(&e)`; if that fails the payload becomes
213 /// `null` but `code` is still taken from the error.
214 ///
215 /// Note that, unlike the unary [`ProcedureResult::from_taut_error`], the
216 /// `http_status` of the error is intentionally dropped: SSE frames flow
217 /// after the HTTP status line is already committed, so per-frame status
218 /// codes don't fit. Callers wanting status-mapping semantics should use a
219 /// unary procedure instead.
220 #[allow(clippy::needless_pass_by_value)] // owned `e` matches macro-emitted call sites
221 pub fn from_taut_error<E: crate::TautError>(e: E) -> Self {
222 let code = e.code().to_string();
223 let payload = serde_json::to_value(&e).unwrap_or(serde_json::Value::Null);
224 StreamFrame::Error { code, payload }
225 }
226}
227
228/// Body of a [`ProcedureDescriptor`] — either a unary handler (queries and
229/// mutations, SPEC §4.1) or a streaming handler (subscriptions, SPEC §4.2).
230///
231/// `Clone` because [`UnaryHandler`] / [`StreamHandler`] are themselves `Arc`s
232/// — cloning a `ProcedureBody` just bumps refcounts.
233///
234/// # Examples
235///
236/// Pattern-match a descriptor's body to dispatch on the procedure flavor.
237/// This is the same shape the router itself uses internally:
238///
239/// ```rust,ignore
240/// use taut_rpc::procedure::{ProcedureBody, ProcedureDescriptor};
241///
242/// fn describe(desc: &ProcedureDescriptor) -> &'static str {
243/// match &desc.body {
244/// ProcedureBody::Unary(_) => "query or mutation",
245/// ProcedureBody::Stream(_) => "subscription",
246/// }
247/// }
248/// ```
249#[derive(Clone)]
250pub enum ProcedureBody {
251 /// Unary handler — used by queries and mutations (SPEC §4.1).
252 Unary(UnaryHandler),
253 /// Streaming handler — used by subscriptions (SPEC §4.2).
254 Stream(StreamHandler),
255}
256
257/// Runtime descriptor for a single `#[rpc]` procedure.
258///
259/// Built by the `#[rpc]` macro at compile time and registered with
260/// [`crate::router::Router`] at startup. Carries everything the router needs
261/// to dispatch a request (`name`, `kind`, `body`) plus everything the IR
262/// document needs to describe this procedure to the TypeScript codegen
263/// (`ir`, `type_defs`).
264#[derive(Clone)]
265pub struct ProcedureDescriptor {
266 /// Procedure name. Matches the underlying Rust function name and is the
267 /// path segment in `/rpc/<name>`.
268 pub name: &'static str,
269 /// Runtime tag distinguishing query / mutation / subscription dispatch.
270 pub kind: crate::router::ProcKindRuntime,
271 /// IR fragment (input/output types, HTTP method, doc) for this procedure.
272 pub ir: crate::ir::Procedure,
273 /// All [`crate::ir::TypeDef`]s reachable from this procedure's signature.
274 /// Router-level IR assembly deduplicates across procedures.
275 pub type_defs: Vec<crate::ir::TypeDef>,
276 /// Type-erased async body — unary for query/mutation, streaming for
277 /// subscriptions. Phase 3 replaces the Phase 1/2 single `handler` field
278 /// with this two-variant body.
279 pub body: ProcedureBody,
280}
281
282impl std::fmt::Debug for ProcedureDescriptor {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 // Skip the actual handler closure (no useful Debug for `dyn Fn`); just
285 // print which variant of `ProcedureBody` we're holding so logs show
286 // the unary-vs-stream split. IR input/output refs round out the
287 // procedure shape.
288 let body_kind = match &self.body {
289 ProcedureBody::Unary(_) => "Unary",
290 ProcedureBody::Stream(_) => "Stream",
291 };
292 f.debug_struct("ProcedureDescriptor")
293 .field("name", &self.name)
294 .field("kind", &self.kind)
295 .field("body", &body_kind)
296 .field("input", &self.ir.input)
297 .field("output", &self.ir.output)
298 .finish_non_exhaustive()
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;

    use serde_json::{json, Value};

    // ---- ProcedureResult constructors ---------------------------------------

    #[test]
    fn ok_serializes_to_expected_json_value() {
        // `ok` runs its argument through serde and stores the JSON form.
        if let ProcedureResult::Ok(body) = ProcedureResult::ok(42u32) {
            assert_eq!(body, json!(42));
        } else {
            panic!("expected Ok");
        }
    }

    #[test]
    fn err_builds_envelope_with_supplied_fields() {
        // Status, code, and payload must all arrive in the variant untouched.
        let built = ProcedureResult::err(404, "not_found", Value::Null);
        if let ProcedureResult::Err {
            http_status,
            code,
            payload,
        } = built
        {
            assert_eq!(http_status, 404);
            assert_eq!(code, "not_found");
            assert_eq!(payload, Value::Null);
        } else {
            panic!("expected Err");
        }
    }

    #[test]
    fn from_taut_error_preserves_code_and_status() {
        let source = crate::error::StandardError::Unauthenticated;
        if let ProcedureResult::Err {
            http_status, code, ..
        } = ProcedureResult::from_taut_error(source)
        {
            assert_eq!(code, "unauthenticated");
            assert_eq!(http_status, 401);
        } else {
            panic!("expected Err");
        }
    }

    #[test]
    fn ok_payload_roundtrips_through_serde_json_string() {
        // Encode the Ok payload to a JSON string and parse it back; the
        // value must survive the trip unchanged.
        let original = json!({ "id": 7, "name": "ada" });
        let text = match ProcedureResult::Ok(original.clone()) {
            ProcedureResult::Err { .. } => panic!("expected Ok"),
            ProcedureResult::Ok(inner) => serde_json::to_string(&inner).expect("encode"),
        };
        let reparsed: Value = serde_json::from_str(&text).expect("decode");
        assert_eq!(reparsed, original);
    }

    // ---- Phase 3: ProcedureBody / StreamFrame -------------------------------

    /// Minimal IR fragment for tests. The closure-dispatch tests never look
    /// at these fields, but a `ProcedureDescriptor` cannot be built without
    /// one.
    fn dummy_procedure_ir(name: &str) -> crate::ir::Procedure {
        use crate::ir::{HttpMethod, Primitive, ProcKind, Procedure, TypeRef};

        let unit = || TypeRef::Primitive(Primitive::Unit);
        Procedure {
            name: name.to_owned(),
            kind: ProcKind::Query,
            input: unit(),
            output: unit(),
            errors: Vec::new(),
            http_method: HttpMethod::Post,
            doc: None,
        }
    }

    #[tokio::test]
    async fn unary_body_dispatches_through_handler() {
        // Build a `ProcedureBody::Unary` by hand (bypassing the macro
        // emission), invoke the handler with a JSON value, and check that
        // the same value is echoed back.
        let echo: UnaryHandler = Arc::new(|input: serde_json::Value| {
            Box::pin(async move { ProcedureResult::Ok(input) })
        });
        let descriptor = ProcedureDescriptor {
            name: "echo",
            kind: crate::router::ProcKindRuntime::Query,
            ir: dummy_procedure_ir("echo"),
            type_defs: Vec::new(),
            body: ProcedureBody::Unary(echo),
        };

        let call = match &descriptor.body {
            ProcedureBody::Stream(_) => panic!("expected Unary body"),
            ProcedureBody::Unary(handler) => Arc::clone(handler),
        };
        let sent = json!({"hello": "world"});
        if let ProcedureResult::Ok(received) = call(sent.clone()).await {
            assert_eq!(received, sent);
        } else {
            panic!("expected Ok");
        }
    }

    #[tokio::test]
    async fn stream_body_emits_collected_frames() {
        use futures::stream::{self, StreamExt};

        // Three `StreamFrame::Data` items are enough to show that the
        // streaming side of the descriptor compiles, runs, and yields the
        // expected sequence in order.
        let counter: StreamHandler = Arc::new(|_input: serde_json::Value| {
            let frames: Vec<StreamFrame> =
                (1..=3).map(|n| StreamFrame::Data(json!(n))).collect();
            stream::iter(frames).boxed()
        });
        let descriptor = ProcedureDescriptor {
            name: "counter",
            kind: crate::router::ProcKindRuntime::Subscription,
            ir: dummy_procedure_ir("counter"),
            type_defs: Vec::new(),
            body: ProcedureBody::Stream(counter),
        };

        let subscribe = match &descriptor.body {
            ProcedureBody::Unary(_) => panic!("expected Stream body"),
            ProcedureBody::Stream(handler) => Arc::clone(handler),
        };
        let emitted: Vec<StreamFrame> = subscribe(Value::Null).collect().await;
        assert_eq!(emitted.len(), 3);

        let mut payloads = Vec::with_capacity(emitted.len());
        for frame in emitted {
            match frame {
                StreamFrame::Data(value) => payloads.push(value),
                StreamFrame::Error { .. } => panic!("expected Data frame"),
            }
        }
        assert_eq!(payloads, vec![json!(1), json!(2), json!(3)]);
    }

    #[test]
    fn stream_frame_data_serializes_payload_in_place() {
        // `StreamFrame` is runtime-only and has no Serialize / Deserialize
        // impl, so a JSON round-trip cannot be asserted. What can be checked
        // is that `StreamFrame::data` serializes its argument *into* the
        // variant payload, sparing the router a second serialization.
        if let StreamFrame::Data(value) = StreamFrame::data(42u32) {
            assert_eq!(value, json!(42));
        } else {
            panic!("expected Data variant");
        }
    }

    #[test]
    fn stream_frame_err_builds_error_variant() {
        let frame = StreamFrame::err("rate_limited", json!({"retry_after": 5}));
        if let StreamFrame::Error { code, payload } = frame {
            assert_eq!(code, "rate_limited");
            assert_eq!(payload, json!({"retry_after": 5}));
        } else {
            panic!("expected Error variant");
        }
    }

    #[test]
    fn stream_frame_from_taut_error_preserves_code() {
        let source = crate::error::StandardError::Unauthenticated;
        if let StreamFrame::Error { code, .. } = StreamFrame::from_taut_error(source) {
            assert_eq!(code, "unauthenticated");
        } else {
            panic!("expected Error variant");
        }
    }
}