1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use camel_api::{
8 CamelError, Exchange, ExchangePatch, FunctionDefinition, FunctionId, FunctionInvocationError,
9 FunctionInvoker, PatchBody,
10};
11use tower::Service;
12use tracing::Instrument;
13
14#[derive(Clone)]
15pub struct FunctionStep {
16 definition: FunctionDefinition,
17 invoker: Arc<dyn FunctionInvoker>,
18}
19
20impl FunctionStep {
21 pub fn new(invoker: Arc<dyn FunctionInvoker>, definition: FunctionDefinition) -> Self {
22 Self {
23 definition,
24 invoker,
25 }
26 }
27}
28
29impl Service<Exchange> for FunctionStep {
30 type Response = Exchange;
31 type Error = CamelError;
32 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
33
34 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
35 Poll::Ready(Ok(()))
36 }
37
38 fn call(&mut self, mut ex: Exchange) -> Self::Future {
39 let invoker = Arc::clone(&self.invoker);
40 let id = self.definition.id.clone();
41 let runtime = self.definition.runtime.clone();
42 let timeout_ms = self.definition.timeout_ms;
43 let span = tracing::info_span!(
44 target: "camel_function",
45 "function",
46 function_id = %id.0,
47 runtime = %runtime,
48 timeout_ms = timeout_ms,
49 status = tracing::field::Empty,
50 duration_ms = tracing::field::Empty,
51 error_kind = tracing::field::Empty,
52 );
53 Box::pin(async move {
54 let start = std::time::Instant::now();
55 let outcome: Result<ExchangePatch, CamelError> = async {
56 let result = tokio::time::timeout(
57 Duration::from_millis(timeout_ms),
58 invoker.invoke(&id, &ex),
59 )
60 .await
61 .map_err(|_| {
62 CamelError::ProcessorError(format!(
63 "function:timeout: {} timed out after {}ms",
64 id.0, timeout_ms
65 ))
66 })?;
67 let patch = result.map_err(|e| map_invocation_error(e, &id))?;
68 Ok(patch)
69 }
70 .instrument(span.clone())
71 .await;
72 let elapsed = start.elapsed().as_millis() as u64;
73 span.record("duration_ms", elapsed);
74 match &outcome {
75 Ok(_) => {
76 span.record("status", "ok");
77 }
78 Err(CamelError::ProcessorError(msg)) => {
79 let kind = if msg.starts_with("function:timeout:") {
80 "timeout"
81 } else if msg.starts_with("function:user_error:") {
82 "user_error"
83 } else if msg.starts_with("function:runner_unavailable:") {
84 "runner_unavailable"
85 } else if msg.starts_with("function:not_registered:") {
86 "not_registered"
87 } else if msg.starts_with("function:transport:") {
88 "transport"
89 } else if msg.starts_with("function:invalid_patch:") {
90 "invalid_patch"
91 } else {
92 "unknown"
93 };
94 span.record("status", kind);
95 span.record("error_kind", kind);
96 }
97 Err(_) => {
98 span.record("status", "unknown");
99 span.record("error_kind", "unknown");
100 }
101 }
102 let patch = outcome?;
103 apply_patch(&mut ex, patch);
104 Ok(ex)
105 })
106 }
107}
108
109fn map_invocation_error(err: FunctionInvocationError, id: &FunctionId) -> CamelError {
110 match err {
111 FunctionInvocationError::UserError { message, stack, .. } => {
112 let detail = match stack {
113 Some(s) if !s.is_empty() => {
114 format!("function:user_error: {}: {}\n{}", id.0, message, s)
115 }
116 _ => format!("function:user_error: {}: {}", id.0, message),
117 };
118 CamelError::ProcessorError(detail)
119 }
120 FunctionInvocationError::Timeout { timeout_ms, .. } => CamelError::ProcessorError(format!(
121 "function:timeout: {} timed out after {}ms",
122 id.0, timeout_ms
123 )),
124 FunctionInvocationError::NotRegistered { .. } => {
125 CamelError::ProcessorError(format!("function:not_registered: {}", id.0))
126 }
127 FunctionInvocationError::RunnerUnavailable { reason } => {
128 CamelError::ProcessorError(format!("function:runner_unavailable: {}: {}", id.0, reason))
129 }
130 FunctionInvocationError::Transport(msg) => {
131 CamelError::ProcessorError(format!("function:transport: {}: {}", id.0, msg))
132 }
133 FunctionInvocationError::InvalidPatch(msg) => {
134 CamelError::ProcessorError(format!("function:invalid_patch: {}: {}", id.0, msg))
135 }
136 }
137}
138
139fn apply_patch(ex: &mut Exchange, patch: ExchangePatch) {
140 if let Some(body) = patch.body {
141 ex.input.body = match body {
142 PatchBody::Text(s) => s.into(),
143 PatchBody::Json(v) => v.into(),
144 PatchBody::Empty => camel_api::Body::Empty,
145 };
146 }
147 for (k, v) in patch.headers_set {
148 ex.input.headers.insert(k, v);
149 }
150 for k in patch.headers_removed {
151 ex.input.headers.remove(&k);
152 }
153 for (k, v) in patch.properties_set {
154 ex.properties.insert(k, v);
155 }
156}
157
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use camel_api::function::PrepareToken;
    use camel_api::{FunctionDiff, FunctionInvokerSync};
    use std::collections::VecDeque;
    use std::sync::Mutex;

    /// Outcome of a single scripted `invoke` call.
    type InvokeResult = Result<ExchangePatch, FunctionInvocationError>;

    /// Test double for the invoker traits.
    ///
    /// `invoke` either replays scripted responses in FIFO order or, when `stall` is set,
    /// sleeps for that long before returning an empty patch. Every other trait method is a
    /// no-op.
    struct MockInvoker {
        responses: Mutex<VecDeque<InvokeResult>>,
        stall: Option<Duration>,
    }

    impl MockInvoker {
        /// Invoker that answers successive `invoke` calls with `responses`, in order.
        fn new(responses: Vec<InvokeResult>) -> Self {
            Self {
                responses: Mutex::new(responses.into()),
                stall: None,
            }
        }

        /// Invoker whose `invoke` sleeps for `delay` before succeeding with an empty patch.
        fn stalled(delay: Duration) -> Self {
            Self {
                responses: Mutex::new(VecDeque::new()),
                stall: Some(delay),
            }
        }
    }

    impl FunctionInvokerSync for MockInvoker {
        fn stage_pending(
            &self,
            _def: FunctionDefinition,
            _route_id: Option<&str>,
            _generation: u64,
        ) {
        }
        fn discard_staging(&self, _generation: u64) {}
        fn begin_reload(&self) -> u64 {
            0
        }
        fn function_refs_for_route(&self, _route_id: &str) -> Vec<(FunctionId, Option<String>)> {
            vec![]
        }
        fn staged_refs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionId, Option<String>)> {
            vec![]
        }
        fn staged_defs_for_route(
            &self,
            _route_id: &str,
            _generation: u64,
        ) -> Vec<(FunctionDefinition, Option<String>)> {
            vec![]
        }
    }

    #[async_trait]
    impl FunctionInvoker for MockInvoker {
        async fn register(
            &self,
            _def: FunctionDefinition,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn unregister(
            &self,
            _id: &FunctionId,
            _route_id: Option<&str>,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn invoke(
            &self,
            _id: &FunctionId,
            _exchange: &Exchange,
        ) -> Result<ExchangePatch, FunctionInvocationError> {
            if let Some(delay) = self.stall {
                tokio::time::sleep(delay).await;
                return Ok(ExchangePatch::default());
            }
            // Take the response in its own statement so the lock guard is released before
            // the (possibly panicking) `expect`.
            let next = self.responses.lock().unwrap().pop_front();
            next.expect("MockInvoker::invoke called more often than responses were scripted")
        }
        async fn prepare_reload(
            &self,
            _diff: FunctionDiff,
            _generation: u64,
        ) -> Result<PrepareToken, FunctionInvocationError> {
            Ok(PrepareToken::default())
        }
        async fn finalize_reload(
            &self,
            _diff: &FunctionDiff,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn rollback_reload(
            &self,
            _token: PrepareToken,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn commit_reload(
            &self,
            _diff: FunctionDiff,
            _generation: u64,
        ) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
        async fn commit_staged(&self) -> Result<(), FunctionInvocationError> {
            Ok(())
        }
    }

    /// Definition shared by tests that do not care about the function's identity.
    fn test_definition() -> FunctionDefinition {
        FunctionDefinition {
            id: FunctionId::compute("deno", "test", 5000),
            runtime: "deno".into(),
            source: "test".into(),
            timeout_ms: 5000,
            route_id: None,
            step_index: None,
        }
    }

    /// Returns the message of a `ProcessorError`, failing the test for any other variant.
    fn processor_message(err: &CamelError) -> &str {
        match err {
            CamelError::ProcessorError(m) => m.as_str(),
            _ => panic!("wrong error type"),
        }
    }

    #[tokio::test]
    async fn function_step_applies_patch_body_text() {
        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch {
            body: Some(PatchBody::Text("patched".into())),
            ..Default::default()
        })]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let result = step.call(ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("patched"));
    }

    #[tokio::test]
    async fn function_step_applies_patch_headers() {
        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch {
            headers_set: vec![("x-key".into(), serde_json::json!("val"))],
            headers_removed: vec!["x-old".into()],
            ..Default::default()
        })]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let mut ex = Exchange::default();
        ex.input
            .headers
            .insert("x-old".into(), serde_json::json!("gone"));
        let result = step.call(ex).await.unwrap();
        assert_eq!(
            result.input.headers.get("x-key").unwrap().as_str(),
            Some("val")
        );
        assert!(!result.input.headers.contains_key("x-old"));
    }

    #[tokio::test]
    async fn function_step_applies_patch_properties() {
        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch {
            properties_set: vec![("prop".into(), serde_json::json!(42))],
            ..Default::default()
        })]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let result = step.call(ex).await.unwrap();
        assert_eq!(result.properties.get("prop").unwrap().as_i64(), Some(42));
    }

    #[tokio::test]
    async fn function_step_maps_timeout_error() {
        let invoker = Arc::new(MockInvoker::new(vec![Err(
            FunctionInvocationError::Timeout {
                function_id: FunctionId("x".into()),
                timeout_ms: 5000,
            },
        )]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:timeout:"));
    }

    #[tokio::test]
    async fn function_step_maps_user_error() {
        let invoker = Arc::new(MockInvoker::new(vec![Err(
            FunctionInvocationError::UserError {
                function_id: FunctionId("x".into()),
                message: "boom".into(),
                stack: None,
            },
        )]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:user_error:"));
        assert!(msg.contains("boom"));
    }

    #[tokio::test]
    async fn function_step_client_side_timeout_fires() {
        // The invoker stalls for far longer than the 50 ms budget, so the step's own
        // timeout must be what ends the call.
        let invoker = Arc::new(MockInvoker::stalled(Duration::from_secs(10)));
        let def = FunctionDefinition {
            id: FunctionId::compute("deno", "slow", 50),
            runtime: "deno".into(),
            source: "slow".into(),
            timeout_ms: 50,
            route_id: None,
            step_index: None,
        };
        let mut step = FunctionStep::new(invoker, def);
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:timeout:"));
    }

    #[tokio::test]
    async fn function_step_emits_tracing_span() {
        use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
        use tracing_subscriber::prelude::*;

        let invoker = Arc::new(MockInvoker::new(vec![Ok(ExchangePatch::default())]));
        let def = FunctionDefinition {
            id: FunctionId::compute("deno", "span_test", 5000),
            runtime: "deno".into(),
            source: "span_test".into(),
            timeout_ms: 5000,
            route_id: None,
            step_index: None,
        };
        let mut step = FunctionStep::new(invoker, def);
        let ex = Exchange::default();

        let span_seen = Arc::new(AtomicBool::new(false));
        let span_seen_clone = span_seen.clone();

        // The filter lets everything through; it is used only to observe which callsite
        // metadata the subscriber is asked about.
        let layer = tracing_subscriber::fmt::layer()
            .with_writer(std::io::sink)
            .with_filter(tracing_subscriber::filter::filter_fn(move |meta| {
                if meta.target() == "camel_function" && meta.name() == "function" {
                    span_seen_clone.store(true, AtomicOrdering::SeqCst);
                }
                true
            }));

        let _guard = tracing_subscriber::registry().with(layer).set_default();
        let result = step.call(ex).await;
        assert!(result.is_ok());
        assert!(
            span_seen.load(AtomicOrdering::SeqCst),
            "expected function span with target 'camel_function' and name 'function'"
        );
    }

    #[tokio::test]
    async fn function_step_user_error_with_stack() {
        let invoker = Arc::new(MockInvoker::new(vec![Err(
            FunctionInvocationError::UserError {
                function_id: FunctionId("x".into()),
                message: "custom error".into(),
                stack: Some("at line 1\nat line 2".into()),
            },
        )]));
        let mut step = FunctionStep::new(invoker, test_definition());
        let ex = Exchange::default();
        let err = step.call(ex).await.unwrap_err();
        let msg = processor_message(&err);
        assert!(msg.contains("function:user_error:"));
        assert!(msg.contains("custom error"));
        assert!(msg.contains("at line 1"));
    }
}