1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::contract::delivery::DeliveryInspector;
8use crate::contract::{
9 AckMode, BackpressurePolicy, ConsumerBalanceMode, DeliveryGuarantee, OrderingMode,
10 OverflowStrategy, PublishConfirmation,
11};
12use crate::error::EventBusError;
13
/// String-keyed, string-valued header map; this is the type of [`Message::headers`].
pub type Headers = HashMap<String, String>;
19
/// Topic name, stored as a newtype over `String`.
///
/// [`Topic::new`] (also reachable through the `TryFrom<&str>` and
/// `TryFrom<String>` impls) rejects blank, over-long, and
/// control-character-bearing names.
///
/// NOTE(review): the derived `Deserialize` combined with `#[serde(transparent)]`
/// builds the wrapper directly from a string and does not appear to go through
/// `Topic::new`, so deserialized values look unvalidated — confirm this is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
#[repr(transparent)]
pub struct Topic(String);
30
31impl Topic {
32 pub const MAX_LEN: usize = 1024;
34
35 pub fn new(s: impl Into<String>) -> Result<Self, EventBusError> {
37 let s = s.into();
38 if s.trim().is_empty() {
39 return Err(EventBusError::Validation("topic is required".into()));
40 }
41 if s.len() > Self::MAX_LEN {
42 return Err(EventBusError::Validation(format!(
43 "topic length {} exceeds limit of {}",
44 s.len(),
45 Self::MAX_LEN,
46 )));
47 }
48 if s.chars().any(|c| c.is_control()) {
49 return Err(EventBusError::Validation(
50 "topic must not contain control characters".into(),
51 ));
52 }
53 Ok(Self(s))
54 }
55
56 #[must_use]
57 pub fn as_str(&self) -> &str {
58 &self.0
59 }
60
61 #[must_use]
62 pub fn into_inner(self) -> String {
63 self.0
64 }
65}
66
67impl std::ops::Deref for Topic {
68 type Target = str;
69 fn deref(&self) -> &str {
70 &self.0
71 }
72}
73
74impl std::fmt::Display for Topic {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.write_str(&self.0)
77 }
78}
79
80impl AsRef<str> for Topic {
81 fn as_ref(&self) -> &str {
82 &self.0
83 }
84}
85
86impl TryFrom<&str> for Topic {
87 type Error = EventBusError;
88 fn try_from(s: &str) -> Result<Self, Self::Error> {
89 Topic::new(s)
90 }
91}
92
93impl TryFrom<String> for Topic {
94 type Error = EventBusError;
95 fn try_from(s: String) -> Result<Self, Self::Error> {
96 Topic::new(s)
97 }
98}
99
/// Consumer-group name, stored as a newtype over `String`.
///
/// [`ConsumerGroup::new`] rejects blank and over-long names.
///
/// NOTE(review): the derived `Deserialize` combined with `#[serde(transparent)]`
/// does not appear to go through `ConsumerGroup::new`, so deserialized values
/// look unvalidated — confirm this is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
#[repr(transparent)]
pub struct ConsumerGroup(String);
105
106impl ConsumerGroup {
107 pub const MAX_LEN: usize = 256;
108
109 pub fn new(s: impl Into<String>) -> Result<Self, EventBusError> {
110 let s = s.into();
111 if s.trim().is_empty() {
112 return Err(EventBusError::Validation(
113 "consumer group is required".into(),
114 ));
115 }
116 if s.len() > Self::MAX_LEN {
117 return Err(EventBusError::Validation(format!(
118 "consumer group length {} exceeds limit of {}",
119 s.len(),
120 Self::MAX_LEN,
121 )));
122 }
123 Ok(Self(s))
124 }
125
126 #[must_use]
127 pub fn as_str(&self) -> &str {
128 &self.0
129 }
130
131 #[must_use]
132 pub fn into_inner(self) -> String {
133 self.0
134 }
135}
136
137impl std::ops::Deref for ConsumerGroup {
138 type Target = str;
139 fn deref(&self) -> &str {
140 &self.0
141 }
142}
143
144impl std::fmt::Display for ConsumerGroup {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.write_str(&self.0)
147 }
148}
149
150impl AsRef<str> for ConsumerGroup {
151 fn as_ref(&self) -> &str {
152 &self.0
153 }
154}
155
156impl TryFrom<&str> for ConsumerGroup {
157 type Error = EventBusError;
158 fn try_from(s: &str) -> Result<Self, Self::Error> {
159 Self::new(s)
160 }
161}
162
163impl TryFrom<String> for ConsumerGroup {
164 type Error = EventBusError;
165 fn try_from(s: String) -> Result<Self, Self::Error> {
166 Self::new(s)
167 }
168}
169
/// Consumer name, stored as a newtype over `String`.
///
/// Built either from caller input via [`ConsumerName::new`] (which rejects
/// blank and over-long names) or generated by [`ConsumerName::auto`].
///
/// NOTE(review): the derived `Deserialize` combined with `#[serde(transparent)]`
/// does not appear to go through `ConsumerName::new`, so deserialized values
/// look unvalidated — confirm this is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
#[repr(transparent)]
pub struct ConsumerName(String);
176
177impl ConsumerName {
178 pub const MAX_LEN: usize = 256;
179
180 pub fn new(s: impl Into<String>) -> Result<Self, EventBusError> {
181 let s = s.into();
182 if s.trim().is_empty() {
183 return Err(EventBusError::Validation(
184 "consumer name is required".into(),
185 ));
186 }
187 if s.len() > Self::MAX_LEN {
188 return Err(EventBusError::Validation(format!(
189 "consumer name length {} exceeds limit of {}",
190 s.len(),
191 Self::MAX_LEN,
192 )));
193 }
194 Ok(Self(s))
195 }
196
197 #[must_use]
201 pub fn auto() -> Self {
202 let nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default();
203 let entropy: u64 = rand::Rng::gen(&mut rand::thread_rng());
204 Self(format!("consumer-{nanos}-{entropy:016x}"))
205 }
206
207 #[must_use]
208 pub fn as_str(&self) -> &str {
209 &self.0
210 }
211
212 #[must_use]
213 pub fn into_inner(self) -> String {
214 self.0
215 }
216}
217
218impl std::ops::Deref for ConsumerName {
219 type Target = str;
220 fn deref(&self) -> &str {
221 &self.0
222 }
223}
224
225impl std::fmt::Display for ConsumerName {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 f.write_str(&self.0)
228 }
229}
230
231impl AsRef<str> for ConsumerName {
232 fn as_ref(&self) -> &str {
233 &self.0
234 }
235}
236
237impl TryFrom<&str> for ConsumerName {
238 type Error = EventBusError;
239 fn try_from(s: &str) -> Result<Self, Self::Error> {
240 Self::new(s)
241 }
242}
243
244impl TryFrom<String> for ConsumerName {
245 type Error = EventBusError;
246 fn try_from(s: String) -> Result<Self, Self::Error> {
247 Self::new(s)
248 }
249}
250
/// Serializable message envelope: identity and routing strings, a timestamp,
/// headers, a byte payload, and a set of optional metadata fields.
///
/// Every `Option` field below is omitted from serialized output when `None`
/// and defaults to `None` when absent on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Identifier string for this message.
    pub uid: String,
    /// Topic the message belongs to.
    pub topic: Topic,
    /// Message key (presumably used for partitioning/ordering — confirm with the backends).
    pub key: String,
    /// Kind/type label of the event.
    pub kind: String,
    /// Label of the producing source.
    pub source: String,
    /// UTC timestamp of when the event occurred.
    pub occurred_at: DateTime<Utc>,
    /// Free-form string headers.
    pub headers: Headers,
    /// Raw payload bytes; (de)serialized through `crate::serde_bytes`.
    #[serde(with = "crate::serde_bytes")]
    pub payload: bytes::Bytes,
    /// Optional content type of `payload`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_type: Option<String>,
    /// Optional event schema/version label.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_version: Option<String>,
    /// Optional idempotency key.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    /// Optional UTC expiry timestamp.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<DateTime<Utc>>,
    /// Optional trace identifier.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_uid: Option<String>,
    /// Optional correlation identifier.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_uid: Option<String>,
}
279
/// Per-publish options, assembled with the `with_*` builder methods.
///
/// `Default` leaves every option unset (`None`, `false`, empty map).
/// `PublishOptions::validate` checks the cross-field rules noted on the
/// fields below.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
#[must_use = "PublishOptions is inert until passed to publish/publish_batch"]
pub struct PublishOptions {
    /// Optional publish delay.
    pub delay: Option<Duration>,
    /// Optional ordering key; must be non-blank when `require_ordered_key` is set.
    pub ordered_key: Option<String>,
    /// Free-form string metadata.
    pub metadata: HashMap<String, String>,
    /// When `true`, validation fails unless `ordered_key` is present and non-blank.
    pub require_ordered_key: bool,
    /// Requested delivery guarantee; `ExactlyOnce` requires `Persisted` confirmation.
    pub guarantee: Option<DeliveryGuarantee>,
    /// Requested publish confirmation level.
    pub confirmation: Option<PublishConfirmation>,
    /// Optional idempotency key.
    pub idempotency_key: Option<String>,
    /// Optional backpressure policy; validated via its own `validate`.
    pub backpressure: Option<BackpressurePolicy>,
    /// Optional topic TTL; must be non-zero when set.
    pub topic_ttl: Option<Duration>,
    /// Optional expected content type.
    pub expected_content_type: Option<String>,
    /// Optional expected event version.
    pub expected_event_version: Option<String>,
}
302
303impl PublishOptions {
304 pub fn new() -> Self {
305 Self::default()
306 }
307
308 pub fn with_delay(mut self, delay: Duration) -> Self {
309 self.delay = Some(delay);
310 self
311 }
312
313 pub fn with_ordered_key(mut self, key: impl Into<String>) -> Self {
314 self.ordered_key = Some(key.into());
315 self
316 }
317
318 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
319 self.metadata.insert(key.into(), value.into());
320 self
321 }
322
323 pub fn with_require_ordered_key(mut self) -> Self {
324 self.require_ordered_key = true;
325 self
326 }
327
328 pub fn with_guarantee(mut self, g: DeliveryGuarantee) -> Self {
329 self.guarantee = Some(g);
330 self
331 }
332
333 pub fn with_confirmation(mut self, c: PublishConfirmation) -> Self {
334 self.confirmation = Some(c);
335 self
336 }
337
338 pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
339 self.idempotency_key = Some(key.into());
340 self
341 }
342
343 pub fn with_backpressure(mut self, bp: BackpressurePolicy) -> Self {
344 self.backpressure = Some(bp);
345 self
346 }
347
348 pub fn with_topic_ttl(mut self, ttl: Duration) -> Self {
349 self.topic_ttl = Some(ttl);
350 self
351 }
352
353 pub fn with_expected_content_type(mut self, content_type: impl Into<String>) -> Self {
354 self.expected_content_type = Some(content_type.into());
355 self
356 }
357
358 pub fn with_expected_event_version(mut self, version: impl Into<String>) -> Self {
359 self.expected_event_version = Some(version.into());
360 self
361 }
362
363 pub fn validate(&self) -> Result<(), EventBusError> {
364 if self.require_ordered_key
365 && self
366 .ordered_key
367 .as_ref()
368 .is_none_or(|k| k.trim().is_empty())
369 {
370 return Err(EventBusError::Validation("ordered key is required".into()));
371 }
372
373 if self.guarantee == Some(DeliveryGuarantee::ExactlyOnce)
374 && self.confirmation != Some(PublishConfirmation::Persisted)
375 {
376 return Err(EventBusError::Validation(format!(
377 "exactly-once requires {:?} confirmation",
378 PublishConfirmation::Persisted,
379 )));
380 }
381
382 if let Some(ref bp) = self.backpressure {
383 bp.validate()?;
384 }
385
386 if let Some(ttl) = self.topic_ttl {
387 if ttl.is_zero() {
388 return Err(EventBusError::Validation("topic ttl must be > 0".into()));
389 }
390 }
391
392 Ok(())
393 }
394}
395
/// Subscription settings, produced by `SubscriptionConfigBuilder` and read
/// through the accessor methods.
///
/// For `max_in_flight`, `max_pending_acks`, and `retry_backoff`, a zero value
/// means "unset": `apply_defaults` replaces it. Likewise `None` in
/// `ordering_mode`, `balance_mode`, `guarantee`, and `backpressure` is filled
/// in by `apply_defaults`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct SubscriptionConfig {
    pub(crate) topic: Topic,
    pub(crate) consumer_group: ConsumerGroup,
    pub(crate) consumer_name: ConsumerName,
    pub(crate) max_retry: usize,
    pub(crate) retry_backoff: Duration,
    pub(crate) dead_letter_topic: Option<Topic>,
    pub(crate) ack_mode: AckMode,
    pub(crate) ordering_mode: Option<OrderingMode>,
    pub(crate) balance_mode: Option<ConsumerBalanceMode>,
    pub(crate) guarantee: Option<DeliveryGuarantee>,
    pub(crate) max_in_flight: usize,
    pub(crate) max_pending_acks: usize,
    // Set by the builder's `wildcard_topic()`; how it is interpreted is up to the backend.
    pub(crate) wildcard_topic: bool,
    pub(crate) backpressure: Option<BackpressurePolicy>,
}
433
434impl SubscriptionConfig {
435 pub fn builder(topic: Topic, consumer_group: ConsumerGroup) -> SubscriptionConfigBuilder {
437 SubscriptionConfigBuilder::new(topic, consumer_group)
438 }
439
440 #[must_use]
441 pub fn topic(&self) -> &Topic {
442 &self.topic
443 }
444 #[must_use]
445 pub fn consumer_group(&self) -> &ConsumerGroup {
446 &self.consumer_group
447 }
448 #[must_use]
449 pub fn consumer_name(&self) -> &ConsumerName {
450 &self.consumer_name
451 }
452 #[must_use]
453 pub fn ack_mode(&self) -> AckMode {
454 self.ack_mode
455 }
456 #[must_use]
457 pub fn ordering_mode(&self) -> Option<OrderingMode> {
458 self.ordering_mode
459 }
460 #[must_use]
461 pub fn balance_mode(&self) -> Option<ConsumerBalanceMode> {
462 self.balance_mode
463 }
464 #[must_use]
465 pub fn guarantee(&self) -> Option<DeliveryGuarantee> {
466 self.guarantee
467 }
468 #[must_use]
469 pub fn max_in_flight(&self) -> usize {
470 self.max_in_flight
471 }
472 #[must_use]
473 pub fn max_pending_acks(&self) -> usize {
474 self.max_pending_acks
475 }
476 #[must_use]
477 pub fn max_retry(&self) -> usize {
478 self.max_retry
479 }
480 #[must_use]
481 pub fn retry_backoff(&self) -> Duration {
482 self.retry_backoff
483 }
484 #[must_use]
485 pub fn dead_letter_topic(&self) -> Option<&Topic> {
486 self.dead_letter_topic.as_ref()
487 }
488 #[must_use]
489 pub fn backpressure(&self) -> Option<&BackpressurePolicy> {
490 self.backpressure.as_ref()
491 }
492 #[must_use]
493 pub fn wildcard_topic(&self) -> bool {
494 self.wildcard_topic
495 }
496}
497
/// Builder for `SubscriptionConfig`; obtained from `SubscriptionConfig::builder`
/// and finished with `build()`, which applies defaults and validates.
#[must_use = "build the config and pass it to subscribe()"]
#[derive(Debug, Clone)]
pub struct SubscriptionConfigBuilder {
    // Config being assembled; unset numeric knobs stay at zero until `build()`.
    cfg: SubscriptionConfig,
}
505
506impl SubscriptionConfigBuilder {
507 fn new(topic: Topic, consumer_group: ConsumerGroup) -> Self {
508 Self {
509 cfg: SubscriptionConfig {
510 topic,
511 consumer_group,
512 consumer_name: ConsumerName::auto(),
513 max_retry: 0,
514 retry_backoff: Duration::ZERO,
515 dead_letter_topic: None,
516 ack_mode: AckMode::Manual,
517 ordering_mode: None,
518 balance_mode: None,
519 guarantee: None,
520 max_in_flight: 0,
521 max_pending_acks: 0,
522 wildcard_topic: false,
523 backpressure: None,
524 },
525 }
526 }
527
528 pub fn consumer_name(mut self, n: ConsumerName) -> Self {
529 self.cfg.consumer_name = n;
530 self
531 }
532 pub fn max_retry(mut self, n: usize) -> Self {
533 self.cfg.max_retry = n;
534 self
535 }
536 pub fn retry_backoff(mut self, d: Duration) -> Self {
537 self.cfg.retry_backoff = d;
538 self
539 }
540 pub fn dead_letter_topic(mut self, t: Topic) -> Self {
541 self.cfg.dead_letter_topic = Some(t);
542 self
543 }
544 pub fn ack_mode(mut self, m: AckMode) -> Self {
545 self.cfg.ack_mode = m;
546 self
547 }
548 pub fn ordering(mut self, o: OrderingMode) -> Self {
549 self.cfg.ordering_mode = Some(o);
550 self
551 }
552 pub fn balance(mut self, b: ConsumerBalanceMode) -> Self {
553 self.cfg.balance_mode = Some(b);
554 self
555 }
556 pub fn guarantee(mut self, g: DeliveryGuarantee) -> Self {
557 self.cfg.guarantee = Some(g);
558 self
559 }
560 pub fn max_in_flight(mut self, n: usize) -> Self {
561 self.cfg.max_in_flight = n;
562 self
563 }
564 pub fn max_pending_acks(mut self, n: usize) -> Self {
565 self.cfg.max_pending_acks = n;
566 self
567 }
568 pub fn backpressure(mut self, p: BackpressurePolicy) -> Self {
569 self.cfg.backpressure = Some(p);
570 self
571 }
572 pub fn wildcard_topic(mut self) -> Self {
573 self.cfg.wildcard_topic = true;
574 self
575 }
576
577 pub fn build(mut self) -> Result<SubscriptionConfig, EventBusError> {
579 self.cfg.normalize_and_validate()?;
580 Ok(self.cfg)
581 }
582}
583
584impl SubscriptionConfig {
585 pub fn apply_defaults(&mut self) {
587 if self.ordering_mode.is_none() {
588 self.ordering_mode = Some(OrderingMode::None);
589 }
590
591 if self.balance_mode.is_none() {
592 self.balance_mode = Some(ConsumerBalanceMode::Competing);
593 }
594
595 if self.guarantee.is_none() {
596 self.guarantee = Some(DeliveryGuarantee::AtLeastOnce);
597 }
598
599 if let Some(ref bp) = self.backpressure {
600 if self.max_in_flight == 0 {
601 self.max_in_flight = bp.max_in_flight;
602 }
603 if self.max_pending_acks == 0 {
604 self.max_pending_acks = bp.max_pending_acks;
605 }
606 }
607
608 if self.max_in_flight == 0 {
609 self.max_in_flight = 1;
610 }
611 if self.max_pending_acks == 0 {
612 self.max_pending_acks = self.max_in_flight * 2;
613 }
614 if self.retry_backoff.is_zero() {
615 self.retry_backoff = Duration::from_millis(100);
616 }
617
618 if self.backpressure.is_none() {
619 self.backpressure = Some(BackpressurePolicy {
620 max_in_flight: self.max_in_flight,
621 max_pending_acks: self.max_pending_acks,
622 max_batch_size: self.max_in_flight,
623 overflow_strategy: OverflowStrategy::Reject,
624 });
625 }
626 }
627
628 pub fn validate(&self) -> Result<(), EventBusError> {
631 if self.ordering_mode.is_none() {
632 return Err(EventBusError::Validation(
633 "ordering mode is required".into(),
634 ));
635 }
636
637 if self.balance_mode.is_none() {
638 return Err(EventBusError::Validation("balance mode is required".into()));
639 }
640
641 if self.guarantee.is_none() {
642 return Err(EventBusError::Validation(
643 "delivery guarantee is required".into(),
644 ));
645 }
646
647 if let Some(ref bp) = self.backpressure {
648 bp.validate()?;
649 if self.max_in_flight > 0 && self.max_in_flight != bp.max_in_flight {
650 return Err(EventBusError::Validation(
651 "max in flight conflicts with backpressure policy".into(),
652 ));
653 }
654 if self.max_pending_acks > 0 && self.max_pending_acks != bp.max_pending_acks {
655 return Err(EventBusError::Validation(
656 "max pending acks conflicts with backpressure policy".into(),
657 ));
658 }
659 }
660
661 if self.max_pending_acks < self.max_in_flight {
662 return Err(EventBusError::Validation(
663 "max pending acks must be >= max in flight".into(),
664 ));
665 }
666
667 Ok(())
668 }
669
670 pub fn normalize_and_validate(&mut self) -> Result<(), EventBusError> {
672 self.apply_defaults();
673 self.validate()
674 }
675}
676
/// Read-only view of a delivered message; extends `DeliveryInspector`.
pub trait Delivery: DeliveryInspector + Send + Sync {
    /// Borrows the delivered message.
    fn message(&self) -> &Message;
}
689
/// Settlement operations for a delivery.
///
/// Every method takes `self: Box<Self>`, so the handle is consumed by
/// whichever operation is called, and each returns a `'static` boxed future
/// resolving to `Result<(), EventBusError>`.
pub trait DeliveryControl: Send {
    /// Acknowledges the delivery.
    fn ack(self: Box<Self>) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
    /// Negatively acknowledges the delivery, passing along `reason`.
    fn nack(
        self: Box<Self>,
        reason: crate::BoxedError,
    ) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
    /// Requests a retry of the delivery, passing along `reason`.
    fn retry(
        self: Box<Self>,
        reason: crate::BoxedError,
    ) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
}
710
/// Combined view: a delivery that can be both inspected and settled.
pub trait DeliveryHandle: Delivery + DeliveryControl {}
// Blanket impl: anything that is `Delivery + DeliveryControl` (sized or not) is a `DeliveryHandle`.
impl<T: Delivery + DeliveryControl + ?Sized> DeliveryHandle for T {}
715
/// Message handler invoked with one boxed delivery at a time.
pub trait Handler: Send + Sync {
    /// Processes `delivery`; the returned future borrows `self`.
    fn handle(
        &self,
        delivery: Box<dyn DeliveryHandle>,
    ) -> crate::BoxFuture<'_, Result<(), EventBusError>>;
}
722
/// Handle to an active subscription, as returned by `Subscriber::subscribe`.
pub trait Subscription: Send + Sync {
    /// Name of the subscription.
    fn name(&self) -> &str;
    /// Closes the subscription; takes the `Arc` by value and returns a `'static` future.
    fn close(self: Arc<Self>) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
}
727
/// Message identifier, stored as a newtype over `String`.
///
/// Unlike the validated name newtypes in this file, construction is
/// infallible: `MessageId::new` accepts any string, including the empty one.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
#[repr(transparent)]
pub struct MessageId(String);
734
735impl MessageId {
736 #[must_use]
737 pub fn new(s: impl Into<String>) -> Self {
738 Self(s.into())
739 }
740
741 #[must_use]
742 pub fn as_str(&self) -> &str {
743 &self.0
744 }
745
746 #[must_use]
747 pub fn into_inner(self) -> String {
748 self.0
749 }
750}
751
752impl std::ops::Deref for MessageId {
753 type Target = str;
754 fn deref(&self) -> &str {
755 &self.0
756 }
757}
758
759impl std::fmt::Display for MessageId {
760 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761 f.write_str(&self.0)
762 }
763}
764
765impl AsRef<str> for MessageId {
766 fn as_ref(&self) -> &str {
767 &self.0
768 }
769}
770
771impl From<&str> for MessageId {
772 fn from(s: &str) -> Self {
773 Self::new(s)
774 }
775}
776
777impl From<String> for MessageId {
778 fn from(s: String) -> Self {
779 Self::new(s)
780 }
781}
782
/// Result of a batch publish: one `Result` entry per message outcome.
#[derive(Debug)]
#[non_exhaustive]
pub struct BatchOutcome {
    /// Per-message outcomes (presumably in the same order as the submitted batch — confirm with implementors).
    pub results: Vec<Result<MessageId, EventBusError>>,
}
790
791impl BatchOutcome {
792 #[must_use]
793 pub fn all_ok(&self) -> bool {
794 self.results.iter().all(Result::is_ok)
795 }
796 #[must_use]
797 pub fn ok_count(&self) -> usize {
798 self.results.iter().filter(|r| r.is_ok()).count()
799 }
800 #[must_use]
801 pub fn err_count(&self) -> usize {
802 self.results.len() - self.ok_count()
803 }
804 pub fn errors(&self) -> impl Iterator<Item = &EventBusError> {
805 self.results.iter().filter_map(|r| r.as_ref().err())
806 }
807
808 pub fn into_result(self) -> Result<Vec<MessageId>, BatchError> {
813 if self.all_ok() {
814 Ok(self.results.into_iter().filter_map(Result::ok).collect())
815 } else {
816 Err(BatchError { outcome: self })
817 }
818 }
819}
820
/// Error returned by `BatchOutcome::into_result` when at least one entry failed.
#[derive(Debug)]
pub struct BatchError {
    /// The complete outcome, successes included, so callers can inspect each entry.
    pub outcome: BatchOutcome,
}
827
828impl std::fmt::Display for BatchError {
829 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
830 write!(
831 f,
832 "publish_batch had {} failure(s) out of {} messages",
833 self.outcome.err_count(),
834 self.outcome.results.len()
835 )
836 }
837}
838
// Marker impl using the trait's default methods; the per-message errors stay in `outcome.results`.
impl std::error::Error for BatchError {}
840
/// Publishing side of the bus.
pub trait Publisher: Send + Sync {
    /// Publishes one message; the future resolves to its `MessageId` on success.
    fn publish(
        &self,
        msg: Message,
        opts: PublishOptions,
    ) -> crate::BoxFuture<'_, Result<MessageId, EventBusError>>;

    /// Publishes several messages with shared options.
    ///
    /// The outer `Result` reports a failure of the call as a whole; per-message
    /// results are carried inside the `BatchOutcome`.
    fn publish_batch(
        &self,
        msgs: Vec<Message>,
        opts: PublishOptions,
    ) -> crate::BoxFuture<'_, Result<BatchOutcome, EventBusError>>;
}
854
855pub trait PublisherExt: Publisher {
858 fn publish_iter<I>(
862 &self,
863 msgs: I,
864 opts: PublishOptions,
865 ) -> crate::BoxFuture<'_, Result<BatchOutcome, EventBusError>>
866 where
867 I: IntoIterator<Item = Message> + Send + 'static,
868 I::IntoIter: Send,
869 {
870 let collected: Vec<Message> = msgs.into_iter().collect();
871 self.publish_batch(collected, opts)
872 }
873}
874
875impl<P: Publisher + ?Sized> PublisherExt for P {}
876
/// Subscribing side of the bus.
pub trait Subscriber: Send + Sync {
    /// Registers `handler` under `cfg`; the future resolves to a handle for the new subscription.
    fn subscribe(
        &self,
        cfg: SubscriptionConfig,
        handler: Arc<dyn Handler>,
    ) -> crate::BoxFuture<'_, Result<Arc<dyn Subscription>, EventBusError>>;
}
884
885pub trait SubscriberExt: Subscriber {
888 fn subscribe_with<H>(
889 &self,
890 cfg: SubscriptionConfig,
891 handler: H,
892 ) -> crate::BoxFuture<'_, Result<Arc<dyn Subscription>, EventBusError>>
893 where
894 H: Handler + 'static,
895 {
896 self.subscribe(cfg, Arc::new(handler))
897 }
898}
899
900impl<S: Subscriber + ?Sized> SubscriberExt for S {}
901
/// Marker trait for anything that can both publish and subscribe.
pub trait Bus: Publisher + Subscriber + Send + Sync {}

// Blanket impl: any type meeting the bounds is a `Bus`; no manual impl is needed.
impl<T> Bus for T where T: Publisher + Subscriber + Send + Sync {}
905
/// Converts between `Message` values and byte buffers.
pub trait Codec: Send + Sync {
    /// Name of the codec.
    fn name(&self) -> &str;
    /// Encodes `msg` into a freshly allocated byte buffer.
    fn encode(&self, msg: &Message) -> Result<Vec<u8>, EventBusError>;
    /// Decodes a message from `bytes`.
    fn decode(&self, bytes: &[u8]) -> Result<Message, EventBusError>;
}
916
#[cfg(test)]
mod tests {
    use super::*;

    /// Reject-on-overflow backpressure policy with the given limits.
    fn reject_policy(
        max_in_flight: usize,
        max_pending_acks: usize,
        max_batch_size: usize,
    ) -> BackpressurePolicy {
        BackpressurePolicy {
            max_in_flight,
            max_pending_acks,
            max_batch_size,
            overflow_strategy: OverflowStrategy::Reject,
        }
    }

    fn topic() -> Topic {
        Topic::new("evt.test").expect("topic")
    }

    fn group() -> ConsumerGroup {
        ConsumerGroup::new("cg.test").expect("group")
    }

    // --- PublishOptions::validate ---

    #[test]
    fn publish_options_accepts_valid() {
        let opts = PublishOptions::new()
            .with_ordered_key("user-1")
            .with_require_ordered_key()
            .with_guarantee(DeliveryGuarantee::AtLeastOnce)
            .with_confirmation(PublishConfirmation::Accepted)
            .with_backpressure(reject_policy(10, 20, 10));
        let verdict = opts.validate();
        assert!(verdict.is_ok());
    }

    #[test]
    fn publish_options_rejects_missing_ordered_key() {
        let verdict = PublishOptions::new().with_require_ordered_key().validate();
        assert!(verdict.is_err());
    }

    #[test]
    fn publish_options_rejects_exactly_once_without_persisted() {
        let verdict = PublishOptions::new()
            .with_guarantee(DeliveryGuarantee::ExactlyOnce)
            .with_confirmation(PublishConfirmation::Accepted)
            .validate();
        assert!(verdict.is_err());
    }

    #[test]
    fn publish_options_rejects_exactly_once_without_confirmation() {
        let verdict = PublishOptions::new()
            .with_guarantee(DeliveryGuarantee::ExactlyOnce)
            .validate();
        assert!(verdict.is_err());
    }

    #[test]
    fn publish_options_rejects_zero_ttl() {
        let verdict = PublishOptions::new()
            .with_topic_ttl(Duration::ZERO)
            .validate();
        assert!(verdict.is_err());
    }

    // --- BatchOutcome::into_result ---

    #[test]
    fn batch_outcome_into_result_ok_when_all_succeed() {
        let results = vec![Ok(MessageId::new("a")), Ok(MessageId::new("b"))];
        let ids = BatchOutcome { results }
            .into_result()
            .expect("should be ok");
        assert_eq!(ids.len(), 2);
        assert_eq!(ids[0].as_str(), "a");
    }

    #[test]
    fn batch_outcome_into_result_preserves_outcome_on_failure() {
        let results = vec![
            Ok(MessageId::new("a")),
            Err(EventBusError::Validation("bad".into())),
        ];
        let err = BatchOutcome { results }
            .into_result()
            .expect_err("should be err");
        let preserved = &err.outcome;
        assert_eq!(preserved.results.len(), 2);
        assert_eq!(preserved.ok_count(), 1);
        assert_eq!(preserved.err_count(), 1);
    }

    // --- SubscriptionConfig builder ---

    #[test]
    fn subscription_config_preserves_explicit_ack_mode() {
        let built = SubscriptionConfig::builder(topic(), group())
            .ack_mode(AckMode::AutoOnHandlerSuccess)
            .max_in_flight(8)
            .build();
        let cfg = built.expect("build");
        assert_eq!(cfg.ack_mode(), AckMode::AutoOnHandlerSuccess);
    }

    #[test]
    fn subscription_config_defaults_to_manual_ack() {
        let built = SubscriptionConfig::builder(topic(), group())
            .max_in_flight(8)
            .build();
        let cfg = built.expect("build");
        assert_eq!(cfg.ack_mode(), AckMode::Manual);
    }

    #[test]
    fn subscription_config_rejects_conflicting_backpressure() {
        // The explicit max_in_flight (8) disagrees with the policy's (16).
        let built = SubscriptionConfig::builder(topic(), group())
            .ack_mode(AckMode::Manual)
            .max_in_flight(8)
            .backpressure(reject_policy(16, 32, 8))
            .build();
        assert!(built.is_err());
    }
}
1032
#[cfg(test)]
mod topic_tests {
    use super::*;

    // --- Topic ---

    #[test]
    fn topic_rejects_empty() {
        let outcome = Topic::new("");
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_rejects_only_whitespace() {
        let outcome = Topic::new(" ");
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_rejects_oversize() {
        let too_long = "a".repeat(Topic::MAX_LEN + 1);
        let outcome = Topic::new(too_long);
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_rejects_control_chars() {
        let outcome = Topic::new("foo\x07bar");
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_accepts_normal() {
        let outcome = Topic::new("orders.created");
        assert!(outcome.is_ok());
    }

    // --- ConsumerGroup ---

    #[test]
    fn consumer_group_rejects_empty() {
        let outcome = ConsumerGroup::new("");
        assert!(outcome.is_err());
    }

    #[test]
    fn consumer_group_accepts_normal() {
        let outcome = ConsumerGroup::new("cg.x");
        assert!(outcome.is_ok());
    }

    // --- ConsumerName ---

    #[test]
    fn consumer_name_auto_is_unique() {
        let first = ConsumerName::auto();
        let second = ConsumerName::auto();
        assert_ne!(first.as_str(), second.as_str());
        assert!(first.as_str().starts_with("consumer-"));
    }

    // --- MessageId ---

    #[test]
    fn message_id_is_unvalidated() {
        // The empty string is accepted because MessageId performs no validation.
        let id = MessageId::new("");
        assert_eq!(id.as_str(), "");
    }
}