plexor-core 0.1.0-alpha.2

Core library for the Rust implementation of the Plexo distributed system architecture, providing the fundamental Plexus, Neuron, Codec, and Axon abstractions.
Documentation
// Copyright 2025 Alecks Gates
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

use crate::backpressure::BackpressureConfig;
use crate::codec::{Codec, CodecName};
use crate::erasure::synapse::{SynapseExternalErased, SynapseInternalErased};
use crate::neuron::Neuron;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use parking_lot::RwLock;
use uuid::Uuid;

/// Connection lifecycle state stored in a ganglion's `state` field.
///
/// Ganglia built by the constructors in this file start out as
/// [`ConnectionState::Disconnected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum ConnectionState {
    /// Transport is offline or disconnected.
    Disconnected,
    /// Transport is connected, but protocol compatibility is not yet verified.
    Connecting,
    /// Transport is connected. Node has sent its handshake and is waiting for the peer's.
    Handshaking,
    /// Handshakes have been exchanged and versions matched. The control channel is live.
    Established,
    /// Handshakes and initial Discovery Announcements have been exchanged. Ready for app data.
    Ready,
    /// Transport is connected.
    // NOTE(review): this variant's description used to be a verbatim copy of
    // `Connecting`'s. It looks like a coarser "connected" state that predates the
    // Handshaking/Established/Ready split — confirm how it is meant to differ.
    Connected,
    /// Teardown of the connection is in progress.
    // NOTE(review): presumably set between a close request and `Disconnected`;
    // nothing in this file sets it — confirm against the transports.
    Disconnecting,
}

/// Shared state behind a base ganglion handle.
///
/// `S` is the (possibly unsized) synapse type stored in the registry; the two
/// handle types in this file instantiate it with the external and internal
/// erased synapse trait objects respectively.
pub struct BaseGanglionInner<S: ?Sized> {
    /// Identifier of this ganglion; the constructors in this file assign a
    /// fresh `Uuid::now_v7()`.
    pub id: Uuid,
    /// Registry of synapses, keyed by a name string.
    pub synapses_by_name: Arc<RwLock<HashMap<String, Arc<RwLock<S>>>>>,
    /// Current connection state; initialised to `ConnectionState::Disconnected`.
    pub state: RwLock<ConnectionState>,
    /// Allow-list of neuron names. When empty, no allow-list restriction is
    /// applied by `check_capable`.
    pub relevant_neurons: RwLock<HashSet<String>>,
    /// Deny-list of neuron names; a listed name makes `check_capable` return `false`.
    pub ignored_neurons: RwLock<HashSet<String>>,
    /// Allow-list of codec names. When empty, no allow-list restriction is
    /// applied by `check_capable`.
    pub relevant_codecs: RwLock<HashSet<String>>,
    /// Deny-list of codec names; a listed name makes `check_capable` return `false`.
    pub ignored_codecs: RwLock<HashSet<String>>,
    /// Backpressure settings for the transmit path.
    // NOTE(review): not read anywhere in this file — confirm which path consumes it.
    pub transmit_backpressure: BackpressureConfig,
    /// Backpressure settings for the transduce path.
    // NOTE(review): not read anywhere in this file — confirm which path consumes it.
    pub transduce_backpressure: BackpressureConfig,
}

/// Base functionality for External Ganglia (Raw Bytes/Codecs).
///
/// This is a cheap-to-clone handle: cloning copies the `Arc`, so every clone
/// refers to the same [`BaseGanglionInner`] state.
#[derive(Clone)]
pub struct BaseGanglionExternal {
    /// Shared state; the synapse registry holds `SynapseExternalErased` trait objects.
    pub inner: Arc<BaseGanglionInner<dyn SynapseExternalErased + Send + Sync + 'static>>,
}

/// Base functionality for Internal Ganglia (Typed Objects).
///
/// This is a cheap-to-clone handle: cloning copies the `Arc`, so every clone
/// refers to the same [`BaseGanglionInner`] state.
#[derive(Clone)]
pub struct BaseGanglionInternal {
    /// Shared state; the synapse registry holds `SynapseInternalErased` trait objects.
    pub inner: Arc<BaseGanglionInner<dyn SynapseInternalErased + Send + Sync + 'static>>,
}

impl BaseGanglionExternal {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(BaseGanglionInner {
                id: Uuid::now_v7(),
                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
                state: RwLock::new(ConnectionState::Disconnected),
                relevant_neurons: RwLock::new(HashSet::new()),
                ignored_neurons: RwLock::new(HashSet::new()),
                relevant_codecs: RwLock::new(HashSet::new()),
                ignored_codecs: RwLock::new(HashSet::new()),
                transmit_backpressure: BackpressureConfig::default(),
                transduce_backpressure: BackpressureConfig::default(),
            }),
        }
    }

    pub fn with_backpressure(transmit: BackpressureConfig, transduce: BackpressureConfig) -> Self {
        Self {
            inner: Arc::new(BaseGanglionInner {
                id: Uuid::now_v7(),
                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
                state: RwLock::new(ConnectionState::Disconnected),
                relevant_neurons: RwLock::new(HashSet::new()),
                ignored_neurons: RwLock::new(HashSet::new()),
                relevant_codecs: RwLock::new(HashSet::new()),
                ignored_codecs: RwLock::new(HashSet::new()),
                transmit_backpressure: transmit,
                transduce_backpressure: transduce,
            }),
        }
    }

    pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
    where
        C: Codec<T> + CodecName + Send + Sync + 'static,
        T: Send + Sync + 'static,
    {
        check_capable(&self.inner, neuron)
    }

    pub fn get_synapse(
        &self,
        name: &str,
    ) -> Option<Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>> {
        let map = self.inner.synapses_by_name.read();
        map.get(name).cloned()
    }

    pub fn insert_synapse(
        &self,
        name: String,
        synapse: Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>,
    ) {
        let mut map = self.inner.synapses_by_name.write();
        map.insert(name, synapse);
    }

    pub fn id(&self) -> Uuid {
        self.inner.id
    }

    pub fn set_state(&self, state: ConnectionState) {
        let mut guard = self.inner.state.write();
        *guard = state;
    }

    pub fn state(&self) -> ConnectionState {
        *self.inner.state.read()
    }
}

impl Default for BaseGanglionExternal {
    fn default() -> Self {
        Self::new()
    }
}

impl BaseGanglionInternal {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(BaseGanglionInner {
                id: Uuid::now_v7(),
                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
                state: RwLock::new(ConnectionState::Disconnected),
                relevant_neurons: RwLock::new(HashSet::new()),
                ignored_neurons: RwLock::new(HashSet::new()),
                relevant_codecs: RwLock::new(HashSet::new()),
                ignored_codecs: RwLock::new(HashSet::new()),
                transmit_backpressure: BackpressureConfig::default(),
                transduce_backpressure: BackpressureConfig::default(),
            }),
        }
    }

    pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
    where
        C: Codec<T> + CodecName + Send + Sync + 'static,
        T: Send + Sync + 'static,
    {
        check_capable(&self.inner, neuron)
    }

    pub fn get_synapse(
        &self,
        name: &str,
    ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
        let map = self.inner.synapses_by_name.read();
        map.get(name).cloned()
    }

    pub fn insert_synapse(
        &self,
        name: String,
        synapse: Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>,
    ) {
        let mut map = self.inner.synapses_by_name.write();
        map.insert(name, synapse);
    }

    pub fn id(&self) -> Uuid {
        self.inner.id
    }
}

impl Default for BaseGanglionInternal {
    fn default() -> Self {
        Self::new()
    }
}

// Shared filter logic behind the `capable` methods of both ganglion handles.
//
// A neuron is accepted only when all of the following hold, checked in this
// order with the first failure ending the evaluation:
//   1. `relevant_neurons` is empty or contains the neuron's name,
//   2. `ignored_neurons` does not contain the neuron's name,
//   3. `relevant_codecs` is empty or contains the codec's name,
//   4. `ignored_codecs` does not contain the codec's name.
// Each set is read-locked only for the duration of its own check.
fn check_capable<T, C, S: ?Sized>(
    inner: &Arc<BaseGanglionInner<S>>,
    neuron: &Arc<dyn Neuron<T, C> + Send + Sync>,
) -> bool
where
    C: Codec<T> + CodecName + Send + Sync + 'static,
    T: Send + Sync + 'static,
{
    let neuron_name = neuron.name();
    let codec_name = C::name();

    // Closures keep every check lazy, so a lock is taken only if all earlier
    // checks passed, and it is released when the closure returns.
    let neuron_allowed = || {
        let allow = inner.relevant_neurons.read();
        allow.is_empty() || allow.contains(&neuron_name)
    };
    let neuron_denied = || {
        let deny = inner.ignored_neurons.read();
        deny.contains(&neuron_name)
    };
    let codec_allowed = || {
        let allow = inner.relevant_codecs.read();
        allow.is_empty() || allow.contains(codec_name)
    };
    let codec_denied = || {
        let deny = inner.ignored_codecs.read();
        deny.contains(codec_name)
    };

    neuron_allowed() && !neuron_denied() && codec_allowed() && !codec_denied()
}

/// Legacy alias for [`BaseGanglionExternal`], kept to ease migration.
///
/// Deprecated: new code should name [`BaseGanglionExternal`] directly.
pub type BaseGanglion = BaseGanglionExternal;