1use std::collections::HashMap;
14use std::sync::{
15 atomic::{AtomicBool, Ordering},
16 Arc, Mutex, RwLock,
17};
18
19use host_encoding::statement_store::{blake2b_256, decode_statement, Statement, Topic};
20
21pub type RawStatement = Vec<u8>;
27
28pub trait StatementHandler: Send + Sync {
34 fn on_statement(&self, statement: &Statement, raw: &[u8]) -> Result<(), String>;
39}
40
41pub trait StatementTransport: Send + Sync {
43 fn subscribe(
51 &self,
52 topics: &[Topic],
53 on_statement: Arc<dyn Fn(RawStatement) + Send + Sync>,
54 on_disconnect: Arc<dyn Fn() + Send + Sync>,
55 ) -> Box<dyn SubscriptionToken>;
56}
57
58pub trait SubscriptionToken: Send {}
62
63#[derive(Debug, Clone)]
69pub struct SubscriptionConfig {
70 pub dedup_cache_size: usize,
73 pub reconnect_delay_ms: u64,
76}
77
78impl Default for SubscriptionConfig {
79 fn default() -> Self {
80 Self {
81 dedup_cache_size: 8192,
82 reconnect_delay_ms: 3000,
83 }
84 }
85}
86
87struct DedupState {
99 cache: HashMap<[u8; 32], u32>,
100 order: Vec<[u8; 32]>,
101}
102
103pub struct StatementStoreSubscription<T: StatementTransport> {
104 #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
107 transport: Arc<T>,
108 config: SubscriptionConfig,
109 topics: RwLock<Vec<Topic>>,
110 handlers: Mutex<Vec<Arc<dyn StatementHandler>>>,
111 dedup: Mutex<DedupState>,
112 running: Arc<AtomicBool>,
113}
114
115impl<T: StatementTransport> std::fmt::Debug for StatementStoreSubscription<T> {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct("StatementStoreSubscription")
118 .field("config", &self.config)
119 .field("running", &self.running.load(Ordering::Relaxed))
120 .finish()
121 }
122}
123
124impl<T: StatementTransport> StatementStoreSubscription<T> {
125 pub fn new(transport: T, config: SubscriptionConfig) -> Self {
127 Self {
128 transport: Arc::new(transport),
129 config,
130 topics: RwLock::new(Vec::new()),
131 handlers: Mutex::new(Vec::new()),
132 dedup: Mutex::new(DedupState {
133 cache: HashMap::new(),
134 order: Vec::new(),
135 }),
136 running: Arc::new(AtomicBool::new(false)),
137 }
138 }
139
140 pub fn add_handler(&self, handler: Arc<dyn StatementHandler>) {
142 self.handlers
143 .lock()
144 .unwrap_or_else(|e| e.into_inner())
145 .push(handler);
146 }
147
148 pub fn add_topics(&self, new_topics: &[Topic]) {
153 let mut topics = self.topics.write().unwrap_or_else(|e| e.into_inner());
154 for t in new_topics {
155 if !topics.contains(t) {
156 topics.push(*t);
157 }
158 }
159 }
160
161 pub fn is_running(&self) -> bool {
163 self.running.load(Ordering::Relaxed)
164 }
165
166 pub fn remove_topics(&self, to_remove: &[Topic]) {
171 let mut topics = self.topics.write().unwrap_or_else(|e| e.into_inner());
172 topics.retain(|t| !to_remove.contains(t));
173 }
174
175 #[cfg(target_arch = "wasm32")]
177 pub fn start(self: &Arc<Self>) {
178 self.running.store(true, Ordering::Relaxed);
179 }
180
181 pub fn stop(&self) {
183 self.running.store(false, Ordering::Relaxed);
184 }
185
186 pub fn deliver(&self, raw: &[u8]) {
195 let statement = match decode_statement(raw) {
196 Ok(s) => s,
197 Err(e) => {
198 log::warn!("[subscription] decode failed: {e}");
199 return;
200 }
201 };
202
203 let dedup_key = blake2b_256(&statement.data);
205
206 if !self.should_deliver(&dedup_key, statement.priority) {
207 return;
208 }
209
210 self.dispatch_to_handlers(&statement, raw);
211 }
212
213 fn should_deliver(&self, dedup_key: &[u8; 32], priority: u32) -> bool {
221 let mut state = self.dedup.lock().unwrap_or_else(|e| e.into_inner());
222
223 if let Some(&prev_priority) = state.cache.get(dedup_key) {
224 if priority <= prev_priority {
225 return false;
226 }
227 state.cache.insert(*dedup_key, priority);
229 return true;
230 }
231
232 state.cache.insert(*dedup_key, priority);
234 state.order.push(*dedup_key);
235
236 if state.order.len() > self.config.dedup_cache_size {
238 let drain_count = state.order.len() / 2;
239 let evicted: Vec<[u8; 32]> = state.order.drain(..drain_count).collect();
240 for key in &evicted {
241 state.cache.remove(key);
242 }
243 }
244
245 true
246 }
247
248 fn dispatch_to_handlers(&self, statement: &Statement, raw: &[u8]) {
250 let handlers = self.handlers.lock().unwrap_or_else(|e| e.into_inner());
251 for handler in handlers.iter() {
252 if let Err(e) = handler.on_statement(statement, raw) {
253 log::warn!("[subscription] handler error: {e}");
254 }
255 }
256 }
257}
258
259#[cfg(not(target_arch = "wasm32"))]
265impl<T: StatementTransport + 'static> StatementStoreSubscription<T> {
266 pub fn start(self: &Arc<Self>) {
270 if self.running.swap(true, Ordering::Relaxed) {
271 return;
272 }
273 let sub = self.clone();
274 std::thread::spawn(move || reconnect_loop(sub));
275 }
276}
277
278#[cfg(not(target_arch = "wasm32"))]
287fn reconnect_loop<T: StatementTransport + 'static>(sub: Arc<StatementStoreSubscription<T>>) {
288 use std::time::Duration;
289
290 while sub.running.load(Ordering::Relaxed) {
291 let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner()).clone();
292
293 if topics.is_empty() {
294 std::thread::sleep(Duration::from_millis(sub.config.reconnect_delay_ms));
295 continue;
296 }
297
298 let sub_clone = sub.clone();
299 let on_statement = Arc::new(move |raw: RawStatement| {
300 sub_clone.deliver(&raw);
301 });
302
303 let disconnected = Arc::new(AtomicBool::new(false));
304 let disc_clone = disconnected.clone();
305 let on_disconnect = Arc::new(move || {
306 disc_clone.store(true, Ordering::Relaxed);
307 });
308
309 let _token = sub
310 .transport
311 .subscribe(&topics, on_statement, on_disconnect);
312
313 while sub.running.load(Ordering::Relaxed) && !disconnected.load(Ordering::Relaxed) {
315 std::thread::sleep(Duration::from_millis(200));
316 }
317
318 if sub.running.load(Ordering::Relaxed) {
319 log::info!(
320 "[subscription] disconnected, reconnecting in {}ms",
321 sub.config.reconnect_delay_ms
322 );
323 std::thread::sleep(Duration::from_millis(sub.config.reconnect_delay_ms));
324 }
325 }
326
327 log::info!("[subscription] reconnect loop stopped");
328}
329
330#[cfg(test)]
335mod tests {
336 use super::*;
337 use host_encoding::statement_store::{
338 assemble_statement, build_signing_payload, string_to_topic,
339 };
340 use std::sync::atomic::AtomicUsize;
341
342 fn make_statement_bytes(data: &[u8], priority: u32) -> Vec<u8> {
348 let topic = string_to_topic("test-topic");
349 let pubkey = [0xabu8; 32];
350 let fake_sig = [0xcdu8; 64];
351 let (payload, num_fields) =
352 build_signing_payload(1_700_000_000, None, None, priority, &[topic], data).unwrap();
353 assemble_statement(&payload, num_fields, &pubkey, &fake_sig)
354 }
355
356 struct StubTransport {
359 subscribe_count: Arc<AtomicUsize>,
360 }
361
362 impl StubTransport {
363 fn new() -> (Self, Arc<AtomicUsize>) {
364 let count = Arc::new(AtomicUsize::new(0));
365 (
366 Self {
367 subscribe_count: count.clone(),
368 },
369 count,
370 )
371 }
372 }
373
374 struct StubToken;
375 impl SubscriptionToken for StubToken {}
376
377 impl StatementTransport for StubTransport {
378 fn subscribe(
379 &self,
380 _topics: &[Topic],
381 _on_statement: Arc<dyn Fn(RawStatement) + Send + Sync>,
382 _on_disconnect: Arc<dyn Fn() + Send + Sync>,
383 ) -> Box<dyn SubscriptionToken> {
384 self.subscribe_count
385 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
386 Box::new(StubToken)
387 }
388 }
389
390 struct RecordingHandler {
392 received: Arc<Mutex<Vec<Vec<u8>>>>,
393 force_error: Option<String>,
395 }
396
397 impl RecordingHandler {
398 fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
399 let received = Arc::new(Mutex::new(Vec::new()));
400 (
401 Self {
402 received: received.clone(),
403 force_error: None,
404 },
405 received,
406 )
407 }
408
409 fn with_error(err: &str) -> Self {
410 let (mut h, _) = Self::new();
411 h.force_error = Some(err.to_owned());
412 h
413 }
414 }
415
416 impl StatementHandler for RecordingHandler {
417 fn on_statement(&self, _statement: &Statement, raw: &[u8]) -> Result<(), String> {
418 self.received
419 .lock()
420 .unwrap_or_else(|e| e.into_inner())
421 .push(raw.to_vec());
422 if let Some(e) = &self.force_error {
423 return Err(e.clone());
424 }
425 Ok(())
426 }
427 }
428
429 fn make_sub() -> Arc<StatementStoreSubscription<StubTransport>> {
430 let (transport, _) = StubTransport::new();
431 Arc::new(StatementStoreSubscription::new(
432 transport,
433 SubscriptionConfig::default(),
434 ))
435 }
436
437 #[test]
442 fn test_delivers_statement_to_handler() {
443 let sub = make_sub();
444 let (handler, received) = RecordingHandler::new();
445 sub.add_handler(Arc::new(handler));
446
447 let raw = make_statement_bytes(b"hello", 0);
448 sub.deliver(&raw);
449
450 let received = received.lock().unwrap_or_else(|e| e.into_inner());
451 assert_eq!(received.len(), 1);
452 assert_eq!(received[0], raw);
453 }
454
455 #[test]
456 fn test_dispatches_to_multiple_handlers_in_order() {
457 let sub = make_sub();
458 let (h1, r1) = RecordingHandler::new();
459 let (h2, r2) = RecordingHandler::new();
460 sub.add_handler(Arc::new(h1));
461 sub.add_handler(Arc::new(h2));
462
463 let raw = make_statement_bytes(b"multi", 0);
464 sub.deliver(&raw);
465
466 assert_eq!(
467 r1.lock().unwrap_or_else(|e| e.into_inner()).len(),
468 1,
469 "first handler must receive statement"
470 );
471 assert_eq!(
472 r2.lock().unwrap_or_else(|e| e.into_inner()).len(),
473 1,
474 "second handler must receive statement"
475 );
476 }
477
478 #[test]
479 fn test_handler_error_does_not_stop_other_handlers() {
480 let sub = make_sub();
481 let failing = RecordingHandler::with_error("intentional error");
482 let (passing, received) = RecordingHandler::new();
483 sub.add_handler(Arc::new(failing));
484 sub.add_handler(Arc::new(passing));
485
486 let raw = make_statement_bytes(b"error-test", 0);
487 sub.deliver(&raw);
488
489 assert_eq!(
491 received.lock().unwrap_or_else(|e| e.into_inner()).len(),
492 1,
493 "handler after a failing one must still be called"
494 );
495 }
496
497 #[test]
498 fn test_dedup_skips_same_priority() {
499 let sub = make_sub();
500 let (handler, received) = RecordingHandler::new();
501 sub.add_handler(Arc::new(handler));
502
503 let raw = make_statement_bytes(b"dedup-data", 5);
504 sub.deliver(&raw);
505 sub.deliver(&raw); assert_eq!(
508 received.lock().unwrap_or_else(|e| e.into_inner()).len(),
509 1,
510 "identical statement must not be delivered twice"
511 );
512 }
513
514 #[test]
515 fn test_dedup_redelivers_higher_priority() {
516 let sub = make_sub();
517 let (handler, received) = RecordingHandler::new();
518 sub.add_handler(Arc::new(handler));
519
520 let data = b"priority-data";
521 let low = make_statement_bytes(data, 0);
522 let high = make_statement_bytes(data, 1);
523
524 sub.deliver(&low);
525 sub.deliver(&high); assert_eq!(
528 received.lock().unwrap_or_else(|e| e.into_inner()).len(),
529 2,
530 "higher-priority statement with same data must be redelivered"
531 );
532 }
533
534 #[test]
535 fn test_dedup_evicts_when_cache_full() {
536 let (transport, _) = StubTransport::new();
537 let config = SubscriptionConfig {
538 dedup_cache_size: 4,
539 reconnect_delay_ms: 3000,
540 };
541 let sub = Arc::new(StatementStoreSubscription::new(transport, config));
542 let (handler, received) = RecordingHandler::new();
543 sub.add_handler(Arc::new(handler));
544
545 for i in 0u8..5 {
547 let raw = make_statement_bytes(&[i], 0);
548 sub.deliver(&raw);
549 }
550
551 let count = received.lock().unwrap_or_else(|e| e.into_inner()).len();
552 assert_eq!(count, 5, "all 5 unique statements must be delivered");
553
554 let cache_len = sub
556 .dedup
557 .lock()
558 .unwrap_or_else(|e| e.into_inner())
559 .cache
560 .len();
561 assert!(
562 cache_len <= 4,
563 "dedup cache must not exceed cache_size after eviction, got {cache_len}"
564 );
565 }
566
567 #[test]
568 fn test_add_topics_deduplicates() {
569 let sub = make_sub();
570 let topic = string_to_topic("my-topic");
571
572 sub.add_topics(&[topic]);
573 sub.add_topics(&[topic]); let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner());
576 assert_eq!(
577 topics.len(),
578 1,
579 "duplicate topic must not be added a second time"
580 );
581 }
582
583 #[test]
584 fn test_remove_topics() {
585 let sub = make_sub();
586 let topic = string_to_topic("removable");
587
588 sub.add_topics(&[topic]);
589 sub.remove_topics(&[topic]);
590
591 let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner());
592 assert!(topics.is_empty(), "topic must be removed");
593 }
594
595 #[test]
596 fn test_deliver_with_malformed_bytes_logs_warning() {
597 let sub = make_sub();
598 let (handler, received) = RecordingHandler::new();
599 sub.add_handler(Arc::new(handler));
600
601 sub.deliver(&[0xff, 0x00, 0xde, 0xad]);
603
604 assert!(
605 received
606 .lock()
607 .unwrap_or_else(|e| e.into_inner())
608 .is_empty(),
609 "malformed statement must not reach handlers"
610 );
611 }
612
613 #[test]
614 fn test_deliver_with_no_handlers() {
615 let sub = make_sub();
616 let raw = make_statement_bytes(b"no-handlers", 0);
618 sub.deliver(&raw);
619 }
620}