Skip to main content

a3s_event/
store.rs

1//! High-level event bus built on pluggable providers
2//!
3//! `EventBus` provides a convenient API for event publishing, querying,
4//! and subscription management on top of any `EventProvider` implementation.
5
6#[cfg(feature = "routing")]
7use crate::broker::Broker;
8#[cfg(feature = "encryption")]
9use crate::crypto::EventEncryptor;
10use crate::error::{EventError, Result};
11use crate::metrics::EventMetrics;
12use crate::provider::{EventProvider, ProviderInfo, Subscription};
13use crate::schema::SchemaRegistry;
14use crate::state::StateStore;
15use crate::types::{Event, EventCounts, PublishOptions, SubscriptionFilter};
16use crate::dlq::DlqHandler;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20use tokio::sync::RwLock;
21
22/// High-level event bus backed by a pluggable provider
23///
24/// Wraps any `EventProvider` with subscription tracking and convenience
25/// methods. Thread-safe via internal locks.
26///
27/// Optionally validates events against a `SchemaRegistry` before publishing.
28/// Optionally encrypts event payloads via an `EventEncryptor`.
29/// Optionally persists subscription state via a `StateStore`.
30/// Optionally routes failed events to a `DlqHandler`.
31pub struct EventBus {
32    provider: Arc<dyn EventProvider>,
33
34    /// Tracked subscriptions (subscriber_id → filter)
35    subscriptions: Arc<RwLock<HashMap<String, SubscriptionFilter>>>,
36
37    /// Optional schema registry for publish-time validation
38    schema_registry: Option<Arc<dyn SchemaRegistry>>,
39
40    /// Optional dead letter queue handler
41    dlq_handler: Option<Arc<dyn DlqHandler>>,
42
43    /// Optional payload encryptor
44    #[cfg(feature = "encryption")]
45    encryptor: Option<Arc<dyn EventEncryptor>>,
46
47    /// Optional state store for subscription persistence
48    state_store: Option<Arc<dyn StateStore>>,
49
50    /// Optional event broker for trigger-based routing
51    #[cfg(feature = "routing")]
52    broker: Option<Arc<Broker>>,
53
54    /// Observability metrics
55    metrics: Arc<EventMetrics>,
56}
57
58impl EventBus {
59    /// Create a new event bus from a provider
60    pub fn new(provider: impl EventProvider + 'static) -> Self {
61        Self {
62            provider: Arc::new(provider),
63            subscriptions: Arc::new(RwLock::new(HashMap::new())),
64            schema_registry: None,
65            dlq_handler: None,
66            #[cfg(feature = "encryption")]
67            encryptor: None,
68            state_store: None,
69            #[cfg(feature = "routing")]
70            broker: None,
71            metrics: Arc::new(EventMetrics::new()),
72        }
73    }
74
75    /// Create a new event bus with schema validation
76    pub fn with_schema_registry(
77        provider: impl EventProvider + 'static,
78        registry: Arc<dyn SchemaRegistry>,
79    ) -> Self {
80        Self {
81            provider: Arc::new(provider),
82            subscriptions: Arc::new(RwLock::new(HashMap::new())),
83            schema_registry: Some(registry),
84            dlq_handler: None,
85            #[cfg(feature = "encryption")]
86            encryptor: None,
87            state_store: None,
88            #[cfg(feature = "routing")]
89            broker: None,
90            metrics: Arc::new(EventMetrics::new()),
91        }
92    }
93
94    /// Set the dead letter queue handler
95    pub fn set_dlq_handler(&mut self, handler: Arc<dyn DlqHandler>) {
96        self.dlq_handler = Some(handler);
97    }
98
99    /// Set the payload encryptor
100    #[cfg(feature = "encryption")]
101    pub fn set_encryptor(&mut self, encryptor: Arc<dyn EventEncryptor>) {
102        self.encryptor = Some(encryptor);
103    }
104
105    /// Set the state store and load persisted subscriptions
106    ///
107    /// Any previously persisted subscriptions are loaded immediately.
108    pub fn set_state_store(&mut self, store: Arc<dyn StateStore>) -> Result<()> {
109        let loaded = store.load()?;
110        if !loaded.is_empty() {
111            tracing::info!(count = loaded.len(), "Restored subscriptions from state store");
112            // Use try_write to avoid async — this is called during setup
113            let mut subs = self.subscriptions.try_write().map_err(|_| {
114                EventError::Config("Failed to acquire subscription lock during state restore".to_string())
115            })?;
116            *subs = loaded;
117        }
118        self.state_store = Some(store);
119        Ok(())
120    }
121
122    /// Get the state store (if configured)
123    pub fn state_store(&self) -> Option<&dyn StateStore> {
124        self.state_store.as_deref()
125    }
126
127    /// Get the metrics handle
128    ///
129    /// Use `metrics().snapshot()` for a point-in-time view of all counters.
130    pub fn metrics(&self) -> &EventMetrics {
131        &self.metrics
132    }
133
134    /// Get the encryptor (if configured)
135    #[cfg(feature = "encryption")]
136    pub fn encryptor(&self) -> Option<&dyn EventEncryptor> {
137        self.encryptor.as_deref()
138    }
139
140    /// Get the DLQ handler (if configured)
141    pub fn dlq_handler(&self) -> Option<&dyn DlqHandler> {
142        self.dlq_handler.as_deref()
143    }
144
145    /// Get the schema registry (if configured)
146    pub fn schema_registry(&self) -> Option<&dyn SchemaRegistry> {
147        self.schema_registry.as_deref()
148    }
149
150    /// Get the provider name
151    pub fn provider_name(&self) -> &str {
152        self.provider.name()
153    }
154
155    /// Set the event broker for trigger-based routing
156    ///
157    /// When a broker is configured, all published events are automatically
158    /// routed through the broker after being published to the provider.
159    #[cfg(feature = "routing")]
160    pub fn set_broker(&mut self, broker: Arc<Broker>) {
161        self.broker = Some(broker);
162    }
163
164    /// Get the broker (if configured)
165    #[cfg(feature = "routing")]
166    pub fn broker(&self) -> Option<&Broker> {
167        self.broker.as_deref()
168    }
169
170    /// Get a shared reference to the underlying provider
171    ///
172    /// Useful for creating `TopicSink` instances that share the provider.
173    pub fn provider_arc(&self) -> Arc<dyn EventProvider> {
174        self.provider.clone()
175    }
176
177    /// Publish an event with convenience parameters
178    pub async fn publish(
179        &self,
180        category: &str,
181        topic: &str,
182        summary: &str,
183        source: &str,
184        payload: serde_json::Value,
185    ) -> Result<Event> {
186        let subject = self.provider.build_subject(category, topic);
187        #[cfg(feature = "encryption")]
188        let mut event = Event::new(subject, category, summary, source, payload);
189        #[cfg(not(feature = "encryption"))]
190        let event = Event::new(subject, category, summary, source, payload);
191
192        if let Err(e) = self.validate_if_configured(&event) {
193            self.metrics.record_validation_error();
194            return Err(e);
195        }
196
197        #[cfg(feature = "encryption")]
198        if self.encryptor.is_some() {
199            self.encrypt_if_configured(&mut event)?;
200            self.metrics.record_encrypt();
201        }
202
203        let span = tracing::info_span!(
204            "event.publish",
205            event_id = %event.id,
206            subject = %event.subject,
207            category = category,
208            provider = self.provider.name(),
209        );
210        let _guard = span.enter();
211        drop(_guard);
212
213        let start = Instant::now();
214        match self.provider.publish(&event).await {
215            Ok(_) => {
216                self.metrics.record_publish(start);
217                #[cfg(feature = "routing")]
218                self.maybe_route_through_broker(&event).await;
219                Ok(event)
220            }
221            Err(e) => {
222                self.metrics.record_publish_error();
223                Err(e)
224            }
225        }
226    }
227
228    /// Publish a pre-built event
229    pub async fn publish_event(&self, event: &Event) -> Result<u64> {
230        if let Err(e) = self.validate_if_configured(event) {
231            self.metrics.record_validation_error();
232            return Err(e);
233        }
234
235        #[cfg(feature = "encryption")]
236        let event = {
237            let e = self.maybe_encrypt_clone(event)?;
238            if self.encryptor.is_some() {
239                self.metrics.record_encrypt();
240            }
241            e
242        };
243        #[cfg(not(feature = "encryption"))]
244        let event = event.clone();
245
246        let span = tracing::info_span!(
247            "event.publish",
248            event_id = %event.id,
249            subject = %event.subject,
250            category = %event.category,
251            provider = self.provider.name(),
252        );
253        let _guard = span.enter();
254        drop(_guard);
255
256        let start = Instant::now();
257        match self.provider.publish(&event).await {
258            Ok(seq) => {
259                self.metrics.record_publish(start);
260                #[cfg(feature = "routing")]
261                self.maybe_route_through_broker(&event).await;
262                Ok(seq)
263            }
264            Err(e) => {
265                self.metrics.record_publish_error();
266                Err(e)
267            }
268        }
269    }
270
271    /// Publish a pre-built event with provider-specific options
272    pub async fn publish_event_with_options(
273        &self,
274        event: &Event,
275        opts: &PublishOptions,
276    ) -> Result<u64> {
277        if let Err(e) = self.validate_if_configured(event) {
278            self.metrics.record_validation_error();
279            return Err(e);
280        }
281
282        #[cfg(feature = "encryption")]
283        let event = {
284            let e = self.maybe_encrypt_clone(event)?;
285            if self.encryptor.is_some() {
286                self.metrics.record_encrypt();
287            }
288            e
289        };
290        #[cfg(not(feature = "encryption"))]
291        let event = event.clone();
292
293        let span = tracing::info_span!(
294            "event.publish",
295            event_id = %event.id,
296            subject = %event.subject,
297            category = %event.category,
298            provider = self.provider.name(),
299            msg_id = ?opts.msg_id,
300        );
301        let _guard = span.enter();
302        drop(_guard);
303
304        let start = Instant::now();
305        match self.provider.publish_with_options(&event, opts).await {
306            Ok(seq) => {
307                self.metrics.record_publish(start);
308                #[cfg(feature = "routing")]
309                self.maybe_route_through_broker(&event).await;
310                Ok(seq)
311            }
312            Err(e) => {
313                self.metrics.record_publish_error();
314                Err(e)
315            }
316        }
317    }
318
319    /// Fetch recent events, optionally filtered by category
320    ///
321    /// If an encryptor is configured, encrypted payloads are decrypted automatically.
322    pub async fn list_events(
323        &self,
324        category: Option<&str>,
325        limit: usize,
326    ) -> Result<Vec<Event>> {
327        let filter = category.map(|c| self.provider.category_subject(c));
328        #[cfg(feature = "encryption")]
329        let mut events = self.provider
330            .history(filter.as_deref(), limit)
331            .await?;
332        #[cfg(not(feature = "encryption"))]
333        let events = self.provider
334            .history(filter.as_deref(), limit)
335            .await?;
336        #[cfg(feature = "encryption")]
337        {
338            let decrypted = self.decrypt_events(&mut events);
339            if decrypted > 0 {
340                for _ in 0..decrypted {
341                    self.metrics.record_decrypt();
342                }
343            }
344        }
345        Ok(events)
346    }
347
348    /// Get event counts by category
349    pub async fn counts(&self, limit: usize) -> Result<EventCounts> {
350        let events = self.provider.history(None, limit).await?;
351        let mut counts = EventCounts::default();
352
353        for event in &events {
354            *counts.categories.entry(event.category.clone()).or_insert(0) += 1;
355            counts.total += 1;
356        }
357
358        Ok(counts)
359    }
360
361    /// Register or update a subscription
362    ///
363    /// Auto-saves to state store if configured.
364    pub async fn update_subscription(&self, filter: SubscriptionFilter) -> Result<()> {
365        let subscriber_id = filter.subscriber_id.clone();
366
367        {
368            let mut subs = self.subscriptions.write().await;
369            subs.insert(subscriber_id.clone(), filter.clone());
370            self.persist_state(&subs);
371        }
372
373        self.metrics.record_subscribe();
374
375        tracing::info!(
376            subscriber = %subscriber_id,
377            subjects = ?filter.subjects,
378            durable = filter.durable,
379            "Subscription updated"
380        );
381
382        Ok(())
383    }
384
385    /// Create subscribers for a registered subscription
386    pub async fn create_subscriber(
387        &self,
388        subscriber_id: &str,
389    ) -> Result<Vec<Box<dyn Subscription>>> {
390        let subs = self.subscriptions.read().await;
391        let filter = subs.get(subscriber_id).ok_or_else(|| {
392            EventError::NotFound(format!("Subscription not found: {}", subscriber_id))
393        })?;
394
395        let span = tracing::info_span!(
396            "event.subscribe",
397            subscriber = subscriber_id,
398            subjects = ?filter.subjects,
399            durable = filter.durable,
400            provider = self.provider.name(),
401        );
402        let _guard = span.enter();
403        drop(_guard);
404
405        let mut subscribers = Vec::new();
406        for subject in &filter.subjects {
407            let consumer_name = format!("{}-{}", subscriber_id, subject.replace('.', "-"));
408            let sub = match (&filter.options, filter.durable) {
409                (Some(opts), true) => {
410                    self.provider
411                        .subscribe_durable_with_options(&consumer_name, subject, opts)
412                        .await?
413                }
414                (Some(opts), false) => {
415                    self.provider
416                        .subscribe_with_options(subject, opts)
417                        .await?
418                }
419                (None, true) => {
420                    self.provider
421                        .subscribe_durable(&consumer_name, subject)
422                        .await?
423                }
424                (None, false) => {
425                    self.provider.subscribe(subject).await?
426                }
427            };
428            subscribers.push(sub);
429        }
430
431        Ok(subscribers)
432    }
433
434    /// Remove a subscription
435    ///
436    /// Auto-saves to state store if configured.
437    pub async fn remove_subscription(&self, subscriber_id: &str) -> Result<()> {
438        let filter = {
439            let mut subs = self.subscriptions.write().await;
440            let removed = subs.remove(subscriber_id);
441            self.persist_state(&subs);
442            removed
443        };
444
445        if let Some(filter) = filter {
446            self.metrics.record_unsubscribe();
447            for subject in &filter.subjects {
448                let consumer_name = format!("{}-{}", subscriber_id, subject.replace('.', "-"));
449                if let Err(e) = self.provider.unsubscribe(&consumer_name).await {
450                    tracing::warn!(
451                        consumer = %consumer_name,
452                        error = %e,
453                        "Failed to delete consumer during unsubscribe"
454                    );
455                }
456            }
457        }
458
459        Ok(())
460    }
461
462    /// Get all registered subscriptions
463    pub async fn list_subscriptions(&self) -> Vec<SubscriptionFilter> {
464        let subs = self.subscriptions.read().await;
465        subs.values().cloned().collect()
466    }
467
468    /// Get a specific subscription
469    pub async fn get_subscription(&self, subscriber_id: &str) -> Option<SubscriptionFilter> {
470        let subs = self.subscriptions.read().await;
471        subs.get(subscriber_id).cloned()
472    }
473
474    /// Get provider info
475    pub async fn info(&self) -> Result<ProviderInfo> {
476        self.provider.info().await
477    }
478
479    /// Get a reference to the underlying provider
480    pub fn provider(&self) -> &dyn EventProvider {
481        self.provider.as_ref()
482    }
483
484    /// Health check — returns true if the provider is connected and operational
485    pub async fn health(&self) -> Result<bool> {
486        self.provider.health().await
487    }
488
489    /// Validate event against schema registry (if configured)
490    fn validate_if_configured(&self, event: &Event) -> Result<()> {
491        if let Some(ref registry) = self.schema_registry {
492            registry.validate(event)?;
493        }
494        Ok(())
495    }
496
497    /// Encrypt event payload in-place (if encryptor configured)
498    #[cfg(feature = "encryption")]
499    fn encrypt_if_configured(&self, event: &mut Event) -> Result<()> {
500        if let Some(ref encryptor) = self.encryptor {
501            event.payload = encryptor.encrypt(&event.payload)?;
502        }
503        Ok(())
504    }
505
506    /// Clone event and encrypt payload if encryptor is configured
507    #[cfg(feature = "encryption")]
508    fn maybe_encrypt_clone(&self, event: &Event) -> Result<Event> {
509        match self.encryptor {
510            Some(ref encryptor) => {
511                let mut cloned = event.clone();
512                cloned.payload = encryptor.encrypt(&cloned.payload)?;
513                Ok(cloned)
514            }
515            None => Ok(event.clone()),
516        }
517    }
518
519    /// Decrypt event payloads in-place (best-effort, skips failures)
520    /// Returns the number of payloads decrypted.
521    #[cfg(feature = "encryption")]
522    fn decrypt_events(&self, events: &mut [Event]) -> usize {
523        let mut count = 0;
524        if let Some(ref encryptor) = self.encryptor {
525            for event in events.iter_mut() {
526                if crate::crypto::EncryptedPayload::is_encrypted(&event.payload) {
527                    if let Ok(decrypted) = encryptor.decrypt(&event.payload) {
528                        event.payload = decrypted;
529                        count += 1;
530                    }
531                }
532            }
533        }
534        count
535    }
536
537    /// Route event through broker if configured (fire-and-forget)
538    #[cfg(feature = "routing")]
539    async fn maybe_route_through_broker(&self, event: &Event) {
540        if let Some(ref broker) = self.broker {
541            let result = broker.route(event).await;
542            if result.failed > 0 {
543                tracing::warn!(
544                    event_id = %event.id,
545                    matched = result.matched,
546                    delivered = result.delivered,
547                    failed = result.failed,
548                    "Broker routing had failures"
549                );
550            }
551        }
552    }
553
554    /// Persist subscription state (best-effort, logs on failure)
555    fn persist_state(&self, subs: &HashMap<String, SubscriptionFilter>) {
556        if let Some(ref store) = self.state_store {
557            if let Err(e) = store.save(subs) {
558                tracing::warn!(error = %e, "Failed to persist subscription state");
559            }
560        }
561    }
562}
563
564#[cfg(test)]
565mod tests {
566    use super::*;
567    use crate::dlq::{DeadLetterEvent, MemoryDlqHandler};
568    use crate::provider::memory::MemoryProvider;
569    use crate::schema::{EventSchema, MemorySchemaRegistry};
570    use crate::types::Event;
571
572    fn test_bus() -> EventBus {
573        EventBus::new(MemoryProvider::default())
574    }
575
576    #[tokio::test]
577    async fn test_publish_and_list() {
578        let bus = test_bus();
579        let event = bus
580            .publish("market", "forex", "Rate change", "reuters", serde_json::json!({"rate": 7.35}))
581            .await
582            .unwrap();
583
584        assert!(event.id.starts_with("evt-"));
585        assert_eq!(event.subject, "events.market.forex");
586        assert_eq!(event.category, "market");
587
588        let events = bus.list_events(Some("market"), 10).await.unwrap();
589        assert_eq!(events.len(), 1);
590        assert_eq!(events[0].id, event.id);
591    }
592
593    #[tokio::test]
594    async fn test_publish_event_prebuilt() {
595        let bus = test_bus();
596        let event = Event::new("events.test.a", "test", "Test", "test", serde_json::json!({}));
597        let seq = bus.publish_event(&event).await.unwrap();
598        assert!(seq > 0);
599
600        let events = bus.list_events(None, 10).await.unwrap();
601        assert_eq!(events.len(), 1);
602    }
603
604    #[tokio::test]
605    async fn test_list_events_by_category() {
606        let bus = test_bus();
607        bus.publish("market", "forex", "A", "test", serde_json::json!({})).await.unwrap();
608        bus.publish("system", "deploy", "B", "test", serde_json::json!({})).await.unwrap();
609        bus.publish("market", "crypto", "C", "test", serde_json::json!({})).await.unwrap();
610
611        let market = bus.list_events(Some("market"), 10).await.unwrap();
612        assert_eq!(market.len(), 2);
613
614        let system = bus.list_events(Some("system"), 10).await.unwrap();
615        assert_eq!(system.len(), 1);
616
617        let all = bus.list_events(None, 10).await.unwrap();
618        assert_eq!(all.len(), 3);
619    }
620
621    #[tokio::test]
622    async fn test_counts() {
623        let bus = test_bus();
624        bus.publish("market", "forex", "A", "test", serde_json::json!({})).await.unwrap();
625        bus.publish("market", "crypto", "B", "test", serde_json::json!({})).await.unwrap();
626        bus.publish("system", "deploy", "C", "test", serde_json::json!({})).await.unwrap();
627
628        let counts = bus.counts(100).await.unwrap();
629        assert_eq!(counts.total, 3);
630        assert_eq!(counts.categories["market"], 2);
631        assert_eq!(counts.categories["system"], 1);
632    }
633
634    #[tokio::test]
635    async fn test_subscription_lifecycle() {
636        let bus = test_bus();
637
638        let filter = SubscriptionFilter {
639            subscriber_id: "analyst".to_string(),
640            subjects: vec!["events.market.>".to_string()],
641            durable: false,
642            options: None,
643        };
644
645        bus.update_subscription(filter).await.unwrap();
646
647        let sub = bus.get_subscription("analyst").await;
648        assert!(sub.is_some());
649        assert_eq!(sub.unwrap().subjects, vec!["events.market.>"]);
650
651        let subs = bus.list_subscriptions().await;
652        assert_eq!(subs.len(), 1);
653
654        bus.remove_subscription("analyst").await.unwrap();
655        assert!(bus.get_subscription("analyst").await.is_none());
656        assert!(bus.list_subscriptions().await.is_empty());
657    }
658
659    #[tokio::test]
660    async fn test_create_subscriber_not_found() {
661        let bus = test_bus();
662        let result = bus.create_subscriber("nonexistent").await;
663        assert!(matches!(result, Err(EventError::NotFound(_))));
664    }
665
666    #[tokio::test]
667    async fn test_provider_name() {
668        let bus = test_bus();
669        assert_eq!(bus.provider_name(), "memory");
670    }
671
672    #[tokio::test]
673    async fn test_info() {
674        let bus = test_bus();
675        bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
676
677        let info = bus.info().await.unwrap();
678        assert_eq!(info.provider, "memory");
679        assert_eq!(info.messages, 1);
680    }
681
682    #[tokio::test]
683    async fn test_health() {
684        let bus = test_bus();
685        assert!(bus.health().await.unwrap());
686    }
687
688    #[tokio::test]
689    async fn test_schema_validation_on_publish() {
690        let registry = Arc::new(MemorySchemaRegistry::new());
691        registry
692            .register(EventSchema {
693                event_type: "forex.rate".to_string(),
694                version: 1,
695                required_fields: vec!["rate".to_string()],
696                description: String::new(),
697            })
698            .unwrap();
699
700        let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);
701
702        // Valid typed event
703        let event = Event::typed(
704            "events.market.forex",
705            "market",
706            "forex.rate",
707            1,
708            "Rate",
709            "test",
710            serde_json::json!({"rate": 7.35}),
711        );
712        assert!(bus.publish_event(&event).await.is_ok());
713
714        // Invalid typed event (missing required field)
715        let bad_event = Event::typed(
716            "events.market.forex",
717            "market",
718            "forex.rate",
719            1,
720            "Rate",
721            "test",
722            serde_json::json!({"currency": "USD"}),
723        );
724        let err = bus.publish_event(&bad_event).await.unwrap_err();
725        assert!(matches!(err, EventError::SchemaValidation { .. }));
726    }
727
728    #[tokio::test]
729    async fn test_untyped_event_skips_validation() {
730        let registry = Arc::new(MemorySchemaRegistry::new());
731        registry
732            .register(EventSchema {
733                event_type: "forex.rate".to_string(),
734                version: 1,
735                required_fields: vec!["rate".to_string()],
736                description: String::new(),
737            })
738            .unwrap();
739
740        let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);
741
742        // Untyped event should pass even without required fields
743        let event = bus
744            .publish("market", "forex", "Rate", "test", serde_json::json!({}))
745            .await;
746        assert!(event.is_ok());
747    }
748
749    #[tokio::test]
750    async fn test_dlq_handler_integration() {
751        let dlq = Arc::new(MemoryDlqHandler::default());
752        let mut bus = test_bus();
753        bus.set_dlq_handler(dlq.clone());
754
755        assert!(bus.dlq_handler().is_some());
756
757        // Manually route an event to DLQ
758        let received = crate::types::ReceivedEvent {
759            event: Event::new("events.test.a", "test", "Test", "test", serde_json::json!({})),
760            sequence: 1,
761            num_delivered: 5,
762            stream: "memory".to_string(),
763        };
764        let dle = DeadLetterEvent::new(received, "Max retries exceeded");
765        dlq.handle(dle).await.unwrap();
766
767        assert_eq!(dlq.count().await.unwrap(), 1);
768    }
769
770    #[tokio::test]
771    async fn test_publish_with_options() {
772        let bus = test_bus();
773        let event = Event::new("events.test.a", "test", "Test", "test", serde_json::json!({}));
774        let opts = PublishOptions {
775            msg_id: Some("dedup-1".to_string()),
776            ..Default::default()
777        };
778
779        // MemoryProvider ignores options but should still succeed
780        let seq = bus.publish_event_with_options(&event, &opts).await.unwrap();
781        assert!(seq > 0);
782    }
783
784    #[tokio::test]
785    async fn test_concurrent_publish() {
786        let bus = Arc::new(test_bus());
787        let mut handles = Vec::new();
788
789        for i in 0..50 {
790            let bus = bus.clone();
791            handles.push(tokio::spawn(async move {
792                bus.publish(
793                    "test",
794                    &format!("topic.{}", i),
795                    &format!("Event {}", i),
796                    "test",
797                    serde_json::json!({"index": i}),
798                )
799                .await
800                .unwrap()
801            }));
802        }
803
804        for handle in handles {
805            handle.await.unwrap();
806        }
807
808        let events = bus.list_events(None, 100).await.unwrap();
809        assert_eq!(events.len(), 50);
810    }
811
812    #[tokio::test]
813    async fn test_remove_nonexistent_subscription() {
814        let bus = test_bus();
815        // Should not error — just a no-op
816        assert!(bus.remove_subscription("nonexistent").await.is_ok());
817    }
818
819    #[tokio::test]
820    async fn test_update_subscription_overwrites() {
821        let bus = test_bus();
822
823        let filter1 = SubscriptionFilter {
824            subscriber_id: "analyst".to_string(),
825            subjects: vec!["events.market.>".to_string()],
826            durable: false,
827            options: None,
828        };
829        bus.update_subscription(filter1).await.unwrap();
830
831        let filter2 = SubscriptionFilter {
832            subscriber_id: "analyst".to_string(),
833            subjects: vec!["events.system.>".to_string()],
834            durable: true,
835            options: None,
836        };
837        bus.update_subscription(filter2).await.unwrap();
838
839        let sub = bus.get_subscription("analyst").await.unwrap();
840        assert_eq!(sub.subjects, vec!["events.system.>"]);
841        assert!(sub.durable);
842        assert_eq!(bus.list_subscriptions().await.len(), 1);
843    }
844
845    #[cfg(feature = "encryption")]
846    #[tokio::test]
847    async fn test_encrypted_publish_and_list() {
848        let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
849        let mut bus = test_bus();
850        bus.set_encryptor(enc.clone());
851
852        let event = bus
853            .publish("market", "forex", "Rate", "test", serde_json::json!({"rate": 7.35}))
854            .await
855            .unwrap();
856
857        // The stored payload should be encrypted
858        assert!(crate::crypto::EncryptedPayload::is_encrypted(&event.payload));
859
860        // list_events should auto-decrypt
861        let events = bus.list_events(Some("market"), 10).await.unwrap();
862        assert_eq!(events.len(), 1);
863        assert_eq!(events[0].payload, serde_json::json!({"rate": 7.35}));
864    }
865
866    #[cfg(feature = "encryption")]
867    #[tokio::test]
868    async fn test_encrypted_publish_event_prebuilt() {
869        let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
870        let mut bus = test_bus();
871        bus.set_encryptor(enc);
872
873        let event = Event::new("events.test.a", "test", "Test", "test", serde_json::json!({"secret": "data"}));
874        let seq = bus.publish_event(&event).await.unwrap();
875        assert!(seq > 0);
876
877        // Original event should NOT be mutated
878        assert_eq!(event.payload, serde_json::json!({"secret": "data"}));
879
880        // list_events should decrypt
881        let events = bus.list_events(None, 10).await.unwrap();
882        assert_eq!(events[0].payload, serde_json::json!({"secret": "data"}));
883    }
884
885    #[cfg(feature = "encryption")]
886    #[tokio::test]
887    async fn test_no_encryptor_passthrough() {
888        let bus = test_bus();
889        let event = bus
890            .publish("test", "a", "Test", "test", serde_json::json!({"plain": true}))
891            .await
892            .unwrap();
893
894        // Without encryptor, payload is plain
895        assert!(!crate::crypto::EncryptedPayload::is_encrypted(&event.payload));
896        assert_eq!(event.payload, serde_json::json!({"plain": true}));
897    }
898
899    #[cfg(feature = "encryption")]
900    #[tokio::test]
901    async fn test_encryptor_accessor() {
902        let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
903        let mut bus = test_bus();
904        assert!(bus.encryptor().is_none());
905        bus.set_encryptor(enc);
906        assert!(bus.encryptor().is_some());
907        assert_eq!(bus.encryptor().unwrap().active_key_id(), "k1");
908    }
909
910    #[tokio::test]
911    async fn test_state_store_persists_subscriptions() {
912        let store = Arc::new(crate::state::MemoryStateStore::default());
913        let mut bus = test_bus();
914        bus.set_state_store(store.clone()).unwrap();
915
916        let filter = SubscriptionFilter {
917            subscriber_id: "analyst".to_string(),
918            subjects: vec!["events.market.>".to_string()],
919            durable: true,
920            options: None,
921        };
922        bus.update_subscription(filter).await.unwrap();
923
924        // Verify state was persisted
925        let loaded = store.load().unwrap();
926        assert_eq!(loaded.len(), 1);
927        assert!(loaded.contains_key("analyst"));
928    }
929
930    #[tokio::test]
931    async fn test_state_store_remove_persists() {
932        let store = Arc::new(crate::state::MemoryStateStore::default());
933        let mut bus = test_bus();
934        bus.set_state_store(store.clone()).unwrap();
935
936        let filter = SubscriptionFilter {
937            subscriber_id: "analyst".to_string(),
938            subjects: vec!["events.market.>".to_string()],
939            durable: false,
940            options: None,
941        };
942        bus.update_subscription(filter).await.unwrap();
943        bus.remove_subscription("analyst").await.unwrap();
944
945        let loaded = store.load().unwrap();
946        assert!(loaded.is_empty());
947    }
948
949    #[tokio::test]
950    async fn test_state_store_restores_on_set() {
951        let store = Arc::new(crate::state::MemoryStateStore::default());
952
953        // Pre-populate the store
954        let mut initial = std::collections::HashMap::new();
955        initial.insert(
956            "monitor".to_string(),
957            SubscriptionFilter {
958                subscriber_id: "monitor".to_string(),
959                subjects: vec!["events.system.>".to_string()],
960                durable: true,
961                options: None,
962            },
963        );
964        store.save(&initial).unwrap();
965
966        // Create bus and set store — should restore
967        let mut bus = test_bus();
968        bus.set_state_store(store).unwrap();
969
970        let sub = bus.get_subscription("monitor").await;
971        assert!(sub.is_some());
972        assert_eq!(sub.unwrap().subjects, vec!["events.system.>"]);
973    }
974
975    #[tokio::test]
976    async fn test_state_store_accessor() {
977        let mut bus = test_bus();
978        assert!(bus.state_store().is_none());
979
980        let store = Arc::new(crate::state::MemoryStateStore::default());
981        bus.set_state_store(store).unwrap();
982        assert!(bus.state_store().is_some());
983    }
984
985    #[tokio::test]
986    async fn test_file_state_store_lifecycle() {
987        let dir = std::env::temp_dir().join(format!("a3s-event-bus-{}", uuid::Uuid::new_v4()));
988        let path = dir.join("bus-state.json");
989        let store = Arc::new(crate::state::FileStateStore::new(&path));
990
991        // Bus 1: add subscriptions
992        {
993            let mut bus = test_bus();
994            bus.set_state_store(store.clone()).unwrap();
995
996            bus.update_subscription(SubscriptionFilter {
997                subscriber_id: "a".to_string(),
998                subjects: vec!["events.market.>".to_string()],
999                durable: true,
1000                options: None,
1001            })
1002            .await
1003            .unwrap();
1004
1005            bus.update_subscription(SubscriptionFilter {
1006                subscriber_id: "b".to_string(),
1007                subjects: vec!["events.system.>".to_string()],
1008                durable: false,
1009                options: None,
1010            })
1011            .await
1012            .unwrap();
1013        }
1014
1015        // Bus 2: restore from same file
1016        {
1017            let mut bus = test_bus();
1018            bus.set_state_store(store).unwrap();
1019
1020            assert_eq!(bus.list_subscriptions().await.len(), 2);
1021            assert!(bus.get_subscription("a").await.is_some());
1022            assert!(bus.get_subscription("b").await.is_some());
1023        }
1024
1025        std::fs::remove_dir_all(&dir).unwrap();
1026    }
1027
1028    #[tokio::test]
1029    async fn test_metrics_publish_count() {
1030        let bus = test_bus();
1031        bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
1032        bus.publish("test", "b", "B", "test", serde_json::json!({})).await.unwrap();
1033
1034        let s = bus.metrics().snapshot();
1035        assert_eq!(s.publish_count, 2);
1036        assert_eq!(s.publish_errors, 0);
1037        assert!(s.avg_publish_latency_us < 1_000_000); // sanity check
1038    }
1039
1040    #[tokio::test]
1041    async fn test_metrics_subscribe_unsubscribe() {
1042        let bus = test_bus();
1043        let filter = SubscriptionFilter {
1044            subscriber_id: "m".to_string(),
1045            subjects: vec!["events.>".to_string()],
1046            durable: false,
1047            options: None,
1048        };
1049        bus.update_subscription(filter).await.unwrap();
1050        bus.remove_subscription("m").await.unwrap();
1051
1052        let s = bus.metrics().snapshot();
1053        assert_eq!(s.subscribe_count, 1);
1054        assert_eq!(s.unsubscribe_count, 1);
1055    }
1056
1057    #[tokio::test]
1058    async fn test_metrics_validation_error() {
1059        let registry = Arc::new(MemorySchemaRegistry::new());
1060        registry
1061            .register(EventSchema {
1062                event_type: "strict.type".to_string(),
1063                version: 1,
1064                required_fields: vec!["required_field".to_string()],
1065                description: String::new(),
1066            })
1067            .unwrap();
1068
1069        let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);
1070
1071        let bad_event = Event::typed(
1072            "events.test.a", "test", "strict.type", 1,
1073            "Bad", "test", serde_json::json!({}),
1074        );
1075        assert!(bus.publish_event(&bad_event).await.is_err());
1076
1077        let s = bus.metrics().snapshot();
1078        assert_eq!(s.validation_errors, 1);
1079        assert_eq!(s.publish_count, 0);
1080    }
1081
1082    #[cfg(feature = "encryption")]
1083    #[tokio::test]
1084    async fn test_metrics_encrypt_decrypt() {
1085        let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
1086        let mut bus = test_bus();
1087        bus.set_encryptor(enc);
1088
1089        bus.publish("test", "a", "A", "test", serde_json::json!({"data": 1})).await.unwrap();
1090        bus.list_events(None, 10).await.unwrap();
1091
1092        let s = bus.metrics().snapshot();
1093        assert_eq!(s.encrypt_count, 1);
1094        assert_eq!(s.decrypt_count, 1);
1095    }
1096
1097    #[tokio::test]
1098    async fn test_metrics_snapshot_serializable() {
1099        let bus = test_bus();
1100        bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
1101
1102        let s = bus.metrics().snapshot();
1103        let json = serde_json::to_string(&s).unwrap();
1104        assert!(json.contains("publishCount"));
1105    }
1106
1107    #[tokio::test]
1108    async fn test_metrics_reset() {
1109        let bus = test_bus();
1110        bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
1111        assert_eq!(bus.metrics().snapshot().publish_count, 1);
1112
1113        bus.metrics().reset();
1114        assert_eq!(bus.metrics().snapshot().publish_count, 0);
1115    }
1116}