// smg_mesh/sync.rs

//! Mesh state synchronization module
//!
//! Handles synchronization of worker and policy states across mesh cluster nodes

use std::{
    fmt::Debug,
    sync::{atomic::Ordering, Arc},
};

use parking_lot::RwLock;
use tracing::{debug, warn};

use super::{
    service::gossip::NodeStatus,
    stores::{
        policy_key, tree_state_key, PolicyState, RateLimitConfig, StateStores, WorkerState,
        GLOBAL_RATE_LIMIT_COUNTER_KEY, GLOBAL_RATE_LIMIT_KEY,
    },
    tree_ops::{
        hash_node_path, hash_token_path, TenantDelta, TenantEvict, TenantInsert, TreeKey,
        TreeOperation, TreeState, TreeStateDelta,
    },
};

/// Subscriber that applies remotely synced tree state to a local radix
/// tree (e.g. a cache-aware routing policy).
pub trait TreeStateSubscriber: Send + Sync + Debug {
    /// Apply a full tree state received from a remote node.
    fn apply_remote_tree_state(&self, model_id: &str, tree_state: &TreeState);

    /// Apply lightweight tenant delta — inserts and evictions by hash.
    /// Default: process global evictions only (where `node_path_hash == GLOBAL_EVICTION_HASH`).
    /// Inserts require the actual tree to resolve hashes to nodes,
    /// so they are dropped here; implementations that maintain a
    /// hash→node index (e.g. `CacheAwarePolicy`) should override.
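    ///
    /// A minimal override sketch, assuming an implementation that keeps a
    /// hash→node index (the `hash_index`, `attach_tenant`, and
    /// `remove_tenant_everywhere` names are illustrative, not part of this
    /// crate):
    ///
    /// ```ignore
    /// fn apply_tenant_delta(
    ///     &self,
    ///     model_id: &str,
    ///     inserts: &[TenantInsert],
    ///     evictions: &[TenantEvict],
    /// ) {
    ///     for ins in inserts {
    ///         // Resolvable only because this impl maintains a hash→node index.
    ///         if let Some(node) = self.hash_index.get(&ins.node_path_hash) {
    ///             self.attach_tenant(model_id, node, &ins.worker_url);
    ///         }
    ///     }
    ///     for ev in evictions {
    ///         if ev.node_path_hash == GLOBAL_EVICTION_HASH {
    ///             self.remove_tenant_everywhere(model_id, &ev.worker_url);
    ///         }
    ///     }
    /// }
    /// ```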
    fn apply_tenant_delta(
        &self,
        model_id: &str,
        _inserts: &[TenantInsert],
        evictions: &[TenantEvict],
    ) {
        // Default: only convert global evictions (hash=GLOBAL_EVICTION_HASH)
        // into Remove ops. Targeted evictions (non-zero hash) are skipped
        // because we can't resolve the hash without a path index.
        let global_evictions: Vec<&TenantEvict> = evictions
            .iter()
            .filter(|e| e.node_path_hash == crate::tree_ops::GLOBAL_EVICTION_HASH)
            .collect();

        if !global_evictions.is_empty() {
            let mut tree_state = TreeState::new(model_id.to_string());
            for evict in global_evictions {
                tree_state.add_operation(TreeOperation::Remove(crate::tree_ops::TreeRemoveOp {
                    tenant: evict.worker_url.clone(),
                }));
            }
            self.apply_remote_tree_state(model_id, &tree_state);
        }
    }

    /// Export the current tree state for a model from the live radix tree.
    /// Used by `checkpoint_tree_states` to build periodic structure snapshots
    /// WITHOUT accumulating full prompt text in memory on every request.
    /// Returns None if the subscriber doesn't have a tree for this model.
    fn export_tree_state(&self, _model_id: &str) -> Option<TreeState> {
        None
    }

    /// Export a compact tree snapshot for a model from the live radix tree.
    /// Returns a [`kv_index::snapshot::TreeSnapshot`] that encodes the tree
    /// structure with shared prefixes — much smaller than the flat
    /// `TreeState` returned by [`export_tree_state`].
    ///
    /// Used by `checkpoint_tree_states` to populate `tree_configs` for
    /// Layer 2 periodic snapshots.
    fn export_tree_snapshot(&self, _model_id: &str) -> Option<kv_index::snapshot::TreeSnapshot> {
        None
    }
}

/// Subscriber notified after a remote worker state update is applied locally.
pub trait WorkerStateSubscriber: Send + Sync + Debug {
    fn on_remote_worker_state(&self, state: &WorkerState);
}

/// Mesh sync manager for coordinating state synchronization
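///
/// A minimal usage sketch (store construction and the subscriber are
/// assumed to be wired up elsewhere):
///
/// ```ignore
/// let stores = Arc::new(StateStores::new());
/// let sync = MeshSyncManager::new(stores, "node-a".to_string());
/// // `policy` is any Arc<dyn TreeStateSubscriber>, e.g. a cache-aware policy.
/// sync.register_tree_state_subscriber(policy);
/// ```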
#[derive(Clone, Debug)]
pub struct MeshSyncManager {
    pub(crate) stores: Arc<StateStores>,
    self_name: String,
    tree_state_subscribers: Arc<RwLock<Vec<Arc<dyn TreeStateSubscriber>>>>,
    worker_state_subscribers: Arc<RwLock<Vec<Arc<dyn WorkerStateSubscriber>>>>,
}

impl MeshSyncManager {
    pub fn new(stores: Arc<StateStores>, self_name: String) -> Self {
        Self {
            stores,
            self_name,
            tree_state_subscribers: Arc::new(RwLock::new(Vec::new())),
            worker_state_subscribers: Arc::new(RwLock::new(Vec::new())),
        }
    }

    pub fn register_tree_state_subscriber(&self, subscriber: Arc<dyn TreeStateSubscriber>) {
        self.tree_state_subscribers.write().push(subscriber);
    }

    fn notify_tree_state_subscribers(&self, model_id: &str, tree_state: &TreeState) {
        let subscribers = self.tree_state_subscribers.read().clone();
        for subscriber in subscribers {
            subscriber.apply_remote_tree_state(model_id, tree_state);
        }
    }

    pub fn register_worker_state_subscriber(&self, subscriber: Arc<dyn WorkerStateSubscriber>) {
        self.worker_state_subscribers.write().push(subscriber);
    }

    fn notify_worker_state_subscribers(&self, state: &WorkerState) {
        let subscribers = self.worker_state_subscribers.read().clone();
        for subscriber in subscribers {
            subscriber.on_remote_worker_state(state);
        }
    }

    /// Get the node name (actor) for this sync manager
    pub fn self_name(&self) -> &str {
        &self.self_name
    }

    /// Sync worker state to mesh stores
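    ///
    /// Each call bumps the stored `version` by one; a sketch mirroring the
    /// unit tests below:
    ///
    /// ```ignore
    /// sync.sync_worker_state(
    ///     "worker1".to_string(),
    ///     "model1".to_string(),
    ///     "http://localhost:8000".to_string(),
    ///     true,
    ///     0.5,
    ///     vec![],
    /// );
    /// assert_eq!(sync.get_worker_state("worker1").unwrap().version, 1);
    /// ```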
    pub fn sync_worker_state(
        &self,
        worker_id: String,
        model_id: String,
        url: String,
        health: bool,
        load: f64,
        spec: Vec<u8>,
    ) {
        let key = worker_id.clone();

        let updated_state = self.stores.worker.update(key, |current| {
            let new_version = current
                .map(|state| state.version)
                .unwrap_or(0)
                .saturating_add(1);

            WorkerState {
                worker_id: worker_id.clone(),
                model_id,
                url,
                health,
                load,
                version: new_version,
                spec,
            }
        });

        match updated_state {
            Ok(Some(state)) => {
                debug!(
                    "Synced worker state to mesh {} (version: {})",
                    state.worker_id, state.version
                );
            }
            Ok(None) => {}
            Err(err) => {
                debug!(error = %err, worker_id = %worker_id, "Failed to sync worker state");
            }
        }
    }

    /// Remove worker state from mesh stores
    pub fn remove_worker_state(&self, worker_id: &str) {
        self.stores.worker.remove(worker_id);
        debug!("Removed worker state from mesh {}", worker_id);
    }

    /// Sync policy state to mesh stores
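    ///
    /// Versioning works the same way as for worker state. A sketch
    /// (`config_bytes` is any serialized policy config):
    ///
    /// ```ignore
    /// sync.sync_policy_state("model1".to_string(), "cache_aware".to_string(), config_bytes);
    /// assert_eq!(sync.get_policy_state("model1").unwrap().version, 1);
    /// ```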
    pub fn sync_policy_state(&self, model_id: String, policy_type: String, config: Vec<u8>) {
        let key = policy_key(&model_id);
        let model_id_for_update = model_id.clone();

        let updated_state = self.stores.policy.update(key, move |current| {
            let new_version = current
                .map(|state| state.version)
                .unwrap_or(0)
                .saturating_add(1);

            PolicyState {
                model_id: model_id_for_update,
                policy_type,
                config,
                version: new_version,
            }
        });

        match updated_state {
            Ok(Some(state)) => {
                debug!(
                    "Synced policy state to mesh model={} (version: {})",
                    state.model_id, state.version
                );
            }
            Ok(None) => {}
            Err(err) => {
                debug!(error = %err, model_id = %model_id, "Failed to sync policy state");
            }
        }
    }

    /// Remove policy state from mesh stores
    pub fn remove_policy_state(&self, model_id: &str) {
        let key = policy_key(model_id);
        self.stores.policy.remove(&key);
        debug!("Removed policy state from mesh model={}", model_id);
    }

    /// Get worker state from mesh stores
    pub fn get_worker_state(&self, worker_id: &str) -> Option<WorkerState> {
        self.stores.worker.get(worker_id)
    }

    /// Get all worker states from mesh stores
    pub fn get_all_worker_states(&self) -> Vec<WorkerState> {
        self.stores.worker.all().into_values().collect()
    }

    /// Get policy state from mesh stores
    pub fn get_policy_state(&self, model_id: &str) -> Option<PolicyState> {
        let key = policy_key(model_id);
        self.stores.policy.get(&key)
    }

    /// Get all policy states from mesh stores
    pub fn get_all_policy_states(&self) -> Vec<PolicyState> {
        self.stores.policy.all().into_values().collect()
    }

    /// Apply worker state update from remote node
    /// The actor should be extracted from the state update context (e.g., from StateUpdate message)
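    ///
    /// Last-writer-wins by version: the update is applied only if
    /// `state.version` is strictly greater than the local version. A sketch
    /// (`stale_state` is a `WorkerState` whose `version` is 0 while the
    /// local "worker1" entry is at version 1):
    ///
    /// ```ignore
    /// sync.apply_remote_worker_state(stale_state, Some("node2".to_string()));
    /// // The stale update is skipped; local state is unchanged.
    /// assert_eq!(sync.get_worker_state("worker1").unwrap().version, 1);
    /// ```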
    pub fn apply_remote_worker_state(&self, state: WorkerState, actor: Option<String>) {
        let key = state.worker_id.clone();
        let actor = actor.unwrap_or_else(|| "remote".to_string());
        let mut current_version = 0;

        let update_result = self.stores.worker.update_if(key, |current| {
            current_version = current
                .as_ref()
                .map(|existing| existing.version)
                .unwrap_or(0);
            if state.version > current_version {
                Some(state.clone())
            } else {
                None
            }
        });

        match update_result {
            Ok((_, true)) => {
                debug!(
                    "Applied remote worker state update: {} (version: {} -> {})",
                    state.worker_id, current_version, state.version
                );
                self.notify_worker_state_subscribers(&state);
            }
            Ok((_, false)) => {
                debug!(
                    "Skipped remote worker state update: {} (version {} <= current {})",
                    state.worker_id, state.version, current_version
                );
            }
            Err(err) => {
                debug!(error = %err, worker_id = %state.worker_id, actor = %actor, "Failed to apply remote worker state update");
            }
        }
    }

    /// Apply policy state update from remote node
    /// The actor should be extracted from the state update context (e.g., from StateUpdate message)
    pub fn apply_remote_policy_state(&self, state: PolicyState, actor: Option<String>) {
        let key = policy_key(&state.model_id);
        let actor = actor.unwrap_or_else(|| "remote".to_string());
        let mut current_version = 0;

        let update_result = self.stores.policy.update_if(key, |current| {
            current_version = current
                .as_ref()
                .map(|existing| existing.version)
                .unwrap_or(0);
            if state.version > current_version {
                Some(state.clone())
            } else {
                None
            }
        });

        match update_result {
            Ok((_, true)) => {
                debug!(
                    "Applied remote policy state update: {} (version: {} -> {})",
                    state.model_id, current_version, state.version
                );
            }
            Ok((_, false)) => {
                debug!(
                    "Skipped remote policy state update: {} (version {} <= current {})",
                    state.model_id, state.version, current_version
                );
            }
            Err(err) => {
                debug!(error = %err, model_id = %state.model_id, actor = %actor, "Failed to apply remote policy state update");
            }
        }
    }

    /// Update rate-limit hash ring with current membership
    pub fn update_rate_limit_membership(&self) {
        // Get all alive nodes from membership store
        let all_members = self.stores.membership.all();
        let alive_nodes: Vec<String> = all_members
            .values()
            .filter(|m| m.status == NodeStatus::Alive as i32)
            .map(|m| m.name.clone())
            .collect();

        self.stores.rate_limit.update_membership(&alive_nodes);
        debug!(
            "Updated rate-limit hash ring with {} alive nodes",
            alive_nodes.len()
        );
    }

    /// Handle node failure and transfer rate-limit ownership
    pub fn handle_node_failure(&self, failed_nodes: &[String]) {
        if failed_nodes.is_empty() {
            return;
        }

        debug!("Handling node failure for rate-limit: {:?}", failed_nodes);

        // Check which keys need ownership transfer
        let affected_keys = self
            .stores
            .rate_limit
            .check_ownership_transfer(failed_nodes);

        if !affected_keys.is_empty() {
            debug!(
                "Ownership transfer needed for {} rate-limit keys",
                affected_keys.len()
            );

            // Update membership to reflect node failures
            self.update_rate_limit_membership();

            // For each affected key, we may need to initialize counters if we're now an owner
            for key in &affected_keys {
                if self.stores.rate_limit.is_owner(key) {
                    debug!("This node is now owner of rate-limit key: {}", key);
                    // Counter will be created on first inc() call
                }
            }
        }
    }

    /// Sync rate-limit counter increment (only if this node is an owner)
    pub fn sync_rate_limit_inc(&self, key: String, delta: i64) {
        if !self.stores.rate_limit.is_owner(&key) {
            // Not an owner, skip
            return;
        }

        self.stores
            .rate_limit
            .inc(key.clone(), self.self_name.clone(), delta);
        debug!("Synced rate-limit increment: key={}, delta={}", key, delta);
    }

    /// Apply remote rate-limit counter update (merge CRDT)
    pub fn apply_remote_rate_limit_counter(&self, log: &super::crdt_kv::OperationLog) {
        // Merge operation log regardless of ownership (for CRDT consistency)
        self.stores.rate_limit.merge(log);
        debug!("Applied remote rate-limit counter update");
    }

    /// Apply remote rate-limit counter snapshot encoded as raw i64.
    pub fn apply_remote_rate_limit_counter_value(&self, key: String, counter_value: i64) {
        self.apply_remote_rate_limit_counter_value_with_actor_and_timestamp(
            key,
            "remote".to_string(),
            counter_value,
            0,
        );
    }

    pub fn apply_remote_rate_limit_counter_value_with_actor(
        &self,
        key: String,
        actor: String,
        counter_value: i64,
    ) {
        self.apply_remote_rate_limit_counter_value_with_actor_and_timestamp(
            key,
            actor,
            counter_value,
            0,
        );
    }

    pub fn apply_remote_rate_limit_counter_value_with_actor_and_timestamp(
        &self,
        key: String,
        actor: String,
        counter_value: i64,
        timestamp: u64,
    ) {
        if let Some((shard_key, payload)) =
            super::stores::RateLimitStore::snapshot_payload_for_counter_value(
                key,
                actor.clone(),
                counter_value,
            )
        {
            self.stores
                .rate_limit
                .apply_counter_snapshot_payload(shard_key, &actor, timestamp, &payload);
            debug!("Applied remote rate-limit counter snapshot payload");
        }
    }

    /// Get rate-limit value (aggregate from all owners)
    pub fn get_rate_limit_value(&self, key: &str) -> Option<i64> {
        self.stores.rate_limit.value(key)
    }

    /// Get global rate limit configuration from AppStore
    pub fn get_global_rate_limit_config(&self) -> Option<RateLimitConfig> {
        self.stores
            .app
            .get(GLOBAL_RATE_LIMIT_KEY)
            .and_then(|app_state| bincode::deserialize::<RateLimitConfig>(&app_state.value).ok())
    }

    /// Check if global rate limit is exceeded
    /// Returns (is_exceeded, current_count, limit)
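    ///
    /// Note that this both counts and checks: each call increments the
    /// counter (if this node owns the key) before reading the aggregate.
    /// A gating sketch at a request entry point:
    ///
    /// ```ignore
    /// let (exceeded, count, limit) = sync.check_global_rate_limit();
    /// if exceeded {
    ///     // Reject the request; `count` already includes this attempt.
    ///     return Err(format!("global rate limit exceeded: {count} > {limit}"));
    /// }
    /// ```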
    pub fn check_global_rate_limit(&self) -> (bool, i64, u64) {
        let config = self.get_global_rate_limit_config().unwrap_or_default();

        if config.limit_per_second == 0 {
            // Rate limit disabled
            return (false, 0, 0);
        }

        // Increment counter if this node is an owner
        self.sync_rate_limit_inc(GLOBAL_RATE_LIMIT_COUNTER_KEY.to_string(), 1);

        // Get aggregated counter value from all owners
        let current_count = self
            .get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY)
            .unwrap_or(0);

        let is_exceeded = current_count > config.limit_per_second as i64;
        (is_exceeded, current_count, config.limit_per_second)
    }

    /// Reset global rate limit counter (called periodically for time window reset)
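    ///
    /// The reset cadence is owned by the window manager (see the comments
    /// below); a periodic-task sketch, assuming a tokio runtime (tokio is
    /// not a dependency of this module):
    ///
    /// ```ignore
    /// let sync = Arc::clone(&sync);
    /// tokio::spawn(async move {
    ///     let mut tick = tokio::time::interval(std::time::Duration::from_secs(1));
    ///     loop {
    ///         tick.tick().await;
    ///         sync.reset_global_rate_limit_counter();
    ///     }
    /// });
    /// ```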
    pub fn reset_global_rate_limit_counter(&self) {
        // Reset by decrementing the current value.
        // Since we use a PNCounter, we can't reset directly, but we can track the window.
        // For simplicity, we use a time-based approach where counters are reset
        // periodically; the actual reset logic is handled by the window manager.
        let current_count = self
            .get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY)
            .unwrap_or(0);

        if current_count > 0 {
            // Decrement by the current count to effectively reset.
            // Note: this is a workaround since PNCounter doesn't support a direct reset.
            // In production you might want a different approach, such as timestamped counters.
            self.sync_rate_limit_inc(GLOBAL_RATE_LIMIT_COUNTER_KEY.to_string(), -current_count);
        }
    }

    /// Lightweight sync: accepts a pre-computed hash + tenant, avoiding
    /// the 80k+ String allocation from TreeKey::Text on every request.
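    ///
    /// Hot-path sketch: hash the prompt path once, then sync only the hash
    /// (`prompt_text` and `worker_url` are placeholders):
    ///
    /// ```ignore
    /// let path_hash = hash_node_path(&prompt_text);
    /// sync.sync_tree_insert_hash("model1", path_hash, &worker_url);
    /// ```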
    pub fn sync_tree_insert_hash(&self, model_id: &str, path_hash: u64, tenant: &str) {
        let key = tree_state_key(model_id);

        self.stores
            .tenant_delta_inserts
            .entry(model_id.to_string())
            .or_default()
            .push(TenantInsert {
                node_path_hash: path_hash,
                worker_url: tenant.to_string(),
                epoch: self.stores.tree_version(&key),
            });

        self.stores.bump_tree_version(&key);
    }

    /// Sync a tree operation to mesh stores.
    ///
    /// This is called on every request (hot path). The operation is appended to
    /// the pending buffer for delta sync — the collector serializes and sends it
    /// to peers. We do NOT read/deserialize/serialize the full TreeState here,
    /// because that is O(tree_size) per request and caused multi-GB memory usage
    /// at 200+ rps.
    ///
    /// The policy store version is bumped so the generation-based collector
    /// detects the change, but the `config` blob is NOT updated on every call.
    /// It is rebuilt lazily by the collector when a full-state fallback is needed.
    #[expect(
        clippy::unnecessary_wraps,
        reason = "Public API — callers handle Result; changing return type is a cross-crate break"
    )]
    pub fn sync_tree_operation(
        &self,
        model_id: String,
        operation: TreeOperation,
    ) -> Result<(), String> {
        let key = tree_state_key(&model_id);

        // Buffer a lightweight tenant delta — 24 bytes per insert (hash + epoch)
        // instead of 80k+ bytes (full prompt text).
        match &operation {
            TreeOperation::Insert(insert) => {
                let path_hash = match &insert.key {
                    TreeKey::Text(text) => hash_node_path(text),
                    TreeKey::Tokens(tokens) => hash_token_path(tokens),
                };
                self.stores
                    .tenant_delta_inserts
                    .entry(model_id.clone())
                    .or_default()
                    .push(TenantInsert {
                        node_path_hash: path_hash,
                        worker_url: insert.tenant.clone(),
                        epoch: self.stores.tree_version(&key),
                    });
            }
            TreeOperation::Remove(remove) => {
                // TODO: capture the specific prefix hash being evicted.
                // For now, 0 means "evict from all nodes" (global eviction).
                // This is overly aggressive but correct — the next structure
                // snapshot will restore any wrongly evicted entries.
                self.stores
                    .tenant_delta_evictions
                    .entry(model_id.clone())
                    .or_default()
                    .push(TenantEvict {
                        node_path_hash: crate::tree_ops::GLOBAL_EVICTION_HASH,
                        worker_url: remove.tenant.clone(),
                    });
            }
        }

        // NOTE: We intentionally do NOT push to tree_ops_pending here.
        // That would store the full TreeOperation (including 20KB+ prompt text)
        // on every request — 40MB between checkpoints at 200 rps.
        // Instead, checkpoint_tree_states exports the live tree via subscribers.

        // Bump the lightweight atomic version counter (O(1), no serialization).
        self.stores.bump_tree_version(&key);

        Ok(())
    }

    /// Load the materialized TreeState from `tree_configs`.
    /// Returns None if no checkpoint exists for this key.
    ///
    /// Handles two storage formats:
    /// - `TreeState` bytes (from remote full-state updates)
    /// - `TreeSnapshot` bytes (from local `checkpoint_tree_states`)
    fn materialize_tree_state(&self, key: &str, model_id: &str) -> Option<TreeState> {
        let config_bytes = self.stores.tree_configs.get(key)?;
        let bytes = config_bytes.value();
        if bytes.is_empty() {
            return Some(TreeState::new(model_id.to_string()));
        }
        // Try TreeState first (remote full-state updates store this format).
        if let Ok(ts) = TreeState::from_bytes(bytes) {
            return Some(ts);
        }
        // Fall back to TreeSnapshot (local checkpoint format).
        if let Ok(snap) = kv_index::snapshot::TreeSnapshot::from_bytes(bytes) {
            let version = self.stores.tree_version(key);
            return Some(TreeState::from_snapshot(
                model_id.to_string(),
                &snap,
                version,
            ));
        }
        None
    }

    /// Get tree state for a model from mesh stores.
    /// Reads from `tree_configs` (populated by periodic checkpoint from live tree).
    pub fn get_tree_state(&self, model_id: &str) -> Option<TreeState> {
        let key = tree_state_key(model_id);
        self.materialize_tree_state(&key, model_id)
    }

    pub fn get_all_tree_states(&self) -> Vec<TreeState> {
        let mut results = Vec::new();

        for entry in &self.stores.tree_configs {
            let key = entry.key().clone();
            let model_id = key.strip_prefix("tree:").unwrap_or(&key).to_string();
            if let Some(ts) = self.materialize_tree_state(&key, &model_id) {
                results.push(ts);
            }
        }

        results
    }

    /// Apply remote tree operation to local stores.
    /// This is called when receiving full tree state updates from other nodes.
    ///
    /// Writes to `tree_configs` (plain DashMap) instead of the CRDT policy
    /// store to avoid operation log memory accumulation.
    ///
    /// Uses `DashMap::entry()` for atomic read-modify-write on `tree_configs`
    /// to avoid the TOCTOU gap between `get()` and `insert()`.
    pub fn apply_remote_tree_operation(
        &self,
        model_id: String,
        tree_state: TreeState,
        actor: Option<String>,
    ) {
        use dashmap::mapref::entry::Entry;

        let key = tree_state_key(&model_id);
        let _actor = actor.unwrap_or_else(|| "remote".to_string());

        let serialized = match tree_state.to_bytes() {
            Ok(bytes) => bytes,
            Err(err) => {
                debug!(error = %err, model_id = %model_id, "Failed to serialize remote tree state");
                return;
            }
        };

        // Atomic read-modify-write via entry() — version check and insert
        // happen under the same shard lock, closing the TOCTOU gap.
        let applied = match self.stores.tree_configs.entry(key.clone()) {
            Entry::Occupied(mut entry) => {
                // tree_configs may hold TreeState bytes (from remote) or
                // TreeSnapshot bytes (from local checkpoint). Fall back to
                // the authoritative atomic version counter if deserialization fails.
                let current_version = TreeState::from_bytes(entry.get())
                    .ok()
                    .map(|ts| ts.version)
                    .unwrap_or_else(|| self.stores.tree_version(&key));
                if tree_state.version > current_version {
                    entry.insert(serialized);
                    debug!(
                        "Applied remote tree state update: model={} (version: {} -> {})",
                        model_id, current_version, tree_state.version
                    );
                    true
                } else {
                    debug!(
                        "Skipped remote tree state update: model={} (version {} <= current {})",
                        model_id, tree_state.version, current_version
                    );
                    false
                }
            }
            Entry::Vacant(entry) => {
                entry.insert(serialized);
                debug!(
                    "Applied remote tree state update (new): model={} (version: {})",
                    model_id, tree_state.version
                );
                true
            }
        };

        // Subscriber notification and version advancement happen after
        // dropping the entry (shard lock released).
        if applied {
            self.stores.advance_tree_version(&key, tree_state.version);
            self.stores.tree_generation.fetch_add(1, Ordering::Release);
            self.notify_tree_state_subscribers(&model_id, &tree_state);
        }
    }

    /// Apply a delta (incremental operations) from a remote node.
    /// Merges the delta operations into the existing local tree state,
    /// avoiding the cost of replacing the entire tree state on every sync.
    ///
    /// Uses `DashMap::entry()` for atomic read-modify-write on `tree_configs`
    /// to avoid the TOCTOU gap between `get()` and `insert()`.
    pub fn apply_remote_tree_delta(&self, delta: TreeStateDelta, actor: Option<String>) {
        use dashmap::mapref::entry::Entry;

        let key = tree_state_key(&delta.model_id);
        let _actor = actor.unwrap_or_else(|| "remote".to_string());
        let model_id = delta.model_id.clone();
        let ops_count = delta.operations.len();

        // Perform the atomic read-modify-write inside the entry block.
        // Tree construction and serialization happen while holding the
        // shard write lock; subscriber notification happens after.
        let result: Option<(TreeState, u64)> = match self.stores.tree_configs.entry(key.clone()) {
            Entry::Occupied(mut entry) => {
                let bytes = entry.get();
                let current_version = if bytes.is_empty() {
                    0
                } else {
                    match TreeState::from_bytes(bytes) {
                        Ok(ts) => ts.version,
                        Err(_) => 0,
                    }
                };

                // Version checks
                if delta.base_version > current_version || current_version >= delta.new_version {
                    debug!(
                        "Skipped remote tree delta: model={} (base_version={}, new_version={}, current={})",
                        model_id, delta.base_version, delta.new_version, current_version
                    );
                    return;
                }

                // Build base tree from config only.
                let mut tree_state = if bytes.is_empty() {
                    if current_version > 0 {
                        debug!(
                            "Skipped remote tree delta: model={} (base_version={}, new_version={}, current={})",
                            model_id, delta.base_version, delta.new_version, current_version
                        );
                        return;
                    }
                    TreeState::new(delta.model_id.clone())
                } else {
                    match TreeState::from_bytes(bytes) {
                        Ok(state) => state,
                        Err(err) => {
                            warn!(
                                model_id = %delta.model_id,
                                error = %err,
                                "Corrupted tree state — rejecting delta to avoid data loss"
                            );
                            return;
                        }
                    }
                };

                let old_version = current_version;
                for op in &delta.operations {
                    tree_state.add_operation(op.clone());
                }
                let new_version = tree_state.version;

                match tree_state.to_bytes() {
                    Ok(serialized) => {
                        entry.insert(serialized);
                        debug!(
                            "Applied remote tree delta: model={} (version: {} -> {}, +{} ops)",
                            model_id, old_version, new_version, ops_count
                        );
                        Some((tree_state, new_version))
                    }
                    Err(err) => {
                        debug!(error = %err, model_id = %model_id, "Failed to serialize tree state after delta apply");
                        None
                    }
                }
            }
            Entry::Vacant(entry) => {
                // No existing config — new tree from delta.
                if delta.base_version > 0 {
                    debug!(
                        "Skipped remote tree delta: model={} (base_version={}, new_version={}, no local state)",
                        model_id, delta.base_version, delta.new_version
                    );
                    return;
                }
                let mut tree_state = TreeState::new(delta.model_id.clone());
                for op in &delta.operations {
                    tree_state.add_operation(op.clone());
                }
                let new_version = tree_state.version;

                match tree_state.to_bytes() {
                    Ok(serialized) => {
                        entry.insert(serialized);
                        debug!(
                            "Applied remote tree delta (new tree): model={} (+{} ops)",
                            model_id, ops_count
                        );
                        Some((tree_state, new_version))
                    }
                    Err(err) => {
                        debug!(error = %err, model_id = %model_id, "Failed to serialize new tree state from delta");
                        None
                    }
                }
            }
        };

        // Notification happens outside the entry block (shard lock released).
        if let Some((tree_state, new_version)) = result {
            self.stores.advance_tree_version(&key, new_version);
            self.stores.tree_generation.fetch_add(1, Ordering::Release);
            self.notify_tree_state_subscribers(&model_id, &tree_state);
        }
    }

    /// Apply a lightweight tenant delta from a remote node.
    /// Updates the local radix tree directly via subscribers without
    /// going through the CRDT or the full TreeState machinery.
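    ///
    /// Receive-path sketch; decoding the delta from the gossip payload is
    /// assumed to happen elsewhere (`decode_tenant_delta` is hypothetical):
    ///
    /// ```ignore
    /// let delta: TenantDelta = decode_tenant_delta(&payload)?; // hypothetical decode
    /// sync.apply_remote_tenant_delta(delta, Some(peer_name));
    /// ```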
    pub fn apply_remote_tenant_delta(&self, delta: TenantDelta, _actor: Option<String>) {
        let key = tree_state_key(&delta.model_id);

        if delta.inserts.is_empty() && delta.evictions.is_empty() {
            return;
        }

        // No version check — both routers independently bump tree_version
        // on local inserts, so the remote delta's version can be lower than
        // the local version even though it contains novel inserts. Tenant
        // inserts are idempotent (insert_text is a no-op if the tenant
        // already exists at the node), so applying "stale" deltas is safe.

        debug!(
            model_id = %delta.model_id,
            inserts = delta.inserts.len(),
            evictions = delta.evictions.len(),
            version = delta.version,
            "Applying remote tenant delta"
        );

        // Clone subscriber list before calling back — same pattern as
        // notify_tree_state_subscribers — so we don't hold the read guard
        // during potentially expensive subscriber callbacks.
        let subscribers = self.tree_state_subscribers.read().clone();
        for subscriber in &subscribers {
            subscriber.apply_tenant_delta(&delta.model_id, &delta.inserts, &delta.evictions);
        }

        // Advance version and bump generation so collector re-scans
        self.stores.advance_tree_version(&key, delta.version);
        self.stores.tree_generation.fetch_add(1, Ordering::Release);
    }

    /// Checkpoint tree state by exporting compact snapshots from the live
    /// radix tree via subscribers.
    ///
    /// Called periodically (~every 10s) to keep `tree_configs` fresh for
    /// the periodic structure snapshot (every 30 gossip rounds).
    ///
    /// Uses [`TreeStateSubscriber::export_tree_snapshot`] to obtain a
    /// compact [`kv_index::snapshot::TreeSnapshot`] that preserves shared
    /// prefixes.  This is much smaller than the flat `TreeState` produced
    /// by `export_tree_state` (~2-4 MB vs ~40 MB for 2048 entries sharing
    /// 80% prefixes) and avoids accumulating full prompt text in memory.
    #[expect(
        clippy::unused_self,
        reason = "Public API called by controller — removing &self is a breaking change"
    )]
    pub fn checkpoint_tree_states(&self) {
        // FIXME: Layer 2 (full tree snapshots) is disabled because the
        // snapshot can be 170+ MB for large trees with long prompts, and
        // allocating it every 10s causes allocator fragmentation. Tree data
        // currently syncs via Layer 1 only (tenant deltas, ~50 bytes each).
        // TODO: implement chunked snapshots or incremental tree diffs so
        // Layer 2 works for large trees without excessive memory allocation.
    }
}

/// Optional mesh sync manager (can be None if mesh is not enabled)
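///
/// Call-site sketch: components hold this and no-op when mesh is disabled
/// (`self.mesh_sync` is an assumed field name):
///
/// ```ignore
/// if let Some(sync) = &self.mesh_sync {
///     sync.remove_worker_state("worker1");
/// }
/// ```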
pub type OptionalMeshSyncManager = Option<Arc<MeshSyncManager>>;

#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
    };

    use super::*;
    use crate::stores::{
        AppState, MembershipState, RateLimitConfig, StateStores, GLOBAL_RATE_LIMIT_COUNTER_KEY,
        GLOBAL_RATE_LIMIT_KEY,
    };

    fn create_test_sync_manager() -> MeshSyncManager {
        let stores = Arc::new(StateStores::new());
        MeshSyncManager::new(stores, "test_node".to_string())
    }

    fn create_test_manager(self_name: String) -> MeshSyncManager {
        let stores = Arc::new(StateStores::with_self_name(self_name.clone()));
        MeshSyncManager::new(stores, self_name)
    }

    #[derive(Debug)]
    struct LockCheckingSubscriber {
        manager: Arc<MeshSyncManager>,
        can_acquire_write_lock: Arc<AtomicBool>,
    }

    impl TreeStateSubscriber for LockCheckingSubscriber {
        fn apply_remote_tree_state(&self, _model_id: &str, _tree_state: &TreeState) {
            let can_acquire_write_lock = self.manager.tree_state_subscribers.try_write().is_some();
            self.can_acquire_write_lock
                .store(can_acquire_write_lock, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_sync_manager_new() {
        let manager = create_test_sync_manager();
        // Should create without panicking
        assert_eq!(manager.get_all_worker_states().len(), 0);
        assert_eq!(manager.get_all_policy_states().len(), 0);
    }

    #[test]
    fn test_sync_worker_state() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        let state = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state.worker_id, "worker1");
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.url, "http://localhost:8000");
        assert!(state.health);
        assert_eq!(state.load, 0.5);
        assert_eq!(state.version, 1);
    }

    #[test]
    fn test_sync_multiple_worker_states() {
        let manager = create_test_sync_manager();

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        manager.sync_worker_state(
            "worker2".to_string(),
            "model1".to_string(),
            "http://localhost:8001".to_string(),
            false,
            0.8,
            vec![],
        );

        manager.sync_worker_state(
            "worker3".to_string(),
            "model2".to_string(),
            "http://localhost:8002".to_string(),
            true,
            0.3,
            vec![],
        );

        let all_states = manager.get_all_worker_states();
        assert_eq!(all_states.len(), 3);

        let worker1 = manager.get_worker_state("worker1").unwrap();
        assert_eq!(worker1.worker_id, "worker1");
        assert!(worker1.health);

        let worker2 = manager.get_worker_state("worker2").unwrap();
        assert_eq!(worker2.worker_id, "worker2");
        assert!(!worker2.health);

        let worker3 = manager.get_worker_state("worker3").unwrap();
        assert_eq!(worker3.worker_id, "worker3");
        assert_eq!(worker3.model_id, "model2");
    }

    #[test]
    fn test_sync_worker_state_version_increment() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        let state1 = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state1.version, 1);

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            false,
            0.8,
            vec![],
        );

        let state2 = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state2.version, 2);
        assert!(!state2.health);
        assert_eq!(state2.load, 0.8);
    }

    #[test]
    fn test_remove_worker_state() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        assert!(manager.get_worker_state("worker1").is_some());

        manager.remove_worker_state("worker1");

        assert!(manager.get_worker_state("worker1").is_none());
        assert_eq!(manager.get_all_worker_states().len(), 0);
    }

    #[test]
    fn test_remove_nonexistent_worker_state() {
        let manager = create_test_sync_manager();

        // Should not panic
        manager.remove_worker_state("nonexistent");
        assert!(manager.get_worker_state("nonexistent").is_none());
    }

    #[test]
    fn test_sync_policy_state() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_policy_state(
            "model1".to_string(),
            "cache_aware".to_string(),
            b"config_data".to_vec(),
        );

        let state = manager.get_policy_state("model1").unwrap();
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.policy_type, "cache_aware");
        assert_eq!(state.config, b"config_data");
        assert_eq!(state.version, 1);
    }

    #[test]
    fn test_sync_multiple_policy_states() {
        let manager = create_test_sync_manager();

        manager.sync_policy_state(
            "model1".to_string(),
            "round_robin".to_string(),
            b"config1".to_vec(),
        );

        manager.sync_policy_state(
            "model2".to_string(),
            "random".to_string(),
            b"config2".to_vec(),
        );

        manager.sync_policy_state(
            "model3".to_string(),
            "consistent_hash".to_string(),
            b"config3".to_vec(),
        );

        let all_states = manager.get_all_policy_states();
        assert_eq!(all_states.len(), 3);

        let policy1 = manager.get_policy_state("model1").unwrap();
        assert_eq!(policy1.model_id, "model1");
        assert_eq!(policy1.policy_type, "round_robin");

        let policy2 = manager.get_policy_state("model2").unwrap();
        assert_eq!(policy2.model_id, "model2");
        assert_eq!(policy2.policy_type, "random");
    }

    #[test]
    fn test_remove_policy_state() {
        let manager = create_test_sync_manager();

        manager.sync_policy_state(
            "model1".to_string(),
            "round_robin".to_string(),
            b"config".to_vec(),
        );

        assert!(manager.get_policy_state("model1").is_some());

        manager.remove_policy_state("model1");

        assert!(manager.get_policy_state("model1").is_none());
        assert_eq!(manager.get_all_policy_states().len(), 0);
    }

    #[test]
    fn test_remove_nonexistent_policy_state() {
        let manager = create_test_sync_manager();

        // Should not panic
        manager.remove_policy_state("nonexistent");
        assert!(manager.get_policy_state("nonexistent").is_none());
    }

    #[test]
    fn test_apply_remote_worker_state() {
        let manager = create_test_manager("node1".to_string());

        // Apply remote state with higher version
        let remote_state = WorkerState {
            worker_id: "worker1".to_string(),
            model_id: "model1".to_string(),
            url: "http://localhost:8000".to_string(),
            health: true,
            load: 0.5,
            version: 5,
            spec: vec![],
        };

        manager.apply_remote_worker_state(remote_state.clone(), Some("node2".to_string()));

        let state = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state.version, 5);
    }

    #[test]
    fn test_apply_remote_worker_state_basic() {
        let manager = create_test_sync_manager();

        let remote_state = WorkerState {
            worker_id: "remote_worker1".to_string(),
            model_id: "model1".to_string(),
            url: "http://localhost:8000".to_string(),
            health: true,
            load: 0.6,
            version: 1,
            spec: vec![],
        };

        manager.apply_remote_worker_state(remote_state.clone(), None);

        let state = manager.get_worker_state("remote_worker1");
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state.worker_id, "remote_worker1");
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.url, "http://localhost:8000");
        assert!(state.health);
        assert_eq!(state.load, 0.6);
    }

    #[test]
    fn test_apply_remote_worker_state_version_check() {
        let manager = create_test_manager("node1".to_string());

        // First insert local state
        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        // Try to apply older version - should be skipped
        let old_state = WorkerState {
            worker_id: "worker1".to_string(),
            model_id: "model1".to_string(),
            url: "http://localhost:8000".to_string(),
            health: false,
            load: 0.8,
            version: 0, // Older version
            spec: vec![],
        };

        manager.apply_remote_worker_state(old_state, Some("node2".to_string()));

        // Should still have version 1
        let state = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state.version, 1);
        assert!(state.health); // Not updated
    }

    #[test]
    fn test_apply_remote_policy_state() {
        let manager = create_test_sync_manager();

        let remote_state = PolicyState {
            model_id: "model1".to_string(),
            policy_type: "remote_policy".to_string(),
            config: b"remote_config".to_vec(),
            version: 1,
        };

        manager.apply_remote_policy_state(remote_state.clone(), None);

        let state = manager.get_policy_state("model1");
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.policy_type, "remote_policy");
        assert_eq!(state.config, b"remote_config");
    }

    #[test]
    fn test_mixed_local_and_remote_states() {
        let manager = create_test_sync_manager();

        // Add local worker
        manager.sync_worker_state(
            "local_worker".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        // Add remote worker
        let remote_state = WorkerState {
            worker_id: "remote_worker".to_string(),
            model_id: "model1".to_string(),
            url: "http://localhost:8001".to_string(),
            health: true,
            load: 0.7,
            version: 1,
            spec: vec![],
        };
        manager.apply_remote_worker_state(remote_state, None);

        let all_states = manager.get_all_worker_states();
        assert_eq!(all_states.len(), 2);

        assert!(manager.get_worker_state("local_worker").is_some());
        assert!(manager.get_worker_state("remote_worker").is_some());
    }

    #[test]
    fn test_update_worker_state() {
        let manager = create_test_sync_manager();

        // Initial state
        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        // Update state
        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            false,
            0.9,
            vec![],
        );

        let state = manager.get_worker_state("worker1").unwrap();
        assert!(!state.health);
        assert_eq!(state.load, 0.9);
        assert_eq!(manager.get_all_worker_states().len(), 1);
    }

    #[test]
    fn test_update_policy_state() {
        let manager = create_test_sync_manager();

        // Initial state
        manager.sync_policy_state(
            "model1".to_string(),
            "round_robin".to_string(),
            b"config1".to_vec(),
        );

        // Update state
        manager.sync_policy_state(
            "model1".to_string(),
            "random".to_string(),
            b"config2".to_vec(),
        );

        let state = manager.get_policy_state("model1").unwrap();
        assert_eq!(state.policy_type, "random");
        assert_eq!(state.config, b"config2");
        assert_eq!(manager.get_all_policy_states().len(), 1);
    }

    #[test]
    fn test_get_all_worker_states_empty() {
        let manager = create_test_sync_manager();
        let states = manager.get_all_worker_states();
        assert!(states.is_empty());
    }

    #[test]
    fn test_get_all_policy_states_empty() {
        let manager = create_test_sync_manager();
        let states = manager.get_all_policy_states();
        assert!(states.is_empty());
    }

    #[test]
    fn test_update_rate_limit_membership() {
        let manager = create_test_manager("node1".to_string());

        // Add membership nodes
        let _ = manager.stores.membership.insert(
            "node1".to_string(),
            MembershipState {
                name: "node1".to_string(),
                address: "127.0.0.1:8000".to_string(),
                status: NodeStatus::Alive as i32,
                version: 1,
                metadata: BTreeMap::new(),
            },
        );

        let _ = manager.stores.membership.insert(
            "node2".to_string(),
            MembershipState {
                name: "node2".to_string(),
                address: "127.0.0.1:8001".to_string(),
                status: NodeStatus::Alive as i32,
                version: 1,
                metadata: BTreeMap::new(),
            },
        );

        manager.update_rate_limit_membership();

        // Check that hash ring was updated
        let owners = manager.stores.rate_limit.get_owners("test_key");
        assert!(!owners.is_empty());
    }
1369    #[test]
1370    fn test_handle_node_failure() {
1371        let manager = create_test_manager("node1".to_string());
1372
1373        // Setup membership
1374        let _ = manager.stores.membership.insert(
1375            "node1".to_string(),
1376            MembershipState {
1377                name: "node1".to_string(),
1378                address: "127.0.0.1:8000".to_string(),
1379                status: NodeStatus::Alive as i32,
1380                version: 1,
1381                metadata: BTreeMap::new(),
1382            },
1383        );
1384
1385        let _ = manager.stores.membership.insert(
1386            "node2".to_string(),
1387            MembershipState {
1388                name: "node2".to_string(),
1389                address: "127.0.0.1:8001".to_string(),
1390                status: NodeStatus::Alive as i32,
1391                version: 1,
1392                metadata: BTreeMap::new(),
1393            },
1394        );
1395
1396        manager.update_rate_limit_membership();
1397
1398        // Handle node failure
1399        manager.handle_node_failure(&["node2".to_string()]);
1400
1401        // Membership should be updated
1402        manager.update_rate_limit_membership();
1403    }

    #[test]
    fn test_sync_rate_limit_inc() {
        let manager = create_test_manager("node1".to_string());

        // Setup membership to make node1 an owner
        manager
            .stores
            .rate_limit
            .update_membership(&["node1".to_string()]);

        let test_key = "test_key".to_string();
        if manager.stores.rate_limit.is_owner(&test_key) {
            manager.sync_rate_limit_inc(test_key.clone(), 5);

            let value = manager.get_rate_limit_value(&test_key);
            assert_eq!(value, Some(5));
        }
    }

    #[test]
    fn test_sync_rate_limit_inc_non_owner() {
        let manager = create_test_manager("node1".to_string());

        // Setup membership without node1
        manager
            .stores
            .rate_limit
            .update_membership(&["node2".to_string(), "node3".to_string()]);

        let test_key = "test_key".to_string();
        if !manager.stores.rate_limit.is_owner(&test_key) {
            manager.sync_rate_limit_inc(test_key.clone(), 5);

            // Should not increment if not owner
            let value = manager.get_rate_limit_value(&test_key);
            assert_eq!(value, None);
        }
    }

    #[test]
    fn test_get_global_rate_limit_config() {
        let manager = create_test_manager("node1".to_string());

        // Initially should be None
        assert!(manager.get_global_rate_limit_config().is_none());

        // Set config
        let config = RateLimitConfig {
            limit_per_second: 100,
        };
        let serialized = bincode::serialize(&config).unwrap();
        let _ = manager.stores.app.insert(
            GLOBAL_RATE_LIMIT_KEY.to_string(),
            AppState {
                key: GLOBAL_RATE_LIMIT_KEY.to_string(),
                value: serialized,
                version: 1,
            },
        );

        let retrieved = manager.get_global_rate_limit_config().unwrap();
        assert_eq!(retrieved.limit_per_second, 100);
    }

    #[test]
    fn test_check_global_rate_limit() {
        let manager = create_test_manager("node1".to_string());

        // Setup config
        let config = RateLimitConfig {
            limit_per_second: 10,
        };
        let serialized = bincode::serialize(&config).unwrap();
        let _ = manager.stores.app.insert(
            GLOBAL_RATE_LIMIT_KEY.to_string(),
            AppState {
                key: GLOBAL_RATE_LIMIT_KEY.to_string(),
                value: serialized,
                version: 1,
            },
        );

        // Setup membership
        manager
            .stores
            .rate_limit
            .update_membership(&["node1".to_string()]);

        // Check rate limit
        let (is_exceeded, _current_count, limit) = manager.check_global_rate_limit();
        assert!(!is_exceeded); // First check should not exceed
        assert_eq!(limit, 10);

        // Increment multiple times
        for _ in 0..15 {
            manager.check_global_rate_limit();
        }

        let (is_exceeded2, current_count2, _) = manager.check_global_rate_limit();
        // Should exceed after many increments
        assert!(is_exceeded2 || current_count2 > 10);
    }

    #[test]
    fn test_reset_global_rate_limit_counter() {
        let manager = create_test_manager("node1".to_string());

        // Setup membership
        manager
            .stores
            .rate_limit
            .update_membership(&["node1".to_string()]);

        // Increment counter
        if manager
            .stores
            .rate_limit
            .is_owner(GLOBAL_RATE_LIMIT_COUNTER_KEY)
        {
            manager.sync_rate_limit_inc(GLOBAL_RATE_LIMIT_COUNTER_KEY.to_string(), 10);
            let value = manager.get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY);
            assert!(value.is_some() && value.unwrap() > 0);

            // Reset
            manager.reset_global_rate_limit_counter();
            let value_after = manager.get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY);
            // Should be reset (0 or negative)
            assert!(value_after.is_none() || value_after.unwrap() <= 0);
        }
    }
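
    // Ownership model exercised by the rate-limit tests above, summarized
    // here for readability (inferred from their assertions, not from the
    // store internals): keys are placed on a hash ring built from the current
    // member list, only a key's owner applies sync_rate_limit_inc, and
    // non-owners read the counter back as None.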

    #[test]
    fn test_sync_tree_operation() {
        let manager = create_test_manager("node1".to_string());

        use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation};

        let op = TreeOperation::Insert(TreeInsertOp {
            key: TreeKey::Text("test_text".to_string()),
            tenant: "http://localhost:8000".to_string(),
        });

        let result = manager.sync_tree_operation("model1".to_string(), op);
        assert!(result.is_ok());

        // sync_tree_operation no longer populates tree_configs (no subscribers
        // in unit tests), so get_tree_state returns None.  Instead, verify
        // that the tenant delta was buffered.
        let inserts = manager.stores.tenant_delta_inserts.get("model1").unwrap();
        assert_eq!(inserts.len(), 1);
        assert_eq!(inserts[0].worker_url, "http://localhost:8000");
        assert_eq!(inserts[0].node_path_hash, hash_node_path("test_text"));
    }

    #[test]
    fn test_get_tree_state() {
        let manager = create_test_manager("node1".to_string());

        // Initially should be None
        assert!(manager.get_tree_state("model1").is_none());

        // sync_tree_operation only buffers tenant deltas and bumps the version
        // counter — it does NOT populate tree_configs (that requires a
        // subscriber-backed checkpoint).  Verify get_tree_state returns None
        // after sync, but the tenant delta was buffered.
        use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation};
        let op = TreeOperation::Insert(TreeInsertOp {
            key: TreeKey::Text("test_text".to_string()),
            tenant: "http://localhost:8000".to_string(),
        });
        manager
            .sync_tree_operation("model1".to_string(), op)
            .unwrap();

        // get_tree_state reads from tree_configs which is empty (no subscriber)
        assert!(manager.get_tree_state("model1").is_none());
        // But the tenant delta insert was buffered
        assert!(manager.stores.tenant_delta_inserts.get("model1").is_some());
    }

    #[test]
    fn test_apply_remote_tree_operation() {
        let manager = create_test_manager("node1".to_string());

        use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation, TreeState};

        let mut tree_state = TreeState::new("model1".to_string());
        tree_state.version = 5;
        tree_state.add_operation(TreeOperation::Insert(TreeInsertOp {
            key: TreeKey::Text("remote_text".to_string()),
            tenant: "http://localhost:8001".to_string(),
        }));
        // add_operation increments version, so version is now 6

        manager.apply_remote_tree_operation(
            "model1".to_string(),
            tree_state,
            Some("node2".to_string()),
        );

        let retrieved = manager.get_tree_state("model1").unwrap();
        assert_eq!(retrieved.version, 6); // add_operation increments version from 5 to 6
        assert_eq!(retrieved.operations.len(), 1);
    }
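
    // Version bookkeeping used throughout these tests: TreeState::new starts
    // at version 0 and every add_operation call bumps the version by one,
    // which is why the tree above lands on 6 (preset 5 + 1 op) and why the
    // seed trees below reach version == operation count.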

    #[test]
    fn test_notify_tree_state_subscribers_drops_lock_before_callback() {
        let manager = Arc::new(create_test_manager("node1".to_string()));
        let can_acquire_write_lock = Arc::new(AtomicBool::new(false));
        let subscriber = Arc::new(LockCheckingSubscriber {
            manager: manager.clone(),
            can_acquire_write_lock: can_acquire_write_lock.clone(),
        });

        manager.register_tree_state_subscriber(subscriber);
        manager.notify_tree_state_subscribers("model1", &TreeState::new("model1".to_string()));

        assert!(can_acquire_write_lock.load(Ordering::SeqCst));
    }

    #[test]
    fn test_get_all_tree_states() {
        let manager = create_test_manager("node1".to_string());

        // get_all_tree_states reads from tree_configs. In unit tests there are
        // no subscribers, so sync_tree_operation won't populate tree_configs.
        // Instead, insert TreeStates directly into tree_configs.
        let mut ts1 = TreeState::new("model1".to_string());
        ts1.add_operation(make_insert_op("alpha", "http://localhost:8000"));
        let mut ts2 = TreeState::new("model2".to_string());
        ts2.add_operation(make_insert_op("beta", "http://localhost:8001"));

        manager
            .stores
            .tree_configs
            .insert("tree:model1".to_string(), ts1.to_bytes().unwrap());
        manager
            .stores
            .tree_configs
            .insert("tree:model2".to_string(), ts2.to_bytes().unwrap());

        let mut states = manager.get_all_tree_states();
        states.sort_by(|left, right| left.model_id.cmp(&right.model_id));

        assert_eq!(states.len(), 2);
        assert_eq!(states[0].model_id, "model1");
        assert_eq!(states[1].model_id, "model2");
    }

    #[test]
    fn test_get_all_worker_states() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        manager.sync_worker_state(
            "worker2".to_string(),
            "model2".to_string(),
            "http://localhost:8001".to_string(),
            false,
            0.8,
            vec![],
        );

        let all_states = manager.get_all_worker_states();
        assert_eq!(all_states.len(), 2);
    }

    #[test]
    fn test_get_all_policy_states() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_policy_state("model1".to_string(), "cache_aware".to_string(), vec![]);

        manager.sync_policy_state("model2".to_string(), "round_robin".to_string(), vec![]);

        let all_states = manager.get_all_policy_states();
        assert_eq!(all_states.len(), 2);
    }

    // ── Delta encoding tests ────────────────────────────────────────────

    use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation, TreeRemoveOp, TreeStateDelta};

    fn make_insert_op(text: &str, tenant: &str) -> TreeOperation {
        TreeOperation::Insert(TreeInsertOp {
            key: TreeKey::Text(text.to_string()),
            tenant: tenant.to_string(),
        })
    }

    fn make_delta(model_id: &str, ops: Vec<TreeOperation>, base: u64, new: u64) -> TreeStateDelta {
        TreeStateDelta {
            model_id: model_id.to_string(),
            operations: ops,
            base_version: base,
            new_version: new,
        }
    }
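
    // Note on the fixtures: in the delta tests below, base_version is the
    // sender's view of the common ancestor, while new_version is the version
    // the receiving tree should land on after the delta's operations are
    // appended (a delta carrying N ops names a new_version N above the
    // receiver's current version). This describes the fixtures only; the
    // production encoder's bookkeeping is not asserted here.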

    #[test]
    fn test_delta_basic_apply() {
        let manager = create_test_manager("node1".to_string());

        let ops = vec![
            make_insert_op("a", "http://w1:8000"),
            make_insert_op("b", "http://w2:8000"),
            make_insert_op("c", "http://w3:8000"),
        ];

        let delta = make_delta("model1", ops, 0, 3);
        manager.apply_remote_tree_delta(delta, Some("node2".to_string()));

        let tree = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree.version, 3);
        assert_eq!(tree.operations.len(), 3);
    }

    #[test]
    fn test_delta_version_check_rejects_gap() {
        let manager = create_test_manager("node1".to_string());

        // Seed tree at version 10
        let mut seed = TreeState::new("model1".to_string());
        for i in 0..10 {
            seed.add_operation(make_insert_op(&format!("seed_{i}"), "http://w:8000"));
        }
        assert_eq!(seed.version, 10);
        manager.apply_remote_tree_operation("model1".to_string(), seed, Some("seed".to_string()));

        // Delta with base_version=5 should be accepted (base <= current)
        let delta_ok = make_delta("model1", vec![make_insert_op("ok", "http://w:8000")], 5, 11);
        manager.apply_remote_tree_delta(delta_ok, None);
        let tree = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree.version, 11);

        // Delta with base_version=20 should be rejected (gap: base > current)
        let delta_gap = make_delta(
            "model1",
            vec![make_insert_op("gap", "http://w:8000")],
            20,
            21,
        );
        manager.apply_remote_tree_delta(delta_gap, None);
        let tree = manager.get_tree_state("model1").unwrap();
        // Version should still be 11 — the gap delta was rejected
        assert_eq!(tree.version, 11);
    }
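
    // The gap, stale, and duplicate tests in this module all exercise one
    // version gate. A minimal sketch of that gate, spelled out so the
    // acceptance rule is explicit (this mirrors what the tests assert, not
    // the literal code inside apply_remote_tree_delta):
    fn delta_applies(current_version: u64, base_version: u64, new_version: u64) -> bool {
        base_version <= current_version && current_version < new_version
    }

    #[test]
    fn test_delta_version_gate_sketch() {
        assert!(delta_applies(5, 5, 8)); // contiguous chain step: accepted
        assert!(delta_applies(10, 5, 11)); // base behind current: still accepted
        assert!(!delta_applies(11, 20, 21)); // gap (base > current): rejected
        assert!(!delta_applies(5, 0, 3)); // stale out-of-order delivery: rejected
        assert!(!delta_applies(2, 0, 2)); // exact duplicate (new == current): rejected
    }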

    #[test]
    fn test_delta_concurrent_apply() {
        let manager = Arc::new(create_test_manager("node1".to_string()));

        // Both deltas target the same empty tree.  At least one must succeed,
        // and the resulting version must reflect the applied operations.
        let m1 = manager.clone();
        let m2 = manager.clone();

        let t1 = std::thread::spawn(move || {
            let delta = make_delta("model1", vec![make_insert_op("t1", "http://w1:8000")], 0, 1);
            m1.apply_remote_tree_delta(delta, Some("thread1".to_string()));
        });

        let t2 = std::thread::spawn(move || {
            let delta = make_delta("model1", vec![make_insert_op("t2", "http://w2:8000")], 0, 1);
            m2.apply_remote_tree_delta(delta, Some("thread2".to_string()));
        });

        t1.join().unwrap();
        t2.join().unwrap();

        // At least one delta should have been applied
        let tree = manager.get_tree_state("model1").unwrap();
        assert!(tree.version >= 1);
        assert!(!tree.operations.is_empty());
    }

    #[test]
    fn test_delta_empty_tree() {
        let manager = create_test_manager("node1".to_string());

        // No pre-existing tree for "new_model"
        assert!(manager.get_tree_state("new_model").is_none());

        let delta = make_delta(
            "new_model",
            vec![make_insert_op("first", "http://w1:8000")],
            0,
            1,
        );
        manager.apply_remote_tree_delta(delta, None);

        let tree = manager.get_tree_state("new_model").unwrap();
        assert_eq!(tree.model_id, "new_model");
        assert_eq!(tree.version, 1);
        assert_eq!(tree.operations.len(), 1);
    }

    #[test]
    fn test_delta_notifies_subscribers() {
        let manager = Arc::new(create_test_manager("node1".to_string()));
        let notified = Arc::new(AtomicBool::new(false));

        #[derive(Debug)]
        struct FlagSubscriber(Arc<AtomicBool>);
        impl TreeStateSubscriber for FlagSubscriber {
            fn apply_remote_tree_state(&self, _model_id: &str, _tree_state: &TreeState) {
                self.0.store(true, Ordering::SeqCst);
            }
        }

        manager.register_tree_state_subscriber(Arc::new(FlagSubscriber(notified.clone())));

        let delta = make_delta("model1", vec![make_insert_op("x", "http://w:8000")], 0, 1);
        manager.apply_remote_tree_delta(delta, None);

        assert!(
            notified.load(Ordering::SeqCst),
            "subscriber was not notified after delta apply"
        );
    }

    #[test]
    fn test_collector_sends_tenant_delta() {
        use crate::{
            incremental::IncrementalUpdateCollector, stores::StoreType, tree_ops::TenantDelta,
        };

        let stores = Arc::new(StateStores::with_self_name("node1".to_string()));
        let manager = MeshSyncManager::new(stores.clone(), "node1".to_string());

        // Sync a tree operation — buffers a tenant insert
        manager
            .sync_tree_operation(
                "model1".to_string(),
                make_insert_op("hello world", "http://w:8000"),
            )
            .unwrap();

        let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
        let updates = collector.collect_updates_for_store(StoreType::Policy);

        assert!(!updates.is_empty(), "expected at least one policy update");

        // The update should be a tenant delta (not full tree state)
        let tree_update = updates
            .iter()
            .find(|u| u.key.starts_with("tree:"))
            .expect("expected a tree key update");

        let policy_state: PolicyState =
            bincode::deserialize(&tree_update.value).expect("deserialize PolicyState");
        assert_eq!(
            policy_state.policy_type, "tenant_delta",
            "expected tenant_delta, got {}",
            policy_state.policy_type
        );

        // Verify the tenant delta deserializes and contains the insert
        let delta = TenantDelta::from_bytes(&policy_state.config).expect("deserialize TenantDelta");
        assert_eq!(delta.model_id, "model1");
        assert_eq!(delta.inserts.len(), 1);
        assert_eq!(delta.inserts[0].worker_url, "http://w:8000");
        assert_eq!(
            delta.inserts[0].node_path_hash,
            hash_node_path("hello world")
        );
        assert!(delta.evictions.is_empty());
    }

    #[test]
    fn test_collector_falls_back_to_full_state() {
        use crate::{incremental::IncrementalUpdateCollector, stores::StoreType};

        let stores = Arc::new(StateStores::with_self_name("node1".to_string()));

        // Directly insert a tree state into tree_configs WITHOUT going through
        // sync_tree_operation (so tree_ops_pending is empty).  This simulates
        // a remote tree state received via apply_remote_tree_operation.
        let mut tree = TreeState::new("model1".to_string());
        tree.add_operation(make_insert_op("direct", "http://w:8000"));
        let serialized = tree.to_bytes().unwrap();
        stores
            .tree_configs
            .insert("tree:model1".to_string(), serialized);
        // Advance tree version so the collector sees it as changed.
        stores.advance_tree_version("tree:model1", tree.version);
        // Bump tree_generation so the collector's tree_changed check fires.
        stores.bump_tree_version("tree:model1");

        let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
        let updates = collector.collect_updates_for_store(StoreType::Policy);

        assert!(!updates.is_empty(), "expected at least one policy update");

        let tree_update = updates
            .iter()
            .find(|u| u.key.starts_with("tree:"))
            .expect("expected a tree key update");

        // Since there are no pending ops, it should fall back to full PolicyState
        let policy_state: PolicyState =
            bincode::deserialize(&tree_update.value).expect("deserialize PolicyState");
        assert_eq!(
            policy_state.policy_type, "tree_state_lz4",
            "expected full state fallback, got delta"
        );
    }
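
    // Wire-format summary, as exercised by the two collector tests above
    // (derived from their assertions rather than from the collector source):
    //   policy_type == "tenant_delta"     → config holds TenantDelta bytes
    //   policy_type == "tree_state_lz4"   → config holds an lz4-compressed TreeState
    //   policy_type == "tree_state_delta" → config holds TreeStateDelta bytes
    //   policy_type == "tree_state"       → presumably the uncompressed legacy form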

    // test_collector_buffer_survives_mark_sent removed: tested tree_ops_pending
    // buffer survival across mark_sent calls, which is a dead code path now
    // that sync_tree_operation no longer pushes to tree_ops_pending.

    #[test]
    fn test_receiver_dispatches_delta_vs_full() {
        let manager = create_test_manager("node1".to_string());

        // 1. Apply via delta path
        let delta = make_delta(
            "model_d",
            vec![make_insert_op("delta_op", "http://w:8000")],
            0,
            1,
        );
        manager.apply_remote_tree_delta(delta, Some("remote".to_string()));

        let tree_d = manager.get_tree_state("model_d").unwrap();
        assert_eq!(tree_d.version, 1);
        assert_eq!(tree_d.operations.len(), 1);

        // 2. Apply via full state path
        let mut full_tree = TreeState::new("model_f".to_string());
        full_tree.add_operation(make_insert_op("full_op1", "http://w1:8000"));
        full_tree.add_operation(make_insert_op("full_op2", "http://w2:8000"));

        manager.apply_remote_tree_operation(
            "model_f".to_string(),
            full_tree,
            Some("remote".to_string()),
        );

        let tree_f = manager.get_tree_state("model_f").unwrap();
        assert_eq!(tree_f.version, 2);
        assert_eq!(tree_f.operations.len(), 2);
    }

    #[test]
    fn test_delta_backward_compatible_full_state() {
        let manager = create_test_manager("node1".to_string());

        // Simulate receiving a full TreeState (the old, pre-delta format)
        let mut old_format_tree = TreeState::new("legacy_model".to_string());
        old_format_tree.add_operation(make_insert_op("old1", "http://w:8000"));
        old_format_tree.add_operation(make_insert_op("old2", "http://w:8000"));

        // The full-state path (apply_remote_tree_operation) should handle it
        manager.apply_remote_tree_operation(
            "legacy_model".to_string(),
            old_format_tree.clone(),
            Some("old_node".to_string()),
        );

        let tree = manager.get_tree_state("legacy_model").unwrap();
        assert_eq!(tree.version, old_format_tree.version);
        assert_eq!(tree.operations.len(), 2);
        assert_eq!(tree.model_id, "legacy_model");
    }

    // ── Edge-case delta encoding tests ─────────────────────────────────

    #[test]
    fn test_delta_reconnect_falls_back_to_full_state() {
        // Simulate a reconnected peer scenario: tree_configs has a materialized
        // tree state but tenant delta buffers are empty.  The collector should
        // produce a full PolicyState (lz4-compressed), not a delta.
        use crate::{incremental::IncrementalUpdateCollector, stores::StoreType};

        let stores = Arc::new(StateStores::with_self_name("node1".to_string()));

        // Directly insert a tree state into tree_configs (simulating a
        // checkpoint that ran with real subscribers in production).
        let mut tree = TreeState::new("model1".to_string());
        for i in 0..10 {
            tree.add_operation(make_insert_op(&format!("op_{i}"), "http://w:8000"));
        }
        let serialized = tree.to_bytes().unwrap();
        stores
            .tree_configs
            .insert("tree:model1".to_string(), serialized);
        stores.advance_tree_version("tree:model1", tree.version);
        stores.bump_tree_version("tree:model1");

        // Ensure tenant delta buffers are empty (simulating buffer drain)
        stores.tenant_delta_inserts.remove("model1");
        stores.tenant_delta_evictions.remove("model1");

        // New collector (simulating reconnected peer)
        let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
        let updates = collector.collect_updates_for_store(StoreType::Policy);

        assert!(!updates.is_empty(), "expected at least one update");

        let tree_update = updates
            .iter()
            .find(|u| u.key.starts_with("tree:"))
            .expect("expected a tree key update");

        let policy_state: PolicyState =
            bincode::deserialize(&tree_update.value).expect("deserialize PolicyState");
        assert_eq!(
            policy_state.policy_type, "tree_state_lz4",
            "expected full state fallback when tenant delta buffers are empty, got: {}",
            policy_state.policy_type
        );
    }

    // test_delta_compaction_divergence removed: tested TreeState compaction
    // via sync_tree_operation + get_tree_state, which relied on tree_ops_pending
    // replay. sync_tree_operation no longer pushes to tree_ops_pending, and
    // get_tree_state reads only from tree_configs (populated by subscribers).

    #[test]
    fn test_delta_out_of_order_delivery() {
        // Create tree at version 0.  Apply delta [0→5], then apply stale
        // delta [0→3].  The second delta should be rejected because the
        // tree is already at version 5.
        let manager = create_test_manager("node1".to_string());

        let ops_1_to_5: Vec<_> = (1..=5)
            .map(|i| make_insert_op(&format!("op_{i}"), "http://w:8000"))
            .collect();
        let delta1 = make_delta("model1", ops_1_to_5, 0, 5);
        manager.apply_remote_tree_delta(delta1, Some("peer_a".to_string()));

        let tree = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree.version, 5);
        assert_eq!(tree.operations.len(), 5);

        // Late-arriving delta with lower new_version
        let ops_1_to_3: Vec<_> = (1..=3)
            .map(|i| make_insert_op(&format!("late_op_{i}"), "http://w:8000"))
            .collect();
        let delta2 = make_delta("model1", ops_1_to_3, 0, 3);
        manager.apply_remote_tree_delta(delta2, Some("peer_b".to_string()));

        // Tree should be unchanged — stale delta rejected
        let tree_after = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree_after.version, 5);
        assert_eq!(tree_after.operations.len(), 5);
    }

    #[test]
    fn test_delta_duplicate_delivery() {
        // Apply the same delta twice.  The second application must be a
        // no-op because current version >= delta.new_version.
        let manager = create_test_manager("node1".to_string());

        let ops = vec![
            make_insert_op("dup1", "http://w:8000"),
            make_insert_op("dup2", "http://w:8000"),
        ];
        let delta = make_delta("model1", ops.clone(), 0, 2);

        manager.apply_remote_tree_delta(delta.clone(), Some("peer".to_string()));
        let tree1 = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree1.version, 2);
        assert_eq!(tree1.operations.len(), 2);

        // Second apply — duplicate
        manager.apply_remote_tree_delta(delta, Some("peer".to_string()));
        let tree2 = manager.get_tree_state("model1").unwrap();
        assert_eq!(
            tree2.version, 2,
            "duplicate delta should not change version"
        );
        assert_eq!(
            tree2.operations.len(),
            2,
            "duplicate delta should not add extra ops"
        );
    }

    #[test]
    fn test_delta_split_brain_recovery() {
        // Node A and Node B both start at version 5.
        // A processes 3 ops (version 8).  B has the seed at version 5
        // in tree_configs (local ops via sync_tree_operation only bump
        // the atomic counter, not tree_configs).
        // A sends delta(base=5, new=8) to B.
        // B's tree_configs version is 5.
        //   base(5) <= current(5) ✓
        //   current(5) < new(8) ✓
        // So B accepts and applies the 3 ops.
        let manager = create_test_manager("nodeB".to_string());

        // Seed the tree at version 5 (common ancestor) — writes to tree_configs
        let mut seed = TreeState::new("model1".to_string());
        for i in 0..5 {
            seed.add_operation(make_insert_op(&format!("seed_{i}"), "http://w:8000"));
        }
        assert_eq!(seed.version, 5);
        manager.apply_remote_tree_operation("model1".to_string(), seed, Some("origin".to_string()));

        // Verify tree_configs has version 5
        let tree_b = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree_b.version, 5);

        // A's delta: base=5, new=8, 3 ops
        let a_ops: Vec<_> = (0..3)
            .map(|i| make_insert_op(&format!("A_op_{i}"), "http://wA:8000"))
            .collect();
        let delta_a = make_delta("model1", a_ops, 5, 8);
        manager.apply_remote_tree_delta(delta_a, Some("nodeA".to_string()));

        // After apply, tree should have seed ops + A's ops.
        let tree_merged = manager.get_tree_state("model1").unwrap();
        assert_eq!(
            tree_merged.version, 8,
            "tree_configs version should be 8 (seed 5 + 3 delta ops), got {}",
            tree_merged.version
        );
        assert_eq!(tree_merged.operations.len(), 8);
    }

    // test_delta_buffer_trim_multi_peer removed: tested tree_ops_pending trim
    // behavior across multiple peer collectors. sync_tree_operation no longer
    // pushes to tree_ops_pending, making this a dead code path.

    // test_delta_empty_pending_vec removed: tested empty tree_ops_pending
    // fallback to full state. sync_tree_operation no longer pushes to
    // tree_ops_pending, making this a dead code path.

    #[test]
    fn test_delta_concurrent_write_and_collect() {
        // Spawn a thread that adds 100 ops via sync_tree_operation.
        // Simultaneously run the collector.  The collector should get a
        // consistent snapshot — either some ops or all ops, but never
        // corrupted data.
        use crate::{
            incremental::IncrementalUpdateCollector, stores::StoreType, tree_ops::TenantDelta,
        };

        let stores = Arc::new(StateStores::with_self_name("node1".to_string()));
        let manager = Arc::new(MeshSyncManager::new(stores.clone(), "node1".to_string()));

        let manager_clone = manager.clone();
        let writer = std::thread::spawn(move || {
            for i in 0..100 {
                manager_clone
                    .sync_tree_operation(
                        "model1".to_string(),
                        make_insert_op(&format!("concurrent_op_{i}"), "http://w:8000"),
                    )
                    .unwrap();
            }
        });

        // Collect multiple times while writer is active
        let mut collected_any = false;
        for _ in 0..10 {
            let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
            let updates = collector.collect_updates_for_store(StoreType::Policy);
            for update in &updates {
                if update.key.starts_with("tree:") {
                    // Verify the data deserializes without corruption
                    let policy_state: PolicyState =
                        bincode::deserialize(&update.value).expect("data should not be corrupted");
                    assert!(
                        policy_state.policy_type == "tenant_delta"
                            || policy_state.policy_type == "tree_state_delta"
                            || policy_state.policy_type == "tree_state_lz4"
                            || policy_state.policy_type == "tree_state"
                    );

                    if policy_state.policy_type == "tenant_delta" {
                        // tenant_delta payloads carry TenantDelta bytes, not a
                        // TreeState, so decode them with their own deserializer.
                        let delta = TenantDelta::from_bytes(&policy_state.config)
                            .expect("tenant delta should deserialize cleanly");
                        assert!(!delta.inserts.is_empty() || !delta.evictions.is_empty());
                    } else if policy_state.policy_type == "tree_state_delta" {
                        let delta = TreeStateDelta::from_bytes(&policy_state.config)
                            .expect("delta should deserialize cleanly");
                        assert!(!delta.operations.is_empty());
                    } else {
                        let tree = TreeState::from_bytes(&policy_state.config)
                            .expect("tree state should deserialize cleanly");
                        assert!(!tree.operations.is_empty());
                    }
                    collected_any = true;
                }
            }
        }

        writer.join().unwrap();

        // After writer finishes, one final collect should succeed
        let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
        let final_updates = collector.collect_updates_for_store(StoreType::Policy);
        if !collected_any {
            // Writer may have been too fast; at least final collection must succeed
            assert!(
                !final_updates.is_empty(),
                "final collection after writer finished should have updates"
            );
        }
    }

    // test_delta_oversized_mark_sent_trims_buffer removed: tested
    // tree_ops_pending trim threshold during mark_sent. sync_tree_operation
    // no longer pushes to tree_ops_pending, making this a dead code path.

    // test_delta_version_monotonic_after_compaction removed: tested version
    // monotonicity across compaction by calling sync_tree_operation 3000 times
    // and reading back via get_tree_state. Both paths relied on tree_ops_pending
    // replay, which is a dead code path now.

    #[test]
    fn test_delta_with_remove_operations() {
        // Verify that deltas containing Remove operations work correctly
        let manager = create_test_manager("node1".to_string());

        let ops = vec![
            make_insert_op("text1", "http://w1:8000"),
            TreeOperation::Remove(TreeRemoveOp {
                tenant: "http://w1:8000".to_string(),
            }),
            make_insert_op("text2", "http://w2:8000"),
        ];

        let delta = make_delta("model1", ops, 0, 3);
        manager.apply_remote_tree_delta(delta, Some("peer".to_string()));

        let tree = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree.version, 3);
        assert_eq!(tree.operations.len(), 3);
        // Verify the remove op is present
        assert!(matches!(
            tree.operations[1],
            TreeOperation::Remove(TreeRemoveOp { .. })
        ));
    }

    #[test]
    fn test_delta_multiple_models_independent() {
        // Verify that deltas for different models don't interfere with
        // each other
        let manager = create_test_manager("node1".to_string());

        let delta_a = make_delta(
            "model_a",
            vec![make_insert_op("a_op", "http://w:8000")],
            0,
            1,
        );
        let delta_b = make_delta(
            "model_b",
            vec![
                make_insert_op("b_op1", "http://w:8000"),
                make_insert_op("b_op2", "http://w:8000"),
            ],
            0,
            2,
        );

        manager.apply_remote_tree_delta(delta_a, None);
        manager.apply_remote_tree_delta(delta_b, None);

        let tree_a = manager.get_tree_state("model_a").unwrap();
        let tree_b = manager.get_tree_state("model_b").unwrap();

        assert_eq!(tree_a.version, 1);
        assert_eq!(tree_a.operations.len(), 1);
        assert_eq!(tree_b.version, 2);
        assert_eq!(tree_b.operations.len(), 2);
    }

    #[test]
    fn test_delta_incremental_chain() {
        // Apply a chain of sequential deltas: 0→3, 3→5, 5→8
        // Each should be accepted and the tree should accumulate all ops.
        let manager = create_test_manager("node1".to_string());

        let delta1 = make_delta(
            "model1",
            (0..3)
                .map(|i| make_insert_op(&format!("batch1_op_{i}"), "http://w:8000"))
                .collect(),
            0,
            3,
        );
        manager.apply_remote_tree_delta(delta1, None);
        let tree = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree.version, 3);

        let delta2 = make_delta(
            "model1",
            (0..2)
                .map(|i| make_insert_op(&format!("batch2_op_{i}"), "http://w:8000"))
                .collect(),
            3,
            5,
        );
        manager.apply_remote_tree_delta(delta2, None);
        let tree = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree.version, 5);

        let delta3 = make_delta(
            "model1",
            (0..3)
                .map(|i| make_insert_op(&format!("batch3_op_{i}"), "http://w:8000"))
                .collect(),
            5,
            8,
        );
        manager.apply_remote_tree_delta(delta3, None);
        let tree = manager.get_tree_state("model1").unwrap();
        assert_eq!(tree.version, 8);
        assert_eq!(tree.operations.len(), 8);
    }

    #[test]
    fn test_delta_token_key_serialization_round_trip() {
        // Verify that deltas with TreeKey::Tokens survive serialization
        // through the full delta encode/decode path.
        use crate::tree_ops::TreeInsertOp;

        let tokens = vec![42u32, 100, 200, 999, u32::MAX];
        let ops = vec![TreeOperation::Insert(TreeInsertOp {
            key: TreeKey::Tokens(tokens.clone()),
            tenant: "http://w:8000".to_string(),
        })];

        let delta = TreeStateDelta {
            model_id: "token_model".to_string(),
            operations: ops,
            base_version: 0,
            new_version: 1,
        };

        // Serialize and deserialize
        let bytes = delta.to_bytes().unwrap();
        let restored = TreeStateDelta::from_bytes(&bytes).unwrap();
        assert_eq!(restored.operations.len(), 1);

        match &restored.operations[0] {
            TreeOperation::Insert(op) => {
                assert_eq!(op.key, TreeKey::Tokens(tokens));
            }
            TreeOperation::Remove(_) => panic!("expected Insert operation"),
        }

        // Apply the delta to a manager and verify the tree
        let manager = create_test_manager("node1".to_string());
        manager.apply_remote_tree_delta(restored, None);

        let tree = manager.get_tree_state("token_model").unwrap();
        assert_eq!(tree.version, 1);
        assert_eq!(tree.operations.len(), 1);
    }
}