1extern crate alloc;
7
8use alloc::string::String;
9use alloc::vec::Vec;
10
11use crate::types::*;
12
13pub struct FluxManager {
15 pending: Vec<FluxTransference>,
16 parity_state: alloc::collections::BTreeMap<RoomId, ParityState>,
17}
18
19#[derive(Debug, Clone)]
21pub struct ParityState {
22 pub even_count: u64,
23 pub odd_count: u64,
24 pub last_check: f64,
25 pub parity_ok: bool,
26}
27
28impl Default for FluxManager {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl FluxManager {
35 pub fn new() -> Self {
36 Self {
37 pending: Vec::new(),
38 parity_state: alloc::collections::BTreeMap::new(),
39 }
40 }
41
42 pub fn create_flux(
44 &mut self,
45 source: RoomId,
46 target: RoomId,
47 timestamp: f64,
48 payload: TransferencePayload,
49 zeitgeist: Zeitgeist,
50 ) -> FluxTransference {
51 FluxTransference {
52 source,
53 target,
54 timestamp,
55 payload,
56 zeitgeist,
57 }
58 }
59
60 pub fn send_flux(&mut self, flux: FluxTransference) {
62 self.pending.push(flux);
64 }
65
66 pub fn recv_flux_for(&mut self, target: &RoomId) -> Vec<FluxTransference> {
68 let (for_target, remaining): (Vec<_>, Vec<_>) =
69 self.pending.drain(..).partition(|f| &f.target == target);
70 self.pending = remaining;
71 for_target
72 }
73
74 pub fn recv_flux(&mut self) -> Option<FluxTransference> {
76 if self.pending.is_empty() {
77 None
78 } else {
79 Some(self.pending.remove(0))
80 }
81 }
82
83 pub fn update_parity(&mut self, room: &RoomId, zeitgeist: &Zeitgeist, timestamp: f64) {
85 let parity = zeitgeist.temporal.beat % 2;
86 let state = self
87 .parity_state
88 .entry(room.clone())
89 .or_insert(ParityState {
90 even_count: 0,
91 odd_count: 0,
92 last_check: 0.0,
93 parity_ok: true,
94 });
95
96 if parity == 0 {
97 state.even_count += 1;
98 } else {
99 state.odd_count += 1;
100 }
101 state.last_check = timestamp;
102
103 state.parity_ok = state.even_count > 0 && state.odd_count > 0;
105 }
106
107 pub fn check_parity(&self, room: &RoomId) -> bool {
109 self.parity_state
110 .get(room)
111 .map(|s| s.parity_ok)
112 .unwrap_or(false)
113 }
114
115 pub fn encode_zeitgeist(zeitgeist: &Zeitgeist) -> Vec<u8> {
117 let mut bytes = Vec::new();
120
121 bytes.extend_from_slice(&zeitgeist.precision.center.to_le_bytes());
123 bytes.extend_from_slice(&zeitgeist.precision.width.to_le_bytes());
124 bytes.extend_from_slice(&zeitgeist.precision.samples.to_le_bytes());
125 bytes.push(zeitgeist.precision.converged as u8);
126
127 bytes.push(zeitgeist.confidence.num_hashes as u8);
129 for word in &zeitgeist.confidence.bits {
130 bytes.extend_from_slice(&word.to_le_bytes());
131 }
132
133 bytes.extend_from_slice(&zeitgeist.trajectory.value.to_le_bytes());
135 bytes.extend_from_slice(&zeitgeist.trajectory.confidence.to_le_bytes());
136
137 bytes.extend_from_slice(&zeitgeist.consensus.coherence.to_le_bytes());
139
140 bytes.extend_from_slice(&zeitgeist.temporal.beat.to_le_bytes());
142 bytes.extend_from_slice(&zeitgeist.temporal.tempo.to_le_bytes());
143
144 bytes
145 }
146
147 pub fn decode_zeitgeist(bytes: &[u8]) -> Result<Zeitgeist, String> {
149 if bytes.len() < 8 + 8 + 8 + 1 {
150 return Err("Insufficient bytes for zeitgeist".into());
151 }
152
153 let mut offset = 0;
154
155 let center = f64::from_le_bytes(
156 bytes[offset..offset + 8]
157 .try_into()
158 .map_err(|_| "parse error")?,
159 );
160 offset += 8;
161 let width = f64::from_le_bytes(
162 bytes[offset..offset + 8]
163 .try_into()
164 .map_err(|_| "parse error")?,
165 );
166 offset += 8;
167 let samples = u64::from_le_bytes(
168 bytes[offset..offset + 8]
169 .try_into()
170 .map_err(|_| "parse error")?,
171 );
172 offset += 8;
173 let converged = bytes[offset] != 0;
174 offset += 1;
175
176 let num_hashes = bytes[offset] as u32;
177 offset += 1;
178
179 let remaining_words = (bytes.len() - offset - 8 - 8 - 8 - 8 - 8) / 8;
180 let mut bits = Vec::new();
181 for _i in 0..remaining_words {
182 if offset + 8 <= bytes.len() {
183 bits.push(u64::from_le_bytes(
184 bytes[offset..offset + 8]
185 .try_into()
186 .map_err(|_| "parse error")?,
187 ));
188 offset += 8;
189 }
190 }
191
192 let trajectory_value = f64::from_le_bytes(
193 bytes[offset..offset + 8]
194 .try_into()
195 .map_err(|_| "parse error")?,
196 );
197 offset += 8;
198 let trajectory_confidence = f64::from_le_bytes(
199 bytes[offset..offset + 8]
200 .try_into()
201 .map_err(|_| "parse error")?,
202 );
203 offset += 8;
204
205 let coherence = f64::from_le_bytes(
206 bytes[offset..offset + 8]
207 .try_into()
208 .map_err(|_| "parse error")?,
209 );
210 offset += 8;
211
212 let beat = u64::from_le_bytes(
213 bytes[offset..offset + 8]
214 .try_into()
215 .map_err(|_| "parse error")?,
216 );
217 offset += 8;
218 let tempo = f64::from_le_bytes(
219 bytes[offset..offset + 8]
220 .try_into()
221 .map_err(|_| "parse error")?,
222 );
223
224 Ok(Zeitgeist {
225 precision: FunnelState {
226 center,
227 width,
228 samples,
229 converged,
230 },
231 confidence: BloomFilter {
232 bits,
233 num_hashes,
234 estimated_count: 0,
235 },
236 trajectory: HurstEstimate {
237 value: trajectory_value,
238 confidence: trajectory_confidence,
239 sample_count: 0,
240 },
241 consensus: HolonomyState {
242 cycle_count: 0,
243 coherence,
244 last_check: 0.0,
245 },
246 temporal: BeatPosition {
247 beat,
248 tempo,
249 phase: 0.0,
250 },
251 })
252 }
253
254 pub fn track_funnel(&self, zeitgeist: &Zeitgeist, observations: &[f64]) -> FunnelState {
256 if observations.is_empty() {
257 return zeitgeist.precision.clone();
258 }
259
260 let n = observations.len() as f64;
261 let mean = observations.iter().sum::<f64>() / n;
262 let variance = observations.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
263 let std_dev = variance.sqrt();
264 let width = 2.0 * std_dev; FunnelState {
267 center: mean,
268 width,
269 samples: observations.len() as u64,
270 converged: width < 0.01, }
272 }
273
274 pub fn pending_count(&self) -> usize {
276 self.pending.len()
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283
284 #[test]
285 fn test_flux_create_and_send() {
286 let mut mgr = FluxManager::new();
287 let flux = mgr.create_flux(
288 RoomId("r1".to_string()),
289 RoomId("r2".to_string()),
290 100.0,
291 TransferencePayload::Heartbeat,
292 Zeitgeist::new(),
293 );
294 mgr.send_flux(flux);
295 assert_eq!(mgr.pending_count(), 1);
296 }
297
298 #[test]
299 fn test_flux_recv_for_target() {
300 let mut mgr = FluxManager::new();
301
302 let flux1 = mgr.create_flux(
303 RoomId("r1".to_string()),
304 RoomId("r2".to_string()),
305 100.0,
306 TransferencePayload::Heartbeat,
307 Zeitgeist::new(),
308 );
309 let flux2 = mgr.create_flux(
310 RoomId("r1".to_string()),
311 RoomId("r3".to_string()),
312 101.0,
313 TransferencePayload::Heartbeat,
314 Zeitgeist::new(),
315 );
316 mgr.send_flux(flux1);
317 mgr.send_flux(flux2);
318
319 let for_r2 = mgr.recv_flux_for(&RoomId("r2".to_string()));
320 assert_eq!(for_r2.len(), 1);
321 assert_eq!(mgr.pending_count(), 1);
322 }
323
324 #[test]
325 fn test_zeitgeist_encode_decode() {
326 let zeitgeist = Zeitgeist::new();
327 let encoded = FluxManager::encode_zeitgeist(&zeitgeist);
328 let decoded = FluxManager::decode_zeitgeist(&encoded);
329 assert!(decoded.is_ok());
330 }
331
332 #[test]
333 fn test_parity_monitoring() {
334 let mut mgr = FluxManager::new();
335 let room = RoomId("r1".to_string());
336
337 assert!(!mgr.check_parity(&room));
339
340 let mut z1 = Zeitgeist::new();
342 z1.temporal.beat = 0; mgr.update_parity(&room, &z1, 100.0);
344 assert!(!mgr.check_parity(&room)); let mut z2 = Zeitgeist::new();
348 z2.temporal.beat = 1; mgr.update_parity(&room, &z2, 200.0);
350 assert!(mgr.check_parity(&room)); }
352
353 #[test]
354 fn test_funnel_tracking() {
355 let mgr = FluxManager::new();
356 let zeitgeist = Zeitgeist::new();
357
358 let obs: Vec<f64> = vec![1.001, 1.002, 0.999, 1.000, 1.001];
360 let funnel = mgr.track_funnel(&zeitgeist, &obs);
361 assert!(funnel.converged);
362 assert!(funnel.width < 0.01);
363
364 let wide: Vec<f64> = vec![0.0, 1.0, 2.0, 3.0, 4.0];
366 let funnel2 = mgr.track_funnel(&zeitgeist, &wide);
367 assert!(!funnel2.converged);
368 }
369
370 #[test]
371 fn test_bloom_filter() {
372 let mut bf = BloomFilter::new(3, 256);
373 bf.insert(b"hello");
374 bf.insert(b"world");
375
376 assert!(bf.contains(b"hello"));
377 assert!(bf.contains(b"world"));
378 assert!(!bf.contains(b"missing"));
379 }
380
381 #[test]
382 fn test_bloom_filter_merge() {
383 let mut bf1 = BloomFilter::new(3, 256);
384 bf1.insert(b"hello");
385
386 let mut bf2 = BloomFilter::new(3, 256);
387 bf2.insert(b"world");
388
389 bf1.merge(&bf2);
390 assert!(bf1.contains(b"hello"));
391 assert!(bf1.contains(b"world"));
392 }
393}