camel_processor/resequencer/
mod.rs1use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, Instant};
13
14use async_trait::async_trait;
15use tokio::sync::{Mutex as TokioMutex, mpsc};
16use tokio::task::JoinHandle;
17use tower::Service;
18use tower::util::BoxCloneService;
19
20pub mod batch;
21pub mod stream;
22
23use camel_api::{
24 CamelError, MetricsCollector, StepLifecycle, StepShutdownReason, exchange::Exchange,
25 message::Message, processor::SyncBoxProcessor,
26};
27
/// Minimum spacing between two consecutive "InOut exchange reached resequencer"
/// log lines. InOut exchanges arriving inside the window are still counted and
/// flagged on their ack; only the log line is suppressed.
const INOUT_WARN_INTERVAL: Duration = Duration::from_millis(30_000);
30
31#[derive(Clone, Default)]
33pub struct ResequencerConfig {
34 pub allow_inout: bool,
37 pub metrics: Option<Arc<dyn MetricsCollector>>,
39 pub route_id: Option<String>,
41}
42
43impl std::fmt::Debug for ResequencerConfig {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("ResequencerConfig")
46 .field("allow_inout", &self.allow_inout)
47 .field("metrics", &self.metrics.as_ref().map(|_| "<metrics>"))
48 .field("route_id", &self.route_id)
49 .finish()
50 }
51}
52
/// Ack property: `true` when the input was handed to the resequencer's input
/// channel, `false` when it was dropped instead.
pub const CAMEL_RESEQUENCER_ACCEPTED: &str = "CamelResequencerAccepted";

/// Ack property set to `true` when the input could not be enqueued because the
/// resequencer is shutting down (or has already shut down).
pub const CAMEL_RESEQUENCER_DROPPED: &str = "CamelResequencerDropped";

/// Ack property set to `true` when an InOut exchange reached the resequencer
/// while `ResequencerConfig::allow_inout` was disabled.
pub const CAMEL_RESEQUENCER_INOUT_WARN: &str = "CamelResequencerInoutWarn";
63
64#[async_trait]
70pub trait ResequencePolicy: Send + Sync + 'static {
71 async fn accept(&self, input: Exchange) -> Vec<Exchange>;
73
74 async fn flush(&self) -> Vec<Exchange>;
76
77 fn name(&self) -> &'static str;
79
80 fn set_timeout_tx(&self, _tx: tokio::sync::mpsc::Sender<Exchange>) {}
84}
85
86#[derive(Clone)]
94pub struct ResequencerService {
95 policy: Arc<dyn ResequencePolicy>,
96 config: ResequencerConfig,
97 input_tx: Arc<Mutex<Option<mpsc::Sender<Exchange>>>>,
100 driver_tx: Arc<Mutex<Option<mpsc::Sender<Exchange>>>>,
104 actor_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
105 driver_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
106 shutdown_started: Arc<Mutex<bool>>,
107 post_lifecycles: Arc<Mutex<Vec<Arc<dyn StepLifecycle>>>>,
110 inout_counter: Arc<AtomicU64>,
112 last_inout_warn: Arc<TokioMutex<Option<Instant>>>,
114 metrics: Option<Arc<dyn MetricsCollector>>,
116 route_id: Option<String>,
118}
119
120impl std::fmt::Debug for ResequencerService {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("ResequencerService")
123 .field("policy", &self.policy.name())
124 .finish_non_exhaustive()
125 }
126}
127
128impl ResequencerService {
129 pub fn new(
141 policy: Arc<dyn ResequencePolicy>,
142 post_continuation: BoxCloneService<Exchange, Exchange, CamelError>,
143 input_capacity: usize,
144 post_lifecycles: Vec<Arc<dyn StepLifecycle>>,
145 ) -> Self {
146 Self::with_config(
147 policy,
148 post_continuation,
149 input_capacity,
150 post_lifecycles,
151 ResequencerConfig::default(),
152 )
153 }
154
155 pub fn with_config(
161 policy: Arc<dyn ResequencePolicy>,
162 post_continuation: BoxCloneService<Exchange, Exchange, CamelError>,
163 input_capacity: usize,
164 post_lifecycles: Vec<Arc<dyn StepLifecycle>>,
165 config: ResequencerConfig,
166 ) -> Self {
167 let (input_tx, mut input_rx) = mpsc::channel::<Exchange>(input_capacity);
169
170 let (driver_tx, mut driver_rx) = mpsc::channel::<Exchange>(input_capacity);
172
173 let input_tx_shared: Arc<Mutex<Option<mpsc::Sender<Exchange>>>> =
175 Arc::new(Mutex::new(Some(input_tx)));
176 let driver_tx_shared: Arc<Mutex<Option<mpsc::Sender<Exchange>>>> =
177 Arc::new(Mutex::new(Some(driver_tx.clone()))); let actor_handle: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
179 let driver_handle: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
180 let shutdown_started = Arc::new(Mutex::new(false));
181
182 policy.set_timeout_tx(driver_tx.clone());
184
185 let sync_post = SyncBoxProcessor::new(post_continuation);
187
188 {
190 let policy = Arc::clone(&policy);
191 let actor_h = Arc::clone(&actor_handle);
192 let actor_driver_tx = driver_tx; let handle = tokio::spawn(async move {
194 while let Some(input) = input_rx.recv().await {
195 let ready = policy.accept(input).await;
196 for ex in ready {
197 if actor_driver_tx.send(ex).await.is_err() {
199 return;
201 }
202 }
203 }
204 });
206 *actor_h.lock().expect("actor_handle lock poisoned") = Some(handle); }
208
209 {
211 let post = sync_post.clone();
212 let driver_h = Arc::clone(&driver_handle);
213 let metrics = config.metrics.clone();
214 let route_id = config.route_id.clone();
215 let handle = tokio::spawn(async move {
216 while let Some(ex) = driver_rx.recv().await {
217 if camel_api::is_camel_stop(&ex) {
219 tracing::debug!(
220 "resequencer post-driver: skipping continuation for CamelStop exchange"
221 );
222 continue;
223 }
224 let mut proc = post.clone_inner();
226 match proc.call(ex).await {
227 Ok(_) => {}
228 Err(e) => {
229 tracing::warn!(
231 error = %e,
232 "resequencer post-driver: continuation call failed after ack (best-effort)"
233 );
234 if let Some(ref m) = metrics {
235 m.increment_errors(
236 route_id.as_deref().unwrap_or("unknown"),
237 "resequencer:post_ack_failure",
238 );
239 }
240 }
241 }
242 }
243 });
244 *driver_h.lock().expect("driver_handle lock poisoned") = Some(handle); }
246
247 let metrics = config.metrics.clone();
248 let route_id = config.route_id.clone();
249 Self {
250 policy,
251 config,
252 input_tx: input_tx_shared,
253 driver_tx: driver_tx_shared,
254 actor_handle,
255 driver_handle,
256 shutdown_started,
257 post_lifecycles: Arc::new(Mutex::new(post_lifecycles)),
258 inout_counter: Arc::new(AtomicU64::new(0)),
259 last_inout_warn: Arc::new(TokioMutex::new(None)),
260 metrics,
261 route_id,
262 }
263 }
264}
265
266impl Service<Exchange> for ResequencerService {
269 type Response = Exchange;
270 type Error = CamelError;
271 type Future =
272 std::pin::Pin<Box<dyn std::future::Future<Output = Result<Exchange, CamelError>> + Send>>;
273
274 fn poll_ready(
275 &mut self,
276 _cx: &mut std::task::Context<'_>,
277 ) -> std::task::Poll<Result<(), CamelError>> {
278 std::task::Poll::Ready(Ok(()))
280 }
281
282 fn call(&mut self, input: Exchange) -> Self::Future {
283 let config = self.config.clone();
284 let inout_counter = Arc::clone(&self.inout_counter);
285 let last_inout_warn = Arc::clone(&self.last_inout_warn);
286 let tx_opt = Arc::clone(&self.input_tx);
287 let metrics = self.metrics.clone();
288 let route_id = self.route_id.clone();
289
290 Box::pin(async move {
291 let mut ack = Exchange::new(Message::default());
293
294 if input.pattern == camel_api::exchange::ExchangePattern::InOut && !config.allow_inout {
296 inout_counter.fetch_add(1, Ordering::Relaxed);
297 ack.set_property(CAMEL_RESEQUENCER_INOUT_WARN, true);
298 if let Some(ref m) = metrics {
299 m.increment_errors(
300 route_id.as_deref().unwrap_or("unknown"),
301 "resequencer:inout_warning",
302 );
303 }
304 let now = Instant::now();
305 let mut last_guard = last_inout_warn.lock().await;
306 let should_warn = last_guard
307 .map(|t| now.duration_since(t) >= INOUT_WARN_INTERVAL)
308 .unwrap_or(true);
309 if should_warn {
310 let count = inout_counter.load(Ordering::Relaxed);
311 tracing::warn!(
312 inout_count = count,
313 "InOut exchange reached resequencer ({count} total); \
314 consider using InOnly pattern. \
315 Set allow_inout=true to suppress this warning."
316 );
317 *last_guard = Some(now);
318 }
319 }
320
321 let tx = {
324 let guard = tx_opt.lock().unwrap_or_else(|e| e.into_inner());
325 guard.clone()
326 };
327 if let Some(tx) = tx {
328 match tx.send(input).await {
330 Ok(()) => {
331 ack.set_property(CAMEL_RESEQUENCER_ACCEPTED, true);
332 }
333 Err(tokio::sync::mpsc::error::SendError(input)) => {
334 tracing::warn!(
335 correlation_id = %input.correlation_id,
336 "resequencer input dropped during shutdown"
337 );
338 ack.set_property(CAMEL_RESEQUENCER_ACCEPTED, false);
339 ack.set_property(CAMEL_RESEQUENCER_DROPPED, true);
340 }
341 }
342 }
343
344 Ok(ack)
345 })
346 }
347}
348
349#[async_trait]
352impl StepLifecycle for ResequencerService {
353 fn name(&self) -> &'static str {
354 self.policy.name()
355 }
356
357 async fn shutdown(&self, reason: StepShutdownReason) -> Result<(), CamelError> {
365 tracing::debug!(
369 reason = ?reason,
370 policy = self.policy.name(),
371 "ResequencerService shutdown via StepLifecycle"
372 );
373
374 {
376 let mut started = self
377 .shutdown_started
378 .lock()
379 .unwrap_or_else(|e| e.into_inner());
380 if *started {
381 tracing::debug!(
382 "ResequencerService shutdown already started (idempotent); skipping"
383 );
384 return Ok(());
385 }
386 *started = true;
387 }
388
389 {
393 let mut guard = self.input_tx.lock().unwrap_or_else(|e| e.into_inner());
394 *guard = None; }
396
397 let actor_handle_to_await = {
400 let mut guard = self.actor_handle.lock().unwrap_or_else(|e| e.into_inner());
401 guard.take()
402 };
403 if let Some(handle) = actor_handle_to_await {
404 let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
406 }
407
408 let flushed = self.policy.flush().await;
410 if !flushed.is_empty() {
411 let dt = {
412 let guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
413 guard.clone()
414 };
415 if let Some(driver_tx) = dt {
416 for ex in flushed {
417 if driver_tx.send(ex).await.is_err() {
418 tracing::warn!(
419 "resequencer shutdown flush: post-driver channel closed early"
420 );
421 break;
422 }
423 }
424 }
425 }
426
427 {
429 let mut guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
430 *guard = None; }
432
433 let driver_handle_to_await = {
435 let mut guard = self.driver_handle.lock().unwrap_or_else(|e| e.into_inner());
436 guard.take()
437 };
438 if let Some(handle) = driver_handle_to_await {
439 let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
440 if result.is_err() {
441 tracing::warn!(
442 "resequencer post-driver task did not finish within 5s deadline; \
443 leaking handle (best-effort)"
444 );
445 }
446 }
447
448 {
451 let post_lcs: Vec<Arc<dyn StepLifecycle>> = {
452 let mut guard = self
453 .post_lifecycles
454 .lock()
455 .unwrap_or_else(|e| e.into_inner());
456 std::mem::take(&mut *guard)
457 };
458 for lc in &post_lcs {
459 if let Err(e) = lc.shutdown(reason).await {
460 tracing::warn!(
461 step = lc.name(),
462 error = %e,
463 "resequencer post-step lifecycle shutdown failed (best-effort)"
464 );
465 }
466 }
467 }
468
469 Ok(())
470 }
471}
472
473#[derive(Debug)]
478pub struct PassthroughPolicy;
479
480#[async_trait]
481impl ResequencePolicy for PassthroughPolicy {
482 async fn accept(&self, input: Exchange) -> Vec<Exchange> {
483 vec![input]
484 }
485
486 async fn flush(&self) -> Vec<Exchange> {
487 vec![]
488 }
489
490 fn name(&self) -> &'static str {
491 "passthrough"
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;
    use tower::ServiceExt;

    /// Post-continuation stub that forwards every exchange it receives to an
    /// unbounded channel, so tests can observe what the driver task delivered.
    #[derive(Clone)]
    struct CapturePost {
        tx: mpsc::UnboundedSender<Exchange>,
    }

    impl Service<Exchange> for CapturePost {
        type Response = Exchange;
        type Error = CamelError;
        type Future = std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<Exchange, CamelError>> + Send>,
        >;

        fn poll_ready(
            &mut self,
            _cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Result<(), CamelError>> {
            std::task::Poll::Ready(Ok(()))
        }

        fn call(&mut self, exchange: Exchange) -> Self::Future {
            let tx = self.tx.clone();
            Box::pin(async move {
                // Receiver may already be gone in tests that ignore captures.
                let _ = tx.send(exchange.clone());
                Ok(exchange)
            })
        }
    }

    #[tokio::test]
    async fn resequencer_boundary_passthrough_ack_and_continuation() {
        use camel_api::body::Body;

        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (capture_tx, mut capture_rx) = mpsc::unbounded_channel::<Exchange>();
        let capture = CapturePost { tx: capture_tx };
        let post_continuation: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(capture);

        let service = ResequencerService::new(policy, post_continuation, 1024, vec![]);

        let mut input = Exchange::new(Message::new(Body::Text("hello".into())));
        input.set_property("seq", 1);

        let ack = service.clone().oneshot(input).await.unwrap();

        assert!(
            matches!(ack.input.body, Body::Empty),
            "ack body should be Empty, got {:?}",
            ack.input.body
        );
        assert_eq!(
            ack.property(CAMEL_RESEQUENCER_ACCEPTED)
                .and_then(|v| v.as_bool()),
            Some(true),
            "CAMEL_RESEQUENCER_ACCEPTED should be true"
        );
        assert!(
            ack.property(CAMEL_RESEQUENCER_DROPPED).is_none(),
            "an accepted input must not be flagged as dropped"
        );

        let captured = tokio::time::timeout(Duration::from_millis(500), capture_rx.recv())
            .await
            .expect("post-continuation did not receive exchange within 500ms timeout")
            .expect("capture channel closed without receiving exchange");
        assert_eq!(
            captured.input.body.as_text(),
            Some("hello"),
            "post-continuation received body should match"
        );

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("first shutdown should succeed");

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("second shutdown should succeed (idempotent)");
    }

    #[tokio::test]
    async fn resequencer_boundary_camel_stop_skipped() {
        use camel_api::body::Body;

        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (capture_tx, mut capture_rx) = mpsc::unbounded_channel::<Exchange>();
        let capture = CapturePost { tx: capture_tx };
        let post_continuation: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(capture);

        let service = ResequencerService::new(policy, post_continuation, 1024, vec![]);

        let mut input = Exchange::new(Message::new(Body::Text(
            "should-not-reach-continuation".into(),
        )));
        input.set_property(camel_api::exchange::CAMEL_STOP, true);

        let ack = service.clone().oneshot(input).await.unwrap();

        assert_eq!(
            ack.property(CAMEL_RESEQUENCER_ACCEPTED)
                .and_then(|v| v.as_bool()),
            Some(true),
            "CamelStop exchange should still be accepted by resequencer actor"
        );

        // Either a timeout or a closed channel is fine; only a delivery is a failure.
        let did_receive = tokio::time::timeout(Duration::from_millis(500), capture_rx.recv()).await;
        assert!(
            !matches!(did_receive, Ok(Some(_))),
            "CamelStop exchange should NOT reach post-continuation"
        );

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown should succeed");
    }

    #[tokio::test]
    async fn inout_guard_increments_counter() {
        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (tx, _rx) = mpsc::unbounded_channel::<Exchange>();
        let post: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(CapturePost { tx });
        let config = ResequencerConfig::default();
        let service = ResequencerService::with_config(policy, post, 16, vec![], config);

        let ex_inonly = Exchange::new(Message::new("inonly"));
        let inonly_ack = service.clone().oneshot(ex_inonly).await.unwrap();
        assert_eq!(
            service.inout_counter.load(Ordering::Relaxed),
            0,
            "InOnly exchange must not be counted as InOut"
        );
        assert!(
            inonly_ack.property(CAMEL_RESEQUENCER_INOUT_WARN).is_none(),
            "InOnly ack must not carry the InOut warning flag"
        );

        let ex_inout = Exchange::new_in_out(Message::new("inout"));
        let inout_ack = service.clone().oneshot(ex_inout).await.unwrap();
        assert_eq!(
            service.inout_counter.load(Ordering::Relaxed),
            1,
            "InOut counter should be exactly 1 after a single InOut exchange"
        );
        assert_eq!(
            inout_ack
                .property(CAMEL_RESEQUENCER_INOUT_WARN)
                .and_then(|v| v.as_bool()),
            Some(true),
            "InOut ack should carry CAMEL_RESEQUENCER_INOUT_WARN=true"
        );

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown");
    }

    #[tokio::test]
    async fn inout_guard_allow_inout_suppresses() {
        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (tx, _rx) = mpsc::unbounded_channel::<Exchange>();
        let post: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(CapturePost { tx });
        let config = ResequencerConfig {
            allow_inout: true,
            ..Default::default()
        };
        let service = ResequencerService::with_config(policy, post, 16, vec![], config);

        let ex_inout = Exchange::new_in_out(Message::new("inout-allowed"));
        let ack = service.clone().oneshot(ex_inout).await.unwrap();
        assert_eq!(
            service.inout_counter.load(Ordering::Relaxed),
            0,
            "InOut counter should be 0 when allow_inout=true"
        );
        assert!(
            ack.property(CAMEL_RESEQUENCER_INOUT_WARN).is_none(),
            "InOut warning flag should be absent when allow_inout=true"
        );

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown");
    }
}