1use std::{
6 fmt::Debug,
7 sync::{atomic::Ordering, Arc},
8};
9
10use parking_lot::RwLock;
11use tracing::{debug, warn};
12
13use super::{
14 service::gossip::NodeStatus,
15 stores::{
16 policy_key, tree_state_key, PolicyState, RateLimitConfig, StateStores, WorkerState,
17 GLOBAL_RATE_LIMIT_COUNTER_KEY, GLOBAL_RATE_LIMIT_KEY,
18 },
19 tree_ops::{
20 hash_node_path, hash_token_path, TenantDelta, TenantEvict, TenantInsert, TreeKey,
21 TreeOperation, TreeState, TreeStateDelta,
22 },
23};
24
25pub trait TreeStateSubscriber: Send + Sync + Debug {
26 fn apply_remote_tree_state(&self, model_id: &str, tree_state: &TreeState);
27
28 fn apply_tenant_delta(
34 &self,
35 model_id: &str,
36 _inserts: &[TenantInsert],
37 evictions: &[TenantEvict],
38 ) {
39 let global_evictions: Vec<&TenantEvict> = evictions
43 .iter()
44 .filter(|e| e.node_path_hash == crate::tree_ops::GLOBAL_EVICTION_HASH)
45 .collect();
46
47 if !global_evictions.is_empty() {
48 let mut tree_state = TreeState::new(model_id.to_string());
49 for evict in global_evictions {
50 tree_state.add_operation(TreeOperation::Remove(crate::tree_ops::TreeRemoveOp {
51 tenant: evict.worker_url.clone(),
52 }));
53 }
54 self.apply_remote_tree_state(model_id, &tree_state);
55 }
56 }
57
58 fn export_tree_state(&self, _model_id: &str) -> Option<TreeState> {
63 None
64 }
65
66 fn export_tree_snapshot(&self, _model_id: &str) -> Option<kv_index::snapshot::TreeSnapshot> {
74 None
75 }
76}
77
/// Receives worker-state updates that were accepted from remote mesh nodes.
pub trait WorkerStateSubscriber: Send + Sync + Debug {
    /// Called after a remote `WorkerState` has been applied to the local store.
    fn on_remote_worker_state(&self, state: &WorkerState);
}
81
/// Bridges local state stores and the gossip mesh: publishes local changes
/// and applies version-gated remote updates, fanning them out to subscribers.
#[derive(Clone, Debug)]
pub struct MeshSyncManager {
    // Shared stores backing all mesh-replicated state (workers, policies,
    // rate limits, tree state).
    pub(crate) stores: Arc<StateStores>,
    // This node's mesh name; used as the actor id for rate-limit increments.
    self_name: String,
    // Subscribers notified on remote tree-state updates. The Vec is cloned
    // before iteration so callbacks never run with the lock held.
    tree_state_subscribers: Arc<RwLock<Vec<Arc<dyn TreeStateSubscriber>>>>,
    // Subscribers notified on remote worker-state updates; same locking rule.
    worker_state_subscribers: Arc<RwLock<Vec<Arc<dyn WorkerStateSubscriber>>>>,
}
90
91impl MeshSyncManager {
92 pub fn new(stores: Arc<StateStores>, self_name: String) -> Self {
93 Self {
94 stores,
95 self_name,
96 tree_state_subscribers: Arc::new(RwLock::new(Vec::new())),
97 worker_state_subscribers: Arc::new(RwLock::new(Vec::new())),
98 }
99 }
100
    /// Register a subscriber for remote tree-state updates.
    pub fn register_tree_state_subscriber(&self, subscriber: Arc<dyn TreeStateSubscriber>) {
        self.tree_state_subscribers.write().push(subscriber);
    }
104
105 fn notify_tree_state_subscribers(&self, model_id: &str, tree_state: &TreeState) {
106 let subscribers = self.tree_state_subscribers.read().clone();
107 for subscriber in subscribers {
108 subscriber.apply_remote_tree_state(model_id, tree_state);
109 }
110 }
111
    /// Register a subscriber for remote worker-state updates.
    pub fn register_worker_state_subscriber(&self, subscriber: Arc<dyn WorkerStateSubscriber>) {
        self.worker_state_subscribers.write().push(subscriber);
    }
115
116 fn notify_worker_state_subscribers(&self, state: &WorkerState) {
117 let subscribers = self.worker_state_subscribers.read().clone();
118 for subscriber in subscribers {
119 subscriber.on_remote_worker_state(state);
120 }
121 }
122
    /// This node's mesh name.
    pub fn self_name(&self) -> &str {
        &self.self_name
    }
127
128 pub fn sync_worker_state(
130 &self,
131 worker_id: String,
132 model_id: String,
133 url: String,
134 health: bool,
135 load: f64,
136 spec: Vec<u8>,
137 ) {
138 let key = worker_id.clone();
139
140 let updated_state = self.stores.worker.update(key, |current| {
141 let new_version = current
142 .map(|state| state.version)
143 .unwrap_or(0)
144 .saturating_add(1);
145
146 WorkerState {
147 worker_id: worker_id.clone(),
148 model_id,
149 url,
150 health,
151 load,
152 version: new_version,
153 spec,
154 }
155 });
156
157 match updated_state {
158 Ok(Some(state)) => {
159 debug!(
160 "Synced worker state to mesh {} (version: {})",
161 state.worker_id, state.version
162 );
163 }
164 Ok(None) => {}
165 Err(err) => {
166 debug!(error = %err, worker_id = %worker_id, "Failed to sync worker state");
167 }
168 }
169 }
170
    /// Remove a worker's state from the mesh store (no-op if absent).
    pub fn remove_worker_state(&self, worker_id: &str) {
        self.stores.worker.remove(worker_id);
        debug!("Removed worker state from mesh {}", worker_id);
    }
176
177 pub fn sync_policy_state(&self, model_id: String, policy_type: String, config: Vec<u8>) {
179 let key = policy_key(&model_id);
180 let model_id_for_update = model_id.clone();
181
182 let updated_state = self.stores.policy.update(key, move |current| {
183 let new_version = current
184 .map(|state| state.version)
185 .unwrap_or(0)
186 .saturating_add(1);
187
188 PolicyState {
189 model_id: model_id_for_update,
190 policy_type,
191 config,
192 version: new_version,
193 }
194 });
195
196 match updated_state {
197 Ok(Some(state)) => {
198 debug!(
199 "Synced policy state to mesh model={} (version: {})",
200 state.model_id, state.version
201 );
202 }
203 Ok(None) => {}
204 Err(err) => {
205 debug!(error = %err, model_id = %model_id, "Failed to sync policy state");
206 }
207 }
208 }
209
    /// Remove the policy state for `model_id` from the mesh store (no-op if absent).
    pub fn remove_policy_state(&self, model_id: &str) {
        let key = policy_key(model_id);
        self.stores.policy.remove(&key);
        debug!("Removed policy state from mesh model={}", model_id);
    }
216
    /// Look up a worker's replicated state, if known.
    pub fn get_worker_state(&self, worker_id: &str) -> Option<WorkerState> {
        self.stores.worker.get(worker_id)
    }
221
    /// Snapshot of every known worker state (unordered).
    pub fn get_all_worker_states(&self) -> Vec<WorkerState> {
        self.stores.worker.all().into_values().collect()
    }
226
    /// Look up the replicated policy state for `model_id`, if known.
    pub fn get_policy_state(&self, model_id: &str) -> Option<PolicyState> {
        let key = policy_key(model_id);
        self.stores.policy.get(&key)
    }
232
    /// Snapshot of every known policy state (unordered).
    pub fn get_all_policy_states(&self) -> Vec<PolicyState> {
        self.stores.policy.all().into_values().collect()
    }
237
238 pub fn apply_remote_worker_state(&self, state: WorkerState, actor: Option<String>) {
241 let key = state.worker_id.clone();
242 let actor = actor.unwrap_or_else(|| "remote".to_string());
243 let mut current_version = 0;
244
245 let update_result = self.stores.worker.update_if(key, |current| {
246 current_version = current
247 .as_ref()
248 .map(|existing| existing.version)
249 .unwrap_or(0);
250 if state.version > current_version {
251 Some(state.clone())
252 } else {
253 None
254 }
255 });
256
257 match update_result {
258 Ok((_, true)) => {
259 debug!(
260 "Applied remote worker state update: {} (version: {} -> {})",
261 state.worker_id, current_version, state.version
262 );
263 self.notify_worker_state_subscribers(&state);
264 }
265 Ok((_, false)) => {
266 debug!(
267 "Skipped remote worker state update: {} (version {} <= current {})",
268 state.worker_id, state.version, current_version
269 );
270 }
271 Err(err) => {
272 debug!(error = %err, worker_id = %state.worker_id, actor = %actor, "Failed to apply remote worker state update");
273 }
274 }
275 }
276
277 pub fn apply_remote_policy_state(&self, state: PolicyState, actor: Option<String>) {
280 let key = policy_key(&state.model_id);
281 let actor = actor.unwrap_or_else(|| "remote".to_string());
282 let mut current_version = 0;
283
284 let update_result = self.stores.policy.update_if(key, |current| {
285 current_version = current
286 .as_ref()
287 .map(|existing| existing.version)
288 .unwrap_or(0);
289 if state.version > current_version {
290 Some(state.clone())
291 } else {
292 None
293 }
294 });
295
296 match update_result {
297 Ok((_, true)) => {
298 debug!(
299 "Applied remote policy state update: {} (version: {} -> {})",
300 state.model_id, current_version, state.version
301 );
302 }
303 Ok((_, false)) => {
304 debug!(
305 "Skipped remote policy state update: {} (version {} <= current {})",
306 state.model_id, state.version, current_version
307 );
308 }
309 Err(err) => {
310 debug!(error = %err, model_id = %state.model_id, actor = %actor, "Failed to apply remote policy state update");
311 }
312 }
313 }
314
315 pub fn update_rate_limit_membership(&self) {
317 let all_members = self.stores.membership.all();
319 let alive_nodes: Vec<String> = all_members
320 .values()
321 .filter(|m| m.status == NodeStatus::Alive as i32)
322 .map(|m| m.name.clone())
323 .collect();
324
325 self.stores.rate_limit.update_membership(&alive_nodes);
326 debug!(
327 "Updated rate-limit hash ring with {} alive nodes",
328 alive_nodes.len()
329 );
330 }
331
332 pub fn handle_node_failure(&self, failed_nodes: &[String]) {
334 if failed_nodes.is_empty() {
335 return;
336 }
337
338 debug!("Handling node failure for rate-limit: {:?}", failed_nodes);
339
340 let affected_keys = self
342 .stores
343 .rate_limit
344 .check_ownership_transfer(failed_nodes);
345
346 if !affected_keys.is_empty() {
347 debug!(
348 "Ownership transfer needed for {} rate-limit keys",
349 affected_keys.len()
350 );
351
352 self.update_rate_limit_membership();
354
355 for key in &affected_keys {
357 if self.stores.rate_limit.is_owner(key) {
358 debug!("This node is now owner of rate-limit key: {}", key);
359 }
361 }
362 }
363 }
364
365 pub fn sync_rate_limit_inc(&self, key: String, delta: i64) {
367 if !self.stores.rate_limit.is_owner(&key) {
368 return;
370 }
371
372 self.stores
373 .rate_limit
374 .inc(key.clone(), self.self_name.clone(), delta);
375 debug!("Synced rate-limit increment: key={}, delta={}", key, delta);
376 }
377
    /// Merge a remote CRDT operation log into the local rate-limit store.
    pub fn apply_remote_rate_limit_counter(&self, log: &super::crdt_kv::OperationLog) {
        self.stores.rate_limit.merge(log);
        debug!("Applied remote rate-limit counter update");
    }
384
    /// Apply a remote counter value using the default `"remote"` actor and a
    /// zero timestamp. See the `_with_actor_and_timestamp` variant.
    pub fn apply_remote_rate_limit_counter_value(&self, key: String, counter_value: i64) {
        self.apply_remote_rate_limit_counter_value_with_actor_and_timestamp(
            key,
            "remote".to_string(),
            counter_value,
            0,
        );
    }
394
    /// Apply a remote counter value for an explicit actor with a zero
    /// timestamp. See the `_with_actor_and_timestamp` variant.
    pub fn apply_remote_rate_limit_counter_value_with_actor(
        &self,
        key: String,
        actor: String,
        counter_value: i64,
    ) {
        self.apply_remote_rate_limit_counter_value_with_actor_and_timestamp(
            key,
            actor,
            counter_value,
            0,
        );
    }
408
    /// Apply a remote counter value for `(key, actor)` by converting it into
    /// a snapshot payload and merging that into the rate-limit store. If the
    /// conversion yields no payload this is a no-op.
    pub fn apply_remote_rate_limit_counter_value_with_actor_and_timestamp(
        &self,
        key: String,
        actor: String,
        counter_value: i64,
        timestamp: u64,
    ) {
        if let Some((shard_key, payload)) =
            super::stores::RateLimitStore::snapshot_payload_for_counter_value(
                key,
                actor.clone(),
                counter_value,
            )
        {
            self.stores
                .rate_limit
                .apply_counter_snapshot_payload(shard_key, &actor, timestamp, &payload);
            debug!("Applied remote rate-limit counter snapshot payload");
        }
    }
429
    /// Current merged value of a rate-limit counter, if the key exists.
    pub fn get_rate_limit_value(&self, key: &str) -> Option<i64> {
        self.stores.rate_limit.value(key)
    }
434
    /// Fetch the global rate-limit config from the app store. Returns `None`
    /// when the key is missing or the stored bytes fail to bincode-decode.
    pub fn get_global_rate_limit_config(&self) -> Option<RateLimitConfig> {
        self.stores
            .app
            .get(GLOBAL_RATE_LIMIT_KEY)
            .and_then(|app_state| bincode::deserialize::<RateLimitConfig>(&app_state.value).ok())
    }
442
443 pub fn check_global_rate_limit(&self) -> (bool, i64, u64) {
446 let config = self.get_global_rate_limit_config().unwrap_or_default();
447
448 if config.limit_per_second == 0 {
449 return (false, 0, 0);
451 }
452
453 self.sync_rate_limit_inc(GLOBAL_RATE_LIMIT_COUNTER_KEY.to_string(), 1);
455
456 let current_count = self
458 .get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY)
459 .unwrap_or(0);
460
461 let is_exceeded = current_count > config.limit_per_second as i64;
462 (is_exceeded, current_count, config.limit_per_second)
463 }
464
    /// Reset the global counter by issuing a compensating negative increment.
    ///
    /// NOTE(review): `sync_rate_limit_inc` is a no-op unless this node owns
    /// the counter key, so non-owners cannot reset — confirm callers only
    /// invoke this on the owner. Negative counts are also left untouched
    /// (`> 0` guard) — confirm that is intended.
    pub fn reset_global_rate_limit_counter(&self) {
        let current_count = self
            .get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY)
            .unwrap_or(0);

        if current_count > 0 {
            self.sync_rate_limit_inc(GLOBAL_RATE_LIMIT_COUNTER_KEY.to_string(), -current_count);
        }
    }
482
    /// Buffer a single tenant insert (pre-hashed node path) into the pending
    /// delta queue for `model_id`, then bump the tree version.
    ///
    /// The insert's `epoch` is deliberately read *before* the bump, so it
    /// records the tree version the insert was based on.
    pub fn sync_tree_insert_hash(&self, model_id: &str, path_hash: u64, tenant: &str) {
        let key = tree_state_key(model_id);

        self.stores
            .tenant_delta_inserts
            .entry(model_id.to_string())
            .or_default()
            .push(TenantInsert {
                node_path_hash: path_hash,
                worker_url: tenant.to_string(),
                // Version before the bump below.
                epoch: self.stores.tree_version(&key),
            });

        self.stores.bump_tree_version(&key);
    }
511
    /// Buffer a local tree operation as a pending tenant delta and bump the
    /// tree version. Inserts get a hashed node path (text or token form);
    /// removes are recorded as global evictions for the tenant.
    ///
    /// # Errors
    /// Currently infallible; the `Result` is kept for API stability.
    #[expect(
        clippy::unnecessary_wraps,
        reason = "Public API — callers handle Result; changing return type is a cross-crate break"
    )]
    pub fn sync_tree_operation(
        &self,
        model_id: String,
        operation: TreeOperation,
    ) -> Result<(), String> {
        let key = tree_state_key(&model_id);

        match &operation {
            TreeOperation::Insert(insert) => {
                // Hash the path so deltas carry a fixed-size key instead of
                // the full text/token sequence.
                let path_hash = match &insert.key {
                    TreeKey::Text(text) => hash_node_path(text),
                    TreeKey::Tokens(tokens) => hash_token_path(tokens),
                };
                self.stores
                    .tenant_delta_inserts
                    .entry(model_id.clone())
                    .or_default()
                    .push(TenantInsert {
                        node_path_hash: path_hash,
                        worker_url: insert.tenant.clone(),
                        // Epoch is the version *before* the bump below.
                        epoch: self.stores.tree_version(&key),
                    });
            }
            TreeOperation::Remove(remove) => {
                // Removes are broadcast as global (path-independent) evictions.
                self.stores
                    .tenant_delta_evictions
                    .entry(model_id.clone())
                    .or_default()
                    .push(TenantEvict {
                        node_path_hash: crate::tree_ops::GLOBAL_EVICTION_HASH,
                        worker_url: remove.tenant.clone(),
                    });
            }
        }

        self.stores.bump_tree_version(&key);

        Ok(())
    }
567
568 fn materialize_tree_state(&self, key: &str, model_id: &str) -> Option<TreeState> {
575 let config_bytes = self.stores.tree_configs.get(key)?;
576 let bytes = config_bytes.value();
577 if bytes.is_empty() {
578 return Some(TreeState::new(model_id.to_string()));
579 }
580 if let Ok(ts) = TreeState::from_bytes(bytes) {
582 return Some(ts);
583 }
584 if let Ok(snap) = kv_index::snapshot::TreeSnapshot::from_bytes(bytes) {
586 let version = self.stores.tree_version(key);
587 return Some(TreeState::from_snapshot(
588 model_id.to_string(),
589 &snap,
590 version,
591 ));
592 }
593 None
594 }
595
    /// Materialize the stored tree state for `model_id`, if any decodes.
    pub fn get_tree_state(&self, model_id: &str) -> Option<TreeState> {
        let key = tree_state_key(model_id);
        self.materialize_tree_state(&key, model_id)
    }
602
603 pub fn get_all_tree_states(&self) -> Vec<TreeState> {
604 let mut results = Vec::new();
605
606 for entry in &self.stores.tree_configs {
607 let key = entry.key().clone();
608 let model_id = key.strip_prefix("tree:").unwrap_or(&key).to_string();
609 if let Some(ts) = self.materialize_tree_state(&key, &model_id) {
610 results.push(ts);
611 }
612 }
613
614 results
615 }
616
    /// Apply a complete tree state received from a remote node, keeping it
    /// only when its version is strictly newer than the locally stored one.
    /// On accept: store the serialized state, advance version/generation
    /// counters, and notify tree-state subscribers.
    pub fn apply_remote_tree_operation(
        &self,
        model_id: String,
        tree_state: TreeState,
        actor: Option<String>,
    ) {
        use dashmap::mapref::entry::Entry;

        let key = tree_state_key(&model_id);
        let _actor = actor.unwrap_or_else(|| "remote".to_string());

        // Serialize up front so a failure leaves the store untouched.
        let serialized = match tree_state.to_bytes() {
            Ok(bytes) => bytes,
            Err(err) => {
                debug!(error = %err, model_id = %model_id, "Failed to serialize remote tree state");
                return;
            }
        };

        let applied = match self.stores.tree_configs.entry(key.clone()) {
            Entry::Occupied(mut entry) => {
                // If the stored bytes don't decode as a TreeState (e.g. they
                // hold a snapshot), fall back to the tracked version counter.
                let current_version = TreeState::from_bytes(entry.get())
                    .ok()
                    .map(|ts| ts.version)
                    .unwrap_or_else(|| self.stores.tree_version(&key));
                if tree_state.version > current_version {
                    entry.insert(serialized);
                    debug!(
                        "Applied remote tree state update: model={} (version: {} -> {})",
                        model_id, current_version, tree_state.version
                    );
                    true
                } else {
                    debug!(
                        "Skipped remote tree state update: model={} (version {} <= current {})",
                        model_id, tree_state.version, current_version
                    );
                    false
                }
            }
            Entry::Vacant(entry) => {
                // First sighting of this model: accept unconditionally.
                entry.insert(serialized);
                debug!(
                    "Applied remote tree state update (new): model={} (version: {})",
                    model_id, tree_state.version
                );
                true
            }
        };

        if applied {
            self.stores.advance_tree_version(&key, tree_state.version);
            self.stores.tree_generation.fetch_add(1, Ordering::Release);
            self.notify_tree_state_subscribers(&model_id, &tree_state);
        }
    }
688
689 pub fn apply_remote_tree_delta(&self, delta: TreeStateDelta, actor: Option<String>) {
696 use dashmap::mapref::entry::Entry;
697
698 let key = tree_state_key(&delta.model_id);
699 let _actor = actor.unwrap_or_else(|| "remote".to_string());
700 let model_id = delta.model_id.clone();
701 let ops_count = delta.operations.len();
702
703 let result: Option<(TreeState, u64)> = match self.stores.tree_configs.entry(key.clone()) {
707 Entry::Occupied(mut entry) => {
708 let bytes = entry.get();
709 let current_version = if bytes.is_empty() {
710 0
711 } else {
712 match TreeState::from_bytes(bytes) {
713 Ok(ts) => ts.version,
714 Err(_) => 0,
715 }
716 };
717
718 if delta.base_version > current_version || current_version >= delta.new_version {
720 debug!(
721 "Skipped remote tree delta: model={} (base_version={}, new_version={}, current={})",
722 model_id, delta.base_version, delta.new_version, current_version
723 );
724 return;
725 }
726
727 let mut tree_state = if bytes.is_empty() {
729 if current_version > 0 {
730 debug!(
731 "Skipped remote tree delta: model={} (base_version={}, new_version={}, current={})",
732 model_id, delta.base_version, delta.new_version, current_version
733 );
734 return;
735 }
736 TreeState::new(delta.model_id.clone())
737 } else {
738 match TreeState::from_bytes(bytes) {
739 Ok(state) => state,
740 Err(err) => {
741 warn!(
742 model_id = %delta.model_id,
743 error = %err,
744 "Corrupted tree state — rejecting delta to avoid data loss"
745 );
746 return;
747 }
748 }
749 };
750
751 let old_version = current_version;
752 for op in &delta.operations {
753 tree_state.add_operation(op.clone());
754 }
755 let new_version = tree_state.version;
756
757 match tree_state.to_bytes() {
758 Ok(serialized) => {
759 entry.insert(serialized);
760 debug!(
761 "Applied remote tree delta: model={} (version: {} -> +{} ops)",
762 model_id, old_version, ops_count
763 );
764 Some((tree_state, new_version))
765 }
766 Err(err) => {
767 debug!(error = %err, model_id = %model_id, "Failed to serialize tree state after delta apply");
768 None
769 }
770 }
771 }
772 Entry::Vacant(entry) => {
773 if delta.base_version > 0 {
775 debug!(
776 "Skipped remote tree delta: model={} (base_version={}, new_version={}, no local state)",
777 model_id, delta.base_version, delta.new_version
778 );
779 return;
780 }
781 let mut tree_state = TreeState::new(delta.model_id.clone());
782 for op in &delta.operations {
783 tree_state.add_operation(op.clone());
784 }
785 let new_version = tree_state.version;
786
787 match tree_state.to_bytes() {
788 Ok(serialized) => {
789 entry.insert(serialized);
790 debug!(
791 "Applied remote tree delta (new tree): model={} (+{} ops)",
792 model_id, ops_count
793 );
794 Some((tree_state, new_version))
795 }
796 Err(err) => {
797 debug!(error = %err, model_id = %model_id, "Failed to serialize new tree state from delta");
798 None
799 }
800 }
801 }
802 };
803
804 if let Some((tree_state, new_version)) = result {
806 self.stores.advance_tree_version(&key, new_version);
807 self.stores.tree_generation.fetch_add(1, Ordering::Release);
808 self.notify_tree_state_subscribers(&model_id, &tree_state);
809 }
810 }
811
    /// Forward a remote tenant delta (inserts + evictions) to tree-state
    /// subscribers and advance the tree version/generation counters.
    /// Empty deltas are ignored entirely (no version advance).
    pub fn apply_remote_tenant_delta(&self, delta: TenantDelta, _actor: Option<String>) {
        let key = tree_state_key(&delta.model_id);

        if delta.inserts.is_empty() && delta.evictions.is_empty() {
            return;
        }

        debug!(
            model_id = %delta.model_id,
            inserts = delta.inserts.len(),
            evictions = delta.evictions.len(),
            version = delta.version,
            "Applying remote tenant delta"
        );

        // Snapshot the subscriber list so callbacks run without the lock held.
        let subscribers = self.tree_state_subscribers.read().clone();
        for subscriber in &subscribers {
            subscriber.apply_tenant_delta(&delta.model_id, &delta.inserts, &delta.evictions);
        }

        self.stores.advance_tree_version(&key, delta.version);
        self.stores.tree_generation.fetch_add(1, Ordering::Release);
    }
848
    /// Intentionally empty. NOTE(review): tree state appears to be persisted
    /// on every apply (see `apply_remote_tree_*`), so there is presumably
    /// nothing to checkpoint here — confirm before removing from the API.
    #[expect(
        clippy::unused_self,
        reason = "Public API called by controller — removing &self is a breaking change"
    )]
    pub fn checkpoint_tree_states(&self) {
    }
872}
873
/// Convenience alias for an optional, shared sync manager (absent when mesh
/// sync is not configured — presumably; confirm against call sites).
pub type OptionalMeshSyncManager = Option<Arc<MeshSyncManager>>;
876
877#[cfg(test)]
878mod tests {
879 use std::{
880 collections::BTreeMap,
881 sync::{
882 atomic::{AtomicBool, Ordering},
883 Arc,
884 },
885 };
886
887 use super::*;
888 use crate::stores::{
889 AppState, MembershipState, RateLimitConfig, StateStores, GLOBAL_RATE_LIMIT_COUNTER_KEY,
890 GLOBAL_RATE_LIMIT_KEY,
891 };
892
    /// Manager over fresh default stores, acting as node "test_node".
    fn create_test_sync_manager() -> MeshSyncManager {
        let stores = Arc::new(StateStores::new());
        MeshSyncManager::new(stores, "test_node".to_string())
    }
897
    /// Manager whose stores are also initialized with the given node name.
    fn create_test_manager(self_name: String) -> MeshSyncManager {
        let stores = Arc::new(StateStores::with_self_name(self_name.clone()));
        MeshSyncManager::new(stores, self_name)
    }
902
    /// Test subscriber that records whether the subscriber-list write lock is
    /// free while a callback runs (i.e. callbacks are dispatched unlocked).
    #[derive(Debug)]
    struct LockCheckingSubscriber {
        manager: Arc<MeshSyncManager>,
        can_acquire_write_lock: Arc<AtomicBool>,
    }
908
    impl TreeStateSubscriber for LockCheckingSubscriber {
        // Probe the write lock during the callback; try_write succeeding
        // proves the manager released its read lock before dispatching.
        fn apply_remote_tree_state(&self, _model_id: &str, _tree_state: &TreeState) {
            let can_acquire_write_lock = self.manager.tree_state_subscribers.try_write().is_some();
            self.can_acquire_write_lock
                .store(can_acquire_write_lock, Ordering::SeqCst);
        }
    }
916
    /// A fresh manager starts with no worker or policy state.
    #[test]
    fn test_sync_manager_new() {
        let manager = create_test_sync_manager();
        assert_eq!(manager.get_all_worker_states().len(), 0);
        assert_eq!(manager.get_all_policy_states().len(), 0);
    }
924
    /// Syncing a worker stores all fields and assigns version 1.
    #[test]
    fn test_sync_worker_state() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        let state = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state.worker_id, "worker1");
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.url, "http://localhost:8000");
        assert!(state.health);
        assert_eq!(state.load, 0.5);
        assert_eq!(state.version, 1);
    }
946
    /// Multiple distinct workers are stored independently and retrievably.
    #[test]
    fn test_sync_multiple_worker_states() {
        let manager = create_test_sync_manager();

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        manager.sync_worker_state(
            "worker2".to_string(),
            "model1".to_string(),
            "http://localhost:8001".to_string(),
            false,
            0.8,
            vec![],
        );

        manager.sync_worker_state(
            "worker3".to_string(),
            "model2".to_string(),
            "http://localhost:8002".to_string(),
            true,
            0.3,
            vec![],
        );

        let all_states = manager.get_all_worker_states();
        assert_eq!(all_states.len(), 3);

        let worker1 = manager.get_worker_state("worker1").unwrap();
        assert_eq!(worker1.worker_id, "worker1");
        assert!(worker1.health);

        let worker2 = manager.get_worker_state("worker2").unwrap();
        assert_eq!(worker2.worker_id, "worker2");
        assert!(!worker2.health);

        let worker3 = manager.get_worker_state("worker3").unwrap();
        assert_eq!(worker3.worker_id, "worker3");
        assert_eq!(worker3.model_id, "model2");
    }
993
    /// Re-syncing the same worker bumps its version and replaces its fields.
    #[test]
    fn test_sync_worker_state_version_increment() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        let state1 = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state1.version, 1);

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            false,
            0.8,
            vec![],
        );

        let state2 = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state2.version, 2);
        assert!(!state2.health);
        assert_eq!(state2.load, 0.8);
    }
1024
    /// Removing a synced worker leaves the store empty.
    #[test]
    fn test_remove_worker_state() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        assert!(manager.get_worker_state("worker1").is_some());

        manager.remove_worker_state("worker1");

        assert!(manager.get_worker_state("worker1").is_none());
        assert_eq!(manager.get_all_worker_states().len(), 0);
    }
1045
    /// Removing an unknown worker is a harmless no-op.
    #[test]
    fn test_remove_nonexistent_worker_state() {
        let manager = create_test_sync_manager();

        manager.remove_worker_state("nonexistent");
        assert!(manager.get_worker_state("nonexistent").is_none());
    }
1054
    /// Syncing a policy stores all fields and assigns version 1.
    #[test]
    fn test_sync_policy_state() {
        let manager = create_test_manager("node1".to_string());

        manager.sync_policy_state(
            "model1".to_string(),
            "cache_aware".to_string(),
            b"config_data".to_vec(),
        );

        let state = manager.get_policy_state("model1").unwrap();
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.policy_type, "cache_aware");
        assert_eq!(state.config, b"config_data");
        assert_eq!(state.version, 1);
    }
1071
    /// Policies for distinct models are stored independently.
    #[test]
    fn test_sync_multiple_policy_states() {
        let manager = create_test_sync_manager();

        manager.sync_policy_state(
            "model1".to_string(),
            "round_robin".to_string(),
            b"config1".to_vec(),
        );

        manager.sync_policy_state(
            "model2".to_string(),
            "random".to_string(),
            b"config2".to_vec(),
        );

        manager.sync_policy_state(
            "model3".to_string(),
            "consistent_hash".to_string(),
            b"config3".to_vec(),
        );

        let all_states = manager.get_all_policy_states();
        assert_eq!(all_states.len(), 3);

        let policy1 = manager.get_policy_state("model1").unwrap();
        assert_eq!(policy1.model_id, "model1");
        assert_eq!(policy1.policy_type, "round_robin");

        let policy2 = manager.get_policy_state("model2").unwrap();
        assert_eq!(policy2.model_id, "model2");
        assert_eq!(policy2.policy_type, "random");
    }
1105
    /// Removing a synced policy leaves the store empty.
    #[test]
    fn test_remove_policy_state() {
        let manager = create_test_sync_manager();

        manager.sync_policy_state(
            "model1".to_string(),
            "round_robin".to_string(),
            b"config".to_vec(),
        );

        assert!(manager.get_policy_state("model1").is_some());

        manager.remove_policy_state("model1");

        assert!(manager.get_policy_state("model1").is_none());
        assert_eq!(manager.get_all_policy_states().len(), 0);
    }
1123
    /// Removing an unknown policy is a harmless no-op.
    #[test]
    fn test_remove_nonexistent_policy_state() {
        let manager = create_test_sync_manager();

        manager.remove_policy_state("nonexistent");
        assert!(manager.get_policy_state("nonexistent").is_none());
    }
1132
    /// A remote state with a newer version is stored verbatim (version kept).
    #[test]
    fn test_apply_remote_worker_state() {
        let manager = create_test_manager("node1".to_string());

        let remote_state = WorkerState {
            worker_id: "worker1".to_string(),
            model_id: "model1".to_string(),
            url: "http://localhost:8000".to_string(),
            health: true,
            load: 0.5,
            version: 5,
            spec: vec![],
        };

        manager.apply_remote_worker_state(remote_state.clone(), Some("node2".to_string()));

        let state = manager.get_worker_state("worker1").unwrap();
        assert_eq!(state.version, 5);
    }
1153
    /// Applying a remote state for an unknown worker inserts it as-is.
    #[test]
    fn test_apply_remote_worker_state_basic() {
        let manager = create_test_sync_manager();

        let remote_state = WorkerState {
            worker_id: "remote_worker1".to_string(),
            model_id: "model1".to_string(),
            url: "http://localhost:8000".to_string(),
            health: true,
            load: 0.6,
            version: 1,
            spec: vec![],
        };

        manager.apply_remote_worker_state(remote_state.clone(), None);

        let state = manager.get_worker_state("remote_worker1");
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state.worker_id, "remote_worker1");
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.url, "http://localhost:8000");
        assert!(state.health);
        assert_eq!(state.load, 0.6);
    }
1179
1180 #[test]
1181 fn test_apply_remote_worker_state_version_check() {
1182 let manager = create_test_manager("node1".to_string());
1183
1184 manager.sync_worker_state(
1186 "worker1".to_string(),
1187 "model1".to_string(),
1188 "http://localhost:8000".to_string(),
1189 true,
1190 0.5,
1191 vec![],
1192 );
1193
1194 let old_state = WorkerState {
1196 worker_id: "worker1".to_string(),
1197 model_id: "model1".to_string(),
1198 url: "http://localhost:8000".to_string(),
1199 health: false,
1200 load: 0.8,
1201 version: 0, spec: vec![],
1203 };
1204
1205 manager.apply_remote_worker_state(old_state, Some("node2".to_string()));
1206
1207 let state = manager.get_worker_state("worker1").unwrap();
1209 assert_eq!(state.version, 1);
1210 assert!(state.health); }
1212
    /// Applying a remote policy for an unknown model inserts it as-is.
    #[test]
    fn test_apply_remote_policy_state() {
        let manager = create_test_sync_manager();

        let remote_state = PolicyState {
            model_id: "model1".to_string(),
            policy_type: "remote_policy".to_string(),
            config: b"remote_config".to_vec(),
            version: 1,
        };

        manager.apply_remote_policy_state(remote_state.clone(), None);

        let state = manager.get_policy_state("model1");
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state.model_id, "model1");
        assert_eq!(state.policy_type, "remote_policy");
        assert_eq!(state.config, b"remote_config");
    }
1233
    /// Locally-synced and remotely-applied workers coexist in the store.
    #[test]
    fn test_mixed_local_and_remote_states() {
        let manager = create_test_sync_manager();

        manager.sync_worker_state(
            "local_worker".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        let remote_state = WorkerState {
            worker_id: "remote_worker".to_string(),
            model_id: "model1".to_string(),
            url: "http://localhost:8001".to_string(),
            health: true,
            load: 0.7,
            version: 1,
            spec: vec![],
        };
        manager.apply_remote_worker_state(remote_state, None);

        let all_states = manager.get_all_worker_states();
        assert_eq!(all_states.len(), 2);

        assert!(manager.get_worker_state("local_worker").is_some());
        assert!(manager.get_worker_state("remote_worker").is_some());
    }
1266
    /// Re-syncing a worker updates it in place (no duplicate entry).
    #[test]
    fn test_update_worker_state() {
        let manager = create_test_sync_manager();

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            true,
            0.5,
            vec![],
        );

        manager.sync_worker_state(
            "worker1".to_string(),
            "model1".to_string(),
            "http://localhost:8000".to_string(),
            false,
            0.9,
            vec![],
        );

        let state = manager.get_worker_state("worker1").unwrap();
        assert!(!state.health);
        assert_eq!(state.load, 0.9);
        assert_eq!(manager.get_all_worker_states().len(), 1);
    }
1296
    /// Re-syncing a model's policy replaces it in place (no duplicate entry).
    #[test]
    fn test_update_policy_state() {
        let manager = create_test_sync_manager();

        manager.sync_policy_state(
            "model1".to_string(),
            "round_robin".to_string(),
            b"config1".to_vec(),
        );

        manager.sync_policy_state(
            "model1".to_string(),
            "random".to_string(),
            b"config2".to_vec(),
        );

        let state = manager.get_policy_state("model1").unwrap();
        assert_eq!(state.policy_type, "random");
        assert_eq!(state.config, b"config2");
        assert_eq!(manager.get_all_policy_states().len(), 1);
    }
1320
    /// An empty store yields an empty worker-state list.
    #[test]
    fn test_get_all_worker_states_empty() {
        let manager = create_test_sync_manager();
        let states = manager.get_all_worker_states();
        assert!(states.is_empty());
    }
1327
    /// An empty store yields an empty policy-state list.
    #[test]
    fn test_get_all_policy_states_empty() {
        let manager = create_test_sync_manager();
        let states = manager.get_all_policy_states();
        assert!(states.is_empty());
    }
1334
    /// With two Alive members, the rebuilt hash ring assigns owners to keys.
    #[test]
    fn test_update_rate_limit_membership() {
        let manager = create_test_manager("node1".to_string());

        let _ = manager.stores.membership.insert(
            "node1".to_string(),
            MembershipState {
                name: "node1".to_string(),
                address: "127.0.0.1:8000".to_string(),
                status: NodeStatus::Alive as i32,
                version: 1,
                metadata: BTreeMap::new(),
            },
        );

        let _ = manager.stores.membership.insert(
            "node2".to_string(),
            MembershipState {
                name: "node2".to_string(),
                address: "127.0.0.1:8001".to_string(),
                status: NodeStatus::Alive as i32,
                version: 1,
                metadata: BTreeMap::new(),
            },
        );

        manager.update_rate_limit_membership();

        let owners = manager.stores.rate_limit.get_owners("test_key");
        assert!(!owners.is_empty());
    }
1368
1369 #[test]
1370 fn test_handle_node_failure() {
1371 let manager = create_test_manager("node1".to_string());
1372
1373 let _ = manager.stores.membership.insert(
1375 "node1".to_string(),
1376 MembershipState {
1377 name: "node1".to_string(),
1378 address: "127.0.0.1:8000".to_string(),
1379 status: NodeStatus::Alive as i32,
1380 version: 1,
1381 metadata: BTreeMap::new(),
1382 },
1383 );
1384
1385 let _ = manager.stores.membership.insert(
1386 "node2".to_string(),
1387 MembershipState {
1388 name: "node2".to_string(),
1389 address: "127.0.0.1:8001".to_string(),
1390 status: NodeStatus::Alive as i32,
1391 version: 1,
1392 metadata: BTreeMap::new(),
1393 },
1394 );
1395
1396 manager.update_rate_limit_membership();
1397
1398 manager.handle_node_failure(&["node2".to_string()]);
1400
1401 manager.update_rate_limit_membership();
1403 }
1404
1405 #[test]
1406 fn test_sync_rate_limit_inc() {
1407 let manager = create_test_manager("node1".to_string());
1408
1409 manager
1411 .stores
1412 .rate_limit
1413 .update_membership(&["node1".to_string()]);
1414
1415 let test_key = "test_key".to_string();
1416 if manager.stores.rate_limit.is_owner(&test_key) {
1417 manager.sync_rate_limit_inc(test_key.clone(), 5);
1418
1419 let value = manager.get_rate_limit_value(&test_key);
1420 assert_eq!(value, Some(5));
1421 }
1422 }
1423
1424 #[test]
1425 fn test_sync_rate_limit_inc_non_owner() {
1426 let manager = create_test_manager("node1".to_string());
1427
1428 manager
1430 .stores
1431 .rate_limit
1432 .update_membership(&["node2".to_string(), "node3".to_string()]);
1433
1434 let test_key = "test_key".to_string();
1435 if !manager.stores.rate_limit.is_owner(&test_key) {
1436 manager.sync_rate_limit_inc(test_key.clone(), 5);
1437
1438 let value = manager.get_rate_limit_value(&test_key);
1440 assert_eq!(value, None);
1441 }
1442 }
1443
1444 #[test]
1445 fn test_get_global_rate_limit_config() {
1446 let manager = create_test_manager("node1".to_string());
1447
1448 assert!(manager.get_global_rate_limit_config().is_none());
1450
1451 let config = RateLimitConfig {
1453 limit_per_second: 100,
1454 };
1455 let serialized = bincode::serialize(&config).unwrap();
1456 let _ = manager.stores.app.insert(
1457 GLOBAL_RATE_LIMIT_KEY.to_string(),
1458 AppState {
1459 key: GLOBAL_RATE_LIMIT_KEY.to_string(),
1460 value: serialized,
1461 version: 1,
1462 },
1463 );
1464
1465 let retrieved = manager.get_global_rate_limit_config().unwrap();
1466 assert_eq!(retrieved.limit_per_second, 100);
1467 }
1468
1469 #[test]
1470 fn test_check_global_rate_limit() {
1471 let manager = create_test_manager("node1".to_string());
1472
1473 let config = RateLimitConfig {
1475 limit_per_second: 10,
1476 };
1477 let serialized = bincode::serialize(&config).unwrap();
1478 let _ = manager.stores.app.insert(
1479 GLOBAL_RATE_LIMIT_KEY.to_string(),
1480 AppState {
1481 key: GLOBAL_RATE_LIMIT_KEY.to_string(),
1482 value: serialized,
1483 version: 1,
1484 },
1485 );
1486
1487 manager
1489 .stores
1490 .rate_limit
1491 .update_membership(&["node1".to_string()]);
1492
1493 let (is_exceeded, _current_count, limit) = manager.check_global_rate_limit();
1495 assert!(!is_exceeded); assert_eq!(limit, 10);
1497
1498 for _ in 0..15 {
1500 manager.check_global_rate_limit();
1501 }
1502
1503 let (is_exceeded2, current_count2, _) = manager.check_global_rate_limit();
1504 assert!(is_exceeded2 || current_count2 > 10);
1506 }
1507
1508 #[test]
1509 fn test_reset_global_rate_limit_counter() {
1510 let manager = create_test_manager("node1".to_string());
1511
1512 manager
1514 .stores
1515 .rate_limit
1516 .update_membership(&["node1".to_string()]);
1517
1518 if manager
1520 .stores
1521 .rate_limit
1522 .is_owner(GLOBAL_RATE_LIMIT_COUNTER_KEY)
1523 {
1524 manager.sync_rate_limit_inc(GLOBAL_RATE_LIMIT_COUNTER_KEY.to_string(), 10);
1525 let value = manager.get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY);
1526 assert!(value.is_some() && value.unwrap() > 0);
1527
1528 manager.reset_global_rate_limit_counter();
1530 let value_after = manager.get_rate_limit_value(GLOBAL_RATE_LIMIT_COUNTER_KEY);
1531 assert!(value_after.is_none() || value_after.unwrap() <= 0);
1533 }
1534 }
1535
1536 #[test]
1537 fn test_sync_tree_operation() {
1538 let manager = create_test_manager("node1".to_string());
1539
1540 use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation};
1541
1542 let op = TreeOperation::Insert(TreeInsertOp {
1543 key: TreeKey::Text("test_text".to_string()),
1544 tenant: "http://localhost:8000".to_string(),
1545 });
1546
1547 let result = manager.sync_tree_operation("model1".to_string(), op);
1548 assert!(result.is_ok());
1549
1550 let inserts = manager.stores.tenant_delta_inserts.get("model1").unwrap();
1554 assert_eq!(inserts.len(), 1);
1555 assert_eq!(inserts[0].worker_url, "http://localhost:8000");
1556 assert_eq!(inserts[0].node_path_hash, hash_node_path("test_text"));
1557 }
1558
1559 #[test]
1560 fn test_get_tree_state() {
1561 let manager = create_test_manager("node1".to_string());
1562
1563 assert!(manager.get_tree_state("model1").is_none());
1565
1566 use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation};
1571 let op = TreeOperation::Insert(TreeInsertOp {
1572 key: TreeKey::Text("test_text".to_string()),
1573 tenant: "http://localhost:8000".to_string(),
1574 });
1575 manager
1576 .sync_tree_operation("model1".to_string(), op)
1577 .unwrap();
1578
1579 assert!(manager.get_tree_state("model1").is_none());
1581 assert!(manager.stores.tenant_delta_inserts.get("model1").is_some());
1583 }
1584
1585 #[test]
1586 fn test_apply_remote_tree_operation() {
1587 let manager = create_test_manager("node1".to_string());
1588
1589 use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation, TreeState};
1590
1591 let mut tree_state = TreeState::new("model1".to_string());
1592 tree_state.version = 5;
1593 tree_state.add_operation(TreeOperation::Insert(TreeInsertOp {
1594 key: TreeKey::Text("remote_text".to_string()),
1595 tenant: "http://localhost:8001".to_string(),
1596 }));
1597 manager.apply_remote_tree_operation(
1600 "model1".to_string(),
1601 tree_state,
1602 Some("node2".to_string()),
1603 );
1604
1605 let retrieved = manager.get_tree_state("model1").unwrap();
1606 assert_eq!(retrieved.version, 6); assert_eq!(retrieved.operations.len(), 1);
1608 }
1609
1610 #[test]
1611 fn test_notify_tree_state_subscribers_drops_lock_before_callback() {
1612 let manager = Arc::new(create_test_manager("node1".to_string()));
1613 let can_acquire_write_lock = Arc::new(AtomicBool::new(false));
1614 let subscriber = Arc::new(LockCheckingSubscriber {
1615 manager: manager.clone(),
1616 can_acquire_write_lock: can_acquire_write_lock.clone(),
1617 });
1618
1619 manager.register_tree_state_subscriber(subscriber);
1620 manager.notify_tree_state_subscribers("model1", &TreeState::new("model1".to_string()));
1621
1622 assert!(can_acquire_write_lock.load(Ordering::SeqCst));
1623 }
1624
1625 #[test]
1626 fn test_get_all_tree_states() {
1627 let manager = create_test_manager("node1".to_string());
1628
1629 let mut ts1 = TreeState::new("model1".to_string());
1633 ts1.add_operation(make_insert_op("alpha", "http://localhost:8000"));
1634 let mut ts2 = TreeState::new("model2".to_string());
1635 ts2.add_operation(make_insert_op("beta", "http://localhost:8001"));
1636
1637 manager
1638 .stores
1639 .tree_configs
1640 .insert("tree:model1".to_string(), ts1.to_bytes().unwrap());
1641 manager
1642 .stores
1643 .tree_configs
1644 .insert("tree:model2".to_string(), ts2.to_bytes().unwrap());
1645
1646 let mut states = manager.get_all_tree_states();
1647 states.sort_by(|left, right| left.model_id.cmp(&right.model_id));
1648
1649 assert_eq!(states.len(), 2);
1650 assert_eq!(states[0].model_id, "model1");
1651 assert_eq!(states[1].model_id, "model2");
1652 }
1653
1654 #[test]
1655 fn test_get_all_worker_states() {
1656 let manager = create_test_manager("node1".to_string());
1657
1658 manager.sync_worker_state(
1659 "worker1".to_string(),
1660 "model1".to_string(),
1661 "http://localhost:8000".to_string(),
1662 true,
1663 0.5,
1664 vec![],
1665 );
1666
1667 manager.sync_worker_state(
1668 "worker2".to_string(),
1669 "model2".to_string(),
1670 "http://localhost:8001".to_string(),
1671 false,
1672 0.8,
1673 vec![],
1674 );
1675
1676 let all_states = manager.get_all_worker_states();
1677 assert_eq!(all_states.len(), 2);
1678 }
1679
1680 #[test]
1681 fn test_get_all_policy_states() {
1682 let manager = create_test_manager("node1".to_string());
1683
1684 manager.sync_policy_state("model1".to_string(), "cache_aware".to_string(), vec![]);
1685
1686 manager.sync_policy_state("model2".to_string(), "round_robin".to_string(), vec![]);
1687
1688 let all_states = manager.get_all_policy_states();
1689 assert_eq!(all_states.len(), 2);
1690 }
1691
1692 use crate::tree_ops::{TreeInsertOp, TreeKey, TreeOperation, TreeRemoveOp, TreeStateDelta};
1695
1696 fn make_insert_op(text: &str, tenant: &str) -> TreeOperation {
1697 TreeOperation::Insert(TreeInsertOp {
1698 key: TreeKey::Text(text.to_string()),
1699 tenant: tenant.to_string(),
1700 })
1701 }
1702
1703 fn make_delta(model_id: &str, ops: Vec<TreeOperation>, base: u64, new: u64) -> TreeStateDelta {
1704 TreeStateDelta {
1705 model_id: model_id.to_string(),
1706 operations: ops,
1707 base_version: base,
1708 new_version: new,
1709 }
1710 }
1711
1712 #[test]
1713 fn test_delta_basic_apply() {
1714 let manager = create_test_manager("node1".to_string());
1715
1716 let ops = vec![
1717 make_insert_op("a", "http://w1:8000"),
1718 make_insert_op("b", "http://w2:8000"),
1719 make_insert_op("c", "http://w3:8000"),
1720 ];
1721
1722 let delta = make_delta("model1", ops, 0, 3);
1723 manager.apply_remote_tree_delta(delta, Some("node2".to_string()));
1724
1725 let tree = manager.get_tree_state("model1").unwrap();
1726 assert_eq!(tree.version, 3);
1727 assert_eq!(tree.operations.len(), 3);
1728 }
1729
1730 #[test]
1731 fn test_delta_version_check_rejects_gap() {
1732 let manager = create_test_manager("node1".to_string());
1733
1734 let mut seed = TreeState::new("model1".to_string());
1736 for i in 0..10 {
1737 seed.add_operation(make_insert_op(&format!("seed_{i}"), "http://w:8000"));
1738 }
1739 assert_eq!(seed.version, 10);
1740 manager.apply_remote_tree_operation("model1".to_string(), seed, Some("seed".to_string()));
1741
1742 let delta_ok = make_delta("model1", vec![make_insert_op("ok", "http://w:8000")], 5, 11);
1744 manager.apply_remote_tree_delta(delta_ok, None);
1745 let tree = manager.get_tree_state("model1").unwrap();
1746 assert_eq!(tree.version, 11);
1747
1748 let delta_gap = make_delta(
1750 "model1",
1751 vec![make_insert_op("gap", "http://w:8000")],
1752 20,
1753 21,
1754 );
1755 manager.apply_remote_tree_delta(delta_gap, None);
1756 let tree = manager.get_tree_state("model1").unwrap();
1757 assert_eq!(tree.version, 11);
1759 }
1760
1761 #[test]
1762 fn test_delta_concurrent_apply() {
1763 let manager = Arc::new(create_test_manager("node1".to_string()));
1764
1765 let m1 = manager.clone();
1768 let m2 = manager.clone();
1769
1770 let t1 = std::thread::spawn(move || {
1771 let delta = make_delta("model1", vec![make_insert_op("t1", "http://w1:8000")], 0, 1);
1772 m1.apply_remote_tree_delta(delta, Some("thread1".to_string()));
1773 });
1774
1775 let t2 = std::thread::spawn(move || {
1776 let delta = make_delta("model1", vec![make_insert_op("t2", "http://w2:8000")], 0, 1);
1777 m2.apply_remote_tree_delta(delta, Some("thread2".to_string()));
1778 });
1779
1780 t1.join().unwrap();
1781 t2.join().unwrap();
1782
1783 let tree = manager.get_tree_state("model1").unwrap();
1785 assert!(tree.version >= 1);
1786 assert!(!tree.operations.is_empty());
1787 }
1788
1789 #[test]
1790 fn test_delta_empty_tree() {
1791 let manager = create_test_manager("node1".to_string());
1792
1793 assert!(manager.get_tree_state("new_model").is_none());
1795
1796 let delta = make_delta(
1797 "new_model",
1798 vec![make_insert_op("first", "http://w1:8000")],
1799 0,
1800 1,
1801 );
1802 manager.apply_remote_tree_delta(delta, None);
1803
1804 let tree = manager.get_tree_state("new_model").unwrap();
1805 assert_eq!(tree.model_id, "new_model");
1806 assert_eq!(tree.version, 1);
1807 assert_eq!(tree.operations.len(), 1);
1808 }
1809
1810 #[test]
1811 fn test_delta_notifies_subscribers() {
1812 let manager = Arc::new(create_test_manager("node1".to_string()));
1813 let notified = Arc::new(AtomicBool::new(false));
1814
1815 #[derive(Debug)]
1816 struct FlagSubscriber(Arc<AtomicBool>);
1817 impl TreeStateSubscriber for FlagSubscriber {
1818 fn apply_remote_tree_state(&self, _model_id: &str, _tree_state: &TreeState) {
1819 self.0.store(true, Ordering::SeqCst);
1820 }
1821 }
1822
1823 manager.register_tree_state_subscriber(Arc::new(FlagSubscriber(notified.clone())));
1824
1825 let delta = make_delta("model1", vec![make_insert_op("x", "http://w:8000")], 0, 1);
1826 manager.apply_remote_tree_delta(delta, None);
1827
1828 assert!(
1829 notified.load(Ordering::SeqCst),
1830 "subscriber was not notified after delta apply"
1831 );
1832 }
1833
    #[test]
    fn test_collector_sends_tenant_delta() {
        use crate::{
            incremental::IncrementalUpdateCollector, stores::StoreType, tree_ops::TenantDelta,
        };

        let stores = Arc::new(StateStores::with_self_name("node1".to_string()));
        let manager = MeshSyncManager::new(stores.clone(), "node1".to_string());

        // Buffer one local insert; the collector should ship it as a compact
        // tenant delta rather than a full tree state.
        manager
            .sync_tree_operation(
                "model1".to_string(),
                make_insert_op("hello world", "http://w:8000"),
            )
            .unwrap();

        let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
        let updates = collector.collect_updates_for_store(StoreType::Policy);

        assert!(!updates.is_empty(), "expected at least one policy update");

        // Tree payloads travel under "tree:"-prefixed policy keys.
        let tree_update = updates
            .iter()
            .find(|u| u.key.starts_with("tree:"))
            .expect("expected a tree key update");

        // The update wraps a PolicyState whose policy_type tags the payload
        // encoding; for buffered inserts that must be "tenant_delta".
        let policy_state: PolicyState =
            bincode::deserialize(&tree_update.value).expect("deserialize PolicyState");
        assert_eq!(
            policy_state.policy_type, "tenant_delta",
            "expected tenant_delta, got {}",
            policy_state.policy_type
        );

        // The delta round-trips with the original worker URL and path hash,
        // and carries no evictions.
        let delta = TenantDelta::from_bytes(&policy_state.config).expect("deserialize TenantDelta");
        assert_eq!(delta.model_id, "model1");
        assert_eq!(delta.inserts.len(), 1);
        assert_eq!(delta.inserts[0].worker_url, "http://w:8000");
        assert_eq!(
            delta.inserts[0].node_path_hash,
            hash_node_path("hello world")
        );
        assert!(delta.evictions.is_empty());
    }
1881
1882 #[test]
1883 fn test_collector_falls_back_to_full_state() {
1884 use crate::{incremental::IncrementalUpdateCollector, stores::StoreType};
1885
1886 let stores = Arc::new(StateStores::with_self_name("node1".to_string()));
1887
1888 let mut tree = TreeState::new("model1".to_string());
1892 tree.add_operation(make_insert_op("direct", "http://w:8000"));
1893 let serialized = tree.to_bytes().unwrap();
1894 stores
1895 .tree_configs
1896 .insert("tree:model1".to_string(), serialized);
1897 stores.advance_tree_version("tree:model1", tree.version);
1899 stores.bump_tree_version("tree:model1");
1901
1902 let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
1903 let updates = collector.collect_updates_for_store(StoreType::Policy);
1904
1905 assert!(!updates.is_empty(), "expected at least one policy update");
1906
1907 let tree_update = updates
1908 .iter()
1909 .find(|u| u.key.starts_with("tree:"))
1910 .expect("expected a tree key update");
1911
1912 let policy_state: PolicyState =
1914 bincode::deserialize(&tree_update.value).expect("deserialize PolicyState");
1915 assert_eq!(
1916 policy_state.policy_type, "tree_state_lz4",
1917 "expected full state fallback, got delta"
1918 );
1919 }
1920
1921 #[test]
1926 fn test_receiver_dispatches_delta_vs_full() {
1927 let manager = create_test_manager("node1".to_string());
1928
1929 let delta = make_delta(
1931 "model_d",
1932 vec![make_insert_op("delta_op", "http://w:8000")],
1933 0,
1934 1,
1935 );
1936 manager.apply_remote_tree_delta(delta, Some("remote".to_string()));
1937
1938 let tree_d = manager.get_tree_state("model_d").unwrap();
1939 assert_eq!(tree_d.version, 1);
1940 assert_eq!(tree_d.operations.len(), 1);
1941
1942 let mut full_tree = TreeState::new("model_f".to_string());
1944 full_tree.add_operation(make_insert_op("full_op1", "http://w1:8000"));
1945 full_tree.add_operation(make_insert_op("full_op2", "http://w2:8000"));
1946
1947 manager.apply_remote_tree_operation(
1948 "model_f".to_string(),
1949 full_tree,
1950 Some("remote".to_string()),
1951 );
1952
1953 let tree_f = manager.get_tree_state("model_f").unwrap();
1954 assert_eq!(tree_f.version, 2);
1955 assert_eq!(tree_f.operations.len(), 2);
1956 }
1957
1958 #[test]
1959 fn test_delta_backward_compatible_full_state() {
1960 let manager = create_test_manager("node1".to_string());
1961
1962 let mut old_format_tree = TreeState::new("legacy_model".to_string());
1964 old_format_tree.add_operation(make_insert_op("old1", "http://w:8000"));
1965 old_format_tree.add_operation(make_insert_op("old2", "http://w:8000"));
1966
1967 manager.apply_remote_tree_operation(
1969 "legacy_model".to_string(),
1970 old_format_tree.clone(),
1971 Some("old_node".to_string()),
1972 );
1973
1974 let tree = manager.get_tree_state("legacy_model").unwrap();
1975 assert_eq!(tree.version, old_format_tree.version);
1976 assert_eq!(tree.operations.len(), 2);
1977 assert_eq!(tree.model_id, "legacy_model");
1978 }
1979
    #[test]
    fn test_delta_reconnect_falls_back_to_full_state() {
        use crate::{incremental::IncrementalUpdateCollector, stores::StoreType};

        let stores = Arc::new(StateStores::with_self_name("node1".to_string()));

        // Write a full ten-op tree directly into tree_configs and bump its
        // version, bypassing sync_tree_operation so no tenant-delta buffers
        // are populated on the way in.
        let mut tree = TreeState::new("model1".to_string());
        for i in 0..10 {
            tree.add_operation(make_insert_op(&format!("op_{i}"), "http://w:8000"));
        }
        let serialized = tree.to_bytes().unwrap();
        stores
            .tree_configs
            .insert("tree:model1".to_string(), serialized);
        stores.advance_tree_version("tree:model1", tree.version);
        stores.bump_tree_version("tree:model1");

        // Explicitly clear any delta buffers for the model, simulating a
        // reconnecting peer that has no incremental history to draw from.
        stores.tenant_delta_inserts.remove("model1");
        stores.tenant_delta_evictions.remove("model1");

        let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
        let updates = collector.collect_updates_for_store(StoreType::Policy);

        assert!(!updates.is_empty(), "expected at least one update");

        // Tree payloads travel under "tree:"-prefixed policy keys.
        let tree_update = updates
            .iter()
            .find(|u| u.key.starts_with("tree:"))
            .expect("expected a tree key update");

        // With no pending deltas, the collector must ship the compressed
        // full state instead of an incremental payload.
        let policy_state: PolicyState =
            bincode::deserialize(&tree_update.value).expect("deserialize PolicyState");
        assert_eq!(
            policy_state.policy_type, "tree_state_lz4",
            "expected full state fallback when tenant delta buffers are empty, got: {}",
            policy_state.policy_type
        );
    }
2027
2028 #[test]
2034 fn test_delta_out_of_order_delivery() {
2035 let manager = create_test_manager("node1".to_string());
2039
2040 let ops_1_to_5: Vec<_> = (1..=5)
2041 .map(|i| make_insert_op(&format!("op_{i}"), "http://w:8000"))
2042 .collect();
2043 let delta1 = make_delta("model1", ops_1_to_5, 0, 5);
2044 manager.apply_remote_tree_delta(delta1, Some("peer_a".to_string()));
2045
2046 let tree = manager.get_tree_state("model1").unwrap();
2047 assert_eq!(tree.version, 5);
2048 assert_eq!(tree.operations.len(), 5);
2049
2050 let ops_1_to_3: Vec<_> = (1..=3)
2052 .map(|i| make_insert_op(&format!("late_op_{i}"), "http://w:8000"))
2053 .collect();
2054 let delta2 = make_delta("model1", ops_1_to_3, 0, 3);
2055 manager.apply_remote_tree_delta(delta2, Some("peer_b".to_string()));
2056
2057 let tree_after = manager.get_tree_state("model1").unwrap();
2059 assert_eq!(tree_after.version, 5);
2060 assert_eq!(tree_after.operations.len(), 5);
2061 }
2062
2063 #[test]
2064 fn test_delta_duplicate_delivery() {
2065 let manager = create_test_manager("node1".to_string());
2068
2069 let ops = vec![
2070 make_insert_op("dup1", "http://w:8000"),
2071 make_insert_op("dup2", "http://w:8000"),
2072 ];
2073 let delta = make_delta("model1", ops.clone(), 0, 2);
2074
2075 manager.apply_remote_tree_delta(delta.clone(), Some("peer".to_string()));
2076 let tree1 = manager.get_tree_state("model1").unwrap();
2077 assert_eq!(tree1.version, 2);
2078 assert_eq!(tree1.operations.len(), 2);
2079
2080 manager.apply_remote_tree_delta(delta, Some("peer".to_string()));
2082 let tree2 = manager.get_tree_state("model1").unwrap();
2083 assert_eq!(
2084 tree2.version, 2,
2085 "duplicate delta should not change version"
2086 );
2087 assert_eq!(
2088 tree2.operations.len(),
2089 2,
2090 "duplicate delta should not add extra ops"
2091 );
2092 }
2093
2094 #[test]
2095 fn test_delta_split_brain_recovery() {
2096 let manager = create_test_manager("nodeB".to_string());
2106
2107 let mut seed = TreeState::new("model1".to_string());
2109 for i in 0..5 {
2110 seed.add_operation(make_insert_op(&format!("seed_{i}"), "http://w:8000"));
2111 }
2112 assert_eq!(seed.version, 5);
2113 manager.apply_remote_tree_operation("model1".to_string(), seed, Some("origin".to_string()));
2114
2115 let tree_b = manager.get_tree_state("model1").unwrap();
2117 assert_eq!(tree_b.version, 5);
2118
2119 let a_ops: Vec<_> = (0..3)
2121 .map(|i| make_insert_op(&format!("A_op_{i}"), "http://wA:8000"))
2122 .collect();
2123 let delta_a = make_delta("model1", a_ops, 5, 8);
2124 manager.apply_remote_tree_delta(delta_a, Some("nodeA".to_string()));
2125
2126 let tree_merged = manager.get_tree_state("model1").unwrap();
2128 assert_eq!(
2129 tree_merged.version, 8,
2130 "tree_configs version should be 8 (seed 5 + 3 delta ops), got {}",
2131 tree_merged.version
2132 );
2133 assert_eq!(tree_merged.operations.len(), 8);
2134 }
2135
    #[test]
    fn test_delta_concurrent_write_and_collect() {
        use crate::{incremental::IncrementalUpdateCollector, stores::StoreType};

        let stores = Arc::new(StateStores::with_self_name("node1".to_string()));
        let manager = Arc::new(MeshSyncManager::new(stores.clone(), "node1".to_string()));

        // Writer thread: 100 local inserts streamed through the manager.
        let manager_clone = manager.clone();
        let writer = std::thread::spawn(move || {
            for i in 0..100 {
                manager_clone
                    .sync_tree_operation(
                        "model1".to_string(),
                        make_insert_op(&format!("concurrent_op_{i}"), "http://w:8000"),
                    )
                    .unwrap();
            }
        });

        // Reader: repeatedly collect while the writer runs. Any tree update
        // observed mid-flight must deserialize cleanly as one of the known
        // payload kinds — i.e. collection never sees a torn write.
        let mut collected_any = false;
        for _ in 0..10 {
            let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
            let updates = collector.collect_updates_for_store(StoreType::Policy);
            for update in &updates {
                if update.key.starts_with("tree:") {
                    let policy_state: PolicyState =
                        bincode::deserialize(&update.value).expect("data should not be corrupted");
                    // policy_type tags the payload encoding.
                    assert!(
                        policy_state.policy_type == "tenant_delta"
                            || policy_state.policy_type == "tree_state_delta"
                            || policy_state.policy_type == "tree_state_lz4"
                            || policy_state.policy_type == "tree_state"
                    );

                    // Whichever encoding was used, the payload must round-trip
                    // and carry at least one operation.
                    if policy_state.policy_type == "tree_state_delta" {
                        let delta = TreeStateDelta::from_bytes(&policy_state.config)
                            .expect("delta should deserialize cleanly");
                        assert!(!delta.operations.is_empty());
                    } else {
                        let tree = TreeState::from_bytes(&policy_state.config)
                            .expect("tree state should deserialize cleanly");
                        assert!(!tree.operations.is_empty());
                    }
                    collected_any = true;
                }
            }
        }

        writer.join().unwrap();

        // If no collection raced the writer in time, a final collection after
        // the writer finished must still surface the buffered updates.
        let collector = IncrementalUpdateCollector::new(stores.clone(), "node1".to_string());
        let final_updates = collector.collect_updates_for_store(StoreType::Policy);
        if !collected_any {
            assert!(
                !final_updates.is_empty(),
                "final collection after writer finished should have updates"
            );
        }
    }
2211
2212 #[test]
2222 fn test_delta_with_remove_operations() {
2223 let manager = create_test_manager("node1".to_string());
2225
2226 let ops = vec![
2227 make_insert_op("text1", "http://w1:8000"),
2228 TreeOperation::Remove(TreeRemoveOp {
2229 tenant: "http://w1:8000".to_string(),
2230 }),
2231 make_insert_op("text2", "http://w2:8000"),
2232 ];
2233
2234 let delta = make_delta("model1", ops, 0, 3);
2235 manager.apply_remote_tree_delta(delta, Some("peer".to_string()));
2236
2237 let tree = manager.get_tree_state("model1").unwrap();
2238 assert_eq!(tree.version, 3);
2239 assert_eq!(tree.operations.len(), 3);
2240 assert!(matches!(
2242 tree.operations[1],
2243 TreeOperation::Remove(TreeRemoveOp { .. })
2244 ));
2245 }
2246
2247 #[test]
2248 fn test_delta_multiple_models_independent() {
2249 let manager = create_test_manager("node1".to_string());
2252
2253 let delta_a = make_delta(
2254 "model_a",
2255 vec![make_insert_op("a_op", "http://w:8000")],
2256 0,
2257 1,
2258 );
2259 let delta_b = make_delta(
2260 "model_b",
2261 vec![
2262 make_insert_op("b_op1", "http://w:8000"),
2263 make_insert_op("b_op2", "http://w:8000"),
2264 ],
2265 0,
2266 2,
2267 );
2268
2269 manager.apply_remote_tree_delta(delta_a, None);
2270 manager.apply_remote_tree_delta(delta_b, None);
2271
2272 let tree_a = manager.get_tree_state("model_a").unwrap();
2273 let tree_b = manager.get_tree_state("model_b").unwrap();
2274
2275 assert_eq!(tree_a.version, 1);
2276 assert_eq!(tree_a.operations.len(), 1);
2277 assert_eq!(tree_b.version, 2);
2278 assert_eq!(tree_b.operations.len(), 2);
2279 }
2280
2281 #[test]
2282 fn test_delta_incremental_chain() {
2283 let manager = create_test_manager("node1".to_string());
2286
2287 let delta1 = make_delta(
2288 "model1",
2289 (0..3)
2290 .map(|i| make_insert_op(&format!("batch1_op_{i}"), "http://w:8000"))
2291 .collect(),
2292 0,
2293 3,
2294 );
2295 manager.apply_remote_tree_delta(delta1, None);
2296 let tree = manager.get_tree_state("model1").unwrap();
2297 assert_eq!(tree.version, 3);
2298
2299 let delta2 = make_delta(
2300 "model1",
2301 (0..2)
2302 .map(|i| make_insert_op(&format!("batch2_op_{i}"), "http://w:8000"))
2303 .collect(),
2304 3,
2305 5,
2306 );
2307 manager.apply_remote_tree_delta(delta2, None);
2308 let tree = manager.get_tree_state("model1").unwrap();
2309 assert_eq!(tree.version, 5);
2310
2311 let delta3 = make_delta(
2312 "model1",
2313 (0..3)
2314 .map(|i| make_insert_op(&format!("batch3_op_{i}"), "http://w:8000"))
2315 .collect(),
2316 5,
2317 8,
2318 );
2319 manager.apply_remote_tree_delta(delta3, None);
2320 let tree = manager.get_tree_state("model1").unwrap();
2321 assert_eq!(tree.version, 8);
2322 assert_eq!(tree.operations.len(), 8);
2323 }
2324
2325 #[test]
2326 fn test_delta_token_key_serialization_round_trip() {
2327 use crate::tree_ops::TreeInsertOp;
2330
2331 let tokens = vec![42u32, 100, 200, 999, u32::MAX];
2332 let ops = vec![TreeOperation::Insert(TreeInsertOp {
2333 key: TreeKey::Tokens(tokens.clone()),
2334 tenant: "http://w:8000".to_string(),
2335 })];
2336
2337 let delta = TreeStateDelta {
2338 model_id: "token_model".to_string(),
2339 operations: ops,
2340 base_version: 0,
2341 new_version: 1,
2342 };
2343
2344 let bytes = delta.to_bytes().unwrap();
2346 let restored = TreeStateDelta::from_bytes(&bytes).unwrap();
2347 assert_eq!(restored.operations.len(), 1);
2348
2349 match &restored.operations[0] {
2350 TreeOperation::Insert(op) => {
2351 assert_eq!(op.key, TreeKey::Tokens(tokens));
2352 }
2353 TreeOperation::Remove(_) => panic!("expected Insert operation"),
2354 }
2355
2356 let manager = create_test_manager("node1".to_string());
2358 manager.apply_remote_tree_delta(restored, None);
2359
2360 let tree = manager.get_tree_state("token_model").unwrap();
2361 assert_eq!(tree.version, 1);
2362 assert_eq!(tree.operations.len(), 1);
2363 }
2364}