plexor-core 0.1.0-alpha.2

Core library for the Rust implementation of the Plexo distributed system architecture, providing the fundamental Plexus, Neuron, Codec, and Axon abstractions.
Documentation
// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use plexor_core::erasure::neuron::erase_neuron;
use plexor_core::erasure::payload::erase_payload;
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::ganglion::{Ganglion, GanglionInternal};
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::plexus::Plexus;
use plexor_core::reactant::Reactant;
use plexor_core::test_utils::{DebugCodec, DebugStruct, test_namespace};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, channel};
use uuid::Uuid;

/// Test-only reactant that hands every payload it receives to a channel so the
/// test body can inspect what was delivered.
#[derive(Clone)]
struct TestReactant {
    // Sending half of the mpsc channel; `react` pushes each received payload into it.
    tx: Sender<Arc<Payload<DebugStruct, DebugCodec>>>,
}

/// Reactant implementation used by the test: it relays payloads into the
/// channel held in `tx` instead of performing any real work.
impl Reactant<DebugStruct, DebugCodec> for TestReactant {
    /// Forwards `payload` to the channel and always resolves to `Ok(())`.
    ///
    /// The send uses the non-blocking `try_send`, and its result is deliberately
    /// discarded, so a failed send never surfaces as a reactant error.
    fn react(
        &self,
        payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
    > {
        // The future must be `'static`, so it owns its own clone of the sender
        // rather than borrowing from `self`.
        let sender = self.tx.clone();
        let relay = async move {
            // Best-effort delivery: the outcome of the send is ignored on purpose.
            let _ = sender.try_send(payload);
            Ok::<(), plexor_core::reactant::ReactantError>(())
        };
        Box::pin(relay)
    }

    /// Consumes the boxed reactant and returns its type-erased form by
    /// delegating to `erase_reactant`.
    fn erase(
        self: Box<Self>,
    ) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}

/// Transmits one payload through a `Plexus` and checks that the registered
/// reactant receives it with its value, correlation id and span id intact,
/// and with no parent id set.
#[tokio::test]
async fn test_plexus_full_integration_and_tracing() {
    // Build the neuron and keep it behind the trait object used by the rest of the test.
    let neuron: Arc<dyn Neuron<DebugStruct, DebugCodec> + Send + Sync> =
        Arc::new(NeuronImpl::<DebugStruct, DebugCodec>::new(test_namespace()));
    let name = neuron.name();

    // The plexus starts out knowing about this single neuron.
    let mut plexus = Plexus::new(vec![erase_neuron(neuron.clone())], vec![]).await;

    // Channel-backed reactant: whatever it reacts to shows up on `inbox`.
    let (sender, mut inbox) = channel(10);
    let listener = TestReactant { tx: sender };

    // Adapt the plexus to the neuron, then register the reactant under the neuron's name.
    plexus.adapt(neuron.clone()).await.expect("Adapt failed");
    plexus
        .react(name.clone(), vec![listener.new_erased()], vec![])
        .await
        .expect("React failed");

    // 1. Internal transmission via Plexus::transmit
    let correlation_id = Uuid::now_v7();
    // The cast keeps only the low 64 bits of a fresh v7 UUID as the span id.
    let span_id = Uuid::now_v7().as_u128() as u64;
    let message = DebugStruct {
        foo: 1,
        bar: "test message".to_string(),
    };
    let outgoing = Payload::builder()
        .value(message)
        .neuron(neuron.clone())
        .correlation_id(correlation_id)
        .span_id(span_id)
        .build()
        .expect("Failed to build payload");

    plexus
        .transmit(erase_payload(outgoing))
        .await
        .expect("Transmit failed");

    // Bound the wait so a payload that never arrives fails the test instead of hanging it.
    let max_wait = Duration::from_millis(100);
    let delivered = tokio::time::timeout(max_wait, inbox.recv())
        .await
        .expect("Timeout")
        .expect("No message");

    // The received payload must carry the same value and tracing identifiers.
    assert_eq!(delivered.value.foo, 1);
    assert_eq!(delivered.correlation_id(), correlation_id);
    assert_eq!(delivered.span_id(), span_id);
    assert!(delivered.parent_id().is_none());
}