plexor_core/base_ganglion/
mod.rs1use crate::backpressure::BackpressureConfig;
8use crate::codec::{Codec, CodecName};
9use crate::erasure::synapse::{SynapseExternalErased, SynapseInternalErased};
10use crate::neuron::Neuron;
11use std::collections::{HashMap, HashSet};
12use std::sync::Arc;
13use parking_lot::RwLock;
14use uuid::Uuid;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
17pub enum ConnectionState {
18 Disconnected,
20 Connecting,
22 Handshaking,
24 Established,
26 Ready,
28 Connected,
30 Disconnecting,
31}
32
33pub struct BaseGanglionInner<S: ?Sized> {
34 pub id: Uuid,
35 pub synapses_by_name: Arc<RwLock<HashMap<String, Arc<RwLock<S>>>>>,
36 pub state: RwLock<ConnectionState>,
37 pub relevant_neurons: RwLock<HashSet<String>>,
38 pub ignored_neurons: RwLock<HashSet<String>>,
39 pub relevant_codecs: RwLock<HashSet<String>>,
40 pub ignored_codecs: RwLock<HashSet<String>>,
41 pub transmit_backpressure: BackpressureConfig,
42 pub transduce_backpressure: BackpressureConfig,
43}
44
45#[derive(Clone)]
47pub struct BaseGanglionExternal {
48 pub inner: Arc<BaseGanglionInner<dyn SynapseExternalErased + Send + Sync + 'static>>,
49}
50
51#[derive(Clone)]
53pub struct BaseGanglionInternal {
54 pub inner: Arc<BaseGanglionInner<dyn SynapseInternalErased + Send + Sync + 'static>>,
55}
56
57impl BaseGanglionExternal {
58 pub fn new() -> Self {
59 Self {
60 inner: Arc::new(BaseGanglionInner {
61 id: Uuid::now_v7(),
62 synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
63 state: RwLock::new(ConnectionState::Disconnected),
64 relevant_neurons: RwLock::new(HashSet::new()),
65 ignored_neurons: RwLock::new(HashSet::new()),
66 relevant_codecs: RwLock::new(HashSet::new()),
67 ignored_codecs: RwLock::new(HashSet::new()),
68 transmit_backpressure: BackpressureConfig::default(),
69 transduce_backpressure: BackpressureConfig::default(),
70 }),
71 }
72 }
73
74 pub fn with_backpressure(transmit: BackpressureConfig, transduce: BackpressureConfig) -> Self {
75 Self {
76 inner: Arc::new(BaseGanglionInner {
77 id: Uuid::now_v7(),
78 synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
79 state: RwLock::new(ConnectionState::Disconnected),
80 relevant_neurons: RwLock::new(HashSet::new()),
81 ignored_neurons: RwLock::new(HashSet::new()),
82 relevant_codecs: RwLock::new(HashSet::new()),
83 ignored_codecs: RwLock::new(HashSet::new()),
84 transmit_backpressure: transmit,
85 transduce_backpressure: transduce,
86 }),
87 }
88 }
89
90 pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
91 where
92 C: Codec<T> + CodecName + Send + Sync + 'static,
93 T: Send + Sync + 'static,
94 {
95 check_capable(&self.inner, neuron)
96 }
97
98 pub fn get_synapse(
99 &self,
100 name: &str,
101 ) -> Option<Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>> {
102 let map = self.inner.synapses_by_name.read();
103 map.get(name).cloned()
104 }
105
106 pub fn insert_synapse(
107 &self,
108 name: String,
109 synapse: Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>,
110 ) {
111 let mut map = self.inner.synapses_by_name.write();
112 map.insert(name, synapse);
113 }
114
115 pub fn id(&self) -> Uuid {
116 self.inner.id
117 }
118
119 pub fn set_state(&self, state: ConnectionState) {
120 let mut guard = self.inner.state.write();
121 *guard = state;
122 }
123
124 pub fn state(&self) -> ConnectionState {
125 *self.inner.state.read()
126 }
127}
128
129impl Default for BaseGanglionExternal {
130 fn default() -> Self {
131 Self::new()
132 }
133}
134
135impl BaseGanglionInternal {
136 pub fn new() -> Self {
137 Self {
138 inner: Arc::new(BaseGanglionInner {
139 id: Uuid::now_v7(),
140 synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
141 state: RwLock::new(ConnectionState::Disconnected),
142 relevant_neurons: RwLock::new(HashSet::new()),
143 ignored_neurons: RwLock::new(HashSet::new()),
144 relevant_codecs: RwLock::new(HashSet::new()),
145 ignored_codecs: RwLock::new(HashSet::new()),
146 transmit_backpressure: BackpressureConfig::default(),
147 transduce_backpressure: BackpressureConfig::default(),
148 }),
149 }
150 }
151
152 pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
153 where
154 C: Codec<T> + CodecName + Send + Sync + 'static,
155 T: Send + Sync + 'static,
156 {
157 check_capable(&self.inner, neuron)
158 }
159
160 pub fn get_synapse(
161 &self,
162 name: &str,
163 ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
164 let map = self.inner.synapses_by_name.read();
165 map.get(name).cloned()
166 }
167
168 pub fn insert_synapse(
169 &self,
170 name: String,
171 synapse: Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>,
172 ) {
173 let mut map = self.inner.synapses_by_name.write();
174 map.insert(name, synapse);
175 }
176
177 pub fn id(&self) -> Uuid {
178 self.inner.id
179 }
180}
181
182impl Default for BaseGanglionInternal {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188fn check_capable<T, C, S: ?Sized>(
190 inner: &Arc<BaseGanglionInner<S>>,
191 neuron: &Arc<dyn Neuron<T, C> + Send + Sync>,
192) -> bool
193where
194 C: Codec<T> + CodecName + Send + Sync + 'static,
195 T: Send + Sync + 'static,
196{
197 let neuron_name = neuron.name();
198 let codec_name = C::name();
199
200 {
201 let relevant = inner.relevant_neurons.read();
202 if !relevant.is_empty() && !relevant.contains(&neuron_name) {
203 return false;
204 }
205 }
206 {
207 let ignored = inner.ignored_neurons.read();
208 if ignored.contains(&neuron_name) {
209 return false;
210 }
211 }
212 {
213 let relevant = inner.relevant_codecs.read();
214 if !relevant.is_empty() && !relevant.contains(codec_name) {
215 return false;
216 }
217 }
218 {
219 let ignored = inner.ignored_codecs.read();
220 if ignored.contains(codec_name) {
221 return false;
222 }
223 }
224
225 true
226}
227
228pub type BaseGanglion = BaseGanglionExternal;