plexor-core 0.1.0-alpha.2

Core library for the Rust implementation of the Plexo distributed system architecture, providing the fundamental Plexus, Neuron, Codec, and Axon abstractions.
Documentation
// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use plexor_core::codec::{Codec, CodecError, CodecName};
use plexor_core::erasure::neuron::erase_neuron;
use plexor_core::erasure::payload::erase_payload;
use plexor_core::erasure::reactant::erase_reactant;
use plexor_core::ganglion::{Ganglion, GanglionInternal};
use plexor_core::namespace::NamespaceImpl;
use plexor_core::neuron::{Neuron, NeuronImpl};
use plexor_core::payload::Payload;
use plexor_core::plexus::Plexus;
use plexor_core::reactant::Reactant;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::mpsc::{Sender, channel};

/// First of the two message types used by the routing test in this file.
///
/// Paired with `UniversalTestCodec` and consumed by `AlphaReactant`.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
struct AlphaMsg {
    // Integer that `AlphaReactant` forwards to its channel.
    value: i32,
}

/// Second of the two message types used by the routing test in this file.
///
/// Paired with `UniversalTestCodec` and consumed by `BetaReactant`.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
struct BetaMsg {
    // Text that `BetaReactant` clones and forwards to its channel.
    text: String,
}

/// Stateless marker type that implements `Codec` for both `AlphaMsg` and
/// `BetaMsg`, so a single codec type can be shared by both neurons in the test.
struct UniversalTestCodec;
impl CodecName for UniversalTestCodec {
    /// Returns the fixed identifier `"universal_test"` for this codec.
    fn name() -> &'static str {
        "universal_test"
    }
}
/// Stub codec for [`AlphaMsg`]: neither direction looks at its input.
impl Codec<AlphaMsg> for UniversalTestCodec {
    /// Always succeeds with an empty byte buffer; the message is not inspected.
    fn encode(_msg: &AlphaMsg) -> Result<Vec<u8>, CodecError> {
        let bytes: Vec<u8> = Vec::new();
        Ok(bytes)
    }

    /// Always succeeds with `AlphaMsg { value: 123 }`, whatever bytes are given.
    fn decode(_bytes: &[u8]) -> Result<AlphaMsg, CodecError> {
        let fixed = AlphaMsg { value: 123 };
        Ok(fixed)
    }
}
/// Stub codec for [`BetaMsg`]: neither direction looks at its input.
impl Codec<BetaMsg> for UniversalTestCodec {
    /// Always succeeds with an empty byte buffer; the message is not inspected.
    fn encode(_msg: &BetaMsg) -> Result<Vec<u8>, CodecError> {
        let bytes: Vec<u8> = Vec::new();
        Ok(bytes)
    }

    /// Always succeeds with a `BetaMsg` whose text is `"Hello Beta"`, whatever
    /// bytes are given.
    fn decode(_bytes: &[u8]) -> Result<BetaMsg, CodecError> {
        let text = "Hello Beta".to_string();
        Ok(BetaMsg { text })
    }
}

/// Test reactant for `AlphaMsg` payloads; reports what it receives over a
/// channel so the test can observe it.
#[derive(Clone)]
struct AlphaReactant {
    // Sending half of the channel that receives each payload's `value`.
    tx: Sender<i32>,
}

/// Forwards the integer carried by each [`AlphaMsg`] payload to the test channel.
impl Reactant<AlphaMsg, UniversalTestCodec> for AlphaReactant {
    /// Copies the payload's integer and returns a future that attempts a
    /// non-blocking send of it on `self.tx`. The future always resolves to
    /// `Ok(())`, whether or not the send succeeded.
    fn react(
        &self,
        payload: Arc<Payload<AlphaMsg, UniversalTestCodec>>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
    > {
        // Take what the future needs out of `self` and `payload` up front so
        // the future owns its data and borrows from neither.
        let sender = self.tx.clone();
        let number = payload.value.value;

        let delivery = async move {
            // Best effort: a failed send is deliberately ignored.
            let _ = sender.try_send(number);
            Ok::<(), plexor_core::reactant::ReactantError>(())
        };
        Box::pin(delivery)
    }

    /// Converts the boxed reactant into its type-erased form via `erase_reactant`.
    fn erase(
        self: Box<Self>,
    ) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}

/// Test reactant for `BetaMsg` payloads; reports what it receives over a
/// channel so the test can observe it.
#[derive(Clone)]
struct BetaReactant {
    // Sending half of the channel that receives each payload's `text`.
    tx: Sender<String>,
}

/// Forwards the text carried by each [`BetaMsg`] payload to the test channel.
impl Reactant<BetaMsg, UniversalTestCodec> for BetaReactant {
    /// Clones the payload's text and returns a future that attempts a
    /// non-blocking send of it on `self.tx`. The future always resolves to
    /// `Ok(())`, whether or not the send succeeded.
    fn react(
        &self,
        payload: Arc<Payload<BetaMsg, UniversalTestCodec>>,
    ) -> Pin<
        Box<dyn Future<Output = Result<(), plexor_core::reactant::ReactantError>> + Send + 'static>,
    > {
        // Take what the future needs out of `self` and `payload` up front so
        // the future owns its data and borrows from neither.
        let sender = self.tx.clone();
        let text = payload.value.text.clone();

        let delivery = async move {
            // Best effort: a failed send is deliberately ignored.
            let _ = sender.try_send(text);
            Ok::<(), plexor_core::reactant::ReactantError>(())
        };
        Box::pin(delivery)
    }

    /// Converts the boxed reactant into its type-erased form via `erase_reactant`.
    fn erase(
        self: Box<Self>,
    ) -> Arc<dyn plexor_core::erasure::reactant::ReactantErased + Send + Sync + 'static> {
        erase_reactant(self)
    }
}

/// Sends one `AlphaMsg` and one `BetaMsg` through a single `Plexus` and checks
/// that each value shows up on the channel of the reactant registered for its
/// own neuron, with nothing extra left on either channel.
#[tokio::test]
async fn test_universal_plexus_routing_multiple_types() {
    // Observation channels: each reactant reports what it saw on its own channel.
    let (alpha_tx, mut alpha_rx) = channel(10);
    let (beta_tx, mut beta_rx) = channel(10);

    // Two neurons in the same namespace, differing only in message type.
    let namespace = Arc::new(NamespaceImpl {
        delimiter: ".",
        parts: vec!["test"],
    });
    let alpha = Arc::new(NeuronImpl::<AlphaMsg, UniversalTestCodec>::new(namespace.clone()));
    let beta = Arc::new(NeuronImpl::<BetaMsg, UniversalTestCodec>::new(namespace.clone()));

    // One plexus that knows about both neurons.
    let mut plexus = Plexus::new(
        vec![erase_neuron(alpha.clone()), erase_neuron(beta.clone())],
        vec![],
    )
    .await;

    plexus
        .adapt(alpha.clone())
        .await
        .expect("Adapt Alpha failed");
    plexus
        .adapt(beta.clone())
        .await
        .expect("Adapt Beta failed");

    // Register one reactant per neuron.
    let alpha_reactant = Box::new(AlphaReactant { tx: alpha_tx });
    plexus
        .react(alpha.name(), vec![alpha_reactant.erase()], vec![])
        .await
        .expect("React Alpha failed");

    let beta_reactant = Box::new(BetaReactant { tx: beta_tx });
    plexus
        .react(beta.name(), vec![beta_reactant.erase()], vec![])
        .await
        .expect("React Beta failed");

    // Send one payload of each type, alpha first.
    plexus
        .transmit(erase_payload(Payload::new(
            AlphaMsg { value: 123 },
            alpha.clone(),
        )))
        .await
        .expect("Transmit Alpha failed");

    plexus
        .transmit(erase_payload(Payload::new(
            BetaMsg {
                text: "Hello Beta".to_string(),
            },
            beta.clone(),
        )))
        .await
        .expect("Transmit Beta failed");

    // Each channel must yield exactly the value sent for its own type.
    let got_alpha = alpha_rx.recv().await.expect("No Alpha message");
    assert_eq!(got_alpha, 123);

    let got_beta = beta_rx.recv().await.expect("No Beta message");
    assert_eq!(got_beta, "Hello Beta");

    // No cross-talk: nothing further may be pending on either channel at this
    // point. `try_recv` does not wait, so only already-delivered messages count.
    let alpha_leftover = alpha_rx.try_recv();
    let beta_leftover = beta_rx.try_recv();
    assert!(alpha_leftover.is_err());
    assert!(beta_leftover.is_err());
}