orpc_procedure/
procedure.rs1use std::panic::{AssertUnwindSafe, catch_unwind};
2use std::sync::Arc;
3
4use crate::error::ProcedureError;
5use crate::input::DynInput;
6use crate::route::{ErrorMap, Meta, Route};
7use crate::schema::ErasedSchema;
8use crate::stream::ProcedureStream;
9
10pub struct ErasedProcedure<TCtx> {
16 exec: Arc<dyn Fn(TCtx, DynInput) -> ProcedureStream + Send + Sync>,
17 pub input_schema: Option<Box<dyn ErasedSchema>>,
18 pub output_schema: Option<Box<dyn ErasedSchema>>,
19 pub error_map: ErrorMap,
20 pub route: Route,
21 pub meta: Meta,
22}
23
24impl<TCtx> ErasedProcedure<TCtx> {
25 pub fn new(
29 exec: impl Fn(TCtx, DynInput) -> ProcedureStream + Send + Sync + 'static,
30 route: Route,
31 meta: Meta,
32 ) -> Self {
33 ErasedProcedure {
34 exec: Arc::new(exec),
35 input_schema: None,
36 output_schema: None,
37 error_map: ErrorMap::default(),
38 route,
39 meta,
40 }
41 }
42
43 pub fn with_input_schema(mut self, schema: impl ErasedSchema) -> Self {
45 self.input_schema = Some(Box::new(schema));
46 self
47 }
48
49 pub fn with_input_schema_boxed(mut self, schema: Box<dyn ErasedSchema>) -> Self {
51 self.input_schema = Some(schema);
52 self
53 }
54
55 pub fn with_output_schema(mut self, schema: impl ErasedSchema) -> Self {
57 self.output_schema = Some(Box::new(schema));
58 self
59 }
60
61 pub fn with_output_schema_boxed(mut self, schema: Box<dyn ErasedSchema>) -> Self {
63 self.output_schema = Some(schema);
64 self
65 }
66
67 pub fn with_error_map(mut self, error_map: ErrorMap) -> Self {
69 self.error_map = error_map;
70 self
71 }
72
73 pub fn exec(&self, ctx: TCtx, input: DynInput) -> ProcedureStream {
83 match catch_unwind(AssertUnwindSafe(|| (self.exec)(ctx, input))) {
84 Ok(stream) => stream,
85 Err(panic) => ProcedureStream::error(ProcedureError::Unwind(panic)),
86 }
87 }
88}
89
90impl<TCtx> std::fmt::Debug for ErasedProcedure<TCtx> {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("ErasedProcedure")
93 .field("route", &self.route)
94 .field("meta", &self.meta)
95 .field("has_input_schema", &self.input_schema.is_some())
96 .field("has_output_schema", &self.output_schema.is_some())
97 .finish()
98 }
99}
100
101#[cfg(test)]
102mod tests {
103 use super::*;
104 use crate::output::DynOutput;
105 use futures_util::StreamExt;
106 use serde::Deserialize;
107
108 #[derive(Debug, Deserialize)]
109 struct TestInput {
110 value: u32,
111 }
112
113 #[tokio::test]
114 async fn basic_exec() {
115 let proc = ErasedProcedure::new(
116 |_ctx: (), input: DynInput| {
117 let test_input: TestInput = input.deserialize().unwrap();
118 ProcedureStream::from_future(
119 async move { Ok(DynOutput::new(test_input.value * 2)) },
120 )
121 },
122 Route::get("/test"),
123 Meta::default(),
124 );
125
126 let input = DynInput::from_value(serde_json::json!({"value": 21}));
127 let mut stream = proc.exec((), input);
128 let result = stream.next().await.unwrap().unwrap();
129 assert_eq!(result.to_value().unwrap(), serde_json::json!(42));
130 }
131
132 #[tokio::test]
133 async fn panic_safety() {
134 let proc = ErasedProcedure::new(
135 |_ctx: (), _input: DynInput| -> ProcedureStream {
136 panic!("handler exploded");
137 },
138 Route::default(),
139 Meta::default(),
140 );
141
142 let input = DynInput::from_value(serde_json::json!(null));
143 let mut stream = proc.exec((), input);
144 let result = stream.next().await.unwrap();
145 assert!(matches!(result, Err(ProcedureError::Unwind(_))));
146 }
147
148 #[tokio::test]
149 async fn multiple_calls_arc_shared() {
150 let proc = ErasedProcedure::new(
151 |ctx: u32, _input: DynInput| {
152 ProcedureStream::from_future(async move { Ok(DynOutput::new(ctx + 1)) })
153 },
154 Route::default(),
155 Meta::default(),
156 );
157
158 for i in 0..3 {
159 let input = DynInput::from_value(serde_json::json!(null));
160 let mut stream = proc.exec(i, input);
161 let result = stream.next().await.unwrap().unwrap();
162 assert_eq!(result.to_value().unwrap(), serde_json::json!(i + 1));
163 }
164 }
165
166 #[test]
167 fn route_and_meta_accessible() {
168 let proc = ErasedProcedure::new(
169 |_ctx: (), _input: DynInput| {
170 ProcedureStream::error(ProcedureError::Unwind(Box::new("unused")))
171 },
172 Route::get("/users").tag("users").summary("List users"),
173 Meta::default(),
174 );
175
176 assert_eq!(proc.route.method, Some(crate::route::HttpMethod::Get));
177 assert_eq!(proc.route.path.as_deref(), Some("/users"));
178 assert_eq!(proc.route.tags, vec!["users"]);
179 assert_eq!(proc.route.summary.as_deref(), Some("List users"));
180 }
181
182 #[test]
183 fn with_schemas() {
184 use crate::schema::NoSchema;
185
186 let proc = ErasedProcedure::new(
187 |_ctx: (), _input: DynInput| {
188 ProcedureStream::error(ProcedureError::Unwind(Box::new("unused")))
189 },
190 Route::default(),
191 Meta::default(),
192 )
193 .with_input_schema(NoSchema)
194 .with_output_schema(NoSchema);
195
196 assert!(proc.input_schema.is_some());
197 assert!(proc.output_schema.is_some());
198 }
199
200 #[test]
201 fn debug_output() {
202 let proc = ErasedProcedure::new(
203 |_ctx: (), _input: DynInput| {
204 ProcedureStream::error(ProcedureError::Unwind(Box::new("unused")))
205 },
206 Route::get("/test"),
207 Meta::default(),
208 );
209 let debug = format!("{proc:?}");
210 assert!(debug.contains("ErasedProcedure"));
211 assert!(debug.contains("route"));
212 }
213
214 #[test]
215 fn erased_procedure_is_send_sync() {
216 fn assert_send_sync<T: Send + Sync>() {}
217 assert_send_sync::<ErasedProcedure<()>>();
218 }
219}