use plexor_core::dendrite::{DendriteDecoder, DendriteError};
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::{Payload, PayloadRaw};
use plexor_core::reactant::Reactant;
use plexor_core::backpressure::{
BackpressureConfig, BackpressureQueue, BackpressureStrategy,
};
use plexor_core::synapse::SynapseError;
use plexor_core::test_utils::{DebugCodec, DebugStruct, test_namespace};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use uuid::Uuid;
#[derive(Clone)]
struct SlowReactant {
delay: Duration,
tx: mpsc::Sender<u32>,
}
impl Reactant<DebugStruct, DebugCodec> for SlowReactant {
fn react(
&self,
payload: Arc<Payload<DebugStruct, DebugCodec>>,
) -> Pin<
Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
> {
let delay = self.delay;
let tx = self.tx.clone();
let val = payload.value.foo as u32;
Box::pin(async move {
tokio::time::sleep(delay).await;
let _ = tx.send(val).await;
Ok(())
})
}
fn erase(
self: Box<Self>,
) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
erase_reactant(self)
}
}
#[tokio::test]
async fn test_backpressure_reject() {
let config = BackpressureConfig {
queue_size: 2,
strategy: BackpressureStrategy::Reject,
};
let (tx, mut rx) = mpsc::channel(10);
let queue = BackpressureQueue::<u32>::new("test_reject".to_string(), config, move |item: u32| {
let tx = tx.clone();
async move {
let _ = tx.send(item).await;
tokio::time::sleep(Duration::from_millis(50)).await;
}
});
let _: Result<(), SynapseError> = queue.push(1).await;
let _: Result<(), SynapseError> = queue.push(2).await;
let result = queue.push(3).await;
assert!(matches!(result, Err(SynapseError::QueueFull { .. })));
assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
}
#[tokio::test]
async fn test_backpressure_drop_oldest() {
let config = BackpressureConfig {
queue_size: 2,
strategy: BackpressureStrategy::DropOldest,
};
let (tx, mut rx) = mpsc::channel(10);
let queue = BackpressureQueue::<u32>::new("test_drop_oldest".to_string(), config, move |item: u32| {
let tx = tx.clone();
async move {
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = tx.send(item).await;
}
});
let _: Result<(), SynapseError> = queue.push(1).await; tokio::time::sleep(Duration::from_millis(10)).await;
let _: Result<(), SynapseError> = queue.push(2).await; let _: Result<(), SynapseError> = queue.push(3).await;
let _: Result<(), SynapseError> = queue.push(4).await;
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.unwrap(),
Some(1)
);
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.unwrap(),
Some(3)
);
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.unwrap(),
Some(4)
);
assert!(
tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.is_err()
);
}
#[tokio::test]
async fn test_backpressure_drop_newest() {
let config = BackpressureConfig {
queue_size: 2,
strategy: BackpressureStrategy::DropNewest,
};
let (tx, mut rx) = mpsc::channel(10);
let queue = BackpressureQueue::<u32>::new("test_drop_newest".to_string(), config, move |item: u32| {
let tx = tx.clone();
async move {
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = tx.send(item).await;
}
});
let _: Result<(), SynapseError> = queue.push(1).await; tokio::time::sleep(Duration::from_millis(10)).await;
let _: Result<(), SynapseError> = queue.push(2).await; let _: Result<(), SynapseError> = queue.push(3).await;
let _: Result<(), SynapseError> = queue.push(4).await;
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.unwrap(),
Some(1)
);
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.unwrap(),
Some(2)
);
assert_eq!(
tokio::time::timeout(Duration::from_secs(1), rx.recv())
.await
.unwrap(),
Some(3)
);
assert!(
tokio::time::timeout(Duration::from_millis(100), rx.recv())
.await
.is_err()
);
}
#[tokio::test]
async fn test_backpressure_block() {
let config = BackpressureConfig {
queue_size: 1, strategy: BackpressureStrategy::Block,
};
let (tx, mut rx) = mpsc::channel(10);
let queue = BackpressureQueue::<u32>::new("test_block".to_string(), config, move |item: u32| {
let tx = tx.clone();
async move {
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = tx.send(item).await;
}
});
let queue = Arc::new(queue);
let _: Result<(), SynapseError> = queue.push(1).await; let _: Result<(), SynapseError> = queue.push(2).await;
let start = std::time::Instant::now();
let queue_clone = queue.clone();
let handle = tokio::spawn(async move {
let _: Result<(), SynapseError> = queue_clone.push(3).await;
});
tokio::time::sleep(Duration::from_millis(20)).await;
assert!(!handle.is_finished(), "Push 3 should be blocked");
let _ = handle.await;
let duration = start.elapsed();
assert!(
duration >= Duration::from_millis(80),
"Should have blocked for processing time"
);
assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
assert_eq!(rx.recv().await, Some(3));
}
#[tokio::test]
async fn test_ingress_backpressure_dendrite_decoder_reject() {
let ns = test_namespace();
let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
Arc::new(neuron_impl.clone());
let (tx, mut rx) = mpsc::channel(10);
let slow_reactant = SlowReactant {
delay: Duration::from_millis(100),
tx,
};
let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
vec![Arc::new(slow_reactant)];
let backpressure_config = BackpressureConfig {
queue_size: 1,
strategy: BackpressureStrategy::Reject,
};
let dendrite_decoder = DendriteDecoder::new(
neuron_arc.clone(),
reactants,
vec![], vec![], Some(backpressure_config),
);
let create_payload = |val: i32| {
let debug_struct = DebugStruct {
foo: val,
bar: "test".to_string(),
};
let encoded = neuron_impl.encode(&debug_struct).unwrap();
PayloadRaw::with_correlation(encoded, neuron_arc.clone(), Some(Uuid::now_v7()))
};
dendrite_decoder
.transduce(create_payload(1))
.await
.expect("Push 1 success");
tokio::time::sleep(Duration::from_millis(10)).await;
dendrite_decoder
.transduce(create_payload(2))
.await
.expect("Push 2 success");
let result = dendrite_decoder.transduce(create_payload(3)).await;
assert!(
matches!(result, Err(DendriteError::ReactantsWriteLock { .. })),
"Expected rejection, got {result:?}"
);
assert_eq!(rx.recv().await, Some(1));
assert_eq!(rx.recv().await, Some(2));
assert!(
tokio::time::timeout(Duration::from_millis(200), rx.recv())
.await
.is_err()
);
}