plexor_core/base_ganglion/
mod.rs1use crate::codec::{Codec, CodecName};
8use crate::erasure::synapse::{SynapseExternalErased, SynapseInternalErased};
9use crate::neuron::Neuron;
10use crate::synapse::BackpressureConfig;
11use std::collections::{HashMap, HashSet};
12use std::sync::{Arc, RwLock};
13use uuid::Uuid;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
16pub enum ConnectionState {
17 Disconnected,
19 Connecting,
21 Handshaking,
23 Established,
25 Ready,
27 Connected,
29 Disconnecting,
30}
31
32pub struct BaseGanglionInner<S: ?Sized> {
33 pub id: Uuid,
34 pub synapses_by_name: Arc<RwLock<HashMap<String, Arc<RwLock<S>>>>>,
35 pub state: RwLock<ConnectionState>,
36 pub relevant_neurons: RwLock<HashSet<String>>,
37 pub ignored_neurons: RwLock<HashSet<String>>,
38 pub relevant_codecs: RwLock<HashSet<String>>,
39 pub ignored_codecs: RwLock<HashSet<String>>,
40 pub transmit_backpressure: BackpressureConfig,
41 pub transduce_backpressure: BackpressureConfig,
42}
43
44#[derive(Clone)]
46pub struct BaseGanglionExternal {
47 pub inner: Arc<BaseGanglionInner<dyn SynapseExternalErased + Send + Sync + 'static>>,
48}
49
50#[derive(Clone)]
52pub struct BaseGanglionInternal {
53 pub inner: Arc<BaseGanglionInner<dyn SynapseInternalErased + Send + Sync + 'static>>,
54}
55
56impl BaseGanglionExternal {
57 pub fn new() -> Self {
58 Self {
59 inner: Arc::new(BaseGanglionInner {
60 id: Uuid::now_v7(),
61 synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
62 state: RwLock::new(ConnectionState::Disconnected),
63 relevant_neurons: RwLock::new(HashSet::new()),
64 ignored_neurons: RwLock::new(HashSet::new()),
65 relevant_codecs: RwLock::new(HashSet::new()),
66 ignored_codecs: RwLock::new(HashSet::new()),
67 transmit_backpressure: BackpressureConfig::default(),
68 transduce_backpressure: BackpressureConfig::default(),
69 }),
70 }
71 }
72
73 pub fn with_backpressure(transmit: BackpressureConfig, transduce: BackpressureConfig) -> Self {
74 Self {
75 inner: Arc::new(BaseGanglionInner {
76 id: Uuid::now_v7(),
77 synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
78 state: RwLock::new(ConnectionState::Disconnected),
79 relevant_neurons: RwLock::new(HashSet::new()),
80 ignored_neurons: RwLock::new(HashSet::new()),
81 relevant_codecs: RwLock::new(HashSet::new()),
82 ignored_codecs: RwLock::new(HashSet::new()),
83 transmit_backpressure: transmit,
84 transduce_backpressure: transduce,
85 }),
86 }
87 }
88
89 pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
90 where
91 C: Codec<T> + CodecName + Send + Sync + 'static,
92 T: Send + Sync + 'static,
93 {
94 check_capable(&self.inner, neuron)
95 }
96
97 pub fn get_synapse(
98 &self,
99 name: &str,
100 ) -> Option<Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>> {
101 let map = self.inner.synapses_by_name.read().unwrap();
102 map.get(name).cloned()
103 }
104
105 pub fn insert_synapse(
106 &self,
107 name: String,
108 synapse: Arc<RwLock<dyn SynapseExternalErased + Send + Sync + 'static>>,
109 ) {
110 let mut map = self.inner.synapses_by_name.write().unwrap();
111 map.insert(name, synapse);
112 }
113
114 pub fn id(&self) -> Uuid {
115 self.inner.id
116 }
117
118 pub fn set_state(&self, state: ConnectionState) {
119 let mut guard = self.inner.state.write().unwrap();
120 *guard = state;
121 }
122
123 pub fn state(&self) -> ConnectionState {
124 *self.inner.state.read().unwrap()
125 }
126}
127
128impl Default for BaseGanglionExternal {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134impl BaseGanglionInternal {
135 pub fn new() -> Self {
136 Self {
137 inner: Arc::new(BaseGanglionInner {
138 id: Uuid::now_v7(),
139 synapses_by_name: Arc::new(RwLock::new(HashMap::new())),
140 state: RwLock::new(ConnectionState::Disconnected),
141 relevant_neurons: RwLock::new(HashSet::new()),
142 ignored_neurons: RwLock::new(HashSet::new()),
143 relevant_codecs: RwLock::new(HashSet::new()),
144 ignored_codecs: RwLock::new(HashSet::new()),
145 transmit_backpressure: BackpressureConfig::default(),
146 transduce_backpressure: BackpressureConfig::default(),
147 }),
148 }
149 }
150
151 pub fn capable<T, C>(&self, neuron: &Arc<dyn Neuron<T, C> + Send + Sync>) -> bool
152 where
153 C: Codec<T> + CodecName + Send + Sync + 'static,
154 T: Send + Sync + 'static,
155 {
156 check_capable(&self.inner, neuron)
157 }
158
159 pub fn get_synapse(
160 &self,
161 name: &str,
162 ) -> Option<Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>> {
163 let map = self.inner.synapses_by_name.read().unwrap();
164 map.get(name).cloned()
165 }
166
167 pub fn insert_synapse(
168 &self,
169 name: String,
170 synapse: Arc<RwLock<dyn SynapseInternalErased + Send + Sync + 'static>>,
171 ) {
172 let mut map = self.inner.synapses_by_name.write().unwrap();
173 map.insert(name, synapse);
174 }
175
176 pub fn id(&self) -> Uuid {
177 self.inner.id
178 }
179}
180
181impl Default for BaseGanglionInternal {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
187fn check_capable<T, C, S: ?Sized>(
189 inner: &Arc<BaseGanglionInner<S>>,
190 neuron: &Arc<dyn Neuron<T, C> + Send + Sync>,
191) -> bool
192where
193 C: Codec<T> + CodecName + Send + Sync + 'static,
194 T: Send + Sync + 'static,
195{
196 let neuron_name = neuron.name();
197 let codec_name = C::name();
198
199 {
200 let relevant = inner.relevant_neurons.read().unwrap();
201 if !relevant.is_empty() && !relevant.contains(&neuron_name) {
202 return false;
203 }
204 }
205 {
206 let ignored = inner.ignored_neurons.read().unwrap();
207 if ignored.contains(&neuron_name) {
208 return false;
209 }
210 }
211 {
212 let relevant = inner.relevant_codecs.read().unwrap();
213 if !relevant.is_empty() && !relevant.contains(codec_name) {
214 return false;
215 }
216 }
217 {
218 let ignored = inner.ignored_codecs.read().unwrap();
219 if ignored.contains(codec_name) {
220 return false;
221 }
222 }
223
224 true
225}
226
227pub type BaseGanglion = BaseGanglionExternal;