use arc_swap::ArcSwap;
use camel_api::{BoxProcessor, IdentityProcessor, SyncBoxProcessor};
use std::sync::Arc;
pub(crate) type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
pub(crate) fn new_shared_pipeline(processor: BoxProcessor) -> SharedPipeline {
Arc::new(ArcSwap::from_pointee(SyncBoxProcessor::new(processor)))
}
#[allow(dead_code)]
pub(crate) fn new_identity_pipeline() -> SharedPipeline {
Arc::new(ArcSwap::from_pointee(SyncBoxProcessor::new(
BoxProcessor::new(IdentityProcessor),
)))
}
pub(crate) fn swap_pipeline(pipeline: &SharedPipeline, new_processor: BoxProcessor) {
pipeline.store(Arc::new(SyncBoxProcessor::new(new_processor)));
}
pub(crate) fn get_pipeline(pipeline: &SharedPipeline) -> BoxProcessor {
pipeline.load().clone_inner()
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use tower::ServiceExt;
#[tokio::test]
async fn swap_and_get_pipeline() {
let shared = new_identity_pipeline();
let mut p1 = get_pipeline(&shared);
assert!(p1.ready().await.is_ok());
let new_proc = BoxProcessor::new(IdentityProcessor);
swap_pipeline(&shared, new_proc);
let mut p2 = get_pipeline(&shared);
assert!(p2.ready().await.is_ok());
}
#[tokio::test]
async fn concurrent_access() {
let shared = new_identity_pipeline();
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for i in 0..4 {
let s = Arc::clone(&shared);
let c = Arc::clone(&counter);
handles.push(tokio::spawn(async move {
if i % 2 == 0 {
let new_proc = BoxProcessor::new(IdentityProcessor);
swap_pipeline(&s, new_proc);
c.fetch_add(1, Ordering::Relaxed);
} else {
let _pipe = get_pipeline(&s);
c.fetch_add(1, Ordering::Relaxed);
}
}));
}
for h in handles {
h.await.unwrap();
}
assert_eq!(counter.load(Ordering::Relaxed), 4);
}
}