1use std::{
10 collections::{BTreeMap, BTreeSet},
11 marker::PhantomData,
12 sync::{
13 atomic::{AtomicU64, Ordering},
14 Arc,
15 },
16};
17
18use dashmap::DashMap;
19use parking_lot::RwLock;
20use serde::{de::DeserializeOwned, Deserialize, Serialize};
21use tracing::debug;
22
23use super::{
24 consistent_hash::ConsistentHashRing,
25 crdt_kv::{CrdtOrMap, Operation, OperationLog, ReplicaId},
26 tree_ops::TreeOperation,
27};
28
29trait CrdtValue: Serialize + DeserializeOwned + Clone {
38 fn to_bytes(&self) -> Result<Vec<u8>, CrdtSerError> {
39 bincode::serialize(self).map_err(CrdtSerError)
40 }
41
42 fn from_bytes(bytes: &[u8]) -> Result<Self, CrdtSerError> {
43 bincode::deserialize(bytes).map_err(CrdtSerError)
44 }
45}
46
47#[derive(Debug)]
49pub struct CrdtSerError(Box<bincode::ErrorKind>);
50
51impl std::fmt::Display for CrdtSerError {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 write!(f, "CRDT serialization error: {}", self.0)
54 }
55}
56
57impl std::error::Error for CrdtSerError {}
58
59impl<T> CrdtValue for T where T: Serialize + DeserializeOwned + Clone {}
61
62#[derive(Clone)]
68struct CrdtStore<T> {
69 inner: CrdtOrMap,
70 _phantom: PhantomData<T>,
71}
72
73impl<T> std::fmt::Debug for CrdtStore<T> {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("CrdtStore")
76 .field("inner", &"<CrdtOrMap>")
77 .finish()
78 }
79}
80
81impl<T: CrdtValue> CrdtStore<T> {
82 fn new() -> Self {
83 Self {
84 inner: CrdtOrMap::new(),
85 _phantom: PhantomData,
86 }
87 }
88
89 fn generation(&self) -> u64 {
91 self.inner.generation()
92 }
93
94 fn get(&self, key: &str) -> Option<T> {
95 self.inner.get(key).and_then(|bytes| {
96 T::from_bytes(&bytes)
97 .map_err(|err| {
98 debug!(error = %err, %key, "Failed to deserialize CRDT value");
99 })
100 .ok()
101 })
102 }
103
104 fn insert(&self, key: String, value: T) -> Result<Option<T>, CrdtSerError> {
105 let bytes = value.to_bytes().map_err(|err| {
106 debug!(error = %err, %key, "Failed to serialize CRDT value");
107 err
108 })?;
109
110 Ok(self.inner.insert(key, bytes).and_then(|old_bytes| {
111 T::from_bytes(&old_bytes)
112 .map_err(|err| {
113 debug!(error = %err, "Failed to deserialize old CRDT value");
114 })
115 .ok()
116 }))
117 }
118
119 fn remove(&self, key: &str) -> Option<T> {
120 self.inner.remove(key).and_then(|bytes| {
121 T::from_bytes(&bytes)
122 .map_err(|err| {
123 debug!(error = %err, %key, "Failed to deserialize removed CRDT value");
124 })
125 .ok()
126 })
127 }
128
129 fn update<F>(&self, key: String, updater: F) -> Result<Option<T>, CrdtSerError>
130 where
131 F: FnOnce(Option<T>) -> T,
132 {
133 let updated_bytes = self.inner.try_upsert(key, |current_bytes| {
134 let current = current_bytes.and_then(|bytes| {
135 T::from_bytes(bytes)
136 .map_err(|err| {
137 debug!(error = %err, "Failed to deserialize current CRDT value");
138 })
139 .ok()
140 });
141
142 let updated = updater(current);
143 updated.to_bytes()
144 })?;
145
146 Ok(T::from_bytes(&updated_bytes)
147 .map_err(|err| {
148 debug!(error = %err, "Failed to deserialize updated CRDT value");
149 err
150 })
151 .ok())
152 }
153
154 fn update_if<F>(&self, key: String, updater: F) -> Result<(Option<T>, bool), CrdtSerError>
155 where
156 F: FnOnce(Option<T>) -> Option<T>,
157 {
158 let (updated_bytes, changed) = self.inner.try_upsert_if(key, |current_bytes| {
159 let current = current_bytes.and_then(|bytes| {
160 T::from_bytes(bytes)
161 .map_err(|err| {
162 debug!(error = %err, "Failed to deserialize current CRDT value");
163 })
164 .ok()
165 });
166
167 let updated = updater(current);
168 updated.map(|value| value.to_bytes()).transpose()
169 })?;
170
171 let value = T::from_bytes(&updated_bytes)
172 .map_err(|err| {
173 debug!(error = %err, "Failed to deserialize conditionally updated CRDT value");
174 err
175 })
176 .ok();
177
178 Ok((value, changed))
179 }
180
181 fn len(&self) -> usize {
182 self.inner.len()
183 }
184
185 fn is_empty(&self) -> bool {
186 self.len() == 0
187 }
188
189 fn merge(&self, log: &OperationLog) {
190 self.inner.merge(log);
191 }
192
193 fn get_operation_log(&self) -> OperationLog {
194 self.inner.get_operation_log()
195 }
196
197 fn all(&self) -> BTreeMap<String, T> {
198 self.inner
199 .all()
200 .into_iter()
201 .filter_map(|(k, v)| {
202 let key_for_log = k.clone();
203 T::from_bytes(&v)
204 .map(|val| (k, val))
205 .map_err(|err| {
206 debug!(error = %err, key = %key_for_log, "Failed to deserialize CRDT value in all()");
207 })
208 .ok()
209 })
210 .collect()
211 }
212
213 fn gc_tombstones(&self) -> usize {
215 self.inner.gc_tombstones()
216 }
217}
218
219impl<T: CrdtValue> Default for CrdtStore<T> {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
225#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
227pub enum StoreType {
228 Membership,
229 App,
230 Worker,
231 Policy,
232 RateLimit,
233}
234
235impl StoreType {
236 pub fn as_str(self) -> &'static str {
237 match self {
238 StoreType::Membership => "membership",
239 StoreType::App => "app",
240 StoreType::Worker => "worker",
241 StoreType::Policy => "policy",
242 StoreType::RateLimit => "rate_limit",
243 }
244 }
245
246 pub fn to_proto(self) -> i32 {
248 use super::service::gossip::StoreType as ProtoStoreType;
249 match self {
250 StoreType::Membership => ProtoStoreType::Membership as i32,
251 StoreType::App => ProtoStoreType::App as i32,
252 StoreType::Worker => ProtoStoreType::Worker as i32,
253 StoreType::Policy => ProtoStoreType::Policy as i32,
254 StoreType::RateLimit => ProtoStoreType::RateLimit as i32,
255 }
256 }
257
258 pub fn from_proto(proto_value: i32) -> Self {
260 match proto_value {
261 0 => StoreType::Membership,
262 1 => StoreType::App,
263 2 => StoreType::Worker,
264 3 => StoreType::Policy,
265 4 => StoreType::RateLimit,
266 unknown => {
267 tracing::warn!(
268 proto_value = unknown,
269 "Unknown StoreType proto value, defaulting to Membership"
270 );
271 StoreType::Membership
272 }
273 }
274 }
275}
276
277#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
279pub struct MembershipState {
280 pub name: String,
281 pub address: String,
282 pub status: i32, pub version: u64,
284 pub metadata: BTreeMap<String, Vec<u8>>,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
289pub struct AppState {
290 pub key: String,
291 pub value: Vec<u8>, pub version: u64,
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
297pub struct RateLimitConfig {
298 pub limit_per_second: u64,
299}
300
301pub const GLOBAL_RATE_LIMIT_KEY: &str = "global_rate_limit";
303pub const GLOBAL_RATE_LIMIT_COUNTER_KEY: &str = "global";
305
306#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
313pub struct WorkerState {
314 pub worker_id: String,
315 pub model_id: String,
316 pub url: String,
317 pub health: bool,
318 pub load: f64,
319 pub version: u64,
320 #[serde(default)]
323 pub spec: Vec<u8>,
324}
325
326impl std::hash::Hash for WorkerState {
328 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
329 self.worker_id.hash(state);
330 self.model_id.hash(state);
331 self.url.hash(state);
332 self.health.hash(state);
333 (self.load as i64).hash(state);
334 self.version.hash(state);
335 self.spec.hash(state);
336 }
337}
338
339impl Eq for WorkerState {}
341
342#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
344pub struct PolicyState {
345 pub model_id: String,
346 pub policy_type: String,
347 pub config: Vec<u8>, pub version: u64,
349}
350
351pub fn policy_key(model_id: &str) -> String {
353 format!("policy:{model_id}")
354}
355
356pub fn tree_state_key(model_id: &str) -> String {
358 format!("tree:{model_id}")
359}
360
361macro_rules! define_state_store {
362 ($store_name:ident, $value_type:ty) => {
363 #[derive(Debug, Clone)]
364 pub struct $store_name {
365 inner: CrdtStore<$value_type>,
366 }
367
368 impl $store_name {
369 pub fn new() -> Self {
370 Self {
371 inner: CrdtStore::new(),
372 }
373 }
374
375 pub fn generation(&self) -> u64 {
377 self.inner.generation()
378 }
379
380 pub fn get(&self, key: &str) -> Option<$value_type> {
381 self.inner.get(key)
382 }
383
384 pub fn insert(
385 &self,
386 key: String,
387 value: $value_type,
388 ) -> Result<Option<$value_type>, CrdtSerError> {
389 self.inner.insert(key, value)
390 }
391
392 pub fn remove(&self, key: &str) {
393 self.inner.remove(key);
394 }
395
396 pub fn merge(&self, log: &OperationLog) {
397 self.inner.merge(log);
398 }
399
400 pub fn get_operation_log(&self) -> OperationLog {
401 self.inner.get_operation_log()
402 }
403
404 pub fn update<F>(
405 &self,
406 key: String,
407 updater: F,
408 ) -> Result<Option<$value_type>, CrdtSerError>
409 where
410 F: FnOnce(Option<$value_type>) -> $value_type,
411 {
412 self.inner.update(key, updater)
413 }
414
415 pub fn update_if<F>(
416 &self,
417 key: String,
418 updater: F,
419 ) -> Result<(Option<$value_type>, bool), CrdtSerError>
420 where
421 F: FnOnce(Option<$value_type>) -> Option<$value_type>,
422 {
423 self.inner.update_if(key, updater)
424 }
425
426 pub fn len(&self) -> usize {
427 self.inner.len()
428 }
429
430 pub fn is_empty(&self) -> bool {
431 self.inner.is_empty()
432 }
433
434 pub fn all(&self) -> BTreeMap<String, $value_type> {
435 self.inner.all()
436 }
437
438 pub fn gc_tombstones(&self) -> usize {
440 self.inner.gc_tombstones()
441 }
442 }
443
444 impl Default for $store_name {
445 fn default() -> Self {
446 Self::new()
447 }
448 }
449 };
450}
451
452define_state_store!(MembershipStore, MembershipState);
453define_state_store!(AppStore, AppState);
454define_state_store!(WorkerStore, WorkerState);
455define_state_store!(PolicyStore, PolicyState);
456
457#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
463struct CounterValue {
464 value: i64,
465}
466
467#[derive(Debug, Clone)]
469pub struct RateLimitStore {
470 counters: CrdtStore<CounterValue>,
471 hash_ring: Arc<RwLock<ConsistentHashRing>>,
472 self_name: String,
473 actor_replica_ids: Arc<DashMap<String, ReplicaId>>,
474}
475
476impl RateLimitStore {
477 const SHARD_SEPARATOR: &'static str = "::actor:";
478
479 pub fn new(self_name: String) -> Self {
480 Self {
481 counters: CrdtStore::new(),
482 hash_ring: Arc::new(RwLock::new(ConsistentHashRing::new())),
483 self_name,
484 actor_replica_ids: Arc::new(DashMap::new()),
485 }
486 }
487
488 fn shard_key(key: &str, actor: &str) -> String {
489 format!("{key}{}{actor}", Self::SHARD_SEPARATOR)
490 }
491
492 fn split_shard_key(shard_key: &str) -> Option<(&str, &str)> {
493 shard_key.rsplit_once(Self::SHARD_SEPARATOR)
494 }
495
496 fn base_key(shard_key: &str) -> &str {
497 Self::split_shard_key(shard_key).map_or(shard_key, |(base, _)| base)
498 }
499
500 fn replica_id_for_actor(&self, actor: &str) -> ReplicaId {
501 if let Ok(replica_id) = ReplicaId::from_string(actor) {
502 return replica_id;
503 }
504
505 *self.actor_replica_ids.entry(actor.to_string()).or_default()
506 }
507
508 fn aggregate_counter(&self, key: &str) -> Option<i64> {
509 let all_counters = self.counters.all();
510 let mut has_shard = false;
511 let mut total = 0;
512
513 for (shard_key, counter) in all_counters {
514 if Self::base_key(&shard_key) == key {
515 has_shard = true;
516 total += counter.value;
517 }
518 }
519
520 if has_shard {
521 Some(total)
522 } else {
523 None
524 }
525 }
526
527 pub fn update_membership(&self, nodes: &[String]) {
529 let mut ring = self.hash_ring.write();
530 ring.update_membership(nodes);
531 debug!("Updated rate-limit hash ring with {} nodes", nodes.len());
532 }
533
534 pub fn is_owner(&self, key: &str) -> bool {
536 let ring = self.hash_ring.read();
537 ring.is_owner(key, &self.self_name)
538 }
539
540 pub fn get_owners(&self, key: &str) -> Vec<String> {
542 let ring = self.hash_ring.read();
543 ring.get_owners(key)
544 }
545
546 #[expect(dead_code)]
548 fn get_or_create_counter_internal(&self, key: String) -> Option<i64> {
549 if !self.is_owner(&key) {
550 return None;
551 }
552
553 let shard_key = Self::shard_key(&key, &self.self_name);
554 if let Some(counter) = self.counters.get(&shard_key) {
555 return Some(counter.value);
556 }
557
558 let _ = self.counters.insert(shard_key, CounterValue::default());
559 Some(0)
560 }
561
562 pub fn get_counter(&self, key: &str) -> Option<i64> {
563 if !self.is_owner(key) {
564 return None;
565 }
566 self.aggregate_counter(key)
567 }
568
569 pub fn all_shards(&self) -> Vec<(String, String, i64)> {
571 self.counters
572 .all()
573 .into_iter()
574 .filter_map(|(shard_key, counter)| {
575 Self::split_shard_key(&shard_key).map(|(base_key, actor)| {
576 (base_key.to_string(), actor.to_string(), counter.value)
577 })
578 })
579 .collect()
580 }
581
582 pub fn inc(&self, key: String, actor: String, delta: i64) {
584 if !self.is_owner(&key) {
585 return;
586 }
587
588 let shard_key = Self::shard_key(&key, &actor);
589 if let Err(err) = self.counters.update(shard_key, |current| CounterValue {
590 value: current.map_or(delta, |existing| existing.value + delta),
591 }) {
592 debug!(error = %err, %key, %actor, "Failed to update rate-limit counter shard");
593 }
594 }
595
596 pub fn set_counter_snapshot(&self, key: String, actor: String, counter_value: i64) {
598 if !self.is_owner(&key) {
599 return;
600 }
601
602 let shard_key = Self::shard_key(&key, &actor);
603 if let Err(err) = self.counters.insert(
604 shard_key,
605 CounterValue {
606 value: counter_value,
607 },
608 ) {
609 debug!(error = %err, %key, %actor, "Failed to set rate-limit counter snapshot");
610 }
611 }
612
613 pub fn snapshot_payload_for_counter_value(
617 key: String,
618 actor: String,
619 counter_value: i64,
620 ) -> Option<(String, Vec<u8>)> {
621 let bytes = match (CounterValue {
622 value: counter_value,
623 })
624 .to_bytes()
625 {
626 Ok(bytes) => bytes,
627 Err(err) => {
628 debug!(error = %err, "Failed to serialize rate-limit counter snapshot");
629 return None;
630 }
631 };
632
633 let shard_key = Self::shard_key(&key, &actor);
634 Some((shard_key, bytes))
635 }
636
637 pub fn apply_counter_snapshot_payload(
638 &self,
639 shard_key: String,
640 actor: &str,
641 timestamp: u64,
642 payload: &[u8],
643 ) {
644 let Some((base_key, _)) = Self::split_shard_key(&shard_key) else {
645 debug!(%shard_key, "Invalid rate-limit shard key in snapshot payload");
646 return;
647 };
648
649 if !self.is_owner(base_key) {
650 return;
651 }
652
653 if let Err(err) = CounterValue::from_bytes(payload) {
654 debug!(error = %err, %shard_key, "Failed to decode rate-limit snapshot payload");
655 return;
656 }
657
658 let replica_id = self.replica_id_for_actor(actor);
659 let mut log = OperationLog::new();
660 log.append(Operation::insert(
661 shard_key,
662 payload.to_vec(),
663 timestamp,
664 replica_id,
665 ));
666 self.counters.merge(&log);
667 }
668
669 pub fn value(&self, key: &str) -> Option<i64> {
671 self.aggregate_counter(key)
672 }
673
674 pub fn merge(&self, log: &OperationLog) {
676 self.counters.merge(log);
677 }
678
679 pub fn get_operation_log(&self) -> OperationLog {
681 self.counters.get_operation_log()
682 }
683
684 pub fn keys(&self) -> Vec<String> {
686 self.counters
687 .all()
688 .keys()
689 .map(|key| Self::base_key(key).to_string())
690 .collect::<BTreeSet<_>>()
691 .into_iter()
692 .collect()
693 }
694
695 pub fn check_ownership_transfer(&self, failed_nodes: &[String]) -> Vec<String> {
697 let mut affected_keys = Vec::new();
698 let ring = self.hash_ring.read();
699 for key in self.keys() {
700 let owners = ring.get_owners(&key);
701 if owners.iter().any(|owner| failed_nodes.contains(owner))
702 && ring.is_owner(&key, &self.self_name)
703 {
704 affected_keys.push(key);
705 }
706 }
707
708 affected_keys
709 }
710}
711
712impl Default for RateLimitStore {
713 fn default() -> Self {
714 Self::new("default".to_string())
715 }
716}
717
718#[derive(Debug, Clone)]
720pub struct StateStores {
721 pub membership: MembershipStore,
722 pub app: AppStore,
723 pub worker: WorkerStore,
724 pub policy: PolicyStore,
725 pub rate_limit: RateLimitStore,
726 pub tree_ops_pending: DashMap<String, Vec<TreeOperation>>,
729 pub tree_versions: DashMap<String, Arc<AtomicU64>>,
734 pub tree_generation: Arc<AtomicU64>,
738 pub tree_configs: DashMap<String, Vec<u8>>,
742 pub tenant_delta_inserts: DashMap<String, Vec<crate::tree_ops::TenantInsert>>,
746 pub tenant_delta_evictions: DashMap<String, Vec<crate::tree_ops::TenantEvict>>,
748}
749
750impl StateStores {
751 pub fn new() -> Self {
752 Self {
753 membership: MembershipStore::new(),
754 app: AppStore::new(),
755 worker: WorkerStore::new(),
756 policy: PolicyStore::new(),
757 rate_limit: RateLimitStore::new("default".to_string()),
758 tree_ops_pending: DashMap::new(),
759 tree_versions: DashMap::new(),
760 tree_generation: Arc::new(AtomicU64::new(0)),
761 tree_configs: DashMap::new(),
762 tenant_delta_inserts: DashMap::new(),
763 tenant_delta_evictions: DashMap::new(),
764 }
765 }
766
767 pub fn with_self_name(self_name: String) -> Self {
768 Self {
769 membership: MembershipStore::new(),
770 app: AppStore::new(),
771 worker: WorkerStore::new(),
772 policy: PolicyStore::new(),
773 rate_limit: RateLimitStore::new(self_name),
774 tree_ops_pending: DashMap::new(),
775 tree_versions: DashMap::new(),
776 tree_generation: Arc::new(AtomicU64::new(0)),
777 tree_configs: DashMap::new(),
778 tenant_delta_inserts: DashMap::new(),
779 tenant_delta_evictions: DashMap::new(),
780 }
781 }
782
783 pub fn tree_version(&self, key: &str) -> u64 {
785 self.tree_versions
786 .get(key)
787 .map(|v| v.load(Ordering::Acquire))
788 .unwrap_or(0)
789 }
790
791 pub fn bump_tree_version(&self, key: &str) -> u64 {
800 let version = self
801 .tree_versions
802 .entry(key.to_string())
803 .or_insert_with(|| {
804 let base = self
807 .tree_configs
808 .get(key)
809 .and_then(|bytes| {
810 super::tree_ops::TreeState::from_bytes(&bytes)
811 .ok()
812 .map(|ts| ts.version)
813 })
814 .unwrap_or(0);
815 Arc::new(AtomicU64::new(base))
816 })
817 .fetch_add(1, Ordering::Release)
818 + 1;
819 self.tree_generation.fetch_add(1, Ordering::Release);
820 version
821 }
822
823 pub fn advance_tree_version(&self, key: &str, version: u64) {
827 self.tree_versions
828 .entry(key.to_string())
829 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
830 .fetch_max(version, Ordering::Release);
831 }
832
833 pub fn gc_tombstones(&self) -> usize {
836 self.membership.gc_tombstones()
837 + self.app.gc_tombstones()
838 + self.worker.gc_tombstones()
839 + self.policy.gc_tombstones()
840 }
841
842 pub fn gc_stale_tree_entries(&self) -> usize {
850 let before =
851 self.tree_configs.len() + self.tree_versions.len() + self.tree_ops_pending.len();
852
853 self.tree_versions.retain(|k, _| {
864 self.tree_configs.contains_key(k)
865 || self.tree_ops_pending.get(k).is_some_and(|v| !v.is_empty())
866 });
867
868 self.tree_ops_pending
870 .retain(|k, v| !v.is_empty() || self.tree_configs.contains_key(k));
871
872 self.tree_configs.retain(|k, _| {
875 self.tree_versions.contains_key(k)
876 || self.tree_ops_pending.get(k).is_some_and(|v| !v.is_empty())
877 });
878
879 let after =
880 self.tree_configs.len() + self.tree_versions.len() + self.tree_ops_pending.len();
881 before.saturating_sub(after)
882 }
883}
884
885impl Default for StateStores {
886 fn default() -> Self {
887 Self::new()
888 }
889}
890
891#[cfg(test)]
892mod tests {
893 use std::collections::BTreeMap;
894
895 use super::*;
896 use crate::service::gossip::NodeStatus;
897
898 #[test]
899 fn test_membership_store() {
900 let store = MembershipStore::new();
901 let key = "node1".to_string();
902 let state = MembershipState {
903 name: "node1".to_string(),
904 address: "127.0.0.1:8000".to_string(),
905 status: NodeStatus::Alive as i32,
906 version: 1,
907 metadata: BTreeMap::new(),
908 };
909
910 let _ = store.insert(key.clone(), state.clone());
911 assert_eq!(store.get(&key).unwrap().name, "node1");
912
913 store.remove(&key);
914 assert!(store.get(&key).is_none());
915 }
916
917 #[test]
918 fn test_app_store() {
919 let store = AppStore::new();
920 let key = "app_key1".to_string();
921 let state = AppState {
922 key: "app_key1".to_string(),
923 value: b"app_value".to_vec(),
924 version: 1,
925 };
926
927 let _ = store.insert(key.clone(), state.clone());
928 assert_eq!(store.get(&key).unwrap().key, "app_key1");
929 }
930
931 #[test]
932 fn test_worker_store() {
933 let store = WorkerStore::new();
934 let key = "worker1".to_string();
935 let state = WorkerState {
936 worker_id: "worker1".to_string(),
937 model_id: "model1".to_string(),
938 url: "http://localhost:8000".to_string(),
939 health: true,
940 load: 0.5,
941 version: 1,
942 spec: vec![],
943 };
944
945 let _ = store.insert(key.clone(), state.clone());
946 assert_eq!(store.get(&key).unwrap().worker_id, "worker1");
947 }
948
949 #[test]
950 fn test_policy_store() {
951 let store = PolicyStore::new();
952 let key = "policy:model1".to_string();
953 let state = PolicyState {
954 model_id: "model1".to_string(),
955 policy_type: "cache_aware".to_string(),
956 config: b"config_data".to_vec(),
957 version: 1,
958 };
959
960 let _ = store.insert(key.clone(), state.clone());
961 assert_eq!(store.get(&key).unwrap().model_id, "model1");
962 }
963
964 #[test]
965 fn test_rate_limit_store_update_membership() {
966 let store = RateLimitStore::new("node1".to_string());
967
968 store.update_membership(&[
969 "node1".to_string(),
970 "node2".to_string(),
971 "node3".to_string(),
972 ]);
973
974 let owners = store.get_owners("test_key");
975 assert_eq!(owners.len(), 3);
976 assert!(
977 owners.contains(&"node1".to_string())
978 || owners.contains(&"node2".to_string())
979 || owners.contains(&"node3".to_string())
980 );
981 }
982
983 #[test]
984 fn test_rate_limit_store_is_owner() {
985 let store = RateLimitStore::new("node1".to_string());
986
987 store.update_membership(&["node1".to_string()]);
988
989 let test_key = "test_key".to_string();
990 let is_owner = store.is_owner(&test_key);
991 assert!(is_owner);
993 }
994
995 #[test]
996 fn test_rate_limit_store_inc_only_owner() {
997 let store = RateLimitStore::new("node1".to_string());
998
999 store.update_membership(&["node1".to_string()]);
1000
1001 let test_key = "test_key".to_string();
1002 if store.is_owner(&test_key) {
1003 store.inc(test_key.clone(), "node1".to_string(), 5);
1004
1005 let value = store.value(&test_key);
1006 assert_eq!(value, Some(5));
1007 }
1008 }
1009
1010 #[test]
1011 fn test_rate_limit_store_inc_non_owner() {
1012 let store = RateLimitStore::new("node1".to_string());
1013
1014 store.update_membership(&["node2".to_string(), "node3".to_string()]);
1016
1017 let test_key = "test_key".to_string();
1018 if !store.is_owner(&test_key) {
1019 store.inc(test_key.clone(), "node1".to_string(), 5);
1020
1021 let value = store.value(&test_key);
1023 assert_eq!(value, None);
1024 }
1025 }
1026
1027 #[test]
1028 fn test_rate_limit_store_merge_counter() {
1029 let store1 = RateLimitStore::new("node1".to_string());
1030 let store2 = RateLimitStore::new("node2".to_string());
1031
1032 store1.update_membership(&["node1".to_string()]);
1033 store2.update_membership(&["node2".to_string()]);
1034
1035 let test_key = "test_key".to_string();
1036
1037 if store1.is_owner(&test_key) {
1039 store1.inc(test_key.clone(), "node1".to_string(), 10);
1040 }
1041
1042 if store2.is_owner(&test_key) {
1043 store2.inc(test_key.clone(), "node2".to_string(), 5);
1044 }
1045
1046 let log2 = store2.get_operation_log();
1048 store1.merge(&log2);
1049
1050 if store1.is_owner(&test_key) {
1052 let value = store1.value(&test_key);
1053 assert_eq!(value, Some(15));
1054 }
1055 }
1056
1057 #[test]
1058 fn test_rate_limit_store_check_ownership_transfer() {
1059 let store = RateLimitStore::new("node1".to_string());
1060
1061 store.update_membership(&[
1062 "node1".to_string(),
1063 "node2".to_string(),
1064 "node3".to_string(),
1065 ]);
1066
1067 let test_key = "test_key".to_string();
1068
1069 if store.is_owner(&test_key) {
1071 store.inc(test_key.clone(), "node1".to_string(), 10);
1072 }
1073
1074 let affected = store.check_ownership_transfer(&["node2".to_string()]);
1076 let _ = affected;
1078 }
1079
1080 #[test]
1081 fn test_rate_limit_store_keys() {
1082 let store = RateLimitStore::new("node1".to_string());
1083
1084 store.update_membership(&["node1".to_string()]);
1085
1086 let key1 = "key1".to_string();
1087 let key2 = "key2".to_string();
1088
1089 if store.is_owner(&key1) {
1090 store.inc(key1.clone(), "node1".to_string(), 1);
1091 }
1092
1093 if store.is_owner(&key2) {
1094 store.inc(key2.clone(), "node1".to_string(), 1);
1095 }
1096
1097 let keys = store.keys();
1098 let _ = keys;
1100 }
1101
1102 #[test]
1103 fn test_state_stores_new() {
1104 let stores = StateStores::new();
1105 assert_eq!(stores.membership.len(), 0);
1106 assert_eq!(stores.app.len(), 0);
1107 assert_eq!(stores.worker.len(), 0);
1108 assert_eq!(stores.policy.len(), 0);
1109 }
1110
1111 #[test]
1112 fn test_state_stores_with_self_name() {
1113 let stores = StateStores::with_self_name("test_node".to_string());
1114 let test_key = "test_key".to_string();
1116 stores
1117 .rate_limit
1118 .update_membership(&["test_node".to_string()]);
1119 assert!(stores.rate_limit.is_owner(&test_key));
1120 }
1121}