Struct EventBus 

pub struct EventBus { /* private fields */ }

The main event bus.

§Example

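// ConsumeRequest is assumed to be exported from the crate root like the other types.
use net::{ConsumeRequest, Event, EventBus, EventBusConfig};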

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let bus = EventBus::new(EventBusConfig::default()).await?;

    // Ingest events
    bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;

    // Poll events
    let response = bus.poll(ConsumeRequest::new(100)).await?;

    bus.shutdown().await?;
    Ok(())
}

Implementations§

impl EventBus

pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError>

Create a new event bus with the given configuration.

pub async fn new_with_adapter( config: EventBusConfig, adapter: Box<dyn Adapter>, ) -> Result<Self, AdapterError>

Create a new event bus with a caller-supplied adapter.

config.adapter is ignored — the supplied adapter is used instead. Useful for tests that need to observe or inject behavior at the adapter boundary (e.g. a counting adapter for end-to-end delivery assertions, a flaky adapter for retry-path coverage).

pub fn start_scaling_monitor(self: &Arc<Self>)

Start the scaling monitor (if dynamic scaling is enabled). This spawns a background task that periodically evaluates scaling decisions.

The spawned task holds a Weak<Self> rather than a strong Arc<Self> clone. A strong clone would keep the bus alive indefinitely and make shutdown(self), which consumes the bus by value, unreachable: a caller holding an Arc<EventBus> could never consume it through Arc::try_unwrap, because the spawned task would always own one of the strong references.

Holding a Weak, the monitor task upgrades on each tick. Once the last caller-held Arc is dropped, the upgrade fails and the task exits cleanly. To shut down via shutdown(self), the caller must hold the only strong reference; Arc::try_unwrap then succeeds because the spawned task holds only a Weak.
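The Weak-holding pattern described above can be sketched with std threads only. This is a minimal illustration, not the crate's implementation: Bus, start_monitor, and into_unique are hypothetical names, and a thread stands in for the spawned async task.

```rust
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the bus; not the real `EventBus`.
struct Bus {
    name: &'static str,
}

impl Bus {
    /// Consumes the bus by value, as `shutdown(self)` does.
    fn shutdown(self) -> &'static str {
        self.name
    }
}

/// Spawns a monitor that holds only a `Weak<Bus>` and returns its tick count on exit.
fn start_monitor(bus: &Arc<Bus>) -> thread::JoinHandle<u32> {
    let weak: Weak<Bus> = Arc::downgrade(bus);
    thread::spawn(move || {
        let mut ticks = 0;
        loop {
            match weak.upgrade() {
                // The temporary strong reference is dropped at the end of this arm.
                Some(bus) => {
                    let _ = bus.name;
                    ticks += 1;
                }
                // Every strong reference is gone: exit cleanly.
                None => return ticks,
            }
            thread::sleep(Duration::from_millis(1));
        }
    })
}

/// Retries `Arc::try_unwrap` until the caller holds the only strong reference.
fn into_unique(mut bus: Arc<Bus>) -> Bus {
    loop {
        match Arc::try_unwrap(bus) {
            Ok(inner) => return inner,
            Err(shared) => {
                bus = shared;
                thread::yield_now();
            }
        }
    }
}
```

In this sketch the monitor briefly owns a strong reference while a tick runs, so into_unique retries instead of assuming the first Arc::try_unwrap succeeds. Whether the real bus needs such a retry is not stated in the documentation above.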

pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)>

Ingest an event.

This is a non-blocking operation. The event is added to the appropriate shard’s ring buffer and will be batched and persisted asynchronously.

§Returns

The shard ID and insertion timestamp on success.

pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)>

Ingest a raw event (pre-serialized with cached hash).

This is the fastest ingestion path:

  • Uses pre-computed hash for shard selection (no serialization)
  • Stores bytes directly (no clone needed, reference-counted)

§Returns

The shard ID and insertion timestamp on success.

pub fn ingest_batch(&self, events: Vec<Event>) -> usize

Ingest a batch of events.

This is more efficient than calling ingest repeatedly: events destined for the same shard share a single mutex acquisition.

§Returns

The number of successfully ingested events.

pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize

Ingest a batch of raw events (fastest batch ingestion).

Groups events by their destination shard and pushes each group under a single mutex acquisition.

§Returns

The number of successfully ingested events.
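The group-then-push approach used by the batch ingestion methods can be sketched as follows. This is an illustration under stated assumptions rather than the crate's code: Shard, shard_for, the modulo routing rule, and the "reject when full" behavior are all hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical shard: a mutex-guarded queue with a fixed capacity.
struct Shard {
    ring: Mutex<Vec<u64>>,
    capacity: usize,
}

/// Hypothetical routing rule: shard index = key modulo shard count.
fn shard_for(key: u64, num_shards: usize) -> usize {
    (key % num_shards as u64) as usize
}

/// Groups events by destination shard, then pushes each group while holding
/// that shard's lock once. Returns how many events were accepted.
fn ingest_batch(shards: &[Shard], events: Vec<u64>) -> usize {
    let mut groups: HashMap<usize, Vec<u64>> = HashMap::new();
    for event in events {
        groups
            .entry(shard_for(event, shards.len()))
            .or_default()
            .push(event);
    }

    let mut accepted = 0;
    for (idx, group) in groups {
        let shard = &shards[idx];
        // One lock acquisition covers the whole group for this shard.
        let mut ring = shard.ring.lock().unwrap();
        for event in group {
            // Assumption: a full ring rejects the event instead of blocking.
            if ring.len() < shard.capacity {
                ring.push(event);
                accepted += 1;
            }
        }
    }
    accepted
}
```

Calling a single-event push in a loop would take the lock once per event; grouping first reduces that to one acquisition per destination shard, at the cost of building the temporary map.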

pub async fn poll( &self, request: ConsumeRequest, ) -> Result<ConsumeResponse, ConsumerError>

Poll events from the bus.

This retrieves events from storage according to the request parameters.

§Topology-change visibility

ArcSwap::load() snapshots the current PollMerger for the duration of this call. A concurrent add_shard / remove_shard_internal that .store()s a fresh merger only affects subsequent polls — this poll continues against the loaded snapshot’s shard list.

The implications:

  • add_shard mid-poll: events ingested into the new shard between the merger swap and our return are invisible to this call. They appear on the next poll.
  • remove_shard_internal mid-poll: the stale merger still has the removed shard in its id list. Adapters that lazy-create streams on poll_shard (JetStream in particular) may recreate the stream and return empty/stale data. The drained events are dispatched to durable storage by remove_shard_internal itself before this poll’s adapter call lands; the next poll loads the new merger and sees the correct shard set.

In both cases the gap is transient and self-healing: pagination via next_id and the next poll’s cursor pick up where this poll left off. Callers that require strict “topology-stable” semantics should serialize their polls against scaling operations externally.
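The snapshot behavior can be illustrated without the arc-swap crate. The sketch below uses a std RwLock<Arc<_>> as a stand-in for the ArcSwap slot; Merger and MergerSlot are hypothetical names, and the real PollMerger holds more than a shard id list.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `PollMerger`: just the shard id list.
struct Merger {
    shard_ids: Vec<u16>,
}

/// Stand-in for the `ArcSwap` slot, built from std types only.
struct MergerSlot(RwLock<Arc<Merger>>);

impl MergerSlot {
    fn new(shard_ids: Vec<u16>) -> Self {
        MergerSlot(RwLock::new(Arc::new(Merger { shard_ids })))
    }

    /// Like `ArcSwap::load`: returns the merger that is current at call time.
    fn load(&self) -> Arc<Merger> {
        self.0.read().unwrap().clone()
    }

    /// Like `ArcSwap::store`: publishes a fresh merger for later loads.
    fn store(&self, shard_ids: Vec<u16>) {
        *self.0.write().unwrap() = Arc::new(Merger { shard_ids });
    }
}
```

A poll that called load() before a store() keeps iterating over the old shard list until it returns; only the next load() observes the new one.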

pub fn num_shards(&self) -> u16

Get the number of shards.

pub fn adapter_name(&self) -> &'static str

Get the adapter name.

pub async fn is_healthy(&self) -> bool

Check if the adapter is healthy.

pub fn stats(&self) -> &EventBusStats

Get statistics.

pub fn shard_stats(&self) -> ShardStats

Get shard statistics.

pub fn pending_in_rings(&self) -> u64

Sum of len() across every shard’s ring buffer.

Mainly useful in tests and operational diagnostics. Events still in the ring buffers when the bus is dropped without an awaited shutdown() would otherwise be lost silently, so Drop adds this count to events_dropped before the bus disappears.
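A minimal sketch of that Drop accounting follows. It assumes a hypothetical Stats handle shared through an Arc so the counter stays readable after the bus is gone; the real EventBusStats layout is not shown in this documentation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stats handle that outlives the bus.
#[derive(Default)]
struct Stats {
    events_dropped: AtomicU64,
}

/// Hypothetical bus: one `Vec` per shard stands in for each ring buffer.
struct Bus {
    rings: Vec<Vec<u8>>,
    stats: Arc<Stats>,
}

impl Bus {
    /// Sum of `len()` across every shard's ring buffer.
    fn pending_in_rings(&self) -> u64 {
        self.rings.iter().map(|ring| ring.len() as u64).sum()
    }
}

impl Drop for Bus {
    fn drop(&mut self) {
        // Count events that never left the rings instead of losing them silently.
        let pending = self.pending_in_rings();
        if pending > 0 {
            self.stats.events_dropped.fetch_add(pending, Ordering::Relaxed);
        }
    }
}
```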

pub async fn flush(&self) -> Result<(), AdapterError>

Flush all pending batches.

Waits for all shard ring buffers to drain, then for the per-shard mpsc channels to drain, then for any pending batch inside each batch worker to time out and dispatch — and only then calls adapter.flush().

§Latency bound

The total wall-clock budget is the sum of three phases:

  • Phase 1 (ring-buffer drain): up to 5 s.
  • Phase 2 (channel + pending-batch drain): up to min(2 s, batch.max_delay × n_workers) — capped at 2 s so a misconfigured max_delay cannot inflate the budget.
  • Phase 3 (adapter.flush() call): up to adapter_timeout (default 30 s).

Worst-case flush() runtime is therefore about 37 s under the default configuration, not 5 s; the 5-second figure covers Phase 1 only. Callers that wire flush() into request-path latencies (an HTTP handler, an RPC) MUST set adapter_timeout accordingly or run the flush under their own outer timeout.
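The three-phase budget can be checked with a few lines of arithmetic. The helper below is hypothetical (it is not part of the crate) and simply encodes the bounds listed above; the worker count passed in is an assumption of the caller.

```rust
use std::time::Duration;

/// Worst-case `flush()` budget from the three documented phases.
fn worst_case_flush_budget(
    max_delay: Duration,
    n_workers: u32,
    adapter_timeout: Duration,
) -> Duration {
    // Phase 1: ring-buffer drain, up to 5 s.
    let phase1 = Duration::from_secs(5);
    // Phase 2: min(2 s, max_delay * n_workers), so a large max_delay is capped.
    let phase2 = Duration::from_secs(2).min(max_delay.saturating_mul(n_workers));
    // Phase 3: the adapter.flush() call, bounded by adapter_timeout.
    let phase3 = adapter_timeout;
    phase1 + phase2 + phase3
}
```

With the default 10 ms max_delay and, say, 4 workers, Phase 2 contributes 40 ms and the total is 35.04 s. Once max_delay × n_workers reaches 2 s the cap applies and the total is 5 + 2 + 30 = 37 s.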

The previous implementation slept a single batch.max_delay (default 10 ms) after the ring buffers drained and immediately called adapter.flush(). Events still in transit through the per-shard mpsc channel, the batch worker’s pending batch, or the in-progress adapter.on_batch call (bounded only by adapter_timeout, default 30 s) could miss the flush. Callers using flush() as a delivery barrier silently lost events.

pub async fn shutdown(self) -> Result<(), AdapterError>

Gracefully shut down the event bus.

The shutdown order is load-bearing:

  1. Signal shutdown so drain workers stop pulling from ring buffers after their final sweep.
  2. Await drain workers so every event the producer has handed to the bus is now in the per-shard mpsc channel.
  3. Drop batch_senders so each channel’s last sender is gone — the next recv().await in a batch worker will return None.
  4. Await batch workers, which drain everything remaining in their channel and exit on recv() = None.

Reversing steps 2 and 4 (the previous design) silently dropped events: a batch worker that exited on the shutdown flag could leave events the drain worker pushed after its try_recv sweep stranded in the channel.
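The ordering can be demonstrated on a toy one-shard pipeline. The sketch uses std threads and std::sync::mpsc in place of the async workers and channels, so it only mirrors the sequence of steps; ordered_shutdown is a hypothetical function, and step 1 (the shutdown signal) is implied by the drain worker running a single final sweep.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs the ordered shutdown and returns what the batch worker delivered.
fn ordered_shutdown(ring: Vec<u32>) -> Vec<u32> {
    let (batch_sender, batch_receiver) = mpsc::channel::<u32>();

    // Batch worker: exits only when recv() reports that every sender is gone,
    // which happens after the channel has been fully drained.
    let batch_worker = thread::spawn(move || {
        let mut delivered = Vec::new();
        while let Ok(event) = batch_receiver.recv() {
            delivered.push(event);
        }
        delivered
    });

    // Steps 1-2: the drain worker makes its final sweep of the ring buffer
    // and is awaited, so every event is in the channel afterwards.
    let drain_sender = batch_sender.clone();
    let drain_worker = thread::spawn(move || {
        for event in ring {
            drain_sender.send(event).unwrap();
        }
    });
    drain_worker.join().unwrap();

    // Step 3: drop the last sender so the channel closes once it is empty.
    drop(batch_sender);

    // Step 4: await the batch worker, which drains the rest and exits.
    batch_worker.join().unwrap()
}
```

Because the batch worker stops on channel closure rather than on a flag, it cannot exit while events pushed by the drain worker are still queued.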

pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError>

Shutdown via shared reference — same semantics as shutdown, but does not consume self.

Useful for callers that hold the bus behind Arc<EventBus> (e.g., the SDK, where subscribe hands an Arc clone to every EventStream) and therefore cannot satisfy Arc::try_unwrap. Idempotent: the first caller does the work; concurrent or subsequent callers wait for the shutdown_completed flag and return Ok(()).

pub fn is_shutdown(&self) -> bool

True once shutdown / shutdown_via_ref has signaled — does not imply the shutdown work has finished. Use is_shutdown_completed for completion.

pub fn is_shutdown_completed(&self) -> bool

True once shutdown / shutdown_via_ref has fully drained workers and the adapter shutdown returned (success path only).
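The idempotent shared-reference shutdown and the difference between the two status flags can be sketched with atomics. Everything here is an assumption for illustration: the field names, the compare-and-swap election of the first caller, and the spin-wait are not taken from the crate, and the real method is async and returns a Result.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;

/// Hypothetical stand-in for the bus; field names are illustrative.
#[derive(Default)]
struct Bus {
    shutdown_signaled: AtomicBool,
    shutdown_completed: AtomicBool,
    teardown_runs: AtomicUsize,
}

impl Bus {
    fn shutdown_via_ref(&self) {
        // Only the first caller flips the flag from false to true.
        let first = self
            .shutdown_signaled
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if first {
            // Stand-in for draining workers and shutting down the adapter.
            self.teardown_runs.fetch_add(1, Ordering::Relaxed);
            self.shutdown_completed.store(true, Ordering::Release);
        } else {
            // Later callers wait for completion instead of repeating the work.
            while !self.is_shutdown_completed() {
                thread::yield_now();
            }
        }
    }

    /// True once shutdown has been signaled; the work may still be running.
    fn is_shutdown(&self) -> bool {
        self.shutdown_signaled.load(Ordering::Acquire)
    }

    /// True only after the teardown work has finished.
    fn is_shutdown_completed(&self) -> bool {
        self.shutdown_completed.load(Ordering::Acquire)
    }
}
```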

pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>>

Get shard metrics (if dynamic scaling is enabled).

pub fn is_dynamic_scaling_enabled(&self) -> bool

Check if dynamic scaling is enabled.

pub async fn manual_scale_up( &self, count: u16, ) -> Result<Vec<u16>, AdapterError>

Manually trigger a scale-up (for testing or manual intervention).

Bypasses the auto-scaling cooldown so that a deliberate operator request is not rate-limited by the auto-scaling cadence. Before the fix, this method called add_shard_internal() count times in a loop. Each call bumped last_scaling, so with any non-zero cooldown every iteration after the first failed with InCooldown, leaving the first shard half-added (workers spawned, routing entry installed) while the error propagated to the caller. The max_shards budget check still applies.
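One way to avoid a partially applied request is to validate the whole request against the shard budget before adding anything. The sketch below shows that idea only; Scaler, ScaleError, and the up-front all-or-nothing check are assumptions, and the documentation above does not say how the real method orders its checks.

```rust
#[derive(Debug, PartialEq)]
enum ScaleError {
    /// Hypothetical error for a request that would exceed `max_shards`.
    AtMaxShards,
}

/// Hypothetical scaler state; no cooldown timestamp is consulted here.
struct Scaler {
    shard_ids: Vec<u16>,
    next_id: u16,
    max_shards: usize,
}

impl Scaler {
    /// Adds `count` shards in one step, or none if the budget would be exceeded.
    fn manual_scale_up(&mut self, count: u16) -> Result<Vec<u16>, ScaleError> {
        // Budget check for the whole request, before any shard is created.
        if self.shard_ids.len() + count as usize > self.max_shards {
            return Err(ScaleError::AtMaxShards);
        }
        let mut added = Vec::with_capacity(count as usize);
        for _ in 0..count {
            let id = self.next_id;
            self.next_id += 1;
            self.shard_ids.push(id);
            added.push(id);
        }
        Ok(added)
    }
}
```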

pub async fn manual_scale_down( &self, count: u16, ) -> Result<Vec<u16>, AdapterError>

Manually trigger a scale-down (for testing or manual intervention).

Marks count shards as Draining, waits for them to empty, finalizes them to Stopped, and removes them from the routing table — mirroring the scaling monitor’s per-tick finalize loop. Returns the IDs of shards that were successfully drained AND removed (subset of those marked Draining if any failed to empty within the deadline).

Drives the full scale-down lifecycle synchronously: a plain mapper.scale_down call marks shards Draining but does NOT finalize them — finalization is the scaling monitor’s responsibility. Bus configs without an active monitor (or callers that shut down before the monitor’s next tick) would otherwise lose any events queued in those shards’ ring buffers.
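The mark, wait, finalize, remove sequence can be sketched as a small state machine. This is a toy model: ShardState mirrors the state names in the text, but Shard, the round-based drain simulation (rounds, per_round), and the choice to drain the last active shards first are hypothetical.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ShardState {
    Active,
    Draining,
    Stopped,
}

/// Hypothetical shard record: `pending` counts events still in its ring buffer.
struct Shard {
    id: u16,
    state: ShardState,
    pending: usize,
}

/// Drives the full lifecycle and returns the ids that were drained and removed.
fn manual_scale_down(
    shards: &mut Vec<Shard>,
    count: usize,
    rounds: usize,
    per_round: usize,
) -> Vec<u16> {
    // 1. Mark `count` active shards as Draining (last ones first, by assumption).
    for shard in shards
        .iter_mut()
        .rev()
        .filter(|s| s.state == ShardState::Active)
        .take(count)
    {
        shard.state = ShardState::Draining;
    }
    // 2. Wait, within a bounded number of rounds, for them to empty.
    for _ in 0..rounds {
        for shard in shards.iter_mut().filter(|s| s.state == ShardState::Draining) {
            shard.pending = shard.pending.saturating_sub(per_round);
        }
    }
    // 3. Finalize the shards that emptied in time.
    for shard in shards.iter_mut() {
        if shard.state == ShardState::Draining && shard.pending == 0 {
            shard.state = ShardState::Stopped;
        }
    }
    // 4. Remove finalized shards from the routing table and report their ids.
    let removed: Vec<u16> = shards
        .iter()
        .filter(|s| s.state == ShardState::Stopped)
        .map(|s| s.id)
        .collect();
    shards.retain(|s| s.state != ShardState::Stopped);
    removed
}
```

A shard that misses the deadline stays Draining and is left out of the returned list, which matches the "subset of those marked Draining" wording above.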

Trait Implementations§

impl Drop for EventBus

fn drop(&mut self)

Executes the destructor for this type.

fn pin_drop(self: Pin<&mut Self>)

🔬This is a nightly-only experimental API. (pin_ergonomics)
Execute the destructor for this type, but different to Drop::drop, it requires self to be pinned.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.