Skip to main content

camel_processor/resequencer/
mod.rs

//! Resequencer — a continuation-boundary EIP (Enterprise Integration Pattern).
//!
//! The resequencer is a `CompiledStep` + `StepLifecycle`. `call(input)` sends
//! the input into a bounded actor channel. The actor buffers inputs, computes
//! the exchanges that are ready, and forwards them to a post-driver task that
//! drives the owned post-continuation. `call()` itself only returns a control
//! ack, so the main pipeline ends at the resequencer.
//!
//! Architecture: see ADR-0029 (resequencer continuation boundary).
9
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, Instant};
13
14use async_trait::async_trait;
15use tokio::sync::{Mutex as TokioMutex, mpsc};
16use tokio::task::JoinHandle;
17use tower::Service;
18use tower::util::BoxCloneService;
19
20pub mod batch;
21pub mod stream;
22
23use camel_api::{
24    CamelError, MetricsCollector, StepLifecycle, StepShutdownReason, exchange::Exchange,
25    message::Message, processor::SyncBoxProcessor,
26};
27
/// Minimum time between two consecutive InOut warning log lines.
///
/// The counter and the ack flag are updated for every offending exchange;
/// only the `tracing::warn!` emission is throttled by this window.
const INOUT_WARN_INTERVAL: Duration = Duration::from_secs(30);
30
31/// Configuration for the `ResequencerService`.
32#[derive(Clone, Default)]
33pub struct ResequencerConfig {
34    /// Allow `InOut` exchanges to pass through the resequencer without
35    /// emitting a warning. Defaults to `false`.
36    pub allow_inout: bool,
37    /// Optional metrics collector for incrementing operational counters.
38    pub metrics: Option<Arc<dyn MetricsCollector>>,
39    /// Optional route ID for metric labels.
40    pub route_id: Option<String>,
41}
42
43impl std::fmt::Debug for ResequencerConfig {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("ResequencerConfig")
46            .field("allow_inout", &self.allow_inout)
47            .field("metrics", &self.metrics.as_ref().map(|_| "<metrics>"))
48            .field("route_id", &self.route_id)
49            .finish()
50    }
51}
52
53// ── Property keys (CamelCase, matching CAMEL_AGGREGATED_COMPLETION_REASON) ──
54
/// Ack property: `true` when the resequencer accepted (enqueued) the input,
/// `false` when the input was dropped.
pub const CAMEL_RESEQUENCER_ACCEPTED: &str = "CamelResequencerAccepted";

/// Ack property: `true` when the input was dropped during shutdown.
pub const CAMEL_RESEQUENCER_DROPPED: &str = "CamelResequencerDropped";

/// Ack property: `true` when an `InOut` exchange reached the resequencer while
/// `allow_inout` was `false`.
pub const CAMEL_RESEQUENCER_INOUT_WARN: &str = "CamelResequencerInoutWarn";
63
64// ── Policy trait ──
65
66/// Buffer / ordering policy for a resequencer.
67///
68/// Implementations (batch, stream) live in sibling modules.
69#[async_trait]
70pub trait ResequencePolicy: Send + Sync + 'static {
71    /// Accept an input; return the list of now-ready exchanges (in emit order).
72    async fn accept(&self, input: Exchange) -> Vec<Exchange>;
73
74    /// Flush all buffered state (shutdown). Return any remaining, ordered.
75    async fn flush(&self) -> Vec<Exchange>;
76
77    /// Stable name for logging / diagnostics.
78    fn name(&self) -> &'static str;
79
80    /// Set the driver channel for timeout-triggered emissions.
81    /// Default is a no-op. `BatchPolicy` overrides this to receive
82    /// the channel that feeds the post-driver.
83    fn set_timeout_tx(&self, _tx: tokio::sync::mpsc::Sender<Exchange>) {}
84}
85
86// ── Service ──
87
88/// Continuation-boundary resequencer.
89///
90/// Owns an actor task (consumes from input channel, calls `policy.accept(input)`)
91/// and a post-driver task (consumes ready exchanges, drives `post_continuation`).
92/// `Service::call(input)` sends into the bounded input channel and returns an ack.
93#[derive(Clone)]
94pub struct ResequencerService {
95    policy: Arc<dyn ResequencePolicy>,
96    config: ResequencerConfig,
97    /// Bounded input channel sender. Wrapped in `Option` so `shutdown` can
98    /// `take()` it to signal EOF to the actor (all sender clones must drop).
99    input_tx: Arc<Mutex<Option<mpsc::Sender<Exchange>>>>,
100    /// Post-driver channel sender. The actor task holds a clone; we hold one
101    /// here for shutdown flush. Wrapped in `Option` so `shutdown` can take it
102    /// to close the post-driver channel after flush.
103    driver_tx: Arc<Mutex<Option<mpsc::Sender<Exchange>>>>,
104    actor_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
105    driver_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
106    shutdown_started: Arc<Mutex<bool>>,
107    /// Post-step lifecycles to drain after post-driver quiesces (oracle Fix 2).
108    /// Empty for Task 1a (no post-steps yet); filled by Tasks 1b/2/3.
109    post_lifecycles: Arc<Mutex<Vec<Arc<dyn StepLifecycle>>>>,
110    /// Metric counter for InOut exchanges that reach the resequencer.
111    inout_counter: Arc<AtomicU64>,
112    /// Rate-limit last-warn timestamp (TokioMutex for async safety).
113    last_inout_warn: Arc<TokioMutex<Option<Instant>>>,
114    /// Optional metrics collector for operational counters.
115    metrics: Option<Arc<dyn MetricsCollector>>,
116    /// Route ID for metric labels.
117    route_id: Option<String>,
118}
119
120impl std::fmt::Debug for ResequencerService {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("ResequencerService")
123            .field("policy", &self.policy.name())
124            .finish_non_exhaustive()
125    }
126}
127
128impl ResequencerService {
129    /// Create a new resequencer with the given policy, continuation, and post-step lifecycles.
130    ///
131    /// * `input_capacity` — bounded channel capacity (default 1024). Backpressure
132    ///   propagates to the caller via `send().await`.
133    /// * `post_lifecycles` — lifecycle handles for steps AFTER the resequencer
134    ///   (drained in `shutdown` after post-driver quiesces; empty for Task 1a).
135    ///
136    /// # Panics
137    ///
138    /// Panics if called outside a Tokio runtime context: `new()` spawns the actor and
139    /// post-driver tasks via `tokio::spawn`.
140    pub fn new(
141        policy: Arc<dyn ResequencePolicy>,
142        post_continuation: BoxCloneService<Exchange, Exchange, CamelError>,
143        input_capacity: usize,
144        post_lifecycles: Vec<Arc<dyn StepLifecycle>>,
145    ) -> Self {
146        Self::with_config(
147            policy,
148            post_continuation,
149            input_capacity,
150            post_lifecycles,
151            ResequencerConfig::default(),
152        )
153    }
154
155    /// Full constructor with explicit `ResequencerConfig`.
156    ///
157    /// # Panics
158    ///
159    /// Panics if called outside a Tokio runtime context.
160    pub fn with_config(
161        policy: Arc<dyn ResequencePolicy>,
162        post_continuation: BoxCloneService<Exchange, Exchange, CamelError>,
163        input_capacity: usize,
164        post_lifecycles: Vec<Arc<dyn StepLifecycle>>,
165        config: ResequencerConfig,
166    ) -> Self {
167        // Bounded channel for input exchanges → actor
168        let (input_tx, mut input_rx) = mpsc::channel::<Exchange>(input_capacity);
169
170        // Post-driver channel: actor → post-driver (bounded, single consumer)
171        let (driver_tx, mut driver_rx) = mpsc::channel::<Exchange>(input_capacity);
172
173        // Shared (Arc<Mutex<Option<T>>>) wrappers
174        let input_tx_shared: Arc<Mutex<Option<mpsc::Sender<Exchange>>>> =
175            Arc::new(Mutex::new(Some(input_tx)));
176        let driver_tx_shared: Arc<Mutex<Option<mpsc::Sender<Exchange>>>> =
177            Arc::new(Mutex::new(Some(driver_tx.clone()))); // we keep one for flush; actor gets clone
178        let actor_handle: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
179        let driver_handle: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
180        let shutdown_started = Arc::new(Mutex::new(false));
181
182        // Wire the driver channel into the policy for timeout-triggered emissions
183        policy.set_timeout_tx(driver_tx.clone());
184
185        // Thread-safe wrapper for the post-continuation
186        let sync_post = SyncBoxProcessor::new(post_continuation);
187
188        // ── Spawn actor task ──
189        {
190            let policy = Arc::clone(&policy);
191            let actor_h = Arc::clone(&actor_handle);
192            let actor_driver_tx = driver_tx; // move the original sender into the actor
193            let handle = tokio::spawn(async move {
194                while let Some(input) = input_rx.recv().await {
195                    let ready = policy.accept(input).await;
196                    for ex in ready {
197                        // If the post-driver channel is closed, stop
198                        if actor_driver_tx.send(ex).await.is_err() {
199                            // post-driver dropped → exit
200                            return;
201                        }
202                    }
203                }
204                // input channel closed (EOF from shutdown) → exit naturally
205            });
206            *actor_h.lock().expect("actor_handle lock poisoned") = Some(handle); // allow-unwrap: handle slot is None at construction time
207        }
208
209        // ── Spawn post-driver task ──
210        {
211            let post = sync_post.clone();
212            let driver_h = Arc::clone(&driver_handle);
213            let metrics = config.metrics.clone();
214            let route_id = config.route_id.clone();
215            let handle = tokio::spawn(async move {
216                while let Some(ex) = driver_rx.recv().await {
217                    // CamelStop interaction: skip continuation for stop-signaled exchanges
218                    if camel_api::is_camel_stop(&ex) {
219                        tracing::debug!(
220                            "resequencer post-driver: skipping continuation for CamelStop exchange"
221                        );
222                        continue;
223                    }
224                    // Clone inner processor (cheap: Arc bump + Mutex briefly held)
225                    let mut proc = post.clone_inner();
226                    match proc.call(ex).await {
227                        Ok(_) => {}
228                        Err(e) => {
229                            // log-policy: post-ack failure (ADR-0012 best-effort, ADR-0029 I7)
230                            tracing::warn!(
231                                error = %e,
232                                "resequencer post-driver: continuation call failed after ack (best-effort)"
233                            );
234                            if let Some(ref m) = metrics {
235                                m.increment_errors(
236                                    route_id.as_deref().unwrap_or("unknown"),
237                                    "resequencer:post_ack_failure",
238                                );
239                            }
240                        }
241                    }
242                }
243            });
244            *driver_h.lock().expect("driver_handle lock poisoned") = Some(handle); // allow-unwrap: handle slot is None at construction time
245        }
246
247        let metrics = config.metrics.clone();
248        let route_id = config.route_id.clone();
249        Self {
250            policy,
251            config,
252            input_tx: input_tx_shared,
253            driver_tx: driver_tx_shared,
254            actor_handle,
255            driver_handle,
256            shutdown_started,
257            post_lifecycles: Arc::new(Mutex::new(post_lifecycles)),
258            inout_counter: Arc::new(AtomicU64::new(0)),
259            last_inout_warn: Arc::new(TokioMutex::new(None)),
260            metrics,
261            route_id,
262        }
263    }
264}
265
266// ── Tower Service impl ──
267
268impl Service<Exchange> for ResequencerService {
269    type Response = Exchange;
270    type Error = CamelError;
271    type Future =
272        std::pin::Pin<Box<dyn std::future::Future<Output = Result<Exchange, CamelError>> + Send>>;
273
274    fn poll_ready(
275        &mut self,
276        _cx: &mut std::task::Context<'_>,
277    ) -> std::task::Poll<Result<(), CamelError>> {
278        // ADR-0019: always ready; backpressure via bounded send().await in call()
279        std::task::Poll::Ready(Ok(()))
280    }
281
282    fn call(&mut self, input: Exchange) -> Self::Future {
283        let config = self.config.clone();
284        let inout_counter = Arc::clone(&self.inout_counter);
285        let last_inout_warn = Arc::clone(&self.last_inout_warn);
286        let tx_opt = Arc::clone(&self.input_tx);
287        let metrics = self.metrics.clone();
288        let route_id = self.route_id.clone();
289
290        Box::pin(async move {
291            // Build ack exchange
292            let mut ack = Exchange::new(Message::default());
293
294            // InOut guard (I6): rate-limited with metric counter and property flag
295            if input.pattern == camel_api::exchange::ExchangePattern::InOut && !config.allow_inout {
296                inout_counter.fetch_add(1, Ordering::Relaxed);
297                ack.set_property(CAMEL_RESEQUENCER_INOUT_WARN, true);
298                if let Some(ref m) = metrics {
299                    m.increment_errors(
300                        route_id.as_deref().unwrap_or("unknown"),
301                        "resequencer:inout_warning",
302                    );
303                }
304                let now = Instant::now();
305                let mut last_guard = last_inout_warn.lock().await;
306                let should_warn = last_guard
307                    .map(|t| now.duration_since(t) >= INOUT_WARN_INTERVAL)
308                    .unwrap_or(true);
309                if should_warn {
310                    let count = inout_counter.load(Ordering::Relaxed);
311                    tracing::warn!(
312                        inout_count = count,
313                        "InOut exchange reached resequencer ({count} total); \
314                         consider using InOnly pattern. \
315                         Set allow_inout=true to suppress this warning."
316                    );
317                    *last_guard = Some(now);
318                }
319            }
320
321            // Snapshot the sender (if still active). If shutdown already took it,
322            // the exchange is dropped — best-effort (intake already cancelled).
323            let tx = {
324                let guard = tx_opt.lock().unwrap_or_else(|e| e.into_inner());
325                guard.clone()
326            };
327            if let Some(tx) = tx {
328                // Backpressure: blocks if channel is full
329                match tx.send(input).await {
330                    Ok(()) => {
331                        ack.set_property(CAMEL_RESEQUENCER_ACCEPTED, true);
332                    }
333                    Err(tokio::sync::mpsc::error::SendError(input)) => {
334                        tracing::warn!(
335                            correlation_id = %input.correlation_id,
336                            "resequencer input dropped during shutdown"
337                        );
338                        ack.set_property(CAMEL_RESEQUENCER_ACCEPTED, false);
339                        ack.set_property(CAMEL_RESEQUENCER_DROPPED, true);
340                    }
341                }
342            }
343
344            Ok(ack)
345        })
346    }
347}
348
349// ── StepLifecycle impl ──
350
351#[async_trait]
352impl StepLifecycle for ResequencerService {
353    fn name(&self) -> &'static str {
354        self.policy.name()
355    }
356
357    /// Idempotent shutdown with this ordering:
358    /// 1. Set shutdown flag; close/drop input_tx so actor sees EOF.
359    /// 2. Await actor JoinHandle (bounded deadline).
360    /// 3. policy.flush() → emit remaining in order via post-driver.
361    /// 4. Close post-driver channel sender so its loop sees EOF.
362    /// 5. Await post-driver JoinHandle with 5s deadline.
363    /// 6. Drain post-step lifecycles (oracle Fix 2).
364    async fn shutdown(&self, reason: StepShutdownReason) -> Result<(), CamelError> {
365        // TODO(Task 1b): differentiate HotSwap (complete in-flight through old continuation,
366        // ADR-0004) from RouteStop (flush + drain). Currently both paths run the same
367        // flush-then-close sequence.
368        tracing::debug!(
369            reason = ?reason,
370            policy = self.policy.name(),
371            "ResequencerService shutdown via StepLifecycle"
372        );
373
374        // Idempotent guard (I1)
375        {
376            let mut started = self
377                .shutdown_started
378                .lock()
379                .unwrap_or_else(|e| e.into_inner());
380            if *started {
381                tracing::debug!(
382                    "ResequencerService shutdown already started (idempotent); skipping"
383                );
384                return Ok(());
385            }
386            *started = true;
387        }
388
389        // Step 1: Close/drop input_tx so actor's input_rx sees EOF.
390        // Taking the sender from the Option drops it. The shared Arc<Mutex<...>>
391        // means all clones see None after this.
392        {
393            let mut guard = self.input_tx.lock().unwrap_or_else(|e| e.into_inner());
394            *guard = None; // drop the sender
395        }
396
397        // Step 2: Await actor JoinHandle (bounded deadline).
398        // Extract handle first, then await outside the lock to avoid Send issue.
399        let actor_handle_to_await = {
400            let mut guard = self.actor_handle.lock().unwrap_or_else(|e| e.into_inner());
401            guard.take()
402        };
403        if let Some(handle) = actor_handle_to_await {
404            // 5s deadline for actor to finish processing remaining input
405            let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
406        }
407
408        // Step 3: policy.flush() → emit remaining in order via post-driver.
409        let flushed = self.policy.flush().await;
410        if !flushed.is_empty() {
411            let dt = {
412                let guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
413                guard.clone()
414            };
415            if let Some(driver_tx) = dt {
416                for ex in flushed {
417                    if driver_tx.send(ex).await.is_err() {
418                        tracing::warn!(
419                            "resequencer shutdown flush: post-driver channel closed early"
420                        );
421                        break;
422                    }
423                }
424            }
425        }
426
427        // Step 4: Close the post-driver channel sender so its loop sees EOF.
428        {
429            let mut guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
430            *guard = None; // drop the sender → driver_rx.recv() returns None
431        }
432
433        // Step 5: Await the post-driver JoinHandle with a 5s deadline.
434        let driver_handle_to_await = {
435            let mut guard = self.driver_handle.lock().unwrap_or_else(|e| e.into_inner());
436            guard.take()
437        };
438        if let Some(handle) = driver_handle_to_await {
439            let result = tokio::time::timeout(Duration::from_secs(5), handle).await;
440            if result.is_err() {
441                tracing::warn!(
442                    "resequencer post-driver task did not finish within 5s deadline; \
443                     leaking handle (best-effort)"
444                );
445            }
446        }
447
448        // Step 6: Drain post-step lifecycles (oracle Fix 2).
449        // Must happen AFTER post-driver drains (flush emits through continuation first).
450        {
451            let post_lcs: Vec<Arc<dyn StepLifecycle>> = {
452                let mut guard = self
453                    .post_lifecycles
454                    .lock()
455                    .unwrap_or_else(|e| e.into_inner());
456                std::mem::take(&mut *guard)
457            };
458            for lc in &post_lcs {
459                if let Err(e) = lc.shutdown(reason).await {
460                    tracing::warn!(
461                        step = lc.name(),
462                        error = %e,
463                        "resequencer post-step lifecycle shutdown failed (best-effort)"
464                    );
465                }
466            }
467        }
468
469        Ok(())
470    }
471}
472
473// ── Passthrough policy (for testing) ──
474
475/// Emits each input unchanged — used as a baseline policy for testing
476/// the continuation-boundary mechanics.
477#[derive(Debug)]
478pub struct PassthroughPolicy;
479
480#[async_trait]
481impl ResequencePolicy for PassthroughPolicy {
482    async fn accept(&self, input: Exchange) -> Vec<Exchange> {
483        vec![input]
484    }
485
486    async fn flush(&self) -> Vec<Exchange> {
487        vec![]
488    }
489
490    fn name(&self) -> &'static str {
491        "passthrough"
492    }
493}
494
495// ── Tests ──
496
#[cfg(test)]
mod tests {
    use super::*;
    use tower::ServiceExt;

    /// Test continuation that forwards a copy of every exchange it is called
    /// with into an unbounded channel and echoes the exchange back.
    #[derive(Clone)]
    struct CapturePost {
        tx: mpsc::UnboundedSender<Exchange>,
    }

    impl Service<Exchange> for CapturePost {
        type Response = Exchange;
        type Error = CamelError;
        type Future = std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<Exchange, CamelError>> + Send>,
        >;

        fn poll_ready(
            &mut self,
            _cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Result<(), CamelError>> {
            std::task::Poll::Ready(Ok(()))
        }

        fn call(&mut self, exchange: Exchange) -> Self::Future {
            let tx = self.tx.clone();
            Box::pin(async move {
                // Fire-and-forget: a send error only means the receiver is
                // gone, which happens during test teardown.
                let _ = tx.send(exchange.clone());
                Ok(exchange)
            })
        }
    }

    /// An accepted input yields an empty ack flagged as accepted, reaches the
    /// post-continuation, and shutdown can be called twice.
    #[tokio::test]
    async fn resequencer_boundary_passthrough_ack_and_continuation() {
        use camel_api::body::Body;

        // Resequencer with passthrough policy + channel-capturing continuation.
        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (capture_tx, mut capture_rx) = mpsc::unbounded_channel::<Exchange>();
        let capture = CapturePost { tx: capture_tx };
        let post_continuation: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(capture);

        let service = ResequencerService::new(policy, post_continuation, 1024, vec![]);

        let mut input = Exchange::new(Message::new(Body::Text("hello".into())));
        input.set_property("seq", 1);

        let ack = service.clone().oneshot(input).await.unwrap();

        // (a) The ack has Body::Empty and CAMEL_RESEQUENCER_ACCEPTED=true.
        assert!(
            matches!(ack.input.body, Body::Empty),
            "ack body should be Empty, got {:?}",
            ack.input.body
        );
        assert_eq!(
            ack.property(CAMEL_RESEQUENCER_ACCEPTED)
                .and_then(|v| v.as_bool()),
            Some(true),
            "CAMEL_RESEQUENCER_ACCEPTED should be true"
        );

        // (b) The post-continuation receives the input payload.
        let captured = tokio::time::timeout(Duration::from_millis(500), capture_rx.recv())
            .await
            .expect("post-continuation did not receive exchange within 500ms timeout")
            .expect("capture channel closed without receiving exchange");
        let body = captured.input.body.as_text();
        assert_eq!(
            body,
            Some("hello"),
            "post-continuation received body should match"
        );

        // (c) Shutdown is idempotent: both calls return Ok.
        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("first shutdown should succeed");

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("second shutdown should succeed (idempotent)");
    }

    /// A CamelStop-flagged exchange is accepted by the actor but never handed
    /// to the post-continuation.
    #[tokio::test]
    async fn resequencer_boundary_camel_stop_skipped() {
        use camel_api::body::Body;

        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (capture_tx, mut capture_rx) = mpsc::unbounded_channel::<Exchange>();
        let capture = CapturePost { tx: capture_tx };
        let post_continuation: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(capture);

        let service = ResequencerService::new(policy, post_continuation, 1024, vec![]);

        let mut input = Exchange::new(Message::new(Body::Text(
            "should-not-reach-continuation".into(),
        )));
        input.set_property(camel_api::exchange::CAMEL_STOP, true);

        let ack = service.clone().oneshot(input).await.unwrap();

        assert_eq!(
            ack.property(CAMEL_RESEQUENCER_ACCEPTED)
                .and_then(|v| v.as_bool()),
            Some(true),
            "CamelStop exchange should still be accepted by resequencer actor"
        );

        // Nothing may arrive at the continuation within the wait window.
        let did_receive = tokio::time::timeout(Duration::from_millis(500), capture_rx.recv()).await;
        match did_receive {
            Ok(Some(_)) => panic!("CamelStop exchange should NOT reach post-continuation"),
            Ok(None) => {}      // channel closed (expected in some teardown scenarios)
            Err(_elapsed) => {} // timeout is the expected path: nothing arrived
        }

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown should succeed");
    }

    /// With the default config, only InOut exchanges bump the counter and
    /// flag the ack.
    #[tokio::test]
    async fn inout_guard_increments_counter() {
        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (tx, _rx) = mpsc::unbounded_channel::<Exchange>();
        let post: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(CapturePost { tx });
        let config = ResequencerConfig::default();
        let service = ResequencerService::with_config(policy, post, 16, vec![], config);

        // InOnly must NOT increment the counter nor flag the ack.
        let ex_inonly = Exchange::new(Message::new("inonly"));
        let ack_inonly = service.clone().oneshot(ex_inonly).await.unwrap();
        assert_eq!(
            service.inout_counter.load(Ordering::Relaxed),
            0,
            "InOut counter should stay 0 after an InOnly exchange"
        );
        assert!(
            ack_inonly.property(CAMEL_RESEQUENCER_INOUT_WARN).is_none(),
            "InOnly ack should not carry the InOut warning flag"
        );

        // InOut must increment the counter exactly once and flag the ack.
        let ex_inout = Exchange::new_in_out(Message::new("inout"));
        let ack_inout = service.clone().oneshot(ex_inout).await.unwrap();
        assert_eq!(
            service.inout_counter.load(Ordering::Relaxed),
            1,
            "InOut counter should be 1 after one InOut exchange"
        );
        assert_eq!(
            ack_inout
                .property(CAMEL_RESEQUENCER_INOUT_WARN)
                .and_then(|v| v.as_bool()),
            Some(true),
            "InOut ack should carry the InOut warning flag"
        );

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown");
    }

    /// `allow_inout = true` disables the counter and the ack flag.
    #[tokio::test]
    async fn inout_guard_allow_inout_suppresses() {
        let policy: Arc<dyn ResequencePolicy> = Arc::new(PassthroughPolicy);
        let (tx, _rx) = mpsc::unbounded_channel::<Exchange>();
        let post: BoxCloneService<Exchange, Exchange, CamelError> =
            BoxCloneService::new(CapturePost { tx });
        let config = ResequencerConfig {
            allow_inout: true,
            ..Default::default()
        };
        let service = ResequencerService::with_config(policy, post, 16, vec![], config);

        let ex_inout = Exchange::new_in_out(Message::new("inout-allowed"));
        let ack = service.clone().oneshot(ex_inout).await.unwrap();
        assert_eq!(
            service.inout_counter.load(Ordering::Relaxed),
            0,
            "InOut counter should be 0 when allow_inout=true"
        );
        assert!(
            ack.property(CAMEL_RESEQUENCER_INOUT_WARN).is_none(),
            "ack should not carry the InOut warning flag when allow_inout=true"
        );

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown");
    }
}