qubit_event_bus/core/event_bus.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Event bus abstraction shared by concrete backends.
11// qubit-style: allow multiple-public-types
12
13use crate::{
14 DeadLetterPayload,
15 EventBusError,
16 EventBusResult,
17 EventEnvelope,
18 IntoEventBusResult,
19 PublishOptions,
20 SubscribeOptions,
21 Subscription,
22 Topic,
23};
24use std::time::Duration;
25
26/// Failure captured while best-effort batch publishing continues.
27#[derive(Debug, Clone, Eq, PartialEq)]
28pub struct BatchPublishFailure {
29 index: usize,
30 event_id: String,
31 error: EventBusError,
32}
33
34impl BatchPublishFailure {
35 /// Creates a batch publish failure record.
36 pub(crate) fn new(index: usize, event_id: String, error: EventBusError) -> Self {
37 Self {
38 index,
39 event_id,
40 error,
41 }
42 }
43
44 /// Returns the input index of the failed envelope.
45 ///
46 /// # Returns
47 /// Zero-based index in the input batch.
48 pub fn index(&self) -> usize {
49 self.index
50 }
51
52 /// Returns the failed event ID.
53 ///
54 /// # Returns
55 /// Stable event identifier captured before publishing.
56 pub fn event_id(&self) -> &str {
57 &self.event_id
58 }
59
60 /// Returns the final publish error.
61 ///
62 /// # Returns
63 /// Error returned for this envelope after publish retries.
64 pub fn error(&self) -> &EventBusError {
65 &self.error
66 }
67}
68
69/// Result summary returned by best-effort batch publishing.
70#[derive(Debug, Clone, Eq, PartialEq)]
71pub struct BatchPublishResult {
72 total_count: usize,
73 accepted_count: usize,
74 dropped_count: usize,
75 failures: Vec<BatchPublishFailure>,
76}
77
78impl BatchPublishResult {
79 /// Creates an empty batch publish result.
80 pub(crate) fn new(total_count: usize) -> Self {
81 Self {
82 total_count,
83 accepted_count: 0,
84 dropped_count: 0,
85 failures: Vec::new(),
86 }
87 }
88
89 /// Records one accepted envelope submission.
90 pub(crate) fn record_accepted(&mut self) {
91 self.accepted_count += 1;
92 }
93
94 /// Records one envelope dropped by publisher interceptors.
95 pub(crate) fn record_dropped(&mut self) {
96 self.dropped_count += 1;
97 }
98
99 /// Records one failed envelope submission.
100 pub(crate) fn record_failure(&mut self, failure: BatchPublishFailure) {
101 self.failures.push(failure);
102 }
103
104 /// Returns the total number of envelopes in the batch.
105 ///
106 /// # Returns
107 /// Input envelope count.
108 pub fn total_count(&self) -> usize {
109 self.total_count
110 }
111
112 /// Returns the number of envelopes accepted by the backend.
113 ///
114 /// # Returns
115 /// Accepted submission count.
116 pub fn accepted_count(&self) -> usize {
117 self.accepted_count
118 }
119
120 /// Returns the number of envelopes dropped before dispatch.
121 ///
122 /// # Returns
123 /// Drop count reported by publisher interceptors.
124 pub fn dropped_count(&self) -> usize {
125 self.dropped_count
126 }
127
128 /// Returns the number of failed envelope submissions.
129 ///
130 /// # Returns
131 /// Failure count.
132 pub fn failure_count(&self) -> usize {
133 self.failures.len()
134 }
135
136 /// Returns captured per-envelope failures.
137 ///
138 /// # Returns
139 /// Failures in input order.
140 pub fn failures(&self) -> &[BatchPublishFailure] {
141 &self.failures
142 }
143
144 /// Returns whether the batch completed without per-envelope failures.
145 ///
146 /// # Returns
147 /// `true` when every envelope was accepted or intentionally dropped.
148 pub fn is_success(&self) -> bool {
149 self.failures.is_empty()
150 }
151}
152
153/// Common event bus contract implemented by concrete backends.
154///
155/// The trait mirrors the Java `EventBus` interface with Rust ownership and
156/// error handling. Methods are generic over the payload type, so the trait is
157/// intended for static dispatch rather than `dyn EventBus` trait objects.
158///
159/// Unless a backend documents stronger transactional semantics, publish
160/// operations are non-transactional across matching subscribers. A dispatch
161/// error can be returned after earlier subscriber work has already been
162/// accepted.
163pub trait EventBus: Clone + Send + Sync + 'static {
164 /// Starts the event bus.
165 ///
166 /// # Returns
167 /// `Ok(true)` when this call changed the bus from stopped to started.
168 ///
169 /// # Errors
170 /// Returns backend-specific startup errors when resources cannot be created.
171 fn start(&self) -> EventBusResult<bool>;
172
173 /// Shuts down the event bus.
174 ///
175 /// # Returns
176 /// `true` when this call changed the bus from started to stopped.
177 fn shutdown(&self) -> bool;
178
179 /// Publishes a payload to a topic.
180 ///
181 /// # Parameters
182 /// - `topic`: Target topic.
183 /// - `payload`: Event payload.
184 ///
185 /// # Returns
186 /// `Ok(())` after the backend accepts the event.
187 ///
188 /// # Errors
189 /// Returns backend-specific errors such as a stopped bus or dispatch failure.
190 fn publish<T>(&self, topic: &Topic<T>, payload: T) -> EventBusResult<()>
191 where
192 T: Clone + Send + Sync + 'static,
193 {
194 self.publish_envelope(EventEnvelope::create(topic.clone(), payload))
195 }
196
197 /// Publishes a payload to a topic with explicit publish options.
198 ///
199 /// # Parameters
200 /// - `topic`: Target topic.
201 /// - `payload`: Event payload.
202 /// - `options`: Publish options applied to this event.
203 ///
204 /// # Returns
205 /// `Ok(())` after the backend accepts the event.
206 ///
207 /// # Errors
208 /// Returns backend-specific publish errors.
209 fn publish_with_options<T>(
210 &self,
211 topic: &Topic<T>,
212 payload: T,
213 options: PublishOptions<T>,
214 ) -> EventBusResult<()>
215 where
216 T: Clone + Send + Sync + 'static,
217 {
218 self.publish_envelope_with_options(EventEnvelope::create(topic.clone(), payload), options)
219 }
220
221 /// Publishes an existing envelope with default publish options.
222 ///
223 /// # Parameters
224 /// - `envelope`: Event envelope to publish.
225 ///
226 /// # Returns
227 /// `Ok(())` after the backend accepts the event.
228 ///
229 /// # Errors
230 /// Returns backend-specific publishing errors.
231 fn publish_envelope<T>(&self, envelope: EventEnvelope<T>) -> EventBusResult<()>
232 where
233 T: Clone + Send + Sync + 'static,
234 {
235 self.publish_envelope_with_options(envelope, PublishOptions::empty())
236 }
237
238 /// Publishes an existing envelope with explicit publish options.
239 ///
240 /// # Parameters
241 /// - `envelope`: Event envelope to publish.
242 /// - `options`: Publish options applied to this event.
243 ///
244 /// # Returns
245 /// `Ok(())` after the backend accepts the event.
246 ///
247 /// # Errors
248 /// Returns backend-specific publishing errors.
249 fn publish_envelope_with_options<T>(
250 &self,
251 envelope: EventEnvelope<T>,
252 options: PublishOptions<T>,
253 ) -> EventBusResult<()>
254 where
255 T: Clone + Send + Sync + 'static;
256
257 /// Publishes a batch of envelopes with default publish options.
258 ///
259 /// The default implementation submits envelopes in input order. Concrete
260 /// backends may still execute handlers concurrently unless they document a
261 /// stronger ordering guarantee.
262 ///
263 /// # Parameters
264 /// - `envelopes`: Envelopes to submit in order.
265 ///
266 /// # Returns
267 /// Summary containing per-envelope successes and failures.
268 ///
269 /// # Errors
270 /// Returns backend-level batch precondition errors.
271 fn publish_all<T>(&self, envelopes: Vec<EventEnvelope<T>>) -> EventBusResult<BatchPublishResult>
272 where
273 T: Clone + Send + Sync + 'static,
274 {
275 self.publish_all_with_options(envelopes, PublishOptions::empty())
276 }
277
278 /// Publishes a batch of envelopes with explicit publish options.
279 ///
280 /// The default implementation submits envelopes in input order. Concrete
281 /// backends may still execute handlers concurrently unless they document a
282 /// stronger ordering guarantee.
283 ///
284 /// # Parameters
285 /// - `envelopes`: Envelopes to submit in order.
286 /// - `options`: Publish options cloned for each envelope.
287 ///
288 /// # Returns
289 /// Summary containing per-envelope successes and failures.
290 ///
291 /// # Errors
292 /// Returns backend-level batch precondition errors. Per-envelope publish
293 /// failures are captured in [`BatchPublishResult`].
294 fn publish_all_with_options<T>(
295 &self,
296 envelopes: Vec<EventEnvelope<T>>,
297 options: PublishOptions<T>,
298 ) -> EventBusResult<BatchPublishResult>
299 where
300 T: Clone + Send + Sync + 'static,
301 {
302 let mut result = BatchPublishResult::new(envelopes.len());
303 for (index, envelope) in envelopes.into_iter().enumerate() {
304 let event_id = envelope.id().to_string();
305 match self.publish_envelope_with_options(envelope, options.clone()) {
306 Ok(()) => result.record_accepted(),
307 Err(error) => {
308 result.record_failure(BatchPublishFailure::new(index, event_id, error));
309 }
310 }
311 }
312 Ok(result)
313 }
314
315 /// Subscribes a handler using backend default options.
316 ///
317 /// # Parameters
318 /// - `subscriber_id`: Subscriber identifier.
319 /// - `topic`: Topic to subscribe.
320 /// - `handler`: Handler invoked for matching events.
321 ///
322 /// # Returns
323 /// Subscription handle.
324 ///
325 /// # Errors
326 /// Returns backend-specific subscription errors.
327 fn subscribe<T, S, F, R>(
328 &self,
329 subscriber_id: S,
330 topic: &Topic<T>,
331 handler: F,
332 ) -> EventBusResult<Subscription<T>>
333 where
334 T: Clone + Send + Sync + 'static,
335 S: Into<String>,
336 F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
337 R: IntoEventBusResult + 'static,
338 {
339 self.subscribe_with_options(subscriber_id, topic, handler, SubscribeOptions::empty())
340 }
341
342 /// Subscribes a handler using explicit options.
343 ///
344 /// # Parameters
345 /// - `subscriber_id`: Subscriber identifier.
346 /// - `topic`: Topic to subscribe.
347 /// - `handler`: Handler invoked for matching events.
348 /// - `options`: Subscription options.
349 ///
350 /// # Returns
351 /// Subscription handle.
352 ///
353 /// # Errors
354 /// Returns backend-specific subscription errors.
355 fn subscribe_with_options<T, S, F, R>(
356 &self,
357 subscriber_id: S,
358 topic: &Topic<T>,
359 handler: F,
360 options: SubscribeOptions<T>,
361 ) -> EventBusResult<Subscription<T>>
362 where
363 T: Clone + Send + Sync + 'static,
364 S: Into<String>,
365 F: Fn(EventEnvelope<T>) -> R + Send + Sync + 'static,
366 R: IntoEventBusResult + 'static;
367
368 /// Registers a handler for standard dead-letter payloads.
369 ///
370 /// The default implementation adapts the handler into a normal subscription
371 /// with a deterministic system subscriber ID derived from the topic name.
372 ///
373 /// # Parameters
374 /// - `dead_letter_topic`: Topic carrying [`DeadLetterPayload`] events.
375 /// - `handler`: Handler invoked for dead-letter events.
376 /// - `options`: Subscription options for dead-letter consumption.
377 ///
378 /// # Returns
379 /// Subscription handle for the dead-letter handler.
380 ///
381 /// # Errors
382 /// Returns backend-specific subscription errors.
383 fn add_dead_letter_handler<F, R>(
384 &self,
385 dead_letter_topic: &Topic<DeadLetterPayload>,
386 handler: F,
387 options: SubscribeOptions<DeadLetterPayload>,
388 ) -> EventBusResult<Subscription<DeadLetterPayload>>
389 where
390 F: Fn(EventEnvelope<DeadLetterPayload>) -> R + Send + Sync + 'static,
391 R: IntoEventBusResult + 'static,
392 {
393 self.subscribe_with_options(
394 format!("dead-letter:{}", dead_letter_topic.name()),
395 dead_letter_topic,
396 handler,
397 options,
398 )
399 }
400
401 /// Waits until all work for a topic is idle.
402 ///
403 /// # Parameters
404 /// - `topic`: Topic to wait for.
405 ///
406 /// # Returns
407 /// `Ok(())` once the topic has no active handler work.
408 ///
409 /// # Errors
410 /// Returns backend-specific wait errors.
411 fn wait_for_idle<T>(&self, topic: &Topic<T>) -> EventBusResult<()>
412 where
413 T: 'static;
414
415 /// Waits until all work for a topic is idle or the timeout elapses.
416 ///
417 /// # Parameters
418 /// - `topic`: Topic to wait for.
419 /// - `timeout`: Maximum duration to wait.
420 ///
421 /// # Returns
422 /// `Ok(true)` once the topic has no active handler work, or `Ok(false)` when
423 /// the timeout elapses first.
424 ///
425 /// # Errors
426 /// Returns backend-specific wait errors.
427 fn wait_for_idle_timeout<T>(&self, topic: &Topic<T>, timeout: Duration) -> EventBusResult<bool>
428 where
429 T: 'static;
430}