use std::collections::HashMap;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
};
use host_encoding::statement_store::{blake2b_256, decode_statement, Statement, Topic};
/// Undecoded statement bytes, as passed by a [`StatementTransport`] to its
/// `on_statement` callback and later decoded by `StatementStoreSubscription::deliver`.
pub type RawStatement = Vec<u8>;
/// Receiver for statements that a `StatementStoreSubscription` has decoded and
/// accepted (i.e. that passed its dedup check).
///
/// Handlers are shared as `Arc<dyn StatementHandler>` and must be `Send + Sync`.
pub trait StatementHandler: Send + Sync {
/// Called with the decoded `statement` together with the `raw` bytes it was
/// decoded from.
///
/// # Errors
/// An `Err(message)` is logged as a warning by the subscription; it does not
/// prevent the remaining handlers from being called.
fn on_statement(&self, statement: &Statement, raw: &[u8]) -> Result<(), String>;
}
/// Abstraction over the connection that delivers raw statements for a set of topics.
pub trait StatementTransport: Send + Sync {
/// Opens a subscription for `topics`.
///
/// * `on_statement` - invoked with the raw bytes of each incoming statement.
/// * `on_disconnect` - invoked when the transport loses its connection; the
///   reconnect loop in this file uses it to trigger a resubscribe.
///
/// Returns a token representing the subscription.
/// NOTE(review): presumably dropping the token cancels the subscription —
/// the trait itself does not state this; confirm against implementations.
fn subscribe(
&self,
topics: &[Topic],
on_statement: Arc<dyn Fn(RawStatement) + Send + Sync>,
on_disconnect: Arc<dyn Fn() + Send + Sync>,
) -> Box<dyn SubscriptionToken>;
}
/// Marker trait for the value returned by [`StatementTransport::subscribe`].
///
/// It declares no methods; the only thing a holder can do with a token is keep
/// it or drop it. It must be `Send` so it can live on the reconnect thread.
pub trait SubscriptionToken: Send {}
/// Tunables for a `StatementStoreSubscription`.
#[derive(Debug, Clone)]
pub struct SubscriptionConfig {
/// Number of dedup keys that may be tracked before eviction kicks in; once
/// the tracked count exceeds this value the oldest half is dropped.
pub dedup_cache_size: usize,
/// Delay in milliseconds before resubscribing after a disconnect. The
/// reconnect loop also waits this long between checks while no topics are set.
pub reconnect_delay_ms: u64,
}
impl Default for SubscriptionConfig {
fn default() -> Self {
Self {
dedup_cache_size: 8192,
reconnect_delay_ms: 3000,
}
}
}
/// Bookkeeping used to suppress duplicate deliveries.
struct DedupState {
// Maps the 32-byte dedup key (blake2b-256 of the statement data, see
// `deliver`) to the highest priority delivered so far for that key.
cache: HashMap<[u8; 32], u32>,
// Keys in first-insertion order; the front of this list is evicted first.
order: Vec<[u8; 32]>,
}
/// Manages a topic subscription on a [`StatementTransport`]: tracks topics and
/// handlers, deduplicates incoming statements and fans them out to handlers.
pub struct StatementStoreSubscription<T: StatementTransport> {
// Only read by the non-wasm reconnect loop, hence the dead-code allowance
// on wasm32 where `start` never touches the transport.
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
transport: Arc<T>,
// Dedup cache size and reconnect delay.
config: SubscriptionConfig,
// Topics to subscribe to; read each time the reconnect loop (re)subscribes.
topics: RwLock<Vec<Topic>>,
// Handlers invoked, in registration order, for every accepted statement.
handlers: Mutex<Vec<Arc<dyn StatementHandler>>>,
// Duplicate-suppression state, see `DedupState`.
dedup: Mutex<DedupState>,
// Set by `start`, cleared by `stop`; polled by the reconnect loop.
running: Arc<AtomicBool>,
}
impl<T: StatementTransport> std::fmt::Debug for StatementStoreSubscription<T> {
    /// Formats only the configuration and the current running flag; the
    /// transport, topics, handlers and dedup state are intentionally left out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let is_running = self.running.load(Ordering::Relaxed);
        let mut builder = f.debug_struct("StatementStoreSubscription");
        builder.field("config", &self.config);
        builder.field("running", &is_running);
        builder.finish()
    }
}
impl<T: StatementTransport> StatementStoreSubscription<T> {
    /// Creates a subscription in the stopped state with no topics, no handlers
    /// and an empty dedup cache.
    pub fn new(transport: T, config: SubscriptionConfig) -> Self {
        Self {
            transport: Arc::new(transport),
            config,
            topics: RwLock::new(Vec::new()),
            handlers: Mutex::new(Vec::new()),
            dedup: Mutex::new(DedupState {
                cache: HashMap::new(),
                order: Vec::new(),
            }),
            running: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Registers `handler`; handlers are invoked in registration order.
    ///
    /// A poisoned handler lock is recovered rather than propagated.
    pub fn add_handler(&self, handler: Arc<dyn StatementHandler>) {
        self.handlers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .push(handler);
    }

    /// Adds every topic in `new_topics` that is not already tracked.
    pub fn add_topics(&self, new_topics: &[Topic]) {
        let mut topics = self.topics.write().unwrap_or_else(|e| e.into_inner());
        for t in new_topics {
            if !topics.contains(t) {
                topics.push(*t);
            }
        }
    }

    /// Returns whether `start` has been called without a subsequent `stop`.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Relaxed)
    }

    /// Removes every tracked topic that appears in `to_remove`.
    pub fn remove_topics(&self, to_remove: &[Topic]) {
        let mut topics = self.topics.write().unwrap_or_else(|e| e.into_inner());
        topics.retain(|t| !to_remove.contains(t));
    }

    /// Marks the subscription as running.
    ///
    /// On wasm32 no background thread is spawned; this only sets the flag.
    #[cfg(target_arch = "wasm32")]
    pub fn start(self: &Arc<Self>) {
        self.running.store(true, Ordering::Relaxed);
    }

    /// Clears the running flag.
    ///
    /// This does not gate [`deliver`](Self::deliver): statements passed to it
    /// after `stop` are still decoded and dispatched.
    pub fn stop(&self) {
        self.running.store(false, Ordering::Relaxed);
    }

    /// Decodes `raw`, applies deduplication and forwards the statement to all
    /// registered handlers.
    ///
    /// Bytes that fail to decode are logged as a warning and dropped. The dedup
    /// key is the blake2b-256 hash of the statement's data, so two statements
    /// with identical data are duplicates unless the later one carries a
    /// strictly higher priority.
    pub fn deliver(&self, raw: &[u8]) {
        let statement = match decode_statement(raw) {
            Ok(s) => s,
            Err(e) => {
                log::warn!("[subscription] decode failed: {e}");
                return;
            }
        };
        let dedup_key = blake2b_256(&statement.data);
        if !self.should_deliver(&dedup_key, statement.priority) {
            return;
        }
        self.dispatch_to_handlers(&statement, raw);
    }

    /// Records `dedup_key` and decides whether the statement should be delivered.
    ///
    /// Returns `false` only when the key is already cached with a priority
    /// greater than or equal to `priority`. A strictly higher priority updates
    /// the cached value and is delivered again.
    ///
    /// When the number of tracked keys exceeds `config.dedup_cache_size`, the
    /// oldest half (at least one key) is evicted, so the cache never stays
    /// above the configured size. A size of `0` therefore disables dedup.
    fn should_deliver(&self, dedup_key: &[u8; 32], priority: u32) -> bool {
        let mut guard = self.dedup.lock().unwrap_or_else(|e| e.into_inner());
        // Reborrow through the guard once so `cache` and `order` can be
        // borrowed mutably at the same time during eviction.
        let DedupState { cache, order } = &mut *guard;

        if let Some(prev_priority) = cache.get_mut(dedup_key) {
            if priority <= *prev_priority {
                return false;
            }
            // Known key, higher priority: update in place. The key keeps its
            // original position in the eviction order.
            *prev_priority = priority;
            return true;
        }

        cache.insert(*dedup_key, priority);
        order.push(*dedup_key);
        if order.len() > self.config.dedup_cache_size {
            // `max(1)` covers `dedup_cache_size == 0`, where `len / 2` would be
            // zero and nothing would be evicted.
            let drain_count = (order.len() / 2).max(1);
            for evicted in order.drain(..drain_count) {
                cache.remove(&evicted);
            }
        }
        true
    }

    /// Calls every registered handler with the statement, logging (but not
    /// propagating) handler errors.
    ///
    /// The handler list is snapshotted and the lock released before any handler
    /// runs. `std::sync::Mutex` is not reentrant, so a handler that calls
    /// [`add_handler`](Self::add_handler) on this subscription would otherwise
    /// deadlock or panic. Handlers registered during a dispatch are first
    /// called for the next statement.
    fn dispatch_to_handlers(&self, statement: &Statement, raw: &[u8]) {
        let handlers: Vec<Arc<dyn StatementHandler>> = self
            .handlers
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clone();
        for handler in &handlers {
            if let Err(e) = handler.on_statement(statement, raw) {
                log::warn!("[subscription] handler error: {e}");
            }
        }
    }
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: StatementTransport + 'static> StatementStoreSubscription<T> {
    /// Marks the subscription as running and spawns the background reconnect
    /// thread. Calling it while already running is a no-op.
    ///
    /// NOTE(review): if `stop()` is followed by `start()` before the previous
    /// loop thread has observed the cleared flag, the old thread appears able
    /// to keep running next to the newly spawned one — confirm whether callers
    /// can hit this.
    pub fn start(self: &Arc<Self>) {
        let was_running = self.running.swap(true, Ordering::Relaxed);
        if !was_running {
            let worker = Arc::clone(self);
            std::thread::spawn(move || reconnect_loop(worker));
        }
    }
}
/// Body of the background thread started by `start`.
///
/// While the running flag is set it subscribes to the current topic list,
/// waits (polling every 200 ms) until either the transport reports a
/// disconnect or the subscription is stopped, and then resubscribes after
/// `reconnect_delay_ms`. With an empty topic list it just sleeps for
/// `reconnect_delay_ms` and checks again.
#[cfg(not(target_arch = "wasm32"))]
fn reconnect_loop<T: StatementTransport + 'static>(sub: Arc<StatementStoreSubscription<T>>) {
    use std::time::Duration;

    let poll_interval = Duration::from_millis(200);
    // `config` is never mutated after construction, so the delay is fixed.
    let retry_delay = Duration::from_millis(sub.config.reconnect_delay_ms);
    let still_running = || sub.running.load(Ordering::Relaxed);

    while still_running() {
        // Snapshot the topics; the read guard is released at the end of this statement.
        let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner()).clone();
        if topics.is_empty() {
            std::thread::sleep(retry_delay);
            continue;
        }

        let disconnected = Arc::new(AtomicBool::new(false));
        let on_statement = {
            let target = Arc::clone(&sub);
            Arc::new(move |raw: RawStatement| {
                target.deliver(&raw);
            })
        };
        let on_disconnect = {
            let flag = Arc::clone(&disconnected);
            Arc::new(move || {
                flag.store(true, Ordering::Relaxed);
            })
        };

        // Held until the end of this iteration, i.e. across the reconnect delay below.
        let _token = sub
            .transport
            .subscribe(&topics, on_statement, on_disconnect);

        loop {
            if !still_running() || disconnected.load(Ordering::Relaxed) {
                break;
            }
            std::thread::sleep(poll_interval);
        }

        if still_running() {
            log::info!(
                "[subscription] disconnected, reconnecting in {}ms",
                sub.config.reconnect_delay_ms
            );
            std::thread::sleep(retry_delay);
        }
    }

    log::info!("[subscription] reconnect loop stopped");
}
// Unit tests for `StatementStoreSubscription`. They drive `deliver` directly
// with hand-assembled statement bytes; none of them calls `start`, so the
// reconnect loop is not exercised here.
#[cfg(test)]
mod tests {
use super::*;
use host_encoding::statement_store::{
assemble_statement, build_signing_payload, string_to_topic,
};
use std::sync::atomic::AtomicUsize;
// Builds encoded statement bytes carrying `data` at the given `priority`,
// tagged with the fixed topic "test-topic" and using a constant public key
// and a placeholder signature.
fn make_statement_bytes(data: &[u8], priority: u32) -> Vec<u8> {
let topic = string_to_topic("test-topic");
let pubkey = [0xabu8; 32];
let fake_sig = [0xcdu8; 64];
let (payload, num_fields) =
build_signing_payload(1_700_000_000, None, None, priority, &[topic], data).unwrap();
assemble_statement(&payload, num_fields, &pubkey, &fake_sig)
}
// Transport stub that never delivers anything; it only counts `subscribe` calls.
struct StubTransport {
subscribe_count: Arc<AtomicUsize>,
}
impl StubTransport {
// Returns the stub together with a shared handle to its subscribe counter.
fn new() -> (Self, Arc<AtomicUsize>) {
let count = Arc::new(AtomicUsize::new(0));
(
Self {
subscribe_count: count.clone(),
},
count,
)
}
}
// Token with no behaviour, returned by `StubTransport::subscribe`.
struct StubToken;
impl SubscriptionToken for StubToken {}
impl StatementTransport for StubTransport {
// Ignores topics and callbacks; just bumps the counter and hands back a token.
fn subscribe(
&self,
_topics: &[Topic],
_on_statement: Arc<dyn Fn(RawStatement) + Send + Sync>,
_on_disconnect: Arc<dyn Fn() + Send + Sync>,
) -> Box<dyn SubscriptionToken> {
self.subscribe_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Box::new(StubToken)
}
}
// Handler that records the raw bytes of every statement it is given and can
// optionally be configured to return an error afterwards.
struct RecordingHandler {
received: Arc<Mutex<Vec<Vec<u8>>>>,
force_error: Option<String>,
}
impl RecordingHandler {
// Returns a non-failing handler plus a shared handle to its recorded statements.
fn new() -> (Self, Arc<Mutex<Vec<Vec<u8>>>>) {
let received = Arc::new(Mutex::new(Vec::new()));
(
Self {
received: received.clone(),
force_error: None,
},
received,
)
}
// Returns a handler whose `on_statement` always fails with `err`
// (it still records the statement first).
fn with_error(err: &str) -> Self {
let (mut h, _) = Self::new();
h.force_error = Some(err.to_owned());
h
}
}
impl StatementHandler for RecordingHandler {
// Records `raw`, then returns the forced error if one is configured.
fn on_statement(&self, _statement: &Statement, raw: &[u8]) -> Result<(), String> {
self.received
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(raw.to_vec());
if let Some(e) = &self.force_error {
return Err(e.clone());
}
Ok(())
}
}
// Subscription over a `StubTransport` with the default configuration.
fn make_sub() -> Arc<StatementStoreSubscription<StubTransport>> {
let (transport, _) = StubTransport::new();
Arc::new(StatementStoreSubscription::new(
transport,
SubscriptionConfig::default(),
))
}
// A single valid statement reaches the handler with its raw bytes unchanged.
#[test]
fn test_delivers_statement_to_handler() {
let sub = make_sub();
let (handler, received) = RecordingHandler::new();
sub.add_handler(Arc::new(handler));
let raw = make_statement_bytes(b"hello", 0);
sub.deliver(&raw);
let received = received.lock().unwrap_or_else(|e| e.into_inner());
assert_eq!(received.len(), 1);
assert_eq!(received[0], raw);
}
// Both registered handlers receive the statement.
// NOTE(review): despite the name, the assertions only check that each
// handler was called once; relative call order is not verified.
#[test]
fn test_dispatches_to_multiple_handlers_in_order() {
let sub = make_sub();
let (h1, r1) = RecordingHandler::new();
let (h2, r2) = RecordingHandler::new();
sub.add_handler(Arc::new(h1));
sub.add_handler(Arc::new(h2));
let raw = make_statement_bytes(b"multi", 0);
sub.deliver(&raw);
assert_eq!(
r1.lock().unwrap_or_else(|e| e.into_inner()).len(),
1,
"first handler must receive statement"
);
assert_eq!(
r2.lock().unwrap_or_else(|e| e.into_inner()).len(),
1,
"second handler must receive statement"
);
}
// An error from an earlier handler must not prevent later handlers from running.
#[test]
fn test_handler_error_does_not_stop_other_handlers() {
let sub = make_sub();
let failing = RecordingHandler::with_error("intentional error");
let (passing, received) = RecordingHandler::new();
sub.add_handler(Arc::new(failing));
sub.add_handler(Arc::new(passing));
let raw = make_statement_bytes(b"error-test", 0);
sub.deliver(&raw);
assert_eq!(
received.lock().unwrap_or_else(|e| e.into_inner()).len(),
1,
"handler after a failing one must still be called"
);
}
// Delivering the exact same bytes twice results in a single dispatch.
#[test]
fn test_dedup_skips_same_priority() {
let sub = make_sub();
let (handler, received) = RecordingHandler::new();
sub.add_handler(Arc::new(handler));
let raw = make_statement_bytes(b"dedup-data", 5);
sub.deliver(&raw);
sub.deliver(&raw);
assert_eq!(
received.lock().unwrap_or_else(|e| e.into_inner()).len(),
1,
"identical statement must not be delivered twice"
);
}
// Same data with a strictly higher priority bypasses the dedup check.
#[test]
fn test_dedup_redelivers_higher_priority() {
let sub = make_sub();
let (handler, received) = RecordingHandler::new();
sub.add_handler(Arc::new(handler));
let data = b"priority-data";
let low = make_statement_bytes(data, 0);
let high = make_statement_bytes(data, 1);
sub.deliver(&low);
sub.deliver(&high);
assert_eq!(
received.lock().unwrap_or_else(|e| e.into_inner()).len(),
2,
"higher-priority statement with same data must be redelivered"
);
}
// With a cache size of 4, the fifth unique statement pushes the tracked key
// count past the limit and triggers eviction of the oldest half.
#[test]
fn test_dedup_evicts_when_cache_full() {
let (transport, _) = StubTransport::new();
let config = SubscriptionConfig {
dedup_cache_size: 4,
reconnect_delay_ms: 3000,
};
let sub = Arc::new(StatementStoreSubscription::new(transport, config));
let (handler, received) = RecordingHandler::new();
sub.add_handler(Arc::new(handler));
for i in 0u8..5 {
let raw = make_statement_bytes(&[i], 0);
sub.deliver(&raw);
}
let count = received.lock().unwrap_or_else(|e| e.into_inner()).len();
assert_eq!(count, 5, "all 5 unique statements must be delivered");
let cache_len = sub
.dedup
.lock()
.unwrap_or_else(|e| e.into_inner())
.cache
.len();
assert!(
cache_len <= 4,
"dedup cache must not exceed cache_size after eviction, got {cache_len}"
);
}
// Adding the same topic twice keeps a single entry.
#[test]
fn test_add_topics_deduplicates() {
let sub = make_sub();
let topic = string_to_topic("my-topic");
sub.add_topics(&[topic]);
sub.add_topics(&[topic]);
let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner());
assert_eq!(
topics.len(),
1,
"duplicate topic must not be added a second time"
);
}
// A previously added topic is gone after `remove_topics`.
#[test]
fn test_remove_topics() {
let sub = make_sub();
let topic = string_to_topic("removable");
sub.add_topics(&[topic]);
sub.remove_topics(&[topic]);
let topics = sub.topics.read().unwrap_or_else(|e| e.into_inner());
assert!(topics.is_empty(), "topic must be removed");
}
// Bytes that do not decode as a statement never reach a handler.
// NOTE(review): the warning log itself is not asserted, only the absence
// of a dispatch.
#[test]
fn test_deliver_with_malformed_bytes_logs_warning() {
let sub = make_sub();
let (handler, received) = RecordingHandler::new();
sub.add_handler(Arc::new(handler));
sub.deliver(&[0xff, 0x00, 0xde, 0xad]);
assert!(
received
.lock()
.unwrap_or_else(|e| e.into_inner())
.is_empty(),
"malformed statement must not reach handlers"
);
}
// Smoke test: delivering with no handlers registered must simply not panic.
#[test]
fn test_deliver_with_no_handlers() {
let sub = make_sub();
let raw = make_statement_bytes(b"no-handlers", 0);
sub.deliver(&raw);
}
}