plexor-core 0.1.0-alpha.2

Core library for the Rust implementation of the Plexo distributed system architecture, providing the fundamental Plexus, Neuron, Codec, and Axon abstractions.
Documentation
// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use plexor_core::axon::{Axon, AxonImpl};
use plexor_core::erasure::reactant::{
    ErrorReactantErased, ReactantErased, erase_error_reactant, erase_reactant,
};
use plexor_core::ganglion::{Ganglion, GanglionInternal};
use plexor_core::namespace::NamespaceImpl;
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::plexus::Plexus;
use plexor_core::reactant::{ErrorReactant, Reactant, ReactantError};
use plexor_core::test_utils::{DebugCodec, DebugStruct};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

/// Test reactant whose `react` implementation always resolves to an error,
/// used to drive the error-handling path.
#[derive(Clone)]
struct FailingReactant;

/// [`Reactant`] implementation that rejects every payload it is given.
impl Reactant<DebugStruct, DebugCodec> for FailingReactant {
    /// Ignores the payload and returns a future that resolves to
    /// `ReactantError::Execution` carrying the text "Intentional Failure".
    fn react(
        &self,
        _payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<Box<dyn Future<Output = Result<(), ReactantError>> + Send + 'static>> {
        Box::pin(async move {
            let reason = String::from("Intentional Failure");
            Err(ReactantError::Execution(reason))
        })
    }

    /// Hands the boxed reactant to `erase_reactant` to obtain its type-erased form.
    fn erase(self: Box<Self>) -> Arc<dyn ReactantErased + Send + Sync + 'static> {
        erase_reactant::<DebugStruct, DebugCodec, FailingReactant>(self)
    }
}

/// Test error reactant that reports what it was called with over an mpsc
/// channel so the test body can inspect it.
#[derive(Clone)]
struct PayloadCapturingErrorReactant {
    /// Channel on which `(error text, payload `foo` value)` pairs are sent.
    sender: tokio::sync::mpsc::Sender<(String, i32)>,
}

/// [`ErrorReactant`] implementation that forwards each reported failure to the
/// channel held in `sender`.
impl ErrorReactant<DebugStruct, DebugCodec> for PayloadCapturingErrorReactant {
    /// Returns a future that sends the error's string form together with the
    /// payload's `foo` field over the channel.
    ///
    /// The result of the send is deliberately discarded, so a failed send is
    /// silently ignored.
    fn react_error(
        &self,
        error: Arc<ReactantError>,
        payload: Arc<Payload<DebugStruct, DebugCodec>>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
        // Clone the sender so the returned future owns its own handle and can be 'static.
        let tx = self.sender.clone();
        Box::pin(async move {
            let report = (error.to_string(), payload.value.foo);
            let _ = tx.send(report).await;
        })
    }

    /// Hands the boxed error reactant to `erase_error_reactant` to obtain its
    /// type-erased form.
    fn erase_error(self: Box<Self>) -> Arc<dyn ErrorReactantErased + Send + Sync + 'static> {
        erase_error_reactant::<DebugStruct, DebugCodec, PayloadCapturingErrorReactant>(self)
    }
}

/// Checks that an error reactant is called with both the failure and the
/// payload that triggered it.
///
/// A `FailingReactant` and a `PayloadCapturingErrorReactant` are registered for
/// the same neuron, a `DebugStruct` with `foo == 42` is transmitted, and the
/// test then expects the channel to yield an error message containing
/// "Intentional Failure" alongside the value `42` within one second.
#[tokio::test]
async fn test_error_reactant_receives_payload_context() {
    let namespace = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["test", "error"],
    });
    let neuron = NeuronImpl::<DebugStruct, DebugCodec>::new_arc(namespace);
    let plexus_arc = Plexus::new_shared(vec![], vec![]).await;

    // Register the neuron; the lock guard is a temporary released at the end of the statement.
    plexus_arc
        .lock()
        .await
        .adapt(neuron.clone())
        .await
        .expect("adapt failed");

    let (tx, mut rx) = tokio::sync::mpsc::channel(1);

    // Keep both reactants bound to locals so they live until the end of the test.
    let always_fails = FailingReactant;
    let capture = PayloadCapturingErrorReactant { sender: tx };

    // Attach the failing reactant together with the capturing error handler.
    plexus_arc
        .lock()
        .await
        .react(
            neuron.name(),
            vec![always_fails.new_erased()],
            vec![capture.new_erased_error()],
        )
        .await
        .expect("react failed");

    // Transmit a payload whose `foo` value should come back through the error handler.
    let mut axon = AxonImpl::new(neuron.clone(), plexus_arc.clone());
    axon.transmit(DebugStruct {
        foo: 42,
        bar: "test".to_string(),
    })
    .await
    .expect("transmit failed");

    // Wait a bounded time for the error handler's report.
    let wait = std::time::Duration::from_secs(1);
    let Ok(received) = tokio::time::timeout(wait, rx.recv()).await else {
        panic!("Timeout waiting for error handler");
    };
    let Some((err_msg, val)) = received else {
        panic!("Channel closed unexpectedly");
    };

    assert!(err_msg.contains("Intentional Failure"));
    assert_eq!(val, 42);
}