Skip to main content

plato_mud/
flux.rs

//! PLATO MUD Engine — FLUX Transference
//!
//! Zeitgeist encoding/decoding, room-to-room transference, merge, deadband,
//! parity monitoring.
5
6extern crate alloc;
7
8use alloc::string::String;
9use alloc::vec::Vec;
10
11use crate::types::*;
12
13/// FLUX transference manager
14pub struct FluxManager {
15    pending: Vec<FluxTransference>,
16    parity_state: alloc::collections::BTreeMap<RoomId, ParityState>,
17}
18
/// Beat-parity tally kept for a single room (CONSTRAINT 7).
#[derive(Clone, Debug)]
pub struct ParityState {
    /// How many recorded updates carried an even beat.
    pub even_count: u64,
    /// How many recorded updates carried an odd beat.
    pub odd_count: u64,
    /// Timestamp supplied with the most recent update.
    pub last_check: f64,
    /// `true` once at least one even and one odd beat have been recorded.
    pub parity_ok: bool,
}
27
28impl Default for FluxManager {
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl FluxManager {
35    pub fn new() -> Self {
36        Self {
37            pending: Vec::new(),
38            parity_state: alloc::collections::BTreeMap::new(),
39        }
40    }
41
42    /// Create a FLUX transference from source to target
43    pub fn create_flux(
44        &mut self,
45        source: RoomId,
46        target: RoomId,
47        timestamp: f64,
48        payload: TransferencePayload,
49        zeitgeist: Zeitgeist,
50    ) -> FluxTransference {
51        FluxTransference {
52            source,
53            target,
54            timestamp,
55            payload,
56            zeitgeist,
57        }
58    }
59
60    /// Send a FLUX transference (queue it)
61    pub fn send_flux(&mut self, flux: FluxTransference) {
62        // CONSTRAINT 8: FLUX must carry full zeitgeist
63        self.pending.push(flux);
64    }
65
66    /// Receive pending FLUX transferences for a target room
67    pub fn recv_flux_for(&mut self, target: &RoomId) -> Vec<FluxTransference> {
68        let (for_target, remaining): (Vec<_>, Vec<_>) =
69            self.pending.drain(..).partition(|f| &f.target == target);
70        self.pending = remaining;
71        for_target
72    }
73
74    /// Receive any pending FLUX transference
75    pub fn recv_flux(&mut self) -> Option<FluxTransference> {
76        if self.pending.is_empty() {
77            None
78        } else {
79            Some(self.pending.remove(0))
80        }
81    }
82
83    /// Update parity state for a room (CONSTRAINT 7)
84    pub fn update_parity(&mut self, room: &RoomId, zeitgeist: &Zeitgeist, timestamp: f64) {
85        let parity = zeitgeist.temporal.beat % 2;
86        let state = self
87            .parity_state
88            .entry(room.clone())
89            .or_insert(ParityState {
90                even_count: 0,
91                odd_count: 0,
92                last_check: 0.0,
93                parity_ok: true,
94            });
95
96        if parity == 0 {
97            state.even_count += 1;
98        } else {
99            state.odd_count += 1;
100        }
101        state.last_check = timestamp;
102
103        // Parity is OK if both even and odd counts are present
104        state.parity_ok = state.even_count > 0 && state.odd_count > 0;
105    }
106
107    /// Check if a room has valid parity (CONSTRAINT 7)
108    pub fn check_parity(&self, room: &RoomId) -> bool {
109        self.parity_state
110            .get(room)
111            .map(|s| s.parity_ok)
112            .unwrap_or(false)
113    }
114
115    /// Encode zeitgeist to bytes (simplified CBOR-like format)
116    pub fn encode_zeitgeist(zeitgeist: &Zeitgeist) -> Vec<u8> {
117        // Simple encoding: we'll use a compact binary format
118        // In production, use serde_cbor or postcard
119        let mut bytes = Vec::new();
120
121        // Precision funnel
122        bytes.extend_from_slice(&zeitgeist.precision.center.to_le_bytes());
123        bytes.extend_from_slice(&zeitgeist.precision.width.to_le_bytes());
124        bytes.extend_from_slice(&zeitgeist.precision.samples.to_le_bytes());
125        bytes.push(zeitgeist.precision.converged as u8);
126
127        // Bloom filter
128        bytes.push(zeitgeist.confidence.num_hashes as u8);
129        for word in &zeitgeist.confidence.bits {
130            bytes.extend_from_slice(&word.to_le_bytes());
131        }
132
133        // Trajectory
134        bytes.extend_from_slice(&zeitgeist.trajectory.value.to_le_bytes());
135        bytes.extend_from_slice(&zeitgeist.trajectory.confidence.to_le_bytes());
136
137        // Consensus
138        bytes.extend_from_slice(&zeitgeist.consensus.coherence.to_le_bytes());
139
140        // Temporal
141        bytes.extend_from_slice(&zeitgeist.temporal.beat.to_le_bytes());
142        bytes.extend_from_slice(&zeitgeist.temporal.tempo.to_le_bytes());
143
144        bytes
145    }
146
147    /// Decode zeitgeist from bytes
148    pub fn decode_zeitgeist(bytes: &[u8]) -> Result<Zeitgeist, String> {
149        if bytes.len() < 8 + 8 + 8 + 1 {
150            return Err("Insufficient bytes for zeitgeist".into());
151        }
152
153        let mut offset = 0;
154
155        let center = f64::from_le_bytes(
156            bytes[offset..offset + 8]
157                .try_into()
158                .map_err(|_| "parse error")?,
159        );
160        offset += 8;
161        let width = f64::from_le_bytes(
162            bytes[offset..offset + 8]
163                .try_into()
164                .map_err(|_| "parse error")?,
165        );
166        offset += 8;
167        let samples = u64::from_le_bytes(
168            bytes[offset..offset + 8]
169                .try_into()
170                .map_err(|_| "parse error")?,
171        );
172        offset += 8;
173        let converged = bytes[offset] != 0;
174        offset += 1;
175
176        let num_hashes = bytes[offset] as u32;
177        offset += 1;
178
179        let remaining_words = (bytes.len() - offset - 8 - 8 - 8 - 8 - 8) / 8;
180        let mut bits = Vec::new();
181        for _i in 0..remaining_words {
182            if offset + 8 <= bytes.len() {
183                bits.push(u64::from_le_bytes(
184                    bytes[offset..offset + 8]
185                        .try_into()
186                        .map_err(|_| "parse error")?,
187                ));
188                offset += 8;
189            }
190        }
191
192        let trajectory_value = f64::from_le_bytes(
193            bytes[offset..offset + 8]
194                .try_into()
195                .map_err(|_| "parse error")?,
196        );
197        offset += 8;
198        let trajectory_confidence = f64::from_le_bytes(
199            bytes[offset..offset + 8]
200                .try_into()
201                .map_err(|_| "parse error")?,
202        );
203        offset += 8;
204
205        let coherence = f64::from_le_bytes(
206            bytes[offset..offset + 8]
207                .try_into()
208                .map_err(|_| "parse error")?,
209        );
210        offset += 8;
211
212        let beat = u64::from_le_bytes(
213            bytes[offset..offset + 8]
214                .try_into()
215                .map_err(|_| "parse error")?,
216        );
217        offset += 8;
218        let tempo = f64::from_le_bytes(
219            bytes[offset..offset + 8]
220                .try_into()
221                .map_err(|_| "parse error")?,
222        );
223
224        Ok(Zeitgeist {
225            precision: FunnelState {
226                center,
227                width,
228                samples,
229                converged,
230            },
231            confidence: BloomFilter {
232                bits,
233                num_hashes,
234                estimated_count: 0,
235            },
236            trajectory: HurstEstimate {
237                value: trajectory_value,
238                confidence: trajectory_confidence,
239                sample_count: 0,
240            },
241            consensus: HolonomyState {
242                cycle_count: 0,
243                coherence,
244                last_check: 0.0,
245            },
246            temporal: BeatPosition {
247                beat,
248                tempo,
249                phase: 0.0,
250            },
251        })
252    }
253
254    /// Track deadband funnel for precision convergence
255    pub fn track_funnel(&self, zeitgeist: &Zeitgeist, observations: &[f64]) -> FunnelState {
256        if observations.is_empty() {
257            return zeitgeist.precision.clone();
258        }
259
260        let n = observations.len() as f64;
261        let mean = observations.iter().sum::<f64>() / n;
262        let variance = observations.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
263        let std_dev = variance.sqrt();
264        let width = 2.0 * std_dev; // 1-sigma deadband
265
266        FunnelState {
267            center: mean,
268            width,
269            samples: observations.len() as u64,
270            converged: width < 0.01, // converged when deadband < 1%
271        }
272    }
273
274    /// Pending count
275    pub fn pending_count(&self) -> usize {
276        self.pending.len()
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a room id from a string literal.
    fn room(id: &str) -> RoomId {
        RoomId(id.to_string())
    }

    /// Creates a heartbeat transference carrying a fresh zeitgeist.
    fn heartbeat(mgr: &mut FluxManager, from: &str, to: &str, timestamp: f64) -> FluxTransference {
        mgr.create_flux(
            room(from),
            room(to),
            timestamp,
            TransferencePayload::Heartbeat,
            Zeitgeist::new(),
        )
    }

    #[test]
    fn test_flux_create_and_send() {
        let mut mgr = FluxManager::new();
        let flux = heartbeat(&mut mgr, "r1", "r2", 100.0);
        // Creating a transference must not queue it.
        assert_eq!(mgr.pending_count(), 0);
        mgr.send_flux(flux);
        assert_eq!(mgr.pending_count(), 1);
    }

    #[test]
    fn test_flux_recv_for_target() {
        let mut mgr = FluxManager::new();

        let flux1 = heartbeat(&mut mgr, "r1", "r2", 100.0);
        let flux2 = heartbeat(&mut mgr, "r1", "r3", 101.0);
        mgr.send_flux(flux1);
        mgr.send_flux(flux2);

        let for_r2 = mgr.recv_flux_for(&room("r2"));
        assert_eq!(for_r2.len(), 1);
        assert!(for_r2[0].target == room("r2"));
        assert_eq!(mgr.pending_count(), 1);

        // The transference for the other room must still be deliverable.
        let for_r3 = mgr.recv_flux_for(&room("r3"));
        assert_eq!(for_r3.len(), 1);
        assert_eq!(mgr.pending_count(), 0);
    }

    #[test]
    fn test_flux_recv_is_fifo() {
        let mut mgr = FluxManager::new();
        assert!(mgr.recv_flux().is_none());

        let flux1 = heartbeat(&mut mgr, "r1", "r2", 100.0);
        let flux2 = heartbeat(&mut mgr, "r1", "r3", 101.0);
        mgr.send_flux(flux1);
        mgr.send_flux(flux2);

        let first = mgr.recv_flux().expect("two transferences are pending");
        assert_eq!(first.timestamp, 100.0);
        let second = mgr.recv_flux().expect("one transference is pending");
        assert_eq!(second.timestamp, 101.0);
        assert!(mgr.recv_flux().is_none());
    }

    #[test]
    fn test_zeitgeist_encode_decode() {
        let zeitgeist = Zeitgeist::new();
        let encoded = FluxManager::encode_zeitgeist(&zeitgeist);
        // 26 header bytes + 8 per bloom word + 40 trailer bytes.
        assert_eq!(encoded.len(), 66 + 8 * zeitgeist.confidence.bits.len());

        let decoded = FluxManager::decode_zeitgeist(&encoded).expect("round trip must decode");

        // Floats are compared bit-for-bit so the check also holds for NaN.
        assert_eq!(
            decoded.precision.center.to_bits(),
            zeitgeist.precision.center.to_bits()
        );
        assert_eq!(
            decoded.precision.width.to_bits(),
            zeitgeist.precision.width.to_bits()
        );
        assert_eq!(decoded.precision.samples, zeitgeist.precision.samples);
        assert_eq!(decoded.precision.converged, zeitgeist.precision.converged);

        assert_eq!(decoded.confidence.num_hashes, zeitgeist.confidence.num_hashes);
        assert_eq!(decoded.confidence.bits, zeitgeist.confidence.bits);

        assert_eq!(
            decoded.trajectory.value.to_bits(),
            zeitgeist.trajectory.value.to_bits()
        );
        assert_eq!(
            decoded.trajectory.confidence.to_bits(),
            zeitgeist.trajectory.confidence.to_bits()
        );
        assert_eq!(
            decoded.consensus.coherence.to_bits(),
            zeitgeist.consensus.coherence.to_bits()
        );

        assert_eq!(decoded.temporal.beat, zeitgeist.temporal.beat);
        assert_eq!(
            decoded.temporal.tempo.to_bits(),
            zeitgeist.temporal.tempo.to_bits()
        );
    }

    #[test]
    fn test_zeitgeist_decode_rejects_short_input() {
        assert!(FluxManager::decode_zeitgeist(&[]).is_err());
        assert!(FluxManager::decode_zeitgeist(&[0u8; 24]).is_err());
    }

    #[test]
    fn test_parity_monitoring() {
        let mut mgr = FluxManager::new();
        let r1 = room("r1");

        // Initially no parity
        assert!(!mgr.check_parity(&r1));

        // Update with even beat
        let mut even = Zeitgeist::new();
        even.temporal.beat = 0;
        mgr.update_parity(&r1, &even, 100.0);
        assert!(!mgr.check_parity(&r1)); // Only even, no odd yet

        // Update with odd beat
        let mut odd = Zeitgeist::new();
        odd.temporal.beat = 1;
        mgr.update_parity(&r1, &odd, 200.0);
        assert!(mgr.check_parity(&r1)); // Both even and odd present

        // Parity is tracked per room.
        assert!(!mgr.check_parity(&room("r2")));
    }

    #[test]
    fn test_funnel_tracking() {
        let mgr = FluxManager::new();
        let zeitgeist = Zeitgeist::new();

        // Tight observations → narrow funnel → converged
        let tight: Vec<f64> = vec![1.001, 1.002, 0.999, 1.000, 1.001];
        let funnel = mgr.track_funnel(&zeitgeist, &tight);
        assert!(funnel.converged);
        assert!(funnel.width < 0.01);
        assert_eq!(funnel.samples, 5);

        // Wide observations → wide funnel → not converged
        let wide: Vec<f64> = vec![0.0, 1.0, 2.0, 3.0, 4.0];
        let funnel2 = mgr.track_funnel(&zeitgeist, &wide);
        assert!(!funnel2.converged);
        assert_eq!(funnel2.center, 2.0);
    }

    #[test]
    fn test_bloom_filter() {
        let mut bf = BloomFilter::new(3, 256);
        bf.insert(b"hello");
        bf.insert(b"world");

        assert!(bf.contains(b"hello"));
        assert!(bf.contains(b"world"));
        assert!(!bf.contains(b"missing"));
    }

    #[test]
    fn test_bloom_filter_merge() {
        let mut bf1 = BloomFilter::new(3, 256);
        bf1.insert(b"hello");

        let mut bf2 = BloomFilter::new(3, 256);
        bf2.insert(b"world");

        bf1.merge(&bf2);
        assert!(bf1.contains(b"hello"));
        assert!(bf1.contains(b"world"));
    }
}