plexor-core 0.1.0-alpha.2

Core library for the Rust implementation of the Plexo distributed system architecture, providing the fundamental Plexus, Neuron, Codec, and Axon abstractions.
Documentation
// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use plexor_core::erasure::payload::erase_payload;
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::ganglion::{GanglionInprocess, GanglionInternal};
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::reactant::Reactant;
use plexor_core::test_utils::{DebugCodec, DebugStruct, test_namespace};
use plexor_core::thalamus::Thalamus;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Sender, channel};
use uuid::Uuid;

/// Test reactant that reports which peer handled a payload.
///
/// Every call to `react` pushes this reactant's `id` onto `tx`, so the test
/// can tally how many payloads each peer received.
#[derive(Clone)]
struct DistributionReactant {
    /// Identifier of the peer this reactant is attached to; used by the test
    /// as an index into its per-peer counters.
    id: usize,
    /// Sending half of the channel on which `id` is reported.
    tx: Sender<usize>,
}

impl Reactant<DebugStruct, DebugCodec> for DistributionReactant {
    /// Reports this reactant's `id` on the shared channel and succeeds.
    ///
    /// The payload contents are ignored: only the fact that this reactant was
    /// invoked matters to the test. The returned future always resolves to
    /// `Ok(())`.
    fn react(
        &self,
        _payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
    > {
        // Copy out everything the future needs so it does not borrow `self`
        // and can satisfy the `'static` bound.
        let peer_id = self.id;
        let sender = self.tx.clone();

        Box::pin(async move {
            // Non-blocking, best-effort report: a full or closed channel is
            // deliberately not treated as a reaction failure.
            let _ = sender.try_send(peer_id);
            Ok(())
        })
    }

    /// Converts the boxed reactant into its type-erased form by delegating to
    /// `erase_reactant`.
    fn erase(
        self: Box<Self>,
    ) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}

/// Verifies that a `Thalamus` built over three in-process ganglia spreads
/// nine transmitted payloads evenly, three per peer.
///
/// Every payload carries the same correlation id. Each peer has a
/// `DistributionReactant` attached that reports its index on a channel; the
/// test tallies those reports and checks the per-peer counts.
#[tokio::test]
async fn test_thalamus_load_balancing_and_tracing() {
    let ns = test_namespace();
    let neuron_impl = NeuronImpl::<DebugStruct, DebugCodec>::new(ns);
    let neuron_arc: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> = Arc::new(neuron_impl);
    let neuron_name = neuron_arc.name();

    // Create 3 internal peers
    let g1 = Arc::new(Mutex::new(GanglionInprocess::new()));
    let g2 = Arc::new(Mutex::new(GanglionInprocess::new()));
    let g3 = Arc::new(Mutex::new(GanglionInprocess::new()));

    let ganglia = [g1.clone(), g2.clone(), g3.clone()];

    // Adapt all peers to the neuron at once
    plexor_core::ganglion::adapt_all(&ganglia, neuron_arc.clone())
        .await
        .expect("Adapt all failed");

    let (tx, mut rx) = channel(30);

    // Attach distribution reactants to each peer; the peer's position in
    // `ganglia` doubles as the id it reports and as its index into `counts`.
    for (i, g) in ganglia.iter().enumerate() {
        let mut guard = g.lock().await;
        let reactant = DistributionReactant {
            id: i,
            tx: tx.clone(),
        };
        guard
            .react(neuron_name.clone(), vec![reactant.new_erased()], vec![])
            .await
            .expect("React failed");
    }

    // Create Thalamus
    let mut thalamus = Thalamus::new(vec![g1, g2, g3]);

    // Transmit 9 messages, all sharing one correlation id
    let correlation_id = Uuid::now_v7();
    for i in 0..9 {
        let payload = Payload::with_correlation(
            DebugStruct {
                foo: i,
                bar: "load balanced msg".to_string(),
            },
            neuron_arc.clone(),
            Some(correlation_id),
        );

        thalamus
            .transmit(erase_payload(payload))
            .await
            .expect("Transmit failed");
    }

    // Collect results.
    // NOTE(review): `tx` is still in scope here, so `recv` cannot observe a
    // closed channel; if fewer than 9 reports arrive this loop waits instead
    // of failing.
    let mut counts = [0, 0, 0];
    for _ in 0..9 {
        let id = rx.recv().await.expect("No message");
        counts[id] += 1;
    }

    // Verify round-robin distribution. Comparing the whole array at once makes
    // a failure show the full per-peer tally rather than only the first
    // mismatching peer.
    assert_eq!(
        counts,
        [3, 3, 3],
        "each of the 3 peers should have handled exactly 3 of the 9 payloads"
    );
}