Skip to main content

plexor_core/base_ganglion/
mod.rs

1// Copyright 2025 Alecks Gates
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7use crate::backpressure::BackpressureConfig;
8use crate::codec::{Codec, CodecName};
9use crate::erasure::synapse::{SynapseExternalErased, SynapseInternalErased};
10use crate::neuron::Neuron;
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use parking_lot::RwLock;
14use uuid::Uuid;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
17pub enum ConnectionState {
18    /// Transport is offline or disconnected.
19    Disconnected,
20    /// Transport is connected, but protocol compatibility is not yet verified.
21    Connecting,
22    /// Transport is connected. Node has sent its handshake and is waiting for the peer's.
23    Handshaking,
24    /// Handshakes have been exchanged and versions matched. The control channel is live.
25    Established,
26    /// Handshakes and initial Discovery Announcements have been exchanged. Ready for app data.
27    Ready,
28    /// Transport is connected, but protocol compatibility is not yet verified.
29    Connected,
30    Disconnecting,
31}
32
33pub struct BaseGanglionInner<S: ?Sized> {
34    pub id: Uuid,
35    pub synapses_by_name: Arc<RwLock<HashMap<String, Arc<RwLock<S>>>>>,
36    pub state: RwLock<ConnectionState>,
37    pub relevant_neurons: RwLock<HashSet<String>>,
38    pub ignored_neurons: RwLock<HashSet<String>>,
39    pub relevant_codecs: RwLock<HashSet<String>>,
40    pub ignored_codecs: RwLock<HashSet<String>>,
41    pub transmit_backpressure: BackpressureConfig,
42    pub transduce_backpressure: BackpressureConfig,
43}
44
45/// Base functionality for External Ganglia (Raw Bytes/Codecs).
46#[derive(Clone)]
47pub struct BaseGanglionExternal {
48    pub inner: Arc<BaseGanglionInner<dyn SynapseExternalErased + Send + Sync + 'static>>,
49}
50
51/// Base functionality for Internal Ganglia (Typed Objects).
52#[derive(Clone)]
53pub struct BaseGanglionInternal {
54    pub inner: Arc<BaseGanglionInner<dyn SynapseInternalErased + Send + Sync + 'static>>,
55}
56
57impl BaseGanglionExternal {
58    pub fn new() -> Self {
59        Self {
60            inner: Arc::new(BaseGanglionInner {
61                id: Uuid::now_v7(),
62                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
63                state: RwLock::new(ConnectionState::Disconnected),
64                relevant_neurons: RwLock::new(HashSet::new()),
65                ignored_neurons: RwLock::new(HashSet::new()),
66                relevant_codecs: RwLock::new(HashSet::new()),
67                ignored_codecs: RwLock::new(HashSet::new()),
68                transmit_backpressure: BackpressureConfig::default(),
69                transduce_backpressure: BackpressureConfig::default(),
70            }),
71        }
72    }
73
74    pub fn with_backpressure(transmit: BackpressureConfig, transduce: BackpressureConfig) -> Self {
75        Self {
76            inner: Arc::new(BaseGanglionInner {
77                id: Uuid::now_v7(),
78                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
79                state: RwLock::new(ConnectionState::Disconnected),
80                relevant_neurons: RwLock::new(HashSet::new()),
81                ignored_neurons: RwLock::new(HashSet::new()),
82                relevant_codecs: RwLock::new(HashSet::new()),
83                ignored_codecs: RwLock::new(HashSet::new()),
84                transmit_backpressure: transmit,
85                transduce_backpressure: transduce,
86            }),
87        }
88    }
89
90    pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
91    where
92        C: Codec<T> + CodecName + Send + Sync + 'static,
93        T: Send + Sync + 'static,
94    {
95        check_capable(&self.inner, neuron)
96    }
97
98    pub fn get_synapse(
99        &self,
100        name: &str,
101    ) -> Option<Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>> {
102        let map = self.inner.synapses_by_name.read();
103        map.get(name).cloned()
104    }
105
106    pub fn insert_synapse(
107        &self,
108        name: String,
109        synapse: Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>,
110    ) {
111        let mut map = self.inner.synapses_by_name.write();
112        map.insert(name, synapse);
113    }
114
115    pub fn id(&self) -> Uuid {
116        self.inner.id
117    }
118
119    pub fn set_state(&self, state: ConnectionState) {
120        let mut guard = self.inner.state.write();
121        *guard = state;
122    }
123
124    pub fn state(&self) -> ConnectionState {
125        *self.inner.state.read()
126    }
127}
128
129impl Default for BaseGanglionExternal {
130    fn default() -> Self {
131        Self::new()
132    }
133}
134
135impl BaseGanglionInternal {
136    pub fn new() -> Self {
137        Self {
138            inner: Arc::new(BaseGanglionInner {
139                id: Uuid::now_v7(),
140                synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
141                state: RwLock::new(ConnectionState::Disconnected),
142                relevant_neurons: RwLock::new(HashSet::new()),
143                ignored_neurons: RwLock::new(HashSet::new()),
144                relevant_codecs: RwLock::new(HashSet::new()),
145                ignored_codecs: RwLock::new(HashSet::new()),
146                transmit_backpressure: BackpressureConfig::default(),
147                transduce_backpressure: BackpressureConfig::default(),
148            }),
149        }
150    }
151
152    pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
153    where
154        C: Codec<T> + CodecName + Send + Sync + 'static,
155        T: Send + Sync + 'static,
156    {
157        check_capable(&self.inner, neuron)
158    }
159
160    pub fn get_synapse(
161        &self,
162        name: &str,
163    ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
164        let map = self.inner.synapses_by_name.read();
165        map.get(name).cloned()
166    }
167
168    pub fn insert_synapse(
169        &self,
170        name: String,
171        synapse: Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>,
172    ) {
173        let mut map = self.inner.synapses_by_name.write();
174        map.insert(name, synapse);
175    }
176
177    pub fn id(&self) -> Uuid {
178        self.inner.id
179    }
180}
181
182impl Default for BaseGanglionInternal {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188// Helper for shared capable logic
189fn check_capable<T, C, S: ?Sized>(
190    inner: &Arc<BaseGanglionInner<S>>,
191    neuron: &Arc<dyn Neuron<T, C> + Send + Sync>,
192) -> bool
193where
194    C: Codec<T> + CodecName + Send + Sync + 'static,
195    T: Send + Sync + 'static,
196{
197    let neuron_name = neuron.name();
198    let codec_name = C::name();
199
200    {
201        let relevant = inner.relevant_neurons.read();
202        if !relevant.is_empty() && !relevant.contains(&neuron_name) {
203            return false;
204        }
205    }
206    {
207        let ignored = inner.ignored_neurons.read();
208        if ignored.contains(&neuron_name) {
209            return false;
210        }
211    }
212    {
213        let relevant = inner.relevant_codecs.read();
214        if !relevant.is_empty() && !relevant.contains(codec_name) {
215            return false;
216        }
217    }
218    {
219        let ignored = inner.ignored_codecs.read();
220        if ignored.contains(codec_name) {
221            return false;
222        }
223    }
224
225    true
226}
227
228// Legacy alias to ease migration (deprecated)
229pub type BaseGanglion = BaseGanglionExternal;