use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use tokio::sync::{Mutex as TokioMutex, mpsc};
use tokio::task::JoinHandle;
use tower::Service;
use tower::util::BoxCloneService;
pub mod batch;
pub mod stream;
use camel_api::{
CamelError, MetricsCollector, StepLifecycle, StepShutdownReason, exchange::Exchange,
message::Message, processor::SyncBoxProcessor,
};
/// Minimum time that must pass between two "InOut exchange reached
/// resequencer" log warnings, so a steady stream of InOut exchanges does not
/// flood the log.
const INOUT_WARN_INTERVAL: Duration = Duration::from_secs(30);
/// Optional settings for [`ResequencerService`].
///
/// The derived `Default` leaves `allow_inout` at `false` and attaches neither
/// a metrics collector nor a route id.
#[derive(Clone, Default)]
pub struct ResequencerConfig {
/// When `false`, every InOut exchange passed to the service is counted,
/// flagged on its ack with [`CAMEL_RESEQUENCER_INOUT_WARN`], reported to
/// `metrics` (if any) and logged at most once per warning interval.
/// When `true` that guard is skipped entirely.
pub allow_inout: bool,
/// Collector that receives `increment_errors` calls for InOut warnings and
/// for continuation failures in the post-driver task. `None` disables
/// metric reporting.
pub metrics: Option<Arc<dyn MetricsCollector>>,
/// Route label passed along with metric increments; `"unknown"` is used
/// when this is `None`.
pub route_id: Option<String>,
}
/// Manual `Debug` so the trait-object collector (which has no `Debug` bound
/// visible here) is rendered as a fixed placeholder instead of its contents.
impl std::fmt::Debug for ResequencerConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `Some("<metrics>")` when a collector is attached, `None` otherwise.
        let metrics_marker: Option<&str> = self.metrics.is_some().then_some("<metrics>");

        let mut out = f.debug_struct("ResequencerConfig");
        out.field("allow_inout", &self.allow_inout);
        out.field("metrics", &metrics_marker);
        out.field("route_id", &self.route_id);
        out.finish()
    }
}
/// Ack property: `true` once the input exchange has been handed to the
/// resequencer's input channel, `false` when that hand-off failed because the
/// channel was already closed.
pub const CAMEL_RESEQUENCER_ACCEPTED: &str = "CamelResequencerAccepted";
/// Ack property: set to `true` when the input exchange was discarded because
/// the input channel was closed (shutdown in progress).
pub const CAMEL_RESEQUENCER_DROPPED: &str = "CamelResequencerDropped";
/// Ack property: set to `true` when an InOut exchange reached the service
/// while `ResequencerConfig::allow_inout` was `false`.
pub const CAMEL_RESEQUENCER_INOUT_WARN: &str = "CamelResequencerInoutWarn";
/// Strategy deciding which exchanges a [`ResequencerService`] releases
/// downstream, and when.
///
/// The service calls `accept` from its actor task, one input at a time, and
/// calls `flush` once during shutdown.
#[async_trait]
pub trait ResequencePolicy: Send + Sync + 'static {
/// Offers one incoming exchange to the policy. Every exchange in the
/// returned vector is forwarded, in order, to the post-driver task; an
/// empty vector means nothing is released yet.
async fn accept(&self, input: Exchange) -> Vec<Exchange>;
/// Called during service shutdown, after the actor task has been awaited
/// (or its deadline elapsed). The returned exchanges are forwarded to the
/// post-driver task before it is stopped.
async fn flush(&self) -> Vec<Exchange>;
/// Static identifier of the policy; used in `Debug` output, log fields and
/// as the step's lifecycle name.
fn name(&self) -> &'static str;
/// Receives a sender feeding the service's post-driver channel. Called once
/// while the service is being constructed; the default implementation
/// discards the sender.
// NOTE(review): presumably meant for time-based policies that release
// exchanges on their own schedule — confirm against implementors.
fn set_timeout_tx(&self, _tx: tokio::sync::mpsc::Sender<Exchange>) {}
}
/// Tower service that acknowledges each input immediately and hands the
/// actual exchange to a background pipeline: actor task (runs the policy) →
/// post-driver task (runs the continuation).
///
/// Every field is either an `Arc` or plain data, so clones share the same
/// channels, task handles and counters.
#[derive(Clone)]
pub struct ResequencerService {
// Strategy invoked by the actor task and by `shutdown` (flush).
policy: Arc<dyn ResequencePolicy>,
// Settings supplied at construction; `allow_inout` is read on every call.
config: ResequencerConfig,
// Sender into the actor's input channel; replaced by `None` on shutdown.
input_tx: Arc<Mutex<Option<mpsc::Sender<Exchange>>>>,
// Sender into the post-driver channel; used to forward flushed exchanges
// during shutdown, then replaced by `None`.
driver_tx: Arc<Mutex<Option<mpsc::Sender<Exchange>>>>,
// Join handle of the actor task; taken (and awaited) by shutdown.
actor_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
// Join handle of the post-driver task; taken (and awaited) by shutdown.
driver_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
// Set on the first shutdown call so later calls return immediately.
shutdown_started: Arc<Mutex<bool>>,
// Lifecycles shut down (best-effort) at the end of this service's shutdown.
post_lifecycles: Arc<Mutex<Vec<Arc<dyn StepLifecycle>>>>,
// Number of InOut exchanges seen while `allow_inout` was `false`.
inout_counter: Arc<AtomicU64>,
// Time the last InOut warning was logged; `None` until the first one.
last_inout_warn: Arc<TokioMutex<Option<Instant>>>,
// Copy of `config.metrics` taken at construction.
metrics: Option<Arc<dyn MetricsCollector>>,
// Copy of `config.route_id` taken at construction.
route_id: Option<String>,
}
/// `Debug` shows only the policy name; the remaining fields (channels, task
/// handles, locks) are elided via `finish_non_exhaustive`.
impl std::fmt::Debug for ResequencerService {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let policy_name = self.policy.name();
        let mut out = f.debug_struct("ResequencerService");
        out.field("policy", &policy_name);
        out.finish_non_exhaustive()
    }
}
impl ResequencerService {
pub fn new(
policy: Arc<dyn ResequencePolicy>,
post_continuation: BoxCloneService<Exchange, Exchange, CamelError>,
input_capacity: usize,
post_lifecycles: Vec<Arc<dyn StepLifecycle>>,
) -> Self {
Self::with_config(
policy,
post_continuation,
input_capacity,
post_lifecycles,
ResequencerConfig::default(),
)
}
pub fn with_config(
policy: Arc<dyn ResequencePolicy>,
post_continuation: BoxCloneService<Exchange, Exchange, CamelError>,
input_capacity: usize,
post_lifecycles: Vec<Arc<dyn StepLifecycle>>,
config: ResequencerConfig,
) -> Self {
let (input_tx, mut input_rx) = mpsc::channel::<Exchange>(input_capacity);
let (driver_tx, mut driver_rx) = mpsc::channel::<Exchange>(input_capacity);
let input_tx_shared: Arc<Mutex<Option<mpsc::Sender<Exchange>>>> =
Arc::new(Mutex::new(Some(input_tx)));
let driver_tx_shared: Arc<Mutex<Option<mpsc::Sender<Exchange>>>> =
Arc::new(Mutex::new(Some(driver_tx.clone()))); let actor_handle: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
let driver_handle: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
let shutdown_started = Arc::new(Mutex::new(false));
policy.set_timeout_tx(driver_tx.clone());
let sync_post = SyncBoxProcessor::new(post_continuation);
{
let policy = Arc::clone(&policy);
let actor_h = Arc::clone(&actor_handle);
let actor_driver_tx = driver_tx; let handle = tokio::spawn(async move {
while let Some(input) = input_rx.recv().await {
let ready = policy.accept(input).await;
for ex in ready {
if actor_driver_tx.send(ex).await.is_err() {
return;
}
}
}
});
*actor_h.lock().expect("actor_handle lock poisoned") = Some(handle); }
{
let post = sync_post.clone();
let driver_h = Arc::clone(&driver_handle);
let metrics = config.metrics.clone();
let route_id = config.route_id.clone();
let handle = tokio::spawn(async move {
while let Some(ex) = driver_rx.recv().await {
if camel_api::is_camel_stop(&ex) {
tracing::debug!(
"resequencer post-driver: skipping continuation for CamelStop exchange"
);
continue;
}
let mut proc = post.clone_inner();
match proc.call(ex).await {
Ok(_) => {}
Err(e) => {
tracing::warn!(
error = %e,
"resequencer post-driver: continuation call failed after ack (best-effort)"
);
if let Some(ref m) = metrics {
m.increment_errors(
route_id.as_deref().unwrap_or("unknown"),
"resequencer:post_ack_failure",
);
}
}
}
}
});
*driver_h.lock().expect("driver_handle lock poisoned") = Some(handle); }
let metrics = config.metrics.clone();
let route_id = config.route_id.clone();
Self {
policy,
config,
input_tx: input_tx_shared,
driver_tx: driver_tx_shared,
actor_handle,
driver_handle,
shutdown_started,
post_lifecycles: Arc::new(Mutex::new(post_lifecycles)),
inout_counter: Arc::new(AtomicU64::new(0)),
last_inout_warn: Arc::new(TokioMutex::new(None)),
metrics,
route_id,
}
}
}
/// Tower entry point: enqueues the exchange for resequencing and answers
/// with a fresh, empty "ack" exchange describing what happened to the input.
impl Service<Exchange> for ResequencerService {
    type Response = Exchange;
    type Error = CamelError;
    type Future =
        std::pin::Pin<Box<dyn std::future::Future<Output = Result<Exchange, CamelError>> + Send>>;

    /// Always ready; back-pressure is applied inside `call`, which waits for
    /// room in the bounded input channel.
    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), CamelError>> {
        std::task::Poll::Ready(Ok(()))
    }

    /// Hands `input` to the actor task and resolves to an ack exchange.
    ///
    /// The ack carries:
    /// * [`CAMEL_RESEQUENCER_INOUT_WARN`] = `true` when `input` is InOut and
    ///   `allow_inout` is off;
    /// * [`CAMEL_RESEQUENCER_ACCEPTED`] = `true` when the input was enqueued;
    /// * [`CAMEL_RESEQUENCER_ACCEPTED`] = `false` and
    ///   [`CAMEL_RESEQUENCER_DROPPED`] = `true` when the input could not be
    ///   enqueued because the service is shutting down or already shut down.
    ///
    /// The future never resolves to `Err`.
    fn call(&mut self, input: Exchange) -> Self::Future {
        let config = self.config.clone();
        let inout_counter = Arc::clone(&self.inout_counter);
        let last_inout_warn = Arc::clone(&self.last_inout_warn);
        let tx_opt = Arc::clone(&self.input_tx);
        let metrics = self.metrics.clone();
        let route_id = self.route_id.clone();
        Box::pin(async move {
            let mut ack = Exchange::new(Message::default());

            if input.pattern == camel_api::exchange::ExchangePattern::InOut && !config.allow_inout {
                inout_counter.fetch_add(1, Ordering::Relaxed);
                ack.set_property(CAMEL_RESEQUENCER_INOUT_WARN, true);
                if let Some(ref m) = metrics {
                    m.increment_errors(
                        route_id.as_deref().unwrap_or("unknown"),
                        "resequencer:inout_warning",
                    );
                }
                // Rate-limit the log line: warn on the first occurrence and
                // then at most once per INOUT_WARN_INTERVAL.
                let now = Instant::now();
                let mut last_guard = last_inout_warn.lock().await;
                let should_warn = last_guard
                    .map(|t| now.duration_since(t) >= INOUT_WARN_INTERVAL)
                    .unwrap_or(true);
                if should_warn {
                    let count = inout_counter.load(Ordering::Relaxed);
                    tracing::warn!(
                        inout_count = count,
                        "InOut exchange reached resequencer ({count} total); \
                         consider using InOnly pattern. \
                         Set allow_inout=true to suppress this warning."
                    );
                    *last_guard = Some(now);
                }
            }

            // Clone the sender inside a short scope so the std mutex guard is
            // released before the `.await` below.
            let tx = {
                let guard = tx_opt.lock().unwrap_or_else(|e| e.into_inner());
                guard.clone()
            };

            if let Some(tx) = tx {
                match tx.send(input).await {
                    Ok(()) => {
                        ack.set_property(CAMEL_RESEQUENCER_ACCEPTED, true);
                    }
                    Err(tokio::sync::mpsc::error::SendError(input)) => {
                        tracing::warn!(
                            correlation_id = %input.correlation_id,
                            "resequencer input dropped during shutdown"
                        );
                        ack.set_property(CAMEL_RESEQUENCER_ACCEPTED, false);
                        ack.set_property(CAMEL_RESEQUENCER_DROPPED, true);
                    }
                }
            } else {
                // Shutdown already removed the sender. Report the drop the
                // same way as a failed send instead of returning an ack that
                // says nothing about the input's fate.
                tracing::warn!(
                    correlation_id = %input.correlation_id,
                    "resequencer input dropped: service already shut down"
                );
                ack.set_property(CAMEL_RESEQUENCER_ACCEPTED, false);
                ack.set_property(CAMEL_RESEQUENCER_DROPPED, true);
            }

            Ok(ack)
        })
    }
}
#[async_trait]
impl StepLifecycle for ResequencerService {
    /// The step is identified by the name of its policy.
    fn name(&self) -> &'static str {
        self.policy.name()
    }

    /// Drains and stops the resequencer, then shuts down the downstream
    /// lifecycles. Only the first call does any work; later calls return
    /// `Ok(())` immediately.
    ///
    /// Sequence:
    /// 1. close the input channel so the actor task runs out of work;
    /// 2. wait up to 5 s for the actor task;
    /// 3. flush the policy and forward what it returns to the post-driver;
    /// 4. close the service's post-driver sender and wait up to 5 s for the
    ///    post-driver task;
    /// 5. shut down every registered post lifecycle with the same `reason`.
    ///
    /// Every step is best-effort: timeouts, task failures and lifecycle
    /// errors are logged and never turned into an `Err`.
    async fn shutdown(&self, reason: StepShutdownReason) -> Result<(), CamelError> {
        tracing::debug!(
            reason = ?reason,
            policy = self.policy.name(),
            "ResequencerService shutdown via StepLifecycle"
        );

        // Idempotence gate: flip the flag and learn whether it was set before.
        let already_started = {
            let mut started = self
                .shutdown_started
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            std::mem::replace(&mut *started, true)
        };
        if already_started {
            tracing::debug!("ResequencerService shutdown already started (idempotent); skipping");
            return Ok(());
        }

        // Dropping the stored sender lets the actor's `recv` return `None`
        // once in-flight `call`s have released their clones.
        {
            let mut guard = self.input_tx.lock().unwrap_or_else(|e| e.into_inner());
            *guard = None;
        }

        let actor_handle_to_await = {
            let mut guard = self.actor_handle.lock().unwrap_or_else(|e| e.into_inner());
            guard.take()
        };
        if let Some(handle) = actor_handle_to_await {
            match tokio::time::timeout(Duration::from_secs(5), handle).await {
                Ok(Ok(())) => {}
                Ok(Err(join_err)) => {
                    tracing::warn!(
                        error = %join_err,
                        "resequencer actor task ended abnormally; continuing shutdown (best-effort)"
                    );
                }
                Err(_elapsed) => {
                    tracing::warn!(
                        "resequencer actor task did not finish within 5s deadline; \
                         flushing policy anyway (best-effort)"
                    );
                }
            }
        }

        let flushed = self.policy.flush().await;
        if !flushed.is_empty() {
            let dt = {
                let guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
                guard.clone()
            };
            if let Some(driver_tx) = dt {
                for ex in flushed {
                    if driver_tx.send(ex).await.is_err() {
                        tracing::warn!(
                            "resequencer shutdown flush: post-driver channel closed early"
                        );
                        break;
                    }
                }
            }
        }

        // Release the service's own post-driver sender so the post-driver
        // loop can end once all other senders are gone.
        {
            let mut guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
            *guard = None;
        }

        let driver_handle_to_await = {
            let mut guard = self.driver_handle.lock().unwrap_or_else(|e| e.into_inner());
            guard.take()
        };
        if let Some(handle) = driver_handle_to_await {
            match tokio::time::timeout(Duration::from_secs(5), handle).await {
                Ok(Ok(())) => {}
                Ok(Err(join_err)) => {
                    tracing::warn!(
                        error = %join_err,
                        "resequencer post-driver task ended abnormally (best-effort)"
                    );
                }
                Err(_elapsed) => {
                    tracing::warn!(
                        "resequencer post-driver task did not finish within 5s deadline; \
                         leaking handle (best-effort)"
                    );
                }
            }
        }

        // Take the list out so the lock is not held across the awaits below.
        let post_lcs: Vec<Arc<dyn StepLifecycle>> = {
            let mut guard = self
                .post_lifecycles
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            std::mem::take(&mut *guard)
        };
        for lc in &post_lcs {
            if let Err(e) = lc.shutdown(reason).await {
                tracing::warn!(
                    step = lc.name(),
                    error = %e,
                    "resequencer post-step lifecycle shutdown failed (best-effort)"
                );
            }
        }

        Ok(())
    }
}
/// Policy that performs no buffering: each exchange is released as soon as it
/// is accepted, so there is never anything left to flush.
#[derive(Debug)]
pub struct PassthroughPolicy;

#[async_trait]
impl ResequencePolicy for PassthroughPolicy {
    /// Releases `input` immediately as the only element of the result.
    async fn accept(&self, input: Exchange) -> Vec<Exchange> {
        Vec::from([input])
    }

    /// Nothing is ever held back, so the flush result is always empty.
    async fn flush(&self) -> Vec<Exchange> {
        Vec::new()
    }

    fn name(&self) -> &'static str {
        "passthrough"
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::body::Body;
    use tower::ServiceExt;

    /// Continuation stub: forwards a copy of every exchange it is called with
    /// to an unbounded channel and echoes the exchange back.
    #[derive(Clone)]
    struct CapturePost {
        tx: mpsc::UnboundedSender<Exchange>,
    }

    impl Service<Exchange> for CapturePost {
        type Response = Exchange;
        type Error = CamelError;
        type Future = std::pin::Pin<
            Box<dyn std::future::Future<Output = Result<Exchange, CamelError>> + Send>,
        >;

        fn poll_ready(
            &mut self,
            _cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Result<(), CamelError>> {
            std::task::Poll::Ready(Ok(()))
        }

        fn call(&mut self, exchange: Exchange) -> Self::Future {
            let sink = self.tx.clone();
            Box::pin(async move {
                // A closed receiver is fine: some tests never read captures.
                let _ = sink.send(exchange.clone());
                Ok(exchange)
            })
        }
    }

    /// Boxed capturing continuation plus the receiver observing its calls.
    fn capture_continuation() -> (
        BoxCloneService<Exchange, Exchange, CamelError>,
        mpsc::UnboundedReceiver<Exchange>,
    ) {
        let (tx, rx) = mpsc::unbounded_channel::<Exchange>();
        (BoxCloneService::new(CapturePost { tx }), rx)
    }

    /// The pass-through policy as the trait object the constructors expect.
    fn passthrough() -> Arc<dyn ResequencePolicy> {
        Arc::new(PassthroughPolicy)
    }

    #[tokio::test]
    async fn resequencer_boundary_passthrough_ack_and_continuation() {
        let (post_continuation, mut capture_rx) = capture_continuation();
        let service =
            ResequencerService::new(passthrough(), post_continuation, 1024, Vec::new());

        let mut input = Exchange::new(Message::new(Body::Text("hello".into())));
        input.set_property("seq", 1);

        // The caller gets an empty ack flagged as accepted...
        let ack = service.clone().oneshot(input).await.unwrap();
        assert!(
            matches!(ack.input.body, Body::Empty),
            "ack body should be Empty, got {:?}",
            ack.input.body
        );
        assert_eq!(
            ack.property(CAMEL_RESEQUENCER_ACCEPTED)
                .and_then(|v| v.as_bool()),
            Some(true),
            "CAMEL_RESEQUENCER_ACCEPTED should be true"
        );

        // ...while the original exchange reaches the continuation.
        let wait_for_capture =
            tokio::time::timeout(Duration::from_millis(500), capture_rx.recv());
        let captured = wait_for_capture
            .await
            .expect("post-continuation did not receive exchange within 500ms timeout")
            .expect("capture channel closed without receiving exchange");
        assert_eq!(
            captured.input.body.as_text(),
            Some("hello"),
            "post-continuation received body should match"
        );

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("first shutdown should succeed");
        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("second shutdown should succeed (idempotent)");
    }

    #[tokio::test]
    async fn resequencer_boundary_camel_stop_skipped() {
        let (post_continuation, mut capture_rx) = capture_continuation();
        let service =
            ResequencerService::new(passthrough(), post_continuation, 1024, Vec::new());

        let mut input = Exchange::new(Message::new(Body::Text(
            "should-not-reach-continuation".into(),
        )));
        input.set_property(camel_api::exchange::CAMEL_STOP, true);

        let ack = service.clone().oneshot(input).await.unwrap();
        assert_eq!(
            ack.property(CAMEL_RESEQUENCER_ACCEPTED)
                .and_then(|v| v.as_bool()),
            Some(true),
            "CamelStop exchange should still be accepted by resequencer actor"
        );

        // Both a timeout and a closed channel mean "nothing was captured".
        let outcome =
            tokio::time::timeout(Duration::from_millis(500), capture_rx.recv()).await;
        if let Ok(Some(_)) = outcome {
            panic!("CamelStop exchange should NOT reach post-continuation");
        }

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown should succeed");
    }

    #[tokio::test]
    async fn inout_guard_increments_counter() {
        let (post, _capture_rx) = capture_continuation();
        let service = ResequencerService::with_config(
            passthrough(),
            post,
            16,
            Vec::new(),
            ResequencerConfig::default(),
        );

        let ex_inonly = Exchange::new(Message::new("inonly"));
        let _ = service.clone().oneshot(ex_inonly).await.unwrap();
        let ex_inout = Exchange::new_in_out(Message::new("inout"));
        let _ = service.clone().oneshot(ex_inout).await.unwrap();

        let seen = service.inout_counter.load(Ordering::Relaxed);
        assert!(seen > 0, "InOut counter should be > 0 after InOut exchange");

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown");
    }

    #[tokio::test]
    async fn inout_guard_allow_inout_suppresses() {
        let (post, _capture_rx) = capture_continuation();
        let config = ResequencerConfig {
            allow_inout: true,
            ..Default::default()
        };
        let service =
            ResequencerService::with_config(passthrough(), post, 16, Vec::new(), config);

        let ex_inout = Exchange::new_in_out(Message::new("inout-allowed"));
        let _ = service.clone().oneshot(ex_inout).await.unwrap();

        let seen = service.inout_counter.load(Ordering::Relaxed);
        assert_eq!(seen, 0, "InOut counter should be 0 when allow_inout=true");

        service
            .shutdown(StepShutdownReason::RouteStop)
            .await
            .expect("shutdown");
    }
}