pub struct MeshSyncManager { /* private fields */ }Expand description
Mesh sync manager for coordinating state synchronization
Implementations§
Source§impl MeshSyncManager
impl MeshSyncManager
pub fn new(stores: Arc<StateStores>, self_name: String) -> Self
pub fn register_tree_state_subscriber( &self, subscriber: Arc<dyn TreeStateSubscriber>, )
pub fn register_worker_state_subscriber( &self, subscriber: Arc<dyn WorkerStateSubscriber>, )
Sourcepub fn sync_worker_state(
&self,
worker_id: String,
model_id: String,
url: String,
health: bool,
load: f64,
spec: Vec<u8>,
)
pub fn sync_worker_state( &self, worker_id: String, model_id: String, url: String, health: bool, load: f64, spec: Vec<u8>, )
Sync worker state to mesh stores
Sourcepub fn remove_worker_state(&self, worker_id: &str)
pub fn remove_worker_state(&self, worker_id: &str)
Remove worker state from mesh stores
Sourcepub fn sync_policy_state(
&self,
model_id: String,
policy_type: String,
config: Vec<u8>,
)
pub fn sync_policy_state( &self, model_id: String, policy_type: String, config: Vec<u8>, )
Sync policy state to mesh stores
Sourcepub fn remove_policy_state(&self, model_id: &str)
pub fn remove_policy_state(&self, model_id: &str)
Remove policy state from mesh stores
Sourcepub fn get_worker_state(&self, worker_id: &str) -> Option<WorkerState>
pub fn get_worker_state(&self, worker_id: &str) -> Option<WorkerState>
Get worker state from mesh stores
Sourcepub fn get_all_worker_states(&self) -> Vec<WorkerState>
pub fn get_all_worker_states(&self) -> Vec<WorkerState>
Get all worker states from mesh stores
Sourcepub fn get_policy_state(&self, model_id: &str) -> Option<PolicyState>
pub fn get_policy_state(&self, model_id: &str) -> Option<PolicyState>
Get policy state from mesh stores
Sourcepub fn get_all_policy_states(&self) -> Vec<PolicyState>
pub fn get_all_policy_states(&self) -> Vec<PolicyState>
Get all policy states from mesh stores
Sourcepub fn apply_remote_worker_state(
&self,
state: WorkerState,
actor: Option<String>,
)
pub fn apply_remote_worker_state( &self, state: WorkerState, actor: Option<String>, )
Apply worker state update from remote node The actor should be extracted from the state update context (e.g., from StateUpdate message)
Sourcepub fn apply_remote_policy_state(
&self,
state: PolicyState,
actor: Option<String>,
)
pub fn apply_remote_policy_state( &self, state: PolicyState, actor: Option<String>, )
Apply policy state update from remote node The actor should be extracted from the state update context (e.g., from StateUpdate message)
Sourcepub fn update_rate_limit_membership(&self)
pub fn update_rate_limit_membership(&self)
Update rate-limit hash ring with current membership
Sourcepub fn handle_node_failure(&self, failed_nodes: &[String])
pub fn handle_node_failure(&self, failed_nodes: &[String])
Handle node failure and transfer rate-limit ownership
Sourcepub fn sync_rate_limit_inc(&self, key: String, delta: i64)
pub fn sync_rate_limit_inc(&self, key: String, delta: i64)
Sync rate-limit counter increment (only if this node is an owner)
Sourcepub fn apply_remote_rate_limit_counter(&self, log: &OperationLog)
pub fn apply_remote_rate_limit_counter(&self, log: &OperationLog)
Apply remote rate-limit counter update (merge CRDT)
Sourcepub fn apply_remote_rate_limit_counter_value(
&self,
key: String,
counter_value: i64,
)
pub fn apply_remote_rate_limit_counter_value( &self, key: String, counter_value: i64, )
Apply remote rate-limit counter snapshot encoded as raw i64.
pub fn apply_remote_rate_limit_counter_value_with_actor( &self, key: String, actor: String, counter_value: i64, )
pub fn apply_remote_rate_limit_counter_value_with_actor_and_timestamp( &self, key: String, actor: String, counter_value: i64, timestamp: u64, )
Sourcepub fn get_rate_limit_value(&self, key: &str) -> Option<i64>
pub fn get_rate_limit_value(&self, key: &str) -> Option<i64>
Get rate-limit value (aggregate from all owners)
Sourcepub fn get_global_rate_limit_config(&self) -> Option<RateLimitConfig>
pub fn get_global_rate_limit_config(&self) -> Option<RateLimitConfig>
Get global rate limit configuration from AppStore
Sourcepub fn check_global_rate_limit(&self) -> (bool, i64, u64)
pub fn check_global_rate_limit(&self) -> (bool, i64, u64)
Check if global rate limit is exceeded Returns (is_exceeded, current_count, limit)
Sourcepub fn reset_global_rate_limit_counter(&self)
pub fn reset_global_rate_limit_counter(&self)
Reset global rate limit counter (called periodically for time window reset)
Sourcepub fn sync_tree_insert_hash(
&self,
model_id: &str,
path_hash: u64,
tenant: &str,
)
pub fn sync_tree_insert_hash( &self, model_id: &str, path_hash: u64, tenant: &str, )
Sync tree operation to mesh stores.
This is called on every request (hot path). The operation is appended to the pending buffer for delta sync — the collector serializes and sends it to peers. We do NOT read/deserialize/serialize the full TreeState here, because that is O(tree_size) per request and caused multi-GB memory usage at 200+ rps.
The policy store version is bumped so the generation-based collector
detects the change, but the config blob is NOT updated on every call.
It is rebuilt lazily by the collector when a full-state fallback is needed.
Lightweight sync: accepts a pre-computed hash + tenant, avoiding
the 80k+ String allocation from TreeKey::Text on every request.
pub fn sync_tree_operation( &self, model_id: String, operation: TreeOperation, ) -> Result<(), String>
Sourcepub fn get_tree_state(&self, model_id: &str) -> Option<TreeState>
pub fn get_tree_state(&self, model_id: &str) -> Option<TreeState>
Get tree state for a model from mesh stores.
Reads from tree_configs (populated by periodic checkpoint from live tree).
pub fn get_all_tree_states(&self) -> Vec<TreeState>
Sourcepub fn apply_remote_tree_operation(
&self,
model_id: String,
tree_state: TreeState,
actor: Option<String>,
)
pub fn apply_remote_tree_operation( &self, model_id: String, tree_state: TreeState, actor: Option<String>, )
Apply remote tree operation to local stores. This is called when receiving full tree state updates from other nodes.
Writes to tree_configs (plain DashMap) instead of the CRDT policy
store to avoid operation log memory accumulation.
Uses DashMap::entry() for atomic read-modify-write on tree_configs
to avoid the TOCTOU gap between get() and insert().
Sourcepub fn apply_remote_tree_delta(
&self,
delta: TreeStateDelta,
actor: Option<String>,
)
pub fn apply_remote_tree_delta( &self, delta: TreeStateDelta, actor: Option<String>, )
Apply a delta (incremental operations) from a remote node. Merges the delta operations into the existing local tree state, avoiding the cost of replacing the entire tree state on every sync.
Uses DashMap::entry() for atomic read-modify-write on tree_configs
to avoid the TOCTOU gap between get() and insert().
Sourcepub fn apply_remote_tenant_delta(
&self,
delta: TenantDelta,
_actor: Option<String>,
)
pub fn apply_remote_tenant_delta( &self, delta: TenantDelta, _actor: Option<String>, )
Apply a lightweight tenant delta from a remote node. Updates the local radix tree directly via subscribers without going through the CRDT or the full TreeState machinery.
Sourcepub fn checkpoint_tree_states(&self)
pub fn checkpoint_tree_states(&self)
Checkpoint tree state by exporting compact snapshots from the live radix tree via subscribers.
Called periodically (~every 10s) to keep tree_configs fresh for
the periodic structure snapshot (every 30 gossip rounds).
Uses TreeStateSubscriber::export_tree_snapshot to obtain a
compact kv_index::snapshot::TreeSnapshot that preserves shared
prefixes. This is much smaller than the flat TreeState produced
by export_tree_state (~2-4 MB vs ~40 MB for 2048 entries sharing
80% prefixes) and avoids accumulating full prompt text in memory.
Trait Implementations§
Source§impl Clone for MeshSyncManager
impl Clone for MeshSyncManager
Source§fn clone(&self) -> MeshSyncManager
fn clone(&self) -> MeshSyncManager
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreAuto Trait Implementations§
impl Freeze for MeshSyncManager
impl !RefUnwindSafe for MeshSyncManager
impl Send for MeshSyncManager
impl Sync for MeshSyncManager
impl Unpin for MeshSyncManager
impl UnsafeUnpin for MeshSyncManager
impl !UnwindSafe for MeshSyncManager
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::Request