pub struct EventBus { /* private fields */ }
The main event bus.
§Example
```rust
use net::{ConsumeRequest, Event, EventBus, EventBusConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bus = EventBus::new(EventBusConfig::default()).await?;

    // Ingest an event (non-blocking; returns the shard ID and insertion timestamp)
    bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;

    // Poll events back out of storage
    let _response = bus.poll(ConsumeRequest::new(100)).await?;

    // Consumes the bus and drains all workers before returning
    bus.shutdown().await?;
    Ok(())
}
```
Implementations
impl EventBus
pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError>
Create a new event bus with the given configuration.
pub async fn new_with_adapter(config: EventBusConfig, adapter: Box<dyn Adapter>) -> Result<Self, AdapterError>
Create a new event bus with a caller-supplied adapter.
config.adapter is ignored — the supplied adapter is used
instead. Useful for tests that need to observe or inject
behavior at the adapter boundary (e.g. a counting adapter
for end-to-end delivery assertions, a flaky adapter for
retry-path coverage).
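The two kinds of test adapter mentioned above can be sketched as follows. This page does not show the Adapter trait's methods, so the sketch defines a hypothetical synchronous BatchSink trait in its place; CountingSink and FlakySink are illustrative names, not part of the crate. One counts delivered events for end-to-end assertions; the other fails a fixed number of calls before succeeding, to exercise retry paths.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Hypothetical stand-in for the crate's Adapter trait (real signature not shown here).
pub trait BatchSink: Send + Sync {
    fn on_batch(&self, batch: &[Vec<u8>]) -> Result<(), String>;
}

/// Counts every event it receives, for end-to-end delivery assertions.
#[derive(Default)]
pub struct CountingSink {
    pub delivered: AtomicU64,
}

impl BatchSink for CountingSink {
    fn on_batch(&self, batch: &[Vec<u8>]) -> Result<(), String> {
        self.delivered.fetch_add(batch.len() as u64, Ordering::SeqCst);
        Ok(())
    }
}

/// Fails the first `failures` calls, then succeeds, to exercise retry paths.
pub struct FlakySink {
    failures: usize,
    calls: AtomicUsize,
}

impl FlakySink {
    pub fn new(failures: usize) -> Self {
        Self { failures, calls: AtomicUsize::new(0) }
    }
}

impl BatchSink for FlakySink {
    fn on_batch(&self, _batch: &[Vec<u8>]) -> Result<(), String> {
        // fetch_add returns the previous value: 0 for the first call.
        let n = self.calls.fetch_add(1, Ordering::SeqCst);
        if n < self.failures {
            Err(format!("injected failure #{}", n + 1))
        } else {
            Ok(())
        }
    }
}
```

In a real test, an instance of such an adapter would be boxed and passed to new_with_adapter, keeping a shared handle to read the counter afterwards.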
pub fn start_scaling_monitor(self: &Arc<Self>)
Start the scaling monitor (if dynamic scaling is enabled). This spawns a background task that periodically evaluates scaling decisions.
The spawned task holds a Weak<Self> rather than a strong
Arc<Self> clone. A strong clone would keep the bus alive
forever and make shutdown(self) (which consumes the bus by
value) unreachable: callers holding an Arc<EventBus> could
never Arc::try_unwrap it, because the spawned task would always
hold one of the strong references.
With a Weak, the monitor task upgrades on each tick. Once the
last caller-held Arc is dropped, the upgrade fails and the
task exits cleanly. To shut down via shutdown(self), the
caller must hold the only strong reference; Arc::try_unwrap
then succeeds because the spawned task holds only a Weak.
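The Weak-holding monitor pattern can be sketched with std threads in place of a tokio task. Bus and start_monitor here are hypothetical stand-ins, not the crate's types. In this sketch the upgraded Arc lives only for the instant of one tick, so a caller retrying Arc::try_unwrap briefly is enough to obtain sole ownership; the real bus's timing may differ.

```rust
use std::sync::{Arc, Weak};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for the bus; only what the monitor touches.
pub struct Bus {
    pub name: String,
}

impl Bus {
    /// Spawns a monitor that keeps only a Weak<Bus>. Each tick upgrades;
    /// once no strong Arc remains, the upgrade fails and the thread exits.
    pub fn start_monitor(self: &Arc<Self>, tick: Duration) -> JoinHandle<u32> {
        let weak: Weak<Bus> = Arc::downgrade(self);
        thread::spawn(move || {
            let mut ticks = 0;
            loop {
                match weak.upgrade() {
                    // The strong ref is dropped at the end of this arm,
                    // before the thread sleeps.
                    Some(_bus) => ticks += 1,
                    None => return ticks,
                }
                thread::sleep(tick);
            }
        })
    }
}
```

Because the monitor never parks a strong reference across a sleep, the caller can recover the owned value and the monitor thread then terminates on its own.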
pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)>
Ingest an event.
This is a non-blocking operation. The event is added to the appropriate shard’s ring buffer and will be batched and persisted asynchronously.
§Returns
The shard ID and insertion timestamp on success.
pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)>
Ingest a raw event (pre-serialized with cached hash).
This is the fastest ingestion path:
- Uses pre-computed hash for shard selection (no serialization)
- Stores bytes directly (no clone needed, reference-counted)
§Returns
The shard ID and insertion timestamp on success.
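The two properties of the raw path, a routing hash computed once and reference-counted bytes, can be illustrated with a hypothetical RawEventSketch type. It is not the crate's RawEvent: the use of DefaultHasher and modulo routing are assumptions made for the illustration.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

/// Hypothetical pre-serialized event: bytes are reference-counted and the
/// routing hash is computed once, at construction.
#[derive(Clone)]
pub struct RawEventSketch {
    pub bytes: Arc<[u8]>,
    pub hash: u64,
}

impl RawEventSketch {
    pub fn new(bytes: Vec<u8>) -> Self {
        let mut h = DefaultHasher::new();
        bytes.hash(&mut h);
        Self { bytes: Arc::from(bytes), hash: h.finish() }
    }

    /// Shard selection reuses the cached hash: no re-serialization, no re-hashing.
    /// `num_shards` must be non-zero.
    pub fn shard(&self, num_shards: u16) -> u16 {
        (self.hash % num_shards as u64) as u16
    }
}
```

Cloning such an event only bumps the Arc's reference count; the payload bytes themselves are never copied.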
pub fn ingest_batch(&self, events: Vec<Event>) -> usize
Ingest a batch of events.
This is more efficient than calling ingest repeatedly: events
destined for the same shard share a single mutex acquisition.
§Returns
The number of successfully ingested events.
pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize
Ingest a batch of raw events (fastest batch ingestion).
Groups events by their destination shard and pushes each group under a single mutex acquisition.
§Returns
The number of successfully ingested events.
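Both batch methods group events by destination shard and take each shard's lock once per group. The sketch below shows that technique with a hypothetical Shard type (a bounded VecDeque behind a Mutex) and events modeled as (routing_hash, payload) pairs. The rule that a full ring rejects an event and excludes it from the count is an assumption of the sketch.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

/// Hypothetical shard: a bounded ring buffer behind a mutex.
pub struct Shard {
    pub ring: Mutex<VecDeque<u64>>,
    pub capacity: usize,
}

/// Groups events by destination shard, then locks each shard once per group.
/// `shards` must be non-empty. Returns how many events were accepted.
pub fn ingest_batch(shards: &[Shard], events: Vec<(u64, u64)>) -> usize {
    let n = shards.len() as u64;
    let mut groups: HashMap<usize, Vec<u64>> = HashMap::new();
    for (hash, payload) in events {
        groups.entry((hash % n) as usize).or_default().push(payload);
    }
    let mut accepted = 0;
    for (idx, payloads) in groups {
        let shard = &shards[idx];
        let mut ring = shard.ring.lock().unwrap(); // one acquisition per group
        for p in payloads {
            if ring.len() < shard.capacity {
                ring.push_back(p);
                accepted += 1;
            } // in this sketch a full ring rejects the event
        }
    }
    accepted
}
```

With N events spread over S shards, this takes S locks instead of N, which is where the batch path's advantage over repeated ingest calls comes from.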
pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError>
Poll events from the bus.
This retrieves events from storage according to the request parameters.
§Topology-change visibility
ArcSwap::load() snapshots the current PollMerger for
the duration of this call. A concurrent add_shard /
remove_shard_internal that .store()s a fresh merger
only affects subsequent polls — this poll continues
against the loaded snapshot’s shard list.
The implications:
- add_shard mid-poll: events ingested into the new shard between the merger swap and our return are invisible to this call. They appear on the next poll.
- remove_shard_internal mid-poll: the stale merger still has the removed shard in its id list. Adapters that lazy-create streams on poll_shard (JetStream in particular) may recreate the stream and return empty or stale data. The drained events are dispatched to durable storage by remove_shard_internal itself before this poll’s adapter call lands; the next poll loads the new merger and sees the correct shard set.
In both cases the loss is transient and self-healing:
pagination via next_id and the next poll’s cursor pick
up where we left off. Callers requiring strict
“topology-stable” semantics should serialize their polls
against scaling operations externally.
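The snapshot semantics can be reproduced without the arc-swap crate. In the sketch below a std RwLock<Arc<Merger>> stands in for ArcSwap: load clones the current Arc and store swaps in a new one without affecting snapshots already handed out. Merger and MergerSlot are hypothetical types that model only the shard list a poll iterates over.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical merger: just the shard list a poll iterates over.
pub struct Merger {
    pub shard_ids: Vec<u16>,
}

/// Std-only stand-in for ArcSwap<Merger>.
pub struct MergerSlot {
    inner: RwLock<Arc<Merger>>,
}

impl MergerSlot {
    pub fn new(m: Merger) -> Self {
        Self { inner: RwLock::new(Arc::new(m)) }
    }

    /// Snapshot for the duration of one poll.
    pub fn load(&self) -> Arc<Merger> {
        let guard = self.inner.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Topology change: later loads see `m`; existing snapshots are untouched.
    pub fn store(&self, m: Merger) {
        *self.inner.write().unwrap() = Arc::new(m);
    }
}
```

A poll that loaded before an add_shard keeps iterating the old shard list, and only the next load observes the new one. This matches the transient, self-healing invisibility described above.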
pub fn num_shards(&self) -> u16
Get the number of shards.
pub fn adapter_name(&self) -> &'static str
Get the adapter name.
pub async fn is_healthy(&self) -> bool
Check if the adapter is healthy.
pub fn stats(&self) -> &EventBusStats
Get statistics.
pub fn shard_stats(&self) -> ShardStats
Get shard statistics.
pub fn pending_in_rings(&self) -> u64
Sum of len() across every shard’s ring buffer.
Mainly useful in tests and operational diagnostics: a
non-zero value at the time of Drop (without an awaited
shutdown()) would be silently lost, so Drop folds this
into events_dropped before the bus disappears.
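The pending_in_rings sum and the Drop-time accounting can be sketched with a hypothetical RingsBus. In this sketch events_dropped sits behind an Arc so the counter stays observable after the bus is gone; that placement is an assumption made for the illustration, not a description of EventBusStats.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical bus model: per-shard rings plus a shared dropped-events counter.
pub struct RingsBus {
    pub rings: Vec<Mutex<VecDeque<u64>>>,
    pub events_dropped: Arc<AtomicU64>,
}

impl RingsBus {
    /// Sum of len() across every shard's ring.
    pub fn pending_in_rings(&self) -> u64 {
        self.rings.iter().map(|r| r.lock().unwrap().len() as u64).sum()
    }
}

impl Drop for RingsBus {
    /// Without an awaited shutdown, whatever is still in a ring is lost;
    /// count it instead of losing it silently.
    fn drop(&mut self) {
        let stranded = self.pending_in_rings();
        self.events_dropped.fetch_add(stranded, Ordering::Relaxed);
    }
}
```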
pub async fn flush(&self) -> Result<(), AdapterError>
Flush all pending batches.
Waits for all shard ring buffers to drain, then for the
per-shard mpsc channels to drain, then for any pending batch
inside each batch worker to time out and dispatch — and only
then calls adapter.flush().
§Latency bound
The total wall-clock budget is the sum of three phases:
- Phase 1 (ring-buffer drain): up to 5 s.
- Phase 2 (channel + pending-batch drain): up to min(2 s, batch.max_delay × n_workers), capped at 2 s so a misconfigured max_delay cannot inflate the budget.
- Phase 3 (adapter.flush() call): up to adapter_timeout (default 30 s).
Worst-case flush() runtime is therefore bounded by roughly
37 s (5 s + the 2 s Phase 2 cap + 30 s) under default config,
not 5 s as an earlier doc comment stated; the 5-second figure
describes Phase 1 only. Callers wiring flush() into
request-path latencies (HTTP handler, RPC) MUST set
adapter_timeout accordingly or run the flush under their own
outer timeout.
The previous implementation slept a single batch.max_delay
(default 10 ms) after the ring buffers drained and immediately
called adapter.flush(). Events still in transit through the
per-shard mpsc channel, the batch worker’s pending batch, or
the in-progress adapter.on_batch call (bounded only by
adapter_timeout, default 30 s) could miss the flush. Callers
using flush() as a delivery barrier silently lost events.
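The three-phase budget is simple enough to compute directly. flush_budget is a hypothetical helper, not part of the crate. It takes n_workers as a plain count because this page does not say how the bus derives it.

```rust
use std::time::Duration;

/// Worst-case flush() budget as described above:
/// Phase 1 (5 s) + Phase 2 (min(2 s, max_delay × n_workers)) + Phase 3 (adapter_timeout).
pub fn flush_budget(max_delay: Duration, n_workers: u32, adapter_timeout: Duration) -> Duration {
    let phase1 = Duration::from_secs(5);
    let phase2 = (max_delay * n_workers).min(Duration::from_secs(2));
    phase1 + phase2 + adapter_timeout
}
```

With max_delay = 500 ms and 8 workers, Phase 2 hits its 2 s cap and the budget is 37 s. With 10 ms and 4 workers it is 35.04 s. A request-path caller could enforce a tighter bound with an outer deadline such as tokio::time::timeout(limit, bus.flush()).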
pub async fn shutdown(self) -> Result<(), AdapterError>
Gracefully shut down the event bus.
The shutdown order is load-bearing:
1. Signal shutdown so drain workers stop pulling from ring buffers after their final sweep.
2. Await drain workers so every event the producer has handed to the bus is now in the per-shard mpsc channel.
3. Drop batch_senders so each channel’s last sender is gone; the next recv().await in a batch worker will return None.
4. Await batch workers, which drain everything remaining in their channel and exit on recv() = None.
Reversing steps 2 and 4 (the previous design) silently
dropped events: a batch worker that exited on the shutdown
flag could leave stranded in the channel any events the drain
worker pushed after the batch worker’s final try_recv sweep.
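The four-step order can be exercised with std threads and std::sync::mpsc standing in for tokio tasks and channels. shutdown_in_order is a hypothetical helper: the Arc<Mutex<VecDeque>> plays one shard's ring buffer, and the function returns what the batch worker delivered. The batch worker exits only when recv() reports that every sender is gone, never on the flag.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

pub fn shutdown_in_order(ring: Arc<Mutex<VecDeque<u32>>>) -> Vec<u32> {
    let shutdown = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<u32>();

    // Drain worker: moves ring contents into the channel. It reads the flag
    // before sweeping, so a signal still gets one full final sweep.
    let drain = {
        let (ring, shutdown, tx) = (Arc::clone(&ring), Arc::clone(&shutdown), tx.clone());
        thread::spawn(move || loop {
            let stopping = shutdown.load(Ordering::Acquire);
            loop {
                let next = ring.lock().unwrap().pop_front();
                match next {
                    Some(ev) => tx.send(ev).unwrap(),
                    None => break,
                }
            }
            if stopping {
                return; // drops this worker's sender clone
            }
            thread::yield_now();
        })
    };

    // Batch worker: drains until recv() fails, i.e. all senders dropped.
    let batch = thread::spawn(move || {
        let mut delivered = Vec::new();
        while let Ok(ev) = rx.recv() {
            delivered.push(ev);
        }
        delivered
    });

    shutdown.store(true, Ordering::Release); // 1. signal
    drain.join().unwrap(); // 2. await drain worker
    drop(tx); // 3. drop the last sender
    batch.join().unwrap() // 4. await batch worker
}
```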
pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError>
Shutdown via shared reference — same semantics as
shutdown, but does not consume self.
Useful for callers that hold the bus behind Arc<EventBus>
(e.g., the SDK, where subscribe stores an Arc clone in every
EventStream) and therefore cannot satisfy Arc::try_unwrap.
Idempotent: the first caller does the work; concurrent or
subsequent callers wait for the shutdown_completed flag and
return Ok(()).
pub fn is_shutdown(&self) -> bool
True once shutdown / shutdown_via_ref has signaled — does
not imply the shutdown work has finished. Use
is_shutdown_completed for
completion.
pub fn is_shutdown_completed(&self) -> bool
True once shutdown / shutdown_via_ref has fully drained
workers and the adapter shutdown returned (success path only).
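The two-flag design behind shutdown_via_ref, is_shutdown, and is_shutdown_completed can be sketched with a hypothetical ShutdownState, which is not the crate's internals. A compare-and-swap on the "signaled" flag elects the single caller that does the work, and every other caller blocks on a Condvar until the "completed" flag is set. In this sketch the work cannot fail, so the failure path mentioned above is not modeled.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Condvar, Mutex};

/// Hypothetical model of the two shutdown flags.
#[derive(Default)]
pub struct ShutdownState {
    /// Set as soon as some caller begins shutting down (is_shutdown).
    shutdown: AtomicBool,
    /// Set once the shutdown work has finished (is_shutdown_completed).
    completed: Mutex<bool>,
    completed_cv: Condvar,
    /// How many times the shutdown work actually ran.
    pub work_runs: AtomicU32,
}

impl ShutdownState {
    /// Idempotent: only the caller that flips `shutdown` does the work.
    pub fn shutdown_via_ref(&self) {
        let won = self
            .shutdown
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if won {
            // Stand-in for draining workers and shutting down the adapter.
            self.work_runs.fetch_add(1, Ordering::SeqCst);
            *self.completed.lock().unwrap() = true;
            self.completed_cv.notify_all();
            return;
        }
        // Everyone else waits for completion.
        let mut done = self.completed.lock().unwrap();
        while !*done {
            done = self.completed_cv.wait(done).unwrap();
        }
    }

    pub fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::Acquire)
    }

    pub fn is_shutdown_completed(&self) -> bool {
        *self.completed.lock().unwrap()
    }
}
```

Between the CAS and the completion store, is_shutdown() is already true while is_shutdown_completed() is still false, which is exactly the distinction the two accessors expose.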
pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>>
Get shard metrics (if dynamic scaling is enabled).
pub fn is_dynamic_scaling_enabled(&self) -> bool
Check if dynamic scaling is enabled.
pub async fn manual_scale_up(&self, count: u16) -> Result<Vec<u16>, AdapterError>
Manually trigger a scale-up (for testing or manual intervention).
Bypasses the auto-scaling cooldown so a deliberate operator
request isn’t rate-limited by the auto-scaling cadence. The
max_shards budget check still applies.
Previously this looped add_shard_internal() count times. Each
call bumped last_scaling, so every iteration after the first
failed with InCooldown under any non-zero cooldown, leaving the
first shard half-added (workers spawned, routing entry
installed) while the error propagated to the caller.
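The cooldown bypass can be sketched with a hypothetical Scaler whose field names mirror the prose above (last_scaling, max_shards, cooldown) but are not the crate's internals. The manual path checks the whole max_shards budget up front and adds every shard with the cooldown bypassed. The auto path stays gated, so a manual scale-up still pushes back the next automatic decision.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
pub enum ScaleError {
    InCooldown,
    AtMaxShards,
}

/// Hypothetical scaling state.
pub struct Scaler {
    pub shards: Vec<u16>,
    pub max_shards: u16,
    pub cooldown: Duration,
    pub last_scaling: Option<Instant>,
}

impl Scaler {
    /// Adds one shard; `bypass_cooldown` lets manual requests skip the gate.
    fn add_shard_internal(&mut self, bypass_cooldown: bool) -> Result<u16, ScaleError> {
        if !bypass_cooldown {
            if let Some(t) = self.last_scaling {
                if t.elapsed() < self.cooldown {
                    return Err(ScaleError::InCooldown);
                }
            }
        }
        let id = self.shards.len() as u16;
        self.shards.push(id);
        self.last_scaling = Some(Instant::now());
        Ok(id)
    }

    /// Auto-scaling path: budget-checked and subject to cooldown.
    pub fn auto_scale_up(&mut self) -> Result<u16, ScaleError> {
        if self.shards.len() >= self.max_shards as usize {
            return Err(ScaleError::AtMaxShards);
        }
        self.add_shard_internal(false)
    }

    /// Manual path: whole budget checked first, so no shard is left half-added.
    pub fn manual_scale_up(&mut self, count: u16) -> Result<Vec<u16>, ScaleError> {
        if self.shards.len() + count as usize > self.max_shards as usize {
            return Err(ScaleError::AtMaxShards);
        }
        (0..count).map(|_| self.add_shard_internal(true)).collect()
    }
}
```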
pub async fn manual_scale_down(&self, count: u16) -> Result<Vec<u16>, AdapterError>
Manually trigger a scale-down (for testing or manual intervention).
Marks count shards as Draining, waits for them to empty,
finalizes them to Stopped, and removes them from the
routing table — mirroring the scaling monitor’s per-tick
finalize loop. Returns the IDs of shards that were
successfully drained AND removed (subset of those marked
Draining if any failed to empty within the deadline).
Drives the full scale-down lifecycle synchronously: a
plain mapper.scale_down call marks shards Draining but
does NOT finalize them — finalization is the scaling
monitor’s responsibility. Bus configs without an active
monitor (or callers that shut down before the monitor’s
next tick) would otherwise lose any events queued in those
shards’ ring buffers.
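The synchronous scale-down lifecycle (mark Draining, drain, finalize to Stopped, remove from routing) can be sketched with a hypothetical Router over a BTreeMap of shards. Several details are assumptions of the sketch, not the crate's behavior: the choice of which shards to drain (highest-numbered Active shards), draining directly into a durable Vec, and max_rounds standing in for the wall-clock deadline. A shard that does not empty in time stays Draining and is not reported.

```rust
use std::collections::{BTreeMap, VecDeque};

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ShardState {
    Active,
    Draining,
    Stopped,
}

/// Hypothetical shard: a lifecycle state plus its pending ring buffer.
pub struct ShardSlot {
    pub state: ShardState,
    pub ring: VecDeque<u64>,
}

/// Hypothetical routing table plus a stand-in for durable storage.
pub struct Router {
    pub shards: BTreeMap<u16, ShardSlot>,
    pub durable: Vec<u64>,
}

impl Router {
    /// Returns only the ids that were both drained and removed.
    /// `max_rounds` stands in for the deadline (one event per round).
    pub fn manual_scale_down(&mut self, count: u16, max_rounds: u32) -> Vec<u16> {
        // 1. Mark the highest-numbered Active shards Draining.
        let victims: Vec<u16> = self
            .shards
            .iter()
            .rev()
            .filter(|(_, s)| s.state == ShardState::Active)
            .take(count as usize)
            .map(|(id, _)| *id)
            .collect();
        for id in &victims {
            if let Some(slot) = self.shards.get_mut(id) {
                slot.state = ShardState::Draining;
            }
        }

        let mut removed = Vec::new();
        for id in victims {
            let slot = self.shards.get_mut(&id).expect("victim is still routed");
            // 2. Move queued events to durable storage until empty or out of time.
            for _ in 0..max_rounds {
                match slot.ring.pop_front() {
                    Some(ev) => self.durable.push(ev),
                    None => break,
                }
            }
            // 3-4. Only an empty shard is finalized and removed from routing.
            if slot.ring.is_empty() {
                slot.state = ShardState::Stopped;
                self.shards.remove(&id);
                removed.push(id);
            }
        }
        removed
    }
}
```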