plexor-core 0.1.0-alpha.2

Core library for the rust implementation of the Plexo distributed system architecture, providing the fundamental Plexus, Neuron, Codec, and Axon abstractions.
Documentation
// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use plexor_core::dendrite::{DendriteDecoder, DendriteError};
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::{Payload, PayloadRaw};
use plexor_core::reactant::Reactant;
use plexor_core::backpressure::{
    BackpressureConfig, BackpressureQueue, BackpressureStrategy,
};
use plexor_core::synapse::SynapseError;
use plexor_core::test_utils::{DebugCodec, DebugStruct, test_namespace};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use uuid::Uuid;

#[derive(Clone)]
struct SlowReactant {
    delay: Duration,
    tx: mpsc::Sender<u32>,
}

impl Reactant<DebugStruct, DebugCodec> for SlowReactant {
    fn react(
        &self,
        payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
    > {
        let delay = self.delay;
        let tx = self.tx.clone();
        let val = payload.value.foo as u32;
        Box::pin(async move {
            tokio::time::sleep(delay).await;
            let _ = tx.send(val).await;
            Ok(())
        })
    }

    fn erase(
        self: Box<Self>,
    ) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}

#[tokio::test]
async fn test_backpressure_reject() {
    let config = BackpressureConfig {
        queue_size: 2,
        strategy: BackpressureStrategy::Reject,
    };

    let (tx, mut rx) = mpsc::channel(10);

    // Create a queue with a processor that forwards to a channel
    let queue = BackpressureQueue::<u32>::new("test_reject".to_string(), config, move |item: u32| {
        let tx = tx.clone();
        async move {
            let _ = tx.send(item).await;
            // Simulate slow processing
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    });

    // Fill the queue
    let _: Result<(), SynapseError> = queue.push(1).await;
    let _: Result<(), SynapseError> = queue.push(2).await;

    // The third push should be rejected because queue size is 2 and consumer is slow
    let result = queue.push(3).await;
    assert!(matches!(result, Err(SynapseError::QueueFull { .. })));

    // Verify received items
    assert_eq!(rx.recv().await, Some(1));
    assert_eq!(rx.recv().await, Some(2));
}

#[tokio::test]
async fn test_backpressure_drop_oldest() {
    let config = BackpressureConfig {
        queue_size: 2,
        strategy: BackpressureStrategy::DropOldest,
    };

    let (tx, mut rx) = mpsc::channel(10);

    // Create a queue where the processor waits for a signal to start
    // This ensures the queue fills up completely before any processing happens
    let queue = BackpressureQueue::<u32>::new("test_drop_oldest".to_string(), config, move |item: u32| {
        let tx = tx.clone();
        // We can't easily wait here for the *first* item without blocking others if we use a shared channel
        // But we can just rely on the fact that queue fills up synchronously in the test
        // if we push fast enough before the worker picks up.
        // A better way is to make the processor slow.
        async move {
            // If it's the first item, wait? No, the worker picks up one item immediately and starts processing.
            // So queue capacity is effectively queue_size (in VecDeque) + 1 (in worker future).
            // Actually BackpressureQueue pops ONE item and awaits the processor.
            // So 1 item is in processing, queue_size items are in queue.
            // If queue_size is 2, we can fit 1 (processing) + 2 (queued) = 3 items before strategy triggers?
            // No, push checks `queue.len() < queue_size`.
            // So if queue holds 2 items, and worker holds 1, we can have 3 in flight.
            // Let's make processor slow.
            tokio::time::sleep(Duration::from_millis(100)).await;
            let _ = tx.send(item).await;
        }
    });

    // Push items rapidly
    let _: Result<(), SynapseError> = queue.push(1).await; // Worker picks this up immediately (processing)
    // Give the worker a moment to pick up the first item
    tokio::time::sleep(Duration::from_millis(10)).await;

    let _: Result<(), SynapseError> = queue.push(2).await; // In queue (size 1)
    let _: Result<(), SynapseError> = queue.push(3).await; // In queue (size 2) - Queue FULL now

    // Next push should drop oldest in queue.
    // Queue contains [2, 3]. Oldest is 2.
    // So 2 should be dropped, 4 added. Queue becomes [3, 4].
    let _: Result<(), SynapseError> = queue.push(4).await;

    // Verify results
    // 1 should be processed (was picked up immediately)
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .unwrap(),
        Some(1)
    );

    // 2 should be dropped. 3 should be next.
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .unwrap(),
        Some(3)
    );

    // 4 should be last.
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .unwrap(),
        Some(4)
    );

    // Ensure no more items
    assert!(
        tokio::time::timeout(Duration::from_millis(100), rx.recv())
            .await
            .is_err()
    );
}

#[tokio::test]
async fn test_backpressure_drop_newest() {
    let config = BackpressureConfig {
        queue_size: 2,
        strategy: BackpressureStrategy::DropNewest,
    };

    let (tx, mut rx) = mpsc::channel(10);

    let queue = BackpressureQueue::<u32>::new("test_drop_newest".to_string(), config, move |item: u32| {
        let tx = tx.clone();
        async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            let _ = tx.send(item).await;
        }
    });

    // Push items rapidly
    let _: Result<(), SynapseError> = queue.push(1).await; // Processing
    // Give the worker a moment to pick up the first item
    tokio::time::sleep(Duration::from_millis(10)).await;

    let _: Result<(), SynapseError> = queue.push(2).await; // In queue
    let _: Result<(), SynapseError> = queue.push(3).await; // In queue - Queue FULL

    // Next push should be dropped (it is the newest)
    let _: Result<(), SynapseError> = queue.push(4).await; // Dropped

    // Verify results
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .unwrap(),
        Some(1)
    );
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .unwrap(),
        Some(2)
    );
    assert_eq!(
        tokio::time::timeout(Duration::from_secs(1), rx.recv())
            .await
            .unwrap(),
        Some(3)
    );

    // 4 should be missing
    assert!(
        tokio::time::timeout(Duration::from_millis(100), rx.recv())
            .await
            .is_err()
    );
}

#[tokio::test]
async fn test_backpressure_block() {
    let config = BackpressureConfig {
        queue_size: 1, // Small queue
        strategy: BackpressureStrategy::Block,
    };

    let (tx, mut rx) = mpsc::channel(10);

    let queue = BackpressureQueue::<u32>::new("test_block".to_string(), config, move |item: u32| {
        let tx = tx.clone();
        async move {
            // Simulate processing time
            tokio::time::sleep(Duration::from_millis(100)).await;
            let _ = tx.send(item).await;
        }
    });

    // Use a Arc<BackpressureQueue> to spawn a blocking push task
    let queue = Arc::new(queue);

    let _: Result<(), SynapseError> = queue.push(1).await; // Processing
    let _: Result<(), SynapseError> = queue.push(2).await; // In queue (size 1 - FULL)

    let start = std::time::Instant::now();
    let queue_clone = queue.clone();

    // This push should block until 1 finishes and 2 moves to processing
    let handle = tokio::spawn(async move {
        let _: Result<(), SynapseError> = queue_clone.push(3).await;
    });

    // Check that handle doesn't finish immediately (give it a small margin)
    tokio::time::sleep(Duration::from_millis(20)).await;
    assert!(!handle.is_finished(), "Push 3 should be blocked");

    // Wait for everything to complete
    let _ = handle.await;
    let duration = start.elapsed();

    // Should have taken at least ~100ms (processing time of 1)
    assert!(
        duration >= Duration::from_millis(80),
        "Should have blocked for processing time"
    );

    assert_eq!(rx.recv().await, Some(1));
    assert_eq!(rx.recv().await, Some(2));
    assert_eq!(rx.recv().await, Some(3));
}

#[tokio::test]
async fn test_ingress_backpressure_dendrite_decoder_reject() {
    let ns = test_namespace();
    let neuron_impl: NeuronImpl<DebugStruct, DebugCodec> = NeuronImpl::new(ns.clone());
    let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync + '_> =
        Arc::new(neuron_impl.clone());

    let (tx, mut rx) = mpsc::channel(10);

    // 1. Create SlowReactant
    let slow_reactant = SlowReactant {
        delay: Duration::from_millis(100),
        tx,
    };
    let reactants: Vec<Arc<dyn Reactant<DebugStruct, DebugCodec> + Send + Sync>> =
        vec![Arc::new(slow_reactant)];

    // 2. Create DendriteDecoder with small queue and Reject strategy
    let backpressure_config = BackpressureConfig {
        queue_size: 1,
        strategy: BackpressureStrategy::Reject,
    };

    let dendrite_decoder = DendriteDecoder::new(
        neuron_arc.clone(),
        reactants,
        vec![], // No raw reactants
        vec![], // No error reactants
        Some(backpressure_config),
    );

    // Helper to create PayloadRaw
    let create_payload = |val: i32| {
        let debug_struct = DebugStruct {
            foo: val,
            bar: "test".to_string(),
        };
        let encoded = neuron_impl.encode(&debug_struct).unwrap();
        PayloadRaw::with_correlation(encoded, neuron_arc.clone(), Some(Uuid::now_v7()))
    };

    // 3. Push payloads
    // Push 1: Starts processing immediately (worker picks it up)
    dendrite_decoder
        .transduce(create_payload(1))
        .await
        .expect("Push 1 success");

    // Give worker a tiny bit of time to pop it from queue
    tokio::time::sleep(Duration::from_millis(10)).await;

    // Push 2: Goes into queue (size 1/1)
    dendrite_decoder
        .transduce(create_payload(2))
        .await
        .expect("Push 2 success");

    // Push 3: Should be REJECTED because queue is full (size 1) and worker is busy with 1
    let result = dendrite_decoder.transduce(create_payload(3)).await;

    // DendriteDecoder maps QueueFull to ReactantsWriteLock (verify this mapping in code)
    assert!(
        matches!(result, Err(DendriteError::ReactantsWriteLock { .. })),
        "Expected rejection, got {result:?}"
    );

    // Verify we receive 1 and 2 eventually
    assert_eq!(rx.recv().await, Some(1));
    assert_eq!(rx.recv().await, Some(2));
    // Should not receive 3
    assert!(
        tokio::time::timeout(Duration::from_millis(200), rx.recv())
            .await
            .is_err()
    );
}