Skip to main content

MeshSyncManager

Struct MeshSyncManager 

Source
pub struct MeshSyncManager { /* private fields */ }
Expand description

Mesh sync manager for coordinating state synchronization

Implementations§

Source§

impl MeshSyncManager

Source

pub fn new(stores: Arc<StateStores>, self_name: String) -> Self

Source

pub fn register_tree_state_subscriber( &self, subscriber: Arc<dyn TreeStateSubscriber>, )

Source

pub fn register_worker_state_subscriber( &self, subscriber: Arc<dyn WorkerStateSubscriber>, )

Source

pub fn self_name(&self) -> &str

Get the node name (actor) for this sync manager

Source

pub fn sync_worker_state( &self, worker_id: String, model_id: String, url: String, health: bool, load: f64, spec: Vec<u8>, )

Sync worker state to mesh stores

Source

pub fn remove_worker_state(&self, worker_id: &str)

Remove worker state from mesh stores

Source

pub fn sync_policy_state( &self, model_id: String, policy_type: String, config: Vec<u8>, )

Sync policy state to mesh stores

Source

pub fn remove_policy_state(&self, model_id: &str)

Remove policy state from mesh stores

Source

pub fn get_worker_state(&self, worker_id: &str) -> Option<WorkerState>

Get worker state from mesh stores

Source

pub fn get_all_worker_states(&self) -> Vec<WorkerState>

Get all worker states from mesh stores

Source

pub fn get_policy_state(&self, model_id: &str) -> Option<PolicyState>

Get policy state from mesh stores

Source

pub fn get_all_policy_states(&self) -> Vec<PolicyState>

Get all policy states from mesh stores

Source

pub fn apply_remote_worker_state( &self, state: WorkerState, actor: Option<String>, )

Apply worker state update from remote node The actor should be extracted from the state update context (e.g., from StateUpdate message)

Source

pub fn apply_remote_policy_state( &self, state: PolicyState, actor: Option<String>, )

Apply policy state update from remote node The actor should be extracted from the state update context (e.g., from StateUpdate message)

Source

pub fn update_rate_limit_membership(&self)

Update rate-limit hash ring with current membership

Source

pub fn handle_node_failure(&self, failed_nodes: &[String])

Handle node failure and transfer rate-limit ownership

Source

pub fn sync_rate_limit_inc(&self, key: String, delta: i64)

Sync rate-limit counter increment (only if this node is an owner)

Source

pub fn apply_remote_rate_limit_counter(&self, log: &OperationLog)

Apply remote rate-limit counter update (merge CRDT)

Source

pub fn apply_remote_rate_limit_counter_value( &self, key: String, counter_value: i64, )

Apply remote rate-limit counter snapshot encoded as raw i64.

Source

pub fn apply_remote_rate_limit_counter_value_with_actor( &self, key: String, actor: String, counter_value: i64, )

Source

pub fn apply_remote_rate_limit_counter_value_with_actor_and_timestamp( &self, key: String, actor: String, counter_value: i64, timestamp: u64, )

Source

pub fn get_rate_limit_value(&self, key: &str) -> Option<i64>

Get rate-limit value (aggregate from all owners)

Source

pub fn get_global_rate_limit_config(&self) -> Option<RateLimitConfig>

Get global rate limit configuration from AppStore

Source

pub fn check_global_rate_limit(&self) -> (bool, i64, u64)

Check if global rate limit is exceeded Returns (is_exceeded, current_count, limit)

Source

pub fn reset_global_rate_limit_counter(&self)

Reset global rate limit counter (called periodically for time window reset)

Source

pub fn sync_tree_insert_hash( &self, model_id: &str, path_hash: u64, tenant: &str, )

Sync tree operation to mesh stores.

This is called on every request (hot path). The operation is appended to the pending buffer for delta sync — the collector serializes and sends it to peers. We do NOT read/deserialize/serialize the full TreeState here, because that is O(tree_size) per request and caused multi-GB memory usage at 200+ rps.

The policy store version is bumped so the generation-based collector detects the change, but the config blob is NOT updated on every call. It is rebuilt lazily by the collector when a full-state fallback is needed. Lightweight sync: accepts a pre-computed hash + tenant, avoiding the 80k+ String allocation from TreeKey::Text on every request.

Source

pub fn sync_tree_operation( &self, model_id: String, operation: TreeOperation, ) -> Result<(), String>

Source

pub fn get_tree_state(&self, model_id: &str) -> Option<TreeState>

Get tree state for a model from mesh stores. Reads from tree_configs (populated by periodic checkpoint from live tree).

Source

pub fn get_all_tree_states(&self) -> Vec<TreeState>

Source

pub fn apply_remote_tree_operation( &self, model_id: String, tree_state: TreeState, actor: Option<String>, )

Apply remote tree operation to local stores. This is called when receiving full tree state updates from other nodes.

Writes to tree_configs (plain DashMap) instead of the CRDT policy store to avoid operation log memory accumulation.

Uses DashMap::entry() for atomic read-modify-write on tree_configs to avoid the TOCTOU gap between get() and insert().

Source

pub fn apply_remote_tree_delta( &self, delta: TreeStateDelta, actor: Option<String>, )

Apply a delta (incremental operations) from a remote node. Merges the delta operations into the existing local tree state, avoiding the cost of replacing the entire tree state on every sync.

Uses DashMap::entry() for atomic read-modify-write on tree_configs to avoid the TOCTOU gap between get() and insert().

Source

pub fn apply_remote_tenant_delta( &self, delta: TenantDelta, _actor: Option<String>, )

Apply a lightweight tenant delta from a remote node. Updates the local radix tree directly via subscribers without going through the CRDT or the full TreeState machinery.

Source

pub fn checkpoint_tree_states(&self)

Checkpoint tree state by exporting compact snapshots from the live radix tree via subscribers.

Called periodically (~every 10s) to keep tree_configs fresh for the periodic structure snapshot (every 30 gossip rounds).

Uses TreeStateSubscriber::export_tree_snapshot to obtain a compact kv_index::snapshot::TreeSnapshot that preserves shared prefixes. This is much smaller than the flat TreeState produced by export_tree_state (~2-4 MB vs ~40 MB for 2048 entries sharing 80% prefixes) and avoids accumulating full prompt text in memory.

Trait Implementations§

Source§

impl Clone for MeshSyncManager

Source§

fn clone(&self) -> MeshSyncManager

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for MeshSyncManager

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> FromRef<T> for T
where T: Clone,

Source§

fn from_ref(input: &T) -> T

Converts to this type from a reference to the input type.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more