1use std::collections::{HashMap, VecDeque};
10
/// Minimal byte-oriented queue interface.
///
/// An implementor accepts raw payloads, hands payloads back out, and reports
/// how many entries it is currently holding. Ordering and capacity policy are
/// left to the implementor.
pub trait TidePoolLayer {
    /// Offers `msg` to the buffer.
    ///
    /// Returns `true` when the message was accepted and `false` when the
    /// implementor rejected it.
    fn enqueue(&mut self, msg: &[u8]) -> bool;

    /// Removes one message and returns its payload, or `None` when there is
    /// nothing to hand out.
    fn dequeue(&mut self) -> Option<Vec<u8>>;

    /// Number of messages currently held.
    fn buffer_len(&self) -> usize;
}
20
21#[derive(Debug, Clone)]
24pub struct QueuedMessage {
25 pub payload: Vec<u8>,
26 pub priority: f32, pub sender_trust: f32, pub enqueued_at: u64, pub hops: u32,
30}
31
32impl QueuedMessage {
33 pub fn new(payload: &[u8]) -> Self {
34 Self {
35 payload: payload.to_vec(),
36 priority: 0.5,
37 sender_trust: 0.5,
38 enqueued_at: nanos_now(),
39 hops: 0,
40 }
41 }
42
43 pub fn with_priority(mut self, p: f32) -> Self {
44 self.priority = p.max(0.0).min(1.0);
45 self
46 }
47
48 pub fn with_trust(mut self, t: f32) -> Self {
49 self.sender_trust = t.max(0.0).min(1.0);
50 self
51 }
52
53 pub fn effective_priority(&self) -> f32 {
55 self.priority * (0.3 + 0.7 * self.sender_trust)
56 }
57}
58
59#[derive(Debug, Clone)]
64pub struct TidePoolAdapter {
65 buffer: VecDeque<QueuedMessage>,
66 capacity: usize,
67 total_enqueued: u64,
68 total_dequeued: u64,
69 total_dropped: u64,
70}
71
72impl TidePoolAdapter {
73 pub fn new() -> Self {
74 Self { buffer: VecDeque::new(), capacity: 1024, total_enqueued: 0, total_dequeued: 0, total_dropped: 0 }
75 }
76
77 pub fn with_capacity(cap: usize) -> Self {
78 Self { buffer: VecDeque::new(), capacity: cap, total_enqueued: 0, total_dequeued: 0, total_dropped: 0 }
79 }
80
81 pub fn enqueue_trusted(&mut self, payload: &[u8], trust: f32, priority: f32) -> bool {
83 if self.buffer.len() >= self.capacity {
84 self.total_dropped += 1;
85 return false;
86 }
87 let msg = QueuedMessage::new(payload)
88 .with_trust(trust)
89 .with_priority(priority);
90 self.buffer.push_back(msg);
91 self.total_enqueued += 1;
92 true
93 }
94
95 pub fn dequeue_priority(&mut self) -> Option<QueuedMessage> {
97 if self.buffer.is_empty() { return None; }
98
99 let best_idx = (0..self.buffer.len())
101 .max_by(|a, b| {
102 self.buffer[*a].effective_priority()
103 .partial_cmp(&self.buffer[*b].effective_priority())
104 .unwrap_or(std::cmp::Ordering::Equal)
105 })?;
106
107 let msg = self.buffer.remove(best_idx)?;
108 self.total_dequeued += 1;
109 Some(msg)
110 }
111
112 pub fn total_enqueued(&self) -> u64 { self.total_enqueued }
114 pub fn total_dequeued(&self) -> u64 { self.total_dequeued }
115 pub fn total_dropped(&self) -> u64 { self.total_dropped }
116
117 pub fn avg_buffer_trust(&self) -> f32 {
119 if self.buffer.is_empty() { return 0.0; }
120 let sum: f32 = self.buffer.iter().map(|m| m.sender_trust).sum();
121 sum / self.buffer.len() as f32
122 }
123
124 pub fn drain(&mut self) -> Vec<QueuedMessage> {
126 let msgs: Vec<_> = self.buffer.drain(..).collect();
127 self.total_dequeued += msgs.len() as u64;
128 msgs
129 }
130}
131
132impl Default for TidePoolAdapter {
133 fn default() -> Self { Self::new() }
134}
135
136impl TidePoolLayer for TidePoolAdapter {
137 fn enqueue(&mut self, msg: &[u8]) -> bool {
138 if self.buffer.len() >= self.capacity {
139 self.total_dropped += 1;
140 return false;
141 }
142 self.buffer.push_back(QueuedMessage::new(msg));
143 self.total_enqueued += 1;
144 true
145 }
146
147 fn dequeue(&mut self) -> Option<Vec<u8>> {
148 let msg = self.buffer.pop_front()?;
149 self.total_dequeued += 1;
150 Some(msg.payload)
151 }
152
153 fn buffer_len(&self) -> usize {
154 self.buffer.len()
155 }
156}
157
158#[derive(Debug, Clone)]
163pub struct TrustRouter {
164 pools: HashMap<String, TidePoolAdapter>,
165 trust: HashMap<String, f32>, default_trust: f32,
167}
168
169impl TrustRouter {
170 pub fn new() -> Self {
171 Self { pools: HashMap::new(), trust: HashMap::new(), default_trust: 0.5 }
172 }
173
174 pub fn with_default_trust(t: f32) -> Self {
175 Self { pools: HashMap::new(), trust: HashMap::new(), default_trust: t }
176 }
177
178 pub fn register_agent(&mut self, agent_id: &str, capacity: usize) {
180 self.pools.insert(agent_id.to_string(), TidePoolAdapter::with_capacity(capacity));
181 }
182
183 pub fn set_trust(&mut self, agent_id: &str, trust: f32) {
185 self.trust.insert(agent_id.to_string(), trust.max(0.0).min(1.0));
186 }
187
188 pub fn get_trust(&self, agent_id: &str) -> f32 {
190 *self.trust.get(agent_id).unwrap_or(&self.default_trust)
191 }
192
193 pub fn route(&mut self, to_agent: &str, payload: &[u8], from_trust: f32) -> bool {
195 if let Some(pool) = self.pools.get_mut(to_agent) {
196 pool.enqueue_trusted(payload, from_trust, 0.5)
197 } else {
198 false
199 }
200 }
201
202 pub fn next_for(&mut self, agent_id: &str) -> Option<QueuedMessage> {
204 self.pools.get_mut(agent_id)?.dequeue_priority()
205 }
206
207 pub fn buffer_size(&self, agent_id: &str) -> usize {
209 self.pools.get(agent_id).map(|p| p.buffer_len()).unwrap_or(0)
210 }
211
212 pub fn total_buffered(&self) -> usize {
214 self.pools.values().map(|p| p.buffer_len()).sum()
215 }
216
217 pub fn agent_count(&self) -> usize { self.pools.len() }
219}
220
221impl Default for TrustRouter {
222 fn default() -> Self { Self::new() }
223}
224
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn nanos_now() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_nanos` yields a `u128`; the cast keeps only the low 64 bits.
        Ok(since_epoch) => since_epoch.as_nanos() as u64,
        Err(_) => 0,
    }
}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// Trait-level enqueue/dequeue keeps the buffer length in step.
    #[test]
    fn test_enqueue_dequeue() {
        let mut adapter = TidePoolAdapter::new();
        assert!(adapter.enqueue(b"hello"));
        assert!(adapter.enqueue(b"world"));
        assert_eq!(adapter.buffer_len(), 2);

        let head = adapter.dequeue().unwrap();
        assert_eq!(head, b"hello");
        assert_eq!(adapter.buffer_len(), 1);
    }

    /// Dequeuing from a fresh pool yields nothing.
    #[test]
    fn test_dequeue_empty() {
        let mut adapter = TidePoolAdapter::new();
        assert!(adapter.dequeue().is_none());
    }

    /// The trait-level dequeue hands messages back oldest-first.
    #[test]
    fn test_fifo_order() {
        let sequence: [&[u8]; 3] = [b"first", b"second", b"third"];

        let mut adapter = TidePoolAdapter::new();
        for payload in sequence.iter() {
            adapter.enqueue(payload);
        }

        for expected in sequence.iter() {
            assert_eq!(adapter.dequeue().unwrap(), *expected);
        }
    }

    /// Enqueues beyond the capacity are refused and counted as drops.
    #[test]
    fn test_capacity_limit() {
        let mut adapter = TidePoolAdapter::with_capacity(2);
        assert!(adapter.enqueue(b"a"));
        assert!(adapter.enqueue(b"b"));
        assert!(!adapter.enqueue(b"c"));
        assert_eq!(adapter.total_dropped(), 1);
    }

    /// With equal base priority, the more trusted sender is served first.
    #[test]
    fn test_trusted_enqueue() {
        let mut adapter = TidePoolAdapter::new();
        adapter.enqueue_trusted(b"trusted", 0.9, 0.5);
        adapter.enqueue_trusted(b"untrusted", 0.1, 0.5);

        let winner = adapter.dequeue_priority().unwrap();
        assert_eq!(winner.payload, b"trusted");
    }

    /// Effective priority is the base priority scaled by 0.3 + 0.7 * trust.
    #[test]
    fn test_effective_priority() {
        let trusted = QueuedMessage::new(b"hi").with_trust(0.9).with_priority(0.5);
        let distrusted = QueuedMessage::new(b"hi").with_trust(0.1).with_priority(0.5);

        assert!((trusted.effective_priority() - 0.465).abs() < 0.01);
        assert!((distrusted.effective_priority() - 0.185).abs() < 0.01);
    }

    /// The message with the highest effective priority leaves first,
    /// regardless of arrival order.
    #[test]
    fn test_priority_ordering() {
        let mut adapter = TidePoolAdapter::new();
        adapter.enqueue_trusted(b"low", 0.1, 0.5);
        adapter.enqueue_trusted(b"high", 0.9, 0.8);
        adapter.enqueue_trusted(b"mid", 0.5, 0.5);

        let winner = adapter.dequeue_priority().unwrap();
        assert_eq!(winner.payload, b"high");
    }

    /// Draining empties the buffer and counts every message as dequeued.
    #[test]
    fn test_drain() {
        let mut adapter = TidePoolAdapter::new();
        let inputs: [&[u8]; 3] = [b"a", b"b", b"c"];
        for payload in inputs.iter() {
            adapter.enqueue(payload);
        }

        let drained = adapter.drain();
        assert_eq!(drained.len(), 3);
        assert!(adapter.buffer.is_empty());
        assert_eq!(adapter.total_dequeued(), 3);
    }

    /// Counters track one accepted, one dropped and one dequeued message.
    #[test]
    fn test_stats() {
        let mut adapter = TidePoolAdapter::with_capacity(1);
        adapter.enqueue(b"a");
        // Second enqueue exceeds the capacity of one and is dropped.
        adapter.enqueue(b"b");
        adapter.dequeue();

        assert_eq!(adapter.total_enqueued(), 1);
        assert_eq!(adapter.total_dequeued(), 1);
        assert_eq!(adapter.total_dropped(), 1);
    }

    /// Average trust is the arithmetic mean over buffered messages.
    #[test]
    fn test_avg_buffer_trust() {
        let mut adapter = TidePoolAdapter::new();
        adapter.enqueue_trusted(b"a", 0.8, 0.5);
        adapter.enqueue_trusted(b"b", 0.4, 0.5);

        let mean = adapter.avg_buffer_trust();
        assert!((mean - 0.6).abs() < 0.01);
    }

    /// Routed messages land only in the addressed agent's pool.
    #[test]
    fn test_trust_router() {
        let mut hub = TrustRouter::new();
        hub.register_agent("oracle1", 100);
        hub.register_agent("jc1", 100);
        hub.set_trust("oracle1", 0.9);

        assert!(hub.route("oracle1", b"hello", 0.8));
        assert_eq!(hub.buffer_size("oracle1"), 1);
        assert_eq!(hub.buffer_size("jc1"), 0);

        let delivered = hub.next_for("oracle1").unwrap();
        assert_eq!(delivered.payload, b"hello");
    }

    /// Routing to an unregistered agent is refused.
    #[test]
    fn test_trust_router_unknown_agent() {
        let mut hub = TrustRouter::new();
        assert!(!hub.route("nonexistent", b"msg", 0.5));
    }

    /// Agents without an explicit score report the router's default trust.
    #[test]
    fn test_trust_router_default_trust() {
        let hub = TrustRouter::with_default_trust(0.3);
        // Neither id has had a score set, so both fall back to the default.
        for agent in ["unknown", "known"].iter() {
            assert_eq!(hub.get_trust(agent), 0.3);
        }
    }

    /// Within one pool, the message from the more trusted sender is
    /// delivered first even though it arrived later.
    #[test]
    fn test_trust_router_trust_weighted_delivery() {
        let mut hub = TrustRouter::new();
        hub.register_agent("dest", 100);

        hub.route("dest", b"low_priority", 0.1);
        hub.route("dest", b"high_priority", 0.9);

        let delivered = hub.next_for("dest").unwrap();
        assert_eq!(delivered.payload, b"high_priority");
    }

    /// Out-of-range priority and trust are limited to 0.0..=1.0.
    #[test]
    fn test_queued_message_clamping() {
        let bounded = QueuedMessage::new(b"test")
            .with_priority(2.0)
            .with_trust(-0.5);
        assert_eq!(bounded.priority, 1.0);
        assert_eq!(bounded.sender_trust, 0.0);
    }
}