Skip to main content

plexor_core/base_ganglion/
mod.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::codec::{Codec, CodecName};
8use crate::erasure::synapse::{SynapseExternalErased, SynapseInternalErased};
9use crate::neuron::Neuron;
10use crate::synapse::BackpressureConfig;
11use std::collections::{HashMap, HashSet};
12use std::sync::{Arc, RwLock};
13use uuid::Uuid;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
16pub enum ConnectionState {
17    /// Transport is offline or disconnected.
18    Disconnected,
19    /// Transport is connected, but protocol compatibility is not yet verified.
20    Connecting,
21    /// Transport is connected. Node has sent its handshake and is waiting for the peer's.
22    Handshaking,
23    /// Handshakes have been exchanged and versions matched. The control channel is live.
24    Established,
25    /// Handshakes and initial Discovery Announcements have been exchanged. Ready for app data.
26    Ready,
27    /// Transport is connected, but protocol compatibility is not yet verified.
28    Connected,
29    Disconnecting,
30}
31
32pub struct BaseGanglionInner<S: ?Sized> {
33    pub id: Uuid,
34    pub synapses_by_name: Arc<RwLock<HashMap<String, Arc<RwLock<S>>>>>,
35    pub state: RwLock<ConnectionState>,
36    pub relevant_neurons: RwLock<HashSet<String>>,
37    pub ignored_neurons: RwLock<HashSet<String>>,
38    pub relevant_codecs: RwLock<HashSet<String>>,
39    pub ignored_codecs: RwLock<HashSet<String>>,
40    pub transmit_backpressure: BackpressureConfig,
41    pub transduce_backpressure: BackpressureConfig,
42}
43
44/// Base functionality for External Ganglia (Raw Bytes/Codecs).
45#[derive(Clone)]
46pub struct BaseGanglionExternal {
47    pub inner: Arc<BaseGanglionInner<dyn SynapseExternalErased + Send + Sync + 'static>>,
48}
49
50/// Base functionality for Internal Ganglia (Typed Objects).
51#[derive(Clone)]
52pub struct BaseGanglionInternal {
53    pub inner: Arc<BaseGanglionInner<dyn SynapseInternalErased + Send + Sync + 'static>>,
54}
55
56impl BaseGanglionExternal {
57    pub fn new() -> Self {
58        Self {
59            inner: Arc::new(BaseGanglionInner {
60                id: Uuid::now_v7(),
61                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
62                state: RwLock::new(ConnectionState::Disconnected),
63                relevant_neurons: RwLock::new(HashSet::new()),
64                ignored_neurons: RwLock::new(HashSet::new()),
65                relevant_codecs: RwLock::new(HashSet::new()),
66                ignored_codecs: RwLock::new(HashSet::new()),
67                transmit_backpressure: BackpressureConfig::default(),
68                transduce_backpressure: BackpressureConfig::default(),
69            }),
70        }
71    }
72
73    pub fn with_backpressure(transmit: BackpressureConfig, transduce: BackpressureConfig) -> Self {
74        Self {
75            inner: Arc::new(BaseGanglionInner {
76                id: Uuid::now_v7(),
77                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
78                state: RwLock::new(ConnectionState::Disconnected),
79                relevant_neurons: RwLock::new(HashSet::new()),
80                ignored_neurons: RwLock::new(HashSet::new()),
81                relevant_codecs: RwLock::new(HashSet::new()),
82                ignored_codecs: RwLock::new(HashSet::new()),
83                transmit_backpressure: transmit,
84                transduce_backpressure: transduce,
85            }),
86        }
87    }
88
89    pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
90    where
91        C: Codec<T> + CodecName + Send + Sync + 'static,
92        T: Send + Sync + 'static,
93    {
94        check_capable(&self.inner, neuron)
95    }
96
97    pub fn get_synapse(
98        &self,
99        name: &str,
100    ) -> Option<Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>> {
101        let map = self.inner.synapses_by_name.read().unwrap();
102        map.get(name).cloned()
103    }
104
105    pub fn insert_synapse(
106        &self,
107        name: String,
108        synapse: Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>,
109    ) {
110        let mut map = self.inner.synapses_by_name.write().unwrap();
111        map.insert(name, synapse);
112    }
113
114    pub fn id(&self) -> Uuid {
115        self.inner.id
116    }
117
118    pub fn set_state(&self, state: ConnectionState) {
119        let mut guard = self.inner.state.write().unwrap();
120        *guard = state;
121    }
122
123    pub fn state(&self) -> ConnectionState {
124        *self.inner.state.read().unwrap()
125    }
126}
127
128impl Default for BaseGanglionExternal {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134impl BaseGanglionInternal {
135    pub fn new() -> Self {
136        Self {
137            inner: Arc::new(BaseGanglionInner {
138                id: Uuid::now_v7(),
139                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
140                state: RwLock::new(ConnectionState::Disconnected),
141                relevant_neurons: RwLock::new(HashSet::new()),
142                ignored_neurons: RwLock::new(HashSet::new()),
143                relevant_codecs: RwLock::new(HashSet::new()),
144                ignored_codecs: RwLock::new(HashSet::new()),
145                transmit_backpressure: BackpressureConfig::default(),
146                transduce_backpressure: BackpressureConfig::default(),
147            }),
148        }
149    }
150
151    pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
152    where
153        C: Codec<T> + CodecName + Send + Sync + 'static,
154        T: Send + Sync + 'static,
155    {
156        check_capable(&self.inner, neuron)
157    }
158
159    pub fn get_synapse(
160        &self,
161        name: &str,
162    ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
163        let map = self.inner.synapses_by_name.read().unwrap();
164        map.get(name).cloned()
165    }
166
167    pub fn insert_synapse(
168        &self,
169        name: String,
170        synapse: Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>,
171    ) {
172        let mut map = self.inner.synapses_by_name.write().unwrap();
173        map.insert(name, synapse);
174    }
175
176    pub fn id(&self) -> Uuid {
177        self.inner.id
178    }
179}
180
181impl Default for BaseGanglionInternal {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
187// Helper for shared capable logic
188fn check_capable<T, C, S: ?Sized>(
189    inner: &Arc<BaseGanglionInner<S>>,
190    neuron: &Arc<dyn Neuron<T, C> + Send + Sync>,
191) -> bool
192where
193    C: Codec<T> + CodecName + Send + Sync + 'static,
194    T: Send + Sync + 'static,
195{
196    let neuron_name = neuron.name();
197    let codec_name = C::name();
198
199    {
200        let relevant = inner.relevant_neurons.read().unwrap();
201        if !relevant.is_empty() && !relevant.contains(&neuron_name) {
202            return false;
203        }
204    }
205    {
206        let ignored = inner.ignored_neurons.read().unwrap();
207        if ignored.contains(&neuron_name) {
208            return false;
209        }
210    }
211    {
212        let relevant = inner.relevant_codecs.read().unwrap();
213        if !relevant.is_empty() && !relevant.contains(codec_name) {
214            return false;
215        }
216    }
217    {
218        let ignored = inner.ignored_codecs.read().unwrap();
219        if ignored.contains(codec_name) {
220            return false;
221        }
222    }
223
224    true
225}
226
227// Legacy alias to ease migration (deprecated)
228pub type BaseGanglion = BaseGanglionExternal;