Skip to main content

camel_core/step/
function_step.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use camel_api::{
8    CamelError, Exchange, ExchangePatch, FunctionDefinition, FunctionId, FunctionInvocationError,
9    FunctionInvoker, PatchBody,
10};
11use tower::Service;
12use tracing::Instrument;
13
/// Route step that hands an [`Exchange`] to a function executed through a
/// [`FunctionInvoker`].
///
/// Cloning is derived; the invoker is held behind an `Arc`, so clones share it.
#[derive(Clone)]
pub struct FunctionStep {
    /// Description of the function to run; `call` reads its `id`, `runtime`
    /// and `timeout_ms` fields.
    definition: FunctionDefinition,
    /// Invoker used to execute the function.
    invoker: Arc<dyn FunctionInvoker>,
}
19
20impl FunctionStep {
21    pub fn new(invoker: Arc<dyn FunctionInvoker>, definition: FunctionDefinition) -> Self {
22        Self {
23            definition,
24            invoker,
25        }
26    }
27}
28
29impl Service<Exchange> for FunctionStep {
30    type Response = Exchange;
31    type Error = CamelError;
32    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
33
34    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
35        Poll::Ready(Ok(()))
36    }
37
38    fn call(&mut self, mut ex: Exchange) -> Self::Future {
39        let invoker = Arc::clone(&self.invoker);
40        let id = self.definition.id.clone();
41        let runtime = self.definition.runtime.clone();
42        let timeout_ms = self.definition.timeout_ms;
43        let span = tracing::info_span!(
44            target: "camel_function",
45            "function",
46            function_id = %id.0,
47            runtime = %runtime,
48            timeout_ms = timeout_ms,
49            status = tracing::field::Empty,
50            duration_ms = tracing::field::Empty,
51            error_kind = tracing::field::Empty,
52        );
53        Box::pin(async move {
54            let start = std::time::Instant::now();
55            let outcome: Result<ExchangePatch, CamelError> = async {
56                let result = tokio::time::timeout(
57                    Duration::from_millis(timeout_ms),
58                    invoker.invoke(&id, &ex),
59                )
60                .await
61                .map_err(|_| {
62                    CamelError::ProcessorError(format!(
63                        "function:timeout: {} timed out after {}ms",
64                        id.0, timeout_ms
65                    ))
66                })?;
67                let patch = result.map_err(|e| map_invocation_error(e, &id))?;
68                Ok(patch)
69            }
70            .instrument(span.clone())
71            .await;
72            let elapsed = start.elapsed().as_millis() as u64;
73            span.record("duration_ms", elapsed);
74            match &outcome {
75                Ok(_) => {
76                    span.record("status", "ok");
77                }
78                Err(CamelError::ProcessorError(msg)) => {
79                    let kind = if msg.starts_with("function:timeout:") {
80                        "timeout"
81                    } else if msg.starts_with("function:user_error:") {
82                        "user_error"
83                    } else if msg.starts_with("function:runner_unavailable:") {
84                        "runner_unavailable"
85                    } else if msg.starts_with("function:not_registered:") {
86                        "not_registered"
87                    } else if msg.starts_with("function:transport:") {
88                        "transport"
89                    } else if msg.starts_with("function:invalid_patch:") {
90                        "invalid_patch"
91                    } else {
92                        "unknown"
93                    };
94                    span.record("status", kind);
95                    span.record("error_kind", kind);
96                }
97                Err(_) => {
98                    span.record("status", "unknown");
99                    span.record("error_kind", "unknown");
100                }
101            }
102            let patch = outcome?;
103            apply_patch(&mut ex, patch);
104            Ok(ex)
105        })
106    }
107}
108
109fn map_invocation_error(err: FunctionInvocationError, id: &FunctionId) -> CamelError {
110    match err {
111        FunctionInvocationError::UserError { message, stack, .. } => {
112            let detail = match stack {
113                Some(s) if !s.is_empty() => {
114                    format!("function:user_error: {}: {}\n{}", id.0, message, s)
115                }
116                _ => format!("function:user_error: {}: {}", id.0, message),
117            };
118            CamelError::ProcessorError(detail)
119        }
120        FunctionInvocationError::Timeout { timeout_ms, .. } => CamelError::ProcessorError(format!(
121            "function:timeout: {} timed out after {}ms",
122            id.0, timeout_ms
123        )),
124        FunctionInvocationError::NotRegistered { .. } => {
125            CamelError::ProcessorError(format!("function:not_registered: {}", id.0))
126        }
127        FunctionInvocationError::RunnerUnavailable { reason } => {
128            CamelError::ProcessorError(format!("function:runner_unavailable: {}: {}", id.0, reason))
129        }
130        FunctionInvocationError::Transport(msg) => {
131            CamelError::ProcessorError(format!("function:transport: {}: {}", id.0, msg))
132        }
133        FunctionInvocationError::InvalidPatch(msg) => {
134            CamelError::ProcessorError(format!("function:invalid_patch: {}: {}", id.0, msg))
135        }
136    }
137}
138
139fn apply_patch(ex: &mut Exchange, patch: ExchangePatch) {
140    if let Some(body) = patch.body {
141        ex.input.body = match body {
142            PatchBody::Text(s) => s.into(),
143            PatchBody::Json(v) => v.into(),
144            PatchBody::Empty => camel_api::Body::Empty,
145        };
146    }
147    for (k, v) in patch.headers_set {
148        ex.input.headers.insert(k, v);
149    }
150    for k in patch.headers_removed {
151        ex.input.headers.remove(&k);
152    }
153    for (k, v) in patch.properties_set {
154        ex.properties.insert(k, v);
155    }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use camel_api::function::PrepareToken;
    use camel_api::{FunctionDiff, FunctionInvokerSync};
    use std::collections::VecDeque;
    use std::sync::Mutex;

    /// Result type produced by `FunctionInvoker::invoke`.
    type InvokeResult = Result<ExchangePatch, FunctionInvocationError>;

    /// Scripted invoker: each `invoke` call optionally sleeps for `delay` and
    /// then returns the next queued response. Every other trait method is a
    /// no-op stub.
    struct MockInvoker {
        responses: Mutex<VecDeque<InvokeResult>>,
        delay: Option<Duration>,
    }

    impl MockInvoker {
        /// Invoker that answers immediately with `responses`, in order.
        fn new(responses: Vec<InvokeResult>) -> Self {
            Self {
                responses: Mutex::new(VecDeque::from(responses)),
                delay: None,
            }
        }

        /// Invoker that sleeps for `delay` before answering each call.
        fn with_delay(delay: Duration, responses: Vec<InvokeResult>) -> Self {
            Self {
                delay: Some(delay),
                ..Self::new(responses)
            }
        }
    }

    impl FunctionInvokerSync for MockInvoker {
        fn stage_pending(
            &self,
            _def: FunctionDefinition,
            _route_id: Option<&str>,
            _generation: u64,
        ) {
        }
        fn discard_staging(&self, _generation: u64) {}
        fn begin_reload(&self) -> u64 {
            0
        }
        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
            vec![]
        }
        fn staged_refs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionId, Option<String>)> {
            vec![]
        }
        fn staged_defs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionDefinition, Option<String>)> {
            vec![]
        }
    }

    #[async_trait]
    impl FunctionInvoker for MockInvoker {
        async fn register(
            &self,
            _def: FunctionDefinition,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn unregister(
            &self,
            _id: &FunctionId,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn invoke(
            &self,
            _id: &FunctionId,
            _exchange: &Exchange,
        ) -> Result<ExchangePatch, FunctionInvocationError> {
            if let Some(delay) = self.delay {
                tokio::time::sleep(delay).await;
            }
            // The guard is confined to this block so it is released before
            // the result is unwrapped.
            let next = {
                let mut queue = self.responses.lock().unwrap();
                queue.pop_front()
            };
            next.expect("MockInvoker: invoke called with no scripted response left")
        }
        async fn prepare_reload(
            &self,
            _diff: FunctionDiff,
            _generation: u64,
        ) -> Result<PrepareToken, FunctionInvocationError> {
            Ok(PrepareToken::default())
        }
        async fn finalize_reload(
            &self,
            _diff: &FunctionDiff,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn rollback_reload(
            &self,
            _token: PrepareToken,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn commit_reload(
            &self,
            _diff: FunctionDiff,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
    }

    /// Definition shared by tests that do not care about its contents.
    fn test_definition() -> FunctionDefinition {
        FunctionDefinition {
            id: FunctionId::compute("deno", "test", 5000),
            runtime: "deno".into(),
            source: "test".into(),
            timeout_ms: 5000,
            route_id: None,
            step_index: None,
        }
    }

    /// Returns the message of a `CamelError::ProcessorError`; panics on any
    /// other variant.
    fn processor_message(err: &CamelError) -> &str {
        match err {
            CamelError::ProcessorError(msg) => msg.as_str(),
            _ => panic!("wrong error type"),
        }
    }

    #[tokio::test]
    async fn function_step_applies_patch_body_text() {
        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch {
            body: Some(PatchBody::Text("patched".into())),
            ..Default::default()
        })]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let result = step.call(ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("patched"));
    }

    #[tokio::test]
    async fn function_step_applies_patch_headers() {
        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch {
            headers_set: vec![("x-key".into(), serde_json::json!("val"))],
            headers_removed: vec!["x-old".into()],
            ..Default::default()
        })]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let mut ex = Exchange::default();
        ex.input
            .headers
            .insert("x-old".into(), serde_json::json!("gone"));
        let result = step.call(ex).await.unwrap();
        assert_eq!(
            result.input.headers.get("x-key").unwrap().as_str(),
            Some("val")
        );
        assert!(!result.input.headers.contains_key("x-old"));
    }

    #[tokio::test]
    async fn function_step_applies_patch_properties() {
        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch {
            properties_set: vec![("prop".into(), serde_json::json!(42))],
            ..Default::default()
        })]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let result = step.call(ex).await.unwrap();
        assert_eq!(result.properties.get("prop").unwrap().as_i64(), Some(42));
    }

    #[tokio::test]
    async fn function_step_maps_timeout_error() {
        let invoker = Arc::new(MockInvoker::new(vec![Err(
            FunctionInvocationError::Timeout {
                function_id: FunctionId("x".into()),
                timeout_ms: 5000,
            },
        )]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:timeout:"));
    }

    #[tokio::test]
    async fn function_step_maps_user_error() {
        let invoker = Arc::new(MockInvoker::new(vec![Err(
            FunctionInvocationError::UserError {
                function_id: FunctionId("x".into()),
                message: "boom".into(),
                stack: None,
            },
        )]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:user_error:"));
        assert!(msg.contains("boom"));
    }

    #[tokio::test]
    async fn function_step_client_side_timeout_fires() {
        // The invoker sleeps far longer than the 50 ms budget, so the
        // timeout applied by the step must fire first.
        let invoker = MockInvoker::with_delay(
            Duration::from_secs(10),
            vec![Ok(ExchangePatch::default())],
        );
        let def = FunctionDefinition {
            id: FunctionId::compute("deno", "slow", 50),
            runtime: "deno".into(),
            source: "slow".into(),
            timeout_ms: 50,
            route_id: None,
            step_index: None,
        };
        let mut step = FunctionStep::new(Arc::new(invoker), def);
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:timeout:"));
    }

    #[tokio::test]
    async fn function_step_emits_tracing_span() {
        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
        use tracing_subscriber::prelude::*;

        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch::default())]));
        let def = FunctionDefinition {
            id: FunctionId::compute("deno", "span_test", 5000),
            runtime: "deno".into(),
            source: "span_test".into(),
            timeout_ms: 5000,
            route_id: None,
            step_index: None,
        };
        let mut step = FunctionStep::new(invoker, def);
        let ex = Exchange::default();

        let span_seen = Arc::new(AtomicBool::new(false));
        let span_seen_clone = span_seen.clone();

        // The filter accepts everything; it is used only to observe the
        // metadata of what the step emits.
        let layer = tracing_subscriber::fmt::layer()
            .with_writer(std::io::sink)
            .with_filter(tracing_subscriber::filter::filter_fn(move |meta| {
                if meta.target() == "camel_function" && meta.name() == "function" {
                    span_seen_clone.store(true, AtomicOrdering::SeqCst);
                }
                true
            }));

        let _guard = tracing_subscriber::registry().with(layer).set_default();
        let result = step.call(ex).await;
        assert!(result.is_ok());
        assert!(
            span_seen.load(AtomicOrdering::SeqCst),
            "expected function span with target 'camel_function' and name 'function'"
        );
    }

    #[tokio::test]
    async fn function_step_user_error_with_stack() {
        let invoker = Arc::new(MockInvoker::new(vec![Err(
            FunctionInvocationError::UserError {
                function_id: FunctionId("x".into()),
                message: "custom error".into(),
                stack: Some("at line 1\nat line 2".into()),
            },
        )]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:user_error:"));
        assert!(msg.contains("custom error"));
        assert!(msg.contains("at line 1"));
    }
}