Skip to main content

ShardManager

Struct ShardManager 

Source
pub struct ShardManager { /* private fields */ }
Expand description

Manager for multiple shards.

The ShardManager can operate in two modes:

  1. Static mode (default): Fixed number of shards, simple hash-based routing
  2. Dynamic mode: Shards can be added/removed based on load, weighted routing

Implementations§

Source§

impl ShardManager

Source

pub fn new( num_shards: u16, ring_buffer_capacity: usize, backpressure_mode: BackpressureMode, ) -> Self

Create a new shard manager (static mode).

Source

pub fn with_mapper( num_shards: u16, ring_buffer_capacity: usize, backpressure_mode: BackpressureMode, policy: ScalingPolicy, ) -> Result<Self, ScalingError>

Create a new shard manager with dynamic scaling enabled.

Source

pub fn mapper(&self) -> Option<&Arc<ShardMapper>>

Get the shard mapper (if dynamic scaling is enabled).

Source

pub fn num_shards(&self) -> u16

Get the number of active shards.

Source

pub fn backpressure_mode(&self) -> BackpressureMode

Get the backpressure mode.

Source

pub fn select_shard(&self, event: &JsonValue) -> u16

👎Deprecated since 0.10.0:

serializes the value just to hash it; prefer RawEvent::from_value(v).hash() + select_shard_by_hash to avoid the duplicate serialization

Select a shard for an event based on its content hash. Uses weighted selection if dynamic scaling is enabled.

Prefer select_shard_by_hash. This method serializes the JsonValue to bytes just to compute the hash; if you already have a RawEvent (or any pre-computed xxh3_64 of the canonical bytes), pass that hash directly. The internal ingest paths all do — this method exists for ad-hoc external callers that haven’t yet adopted the RawEvent pattern.

Source

pub fn select_shard_by_hash(&self, hash: u64) -> u16

Select a shard using a pre-computed hash.

This is faster than select_shard when you already have the hash.

Source

pub fn ingest(&self, event: JsonValue) -> Result<(u16, u64), IngestionError>

Ingest an event into the appropriate shard.

Source

pub fn ingest_raw(&self, event: RawEvent) -> Result<(u16, u64), IngestionError>

Ingest a raw event (pre-serialized with cached hash).

This is the fastest ingestion path:

  • Uses pre-computed hash for shard selection (no serialization)
  • Stores bytes directly (no clone needed, reference-counted)
Source

pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> (usize, usize)

Ingest a batch of pre-serialized events, grouped by shard.

Each destination shard’s mutex is acquired once and all of that shard’s events are pushed before releasing. With a uniform hash distribution this amortizes lock acquisitions from O(events) to O(shards). Backpressure semantics match per-event ingest_raw.

Returns (success, unrouted) where success is the count of events successfully pushed onto a shard’s ring buffer and unrouted is the count of events whose destination shard was not present in the routing table at the time of dispatch (e.g. concurrent scale-down). The remainder (total - success - unrouted) is the backpressure-class drop count.

Returns (success, unrouted) rather than just success so the bus can subtract unrouted before publishing events_dropped. Returning only success would let the bus’s dropped = total - success accounting double-count unrouted events — they’re already tallied on events_unrouted inside this function.

Source

pub fn shard(&self, id: u16) -> Option<ShardRef>

Get a reference to a shard by ID.

Source

pub fn with_shard<F, R>(&self, id: u16, f: F) -> Option<R>
where F: FnOnce(&mut Shard) -> R,

Execute a function with exclusive access to a shard.

Source

pub fn all_shards_empty(&self) -> bool

Returns true if every shard’s ring buffer is empty.

Cheaper than shard_ids() + repeated with_shard: loads the routing table once and checks each shard behind a brief lock.

Source

pub fn shard_ids(&self) -> Vec<u16>

Iterate over all active shard IDs.

Source

pub fn total_pending_in_rings(&self) -> u64

Sum of len() across every shard’s ring buffer.

Source

pub fn try_total_pending_in_rings(&self) -> (u64, usize)

Best-effort variant of Self::total_pending_in_rings that never blocks: every shard whose mutex is currently held is skipped (counted as zero). Use this from Drop or any path that may run on a thread already holding a shard lock (single-thread runtime + panic during shutdown is the canonical hazard); the blocking variant would self-deadlock there.

Returns (sum_counted, uncounted_shard_count) so the caller can log the uncertainty in the result.

Source

pub fn stats(&self) -> ShardStats

Get aggregated statistics from all shards.

Lock-free: reads each shard’s atomic counters directly via the parallel counters vector on the routing table, with no per- shard mutex acquisition. events_unrouted is sourced from the ShardManager itself rather than the per-shard counters since unrouted events have no shard to attribute to.

Source

pub fn add_shard(&self) -> Result<u16, ScalingError>

Add a new shard (for dynamic scaling). Returns the new shard ID. The shard is in the routing table and ready to be the destination of select_shard calls only after activate_shard is called for it.

Previously the mapper marked the shard Active before the routing table was rebuilt and before any worker was wired up to drain its ring buffer. Producers could select_shard to the new id, push into its ring buffer, and have the events stranded with no consumer. The fix uses scale_up_provisioning so the mapper records the shard but select_shard skips it, then activate_shard flips it to Active once workers are ready.

Source

pub fn add_shard_force(&self) -> Result<u16, ScalingError>

Like add_shard but bypasses the auto-scaling cooldown.

Used by operator-initiated manual_scale_up paths. The auto-scaling cooldown protects against the auto-scaling monitor reacting too quickly to transient load spikes; a deliberate operator action should not be rate-limited by that cadence. The max_shards budget check still applies.

Source

pub fn activate_shard(&self, shard_id: u16) -> Result<(), ScalingError>

Activate a previously-provisioned shard. After this returns, select_shard will route to the shard and producer pushes will land in its ring buffer.

Idempotent: calling on an already-Active shard is Ok(()).

Pre-fix this unconditionally fetch_add(1)d num_shards even when the mapper’s activate() early- returned for an already-Active shard. After repeated activate calls, num_shards exceeded both the mapper’s active_count and the actual shard count, breaking modulo-based shard selection (select_shard) and producing stale routing decisions. Post-fix gates the fetch_add on the mapper’s transition signal.

Source

pub fn drain_shard(&self, shard_id: u16) -> Result<(), ScalingError>

Start draining a shard (for dynamic scaling).

Previously only flipped the metrics collector’s draining atomic, leaving MappedShard.state untouched. Result: select_shard (which filters on state == Active) still routed new producers to the shard. The fix calls into the mapper, which atomically transitions the state to Draining and (for accounting) decrements active_count, mirroring scale_down(N) for a single targeted shard.

Source

pub fn remove_shard( &self, shard_id: u16, ) -> Result<Vec<InternalEvent>, ScalingError>

Remove a shard from the routing table.

Previously this only unmapped the shard from the routing table. The drain worker, on its next with_shard call, observed None and exited — leaving any events still in the ring buffer permanently stranded. The fix drains the ring buffer into a caller-supplied scratch Vec before the unmap, then returns the drained events so the caller (typically EventBus::remove_shard_internal) can flush them through to the adapter rather than dropping them.

Returns Ok(events) where events is whatever was still queued in the ring buffer at unmap time (possibly empty). Caller is responsible for handing those off to the adapter.

Source

pub fn collect_metrics(&self) -> Option<Vec<ShardMetrics>>

Collect metrics from all shards (for dynamic scaling decisions).

Source

pub fn evaluate_scaling(&self) -> ScalingDecision

Evaluate and optionally execute scaling.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more