Skip to main content

plato_relay_tidepool/
lib.rs

//! plato-relay-tidepool — TidePoolLayer adapter for plato-relay
//!
//! Bridges plato-relay's trust-weighted BFS routing to the Ship Interconnection
//! Protocol's TidePoolLayer trait. Messages are enqueued, routed through trust
//! weighting, and dequeued for delivery.
//!
//! Sprint 3 Task S3-3: implement TidePoolLayer for plato-relay.
8
9use std::collections::{HashMap, VecDeque};
10
11// ── TidePool Trait ───────────────────────────────────────
12
/// Tide pool layer: message routing with buffering.
///
/// Declared here to match `plato-ship-protocol::TidePoolLayer` exactly, so the
/// method set and signatures must stay in step with that trait.
pub trait TidePoolLayer {
    /// Offers `msg` to the buffer and reports whether it was accepted.
    fn enqueue(&mut self, msg: &[u8]) -> bool;

    /// Removes the next buffered payload, or yields `None` when nothing is
    /// available.
    fn dequeue(&mut self) -> Option<Vec<u8>>;

    /// Number of messages currently held in the buffer.
    fn buffer_len(&self) -> usize;
}
20
21// ── Queued Message ───────────────────────────────────────
22
23#[derive(Debug, Clone)]
24pub struct QueuedMessage {
25    pub payload: Vec<u8>,
26    pub priority: f32,       // 0.0-1.0, higher = delivered first
27    pub sender_trust: f32,   // sender's trust level
28    pub enqueued_at: u64,    // nanosecond timestamp
29    pub hops: u32,
30}
31
32impl QueuedMessage {
33    pub fn new(payload: &[u8]) -> Self {
34        Self {
35            payload: payload.to_vec(),
36            priority: 0.5,
37            sender_trust: 0.5,
38            enqueued_at: nanos_now(),
39            hops: 0,
40        }
41    }
42
43    pub fn with_priority(mut self, p: f32) -> Self {
44        self.priority = p.max(0.0).min(1.0);
45        self
46    }
47
48    pub fn with_trust(mut self, t: f32) -> Self {
49        self.sender_trust = t.max(0.0).min(1.0);
50        self
51    }
52
53    /// Effective priority: base priority weighted by sender trust
54    pub fn effective_priority(&self) -> f32 {
55        self.priority * (0.3 + 0.7 * self.sender_trust)
56    }
57}
58
59// ── TidePool Adapter ────────────────────────────────────
60
61/// Adapts trust-weighted message routing into a TidePoolLayer.
62/// Messages are buffered, scored by trust, and dequeued in priority order.
63#[derive(Debug, Clone)]
64pub struct TidePoolAdapter {
65    buffer: VecDeque<QueuedMessage>,
66    capacity: usize,
67    total_enqueued: u64,
68    total_dequeued: u64,
69    total_dropped: u64,
70}
71
72impl TidePoolAdapter {
73    pub fn new() -> Self {
74        Self { buffer: VecDeque::new(), capacity: 1024, total_enqueued: 0, total_dequeued: 0, total_dropped: 0 }
75    }
76
77    pub fn with_capacity(cap: usize) -> Self {
78        Self { buffer: VecDeque::new(), capacity: cap, total_enqueued: 0, total_dequeued: 0, total_dropped: 0 }
79    }
80
81    /// Enqueue with trust and priority metadata
82    pub fn enqueue_trusted(&mut self, payload: &[u8], trust: f32, priority: f32) -> bool {
83        if self.buffer.len() >= self.capacity {
84            self.total_dropped += 1;
85            return false;
86        }
87        let msg = QueuedMessage::new(payload)
88            .with_trust(trust)
89            .with_priority(priority);
90        self.buffer.push_back(msg);
91        self.total_enqueued += 1;
92        true
93    }
94
95    /// Dequeue highest-priority message (trust-weighted)
96    pub fn dequeue_priority(&mut self) -> Option<QueuedMessage> {
97        if self.buffer.is_empty() { return None; }
98
99        // Find highest effective priority
100        let best_idx = (0..self.buffer.len())
101            .max_by(|a, b| {
102                self.buffer[*a].effective_priority()
103                    .partial_cmp(&self.buffer[*b].effective_priority())
104                    .unwrap_or(std::cmp::Ordering::Equal)
105            })?;
106
107        let msg = self.buffer.remove(best_idx)?;
108        self.total_dequeued += 1;
109        Some(msg)
110    }
111
112    /// Stats
113    pub fn total_enqueued(&self) -> u64 { self.total_enqueued }
114    pub fn total_dequeued(&self) -> u64 { self.total_dequeued }
115    pub fn total_dropped(&self) -> u64 { self.total_dropped }
116
117    /// Average trust of buffered messages
118    pub fn avg_buffer_trust(&self) -> f32 {
119        if self.buffer.is_empty() { return 0.0; }
120        let sum: f32 = self.buffer.iter().map(|m| m.sender_trust).sum();
121        sum / self.buffer.len() as f32
122    }
123
124    /// Drain all messages (for batch processing)
125    pub fn drain(&mut self) -> Vec<QueuedMessage> {
126        let msgs: Vec<_> = self.buffer.drain(..).collect();
127        self.total_dequeued += msgs.len() as u64;
128        msgs
129    }
130}
131
132impl Default for TidePoolAdapter {
133    fn default() -> Self { Self::new() }
134}
135
136impl TidePoolLayer for TidePoolAdapter {
137    fn enqueue(&mut self, msg: &[u8]) -> bool {
138        if self.buffer.len() >= self.capacity {
139            self.total_dropped += 1;
140            return false;
141        }
142        self.buffer.push_back(QueuedMessage::new(msg));
143        self.total_enqueued += 1;
144        true
145    }
146
147    fn dequeue(&mut self) -> Option<Vec<u8>> {
148        let msg = self.buffer.pop_front()?;
149        self.total_dequeued += 1;
150        Some(msg.payload)
151    }
152
153    fn buffer_len(&self) -> usize {
154        self.buffer.len()
155    }
156}
157
158// ── Trust-Weighted Router ───────────────────────────────
159
160/// Routes messages between named agents with trust scoring.
161/// Integrates TidePoolLayer with trust-based priority.
162#[derive(Debug, Clone)]
163pub struct TrustRouter {
164    pools: HashMap<String, TidePoolAdapter>,
165    trust: HashMap<String, f32>, // agent_id → trust score
166    default_trust: f32,
167}
168
169impl TrustRouter {
170    pub fn new() -> Self {
171        Self { pools: HashMap::new(), trust: HashMap::new(), default_trust: 0.5 }
172    }
173
174    pub fn with_default_trust(t: f32) -> Self {
175        Self { pools: HashMap::new(), trust: HashMap::new(), default_trust: t }
176    }
177
178    /// Register an agent with a pool
179    pub fn register_agent(&mut self, agent_id: &str, capacity: usize) {
180        self.pools.insert(agent_id.to_string(), TidePoolAdapter::with_capacity(capacity));
181    }
182
183    /// Set trust level for an agent
184    pub fn set_trust(&mut self, agent_id: &str, trust: f32) {
185        self.trust.insert(agent_id.to_string(), trust.max(0.0).min(1.0));
186    }
187
188    /// Get trust for an agent
189    pub fn get_trust(&self, agent_id: &str) -> f32 {
190        *self.trust.get(agent_id).unwrap_or(&self.default_trust)
191    }
192
193    /// Route a message to an agent's pool with trust-weighted priority
194    pub fn route(&mut self, to_agent: &str, payload: &[u8], from_trust: f32) -> bool {
195        if let Some(pool) = self.pools.get_mut(to_agent) {
196            pool.enqueue_trusted(payload, from_trust, 0.5)
197        } else {
198            false
199        }
200    }
201
202    /// Get next message for an agent (highest trust-weighted priority)
203    pub fn next_for(&mut self, agent_id: &str) -> Option<QueuedMessage> {
204        self.pools.get_mut(agent_id)?.dequeue_priority()
205    }
206
207    /// Buffer size for an agent
208    pub fn buffer_size(&self, agent_id: &str) -> usize {
209        self.pools.get(agent_id).map(|p| p.buffer_len()).unwrap_or(0)
210    }
211
212    /// Total messages across all pools
213    pub fn total_buffered(&self) -> usize {
214        self.pools.values().map(|p| p.buffer_len()).sum()
215    }
216
217    /// Agent count
218    pub fn agent_count(&self) -> usize { self.pools.len() }
219}
220
221impl Default for TrustRouter {
222    fn default() -> Self { Self::new() }
223}
224
/// Current wall-clock time as whole nanoseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. The `u128`
/// nanosecond count is capped at `u64::MAX` before narrowing, so an
/// out-of-range value saturates instead of silently wrapping.
fn nanos_now() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // After the `min`, the value fits in 64 bits and the cast is lossless.
        .map(|elapsed| elapsed.as_nanos().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
229
230// ── Tests ────────────────────────────────────────────────
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Accepted payloads come back oldest-first and the length tracks them.
    #[test]
    fn test_enqueue_dequeue() {
        let mut adapter = TidePoolAdapter::new();
        assert!(adapter.enqueue(b"hello"));
        assert!(adapter.enqueue(b"world"));
        assert_eq!(adapter.buffer_len(), 2);

        assert_eq!(adapter.dequeue(), Some(b"hello".to_vec()));
        assert_eq!(adapter.buffer_len(), 1);
    }

    /// Dequeuing from a fresh pool yields nothing.
    #[test]
    fn test_dequeue_empty() {
        assert!(TidePoolAdapter::new().dequeue().is_none());
    }

    /// The trait-level dequeue preserves arrival order.
    #[test]
    fn test_fifo_order() {
        let order: [&[u8]; 3] = [b"first", b"second", b"third"];
        let mut adapter = TidePoolAdapter::new();
        for &payload in order.iter() {
            adapter.enqueue(payload);
        }

        for &expected in order.iter() {
            assert_eq!(adapter.dequeue().unwrap(), expected);
        }
    }

    /// A pool of capacity two rejects the third message and counts the drop.
    #[test]
    fn test_capacity_limit() {
        let mut adapter = TidePoolAdapter::with_capacity(2);
        let accepted = [
            adapter.enqueue(b"a"),
            adapter.enqueue(b"b"),
            adapter.enqueue(b"c"), // over capacity
        ];
        assert_eq!(accepted, [true, true, false]);
        assert_eq!(adapter.total_dropped(), 1);
    }

    /// With equal base priority, the higher-trust message is delivered first.
    #[test]
    fn test_trusted_enqueue() {
        let mut adapter = TidePoolAdapter::new();
        adapter.enqueue_trusted(b"trusted", 0.9, 0.5);
        adapter.enqueue_trusted(b"untrusted", 0.1, 0.5);

        let winner = adapter.dequeue_priority().unwrap();
        assert_eq!(winner.payload, b"trusted");
    }

    /// Effective priority is the base priority scaled by `0.3 + 0.7 * trust`.
    #[test]
    fn test_effective_priority() {
        let score_at = |trust: f32| {
            QueuedMessage::new(b"hi")
                .with_trust(trust)
                .with_priority(0.5)
                .effective_priority()
        };

        // 0.5 * (0.3 + 0.7 * 0.9) = 0.5 * 0.93 = 0.465
        assert!((score_at(0.9) - 0.465).abs() < 0.01);
        // 0.5 * (0.3 + 0.7 * 0.1) = 0.5 * 0.37 = 0.185
        assert!((score_at(0.1) - 0.185).abs() < 0.01);
    }

    /// The message with the best trust/priority mix is picked regardless of
    /// where it sits in the buffer.
    #[test]
    fn test_priority_ordering() {
        let inputs: [(&[u8], f32, f32); 3] = [
            (b"low", 0.1, 0.5),
            (b"high", 0.9, 0.8),
            (b"mid", 0.5, 0.5),
        ];
        let mut adapter = TidePoolAdapter::new();
        for &(payload, trust, priority) in inputs.iter() {
            adapter.enqueue_trusted(payload, trust, priority);
        }

        let winner = adapter.dequeue_priority().unwrap();
        assert_eq!(winner.payload, b"high");
    }

    /// Draining empties the buffer and counts every message as dequeued.
    #[test]
    fn test_drain() {
        let mut adapter = TidePoolAdapter::new();
        for &payload in [b"a", b"b", b"c"].iter() {
            adapter.enqueue(payload);
        }

        let drained = adapter.drain();
        assert_eq!(drained.len(), 3);
        assert!(adapter.buffer.is_empty());
        assert_eq!(adapter.total_dequeued(), 3);
    }

    /// Counters distinguish accepted, delivered and dropped messages.
    #[test]
    fn test_stats() {
        let mut adapter = TidePoolAdapter::with_capacity(1);
        adapter.enqueue(b"a");
        adapter.enqueue(b"b"); // dropped (over capacity)
        adapter.dequeue();

        // Only 'a' was actually enqueued; 'b' was dropped.
        assert_eq!(adapter.total_enqueued(), 1);
        assert_eq!(adapter.total_dequeued(), 1);
        assert_eq!(adapter.total_dropped(), 1);
    }

    /// The average trust is the mean over buffered messages.
    #[test]
    fn test_avg_buffer_trust() {
        let mut adapter = TidePoolAdapter::new();
        adapter.enqueue_trusted(b"a", 0.8, 0.5);
        adapter.enqueue_trusted(b"b", 0.4, 0.5);

        let mean = adapter.avg_buffer_trust();
        assert!((mean - 0.6).abs() < 0.01);
    }

    /// A routed message lands only in the addressed agent's pool.
    #[test]
    fn test_trust_router() {
        let mut router = TrustRouter::new();
        for agent in ["oracle1", "jc1"].iter() {
            router.register_agent(agent, 100);
        }
        router.set_trust("oracle1", 0.9);

        assert!(router.route("oracle1", b"hello", 0.8));
        assert_eq!(router.buffer_size("oracle1"), 1);
        assert_eq!(router.buffer_size("jc1"), 0);

        let delivered = router.next_for("oracle1").unwrap();
        assert_eq!(delivered.payload, b"hello");
    }

    /// Routing to an unregistered agent is refused.
    #[test]
    fn test_trust_router_unknown_agent() {
        let mut router = TrustRouter::new();
        let accepted = router.route("nonexistent", b"msg", 0.5);
        assert!(!accepted);
    }

    /// Agents without an explicit score report the router default.
    #[test]
    fn test_trust_router_default_trust() {
        let router = TrustRouter::with_default_trust(0.3);
        // Neither id has had a trust score set.
        for agent in ["unknown", "known"].iter() {
            assert_eq!(router.get_trust(agent), 0.3);
        }
    }

    /// A later message from a more trusted sender overtakes an earlier one.
    #[test]
    fn test_trust_router_trust_weighted_delivery() {
        let mut router = TrustRouter::new();
        router.register_agent("dest", 100);

        // Low trust sender first, high trust sender second.
        router.route("dest", b"low_priority", 0.1);
        router.route("dest", b"high_priority", 0.9);

        let delivered = router.next_for("dest").unwrap();
        assert_eq!(delivered.payload, b"high_priority");
    }

    /// Out-of-range builder inputs are pulled back into `0.0..=1.0`.
    #[test]
    fn test_queued_message_clamping() {
        let clamped = QueuedMessage::new(b"test")
            .with_priority(2.0)
            .with_trust(-0.5);
        assert_eq!(clamped.priority, 1.0);
        assert_eq!(clamped.sender_trust, 0.0);
    }
}