pub struct ShardMapper { /* private fields */ }
Dynamic shard mapper that manages shard allocation and producer routing.
This is the core component for dynamic scaling. It:
- Tracks metrics for all shards
- Makes scaling decisions based on policy
- Routes producers to the least-loaded shards
- Manages shard lifecycle (active → draining → stopped)
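As an illustration only, the lifecycle can be modelled as a forward-only state machine. The sketch below is a hypothetical simplification. LifecycleState is an illustrative name, not this crate's ShardState. It also includes the Provisioning state that scale_up_provisioning uses before a shard becomes Active. Only Active shards are treated as routable, matching select_shard's rule of considering only non-draining active shards.

```rust
/// Hypothetical model of the shard lifecycle; not the crate's ShardState.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleState {
    Provisioning,
    Active,
    Draining,
    Stopped,
}

impl LifecycleState {
    /// Next state on a forward transition, or None once the shard is Stopped.
    pub fn next(self) -> Option<LifecycleState> {
        match self {
            LifecycleState::Provisioning => Some(LifecycleState::Active),
            LifecycleState::Active => Some(LifecycleState::Draining),
            LifecycleState::Draining => Some(LifecycleState::Stopped),
            LifecycleState::Stopped => None,
        }
    }

    /// Only Active shards may receive new events from producers.
    pub fn routable(self) -> bool {
        matches!(self, LifecycleState::Active)
    }
}
```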
Implementations
impl ShardMapper
pub fn new( initial_shards: u16, ring_buffer_capacity: usize, policy: ScalingPolicy, ) -> Result<Self, ScalingError>
Create a new shard mapper with the given initial shard count and policy.
pub fn set_on_shard_created<F>(&self, callback: F)
Set callback for shard creation.
pub fn set_on_shard_removed<F>(&self, callback: F)
Set callback for shard removal.
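The setters take &self rather than &mut self, so the callback slot presumably uses interior mutability. This is a minimal sketch that assumes the callback receives the shard id and is bounded by Fn(u16) + Send + Sync + 'static. The actual bounds on F are not shown in this rendering. Hooks and notify_created are hypothetical names.

```rust
use std::sync::{Arc, RwLock};

// Assumed callback shape; the real bounds on F are not documented here.
type Callback = Arc<dyn Fn(u16) + Send + Sync>;

/// Hypothetical holder for a shard-created callback.
pub struct Hooks {
    on_created: RwLock<Option<Callback>>,
}

impl Hooks {
    pub fn new() -> Self {
        Hooks { on_created: RwLock::new(None) }
    }

    /// Takes &self, so the slot is guarded by an RwLock.
    pub fn set_on_shard_created<F>(&self, callback: F)
    where
        F: Fn(u16) + Send + Sync + 'static,
    {
        let cb: Callback = Arc::new(callback);
        *self.on_created.write().unwrap() = Some(cb);
    }

    /// Clones the callback out first so the lock is not held while it runs.
    pub fn notify_created(&self, shard_id: u16) {
        let cb = self.on_created.read().unwrap().clone();
        if let Some(cb) = cb {
            cb(shard_id);
        }
    }
}
```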
pub fn metrics_collector( &self, shard_id: u16, ) -> Option<Arc<ShardMetricsCollector>>
Get the metrics collector for a shard.
pub fn active_shard_count(&self) -> u16
Get the current active shard count.
pub fn total_shard_count(&self) -> u16
Get the total shard count (including draining shards).
pub fn select_shard(&self, event_hash: u64) -> u16
Select the best shard for a new event/producer.
This implements weighted shard selection:
- Only considers active (non-draining) shards
- Prefers shards with lower weight (less loaded)
- Falls back to round-robin if weights are equal
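Below is a minimal sketch of lowest-weight selection with a round-robin tie-break. It assumes integer weights and an atomic cursor. Candidate and select are hypothetical names. The sketch ignores event_hash, because its role in select_shard is not described here. It also returns None instead of a u16 when no shard is active.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical per-shard view: lower weight means less loaded.
pub struct Candidate {
    pub id: u16,
    pub weight: u64,
    pub active: bool,
}

/// Picks the lowest-weight active shard. When several active shards share
/// the minimum weight, a shared cursor rotates among them (round-robin).
pub fn select(candidates: &[Candidate], cursor: &AtomicUsize) -> Option<u16> {
    // Draining or provisioning shards are never considered.
    let min = candidates.iter().filter(|c| c.active).map(|c| c.weight).min()?;
    let tied: Vec<u16> = candidates
        .iter()
        .filter(|c| c.active && c.weight == min)
        .map(|c| c.id)
        .collect();
    // `tied` is non-empty because `min` came from an active candidate.
    let i = cursor.fetch_add(1, Ordering::Relaxed) % tied.len();
    Some(tied[i])
}
```

With shards 1 and 2 tied at the lowest weight, successive calls alternate between them. An inactive shard is skipped even if its weight is lower.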
pub fn collect_metrics(&self) -> Vec<ShardMetrics>
Collect metrics from all shards and update weights.
pub fn evaluate_scaling(&self) -> ScalingDecision
Evaluate scaling based on current metrics.
Returns a scaling decision without executing it.
pub fn scale_up(&self, count: u16) -> Result<Vec<u16>, ScalingError>
Execute a scale-up operation.
Creates new shards in the Active state and makes them
immediately available for routing. Use scale_up_provisioning
if upstream workers (drain / batch) need to be wired up before
the shard becomes selectable — otherwise producer pushes can
race ahead of consumer creation.
pub fn scale_up_provisioning( &self, count: u16, ) -> Result<Vec<u16>, ScalingError>
Like scale_up, but the new shards are created in the
Provisioning state. They receive an id and a metrics
collector, but select_shard will not route to them and they
are excluded from active_shard_count / evaluate_scaling
until the caller transitions each shard with activate.
Use this when upstream consumer infrastructure (drain/batch
workers, mpsc channels, etc.) must be wired up before the
shard becomes selectable. Without this gating, producers can
observe the shard via select_shard, push into its ring
buffer, and never have those events drained.
Returns the allocated ids in order. Cooldown / max_shards
gating matches scale_up so that staged allocation cannot be
used to bypass the policy.
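The staged pattern can be sketched with a single-threaded model. Staged, St and routable_ids are hypothetical names. routable_ids stands in for the set of shards select_shard can choose from. The real activate returns Result<bool, ScalingError>; here it is reduced to a bool. Cooldown gating is omitted, and only the max_shards budget is modelled.

```rust
use std::collections::BTreeMap;

/// Hypothetical two-state subset of the shard lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum St {
    Provisioning,
    Active,
}

/// Hypothetical single-threaded model of staged allocation.
pub struct Staged {
    shards: BTreeMap<u16, St>,
    next_id: u16,
    max_shards: u16,
}

impl Staged {
    pub fn new(initial: u16, max_shards: u16) -> Self {
        let shards: BTreeMap<u16, St> = (0..initial).map(|id| (id, St::Active)).collect();
        Staged { shards, next_id: initial, max_shards }
    }

    /// Allocates `count` ids in order, all in the Provisioning state.
    /// The max_shards budget is checked before anything is allocated.
    pub fn scale_up_provisioning(&mut self, count: u16) -> Result<Vec<u16>, String> {
        if self.shards.len() + count as usize > self.max_shards as usize {
            return Err("would exceed max_shards".to_string());
        }
        let ids: Vec<u16> = (self.next_id..self.next_id + count).collect();
        for &id in &ids {
            self.shards.insert(id, St::Provisioning);
        }
        self.next_id += count;
        Ok(ids)
    }

    /// Ids a router may hand to producers; Provisioning shards are excluded.
    pub fn routable_ids(&self) -> Vec<u16> {
        self.shards
            .iter()
            .filter(|(_, s)| **s == St::Active)
            .map(|(id, _)| *id)
            .collect()
    }

    /// Makes a Provisioning shard routable; returns false otherwise.
    pub fn activate(&mut self, id: u16) -> bool {
        match self.shards.get_mut(&id) {
            Some(state) if *state == St::Provisioning => {
                *state = St::Active;
                true
            }
            _ => false,
        }
    }
}
```

Between scale_up_provisioning and activate, the caller would wire up the drain/batch workers for each returned id. Until activate runs, producers cannot be routed to the new shard.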
pub fn scale_up_provisioning_force( &self, count: u16, ) -> Result<Vec<u16>, ScalingError>
Allocate count Provisioning shards, bypassing the cooldown gate.
Used by operator-initiated manual_scale_up paths. The
cooldown exists to prevent the auto-scaling monitor from
scaling up too aggressively in response to transient
load spikes; a manual call from an operator is a
deliberate request that should not be rate-limited by
the auto-scaling cadence. The budget check (against
max_shards) still applies.
Before this fix, manual_scale_up(N) called add_shard() N
times, and each call invoked scale_up_provisioning(1),
which bumped last_scaling. The second call then failed
immediately with InCooldown (the default cooldown is 30 s),
leaving the first shard half-added and returning an error
to the operator with no rollback.
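The failure mode described above can be reproduced with a toy gate. This sketch rests on several assumptions. Gate, GateError and the field names are hypothetical. Ids are assigned sequentially. The forced path is assumed to record last_scaling in the same way as the gated path.

```rust
use std::time::{Duration, Instant};

/// Hypothetical error type; stands in for the relevant ScalingError cases.
#[derive(Debug, PartialEq)]
pub enum GateError {
    InCooldown,
    ExceedsMaxShards,
}

/// Hypothetical allocation gate modelling cooldown and max_shards checks.
pub struct Gate {
    allocated: u16,
    max_shards: u16,
    cooldown: Duration,
    last_scaling: Option<Instant>,
}

impl Gate {
    pub fn new(allocated: u16, max_shards: u16, cooldown: Duration) -> Self {
        Gate { allocated, max_shards, cooldown, last_scaling: None }
    }

    fn allocate(&mut self, count: u16, respect_cooldown: bool) -> Result<Vec<u16>, GateError> {
        if respect_cooldown {
            if let Some(t) = self.last_scaling {
                if t.elapsed() < self.cooldown {
                    return Err(GateError::InCooldown);
                }
            }
        }
        // The max_shards budget applies on both the gated and the forced path.
        if u32::from(self.allocated) + u32::from(count) > u32::from(self.max_shards) {
            return Err(GateError::ExceedsMaxShards);
        }
        let ids: Vec<u16> = (self.allocated..self.allocated + count).collect();
        self.allocated += count;
        self.last_scaling = Some(Instant::now());
        Ok(ids)
    }

    /// Auto-scaling path: rate-limited by the cooldown.
    pub fn scale_up_provisioning(&mut self, count: u16) -> Result<Vec<u16>, GateError> {
        self.allocate(count, true)
    }

    /// Operator path: skips the cooldown but keeps the budget check.
    pub fn scale_up_provisioning_force(&mut self, count: u16) -> Result<Vec<u16>, GateError> {
        self.allocate(count, false)
    }
}
```

Two back-to-back single-shard gated calls hit InCooldown on the second call. A single forced call for the whole batch succeeds as long as it fits within max_shards.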
pub fn activate(&self, shard_id: u16) -> Result<bool, ScalingError>
Transition a Provisioning shard to Active.
Returns Ok(true) if a state transition actually occurred
(Provisioning → Active) and Ok(false) if the shard was
already Active; the latter is the idempotent path.
Returns InvalidPolicy for unknown shards and for shards in the
Draining or Stopped state, which require a different lifecycle path.
Bumps active_count and notifies the on_shard_created
callback exactly once per real transition.
Before this fix, activate returned Result<(), ScalingError>,
so callers (notably ShardManager::activate_shard) could
not tell whether the live count had been bumped, and they
double-incremented their own num_shards on every
idempotent call.
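This sketch shows why the bool matters to callers. It uses a hypothetical Mapper and a ScalingErr enum that stands in for ScalingError, whose real variants and payloads are not documented here. The caller mirrors its own count only when activate reports a real transition, so repeated calls stay idempotent. Callback notification is left out.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Provisioning,
    Active,
    Draining,
    Stopped,
}

/// Hypothetical stand-in for ScalingError.
#[derive(Debug, PartialEq)]
pub enum ScalingErr {
    InvalidPolicy(String),
}

/// Hypothetical mapper tracking per-shard state and a live active count.
pub struct Mapper {
    states: HashMap<u16, State>,
    active_count: u16,
}

impl Mapper {
    pub fn new(states: &[(u16, State)]) -> Self {
        let active_count = states.iter().filter(|(_, s)| *s == State::Active).count() as u16;
        Mapper { states: states.iter().copied().collect(), active_count }
    }

    pub fn active_count(&self) -> u16 {
        self.active_count
    }

    /// Ok(true): a real Provisioning -> Active transition (count bumped once).
    /// Ok(false): already Active, nothing changed.
    pub fn activate(&mut self, id: u16) -> Result<bool, ScalingErr> {
        match self.states.get_mut(&id) {
            Some(state) if *state == State::Provisioning => {
                *state = State::Active;
                self.active_count += 1;
                Ok(true)
            }
            Some(State::Active) => Ok(false),
            Some(other) => Err(ScalingErr::InvalidPolicy(format!("shard {id} is {other:?}"))),
            None => Err(ScalingErr::InvalidPolicy(format!("unknown shard {id}"))),
        }
    }
}

/// Hypothetical caller that bumps its own count only on a real transition.
pub fn activate_and_count(m: &mut Mapper, id: u16, num_shards: &mut u16) -> Result<(), ScalingErr> {
    if m.activate(id)? {
        *num_shards += 1;
    }
    Ok(())
}
```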
pub fn drain_specific(&self, shard_id: u16) -> Result<(), ScalingError>
Drain a specific shard by id, transitioning it from Active
to Draining.
Companion to ShardManager::drain_shard. The previous
implementation only flipped the metrics collector’s draining
atomic; this version atomically updates MappedShard.state
(so select_shard stops routing to the shard) and decrements
active_count (so evaluate_scaling’s budget math stays
consistent with scale_down). Returns an error if the shard
is not in the Active state, or if draining it would push the
active count below min_shards.
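Below is a minimal sketch of the two checks and the combined update. It assumes a single-threaded model in which &mut self stands in for whatever synchronization the real implementation uses to make the update atomic. Mapper, DrainError and routable are hypothetical names.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Active,
    Draining,
}

/// Hypothetical error cases mirroring the two documented failure modes.
#[derive(Debug, PartialEq)]
pub enum DrainError {
    NotActive(u16),
    BelowMinShards,
}

/// Hypothetical mapper; &mut self replaces the real synchronization.
pub struct Mapper {
    states: HashMap<u16, State>,
    active_count: u16,
    min_shards: u16,
}

impl Mapper {
    pub fn new(active_ids: &[u16], min_shards: u16) -> Self {
        Mapper {
            states: active_ids.iter().map(|&id| (id, State::Active)).collect(),
            active_count: active_ids.len() as u16,
            min_shards,
        }
    }

    pub fn drain_specific(&mut self, id: u16) -> Result<(), DrainError> {
        if self.states.get(&id) != Some(&State::Active) {
            return Err(DrainError::NotActive(id));
        }
        // Draining must not leave fewer than min_shards active shards.
        if self.active_count <= self.min_shards {
            return Err(DrainError::BelowMinShards);
        }
        // Update state and count together so routing and budget math agree.
        self.states.insert(id, State::Draining);
        self.active_count -= 1;
        Ok(())
    }

    /// Whether a select_shard-style router may still pick this shard.
    pub fn routable(&self, id: u16) -> bool {
        self.states.get(&id) == Some(&State::Active)
    }
}
```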
pub fn scale_down(&self, count: u16) -> Result<Vec<u16>, ScalingError>
Start draining shards for scale-down.
Marks shards as draining so they stop receiving new events. Shards will be removed once they’re empty.
pub fn finalize_draining(&self) -> Vec<u16>
Check draining shards and finalize those that are empty.
Returns IDs of shards that were stopped.
The emptiness predicate looks only at the ring buffer
(current_len + pushes_since_drain_start); it does not
probe the per-shard mpsc channel or the BatchWorker’s
current_batch. A shard that the predicate flags as empty
can still have events queued in those two places. The
correctness gate is therefore bus::remove_shard_internal,
which awaits the BatchWorker’s JoinHandle before
constructing the stranded-flush batch (see step 3 of that
function for the rationale). Tightening this predicate is a
defense-in-depth follow-up; a stricter ring-buffer-empty
signal here would only narrow an already-closed window.
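The gap can be demonstrated with a hypothetical shard. Its ring buffer is modelled as a VecDeque, and its downstream queue is a std::sync::mpsc channel. The looks_empty check simplifies the real predicate, which uses current_len and pushes_since_drain_start, to "the ring buffer is empty". Shard, drain_ring and looks_empty are illustrative names.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;

/// Hypothetical shard: a ring buffer (modelled as a VecDeque) whose events
/// are forwarded into a per-shard mpsc channel for a batch worker.
pub struct Shard {
    ring: VecDeque<u64>,
    tx: mpsc::Sender<u64>,
}

impl Shard {
    pub fn new(events: Vec<u64>, tx: mpsc::Sender<u64>) -> Self {
        Shard { ring: VecDeque::from(events), tx }
    }

    /// Forwards everything in the ring buffer into the channel.
    pub fn drain_ring(&mut self) {
        while let Some(event) = self.ring.pop_front() {
            // send only fails if the receiver was dropped; ignored in this sketch.
            let _ = self.tx.send(event);
        }
    }

    /// Ring-buffer-only emptiness check (simplified stand-in for the predicate).
    pub fn looks_empty(&self) -> bool {
        self.ring.is_empty()
    }
}
```

After the ring buffer is forwarded, the ring-only check reports empty, yet all of the events are still waiting in the channel. This is why the final gate has to wait for the consumer rather than trust this predicate.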
pub fn remove_specific_stopped_shard(&self, shard_id: u16) -> bool
Remove a specific shard from the mapper if it is in the
Stopped state. Used by ShardManager::remove_shard so that a
per-shard cleanup does not disturb sibling Stopped
entries, whose state a sequential manual_scale_down loop
still needs to look up. Returns true if the
shard existed and was Stopped (and was removed).
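This sketch contrasts the targeted and bulk removal paths. It uses a hypothetical BTreeMap-backed Mapper, not the crate's type. Removing one Stopped shard leaves its Stopped siblings visible to shard_state, which is what a sequential scale-down loop relies on.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    Active,
    Draining,
    Stopped,
}

/// Hypothetical mapper holding only per-shard state.
pub struct Mapper {
    shards: BTreeMap<u16, State>,
}

impl Mapper {
    pub fn new(entries: &[(u16, State)]) -> Self {
        Mapper { shards: entries.iter().copied().collect() }
    }

    /// Removes `id` only if it is Stopped; sibling entries are untouched.
    pub fn remove_specific_stopped_shard(&mut self, id: u16) -> bool {
        if self.shards.get(&id) == Some(&State::Stopped) {
            self.shards.remove(&id);
            true
        } else {
            false
        }
    }

    /// Bulk variant: removes every Stopped shard and returns their ids.
    pub fn remove_stopped_shards(&mut self) -> Vec<u16> {
        let stopped: Vec<u16> = self
            .shards
            .iter()
            .filter(|(_, s)| **s == State::Stopped)
            .map(|(id, _)| *id)
            .collect();
        for id in &stopped {
            self.shards.remove(id);
        }
        stopped
    }

    pub fn shard_state(&self, id: u16) -> Option<State> {
        self.shards.get(&id).copied()
    }
}
```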
pub fn remove_stopped_shards(&self) -> Vec<u16>
Remove stopped shards from the mapper.
pub fn shard_state(&self, shard_id: u16) -> Option<ShardState>
Get the state of a specific shard.
pub fn active_shard_ids(&self) -> Vec<u16>
Get all active shard IDs.
pub fn all_shard_ids(&self) -> Vec<u16>
Get all shard IDs (including draining).
pub fn policy(&self) -> &ScalingPolicy
Get the scaling policy.