Skip to main content

host_chain_core/
subscription.rs

1//! Generic statement-store subscription — trait definitions and
2//! [`StatementStoreSubscription`] reconnect/dedup/dispatch infrastructure.
3//!
4//! # Design
5//!
6//! - [`StatementTransport`] abstracts the transport layer (WebSocket, RPC bridge,
7//!   test stub).  Implementors call back via `on_statement` / `on_disconnect`.
8//! - [`StatementHandler`] receives decoded statements after dedup.
9//! - [`StatementStoreSubscription`] wires them together: it holds a dedup cache,
10//!   a handler list, and on native targets drives a reconnect loop on a
11//!   background thread.  On WASM the JS host drives delivery via [`StatementStoreSubscription::deliver`].
12
13use std::collections::HashMap;
14use std::sync::{
15    atomic::{AtomicBool, Ordering},
16    Arc, Mutex, RwLock,
17};
18
19use host_encoding::statement_store::{blake2b_256, decode_statement, Statement, Topic};
20
21// ---------------------------------------------------------------------------
22// Public type aliases
23// ---------------------------------------------------------------------------
24
/// A raw SCALE-encoded statement received from the transport.
///
/// This is the owned byte buffer handed to the `on_statement` callback of
/// [`StatementTransport::subscribe`]; it is decoded later by
/// [`StatementStoreSubscription::deliver`].
pub type RawStatement = Vec<u8>;
27
28// ---------------------------------------------------------------------------
29// Traits
30// ---------------------------------------------------------------------------
31
/// Receives decoded statements after dedup.
///
/// Handlers are registered with [`StatementStoreSubscription::add_handler`]
/// and are invoked in registration order for every statement that passes the
/// dedup filter.
pub trait StatementHandler: Send + Sync {
    /// Called for each unique statement that passes the dedup filter.
    ///
    /// - `statement` is the decoded statement.
    /// - `raw` is the undecoded byte slice that `statement` was decoded from.
    ///
    /// Return `Err` to signal a handler-level problem; other handlers are
    /// still called and the error is logged at `warn` level.
    fn on_statement(&self, statement: &Statement, raw: &[u8]) -> Result<(), String>;
}
40
/// Abstracts the transport layer used to receive statements.
///
/// On native targets the subscription manager calls
/// [`StatementTransport::subscribe`] from its background reconnect thread,
/// once per (re)connect cycle.
pub trait StatementTransport: Send + Sync {
    /// Subscribe to the given topics.
    ///
    /// - `topics` is the set of topics to receive statements for.
    /// - `on_statement` is invoked for every raw statement received.
    /// - `on_disconnect` is invoked exactly once when the connection drops.
    ///
    /// Both callbacks are `Send + Sync`, so an implementation may invoke them
    /// from whichever thread it receives data on.
    ///
    /// The returned [`SubscriptionToken`] controls the subscription lifetime;
    /// dropping it cancels the subscription.
    fn subscribe(
        &self,
        topics: &[Topic],
        on_statement: Arc<dyn Fn(RawStatement) + Send + Sync>,
        on_disconnect: Arc<dyn Fn() + Send + Sync>,
    ) -> Box<dyn SubscriptionToken>;
}
57
/// RAII handle for an active subscription.
///
/// Dropping the token cancels the subscription.  The trait has no methods:
/// implementors are expected to perform their cleanup in `Drop`.  The `Send`
/// bound lets the token be owned by the background reconnect thread.
pub trait SubscriptionToken: Send {}
62
63// ---------------------------------------------------------------------------
64// Configuration
65// ---------------------------------------------------------------------------
66
/// Tuning knobs for [`StatementStoreSubscription`].
///
/// Use [`SubscriptionConfig::default`] for the stock values and override
/// individual fields as needed.
#[derive(Debug, Clone)]
pub struct SubscriptionConfig {
    /// Upper bound on tracked dedup entries; once exceeded, the oldest half
    /// of the entries is evicted.  Default: 8192.
    pub dedup_cache_size: usize,
    /// Pause, in milliseconds, between a disconnect and the next reconnect
    /// attempt.  Default: 3000.
    pub reconnect_delay_ms: u64,
}

impl Default for SubscriptionConfig {
    /// Returns the stock configuration: 8192 dedup entries, 3000 ms back-off.
    fn default() -> Self {
        const DEDUP_CACHE_SIZE: usize = 8192;
        const RECONNECT_DELAY_MS: u64 = 3_000;

        SubscriptionConfig {
            reconnect_delay_ms: RECONNECT_DELAY_MS,
            dedup_cache_size: DEDUP_CACHE_SIZE,
        }
    }
}
86
87// ---------------------------------------------------------------------------
88// StatementStoreSubscription
89// ---------------------------------------------------------------------------
90
91/// Manages topics, handlers, dedup, and reconnection for statement delivery.
92///
93/// Call [`StatementStoreSubscription::start`] to begin receiving statements.
94/// On native targets a background thread drives the reconnect loop; on WASM
95/// the JS host must call [`StatementStoreSubscription::deliver`] directly.
96/// Combined dedup state under a single lock to avoid TOCTOU races
97/// between the cache and insertion-order vec.
98struct DedupState {
99    cache: HashMap<[u8; 32], u32>,
100    order: Vec<[u8; 32]>,
101}
102
103pub struct StatementStoreSubscription<T: StatementTransport> {
104    // On WASM the transport is unused because delivery is push-based; on native
105    // the reconnect loop calls transport.subscribe() on each reconnect cycle.
106    #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
107    transport: Arc<T>,
108    config: SubscriptionConfig,
109    topics: RwLock<Vec<Topic>>,
110    handlers: Mutex<Vec<Arc<dyn StatementHandler>>>,
111    dedup: Mutex<DedupState>,
112    running: Arc<AtomicBool>,
113}
114
115impl<T: StatementTransport> std::fmt::Debug for StatementStoreSubscription<T> {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        f.debug_struct("StatementStoreSubscription")
118            .field("config", &self.config)
119            .field("running", &self.running.load(Ordering::Relaxed))
120            .finish()
121    }
122}
123
124impl<T: StatementTransport> StatementStoreSubscription<T> {
125    /// Create a new subscription manager with the given transport and config.
126    pub fn new(transport: T, config: SubscriptionConfig) -> Self {
127        Self {
128            transport: Arc::new(transport),
129            config,
130            topics: RwLock::new(Vec::new()),
131            handlers: Mutex::new(Vec::new()),
132            dedup: Mutex::new(DedupState {
133                cache: HashMap::new(),
134                order: Vec::new(),
135            }),
136            running: Arc::new(AtomicBool::new(false)),
137        }
138    }
139
140    /// Register a handler to receive decoded statements.
141    pub fn add_handler(&self, handler: Arc<dyn StatementHandler>) {
142        self.handlers
143            .lock()
144            .unwrap_or_else(|e| e.into_inner())
145            .push(handler);
146    }
147
148    /// Add topics to subscribe to, deduplicating against existing topics.
149    ///
150    /// If the subscription is already running, the reconnect loop will pick
151    /// up the new topic set on the next reconnect cycle.
152    pub fn add_topics(&self, new_topics: &[Topic]) {
153        let mut topics = self.topics.write().unwrap_or_else(|e| e.into_inner());
154        for t in new_topics {
155            if !topics.contains(t) {
156                topics.push(*t);
157            }
158        }
159    }
160
161    /// Whether the subscription loop is running.
162    pub fn is_running(&self) -> bool {
163        self.running.load(Ordering::Relaxed)
164    }
165
166    /// Remove topics from the subscription list.
167    ///
168    /// If the subscription is already running, the reconnect loop will pick
169    /// up the reduced topic set on the next reconnect cycle.
170    pub fn remove_topics(&self, to_remove: &[Topic]) {
171        let mut topics = self.topics.write().unwrap_or_else(|e| e.into_inner());
172        topics.retain(|t| !to_remove.contains(t));
173    }
174
175    /// Start the subscription (WASM target: no-op; JS host drives delivery).
176    #[cfg(target_arch = "wasm32")]
177    pub fn start(self: &Arc<Self>) {
178        self.running.store(true, Ordering::Relaxed);
179    }
180
181    /// Stop the subscription.  Idempotent.
182    pub fn stop(&self) {
183        self.running.store(false, Ordering::Relaxed);
184    }
185
186    /// Decode, dedup, and dispatch a raw statement to all registered handlers.
187    ///
188    /// This is the hot path. Signature verification is the handler's
189    /// responsibility; this layer only performs structural decoding and dedup.
190    ///
191    /// On WASM the JS host calls this directly via `pushStatement()`.
192    /// On native the reconnect loop calls this from the background thread.
193    /// Callers should check `is_running()` if they need stop semantics.
194    pub fn deliver(&self, raw: &[u8]) {
195        let statement = match decode_statement(raw) {
196            Ok(s) => s,
197            Err(e) => {
198                log::warn!("[subscription] decode failed: {e}");
199                return;
200            }
201        };
202
203        // Dedup key is blake2b_256 of the statement data field.
204        let dedup_key = blake2b_256(&statement.data);
205
206        if !self.should_deliver(&dedup_key, statement.priority) {
207            return;
208        }
209
210        self.dispatch_to_handlers(&statement, raw);
211    }
212
213    // -----------------------------------------------------------------------
214    // Private helpers
215    // -----------------------------------------------------------------------
216
217    /// Check the dedup cache and update it if delivery should proceed.
218    ///
219    /// Returns `true` if the statement should be delivered to handlers.
220    fn should_deliver(&self, dedup_key: &[u8; 32], priority: u32) -> bool {
221        let mut state = self.dedup.lock().unwrap_or_else(|e| e.into_inner());
222
223        if let Some(&prev_priority) = state.cache.get(dedup_key) {
224            if priority <= prev_priority {
225                return false;
226            }
227            // Higher priority update — replace value but don't re-add to order vec
228            state.cache.insert(*dedup_key, priority);
229            return true;
230        }
231
232        // New key — insert into both cache and order
233        state.cache.insert(*dedup_key, priority);
234        state.order.push(*dedup_key);
235
236        // Evict oldest half when cache grows beyond the configured limit.
237        if state.order.len() > self.config.dedup_cache_size {
238            let drain_count = state.order.len() / 2;
239            let evicted: Vec<[u8; 32]> = state.order.drain(..drain_count).collect();
240            for key in &evicted {
241                state.cache.remove(key);
242            }
243        }
244
245        true
246    }
247
248    /// Dispatch to all registered handlers, logging but not aborting on errors.
249    fn dispatch_to_handlers(&self, statement: &Statement, raw: &[u8]) {
250        let handlers = self.handlers.lock().unwrap_or_else(|e| e.into_inner());
251        for handler in handlers.iter() {
252            if let Err(e) = handler.on_statement(statement, raw) {
253                log::warn!("[subscription] handler error: {e}");
254            }
255        }
256    }
257}
258
259// ---------------------------------------------------------------------------
260// Native start — separate impl to carry the 'static bound needed for
261// std::thread::spawn without forcing it onto every other method.
262// ---------------------------------------------------------------------------
263
264#[cfg(not(target_arch = "wasm32"))]
265impl<T: StatementTransport + 'static> StatementStoreSubscription<T> {
266    /// Start the subscription on a background reconnect thread.
267    ///
268    /// Idempotent: calling `start` on an already-running subscription is a no-op.
269    pub fn start(self: &Arc<Self>) {
270        if self.running.swap(true, Ordering::Relaxed) {
271            return;
272        }
273        let sub = self.clone();
274        std::thread::spawn(move || reconnect_loop(sub));
275    }
276}
277
278// ---------------------------------------------------------------------------
279// Native reconnect loop
280// ---------------------------------------------------------------------------
281
282/// Reconnect loop — runs on a dedicated background thread (native only).
283///
284/// Subscribes via the transport, waits for disconnect, then waits
285/// `reconnect_delay_ms` before retrying.  Exits when `running` is false.
286#[cfg(not(target_arch = "wasm32"))]
287fn reconnect_loop<T: StatementTransport + 'static>(sub: Arc<StatementStoreSubscription<T>>) {
288    use std::time::Duration;
289
290    while sub.running.load(Ordering::Relaxed) {
291        let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner()).clone();
292
293        if topics.is_empty() {
294            std::thread::sleep(Duration::from_millis(sub.config.reconnect_delay_ms));
295            continue;
296        }
297
298        let sub_clone = sub.clone();
299        let on_statement = Arc::new(move |raw: RawStatement| {
300            sub_clone.deliver(&raw);
301        });
302
303        let disconnected = Arc::new(AtomicBool::new(false));
304        let disc_clone = disconnected.clone();
305        let on_disconnect = Arc::new(move || {
306            disc_clone.store(true, Ordering::Relaxed);
307        });
308
309        let _token = sub
310            .transport
311            .subscribe(&topics, on_statement, on_disconnect);
312
313        // Poll until either the subscription is stopped or a disconnect occurs.
314        while sub.running.load(Ordering::Relaxed) && !disconnected.load(Ordering::Relaxed) {
315            std::thread::sleep(Duration::from_millis(200));
316        }
317
318        if sub.running.load(Ordering::Relaxed) {
319            log::info!(
320                "[subscription] disconnected, reconnecting in {}ms",
321                sub.config.reconnect_delay_ms
322            );
323            std::thread::sleep(Duration::from_millis(sub.config.reconnect_delay_ms));
324        }
325    }
326
327    log::info!("[subscription] reconnect loop stopped");
328}
329
330// ---------------------------------------------------------------------------
331// Tests
332// ---------------------------------------------------------------------------
333
#[cfg(test)]
mod tests {
    use super::*;
    use host_encoding::statement_store::{
        assemble_statement, build_signing_payload, string_to_topic,
    };
    use std::sync::atomic::AtomicUsize;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Build a valid encoded statement with the given data and priority.
    ///
    /// The public key and signature are fixed dummy bytes; the subscription
    /// layer under test never verifies signatures.
    fn make_statement_bytes(data: &[u8], priority: u32) -> Vec<u8> {
        let topic = string_to_topic("test-topic");
        let pubkey = [0xabu8; 32];
        let fake_sig = [0xcdu8; 64];
        let (payload, num_fields) =
            build_signing_payload(1_700_000_000, None, None, priority, &[topic], data).unwrap();
        assemble_statement(&payload, num_fields, &pubkey, &fake_sig)
    }

    /// A transport stub that records how many times `subscribe` was called
    /// but never delivers any statements.
    struct StubTransport {
        subscribe_count: Arc<AtomicUsize>,
    }

    impl StubTransport {
        /// Returns the stub together with a shared handle to its call counter.
        fn new() -> (Self, Arc<AtomicUsize>) {
            let count = Arc::new(AtomicUsize::new(0));
            (
                Self {
                    subscribe_count: count.clone(),
                },
                count,
            )
        }
    }

    /// Token with no behavior; dropping it does nothing.
    struct StubToken;
    impl SubscriptionToken for StubToken {}

    impl StatementTransport for StubTransport {
        fn subscribe(
            &self,
            _topics: &[Topic],
            _on_statement: Arc<dyn Fn(RawStatement) + Send + Sync>,
            _on_disconnect: Arc<dyn Fn() + Send + Sync>,
        ) -> Box<dyn SubscriptionToken> {
            self.subscribe_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            Box::new(StubToken)
        }
    }

    /// Records every statement it receives, in order.
    struct RecordingHandler {
        received: Arc<Mutex<Vec<Vec<u8>>>>,
        /// Optional forced error to return from `on_statement`.
        force_error: Option<String>,
    }

    impl RecordingHandler {
        /// Returns the handler together with a shared handle to its record.
        fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
            let received = Arc::new(Mutex::new(Vec::new()));
            (
                Self {
                    received: received.clone(),
                    force_error: None,
                },
                received,
            )
        }

        /// A handler that still records, but always returns `Err(err)`.
        fn with_error(err: &str) -> Self {
            let (mut h, _) = Self::new();
            h.force_error = Some(err.to_owned());
            h
        }
    }

    impl StatementHandler for RecordingHandler {
        fn on_statement(&self, _statement: &Statement, raw: &[u8]) -> Result<(), String> {
            self.received
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .push(raw.to_vec());
            if let Some(e) = &self.force_error {
                return Err(e.clone());
            }
            Ok(())
        }
    }

    /// Appends its label to a shared call log each time it is invoked, so a
    /// test can observe the relative order in which handlers ran.
    struct LabelHandler {
        label: &'static str,
        calls: Arc<Mutex<Vec<&'static str>>>,
    }

    impl StatementHandler for LabelHandler {
        fn on_statement(&self, _statement: &Statement, _raw: &[u8]) -> Result<(), String> {
            self.calls
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .push(self.label);
            Ok(())
        }
    }

    /// A subscription over a [`StubTransport`] with the default config.
    fn make_sub() -> Arc<StatementStoreSubscription<StubTransport>> {
        let (transport, _) = StubTransport::new();
        Arc::new(StatementStoreSubscription::new(
            transport,
            SubscriptionConfig::default(),
        ))
    }

    // -----------------------------------------------------------------------
    // Tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_delivers_statement_to_handler() {
        let sub = make_sub();
        let (handler, received) = RecordingHandler::new();
        sub.add_handler(Arc::new(handler));

        let raw = make_statement_bytes(b"hello", 0);
        sub.deliver(&raw);

        let received = received.lock().unwrap_or_else(|e| e.into_inner());
        assert_eq!(received.len(), 1);
        assert_eq!(received[0], raw);
    }

    #[test]
    fn test_dispatches_to_multiple_handlers_in_order() {
        let sub = make_sub();
        let calls = Arc::new(Mutex::new(Vec::new()));
        sub.add_handler(Arc::new(LabelHandler {
            label: "first",
            calls: calls.clone(),
        }));
        sub.add_handler(Arc::new(LabelHandler {
            label: "second",
            calls: calls.clone(),
        }));

        let raw = make_statement_bytes(b"multi", 0);
        sub.deliver(&raw);

        // Both handlers must run exactly once, in registration order.
        assert_eq!(
            *calls.lock().unwrap_or_else(|e| e.into_inner()),
            vec!["first", "second"],
            "handlers must be invoked once each, in registration order"
        );
    }

    #[test]
    fn test_handler_error_does_not_stop_other_handlers() {
        let sub = make_sub();
        let failing = RecordingHandler::with_error("intentional error");
        let (passing, received) = RecordingHandler::new();
        sub.add_handler(Arc::new(failing));
        sub.add_handler(Arc::new(passing));

        let raw = make_statement_bytes(b"error-test", 0);
        sub.deliver(&raw);

        // The second handler must still be called despite the first returning Err.
        assert_eq!(
            received.lock().unwrap_or_else(|e| e.into_inner()).len(),
            1,
            "handler after a failing one must still be called"
        );
    }

    #[test]
    fn test_dedup_skips_same_priority() {
        let sub = make_sub();
        let (handler, received) = RecordingHandler::new();
        sub.add_handler(Arc::new(handler));

        let raw = make_statement_bytes(b"dedup-data", 5);
        sub.deliver(&raw);
        sub.deliver(&raw); // identical bytes → same dedup key and priority

        assert_eq!(
            received.lock().unwrap_or_else(|e| e.into_inner()).len(),
            1,
            "identical statement must not be delivered twice"
        );
    }

    #[test]
    fn test_dedup_redelivers_higher_priority() {
        let sub = make_sub();
        let (handler, received) = RecordingHandler::new();
        sub.add_handler(Arc::new(handler));

        let data = b"priority-data";
        let low = make_statement_bytes(data, 0);
        let high = make_statement_bytes(data, 1);

        sub.deliver(&low);
        sub.deliver(&high); // same data, higher priority → should be delivered

        assert_eq!(
            received.lock().unwrap_or_else(|e| e.into_inner()).len(),
            2,
            "higher-priority statement with same data must be redelivered"
        );
    }

    #[test]
    fn test_dedup_evicts_when_cache_full() {
        let (transport, _) = StubTransport::new();
        let config = SubscriptionConfig {
            dedup_cache_size: 4,
            reconnect_delay_ms: 3000,
        };
        let sub = Arc::new(StatementStoreSubscription::new(transport, config));
        let (handler, received) = RecordingHandler::new();
        sub.add_handler(Arc::new(handler));

        // Deliver 5 unique statements (cache limit = 4, so eviction fires).
        for i in 0u8..5 {
            let raw = make_statement_bytes(&[i], 0);
            sub.deliver(&raw);
        }

        let count = received.lock().unwrap_or_else(|e| e.into_inner()).len();
        assert_eq!(count, 5, "all 5 unique statements must be delivered");

        // Cache must not have grown unbounded.
        let cache_len = sub
            .dedup
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .cache
            .len();
        assert!(
            cache_len <= 4,
            "dedup cache must not exceed cache_size after eviction, got {cache_len}"
        );
    }

    #[test]
    fn test_evicted_key_is_redelivered() {
        let (transport, _) = StubTransport::new();
        let config = SubscriptionConfig {
            dedup_cache_size: 4,
            reconnect_delay_ms: 3000,
        };
        let sub = Arc::new(StatementStoreSubscription::new(transport, config));
        let (handler, received) = RecordingHandler::new();
        sub.add_handler(Arc::new(handler));

        // The 5th insert pushes the order vec past the limit of 4, which
        // evicts the oldest half (5 / 2 = 2 keys: data [0] and [1]).
        for i in 0u8..5 {
            sub.deliver(&make_statement_bytes(&[i], 0));
        }

        // Evicted key: forgotten by the cache, so it is delivered again.
        sub.deliver(&make_statement_bytes(&[0], 0));
        // Retained key: still deduplicated.
        sub.deliver(&make_statement_bytes(&[4], 0));

        let count = received.lock().unwrap_or_else(|e| e.into_inner()).len();
        assert_eq!(
            count, 6,
            "evicted key must be redelivered; retained key must stay suppressed"
        );
    }

    #[test]
    fn test_add_topics_deduplicates() {
        let sub = make_sub();
        let topic = string_to_topic("my-topic");

        sub.add_topics(&[topic]);
        sub.add_topics(&[topic]); // duplicate

        let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner());
        assert_eq!(
            topics.len(),
            1,
            "duplicate topic must not be added a second time"
        );
    }

    #[test]
    fn test_remove_topics() {
        let sub = make_sub();
        let topic = string_to_topic("removable");

        sub.add_topics(&[topic]);
        sub.remove_topics(&[topic]);

        let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner());
        assert!(topics.is_empty(), "topic must be removed");
    }

    #[test]
    fn test_deliver_with_malformed_bytes_logs_warning() {
        let sub = make_sub();
        let (handler, received) = RecordingHandler::new();
        sub.add_handler(Arc::new(handler));

        // Garbage bytes — decode_statement must fail gracefully.
        sub.deliver(&[0xff, 0x00, 0xde, 0xad]);

        assert!(
            received
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .is_empty(),
            "malformed statement must not reach handlers"
        );
    }

    #[test]
    fn test_deliver_with_no_handlers() {
        let sub = make_sub();
        // No handlers registered — must not panic.
        let raw = make_statement_bytes(b"no-handlers", 0);
        sub.deliver(&raw);
    }
}