Skip to main content

orpc_procedure/
procedure.rs

1use std::panic::{AssertUnwindSafe, catch_unwind};
2use std::sync::Arc;
3
4use crate::error::ProcedureError;
5use crate::input::DynInput;
6use crate::route::{ErrorMap, Meta, Route};
7use crate::schema::ErasedSchema;
8use crate::stream::ProcedureStream;
9
10/// Fully type-erased procedure. Only `TCtx` (= TBaseCtx) remains as generic.
11///
12/// This is what Router and transport integrations work with.
13/// `TInput`/`TOutput`/`TError` are captured inside the `exec` closure —
14/// deserialization and serialization happen internally.
15pub struct ErasedProcedure<TCtx> {
16    exec: Arc<dyn Fn(TCtx, DynInput) -> ProcedureStream + Send + Sync>,
17    pub input_schema: Option<Box<dyn ErasedSchema>>,
18    pub output_schema: Option<Box<dyn ErasedSchema>>,
19    pub error_map: ErrorMap,
20    pub route: Route,
21    pub meta: Meta,
22}
23
24impl<TCtx> ErasedProcedure<TCtx> {
25    /// Create a new procedure with the given execution closure.
26    ///
27    /// Schemas and error_map default to None/default.
28    pub fn new(
29        exec: impl Fn(TCtx, DynInput) -> ProcedureStream + Send + Sync + 'static,
30        route: Route,
31        meta: Meta,
32    ) -> Self {
33        ErasedProcedure {
34            exec: Arc::new(exec),
35            input_schema: None,
36            output_schema: None,
37            error_map: ErrorMap::default(),
38            route,
39            meta,
40        }
41    }
42
43    /// Set the input schema.
44    pub fn with_input_schema(mut self, schema: impl ErasedSchema) -> Self {
45        self.input_schema = Some(Box::new(schema));
46        self
47    }
48
49    /// Set the input schema from a boxed trait object.
50    pub fn with_input_schema_boxed(mut self, schema: Box<dyn ErasedSchema>) -> Self {
51        self.input_schema = Some(schema);
52        self
53    }
54
55    /// Set the output schema.
56    pub fn with_output_schema(mut self, schema: impl ErasedSchema) -> Self {
57        self.output_schema = Some(Box::new(schema));
58        self
59    }
60
61    /// Set the output schema from a boxed trait object.
62    pub fn with_output_schema_boxed(mut self, schema: Box<dyn ErasedSchema>) -> Self {
63        self.output_schema = Some(schema);
64        self
65    }
66
67    /// Set the error map.
68    pub fn with_error_map(mut self, error_map: ErrorMap) -> Self {
69        self.error_map = error_map;
70        self
71    }
72
73    /// Execute the procedure with type-erased input.
74    ///
75    /// Wraps the call in `catch_unwind` to prevent handler panics from
76    /// crashing the server. Panics are caught and returned as
77    /// `ProcedureError::Unwind`.
78    ///
79    /// **Note**: Only synchronous panics (during closure invocation) are caught.
80    /// Panics inside the returned `ProcedureStream`'s async polling are NOT
81    /// caught — this is a known limitation shared with rspc.
82    pub fn exec(&self, ctx: TCtx, input: DynInput) -> ProcedureStream {
83        match catch_unwind(AssertUnwindSafe(|| (self.exec)(ctx, input))) {
84            Ok(stream) => stream,
85            Err(panic) => ProcedureStream::error(ProcedureError::Unwind(panic)),
86        }
87    }
88}
89
90impl<TCtx> std::fmt::Debug for ErasedProcedure<TCtx> {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("ErasedProcedure")
93            .field("route", &self.route)
94            .field("meta", &self.meta)
95            .field("has_input_schema", &self.input_schema.is_some())
96            .field("has_output_schema", &self.output_schema.is_some())
97            .finish()
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;
    use crate::output::DynOutput;
    use futures_util::StreamExt;
    use serde::Deserialize;

    /// Minimal payload used to exercise input deserialization.
    #[derive(Debug, Deserialize)]
    struct TestInput {
        value: u32,
    }

    /// Handler for tests that only inspect the procedure's metadata and never
    /// rely on what execution produces.
    fn unused_handler(_ctx: (), _input: DynInput) -> ProcedureStream {
        ProcedureStream::error(ProcedureError::Unwind(Box::new("unused")))
    }

    /// A handler's output comes back through the returned stream.
    #[tokio::test]
    async fn basic_exec() {
        let procedure = ErasedProcedure::new(
            |_ctx: (), input: DynInput| {
                let parsed: TestInput = input.deserialize().unwrap();
                let doubled = async move { Ok(DynOutput::new(parsed.value * 2)) };
                ProcedureStream::from_future(doubled)
            },
            Route::get("/test"),
            Meta::default(),
        );

        let payload = serde_json::json!({"value": 21});
        let mut stream = procedure.exec((), DynInput::from_value(payload));

        let first_item = stream.next().await.unwrap();
        let output = first_item.unwrap();
        assert_eq!(output.to_value().unwrap(), serde_json::json!(42));
    }

    /// A handler that panics while being invoked yields `Unwind` instead of
    /// propagating the panic.
    #[tokio::test]
    async fn panic_safety() {
        let procedure = ErasedProcedure::new(
            |_ctx: (), _input: DynInput| -> ProcedureStream {
                panic!("handler exploded");
            },
            Route::default(),
            Meta::default(),
        );

        let payload = serde_json::json!(null);
        let mut stream = procedure.exec((), DynInput::from_value(payload));

        let first_item = stream.next().await.unwrap();
        assert!(matches!(first_item, Err(ProcedureError::Unwind(_))));
    }

    /// The same procedure can be executed repeatedly with different contexts.
    #[tokio::test]
    async fn multiple_calls_arc_shared() {
        let procedure = ErasedProcedure::new(
            |ctx: u32, _input: DynInput| {
                ProcedureStream::from_future(async move { Ok(DynOutput::new(ctx + 1)) })
            },
            Route::default(),
            Meta::default(),
        );

        for ctx_value in 0..3 {
            let payload = serde_json::json!(null);
            let mut stream = procedure.exec(ctx_value, DynInput::from_value(payload));

            let first_item = stream.next().await.unwrap();
            let output = first_item.unwrap();
            assert_eq!(
                output.to_value().unwrap(),
                serde_json::json!(ctx_value + 1)
            );
        }
    }

    /// Route details given at construction are readable from the public field.
    #[test]
    fn route_and_meta_accessible() {
        let route = Route::get("/users").tag("users").summary("List users");
        let procedure = ErasedProcedure::new(unused_handler, route, Meta::default());

        let route = &procedure.route;
        assert_eq!(route.method, Some(crate::route::HttpMethod::Get));
        assert_eq!(route.path.as_deref(), Some("/users"));
        assert_eq!(route.tags, vec!["users"]);
        assert_eq!(route.summary.as_deref(), Some("List users"));
    }

    /// The schema builders populate both optional schema slots.
    #[test]
    fn with_schemas() {
        use crate::schema::NoSchema;

        let procedure = ErasedProcedure::new(unused_handler, Route::default(), Meta::default())
            .with_input_schema(NoSchema)
            .with_output_schema(NoSchema);

        assert!(procedure.input_schema.is_some());
        assert!(procedure.output_schema.is_some());
    }

    /// The `Debug` output names the type and includes the route field.
    #[test]
    fn debug_output() {
        let procedure =
            ErasedProcedure::new(unused_handler, Route::get("/test"), Meta::default());

        let rendered = format!("{:?}", procedure);
        assert!(rendered.contains("ErasedProcedure"));
        assert!(rendered.contains("route"));
    }

    /// Compile-time check that the erased procedure is `Send + Sync`.
    #[test]
    fn erased_procedure_is_send_sync() {
        fn require_send_sync<T: Send + Sync>() {}
        require_send_sync::<ErasedProcedure<()>>();
    }
}