1#[cfg(feature = "routing")]
7use crate::broker::Broker;
8#[cfg(feature = "encryption")]
9use crate::crypto::EventEncryptor;
10use crate::error::{EventError, Result};
11use crate::metrics::EventMetrics;
12use crate::provider::{EventProvider, ProviderInfo, Subscription};
13use crate::schema::SchemaRegistry;
14use crate::state::StateStore;
15use crate::types::{Event, EventCounts, PublishOptions, SubscriptionFilter};
16use crate::dlq::DlqHandler;
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20use tokio::sync::RwLock;
21
/// Central entry point that ties an event provider to optional validation,
/// encryption, routing, persistence, and metrics collaborators.
pub struct EventBus {
    /// Backend that publishes events, serves history, and creates subscriptions.
    provider: Arc<dyn EventProvider>,

    /// Registered subscription filters, keyed by subscriber id.
    subscriptions: Arc<RwLock<HashMap<String, SubscriptionFilter>>>,

    /// When set, events are validated against it before being published.
    schema_registry: Option<Arc<dyn SchemaRegistry>>,

    /// Optional dead-letter handler; stored here and exposed through an accessor.
    dlq_handler: Option<Arc<dyn DlqHandler>>,

    /// When set, payloads are encrypted on publish and decrypted when listing.
    #[cfg(feature = "encryption")]
    encryptor: Option<Arc<dyn EventEncryptor>>,

    /// When set, the subscription map is saved to it after every change.
    state_store: Option<Arc<dyn StateStore>>,

    /// When set, successfully published events are also routed through it.
    #[cfg(feature = "routing")]
    broker: Option<Arc<Broker>>,

    /// Counters and latency figures recorded by the bus operations.
    metrics: Arc<EventMetrics>,
}
57
58impl EventBus {
59 pub fn new(provider: impl EventProvider + 'static) -> Self {
61 Self {
62 provider: Arc::new(provider),
63 subscriptions: Arc::new(RwLock::new(HashMap::new())),
64 schema_registry: None,
65 dlq_handler: None,
66 #[cfg(feature = "encryption")]
67 encryptor: None,
68 state_store: None,
69 #[cfg(feature = "routing")]
70 broker: None,
71 metrics: Arc::new(EventMetrics::new()),
72 }
73 }
74
75 pub fn with_schema_registry(
77 provider: impl EventProvider + 'static,
78 registry: Arc<dyn SchemaRegistry>,
79 ) -> Self {
80 Self {
81 provider: Arc::new(provider),
82 subscriptions: Arc::new(RwLock::new(HashMap::new())),
83 schema_registry: Some(registry),
84 dlq_handler: None,
85 #[cfg(feature = "encryption")]
86 encryptor: None,
87 state_store: None,
88 #[cfg(feature = "routing")]
89 broker: None,
90 metrics: Arc::new(EventMetrics::new()),
91 }
92 }
93
94 pub fn set_dlq_handler(&mut self, handler: Arc<dyn DlqHandler>) {
96 self.dlq_handler = Some(handler);
97 }
98
99 #[cfg(feature = "encryption")]
101 pub fn set_encryptor(&mut self, encryptor: Arc<dyn EventEncryptor>) {
102 self.encryptor = Some(encryptor);
103 }
104
105 pub fn set_state_store(&mut self, store: Arc<dyn StateStore>) -> Result<()> {
109 let loaded = store.load()?;
110 if !loaded.is_empty() {
111 tracing::info!(count = loaded.len(), "Restored subscriptions from state store");
112 let mut subs = self.subscriptions.try_write().map_err(|_| {
114 EventError::Config("Failed to acquire subscription lock during state restore".to_string())
115 })?;
116 *subs = loaded;
117 }
118 self.state_store = Some(store);
119 Ok(())
120 }
121
122 pub fn state_store(&self) -> Option<&dyn StateStore> {
124 self.state_store.as_deref()
125 }
126
127 pub fn metrics(&self) -> &EventMetrics {
131 &self.metrics
132 }
133
134 #[cfg(feature = "encryption")]
136 pub fn encryptor(&self) -> Option<&dyn EventEncryptor> {
137 self.encryptor.as_deref()
138 }
139
140 pub fn dlq_handler(&self) -> Option<&dyn DlqHandler> {
142 self.dlq_handler.as_deref()
143 }
144
145 pub fn schema_registry(&self) -> Option<&dyn SchemaRegistry> {
147 self.schema_registry.as_deref()
148 }
149
150 pub fn provider_name(&self) -> &str {
152 self.provider.name()
153 }
154
155 #[cfg(feature = "routing")]
160 pub fn set_broker(&mut self, broker: Arc<Broker>) {
161 self.broker = Some(broker);
162 }
163
164 #[cfg(feature = "routing")]
166 pub fn broker(&self) -> Option<&Broker> {
167 self.broker.as_deref()
168 }
169
170 pub fn provider_arc(&self) -> Arc<dyn EventProvider> {
174 self.provider.clone()
175 }
176
177 pub async fn publish(
179 &self,
180 category: &str,
181 topic: &str,
182 summary: &str,
183 source: &str,
184 payload: serde_json::Value,
185 ) -> Result<Event> {
186 let subject = self.provider.build_subject(category, topic);
187 #[cfg(feature = "encryption")]
188 let mut event = Event::new(subject, category, summary, source, payload);
189 #[cfg(not(feature = "encryption"))]
190 let event = Event::new(subject, category, summary, source, payload);
191
192 if let Err(e) = self.validate_if_configured(&event) {
193 self.metrics.record_validation_error();
194 return Err(e);
195 }
196
197 #[cfg(feature = "encryption")]
198 if self.encryptor.is_some() {
199 self.encrypt_if_configured(&mut event)?;
200 self.metrics.record_encrypt();
201 }
202
203 let span = tracing::info_span!(
204 "event.publish",
205 event_id = %event.id,
206 subject = %event.subject,
207 category = category,
208 provider = self.provider.name(),
209 );
210 let _guard = span.enter();
211 drop(_guard);
212
213 let start = Instant::now();
214 match self.provider.publish(&event).await {
215 Ok(_) => {
216 self.metrics.record_publish(start);
217 #[cfg(feature = "routing")]
218 self.maybe_route_through_broker(&event).await;
219 Ok(event)
220 }
221 Err(e) => {
222 self.metrics.record_publish_error();
223 Err(e)
224 }
225 }
226 }
227
228 pub async fn publish_event(&self, event: &Event) -> Result<u64> {
230 if let Err(e) = self.validate_if_configured(event) {
231 self.metrics.record_validation_error();
232 return Err(e);
233 }
234
235 #[cfg(feature = "encryption")]
236 let event = {
237 let e = self.maybe_encrypt_clone(event)?;
238 if self.encryptor.is_some() {
239 self.metrics.record_encrypt();
240 }
241 e
242 };
243 #[cfg(not(feature = "encryption"))]
244 let event = event.clone();
245
246 let span = tracing::info_span!(
247 "event.publish",
248 event_id = %event.id,
249 subject = %event.subject,
250 category = %event.category,
251 provider = self.provider.name(),
252 );
253 let _guard = span.enter();
254 drop(_guard);
255
256 let start = Instant::now();
257 match self.provider.publish(&event).await {
258 Ok(seq) => {
259 self.metrics.record_publish(start);
260 #[cfg(feature = "routing")]
261 self.maybe_route_through_broker(&event).await;
262 Ok(seq)
263 }
264 Err(e) => {
265 self.metrics.record_publish_error();
266 Err(e)
267 }
268 }
269 }
270
271 pub async fn publish_event_with_options(
273 &self,
274 event: &Event,
275 opts: &PublishOptions,
276 ) -> Result<u64> {
277 if let Err(e) = self.validate_if_configured(event) {
278 self.metrics.record_validation_error();
279 return Err(e);
280 }
281
282 #[cfg(feature = "encryption")]
283 let event = {
284 let e = self.maybe_encrypt_clone(event)?;
285 if self.encryptor.is_some() {
286 self.metrics.record_encrypt();
287 }
288 e
289 };
290 #[cfg(not(feature = "encryption"))]
291 let event = event.clone();
292
293 let span = tracing::info_span!(
294 "event.publish",
295 event_id = %event.id,
296 subject = %event.subject,
297 category = %event.category,
298 provider = self.provider.name(),
299 msg_id = ?opts.msg_id,
300 );
301 let _guard = span.enter();
302 drop(_guard);
303
304 let start = Instant::now();
305 match self.provider.publish_with_options(&event, opts).await {
306 Ok(seq) => {
307 self.metrics.record_publish(start);
308 #[cfg(feature = "routing")]
309 self.maybe_route_through_broker(&event).await;
310 Ok(seq)
311 }
312 Err(e) => {
313 self.metrics.record_publish_error();
314 Err(e)
315 }
316 }
317 }
318
319 pub async fn list_events(
323 &self,
324 category: Option<&str>,
325 limit: usize,
326 ) -> Result<Vec<Event>> {
327 let filter = category.map(|c| self.provider.category_subject(c));
328 #[cfg(feature = "encryption")]
329 let mut events = self.provider
330 .history(filter.as_deref(), limit)
331 .await?;
332 #[cfg(not(feature = "encryption"))]
333 let events = self.provider
334 .history(filter.as_deref(), limit)
335 .await?;
336 #[cfg(feature = "encryption")]
337 {
338 let decrypted = self.decrypt_events(&mut events);
339 if decrypted > 0 {
340 for _ in 0..decrypted {
341 self.metrics.record_decrypt();
342 }
343 }
344 }
345 Ok(events)
346 }
347
348 pub async fn counts(&self, limit: usize) -> Result<EventCounts> {
350 let events = self.provider.history(None, limit).await?;
351 let mut counts = EventCounts::default();
352
353 for event in &events {
354 *counts.categories.entry(event.category.clone()).or_insert(0) += 1;
355 counts.total += 1;
356 }
357
358 Ok(counts)
359 }
360
361 pub async fn update_subscription(&self, filter: SubscriptionFilter) -> Result<()> {
365 let subscriber_id = filter.subscriber_id.clone();
366
367 {
368 let mut subs = self.subscriptions.write().await;
369 subs.insert(subscriber_id.clone(), filter.clone());
370 self.persist_state(&subs);
371 }
372
373 self.metrics.record_subscribe();
374
375 tracing::info!(
376 subscriber = %subscriber_id,
377 subjects = ?filter.subjects,
378 durable = filter.durable,
379 "Subscription updated"
380 );
381
382 Ok(())
383 }
384
385 pub async fn create_subscriber(
387 &self,
388 subscriber_id: &str,
389 ) -> Result<Vec<Box<dyn Subscription>>> {
390 let subs = self.subscriptions.read().await;
391 let filter = subs.get(subscriber_id).ok_or_else(|| {
392 EventError::NotFound(format!("Subscription not found: {}", subscriber_id))
393 })?;
394
395 let span = tracing::info_span!(
396 "event.subscribe",
397 subscriber = subscriber_id,
398 subjects = ?filter.subjects,
399 durable = filter.durable,
400 provider = self.provider.name(),
401 );
402 let _guard = span.enter();
403 drop(_guard);
404
405 let mut subscribers = Vec::new();
406 for subject in &filter.subjects {
407 let consumer_name = format!("{}-{}", subscriber_id, subject.replace('.', "-"));
408 let sub = match (&filter.options, filter.durable) {
409 (Some(opts), true) => {
410 self.provider
411 .subscribe_durable_with_options(&consumer_name, subject, opts)
412 .await?
413 }
414 (Some(opts), false) => {
415 self.provider
416 .subscribe_with_options(subject, opts)
417 .await?
418 }
419 (None, true) => {
420 self.provider
421 .subscribe_durable(&consumer_name, subject)
422 .await?
423 }
424 (None, false) => {
425 self.provider.subscribe(subject).await?
426 }
427 };
428 subscribers.push(sub);
429 }
430
431 Ok(subscribers)
432 }
433
434 pub async fn remove_subscription(&self, subscriber_id: &str) -> Result<()> {
438 let filter = {
439 let mut subs = self.subscriptions.write().await;
440 let removed = subs.remove(subscriber_id);
441 self.persist_state(&subs);
442 removed
443 };
444
445 if let Some(filter) = filter {
446 self.metrics.record_unsubscribe();
447 for subject in &filter.subjects {
448 let consumer_name = format!("{}-{}", subscriber_id, subject.replace('.', "-"));
449 if let Err(e) = self.provider.unsubscribe(&consumer_name).await {
450 tracing::warn!(
451 consumer = %consumer_name,
452 error = %e,
453 "Failed to delete consumer during unsubscribe"
454 );
455 }
456 }
457 }
458
459 Ok(())
460 }
461
462 pub async fn list_subscriptions(&self) -> Vec<SubscriptionFilter> {
464 let subs = self.subscriptions.read().await;
465 subs.values().cloned().collect()
466 }
467
468 pub async fn get_subscription(&self, subscriber_id: &str) -> Option<SubscriptionFilter> {
470 let subs = self.subscriptions.read().await;
471 subs.get(subscriber_id).cloned()
472 }
473
474 pub async fn info(&self) -> Result<ProviderInfo> {
476 self.provider.info().await
477 }
478
479 pub fn provider(&self) -> &dyn EventProvider {
481 self.provider.as_ref()
482 }
483
484 pub async fn health(&self) -> Result<bool> {
486 self.provider.health().await
487 }
488
489 fn validate_if_configured(&self, event: &Event) -> Result<()> {
491 if let Some(ref registry) = self.schema_registry {
492 registry.validate(event)?;
493 }
494 Ok(())
495 }
496
497 #[cfg(feature = "encryption")]
499 fn encrypt_if_configured(&self, event: &mut Event) -> Result<()> {
500 if let Some(ref encryptor) = self.encryptor {
501 event.payload = encryptor.encrypt(&event.payload)?;
502 }
503 Ok(())
504 }
505
506 #[cfg(feature = "encryption")]
508 fn maybe_encrypt_clone(&self, event: &Event) -> Result<Event> {
509 match self.encryptor {
510 Some(ref encryptor) => {
511 let mut cloned = event.clone();
512 cloned.payload = encryptor.encrypt(&cloned.payload)?;
513 Ok(cloned)
514 }
515 None => Ok(event.clone()),
516 }
517 }
518
519 #[cfg(feature = "encryption")]
522 fn decrypt_events(&self, events: &mut [Event]) -> usize {
523 let mut count = 0;
524 if let Some(ref encryptor) = self.encryptor {
525 for event in events.iter_mut() {
526 if crate::crypto::EncryptedPayload::is_encrypted(&event.payload) {
527 if let Ok(decrypted) = encryptor.decrypt(&event.payload) {
528 event.payload = decrypted;
529 count += 1;
530 }
531 }
532 }
533 }
534 count
535 }
536
537 #[cfg(feature = "routing")]
539 async fn maybe_route_through_broker(&self, event: &Event) {
540 if let Some(ref broker) = self.broker {
541 let result = broker.route(event).await;
542 if result.failed > 0 {
543 tracing::warn!(
544 event_id = %event.id,
545 matched = result.matched,
546 delivered = result.delivered,
547 failed = result.failed,
548 "Broker routing had failures"
549 );
550 }
551 }
552 }
553
554 fn persist_state(&self, subs: &HashMap<String, SubscriptionFilter>) {
556 if let Some(ref store) = self.state_store {
557 if let Err(e) = store.save(subs) {
558 tracing::warn!(error = %e, "Failed to persist subscription state");
559 }
560 }
561 }
562}
563
564#[cfg(test)]
565mod tests {
566 use super::*;
567 use crate::dlq::{DeadLetterEvent, MemoryDlqHandler};
568 use crate::provider::memory::MemoryProvider;
569 use crate::schema::{EventSchema, MemorySchemaRegistry};
570 use crate::types::Event;
571
572 fn test_bus() -> EventBus {
573 EventBus::new(MemoryProvider::default())
574 }
575
576 #[tokio::test]
577 async fn test_publish_and_list() {
578 let bus = test_bus();
579 let event = bus
580 .publish("market", "forex", "Rate change", "reuters", serde_json::json!({"rate": 7.35}))
581 .await
582 .unwrap();
583
584 assert!(event.id.starts_with("evt-"));
585 assert_eq!(event.subject, "events.market.forex");
586 assert_eq!(event.category, "market");
587
588 let events = bus.list_events(Some("market"), 10).await.unwrap();
589 assert_eq!(events.len(), 1);
590 assert_eq!(events[0].id, event.id);
591 }
592
593 #[tokio::test]
594 async fn test_publish_event_prebuilt() {
595 let bus = test_bus();
596 let event = Event::new("events.test.a", "test", "Test", "test", serde_json::json!({}));
597 let seq = bus.publish_event(&event).await.unwrap();
598 assert!(seq > 0);
599
600 let events = bus.list_events(None, 10).await.unwrap();
601 assert_eq!(events.len(), 1);
602 }
603
604 #[tokio::test]
605 async fn test_list_events_by_category() {
606 let bus = test_bus();
607 bus.publish("market", "forex", "A", "test", serde_json::json!({})).await.unwrap();
608 bus.publish("system", "deploy", "B", "test", serde_json::json!({})).await.unwrap();
609 bus.publish("market", "crypto", "C", "test", serde_json::json!({})).await.unwrap();
610
611 let market = bus.list_events(Some("market"), 10).await.unwrap();
612 assert_eq!(market.len(), 2);
613
614 let system = bus.list_events(Some("system"), 10).await.unwrap();
615 assert_eq!(system.len(), 1);
616
617 let all = bus.list_events(None, 10).await.unwrap();
618 assert_eq!(all.len(), 3);
619 }
620
621 #[tokio::test]
622 async fn test_counts() {
623 let bus = test_bus();
624 bus.publish("market", "forex", "A", "test", serde_json::json!({})).await.unwrap();
625 bus.publish("market", "crypto", "B", "test", serde_json::json!({})).await.unwrap();
626 bus.publish("system", "deploy", "C", "test", serde_json::json!({})).await.unwrap();
627
628 let counts = bus.counts(100).await.unwrap();
629 assert_eq!(counts.total, 3);
630 assert_eq!(counts.categories["market"], 2);
631 assert_eq!(counts.categories["system"], 1);
632 }
633
634 #[tokio::test]
635 async fn test_subscription_lifecycle() {
636 let bus = test_bus();
637
638 let filter = SubscriptionFilter {
639 subscriber_id: "analyst".to_string(),
640 subjects: vec!["events.market.>".to_string()],
641 durable: false,
642 options: None,
643 };
644
645 bus.update_subscription(filter).await.unwrap();
646
647 let sub = bus.get_subscription("analyst").await;
648 assert!(sub.is_some());
649 assert_eq!(sub.unwrap().subjects, vec!["events.market.>"]);
650
651 let subs = bus.list_subscriptions().await;
652 assert_eq!(subs.len(), 1);
653
654 bus.remove_subscription("analyst").await.unwrap();
655 assert!(bus.get_subscription("analyst").await.is_none());
656 assert!(bus.list_subscriptions().await.is_empty());
657 }
658
659 #[tokio::test]
660 async fn test_create_subscriber_not_found() {
661 let bus = test_bus();
662 let result = bus.create_subscriber("nonexistent").await;
663 assert!(matches!(result, Err(EventError::NotFound(_))));
664 }
665
666 #[tokio::test]
667 async fn test_provider_name() {
668 let bus = test_bus();
669 assert_eq!(bus.provider_name(), "memory");
670 }
671
672 #[tokio::test]
673 async fn test_info() {
674 let bus = test_bus();
675 bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
676
677 let info = bus.info().await.unwrap();
678 assert_eq!(info.provider, "memory");
679 assert_eq!(info.messages, 1);
680 }
681
682 #[tokio::test]
683 async fn test_health() {
684 let bus = test_bus();
685 assert!(bus.health().await.unwrap());
686 }
687
688 #[tokio::test]
689 async fn test_schema_validation_on_publish() {
690 let registry = Arc::new(MemorySchemaRegistry::new());
691 registry
692 .register(EventSchema {
693 event_type: "forex.rate".to_string(),
694 version: 1,
695 required_fields: vec!["rate".to_string()],
696 description: String::new(),
697 })
698 .unwrap();
699
700 let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);
701
702 let event = Event::typed(
704 "events.market.forex",
705 "market",
706 "forex.rate",
707 1,
708 "Rate",
709 "test",
710 serde_json::json!({"rate": 7.35}),
711 );
712 assert!(bus.publish_event(&event).await.is_ok());
713
714 let bad_event = Event::typed(
716 "events.market.forex",
717 "market",
718 "forex.rate",
719 1,
720 "Rate",
721 "test",
722 serde_json::json!({"currency": "USD"}),
723 );
724 let err = bus.publish_event(&bad_event).await.unwrap_err();
725 assert!(matches!(err, EventError::SchemaValidation { .. }));
726 }
727
728 #[tokio::test]
729 async fn test_untyped_event_skips_validation() {
730 let registry = Arc::new(MemorySchemaRegistry::new());
731 registry
732 .register(EventSchema {
733 event_type: "forex.rate".to_string(),
734 version: 1,
735 required_fields: vec!["rate".to_string()],
736 description: String::new(),
737 })
738 .unwrap();
739
740 let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);
741
742 let event = bus
744 .publish("market", "forex", "Rate", "test", serde_json::json!({}))
745 .await;
746 assert!(event.is_ok());
747 }
748
749 #[tokio::test]
750 async fn test_dlq_handler_integration() {
751 let dlq = Arc::new(MemoryDlqHandler::default());
752 let mut bus = test_bus();
753 bus.set_dlq_handler(dlq.clone());
754
755 assert!(bus.dlq_handler().is_some());
756
757 let received = crate::types::ReceivedEvent {
759 event: Event::new("events.test.a", "test", "Test", "test", serde_json::json!({})),
760 sequence: 1,
761 num_delivered: 5,
762 stream: "memory".to_string(),
763 };
764 let dle = DeadLetterEvent::new(received, "Max retries exceeded");
765 dlq.handle(dle).await.unwrap();
766
767 assert_eq!(dlq.count().await.unwrap(), 1);
768 }
769
770 #[tokio::test]
771 async fn test_publish_with_options() {
772 let bus = test_bus();
773 let event = Event::new("events.test.a", "test", "Test", "test", serde_json::json!({}));
774 let opts = PublishOptions {
775 msg_id: Some("dedup-1".to_string()),
776 ..Default::default()
777 };
778
779 let seq = bus.publish_event_with_options(&event, &opts).await.unwrap();
781 assert!(seq > 0);
782 }
783
784 #[tokio::test]
785 async fn test_concurrent_publish() {
786 let bus = Arc::new(test_bus());
787 let mut handles = Vec::new();
788
789 for i in 0..50 {
790 let bus = bus.clone();
791 handles.push(tokio::spawn(async move {
792 bus.publish(
793 "test",
794 &format!("topic.{}", i),
795 &format!("Event {}", i),
796 "test",
797 serde_json::json!({"index": i}),
798 )
799 .await
800 .unwrap()
801 }));
802 }
803
804 for handle in handles {
805 handle.await.unwrap();
806 }
807
808 let events = bus.list_events(None, 100).await.unwrap();
809 assert_eq!(events.len(), 50);
810 }
811
812 #[tokio::test]
813 async fn test_remove_nonexistent_subscription() {
814 let bus = test_bus();
815 assert!(bus.remove_subscription("nonexistent").await.is_ok());
817 }
818
819 #[tokio::test]
820 async fn test_update_subscription_overwrites() {
821 let bus = test_bus();
822
823 let filter1 = SubscriptionFilter {
824 subscriber_id: "analyst".to_string(),
825 subjects: vec!["events.market.>".to_string()],
826 durable: false,
827 options: None,
828 };
829 bus.update_subscription(filter1).await.unwrap();
830
831 let filter2 = SubscriptionFilter {
832 subscriber_id: "analyst".to_string(),
833 subjects: vec!["events.system.>".to_string()],
834 durable: true,
835 options: None,
836 };
837 bus.update_subscription(filter2).await.unwrap();
838
839 let sub = bus.get_subscription("analyst").await.unwrap();
840 assert_eq!(sub.subjects, vec!["events.system.>"]);
841 assert!(sub.durable);
842 assert_eq!(bus.list_subscriptions().await.len(), 1);
843 }
844
845 #[cfg(feature = "encryption")]
846 #[tokio::test]
847 async fn test_encrypted_publish_and_list() {
848 let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
849 let mut bus = test_bus();
850 bus.set_encryptor(enc.clone());
851
852 let event = bus
853 .publish("market", "forex", "Rate", "test", serde_json::json!({"rate": 7.35}))
854 .await
855 .unwrap();
856
857 assert!(crate::crypto::EncryptedPayload::is_encrypted(&event.payload));
859
860 let events = bus.list_events(Some("market"), 10).await.unwrap();
862 assert_eq!(events.len(), 1);
863 assert_eq!(events[0].payload, serde_json::json!({"rate": 7.35}));
864 }
865
866 #[cfg(feature = "encryption")]
867 #[tokio::test]
868 async fn test_encrypted_publish_event_prebuilt() {
869 let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
870 let mut bus = test_bus();
871 bus.set_encryptor(enc);
872
873 let event = Event::new("events.test.a", "test", "Test", "test", serde_json::json!({"secret": "data"}));
874 let seq = bus.publish_event(&event).await.unwrap();
875 assert!(seq > 0);
876
877 assert_eq!(event.payload, serde_json::json!({"secret": "data"}));
879
880 let events = bus.list_events(None, 10).await.unwrap();
882 assert_eq!(events[0].payload, serde_json::json!({"secret": "data"}));
883 }
884
885 #[cfg(feature = "encryption")]
886 #[tokio::test]
887 async fn test_no_encryptor_passthrough() {
888 let bus = test_bus();
889 let event = bus
890 .publish("test", "a", "Test", "test", serde_json::json!({"plain": true}))
891 .await
892 .unwrap();
893
894 assert!(!crate::crypto::EncryptedPayload::is_encrypted(&event.payload));
896 assert_eq!(event.payload, serde_json::json!({"plain": true}));
897 }
898
899 #[cfg(feature = "encryption")]
900 #[tokio::test]
901 async fn test_encryptor_accessor() {
902 let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
903 let mut bus = test_bus();
904 assert!(bus.encryptor().is_none());
905 bus.set_encryptor(enc);
906 assert!(bus.encryptor().is_some());
907 assert_eq!(bus.encryptor().unwrap().active_key_id(), "k1");
908 }
909
910 #[tokio::test]
911 async fn test_state_store_persists_subscriptions() {
912 let store = Arc::new(crate::state::MemoryStateStore::default());
913 let mut bus = test_bus();
914 bus.set_state_store(store.clone()).unwrap();
915
916 let filter = SubscriptionFilter {
917 subscriber_id: "analyst".to_string(),
918 subjects: vec!["events.market.>".to_string()],
919 durable: true,
920 options: None,
921 };
922 bus.update_subscription(filter).await.unwrap();
923
924 let loaded = store.load().unwrap();
926 assert_eq!(loaded.len(), 1);
927 assert!(loaded.contains_key("analyst"));
928 }
929
930 #[tokio::test]
931 async fn test_state_store_remove_persists() {
932 let store = Arc::new(crate::state::MemoryStateStore::default());
933 let mut bus = test_bus();
934 bus.set_state_store(store.clone()).unwrap();
935
936 let filter = SubscriptionFilter {
937 subscriber_id: "analyst".to_string(),
938 subjects: vec!["events.market.>".to_string()],
939 durable: false,
940 options: None,
941 };
942 bus.update_subscription(filter).await.unwrap();
943 bus.remove_subscription("analyst").await.unwrap();
944
945 let loaded = store.load().unwrap();
946 assert!(loaded.is_empty());
947 }
948
949 #[tokio::test]
950 async fn test_state_store_restores_on_set() {
951 let store = Arc::new(crate::state::MemoryStateStore::default());
952
953 let mut initial = std::collections::HashMap::new();
955 initial.insert(
956 "monitor".to_string(),
957 SubscriptionFilter {
958 subscriber_id: "monitor".to_string(),
959 subjects: vec!["events.system.>".to_string()],
960 durable: true,
961 options: None,
962 },
963 );
964 store.save(&initial).unwrap();
965
966 let mut bus = test_bus();
968 bus.set_state_store(store).unwrap();
969
970 let sub = bus.get_subscription("monitor").await;
971 assert!(sub.is_some());
972 assert_eq!(sub.unwrap().subjects, vec!["events.system.>"]);
973 }
974
975 #[tokio::test]
976 async fn test_state_store_accessor() {
977 let mut bus = test_bus();
978 assert!(bus.state_store().is_none());
979
980 let store = Arc::new(crate::state::MemoryStateStore::default());
981 bus.set_state_store(store).unwrap();
982 assert!(bus.state_store().is_some());
983 }
984
985 #[tokio::test]
986 async fn test_file_state_store_lifecycle() {
987 let dir = std::env::temp_dir().join(format!("a3s-event-bus-{}", uuid::Uuid::new_v4()));
988 let path = dir.join("bus-state.json");
989 let store = Arc::new(crate::state::FileStateStore::new(&path));
990
991 {
993 let mut bus = test_bus();
994 bus.set_state_store(store.clone()).unwrap();
995
996 bus.update_subscription(SubscriptionFilter {
997 subscriber_id: "a".to_string(),
998 subjects: vec!["events.market.>".to_string()],
999 durable: true,
1000 options: None,
1001 })
1002 .await
1003 .unwrap();
1004
1005 bus.update_subscription(SubscriptionFilter {
1006 subscriber_id: "b".to_string(),
1007 subjects: vec!["events.system.>".to_string()],
1008 durable: false,
1009 options: None,
1010 })
1011 .await
1012 .unwrap();
1013 }
1014
1015 {
1017 let mut bus = test_bus();
1018 bus.set_state_store(store).unwrap();
1019
1020 assert_eq!(bus.list_subscriptions().await.len(), 2);
1021 assert!(bus.get_subscription("a").await.is_some());
1022 assert!(bus.get_subscription("b").await.is_some());
1023 }
1024
1025 std::fs::remove_dir_all(&dir).unwrap();
1026 }
1027
1028 #[tokio::test]
1029 async fn test_metrics_publish_count() {
1030 let bus = test_bus();
1031 bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
1032 bus.publish("test", "b", "B", "test", serde_json::json!({})).await.unwrap();
1033
1034 let s = bus.metrics().snapshot();
1035 assert_eq!(s.publish_count, 2);
1036 assert_eq!(s.publish_errors, 0);
1037 assert!(s.avg_publish_latency_us < 1_000_000); }
1039
1040 #[tokio::test]
1041 async fn test_metrics_subscribe_unsubscribe() {
1042 let bus = test_bus();
1043 let filter = SubscriptionFilter {
1044 subscriber_id: "m".to_string(),
1045 subjects: vec!["events.>".to_string()],
1046 durable: false,
1047 options: None,
1048 };
1049 bus.update_subscription(filter).await.unwrap();
1050 bus.remove_subscription("m").await.unwrap();
1051
1052 let s = bus.metrics().snapshot();
1053 assert_eq!(s.subscribe_count, 1);
1054 assert_eq!(s.unsubscribe_count, 1);
1055 }
1056
1057 #[tokio::test]
1058 async fn test_metrics_validation_error() {
1059 let registry = Arc::new(MemorySchemaRegistry::new());
1060 registry
1061 .register(EventSchema {
1062 event_type: "strict.type".to_string(),
1063 version: 1,
1064 required_fields: vec!["required_field".to_string()],
1065 description: String::new(),
1066 })
1067 .unwrap();
1068
1069 let bus = EventBus::with_schema_registry(MemoryProvider::default(), registry);
1070
1071 let bad_event = Event::typed(
1072 "events.test.a", "test", "strict.type", 1,
1073 "Bad", "test", serde_json::json!({}),
1074 );
1075 assert!(bus.publish_event(&bad_event).await.is_err());
1076
1077 let s = bus.metrics().snapshot();
1078 assert_eq!(s.validation_errors, 1);
1079 assert_eq!(s.publish_count, 0);
1080 }
1081
1082 #[cfg(feature = "encryption")]
1083 #[tokio::test]
1084 async fn test_metrics_encrypt_decrypt() {
1085 let enc = Arc::new(crate::crypto::Aes256GcmEncryptor::new("k1", &[0x42; 32]));
1086 let mut bus = test_bus();
1087 bus.set_encryptor(enc);
1088
1089 bus.publish("test", "a", "A", "test", serde_json::json!({"data": 1})).await.unwrap();
1090 bus.list_events(None, 10).await.unwrap();
1091
1092 let s = bus.metrics().snapshot();
1093 assert_eq!(s.encrypt_count, 1);
1094 assert_eq!(s.decrypt_count, 1);
1095 }
1096
1097 #[tokio::test]
1098 async fn test_metrics_snapshot_serializable() {
1099 let bus = test_bus();
1100 bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
1101
1102 let s = bus.metrics().snapshot();
1103 let json = serde_json::to_string(&s).unwrap();
1104 assert!(json.contains("publishCount"));
1105 }
1106
1107 #[tokio::test]
1108 async fn test_metrics_reset() {
1109 let bus = test_bus();
1110 bus.publish("test", "a", "A", "test", serde_json::json!({})).await.unwrap();
1111 assert_eq!(bus.metrics().snapshot().publish_count, 1);
1112
1113 bus.metrics().reset();
1114 assert_eq!(bus.metrics().snapshot().publish_count, 0);
1115 }
1116}