evno/lib.rs
1//! # evno
2//!
3//! `evno` is a high-performance, type-safe asynchronous event bus library, designed specifically
4//! for the Rust Tokio runtime. It combines an **ultra-fast multicast channel** ([`gyre`](https://docs.rs/gyre))
5//! with the **structured concurrency** of the [`acty`](https://docs.rs/acty) Actor model,
6//! providing an easy-to-use event distribution system that supports middleware and graceful shutdown.
7//!
8//! ## Core Design and Features
9//!
10//! 1. **Strongly-Typed Event Dispatch:** The `Bus` maintains separate publishers internally for
11//! different event types (`E: Event`), ensuring compile-time type safety for event sending and receiving.
12//!
13//! 2. **Startup Guarantee (`BindLatch`):** Publishers wait for all listeners currently starting up to
14//! complete their subscription registration before delivering events. This entirely prevents
15//! transient event loss due to startup race conditions.
16//!
17//! 3. **Actor-Driven Lifecycle:** Each subscription (`bind`) launches an independent Actor
18//! (`ListenerActor`), featuring a `begin -> handle -> after` lifecycle. This is tightly integrated
19//! with `CancellationToken` to guarantee **structured concurrency** and **graceful shutdown** for tasks.
20//!
21//! 4. **Type-Safe Middleware (`Chain`):** Using the [`Chain`](./chain/struct.Chain.html) and [`Step`](./chain/trait.Step.html)
22//! Traits, you can build rich, type-safe pipelines to preprocess events (e.g., inject context or logging)
23//! before they reach the bus.
24//!
25//! ## Quick Start
26//!
27//! This is the simplest example of `Bus` usage, showing how to bind a listener and emit an event.
28//!
29//! ```rust
30//! use evno::{Bus, from_fn, Emit, Guard, Close};
31//! use std::time::Duration;
32//!
33//! // 1. Define the event
34//! #[derive(Clone, Debug)]
35//! struct UserLoggedIn {
36//! username: String,
37//! }
38//!
39//! #[tokio::main]
40//! async fn main() {
41//! // 1. Initialize the Bus with a capacity of 4
42//! let bus = Bus::new(4);
43//!
44//! // 2. Bind a listener (using from_fn to wrap an async closure)
45//! let handle = bus.on(from_fn(|event: Guard<UserLoggedIn>| async move {
46//! println!("Listener received login for: {}", event.username);
47//! tokio::time::sleep(Duration::from_millis(5)).await;
48//! }));
49//!
50//! // 3. Emit events
51//! bus.emit(UserLoggedIn { username: "Alice".to_string() }).await;
52//! bus.emit(UserLoggedIn { username: "Bob".to_string() }).await;
53//!
54//! // 4. Graceful shutdown (wait for all event processing to complete)
55//! // close() checks if the bus is the last reference; if so, it executes drain().
56//! // This waits for all listeners to finish their execution.
57//! bus.close().await;
58//! }
59//! ```
60//!
61//! ## Core Concepts: Lifecycle and Shutdown
62//!
63//! `Bus` instances are cloneable.
64//!
65//! ### `Drain` vs. `Close`
66//!
67//! | Method | Semantics | Behavior |
68//! | :--- | :--- | :--- |
69//! | [`Drain`](./emit/trait.Drain.html) | **Global Drain**. Consumes the caller's `Bus` instance. | Blocks until: **1.** All `Bus` clones have been dropped. **2.** All running Listener tasks have finished processing and exited. |
70//! | [`Close`](./emit/trait.Close.html) | **Conditional Close**. Consumes the caller's `Bus` instance. | **1.** If the current instance is the **last** `Bus` clone, the behavior is equivalent to `drain()`. **2.** If **other clones still exist**, it only drops the current clone and returns immediately. |
71//!
72//! **Best Practice:** Always use `close()` in your application. The system will automatically trigger a global drain only when the last holder releases the `Bus`.
73//!
74//! ### Listener Control Flow and Self-Cancellation
75//!
76//! The [`Listener`](./listener/trait.Listener.html) Trait allows you to define complex event processing logic.
77//!
78//! * The `handle` method receives a `&CancellationToken`. A Listener can initiate its own conditional exit by calling `cancel.cancel()`.
79//! * **Utility Functions:**
80//! * [`from_fn`](./listener/fn.from_fn.html): Suitable for simple asynchronous closures.
81//! * [`from_fn_with_cancel`](./listener/fn.from_fn_with_cancel.html): Suitable for closures that need access to the `CancellationToken` within the `handle` logic to perform self-cancellation.
82//!
83//! ## Event Chain and Context Injection
84//!
85//! The `Chain` mechanism enables decorating or transforming events before they reach the bus, offering middleware capabilities.
86//!
87//! ```rust
88//! use evno::{Event, Step, Guard, from_fn, Bus, Emit, Close, Chain};
89//! use std::sync::atomic::{AtomicU64, Ordering};
90//! use std::sync::Arc;
91//!
92//! #[derive(Debug, Clone, PartialEq)]
93//! struct OriginalEvent(String);
94//!
95//! #[derive(Debug, Clone, PartialEq)]
96//! struct RequestContext { request_id: u64 }
97//!
98//! // The event type after Step transformation (Input E -> Output ContextualEvent<E>)
99//! #[derive(Debug, Clone, PartialEq)]
100//! struct ContextualEvent<E>(E, RequestContext);
101//!
102//! // Define a Step to inject RequestContext
103//! #[derive(Clone)]
104//! struct RequestInjector(Arc<AtomicU64>);
105//!
106//! impl Step for RequestInjector {
107//! // Use GAT (Generic Associated Type) to define the output type:
108//! // For any incoming event E, the output type is ContextualEvent<E>
109//! type Event<E: Event> = ContextualEvent<E>;
110//!
111//! async fn process<E: Event>(self, event: E) -> Self::Event<E> {
112//! let id = self.0.fetch_add(1, Ordering::Relaxed);
113//! ContextualEvent(event, RequestContext { request_id: id })
114//! }
115//! }
116//!
117//! #[tokio::main]
118//! async fn main() {
119//! let bus = Bus::new(4);
120//! let counter = Arc::new(AtomicU64::new(1000));
121//!
122//! // 1. Create the event processing chain: prepend RequestInjector onto the Bus
123//! let chain = Chain::from(bus.clone()).prepend(RequestInjector(counter));
124//!
125//! // 2. Bind a listener. Note: It must listen for the type processed by the Step
126//! let handle = bus.on(from_fn(|event: Guard<ContextualEvent<OriginalEvent>>| async move {
127//! // We can safely access the injected context
128//! println!(
129//! "[ID: {}] Processing event: {}",
130//! event.1.request_id,
131//! event.0.0
132//! );
133//! assert!(event.1.request_id == 1000);
134//! }));
135//!
136//! // 3. Emit the original event through the pipeline
137//! chain.emit(OriginalEvent("First request".to_string())).await;
138//! chain.emit(OriginalEvent("Second request".to_string())).await;
139//!
140//! // 4. Graceful shutdown
141//! chain.close().await;
142//! bus.close().await;
143//! }
144//! ```
145//!
146//! ## Module Overview
147//!
148//! * [`Bus`](./bus/struct.Bus.html): The core structure of the event bus.
149//! * [`Chain`](./chain/struct.Chain.html) / [`Step`](./chain/trait.Step.html): Mechanisms for building event processing pipelines.
150//! * [`Publisher`](./publisher/struct.Publisher.html): The sending endpoint for a specific event type `E`.
151//! * [`Listener`](./listener/trait.Listener.html): The Trait implemented by users for defining event handling logic.
152//! * [`Emit`](./emit/trait.Emit.html) / [`TypedEmit`](./emit/trait.TypedEmit.html): Traits encapsulating event sending functionality.
153//! * [`Drain`](./emit/trait.Drain.html) / [`Close`](./emit/trait.Close.html): Defines the asynchronous Traits for bus shutdown and cleanup.
154
155mod bind_latch;
156mod bus;
157mod chain;
158mod emit;
159mod emitter;
160mod event;
161mod handle;
162mod launcher;
163mod listener;
164mod publisher;
165mod wait_group;
166
167pub use {bus::Bus, chain::*, emit::*, emitter::*, event::Event, handle::*, listener::*};