Skip to main content

eventbus_core/eventbus/
mod.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::Duration;
6
7use crate::contract::delivery::DeliveryInspector;
8use crate::contract::{
9    AckMode, BackpressurePolicy, ConsumerBalanceMode, DeliveryGuarantee, OrderingMode,
10    OverflowStrategy, PublishConfirmation,
11};
12use crate::error::EventBusError;
13
14// ---------------------------------------------------------------------------
15// Core types
16// ---------------------------------------------------------------------------
17
/// String-keyed, string-valued header map; the type of [`Message::headers`].
pub type Headers = HashMap<String, String>;
19
20// ---------------------------------------------------------------------------
21// Newtypes
22// ---------------------------------------------------------------------------
23
24/// Stream / queue identifier. Validated via [`Topic::new`] at construction:
25/// non-empty, ≤ [`Topic::MAX_LEN`] bytes, no ASCII control characters.
26#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
27#[serde(transparent)]
28#[repr(transparent)]
29pub struct Topic(String);
30
31impl Topic {
32    /// Maximum allowed topic length (bytes).
33    pub const MAX_LEN: usize = 1024;
34
35    /// Construct a [`Topic`], validating it at the boundary.
36    pub fn new(s: impl Into<String>) -> Result<Self, EventBusError> {
37        let s = s.into();
38        if s.trim().is_empty() {
39            return Err(EventBusError::Validation("topic is required".into()));
40        }
41        if s.len() > Self::MAX_LEN {
42            return Err(EventBusError::Validation(format!(
43                "topic length {} exceeds limit of {}",
44                s.len(),
45                Self::MAX_LEN,
46            )));
47        }
48        if s.chars().any(|c| c.is_control()) {
49            return Err(EventBusError::Validation(
50                "topic must not contain control characters".into(),
51            ));
52        }
53        Ok(Self(s))
54    }
55
56    #[must_use]
57    pub fn as_str(&self) -> &str {
58        &self.0
59    }
60
61    #[must_use]
62    pub fn into_inner(self) -> String {
63        self.0
64    }
65}
66
67impl std::ops::Deref for Topic {
68    type Target = str;
69    fn deref(&self) -> &str {
70        &self.0
71    }
72}
73
74impl std::fmt::Display for Topic {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.write_str(&self.0)
77    }
78}
79
80impl AsRef<str> for Topic {
81    fn as_ref(&self) -> &str {
82        &self.0
83    }
84}
85
86impl TryFrom<&str> for Topic {
87    type Error = EventBusError;
88    fn try_from(s: &str) -> Result<Self, Self::Error> {
89        Topic::new(s)
90    }
91}
92
93impl TryFrom<String> for Topic {
94    type Error = EventBusError;
95    fn try_from(s: String) -> Result<Self, Self::Error> {
96        Topic::new(s)
97    }
98}
99
100/// Consumer group name. Validated: non-empty, ≤ [`ConsumerGroup::MAX_LEN`] bytes.
101#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
102#[serde(transparent)]
103#[repr(transparent)]
104pub struct ConsumerGroup(String);
105
106impl ConsumerGroup {
107    pub const MAX_LEN: usize = 256;
108
109    pub fn new(s: impl Into<String>) -> Result<Self, EventBusError> {
110        let s = s.into();
111        if s.trim().is_empty() {
112            return Err(EventBusError::Validation(
113                "consumer group is required".into(),
114            ));
115        }
116        if s.len() > Self::MAX_LEN {
117            return Err(EventBusError::Validation(format!(
118                "consumer group length {} exceeds limit of {}",
119                s.len(),
120                Self::MAX_LEN,
121            )));
122        }
123        Ok(Self(s))
124    }
125
126    #[must_use]
127    pub fn as_str(&self) -> &str {
128        &self.0
129    }
130
131    #[must_use]
132    pub fn into_inner(self) -> String {
133        self.0
134    }
135}
136
137impl std::ops::Deref for ConsumerGroup {
138    type Target = str;
139    fn deref(&self) -> &str {
140        &self.0
141    }
142}
143
144impl std::fmt::Display for ConsumerGroup {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.write_str(&self.0)
147    }
148}
149
150impl AsRef<str> for ConsumerGroup {
151    fn as_ref(&self) -> &str {
152        &self.0
153    }
154}
155
156impl TryFrom<&str> for ConsumerGroup {
157    type Error = EventBusError;
158    fn try_from(s: &str) -> Result<Self, Self::Error> {
159        Self::new(s)
160    }
161}
162
163impl TryFrom<String> for ConsumerGroup {
164    type Error = EventBusError;
165    fn try_from(s: String) -> Result<Self, Self::Error> {
166        Self::new(s)
167    }
168}
169
170/// Per-process consumer name. Validated: non-empty, ≤ [`ConsumerName::MAX_LEN`].
171/// Use [`ConsumerName::auto`] to obtain a unique auto-generated name.
172#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
173#[serde(transparent)]
174#[repr(transparent)]
175pub struct ConsumerName(String);
176
177impl ConsumerName {
178    pub const MAX_LEN: usize = 256;
179
180    pub fn new(s: impl Into<String>) -> Result<Self, EventBusError> {
181        let s = s.into();
182        if s.trim().is_empty() {
183            return Err(EventBusError::Validation(
184                "consumer name is required".into(),
185            ));
186        }
187        if s.len() > Self::MAX_LEN {
188            return Err(EventBusError::Validation(format!(
189                "consumer name length {} exceeds limit of {}",
190                s.len(),
191                Self::MAX_LEN,
192            )));
193        }
194        Ok(Self(s))
195    }
196
197    /// Auto-generate a unique consumer name of the form
198    /// `consumer-<nanos>-<entropy>`. Bypasses validation since the format is
199    /// known-good.
200    #[must_use]
201    pub fn auto() -> Self {
202        let nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default();
203        let entropy: u64 = rand::Rng::gen(&mut rand::thread_rng());
204        Self(format!("consumer-{nanos}-{entropy:016x}"))
205    }
206
207    #[must_use]
208    pub fn as_str(&self) -> &str {
209        &self.0
210    }
211
212    #[must_use]
213    pub fn into_inner(self) -> String {
214        self.0
215    }
216}
217
218impl std::ops::Deref for ConsumerName {
219    type Target = str;
220    fn deref(&self) -> &str {
221        &self.0
222    }
223}
224
225impl std::fmt::Display for ConsumerName {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        f.write_str(&self.0)
228    }
229}
230
231impl AsRef<str> for ConsumerName {
232    fn as_ref(&self) -> &str {
233        &self.0
234    }
235}
236
237impl TryFrom<&str> for ConsumerName {
238    type Error = EventBusError;
239    fn try_from(s: &str) -> Result<Self, Self::Error> {
240        Self::new(s)
241    }
242}
243
244impl TryFrom<String> for ConsumerName {
245    type Error = EventBusError;
246    fn try_from(s: String) -> Result<Self, Self::Error> {
247        Self::new(s)
248    }
249}
250
251// ---------------------------------------------------------------------------
252// Message
253// ---------------------------------------------------------------------------
254
/// Message envelope accepted by [`Publisher`] and encoded/decoded by
/// [`Codec`].
///
/// The six trailing `Option` fields are left out of the serialized form when
/// they are `None`, and default to `None` when missing on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    // NOTE(review): presumably a unique id for this message — confirm who
    // assigns it (caller vs. backend).
    pub uid: String,
    /// Topic the message is addressed to.
    pub topic: Topic,
    // NOTE(review): `key`, `kind` and `source` are free-form strings here;
    // their exact semantics are defined by callers/backends — confirm.
    pub key: String,
    pub kind: String,
    pub source: String,
    /// UTC timestamp carried with the message.
    pub occurred_at: DateTime<Utc>,
    /// Arbitrary string headers.
    pub headers: Headers,
    /// Raw payload bytes; (de)serialized through `crate::serde_bytes`.
    #[serde(with = "crate::serde_bytes")]
    pub payload: bytes::Bytes,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub content_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_version: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub idempotency_key: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expires_at: Option<DateTime<Utc>>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_uid: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_uid: Option<String>,
}
279
280// ---------------------------------------------------------------------------
281// Publish options (builder pattern)
282// ---------------------------------------------------------------------------
283
284#[derive(Debug, Clone, Default)]
285#[non_exhaustive]
286#[must_use = "PublishOptions is inert until passed to publish/publish_batch"]
287pub struct PublishOptions {
288    /// Blocks the calling task for the specified duration before publishing.
289    /// For high-throughput scenarios, consider handling delays externally.
290    pub delay: Option<Duration>,
291    pub ordered_key: Option<String>,
292    pub metadata: HashMap<String, String>,
293    pub require_ordered_key: bool,
294    pub guarantee: Option<DeliveryGuarantee>,
295    pub confirmation: Option<PublishConfirmation>,
296    pub idempotency_key: Option<String>,
297    pub backpressure: Option<BackpressurePolicy>,
298    pub topic_ttl: Option<Duration>,
299    pub expected_content_type: Option<String>,
300    pub expected_event_version: Option<String>,
301}
302
303impl PublishOptions {
304    pub fn new() -> Self {
305        Self::default()
306    }
307
308    pub fn with_delay(mut self, delay: Duration) -> Self {
309        self.delay = Some(delay);
310        self
311    }
312
313    pub fn with_ordered_key(mut self, key: impl Into<String>) -> Self {
314        self.ordered_key = Some(key.into());
315        self
316    }
317
318    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
319        self.metadata.insert(key.into(), value.into());
320        self
321    }
322
323    pub fn with_require_ordered_key(mut self) -> Self {
324        self.require_ordered_key = true;
325        self
326    }
327
328    pub fn with_guarantee(mut self, g: DeliveryGuarantee) -> Self {
329        self.guarantee = Some(g);
330        self
331    }
332
333    pub fn with_confirmation(mut self, c: PublishConfirmation) -> Self {
334        self.confirmation = Some(c);
335        self
336    }
337
338    pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
339        self.idempotency_key = Some(key.into());
340        self
341    }
342
343    pub fn with_backpressure(mut self, bp: BackpressurePolicy) -> Self {
344        self.backpressure = Some(bp);
345        self
346    }
347
348    pub fn with_topic_ttl(mut self, ttl: Duration) -> Self {
349        self.topic_ttl = Some(ttl);
350        self
351    }
352
353    pub fn with_expected_content_type(mut self, content_type: impl Into<String>) -> Self {
354        self.expected_content_type = Some(content_type.into());
355        self
356    }
357
358    pub fn with_expected_event_version(mut self, version: impl Into<String>) -> Self {
359        self.expected_event_version = Some(version.into());
360        self
361    }
362
363    pub fn validate(&self) -> Result<(), EventBusError> {
364        if self.require_ordered_key
365            && self
366                .ordered_key
367                .as_ref()
368                .is_none_or(|k| k.trim().is_empty())
369        {
370            return Err(EventBusError::Validation("ordered key is required".into()));
371        }
372
373        if self.guarantee == Some(DeliveryGuarantee::ExactlyOnce)
374            && self.confirmation != Some(PublishConfirmation::Persisted)
375        {
376            return Err(EventBusError::Validation(format!(
377                "exactly-once requires {:?} confirmation",
378                PublishConfirmation::Persisted,
379            )));
380        }
381
382        if let Some(ref bp) = self.backpressure {
383            bp.validate()?;
384        }
385
386        if let Some(ttl) = self.topic_ttl {
387            if ttl.is_zero() {
388                return Err(EventBusError::Validation("topic ttl must be > 0".into()));
389            }
390        }
391
392        Ok(())
393    }
394}
395
396// ---------------------------------------------------------------------------
397// Subscription config
398// ---------------------------------------------------------------------------
399
/// Subscription configuration.
///
/// ## In-flight sizing precedence
///
/// During [`SubscriptionConfig::apply_defaults`] the bus reconciles three
/// related inputs:
///
/// 1. If `backpressure` is set, its `max_in_flight` / `max_pending_acks` seed
///    the matching fields when those are still `0`. Setting both
///    `backpressure.max_in_flight` and `max_in_flight` to **different**
///    non-zero values is a configuration error surfaced by `validate`.
/// 2. `max_in_flight` falls back to `1` if still unset.
/// 3. `max_pending_acks` falls back to `2 * max_in_flight`.
/// 4. If `backpressure` was unset, one is synthesized from the resolved
///    `max_in_flight` / `max_pending_acks`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct SubscriptionConfig {
    // Topic and group are the two mandatory builder arguments.
    pub(crate) topic: Topic,
    pub(crate) consumer_group: ConsumerGroup,
    // The builder starts this at `ConsumerName::auto()`.
    pub(crate) consumer_name: ConsumerName,
    pub(crate) max_retry: usize,
    // Zero means "unset"; `apply_defaults` replaces it with 100 ms.
    pub(crate) retry_backoff: Duration,
    pub(crate) dead_letter_topic: Option<Topic>,
    pub(crate) ack_mode: AckMode,
    // The three `Option` modes below are filled in by `apply_defaults` and
    // required to be `Some` by `validate`.
    pub(crate) ordering_mode: Option<OrderingMode>,
    pub(crate) balance_mode: Option<ConsumerBalanceMode>,
    pub(crate) guarantee: Option<DeliveryGuarantee>,
    // `0` means "unset" for both sizing fields (see precedence rules above).
    pub(crate) max_in_flight: usize,
    pub(crate) max_pending_acks: usize,
    pub(crate) wildcard_topic: bool,
    pub(crate) backpressure: Option<BackpressurePolicy>,
}

impl SubscriptionConfig {
    /// Begin building a [`SubscriptionConfig`] for the given topic + group.
    pub fn builder(topic: Topic, consumer_group: ConsumerGroup) -> SubscriptionConfigBuilder {
        SubscriptionConfigBuilder::new(topic, consumer_group)
    }

    /// Topic supplied to the builder.
    #[must_use]
    pub fn topic(&self) -> &Topic {
        &self.topic
    }
    /// Consumer group supplied to the builder.
    #[must_use]
    pub fn consumer_group(&self) -> &ConsumerGroup {
        &self.consumer_group
    }
    /// Consumer name (auto-generated unless overridden on the builder).
    #[must_use]
    pub fn consumer_name(&self) -> &ConsumerName {
        &self.consumer_name
    }
    /// Acknowledgement mode.
    #[must_use]
    pub fn ack_mode(&self) -> AckMode {
        self.ack_mode
    }
    /// Ordering mode; `Some` once `apply_defaults` has run.
    #[must_use]
    pub fn ordering_mode(&self) -> Option<OrderingMode> {
        self.ordering_mode
    }
    /// Consumer balance mode; `Some` once `apply_defaults` has run.
    #[must_use]
    pub fn balance_mode(&self) -> Option<ConsumerBalanceMode> {
        self.balance_mode
    }
    /// Delivery guarantee; `Some` once `apply_defaults` has run.
    #[must_use]
    pub fn guarantee(&self) -> Option<DeliveryGuarantee> {
        self.guarantee
    }
    /// In-flight limit; non-zero once `apply_defaults` has run.
    #[must_use]
    pub fn max_in_flight(&self) -> usize {
        self.max_in_flight
    }
    /// Pending-ack limit; non-zero once `apply_defaults` has run.
    #[must_use]
    pub fn max_pending_acks(&self) -> usize {
        self.max_pending_acks
    }
    /// Configured retry count (builder default is `0`).
    #[must_use]
    pub fn max_retry(&self) -> usize {
        self.max_retry
    }
    /// Retry backoff; non-zero once `apply_defaults` has run.
    #[must_use]
    pub fn retry_backoff(&self) -> Duration {
        self.retry_backoff
    }
    /// Dead-letter topic, if one was configured.
    #[must_use]
    pub fn dead_letter_topic(&self) -> Option<&Topic> {
        self.dead_letter_topic.as_ref()
    }
    /// Backpressure policy; `Some` once `apply_defaults` has run.
    #[must_use]
    pub fn backpressure(&self) -> Option<&BackpressurePolicy> {
        self.backpressure.as_ref()
    }
    /// Whether the builder's `wildcard_topic()` flag was set.
    #[must_use]
    pub fn wildcard_topic(&self) -> bool {
        self.wildcard_topic
    }
}
497
498/// Builder for [`SubscriptionConfig`]. Construct via
499/// [`SubscriptionConfig::builder`].
500#[must_use = "build the config and pass it to subscribe()"]
501#[derive(Debug, Clone)]
502pub struct SubscriptionConfigBuilder {
503    cfg: SubscriptionConfig,
504}
505
506impl SubscriptionConfigBuilder {
507    fn new(topic: Topic, consumer_group: ConsumerGroup) -> Self {
508        Self {
509            cfg: SubscriptionConfig {
510                topic,
511                consumer_group,
512                consumer_name: ConsumerName::auto(),
513                max_retry: 0,
514                retry_backoff: Duration::ZERO,
515                dead_letter_topic: None,
516                ack_mode: AckMode::Manual,
517                ordering_mode: None,
518                balance_mode: None,
519                guarantee: None,
520                max_in_flight: 0,
521                max_pending_acks: 0,
522                wildcard_topic: false,
523                backpressure: None,
524            },
525        }
526    }
527
528    pub fn consumer_name(mut self, n: ConsumerName) -> Self {
529        self.cfg.consumer_name = n;
530        self
531    }
532    pub fn max_retry(mut self, n: usize) -> Self {
533        self.cfg.max_retry = n;
534        self
535    }
536    pub fn retry_backoff(mut self, d: Duration) -> Self {
537        self.cfg.retry_backoff = d;
538        self
539    }
540    pub fn dead_letter_topic(mut self, t: Topic) -> Self {
541        self.cfg.dead_letter_topic = Some(t);
542        self
543    }
544    pub fn ack_mode(mut self, m: AckMode) -> Self {
545        self.cfg.ack_mode = m;
546        self
547    }
548    pub fn ordering(mut self, o: OrderingMode) -> Self {
549        self.cfg.ordering_mode = Some(o);
550        self
551    }
552    pub fn balance(mut self, b: ConsumerBalanceMode) -> Self {
553        self.cfg.balance_mode = Some(b);
554        self
555    }
556    pub fn guarantee(mut self, g: DeliveryGuarantee) -> Self {
557        self.cfg.guarantee = Some(g);
558        self
559    }
560    pub fn max_in_flight(mut self, n: usize) -> Self {
561        self.cfg.max_in_flight = n;
562        self
563    }
564    pub fn max_pending_acks(mut self, n: usize) -> Self {
565        self.cfg.max_pending_acks = n;
566        self
567    }
568    pub fn backpressure(mut self, p: BackpressurePolicy) -> Self {
569        self.cfg.backpressure = Some(p);
570        self
571    }
572    pub fn wildcard_topic(mut self) -> Self {
573        self.cfg.wildcard_topic = true;
574        self
575    }
576
577    /// Apply defaults, validate, and return the finished config.
578    pub fn build(mut self) -> Result<SubscriptionConfig, EventBusError> {
579        self.cfg.normalize_and_validate()?;
580        Ok(self.cfg)
581    }
582}
583
584impl SubscriptionConfig {
585    /// Fill in zero-value fields with sensible defaults.
586    pub fn apply_defaults(&mut self) {
587        if self.ordering_mode.is_none() {
588            self.ordering_mode = Some(OrderingMode::None);
589        }
590
591        if self.balance_mode.is_none() {
592            self.balance_mode = Some(ConsumerBalanceMode::Competing);
593        }
594
595        if self.guarantee.is_none() {
596            self.guarantee = Some(DeliveryGuarantee::AtLeastOnce);
597        }
598
599        if let Some(ref bp) = self.backpressure {
600            if self.max_in_flight == 0 {
601                self.max_in_flight = bp.max_in_flight;
602            }
603            if self.max_pending_acks == 0 {
604                self.max_pending_acks = bp.max_pending_acks;
605            }
606        }
607
608        if self.max_in_flight == 0 {
609            self.max_in_flight = 1;
610        }
611        if self.max_pending_acks == 0 {
612            self.max_pending_acks = self.max_in_flight * 2;
613        }
614        if self.retry_backoff.is_zero() {
615            self.retry_backoff = Duration::from_millis(100);
616        }
617
618        if self.backpressure.is_none() {
619            self.backpressure = Some(BackpressurePolicy {
620                max_in_flight: self.max_in_flight,
621                max_pending_acks: self.max_pending_acks,
622                max_batch_size: self.max_in_flight,
623                overflow_strategy: OverflowStrategy::Reject,
624            });
625        }
626    }
627
628    /// Check config consistency without mutating fields.
629    /// Call `apply_defaults` first, or use `normalize_and_validate`.
630    pub fn validate(&self) -> Result<(), EventBusError> {
631        if self.ordering_mode.is_none() {
632            return Err(EventBusError::Validation(
633                "ordering mode is required".into(),
634            ));
635        }
636
637        if self.balance_mode.is_none() {
638            return Err(EventBusError::Validation("balance mode is required".into()));
639        }
640
641        if self.guarantee.is_none() {
642            return Err(EventBusError::Validation(
643                "delivery guarantee is required".into(),
644            ));
645        }
646
647        if let Some(ref bp) = self.backpressure {
648            bp.validate()?;
649            if self.max_in_flight > 0 && self.max_in_flight != bp.max_in_flight {
650                return Err(EventBusError::Validation(
651                    "max in flight conflicts with backpressure policy".into(),
652                ));
653            }
654            if self.max_pending_acks > 0 && self.max_pending_acks != bp.max_pending_acks {
655                return Err(EventBusError::Validation(
656                    "max pending acks conflicts with backpressure policy".into(),
657                ));
658            }
659        }
660
661        if self.max_pending_acks < self.max_in_flight {
662            return Err(EventBusError::Validation(
663                "max pending acks must be >= max in flight".into(),
664            ));
665        }
666
667        Ok(())
668    }
669
670    /// Apply defaults then validate. Recommended single-call entrypoint.
671    pub fn normalize_and_validate(&mut self) -> Result<(), EventBusError> {
672        self.apply_defaults();
673        self.validate()
674    }
675}
676
677// ---------------------------------------------------------------------------
678// Traits
679// ---------------------------------------------------------------------------
680
/// Read-only view of a message in flight.
///
/// Backends supply this to handlers as part of [`DeliveryHandle`]. The handler
/// can inspect the message and delivery state, then call methods on
/// [`DeliveryControl`] to finalize.
pub trait Delivery: DeliveryInspector + Send + Sync {
    /// Borrow the message being delivered.
    fn message(&self) -> &Message;
}
689
/// Finalize a delivery exactly once.
///
/// Each method consumes `Box<Self>`, so the compiler guarantees a delivery is
/// finalized at most once: a handler that calls `ack()` cannot call `nack()`
/// or `retry()` after — the box is already moved.
///
/// Dropping the box without calling any of these is also valid: the message
/// is left un-acked and will be reclaimed by the backend after the
/// configured idle timeout.
pub trait DeliveryControl: Send {
    // NOTE(review): the exact effect of each finalizer is backend-defined;
    // the summaries below follow the method names — confirm per backend.

    /// Acknowledge the delivery.
    fn ack(self: Box<Self>) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
    /// Negatively acknowledge the delivery, passing along `reason`.
    fn nack(
        self: Box<Self>,
        reason: crate::BoxedError,
    ) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
    /// Request a retry of the delivery, passing along `reason`.
    fn retry(
        self: Box<Self>,
        reason: crate::BoxedError,
    ) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
}
710
/// Composite trait — a handler-facing delivery. Anything that is both
/// [`Delivery`] (read) and [`DeliveryControl`] (finalize) qualifies.
pub trait DeliveryHandle: Delivery + DeliveryControl {}
// Blanket impl: never implemented by hand; `?Sized` also covers unsized
// implementors.
impl<T: Delivery + DeliveryControl + ?Sized> DeliveryHandle for T {}
715
/// Message handler passed to [`Subscriber::subscribe`].
///
/// `handle` takes ownership of the boxed delivery, so the handler can both
/// inspect it ([`Delivery`]) and finalize it ([`DeliveryControl`]).
pub trait Handler: Send + Sync {
    /// Process one delivery; the returned future borrows `self`.
    fn handle(
        &self,
        delivery: Box<dyn DeliveryHandle>,
    ) -> crate::BoxFuture<'_, Result<(), EventBusError>>;
}
722
/// Handle to an active subscription, as returned by
/// [`Subscriber::subscribe`].
pub trait Subscription: Send + Sync {
    // NOTE(review): presumably the consumer/subscription name — confirm what
    // backends return here.
    fn name(&self) -> &str;
    /// Close the subscription. Takes the `Arc` by value so the returned
    /// `'static` future can own it.
    fn close(self: Arc<Self>) -> crate::BoxFuture<'static, Result<(), EventBusError>>;
}
727
728/// Backend-assigned identifier for a published message. Unvalidated — backends
729/// produce these strings, the bus only forwards them.
730#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
731#[serde(transparent)]
732#[repr(transparent)]
733pub struct MessageId(String);
734
735impl MessageId {
736    #[must_use]
737    pub fn new(s: impl Into<String>) -> Self {
738        Self(s.into())
739    }
740
741    #[must_use]
742    pub fn as_str(&self) -> &str {
743        &self.0
744    }
745
746    #[must_use]
747    pub fn into_inner(self) -> String {
748        self.0
749    }
750}
751
752impl std::ops::Deref for MessageId {
753    type Target = str;
754    fn deref(&self) -> &str {
755        &self.0
756    }
757}
758
759impl std::fmt::Display for MessageId {
760    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
761        f.write_str(&self.0)
762    }
763}
764
765impl AsRef<str> for MessageId {
766    fn as_ref(&self) -> &str {
767        &self.0
768    }
769}
770
771impl From<&str> for MessageId {
772    fn from(s: &str) -> Self {
773        Self::new(s)
774    }
775}
776
777impl From<String> for MessageId {
778    fn from(s: String) -> Self {
779        Self::new(s)
780    }
781}
782
783/// Per-message result for `publish_batch`. PR 5 will populate `results`
784/// during a full re-implementation that no longer fails fast.
785#[derive(Debug)]
786#[non_exhaustive]
787pub struct BatchOutcome {
788    pub results: Vec<Result<MessageId, EventBusError>>,
789}
790
791impl BatchOutcome {
792    #[must_use]
793    pub fn all_ok(&self) -> bool {
794        self.results.iter().all(Result::is_ok)
795    }
796    #[must_use]
797    pub fn ok_count(&self) -> usize {
798        self.results.iter().filter(|r| r.is_ok()).count()
799    }
800    #[must_use]
801    pub fn err_count(&self) -> usize {
802        self.results.len() - self.ok_count()
803    }
804    pub fn errors(&self) -> impl Iterator<Item = &EventBusError> {
805        self.results.iter().filter_map(|r| r.as_ref().err())
806    }
807
808    /// Convert to `Ok(Vec<MessageId>)` when every slot succeeded, or
809    /// `Err(BatchError { outcome: self })` otherwise. The error variant
810    /// preserves the full outcome so callers can still inspect partial
811    /// successes.
812    pub fn into_result(self) -> Result<Vec<MessageId>, BatchError> {
813        if self.all_ok() {
814            Ok(self.results.into_iter().filter_map(Result::ok).collect())
815        } else {
816            Err(BatchError { outcome: self })
817        }
818    }
819}
820
821/// Wraps a partially-failed [`BatchOutcome`] for callers that want a single
822/// `Result`. Carries the full outcome so partial successes remain inspectable.
823#[derive(Debug)]
824pub struct BatchError {
825    pub outcome: BatchOutcome,
826}
827
828impl std::fmt::Display for BatchError {
829    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
830        write!(
831            f,
832            "publish_batch had {} failure(s) out of {} messages",
833            self.outcome.err_count(),
834            self.outcome.results.len()
835        )
836    }
837}
838
839impl std::error::Error for BatchError {}
840
/// Publishing half of the bus. Uses boxed futures and no generic methods, so
/// it can be used as `dyn Publisher`.
pub trait Publisher: Send + Sync {
    /// Publish a single message, resolving to the backend-assigned
    /// [`MessageId`].
    fn publish(
        &self,
        msg: Message,
        opts: PublishOptions,
    ) -> crate::BoxFuture<'_, Result<MessageId, EventBusError>>;

    /// Publish several messages with one shared set of options, resolving to
    /// a [`BatchOutcome`] with per-message results.
    fn publish_batch(
        &self,
        msgs: Vec<Message>,
        opts: PublishOptions,
    ) -> crate::BoxFuture<'_, Result<BatchOutcome, EventBusError>>;
}
854
855/// Convenience layer over [`Publisher`]. Generic methods that monomorphize
856/// for callers who don't need dynamic dispatch.
857pub trait PublisherExt: Publisher {
858    /// Publish an iterator of messages. Collects into `Vec` and delegates
859    /// to [`Publisher::publish_batch`]. The collect is mandatory because the
860    /// dyn-safe `publish_batch` cannot accept an opaque iterator.
861    fn publish_iter<I>(
862        &self,
863        msgs: I,
864        opts: PublishOptions,
865    ) -> crate::BoxFuture<'_, Result<BatchOutcome, EventBusError>>
866    where
867        I: IntoIterator<Item = Message> + Send + 'static,
868        I::IntoIter: Send,
869    {
870        let collected: Vec<Message> = msgs.into_iter().collect();
871        self.publish_batch(collected, opts)
872    }
873}
874
875impl<P: Publisher + ?Sized> PublisherExt for P {}
876
/// Subscribing half of the bus. Uses boxed futures and no generic methods, so
/// it can be used as `dyn Subscriber`.
pub trait Subscriber: Send + Sync {
    /// Register `handler` under the given configuration, resolving to a
    /// shared [`Subscription`] handle.
    fn subscribe(
        &self,
        cfg: SubscriptionConfig,
        handler: Arc<dyn Handler>,
    ) -> crate::BoxFuture<'_, Result<Arc<dyn Subscription>, EventBusError>>;
}
884
885/// Convenience layer over [`Subscriber`]. Generic methods that monomorphize
886/// for callers who don't need dynamic dispatch.
887pub trait SubscriberExt: Subscriber {
888    fn subscribe_with<H>(
889        &self,
890        cfg: SubscriptionConfig,
891        handler: H,
892    ) -> crate::BoxFuture<'_, Result<Arc<dyn Subscription>, EventBusError>>
893    where
894        H: Handler + 'static,
895    {
896        self.subscribe(cfg, Arc::new(handler))
897    }
898}
899
900impl<S: Subscriber + ?Sized> SubscriberExt for S {}
901
/// Marker trait for anything that is both a [`Publisher`] and a
/// [`Subscriber`].
pub trait Bus: Publisher + Subscriber + Send + Sync {}

// Blanket impl: every qualifying type is a `Bus` automatically.
impl<T> Bus for T where T: Publisher + Subscriber + Send + Sync {}
905
/// Pluggable wire-format encoder for [`Message`].
///
/// Object-safe: backends store an `Arc<dyn Codec>` and dispatch through it.
/// Implementations own the full envelope so swapping codecs is a binary
/// decision (e.g. JSON for cross-language compat vs. binary for throughput).
pub trait Codec: Send + Sync {
    // NOTE(review): presumably a short identifier for the wire format —
    // confirm how backends use it.
    fn name(&self) -> &str;
    /// Encode a message into its wire representation.
    fn encode(&self, msg: &Message) -> Result<Vec<u8>, EventBusError>;
    /// Decode a wire representation back into a [`Message`].
    fn decode(&self, bytes: &[u8]) -> Result<Message, EventBusError>;
}
916
917// ---------------------------------------------------------------------------
918// Tests
919// ---------------------------------------------------------------------------
920
#[cfg(test)]
mod tests {
    use super::*;

    /// Backpressure policy using the `Reject` overflow strategy.
    fn reject_policy(
        max_in_flight: usize,
        max_pending_acks: usize,
        max_batch_size: usize,
    ) -> BackpressurePolicy {
        BackpressurePolicy {
            max_in_flight,
            max_pending_acks,
            max_batch_size,
            overflow_strategy: OverflowStrategy::Reject,
        }
    }

    /// Valid topic shared by the subscription tests.
    fn sample_topic() -> Topic {
        Topic::new("evt.test").expect("topic")
    }

    /// Valid consumer group shared by the subscription tests.
    fn sample_group() -> ConsumerGroup {
        ConsumerGroup::new("cg.test").expect("group")
    }

    // -- PublishOptions ------------------------------------------------------

    #[test]
    fn publish_options_accepts_valid() {
        let verdict = PublishOptions::new()
            .with_ordered_key("user-1")
            .with_require_ordered_key()
            .with_guarantee(DeliveryGuarantee::AtLeastOnce)
            .with_confirmation(PublishConfirmation::Accepted)
            .with_backpressure(reject_policy(10, 20, 10))
            .validate();
        assert!(verdict.is_ok());
    }

    #[test]
    fn publish_options_rejects_missing_ordered_key() {
        let verdict = PublishOptions::new().with_require_ordered_key().validate();
        assert!(verdict.is_err());
    }

    #[test]
    fn publish_options_rejects_exactly_once_without_persisted() {
        let verdict = PublishOptions::new()
            .with_guarantee(DeliveryGuarantee::ExactlyOnce)
            .with_confirmation(PublishConfirmation::Accepted)
            .validate();
        assert!(verdict.is_err());
    }

    #[test]
    fn publish_options_rejects_exactly_once_without_confirmation() {
        let verdict = PublishOptions::new()
            .with_guarantee(DeliveryGuarantee::ExactlyOnce)
            .validate();
        assert!(verdict.is_err());
    }

    #[test]
    fn publish_options_rejects_zero_ttl() {
        let verdict = PublishOptions::new()
            .with_topic_ttl(Duration::ZERO)
            .validate();
        assert!(verdict.is_err());
    }

    // -- BatchOutcome --------------------------------------------------------

    #[test]
    fn batch_outcome_into_result_ok_when_all_succeed() {
        let results = vec![Ok(MessageId::new("a")), Ok(MessageId::new("b"))];
        let ids = BatchOutcome { results }
            .into_result()
            .expect("should be ok");
        assert_eq!(ids.len(), 2);
        assert_eq!(ids[0].as_str(), "a");
    }

    #[test]
    fn batch_outcome_into_result_preserves_outcome_on_failure() {
        let results = vec![
            Ok(MessageId::new("a")),
            Err(EventBusError::Validation("bad".into())),
        ];
        let failure = BatchOutcome { results }
            .into_result()
            .expect_err("should be err");
        let kept = &failure.outcome;
        assert_eq!(kept.results.len(), 2);
        assert_eq!(kept.ok_count(), 1);
        assert_eq!(kept.err_count(), 1);
    }

    // -- SubscriptionConfig --------------------------------------------------

    #[test]
    fn subscription_config_preserves_explicit_ack_mode() {
        let built = SubscriptionConfig::builder(sample_topic(), sample_group())
            .ack_mode(AckMode::AutoOnHandlerSuccess)
            .max_in_flight(8)
            .build()
            .expect("build");
        assert_eq!(built.ack_mode(), AckMode::AutoOnHandlerSuccess);
    }

    #[test]
    fn subscription_config_defaults_to_manual_ack() {
        let built = SubscriptionConfig::builder(sample_topic(), sample_group())
            .max_in_flight(8)
            .build()
            .expect("build");
        assert_eq!(built.ack_mode(), AckMode::Manual);
    }

    #[test]
    fn subscription_config_rejects_conflicting_backpressure() {
        // Explicit max_in_flight (8) disagrees with the policy's (16).
        let attempt = SubscriptionConfig::builder(sample_topic(), sample_group())
            .ack_mode(AckMode::Manual)
            .max_in_flight(8)
            .backpressure(reject_policy(16, 32, 8))
            .build();
        assert!(attempt.is_err());
    }
}
1032
#[cfg(test)]
mod topic_tests {
    use super::*;

    // -- Topic ---------------------------------------------------------------

    #[test]
    fn topic_rejects_empty() {
        let outcome = Topic::new("");
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_rejects_only_whitespace() {
        let outcome = Topic::new("   ");
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_rejects_oversize() {
        // One byte over the limit.
        let too_long = "a".repeat(Topic::MAX_LEN + 1);
        let outcome = Topic::new(too_long);
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_rejects_control_chars() {
        // \x07 is the ASCII BEL control character.
        let outcome = Topic::new("foo\x07bar");
        assert!(outcome.is_err());
    }

    #[test]
    fn topic_accepts_normal() {
        let outcome = Topic::new("orders.created");
        assert!(outcome.is_ok());
    }

    // -- ConsumerGroup -------------------------------------------------------

    #[test]
    fn consumer_group_rejects_empty() {
        let outcome = ConsumerGroup::new("");
        assert!(outcome.is_err());
    }

    #[test]
    fn consumer_group_accepts_normal() {
        let outcome = ConsumerGroup::new("cg.x");
        assert!(outcome.is_ok());
    }

    // -- ConsumerName --------------------------------------------------------

    #[test]
    fn consumer_name_auto_is_unique() {
        let (first, second) = (ConsumerName::auto(), ConsumerName::auto());
        assert_ne!(first.as_str(), second.as_str());
        assert!(first.as_str().starts_with("consumer-"));
    }

    // -- MessageId -----------------------------------------------------------

    #[test]
    fn message_id_is_unvalidated() {
        // Even the empty string is accepted.
        let empty = MessageId::new("");
        assert_eq!(empty.as_str(), "");
    }
}
1081}