pub struct ShardManager { /* private fields */ }
Manager for multiple shards.
The ShardManager can operate in two modes:
- Static mode (default): Fixed number of shards, simple hash-based routing
- Dynamic mode: Shards can be added/removed based on load, weighted routing
Implementations
impl ShardManager
pub fn new(
    num_shards: u16,
    ring_buffer_capacity: usize,
    backpressure_mode: BackpressureMode,
) -> Self
Create a new shard manager (static mode).
pub fn with_mapper(
    num_shards: u16,
    ring_buffer_capacity: usize,
    backpressure_mode: BackpressureMode,
    policy: ScalingPolicy,
) -> Result<Self, ScalingError>
Create a new shard manager with dynamic scaling enabled.
pub fn mapper(&self) -> Option<&Arc<ShardMapper>>
Get the shard mapper (if dynamic scaling is enabled).
pub fn num_shards(&self) -> u16
Get the number of active shards.
pub fn backpressure_mode(&self) -> BackpressureMode
Get the backpressure mode.
pub fn select_shard(&self, event: &JsonValue) -> u16
👎 Deprecated since 0.10.0: serializes the value just to hash it; prefer RawEvent::from_value(v).hash() + select_shard_by_hash to avoid the duplicate serialization
Select a shard for an event based on its content hash. Uses weighted selection if dynamic scaling is enabled.
Prefer select_shard_by_hash. This method serializes the
JsonValue to bytes just to compute the hash; if you already
have a RawEvent (or any pre-computed xxh3_64 of the
canonical bytes), pass that hash directly. The internal
ingest paths all do — this method exists for ad-hoc external
callers that haven’t yet adopted the RawEvent pattern.
pub fn select_shard_by_hash(&self, hash: u64) -> u16
Select a shard using a pre-computed hash.
This is faster than select_shard when you already have the hash.
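As a rough illustration of static-mode routing, a pre-computed hash can be mapped to a shard index with a plain modulo. This is a sketch under assumptions: the free function `shard_for_hash` is hypothetical and is not the crate's implementation, and the weighted routing used in dynamic mode is not shown.

```rust
/// Hypothetical helper (not part of the crate): maps a pre-computed
/// 64-bit hash to a shard index using plain modulo routing.
/// Returns `None` when there are no shards to route to.
fn shard_for_hash(hash: u64, num_shards: u16) -> Option<u16> {
    if num_shards == 0 {
        return None;
    }
    // The remainder is always < num_shards, so it fits in a u16.
    Some((hash % u64::from(num_shards)) as u16)
}
```

Because the hash is an input, a caller that already holds one (for example from a RawEvent) pays no serialization cost on this path.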
pub fn ingest(&self, event: JsonValue) -> Result<(u16, u64), IngestionError>
Ingest an event into the appropriate shard.
pub fn ingest_raw(&self, event: RawEvent) -> Result<(u16, u64), IngestionError>
Ingest a raw event (pre-serialized with cached hash).
This is the fastest ingestion path:
- Uses pre-computed hash for shard selection (no serialization)
- Stores bytes directly (no clone needed, reference-counted)
pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> (usize, usize)
Ingest a batch of pre-serialized events, grouped by shard.
Each destination shard’s mutex is acquired once and all of that
shard’s events are pushed before releasing. With a uniform hash
distribution this amortizes lock acquisitions from O(events) to
O(shards). Backpressure semantics match per-event ingest_raw.
Returns (success, unrouted) where success is the count of
events successfully pushed onto a shard’s ring buffer and
unrouted is the count of events whose destination shard was
not present in the routing table at the time of dispatch
(e.g. concurrent scale-down). The remainder
(total - success - unrouted) is the backpressure-class drop
count.
Both counts are returned, rather than success alone, so the
bus can subtract unrouted before publishing events_dropped.
Unrouted events are already tallied on events_unrouted inside
this function, so a bus that computed dropped = total - success
would count them twice.
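The grouping and accounting described above can be sketched with standard-library types. Everything here is an assumption made for illustration: `ToyShard`, `ingest_batch_sketch`, modulo routing over `num_slots`, and a "drop when full" policy stand in for the crate's real shard type, routing table, and BackpressureMode handling.

```rust
use std::collections::{BTreeMap, VecDeque};
use std::sync::Mutex;

/// Hypothetical stand-in for a shard: a bounded queue behind a mutex.
struct ToyShard {
    capacity: usize,
    ring: Mutex<VecDeque<u64>>,
}

/// Groups event hashes by destination shard, then locks each destination
/// once and pushes that shard's whole group before releasing the lock.
/// Returns `(success, unrouted)`; the remainder was dropped for lack of space.
fn ingest_batch_sketch(
    shards: &BTreeMap<u16, ToyShard>,
    num_slots: u16,
    hashes: &[u64],
) -> (usize, usize) {
    if num_slots == 0 {
        return (0, hashes.len());
    }
    // Pass 1: bucket events by destination id (no locks held).
    let mut groups: BTreeMap<u16, Vec<u64>> = BTreeMap::new();
    for &hash in hashes {
        let id = (hash % u64::from(num_slots)) as u16;
        groups.entry(id).or_default().push(hash);
    }
    // Pass 2: one lock acquisition per destination shard.
    let mut success = 0;
    let mut unrouted = 0;
    for (id, group) in groups {
        match shards.get(&id) {
            // Destination is missing from the table (e.g. scaled down).
            None => unrouted += group.len(),
            Some(shard) => {
                let mut ring = shard.ring.lock().unwrap();
                for hash in group {
                    if ring.len() < shard.capacity {
                        ring.push_back(hash);
                        success += 1;
                    }
                }
            }
        }
    }
    (success, unrouted)
}
```

With this shape, a caller derives the backpressure drop count as total - success - unrouted, which keeps unrouted events out of the dropped tally.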
pub fn with_shard<F, R>(&self, id: u16, f: F) -> Option<R>
Execute a function with exclusive access to a shard.
pub fn all_shards_empty(&self) -> bool
Returns true if every shard’s ring buffer is empty.
Cheaper than shard_ids() + repeated with_shard: loads the
routing table once and checks each shard behind a brief lock.
pub fn total_pending_in_rings(&self) -> u64
Sum of len() across every shard’s ring buffer.
pub fn try_total_pending_in_rings(&self) -> (u64, usize)
Best-effort variant of Self::total_pending_in_rings that
never blocks: every shard whose mutex is currently held is
skipped (counted as zero). Use this from Drop or any path
that may run on a thread already holding a shard lock
(single-thread runtime + panic during shutdown is the
canonical hazard); the blocking variant would self-deadlock
there.
Returns (sum_counted, uncounted_shard_count) so the caller
can log the uncertainty in the result.
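A minimal sketch of the non-blocking pattern, assuming each ring buffer is modelled as a `Mutex<VecDeque<u64>>` (the crate's actual shard type is not shown in this page). The function name `try_total_pending` is hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Sums queue lengths without ever blocking. A shard whose mutex cannot
/// be acquired right now (held elsewhere, or poisoned) contributes zero
/// to the sum and is counted in the second tuple field instead.
fn try_total_pending(rings: &[Mutex<VecDeque<u64>>]) -> (u64, usize) {
    let mut total = 0u64;
    let mut uncounted = 0usize;
    for ring in rings {
        match ring.try_lock() {
            Ok(guard) => total += guard.len() as u64,
            Err(_) => uncounted += 1,
        }
    }
    (total, uncounted)
}
```

Because `try_lock` returns immediately, this is safe to call from a thread that already holds one of the locks; that shard simply shows up in the uncounted total.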
pub fn stats(&self) -> ShardStats
Get aggregated statistics from all shards.
Lock-free: reads each shard’s atomic counters directly via the
parallel counters vector on the routing table, with no
per-shard mutex acquisition. events_unrouted is sourced from the
ShardManager itself rather than the per-shard counters, since
unrouted events have no shard to attribute to.
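The lock-free aggregation can be pictured as a loop over atomic counters. This is a sketch only: `ShardCounters`, `ToyStats`, the `events_ingested` field, and the choice of `Ordering::Relaxed` are assumptions, not the crate's actual ShardStats layout.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical per-shard counters, kept outside the shard mutex.
#[derive(Default)]
struct ShardCounters {
    ingested: AtomicU64,
    dropped: AtomicU64,
}

/// Hypothetical aggregate, loosely modelled on the description above.
#[derive(Debug, Default, PartialEq)]
struct ToyStats {
    events_ingested: u64,
    events_dropped: u64,
    events_unrouted: u64,
}

/// Sums per-shard atomics without taking any lock. The unrouted count
/// comes from a manager-level counter because it belongs to no shard.
fn aggregate(counters: &[ShardCounters], manager_unrouted: &AtomicU64) -> ToyStats {
    let mut stats = ToyStats::default();
    for c in counters {
        stats.events_ingested += c.ingested.load(Ordering::Relaxed);
        stats.events_dropped += c.dropped.load(Ordering::Relaxed);
    }
    stats.events_unrouted = manager_unrouted.load(Ordering::Relaxed);
    stats
}
```

The result is a snapshot assembled from independent loads, so counters updated concurrently may be observed at slightly different moments.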
pub fn add_shard(&self) -> Result<u16, ScalingError>
Add a new shard (for dynamic scaling).
Returns the new shard ID. The shard is in the routing table
and ready to be the destination of select_shard calls
only after activate_shard is called for it.
Previously the mapper marked the shard Active before the
routing table was rebuilt and before any worker was wired up
to drain its ring buffer. Producers could select_shard to
the new id, push into its ring buffer, and have the events
stranded with no consumer. The fix uses
scale_up_provisioning so the mapper records the shard but
select_shard skips it, then activate_shard flips it to
Active once workers are ready.
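The effect of the provisioning state on routing can be sketched as a filter over shard states. The enum `SketchState`, the function `select_active`, and modulo selection over the active set are hypothetical simplifications; the real mapper uses weighted routing in dynamic mode.

```rust
/// Hypothetical lifecycle states, named after those mentioned in the text.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SketchState {
    Provisioning,
    Active,
    Draining,
}

/// Picks a destination among Active shards only. Shards that are still
/// Provisioning (no worker yet) or Draining are never selected.
fn select_active(states: &[(u16, SketchState)], hash: u64) -> Option<u16> {
    let active: Vec<u16> = states
        .iter()
        .filter(|entry| entry.1 == SketchState::Active)
        .map(|entry| entry.0)
        .collect();
    if active.is_empty() {
        return None;
    }
    Some(active[(hash % active.len() as u64) as usize])
}
```

A shard added in the Provisioning state is visible to the mapper but receives no traffic until its state flips to Active, which is the window in which workers get wired up.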
pub fn add_shard_force(&self) -> Result<u16, ScalingError>
Like add_shard but bypasses the auto-scaling cooldown.
Used by operator-initiated manual_scale_up paths. The
auto-scaling cooldown protects against the auto-scaling
monitor reacting too quickly to transient load spikes;
a deliberate operator action should not be rate-limited
by that cadence. The max_shards budget check still
applies.
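One way to picture the difference between the two entry points is a single check sequence with a force flag. This is a sketch under assumptions: `ToyScaler`, `ToyError`, the `force` parameter, and the decision to reset the cooldown timer on forced adds are all hypothetical and not taken from the crate.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum ToyError {
    AtMaxShards,
    InCooldown,
}

/// Hypothetical scaler state used only for this illustration.
struct ToyScaler {
    max_shards: u16,
    current: u16,
    cooldown: Duration,
    last_scale: Option<Instant>,
}

impl ToyScaler {
    /// Adds a shard. The budget check always runs; the cooldown check
    /// is skipped when `force` is true (operator-initiated scale-up).
    fn add(&mut self, now: Instant, force: bool) -> Result<u16, ToyError> {
        if self.current >= self.max_shards {
            return Err(ToyError::AtMaxShards);
        }
        if !force {
            if let Some(last) = self.last_scale {
                if now.duration_since(last) < self.cooldown {
                    return Err(ToyError::InCooldown);
                }
            }
        }
        let id = self.current;
        self.current += 1;
        self.last_scale = Some(now);
        Ok(id)
    }
}
```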
pub fn activate_shard(&self, shard_id: u16) -> Result<(), ScalingError>
Activate a previously-provisioned shard. After this returns,
select_shard will route to the shard and producer pushes
will land in its ring buffer.
Idempotent: calling on an already-Active shard is Ok(()).
Before the fix, this method unconditionally incremented
num_shards with fetch_add(1), even when the mapper’s activate()
returned early for an already-Active shard. After repeated
activate calls, num_shards exceeded both the mapper’s
active_count and the actual shard count, which broke
modulo-based shard selection (select_shard) and
produced stale routing decisions. The fix gates the
fetch_add on the mapper’s transition signal.
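The gating described above can be sketched as follows. `ToyManager`, `Phase`, and `mapper_activate` are hypothetical names for this illustration; only the idea of incrementing the counter when a real state transition happened is taken from the text.

```rust
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Mutex;

#[derive(Clone, Copy, PartialEq)]
enum Phase {
    Provisioning,
    Active,
}

/// Hypothetical single-shard manager used only for this illustration.
struct ToyManager {
    state: Mutex<Phase>,
    num_shards: AtomicU16,
}

impl ToyManager {
    /// Returns true only if this call moved the shard from
    /// Provisioning to Active; false if it was already Active.
    fn mapper_activate(&self) -> bool {
        let mut state = self.state.lock().unwrap();
        if *state == Phase::Active {
            return false;
        }
        *state = Phase::Active;
        true
    }

    /// Idempotent activation: the shard count is bumped at most once.
    fn activate_shard(&self) {
        if self.mapper_activate() {
            self.num_shards.fetch_add(1, Ordering::SeqCst);
        }
    }
}
```

Calling `activate_shard` repeatedly leaves the counter unchanged after the first call, so it stays in step with the number of shards that are actually routable.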
pub fn drain_shard(&self, shard_id: u16) -> Result<(), ScalingError>
Start draining a shard (for dynamic scaling).
Previously only flipped the metrics collector’s draining
atomic, leaving MappedShard.state untouched. Result:
select_shard (which filters on state == Active) still
routed new producers to the shard. The fix calls into the
mapper, which atomically transitions the state to Draining
and (for accounting) decrements active_count, mirroring
scale_down(N) for a single targeted shard.
pub fn remove_shard(
    &self,
    shard_id: u16,
) -> Result<Vec<InternalEvent>, ScalingError>
Remove a shard from the routing table.
Previously this only unmapped the shard from the routing
table. The drain worker, on its next with_shard call,
observed None and exited — leaving any events still in the
ring buffer permanently stranded. The fix drains the ring
buffer into a caller-supplied scratch Vec before the
unmap, then returns the drained events so the caller
(typically EventBus::remove_shard_internal) can flush them
through to the adapter rather than dropping them.
Returns Ok(events) where events is whatever was still
queued in the ring buffer at unmap time (possibly empty).
Caller is responsible for handing those off to the adapter.
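The drain-then-unmap ordering can be sketched with a map of queues. This is an illustration under assumptions: the routing table is modelled as a `HashMap` of `Mutex<VecDeque<String>>`, `remove_shard_sketch` is a hypothetical name, and `String` stands in for InternalEvent.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

/// Drains whatever is still queued for `id`, then unmaps the shard.
/// Returns `None` if the shard is not in the table, otherwise the
/// drained events (possibly empty) for the caller to flush onward.
fn remove_shard_sketch(
    table: &mut HashMap<u16, Mutex<VecDeque<String>>>,
    id: u16,
) -> Option<Vec<String>> {
    let drained = {
        let ring = table.get(&id)?;
        let mut guard = ring.lock().unwrap();
        // Empty the queue while the shard is still mapped.
        let events: Vec<String> = guard.drain(..).collect();
        events
    };
    // Only now remove the entry; nothing is left behind in the queue.
    table.remove(&id);
    Some(drained)
}
```

Returning the drained events, instead of discarding them with the shard, is what lets the caller hand them to the adapter.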
pub fn collect_metrics(&self) -> Option<Vec<ShardMetrics>>
Collect metrics from all shards (for dynamic scaling decisions).
pub fn evaluate_scaling(&self) -> ScalingDecision
Evaluate and optionally execute scaling.