Struct ShardMapper 

pub struct ShardMapper { /* private fields */ }

Dynamic shard mapper that manages shard allocation and producer routing.

This is the core component for dynamic scaling. It:

  • Tracks metrics for all shards
  • Makes scaling decisions based on policy
  • Routes producers to the least-loaded shards
  • Manages shard lifecycle (active → draining → stopped)
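The lifecycle in the last bullet can be pictured as a small state machine. The sketch below is illustrative only: `Lifecycle` is a hypothetical stand-in for the crate's `ShardState`, its variant names are taken from the states mentioned on this page (including Provisioning, described under scale_up_provisioning), and the transition table is an assumption about which forward steps are legal.

```rust
/// Hypothetical stand-in for the crate's `ShardState`. Variant names follow
/// the states mentioned on this page; the real enum may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Lifecycle {
    Provisioning,
    Active,
    Draining,
    Stopped,
}

impl Lifecycle {
    /// Returns true if `self -> next` is one of the forward steps
    /// Provisioning -> Active -> Draining -> Stopped (assumed table).
    pub fn can_transition_to(self, next: Lifecycle) -> bool {
        use Lifecycle::*;
        matches!(
            (self, next),
            (Provisioning, Active) | (Active, Draining) | (Draining, Stopped)
        )
    }

    /// Only Active shards are candidates for routing.
    pub fn is_routable(self) -> bool {
        self == Lifecycle::Active
    }
}
```

Modelling the states as a closed enum keeps every transition check in one place, so a caller cannot move a Stopped shard back to Active by accident.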

Implementations

impl ShardMapper

pub fn new( initial_shards: u16, ring_buffer_capacity: usize, policy: ScalingPolicy, ) -> Result<Self, ScalingError>

Create a new shard mapper with the given initial shard count and policy.

pub fn set_on_shard_created<F>(&self, callback: F)
where F: Fn(u16) + Send + Sync + 'static,

Set callback for shard creation.

pub fn set_on_shard_removed<F>(&self, callback: F)
where F: Fn(u16) + Send + Sync + 'static,

Set callback for shard removal.

pub fn metrics_collector( &self, shard_id: u16, ) -> Option<Arc<ShardMetricsCollector>>

Get the metrics collector for a shard.

pub fn active_shard_count(&self) -> u16

Get current active shard count.

pub fn total_shard_count(&self) -> u16

Get total shard count (including draining).

pub fn select_shard(&self, event_hash: u64) -> u16

Select the best shard for a new event/producer.

This implements weighted shard selection:

  • Only considers active (non-draining) shards
  • Prefers shards with lower weight (less loaded)
  • Falls back to round-robin if weights are equal
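The three rules above can be sketched as a standalone function. This is not the mapper's implementation: `Candidate` and `pick_shard` are hypothetical names, weights are assumed to be `f64`, and the tie-break uses `event_hash` modulo the number of tied shards as a simple stand-in for the round-robin fallback.

```rust
/// Hypothetical per-shard view; the real mapper keeps this state privately.
pub struct Candidate {
    pub id: u16,
    pub weight: f64,
    pub active: bool,
}

/// Pick the active shard with the lowest weight. When several shards tie,
/// `event_hash` spreads events across the tied set (assumed tie-break).
/// Returns None when no shard is active.
pub fn pick_shard(shards: &[Candidate], event_hash: u64) -> Option<u16> {
    // Lowest weight among active shards; stays at infinity if there are none.
    let min = shards
        .iter()
        .filter(|s| s.active)
        .map(|s| s.weight)
        .fold(f64::INFINITY, f64::min);

    // Every active shard that shares the minimum weight.
    let tied: Vec<u16> = shards
        .iter()
        .filter(|s| s.active && s.weight == min)
        .map(|s| s.id)
        .collect();

    if tied.is_empty() {
        return None;
    }
    Some(tied[(event_hash % tied.len() as u64) as usize])
}
```

Note that the real select_shard returns a bare `u16`; the `Option` here only makes the "no active shards" case explicit in the sketch.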

pub fn collect_metrics(&self) -> Vec<ShardMetrics>

Collect metrics from all shards and update weights.

pub fn evaluate_scaling(&self) -> ScalingDecision

Evaluate scaling based on current metrics.

Returns a scaling decision without executing it.

pub fn scale_up(&self, count: u16) -> Result<Vec<u16>, ScalingError>

Execute a scale-up operation.

Creates new shards in the Active state and makes them immediately available for routing. Use scale_up_provisioning if upstream workers (drain / batch) need to be wired up before the shard becomes selectable — otherwise producer pushes can race ahead of consumer creation.

pub fn scale_up_provisioning( &self, count: u16, ) -> Result<Vec<u16>, ScalingError>

Like scale_up, but the new shards are created in the Provisioning state. They receive an id and a metrics collector, but select_shard will not route to them and they are excluded from active_shard_count / evaluate_scaling until the caller transitions each shard with activate.

Use this when upstream consumer infrastructure (drain/batch workers, mpsc channels, etc.) must be wired up before the shard becomes selectable. Without this gating, producers can observe the shard via select_shard, push into its ring buffer, and never have those events drained.

Returns the allocated ids in order. Cooldown / max_shards gating matches scale_up so that staged allocation cannot be used to bypass the policy.
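The provision, wire up, then activate sequence can be illustrated with a toy model. `ToyMapper` and `Phase` below are hypothetical and far simpler than the real ShardMapper (no policy, no metrics, `&mut self` instead of interior mutability); the sketch only shows that a shard is invisible to routing until it is activated.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Phase {
    Provisioning,
    Active,
}

/// Toy model, not the crate's `ShardMapper`: just enough state to show gating.
#[derive(Default)]
pub struct ToyMapper {
    shards: BTreeMap<u16, Phase>,
    next_id: u16,
}

impl ToyMapper {
    /// Allocate `count` shards that exist but are not yet routable.
    pub fn scale_up_provisioning(&mut self, count: u16) -> Vec<u16> {
        let mut ids = Vec::with_capacity(count as usize);
        for _ in 0..count {
            let id = self.next_id;
            self.next_id += 1;
            self.shards.insert(id, Phase::Provisioning);
            ids.push(id);
        }
        ids
    }

    /// Make a provisioned shard routable. Returns Some(true) on a real
    /// transition, Some(false) if already Active, None for an unknown id.
    pub fn activate(&mut self, id: u16) -> Option<bool> {
        let phase = self.shards.get_mut(&id)?;
        if *phase == Phase::Provisioning {
            *phase = Phase::Active;
            Some(true)
        } else {
            Some(false)
        }
    }

    /// Ids that routing may hand out: Active shards only, in ascending order.
    pub fn routable(&self) -> Vec<u16> {
        self.shards
            .iter()
            .filter(|(_, phase)| **phase == Phase::Active)
            .map(|(id, _)| *id)
            .collect()
    }
}
```

A caller would allocate ids, start its drain and batch workers for each id, and only then call activate, so no producer can push into a ring buffer that has no consumer.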

pub fn scale_up_provisioning_force( &self, count: u16, ) -> Result<Vec<u16>, ScalingError>

Allocate count Provisioning shards, bypassing the cooldown gate.

Used by operator-initiated manual_scale_up paths. The cooldown exists to prevent the auto-scaling monitor from scaling up too aggressively in response to transient load spikes; a manual call from an operator is a deliberate request that should not be rate-limited by the auto-scaling cadence. The budget check (against max_shards) still applies.

Before this fix, manual_scale_up(N) called add_shard() N times, and each call invoked scale_up_provisioning(1), which bumped last_scaling. The second call then failed immediately with InCooldown (the default cooldown is 30s), leaving the first shard half-added and returning an error to the operator with no rollback.
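The difference between the gated and forced paths can be sketched as a single admission check. `Gate`, `GateError`, and `admit` are hypothetical names; the order of the two checks and the choice to record `last_scaling` on forced calls are assumptions of this sketch, not statements about the crate.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Eq)]
pub enum GateError {
    InCooldown,
    AtMaxShards,
}

/// Hypothetical admission gate for scale-up requests.
pub struct Gate {
    pub cooldown: Duration,
    pub max_shards: u16,
    pub current: u16,
    pub last_scaling: Option<Instant>,
}

impl Gate {
    /// Admit a request for `count` shards as one batch. The cooldown is
    /// checked only when `force` is false; the budget is always checked.
    pub fn admit(&mut self, count: u16, force: bool, now: Instant) -> Result<(), GateError> {
        if !force {
            if let Some(last) = self.last_scaling {
                if now.duration_since(last) < self.cooldown {
                    return Err(GateError::InCooldown);
                }
            }
        }
        // Budget check covers the whole batch, so it either fits or fails
        // before any shard is allocated.
        if self.current.saturating_add(count) > self.max_shards {
            return Err(GateError::AtMaxShards);
        }
        self.current += count;
        self.last_scaling = Some(now);
        Ok(())
    }
}
```

Admitting the whole batch in one call is what avoids the half-added state described above: either all `count` shards fit the budget or none are allocated.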

pub fn activate(&self, shard_id: u16) -> Result<bool, ScalingError>

Transition a Provisioning shard to Active.

Returns Ok(true) if a state transition actually occurred (Provisioning → Active) and Ok(false) if the shard was already Active; the latter is the idempotent path. Returns InvalidPolicy for unknown shards and for Draining or Stopped shards, since those states require a different lifecycle path. Bumps active_count and notifies the on_shard_created callback exactly once per real transition.

Before this fix, the method returned Result<(), ScalingError>, so callers (notably ShardManager::activate_shard) could not tell whether a call had bumped the live count, and they double-incremented their own num_shards on every idempotent call.
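A caller can use the boolean to keep its own bookkeeping consistent. The sketch below is a hypothetical caller-side counter (`LiveCount` is not a type in this crate) that consumes the result of an activate-style call and counts only real transitions.

```rust
/// Hypothetical caller-side counter, standing in for the `num_shards`
/// bookkeeping mentioned above.
pub struct LiveCount {
    pub num_shards: u16,
}

impl LiveCount {
    /// Apply the result of an `activate`-style call.
    /// Ok(true)  -> a shard really became Active, so count it.
    /// Ok(false) -> idempotent repeat, leave the count alone.
    /// Err(e)    -> propagate the error without touching the count.
    pub fn on_activate<E>(&mut self, result: Result<bool, E>) -> Result<(), E> {
        if result? {
            self.num_shards += 1;
        }
        Ok(())
    }
}
```

In real code the argument would be the value returned by `mapper.activate(shard_id)`; the generic error type just keeps the sketch independent of ScalingError.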

pub fn drain_specific(&self, shard_id: u16) -> Result<(), ScalingError>

Drain a specific shard by id, transitioning it from Active to Draining.

Companion to ShardManager::drain_shard. The previous implementation only flipped the metrics collector’s draining atomic; this version atomically updates MappedShard.state (so select_shard stops routing to the shard) and decrements active_count (so evaluate_scaling’s budget math stays consistent with scale_down). Returns an error if the shard is not in the Active state, or if draining it would push the active count below min_shards.
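The two error conditions and the paired state and count update can be sketched with a toy model. `ToyDrain`, `DrainState`, and `DrainError` are hypothetical; the real method returns ScalingError and uses interior mutability rather than `&mut self`.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainState {
    Active,
    Draining,
}

#[derive(Debug, PartialEq, Eq)]
pub enum DrainError {
    NotActive,
    BelowMinShards,
}

/// Toy model of the mapper state that `drain_specific` touches.
pub struct ToyDrain {
    pub states: HashMap<u16, DrainState>,
    pub active_count: u16,
    pub min_shards: u16,
}

impl ToyDrain {
    /// Move one shard from Active to Draining, keeping the state and the
    /// active count in step with each other.
    pub fn drain_specific(&mut self, id: u16) -> Result<(), DrainError> {
        // Unknown ids and non-Active shards are rejected.
        match self.states.get(&id) {
            Some(DrainState::Active) => {}
            _ => return Err(DrainError::NotActive),
        }
        // Draining one more shard must not drop below the floor.
        if self.active_count <= self.min_shards {
            return Err(DrainError::BelowMinShards);
        }
        self.states.insert(id, DrainState::Draining);
        self.active_count -= 1;
        Ok(())
    }
}
```

Both checks run before any mutation, so a rejected call leaves the state map and the count untouched.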

pub fn scale_down(&self, count: u16) -> Result<Vec<u16>, ScalingError>

Start draining shards for scale-down.

Marks shards as draining so they stop receiving new events. Shards will be removed once they’re empty.

pub fn finalize_draining(&self) -> Vec<u16>

Check draining shards and finalize those that are empty.

Returns IDs of shards that were stopped.

This predicate looks ONLY at the ring buffer (current_len + pushes_since_drain_start); it does NOT probe the per-shard mpsc channel or the BatchWorker’s current_batch. A shard that the predicate flags as empty can still have events queued in those two places. The correctness gate is therefore bus::remove_shard_internal, which awaits the BatchWorker’s JoinHandle before constructing the stranded-flush batch — see that function’s step 3 for the rationale. Tightening this predicate is a defense-in-depth follow-up; a stricter ring-buffer-empty signal here would only narrow an already-closed window.

pub fn remove_specific_stopped_shard(&self, shard_id: u16) -> bool

Remove a specific shard from the mapper if it is in the Stopped state.

Used by ShardManager::remove_shard so that a per-shard cleanup doesn’t disturb sibling Stopped entries, whose state a sequential manual_scale_down loop still needs to look up. Returns true if the shard existed, was Stopped, and was removed.

pub fn remove_stopped_shards(&self) -> Vec<u16>

Remove stopped shards from the mapper.

pub fn shard_state(&self, shard_id: u16) -> Option<ShardState>

Get the state of a specific shard.

pub fn active_shard_ids(&self) -> Vec<u16>

Get all active shard IDs.

pub fn all_shard_ids(&self) -> Vec<u16>

Get all shard IDs (including draining).

pub fn policy(&self) -> &ScalingPolicy

Get the scaling policy.
