Skip to main content

qubit_event_bus/core/
event_bus.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Event bus abstraction shared by concrete backends.
11// qubit-style: allow multiple-public-types
12
13use crate::{
14    DeadLetterPayload,
15    EventBusError,
16    EventBusResult,
17    EventEnvelope,
18    IntoEventBusResult,
19    PublishOptions,
20    SubscribeOptions,
21    Subscription,
22    Topic,
23};
24use std::time::Duration;
25
26/// Failure captured while best-effort batch publishing continues.
27#[derive(Debug, Clone, Eq, PartialEq)]
28pub struct BatchPublishFailure {
29    index: usize,
30    event_id: String,
31    error: EventBusError,
32}
33
34impl BatchPublishFailure {
35    /// Creates a batch publish failure record.
36    pub(crate) fn new(index: usize, event_id: String, error: EventBusError) -> Self {
37        Self {
38            index,
39            event_id,
40            error,
41        }
42    }
43
44    /// Returns the input index of the failed envelope.
45    ///
46    /// # Returns
47    /// Zero-based index in the input batch.
48    pub fn index(&self) -> usize {
49        self.index
50    }
51
52    /// Returns the failed event ID.
53    ///
54    /// # Returns
55    /// Stable event identifier captured before publishing.
56    pub fn event_id(&self) -> &str {
57        &self.event_id
58    }
59
60    /// Returns the final publish error.
61    ///
62    /// # Returns
63    /// Error returned for this envelope after publish retries.
64    pub fn error(&self) -> &EventBusError {
65        &self.error
66    }
67}
68
69/// Result summary returned by best-effort batch publishing.
70#[derive(Debug, Clone, Eq, PartialEq)]
71pub struct BatchPublishResult {
72    total_count: usize,
73    accepted_count: usize,
74    dropped_count: usize,
75    failures: Vec<BatchPublishFailure>,
76}
77
78impl BatchPublishResult {
79    /// Creates an empty batch publish result.
80    pub(crate) fn new(total_count: usize) -> Self {
81        Self {
82            total_count,
83            accepted_count: 0,
84            dropped_count: 0,
85            failures: Vec::new(),
86        }
87    }
88
89    /// Records one accepted envelope submission.
90    pub(crate) fn record_accepted(&mut self) {
91        self.accepted_count += 1;
92    }
93
94    /// Records one envelope dropped by publisher interceptors.
95    pub(crate) fn record_dropped(&mut self) {
96        self.dropped_count += 1;
97    }
98
99    /// Records one failed envelope submission.
100    pub(crate) fn record_failure(&mut self, failure: BatchPublishFailure) {
101        self.failures.push(failure);
102    }
103
104    /// Returns the total number of envelopes in the batch.
105    ///
106    /// # Returns
107    /// Input envelope count.
108    pub fn total_count(&self) -> usize {
109        self.total_count
110    }
111
112    /// Returns the number of envelopes accepted by the backend.
113    ///
114    /// # Returns
115    /// Accepted submission count.
116    pub fn accepted_count(&self) -> usize {
117        self.accepted_count
118    }
119
120    /// Returns the number of envelopes dropped before dispatch.
121    ///
122    /// # Returns
123    /// Drop count reported by publisher interceptors.
124    pub fn dropped_count(&self) -> usize {
125        self.dropped_count
126    }
127
128    /// Returns the number of failed envelope submissions.
129    ///
130    /// # Returns
131    /// Failure count.
132    pub fn failure_count(&self) -> usize {
133        self.failures.len()
134    }
135
136    /// Returns captured per-envelope failures.
137    ///
138    /// # Returns
139    /// Failures in input order.
140    pub fn failures(&self) -> &[BatchPublishFailure] {
141        &self.failures
142    }
143
144    /// Returns whether the batch completed without per-envelope failures.
145    ///
146    /// # Returns
147    /// `true` when every envelope was accepted or intentionally dropped.
148    pub fn is_success(&self) -> bool {
149        self.failures.is_empty()
150    }
151}
152
153/// Common event bus contract implemented by concrete backends.
154///
155/// The trait mirrors the Java `EventBus` interface with Rust ownership and
156/// error handling. Methods are generic over the payload type, so the trait is
157/// intended for static dispatch rather than `dyn EventBus` trait objects.
158///
159/// Unless a backend documents stronger transactional semantics, publish
160/// operations are non-transactional across matching subscribers. A dispatch
161/// error can be returned after earlier subscriber work has already been
162/// accepted.
163pub trait EventBus: Clone + Send + Sync + 'static {
164    /// Starts the event bus.
165    ///
166    /// # Returns
167    /// `Ok(true)` when this call changed the bus from stopped to started.
168    ///
169    /// # Errors
170    /// Returns backend-specific startup errors when resources cannot be created.
171    fn start(&self) -> EventBusResult<bool>;
172
173    /// Shuts down the event bus.
174    ///
175    /// # Returns
176    /// `true` when this call changed the bus from started to stopped.
177    fn shutdown(&self) -> bool;
178
179    /// Publishes a payload to a topic.
180    ///
181    /// # Parameters
182    /// - `topic`: Target topic.
183    /// - `payload`: Event payload.
184    ///
185    /// # Returns
186    /// `Ok(())` after the backend accepts the event.
187    ///
188    /// # Errors
189    /// Returns backend-specific errors such as a stopped bus or dispatch failure.
190    fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
191    where
192        T: Clone + Send + Sync + 'static,
193    {
194        self.publish_envelope(EventEnvelope::create(topic.clone(), payload))
195    }
196
197    /// Publishes a payload to a topic with explicit publish options.
198    ///
199    /// # Parameters
200    /// - `topic`: Target topic.
201    /// - `payload`: Event payload.
202    /// - `options`: Publish options applied to this event.
203    ///
204    /// # Returns
205    /// `Ok(())` after the backend accepts the event.
206    ///
207    /// # Errors
208    /// Returns backend-specific publish errors.
209    fn publish_with_options<T>(
210        &self,
211        topic: &Topic<T>,
212        payload: T,
213        options: PublishOptions<T>,
214    ) -> EventBusResult<()>
215    where
216        T: Clone + Send + Sync + 'static,
217    {
218        self.publish_envelope_with_options(EventEnvelope::create(topic.clone(), payload), options)
219    }
220
221    /// Publishes an existing envelope with default publish options.
222    ///
223    /// # Parameters
224    /// - `envelope`: Event envelope to publish.
225    ///
226    /// # Returns
227    /// `Ok(())` after the backend accepts the event.
228    ///
229    /// # Errors
230    /// Returns backend-specific publishing errors.
231    fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
232    where
233        T: Clone + Send + Sync + 'static,
234    {
235        self.publish_envelope_with_options(envelope, PublishOptions::empty())
236    }
237
238    /// Publishes an existing envelope with explicit publish options.
239    ///
240    /// # Parameters
241    /// - `envelope`: Event envelope to publish.
242    /// - `options`: Publish options applied to this event.
243    ///
244    /// # Returns
245    /// `Ok(())` after the backend accepts the event.
246    ///
247    /// # Errors
248    /// Returns backend-specific publishing errors.
249    fn publish_envelope_with_options<T>(
250        &self,
251        envelope: EventEnvelope<T>,
252        options: PublishOptions<T>,
253    ) -> EventBusResult<()>
254    where
255        T: Clone + Send + Sync + 'static;
256
257    /// Publishes a batch of envelopes with default publish options.
258    ///
259    /// The default implementation submits envelopes in input order. Concrete
260    /// backends may still execute handlers concurrently unless they document a
261    /// stronger ordering guarantee.
262    ///
263    /// # Parameters
264    /// - `envelopes`: Envelopes to submit in order.
265    ///
266    /// # Returns
267    /// Summary containing per-envelope successes and failures.
268    ///
269    /// # Errors
270    /// Returns backend-level batch precondition errors.
271    fn publish_all<T>(&self, envelopes: Vec<EventEnvelope<T>>) -> EventBusResult<BatchPublishResult>
272    where
273        T: Clone + Send + Sync + 'static,
274    {
275        self.publish_all_with_options(envelopes, PublishOptions::empty())
276    }
277
278    /// Publishes a batch of envelopes with explicit publish options.
279    ///
280    /// The default implementation submits envelopes in input order. Concrete
281    /// backends may still execute handlers concurrently unless they document a
282    /// stronger ordering guarantee.
283    ///
284    /// # Parameters
285    /// - `envelopes`: Envelopes to submit in order.
286    /// - `options`: Publish options cloned for each envelope.
287    ///
288    /// # Returns
289    /// Summary containing per-envelope successes and failures.
290    ///
291    /// # Errors
292    /// Returns backend-level batch precondition errors. Per-envelope publish
293    /// failures are captured in [`BatchPublishResult`].
294    fn publish_all_with_options<T>(
295        &self,
296        envelopes: Vec<EventEnvelope<T>>,
297        options: PublishOptions<T>,
298    ) -> EventBusResult<BatchPublishResult>
299    where
300        T: Clone + Send + Sync + 'static,
301    {
302        let mut result = BatchPublishResult::new(envelopes.len());
303        for (index, envelope) in envelopes.into_iter().enumerate() {
304            let event_id = envelope.id().to_string();
305            match self.publish_envelope_with_options(envelope, options.clone()) {
306                Ok(()) => result.record_accepted(),
307                Err(error) => {
308                    result.record_failure(BatchPublishFailure::new(index, event_id, error));
309                }
310            }
311        }
312        Ok(result)
313    }
314
315    /// Subscribes a handler using backend default options.
316    ///
317    /// # Parameters
318    /// - `subscriber_id`: Subscriber identifier.
319    /// - `topic`: Topic to subscribe.
320    /// - `handler`: Handler invoked for matching events.
321    ///
322    /// # Returns
323    /// Subscription handle.
324    ///
325    /// # Errors
326    /// Returns backend-specific subscription errors.
327    fn subscribe<T, S, F, R>(
328        &self,
329        subscriber_id: S,
330        topic: &Topic<T>,
331        handler: F,
332    ) -> EventBusResult<Subscription<T>>
333    where
334        T: Clone + Send + Sync + 'static,
335        S: Into<String>,
336        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
337        R: IntoEventBusResult + 'static,
338    {
339        self.subscribe_with_options(subscriber_id, topic, handler, SubscribeOptions::empty())
340    }
341
342    /// Subscribes a handler using explicit options.
343    ///
344    /// # Parameters
345    /// - `subscriber_id`: Subscriber identifier.
346    /// - `topic`: Topic to subscribe.
347    /// - `handler`: Handler invoked for matching events.
348    /// - `options`: Subscription options.
349    ///
350    /// # Returns
351    /// Subscription handle.
352    ///
353    /// # Errors
354    /// Returns backend-specific subscription errors.
355    fn subscribe_with_options<T, S, F, R>(
356        &self,
357        subscriber_id: S,
358        topic: &Topic<T>,
359        handler: F,
360        options: SubscribeOptions<T>,
361    ) -> EventBusResult<Subscription<T>>
362    where
363        T: Clone + Send + Sync + 'static,
364        S: Into<String>,
365        F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
366        R: IntoEventBusResult + 'static;
367
368    /// Registers a handler for standard dead-letter payloads.
369    ///
370    /// The default implementation adapts the handler into a normal subscription
371    /// with a deterministic system subscriber ID derived from the topic name.
372    ///
373    /// # Parameters
374    /// - `dead_letter_topic`: Topic carrying [`DeadLetterPayload`] events.
375    /// - `handler`: Handler invoked for dead-letter events.
376    /// - `options`: Subscription options for dead-letter consumption.
377    ///
378    /// # Returns
379    /// Subscription handle for the dead-letter handler.
380    ///
381    /// # Errors
382    /// Returns backend-specific subscription errors.
383    fn add_dead_letter_handler<F, R>(
384        &self,
385        dead_letter_topic: &Topic<DeadLetterPayload>,
386        handler: F,
387        options: SubscribeOptions<DeadLetterPayload>,
388    ) -> EventBusResult<Subscription<DeadLetterPayload>>
389    where
390        F: Fn(EventEnvelope<DeadLetterPayload>) -> R + Send + Sync + 'static,
391        R: IntoEventBusResult + 'static,
392    {
393        self.subscribe_with_options(
394            format!("dead-letter:{}", dead_letter_topic.name()),
395            dead_letter_topic,
396            handler,
397            options,
398        )
399    }
400
401    /// Waits until all work for a topic is idle.
402    ///
403    /// # Parameters
404    /// - `topic`: Topic to wait for.
405    ///
406    /// # Returns
407    /// `Ok(())` once the topic has no active handler work.
408    ///
409    /// # Errors
410    /// Returns backend-specific wait errors.
411    fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
412    where
413        T: 'static;
414
415    /// Waits until all work for a topic is idle or the timeout elapses.
416    ///
417    /// # Parameters
418    /// - `topic`: Topic to wait for.
419    /// - `timeout`: Maximum duration to wait.
420    ///
421    /// # Returns
422    /// `Ok(true)` once the topic has no active handler work, or `Ok(false)` when
423    /// the timeout elapses first.
424    ///
425    /// # Errors
426    /// Returns backend-specific wait errors.
427    fn wait_for_idle_timeout<T>(&self, topic: &Topic<T>, timeout: Duration) -> EventBusResult<bool>
428    where
429        T: 'static;
430}