// smg_mesh/stores.rs
1//! State stores for mesh cluster synchronization
2//!
3//! Four types of state stores:
4//! - MembershipStore: Router node membership
5//! - AppStore: Application configuration, rate-limiting rules, LB algorithms
6//! - WorkerStore: Worker status, load, health
7//! - PolicyStore: Routing policy internal state
8
9use std::{
10    collections::{BTreeMap, BTreeSet},
11    marker::PhantomData,
12    sync::{
13        atomic::{AtomicU64, Ordering},
14        Arc,
15    },
16};
17
18use dashmap::DashMap;
19use parking_lot::RwLock;
20use serde::{de::DeserializeOwned, Deserialize, Serialize};
21use tracing::debug;
22
23use super::{
24    consistent_hash::ConsistentHashRing,
25    crdt_kv::{CrdtOrMap, Operation, OperationLog, ReplicaId},
26    tree_ops::TreeOperation,
27};
28
29// ============================================================================
30// Type-Safe Serialization Layer - Transparent T ↔ Vec<u8> Conversion
31// ============================================================================
32
33/// Trait for CRDT-compatible value types.
34/// Uses bincode for compact binary serialization. This is critical for
35/// PolicyState which contains TreeState with token payloads — JSON
36/// serialization of Vec<u8> is ~4x larger than binary.
37trait CrdtValue: Serialize + DeserializeOwned + Clone {
38    fn to_bytes(&self) -> Result<Vec<u8>, CrdtSerError> {
39        bincode::serialize(self).map_err(CrdtSerError)
40    }
41
42    fn from_bytes(bytes: &[u8]) -> Result<Self, CrdtSerError> {
43        bincode::deserialize(bytes).map_err(CrdtSerError)
44    }
45}
46
/// Serialization error wrapper for CRDT values.
///
/// Wraps bincode's boxed error kind so callers in this module don't
/// handle `bincode::Error` directly.
#[derive(Debug)]
pub struct CrdtSerError(Box<bincode::ErrorKind>);
50
51impl std::fmt::Display for CrdtSerError {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        write!(f, "CRDT serialization error: {}", self.0)
54    }
55}
56
57impl std::error::Error for CrdtSerError {}
58
// Blanket implementation: every Serialize + DeserializeOwned + Clone type
// is automatically a CrdtValue, inheriting the bincode default methods.
impl<T> CrdtValue for T where T: Serialize + DeserializeOwned + Clone {}
61
62// ============================================================================
63// Generic CRDT Store Wrapper - Type-Safe Interface Over CrdtOrMap
64// ============================================================================
65
/// Generic store wrapper providing type-safe operations over CrdtOrMap
#[derive(Clone)]
struct CrdtStore<T> {
    /// Untyped CRDT map holding bincode-encoded values.
    inner: CrdtOrMap,
    /// Zero-sized marker binding this store to value type `T`.
    _phantom: PhantomData<T>,
}
72
73impl<T> std::fmt::Debug for CrdtStore<T> {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("CrdtStore")
76            .field("inner", &"<CrdtOrMap>")
77            .finish()
78    }
79}
80
81impl<T: CrdtValue> CrdtStore<T> {
82    fn new() -> Self {
83        Self {
84            inner: CrdtOrMap::new(),
85            _phantom: PhantomData,
86        }
87    }
88
89    /// Mutation generation counter. Cheap check to skip unchanged stores.
90    fn generation(&self) -> u64 {
91        self.inner.generation()
92    }
93
94    fn get(&self, key: &str) -> Option<T> {
95        self.inner.get(key).and_then(|bytes| {
96            T::from_bytes(&bytes)
97                .map_err(|err| {
98                    debug!(error = %err, %key, "Failed to deserialize CRDT value");
99                })
100                .ok()
101        })
102    }
103
104    fn insert(&self, key: String, value: T) -> Result<Option<T>, CrdtSerError> {
105        let bytes = value.to_bytes().map_err(|err| {
106            debug!(error = %err, %key, "Failed to serialize CRDT value");
107            err
108        })?;
109
110        Ok(self.inner.insert(key, bytes).and_then(|old_bytes| {
111            T::from_bytes(&old_bytes)
112                .map_err(|err| {
113                    debug!(error = %err, "Failed to deserialize old CRDT value");
114                })
115                .ok()
116        }))
117    }
118
119    fn remove(&self, key: &str) -> Option<T> {
120        self.inner.remove(key).and_then(|bytes| {
121            T::from_bytes(&bytes)
122                .map_err(|err| {
123                    debug!(error = %err, %key, "Failed to deserialize removed CRDT value");
124                })
125                .ok()
126        })
127    }
128
129    fn update<F>(&self, key: String, updater: F) -> Result<Option<T>, CrdtSerError>
130    where
131        F: FnOnce(Option<T>) -> T,
132    {
133        let updated_bytes = self.inner.try_upsert(key, |current_bytes| {
134            let current = current_bytes.and_then(|bytes| {
135                T::from_bytes(bytes)
136                    .map_err(|err| {
137                        debug!(error = %err, "Failed to deserialize current CRDT value");
138                    })
139                    .ok()
140            });
141
142            let updated = updater(current);
143            updated.to_bytes()
144        })?;
145
146        Ok(T::from_bytes(&updated_bytes)
147            .map_err(|err| {
148                debug!(error = %err, "Failed to deserialize updated CRDT value");
149                err
150            })
151            .ok())
152    }
153
154    fn update_if<F>(&self, key: String, updater: F) -> Result<(Option<T>, bool), CrdtSerError>
155    where
156        F: FnOnce(Option<T>) -> Option<T>,
157    {
158        let (updated_bytes, changed) = self.inner.try_upsert_if(key, |current_bytes| {
159            let current = current_bytes.and_then(|bytes| {
160                T::from_bytes(bytes)
161                    .map_err(|err| {
162                        debug!(error = %err, "Failed to deserialize current CRDT value");
163                    })
164                    .ok()
165            });
166
167            let updated = updater(current);
168            updated.map(|value| value.to_bytes()).transpose()
169        })?;
170
171        let value = T::from_bytes(&updated_bytes)
172            .map_err(|err| {
173                debug!(error = %err, "Failed to deserialize conditionally updated CRDT value");
174                err
175            })
176            .ok();
177
178        Ok((value, changed))
179    }
180
181    fn len(&self) -> usize {
182        self.inner.len()
183    }
184
185    fn is_empty(&self) -> bool {
186        self.len() == 0
187    }
188
189    fn merge(&self, log: &OperationLog) {
190        self.inner.merge(log);
191    }
192
193    fn get_operation_log(&self) -> OperationLog {
194        self.inner.get_operation_log()
195    }
196
197    fn all(&self) -> BTreeMap<String, T> {
198        self.inner
199            .all()
200            .into_iter()
201            .filter_map(|(k, v)| {
202                let key_for_log = k.clone();
203                T::from_bytes(&v)
204                    .map(|val| (k, val))
205                    .map_err(|err| {
206                        debug!(error = %err, key = %key_for_log, "Failed to deserialize CRDT value in all()");
207                    })
208                    .ok()
209            })
210            .collect()
211    }
212
213    /// Remove tombstoned keys from CRDT metadata maps.
214    fn gc_tombstones(&self) -> usize {
215        self.inner.gc_tombstones()
216    }
217}
218
impl<T: CrdtValue> Default for CrdtStore<T> {
    /// Equivalent to [`CrdtStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
224
/// Store type identifier
///
/// Tags which replicated store a sync/gossip message refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum StoreType {
    /// Router node membership store.
    Membership,
    /// Application configuration store.
    App,
    /// Worker status/load/health store.
    Worker,
    /// Routing policy state store.
    Policy,
    /// Sharded rate-limit counter store.
    RateLimit,
}
234
235impl StoreType {
236    pub fn as_str(self) -> &'static str {
237        match self {
238            StoreType::Membership => "membership",
239            StoreType::App => "app",
240            StoreType::Worker => "worker",
241            StoreType::Policy => "policy",
242            StoreType::RateLimit => "rate_limit",
243        }
244    }
245
246    /// Convert to proto StoreType (i32)
247    pub fn to_proto(self) -> i32 {
248        use super::service::gossip::StoreType as ProtoStoreType;
249        match self {
250            StoreType::Membership => ProtoStoreType::Membership as i32,
251            StoreType::App => ProtoStoreType::App as i32,
252            StoreType::Worker => ProtoStoreType::Worker as i32,
253            StoreType::Policy => ProtoStoreType::Policy as i32,
254            StoreType::RateLimit => ProtoStoreType::RateLimit as i32,
255        }
256    }
257
258    /// Convert from proto StoreType (i32) to local StoreType
259    pub fn from_proto(proto_value: i32) -> Self {
260        match proto_value {
261            0 => StoreType::Membership,
262            1 => StoreType::App,
263            2 => StoreType::Worker,
264            3 => StoreType::Policy,
265            4 => StoreType::RateLimit,
266            unknown => {
267                tracing::warn!(
268                    proto_value = unknown,
269                    "Unknown StoreType proto value, defaulting to Membership"
270                );
271                StoreType::Membership
272            }
273        }
274    }
275}
276
/// Membership state entry
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
pub struct MembershipState {
    /// Node name.
    pub name: String,
    /// Node network address — presumably "host:port"; confirm against producer.
    pub address: String,
    pub status: i32, // NodeStatus enum value
    /// Entry version counter.
    pub version: u64,
    /// Arbitrary per-node metadata blobs.
    pub metadata: BTreeMap<String, Vec<u8>>,
}
286
/// App state entry (application configuration)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
pub struct AppState {
    /// Configuration key.
    pub key: String,
    pub value: Vec<u8>, // Serialized config
    /// Entry version counter.
    pub version: u64,
}
294
/// Global rate limit configuration
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RateLimitConfig {
    /// Request budget per second.
    pub limit_per_second: u64,
}
300
/// Key for global rate limit configuration in AppStore
pub const GLOBAL_RATE_LIMIT_KEY: &str = "global_rate_limit";
/// Key for global rate limit counter in RateLimitStore
/// (base key; storage is per-actor shards of this key).
pub const GLOBAL_RATE_LIMIT_COUNTER_KEY: &str = "global";
305
/// Worker state entry synced across mesh nodes.
///
/// Contains runtime state (`health`, `load`) plus an opaque `spec` blob
/// carrying the full worker configuration. The mesh crate doesn't interpret
/// `spec` — the gateway serializes `WorkerSpec` into it on the sending side
/// and deserializes on the receiving side.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct WorkerState {
    /// Unique worker identifier.
    pub worker_id: String,
    /// Model this worker serves.
    pub model_id: String,
    /// Worker endpoint URL.
    pub url: String,
    /// Health flag (true = healthy).
    pub health: bool,
    /// Current load metric — units/scale not defined here; see producer.
    pub load: f64,
    /// Entry version counter.
    pub version: u64,
    /// Opaque worker specification (bincode-serialized WorkerSpec from the
    /// gateway). Empty on old nodes that don't populate this field.
    #[serde(default)]
    pub spec: Vec<u8>,
}
325
// Implement Hash manually for WorkerState (f64 is not Hash).
impl std::hash::Hash for WorkerState {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.worker_id.hash(state);
        self.model_id.hash(state);
        self.url.hash(state);
        self.health.hash(state);
        // `load` is truncated to its integer part: states whose loads differ
        // only fractionally hash alike. That's valid (equal values still hash
        // equally), just coarse. NaN/±inf saturate under the `as` cast.
        (self.load as i64).hash(state);
        self.version.hash(state);
        self.spec.hash(state);
    }
}
338
// Marker Eq for WorkerState. PartialEq is derived, so `load` is compared
// with exact f64 equality (no epsilon). NOTE(review): a NaN `load` would
// break Eq's reflexivity (NaN != NaN) — loads are assumed finite.
impl Eq for WorkerState {}
341
/// Policy state entry
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
pub struct PolicyState {
    /// Model this policy applies to.
    pub model_id: String,
    /// Policy algorithm name — presumably e.g. "cache_aware" (see tests).
    pub policy_type: String,
    pub config: Vec<u8>, // Serialized policy config
    /// Entry version counter.
    pub version: u64,
}
350
/// Helper function to get policy state key for a model
pub fn policy_key(model_id: &str) -> String {
    const PREFIX: &str = "policy:";
    let mut key = String::with_capacity(PREFIX.len() + model_id.len());
    key.push_str(PREFIX);
    key.push_str(model_id);
    key
}
355
/// Helper function to get tree state key for a model
pub fn tree_state_key(model_id: &str) -> String {
    ["tree:", model_id].concat()
}
360
/// Generates a public, typed state-store wrapper around [`CrdtStore`].
///
/// All four stores (Membership/App/Worker/Policy) expose the identical
/// CRUD + merge + GC surface; the macro keeps them in lockstep without
/// hand-copied boilerplate.
macro_rules! define_state_store {
    ($store_name:ident, $value_type:ty) => {
        #[derive(Debug, Clone)]
        pub struct $store_name {
            inner: CrdtStore<$value_type>,
        }

        impl $store_name {
            pub fn new() -> Self {
                Self {
                    inner: CrdtStore::new(),
                }
            }

            /// Mutation generation counter. Cheap check to skip unchanged stores.
            pub fn generation(&self) -> u64 {
                self.inner.generation()
            }

            pub fn get(&self, key: &str) -> Option<$value_type> {
                self.inner.get(key)
            }

            pub fn insert(
                &self,
                key: String,
                value: $value_type,
            ) -> Result<Option<$value_type>, CrdtSerError> {
                self.inner.insert(key, value)
            }

            // Discards the removed value (CrdtStore::remove returns it).
            pub fn remove(&self, key: &str) {
                self.inner.remove(key);
            }

            /// Merge a remote operation log (CRDT merge).
            pub fn merge(&self, log: &OperationLog) {
                self.inner.merge(log);
            }

            /// Operation log for gossip synchronization.
            pub fn get_operation_log(&self) -> OperationLog {
                self.inner.get_operation_log()
            }

            /// Read-modify-write: `updater` sees the current value (if any)
            /// and returns the value to store.
            pub fn update<F>(
                &self,
                key: String,
                updater: F,
            ) -> Result<Option<$value_type>, CrdtSerError>
            where
                F: FnOnce(Option<$value_type>) -> $value_type,
            {
                self.inner.update(key, updater)
            }

            /// Conditional update: `updater` may return `None` to skip the
            /// write; the returned bool reports whether a write happened.
            pub fn update_if<F>(
                &self,
                key: String,
                updater: F,
            ) -> Result<(Option<$value_type>, bool), CrdtSerError>
            where
                F: FnOnce(Option<$value_type>) -> Option<$value_type>,
            {
                self.inner.update_if(key, updater)
            }

            pub fn len(&self) -> usize {
                self.inner.len()
            }

            pub fn is_empty(&self) -> bool {
                self.inner.is_empty()
            }

            /// Snapshot all entries (decode failures are skipped).
            pub fn all(&self) -> BTreeMap<String, $value_type> {
                self.inner.all()
            }

            /// Remove tombstoned keys from CRDT metadata to bound memory growth.
            pub fn gc_tombstones(&self) -> usize {
                self.inner.gc_tombstones()
            }
        }

        impl Default for $store_name {
            fn default() -> Self {
                Self::new()
            }
        }
    };
}
451
// Instantiate the four typed stores from the shared macro definition.
define_state_store!(MembershipStore, MembershipState);
define_state_store!(AppStore, AppState);
define_state_store!(WorkerStore, WorkerState);
define_state_store!(PolicyStore, PolicyState);
456
457// ============================================================================
458// Rate Limit Counter - Simplified Counter Using CrdtOrMap
459// ============================================================================
460
/// Counter value wrapper for rate limiting
///
/// One instance is stored per (base key, actor) shard; shard values are
/// summed on read. Signed so negative deltas from `inc` are representable.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
struct CounterValue {
    value: i64,
}
466
/// Rate-limit counter store (using CrdtOrMap with consistent hashing)
#[derive(Debug, Clone)]
pub struct RateLimitStore {
    /// Per-(key, actor) counter shards, CRDT-replicated.
    counters: CrdtStore<CounterValue>,
    /// Consistent-hash ring deciding which nodes own which counter keys.
    hash_ring: Arc<RwLock<ConsistentHashRing>>,
    /// This node's name, used for ownership checks and its own shard.
    self_name: String,
    /// Cached synthetic ReplicaIds for actor names that don't parse as a
    /// ReplicaId (see replica_id_for_actor).
    actor_replica_ids: Arc<DashMap<String, ReplicaId>>,
}
475
impl RateLimitStore {
    /// Separator between the logical counter key and the actor (node)
    /// suffix in stored shard keys: "<key>::actor:<actor>".
    const SHARD_SEPARATOR: &'static str = "::actor:";

    /// Create a store for the node named `self_name` with an empty ring.
    pub fn new(self_name: String) -> Self {
        Self {
            counters: CrdtStore::new(),
            hash_ring: Arc::new(RwLock::new(ConsistentHashRing::new())),
            self_name,
            actor_replica_ids: Arc::new(DashMap::new()),
        }
    }

    /// Build the per-actor shard key for a logical counter key.
    fn shard_key(key: &str, actor: &str) -> String {
        format!("{key}{}{actor}", Self::SHARD_SEPARATOR)
    }

    /// Split a shard key back into (base_key, actor). `rsplit_once` keeps
    /// parsing correct even if the base key itself contains the separator.
    fn split_shard_key(shard_key: &str) -> Option<(&str, &str)> {
        shard_key.rsplit_once(Self::SHARD_SEPARATOR)
    }

    /// Base (logical) key of a shard key; unsharded keys pass through.
    fn base_key(shard_key: &str) -> &str {
        Self::split_shard_key(shard_key).map_or(shard_key, |(base, _)| base)
    }

    /// Replica id used when merging snapshot payloads from `actor`.
    /// Falls back to a lazily created, cached default id when the actor
    /// name doesn't parse as a ReplicaId.
    fn replica_id_for_actor(&self, actor: &str) -> ReplicaId {
        if let Ok(replica_id) = ReplicaId::from_string(actor) {
            return replica_id;
        }

        *self.actor_replica_ids.entry(actor.to_string()).or_default()
    }

    /// Sum all actor shards of `key`; `None` when no shard exists.
    /// NOTE: takes a full snapshot of the counter map, O(total shards).
    fn aggregate_counter(&self, key: &str) -> Option<i64> {
        let all_counters = self.counters.all();
        let mut has_shard = false;
        let mut total = 0;

        for (shard_key, counter) in all_counters {
            if Self::base_key(&shard_key) == key {
                has_shard = true;
                total += counter.value;
            }
        }

        if has_shard {
            Some(total)
        } else {
            None
        }
    }

    /// Update the hash ring with current membership
    pub fn update_membership(&self, nodes: &[String]) {
        let mut ring = self.hash_ring.write();
        ring.update_membership(nodes);
        debug!("Updated rate-limit hash ring with {} nodes", nodes.len());
    }

    /// Check if this node is an owner of a key
    pub fn is_owner(&self, key: &str) -> bool {
        let ring = self.hash_ring.read();
        ring.is_owner(key, &self.self_name)
    }

    /// Get owners for a key
    pub fn get_owners(&self, key: &str) -> Vec<String> {
        let ring = self.hash_ring.read();
        ring.get_owners(key)
    }

    /// Get or create counter (only if this node is an owner)
    #[expect(dead_code)]
    fn get_or_create_counter_internal(&self, key: String) -> Option<i64> {
        if !self.is_owner(&key) {
            return None;
        }

        // Returns only this node's own shard, not the aggregate.
        let shard_key = Self::shard_key(&key, &self.self_name);
        if let Some(counter) = self.counters.get(&shard_key) {
            return Some(counter.value);
        }

        let _ = self.counters.insert(shard_key, CounterValue::default());
        Some(0)
    }

    /// Aggregated counter value, gated on ownership: non-owners get `None`.
    pub fn get_counter(&self, key: &str) -> Option<i64> {
        if !self.is_owner(key) {
            return None;
        }
        self.aggregate_counter(key)
    }

    /// Get all actor shards as (base_key, actor, value).
    pub fn all_shards(&self) -> Vec<(String, String, i64)> {
        self.counters
            .all()
            .into_iter()
            .filter_map(|(shard_key, counter)| {
                Self::split_shard_key(&shard_key).map(|(base_key, actor)| {
                    (base_key.to_string(), actor.to_string(), counter.value)
                })
            })
            .collect()
    }

    /// Increment counter (only if this node is an owner)
    ///
    /// Failures to serialize the updated shard are logged and dropped.
    pub fn inc(&self, key: String, actor: String, delta: i64) {
        if !self.is_owner(&key) {
            return;
        }

        let shard_key = Self::shard_key(&key, &actor);
        if let Err(err) = self.counters.update(shard_key, |current| CounterValue {
            value: current.map_or(delta, |existing| existing.value + delta),
        }) {
            debug!(error = %err, %key, %actor, "Failed to update rate-limit counter shard");
        }
    }

    /// Set a snapshot value for one actor shard.
    pub fn set_counter_snapshot(&self, key: String, actor: String, counter_value: i64) {
        if !self.is_owner(&key) {
            return;
        }

        let shard_key = Self::shard_key(&key, &actor);
        if let Err(err) = self.counters.insert(
            shard_key,
            CounterValue {
                value: counter_value,
            },
        ) {
            debug!(error = %err, %key, %actor, "Failed to set rate-limit counter snapshot");
        }
    }

    /// Build serialized snapshot payload and shard key for a counter value.
    ///
    /// NOTE: This intentionally does not fabricate CRDT operation IDs.
    pub fn snapshot_payload_for_counter_value(
        key: String,
        actor: String,
        counter_value: i64,
    ) -> Option<(String, Vec<u8>)> {
        let bytes = match (CounterValue {
            value: counter_value,
        })
        .to_bytes()
        {
            Ok(bytes) => bytes,
            Err(err) => {
                debug!(error = %err, "Failed to serialize rate-limit counter snapshot");
                return None;
            }
        };

        let shard_key = Self::shard_key(&key, &actor);
        Some((shard_key, bytes))
    }

    /// Apply a counter snapshot received from `actor`: validate the
    /// payload decodes as a CounterValue, then merge it as a timestamped
    /// insert operation so normal CRDT conflict resolution applies.
    /// Silently ignored when this node doesn't own the base key.
    pub fn apply_counter_snapshot_payload(
        &self,
        shard_key: String,
        actor: &str,
        timestamp: u64,
        payload: &[u8],
    ) {
        let Some((base_key, _)) = Self::split_shard_key(&shard_key) else {
            debug!(%shard_key, "Invalid rate-limit shard key in snapshot payload");
            return;
        };

        if !self.is_owner(base_key) {
            return;
        }

        // Validate only — the raw payload bytes are what get merged.
        if let Err(err) = CounterValue::from_bytes(payload) {
            debug!(error = %err, %shard_key, "Failed to decode rate-limit snapshot payload");
            return;
        }

        let replica_id = self.replica_id_for_actor(actor);
        let mut log = OperationLog::new();
        log.append(Operation::insert(
            shard_key,
            payload.to_vec(),
            timestamp,
            replica_id,
        ));
        self.counters.merge(&log);
    }

    /// Get counter value (aggregated across shards, no ownership check).
    pub fn value(&self, key: &str) -> Option<i64> {
        self.aggregate_counter(key)
    }

    /// Merge operation log from another node
    pub fn merge(&self, log: &OperationLog) {
        self.counters.merge(log);
    }

    /// Get operation log for synchronization
    pub fn get_operation_log(&self) -> OperationLog {
        self.counters.get_operation_log()
    }

    /// Get all counter keys (deduplicated base keys, sorted via BTreeSet).
    pub fn keys(&self) -> Vec<String> {
        self.counters
            .all()
            .keys()
            .map(|key| Self::base_key(key).to_string())
            .collect::<BTreeSet<_>>()
            .into_iter()
            .collect()
    }

    /// Check if we need to transfer ownership due to node failure
    ///
    /// Returns keys that (a) have a failed node among their owners and
    /// (b) this node now owns — i.e. keys this node should take over.
    pub fn check_ownership_transfer(&self, failed_nodes: &[String]) -> Vec<String> {
        let mut affected_keys = Vec::new();
        let ring = self.hash_ring.read();
        for key in self.keys() {
            let owners = ring.get_owners(&key);
            if owners.iter().any(|owner| failed_nodes.contains(owner))
                && ring.is_owner(&key, &self.self_name)
            {
                affected_keys.push(key);
            }
        }

        affected_keys
    }
}
711
712impl Default for RateLimitStore {
713    fn default() -> Self {
714        Self::new("default".to_string())
715    }
716}
717
/// All state stores container
#[derive(Debug, Clone)]
pub struct StateStores {
    /// Router node membership (CRDT-replicated).
    pub membership: MembershipStore,
    /// Application configuration entries (CRDT-replicated).
    pub app: AppStore,
    /// Worker status/load/health entries (CRDT-replicated).
    pub worker: WorkerStore,
    /// Routing policy state entries (CRDT-replicated).
    pub policy: PolicyStore,
    /// Sharded rate-limit counters with consistent-hash ownership.
    pub rate_limit: RateLimitStore,
    /// Pending tree operations for delta sync.
    /// Key: tree key (e.g., "tree:model-name"), Value: operations since last successful send.
    pub tree_ops_pending: DashMap<String, Vec<TreeOperation>>,
    /// Per-key version counters for tree state, bumped atomically on every
    /// `sync_tree_operation` call.  Replaces the expensive CRDT
    /// `policy.update()` that previously serialized the entire TreeState
    /// config blob (~1 MB) on every request.
    pub tree_versions: DashMap<String, Arc<AtomicU64>>,
    /// Global generation counter for tree changes.  The incremental collector
    /// checks this (in addition to `policy.generation()`) to decide whether
    /// the policy store needs scanning.
    pub tree_generation: Arc<AtomicU64>,
    /// Materialized tree state config blobs, stored outside the CRDT policy
    /// store to avoid operation log memory accumulation (~50 MB/min leak).
    /// Key: tree key (e.g., "tree:model-name"), Value: bincode-serialized TreeState.
    pub tree_configs: DashMap<String, Vec<u8>>,
    /// Tenant delta buffer for efficient two-layer sync.
    /// Key: model_id, Value: pending tenant inserts since last gossip round.
    /// Drained by the collector each round and sent as TenantDelta messages.
    pub tenant_delta_inserts: DashMap<String, Vec<crate::tree_ops::TenantInsert>>,
    /// Tenant eviction buffer — same pattern as inserts.
    pub tenant_delta_evictions: DashMap<String, Vec<crate::tree_ops::TenantEvict>>,
}
749
750impl StateStores {
751    pub fn new() -> Self {
752        Self {
753            membership: MembershipStore::new(),
754            app: AppStore::new(),
755            worker: WorkerStore::new(),
756            policy: PolicyStore::new(),
757            rate_limit: RateLimitStore::new("default".to_string()),
758            tree_ops_pending: DashMap::new(),
759            tree_versions: DashMap::new(),
760            tree_generation: Arc::new(AtomicU64::new(0)),
761            tree_configs: DashMap::new(),
762            tenant_delta_inserts: DashMap::new(),
763            tenant_delta_evictions: DashMap::new(),
764        }
765    }
766
767    pub fn with_self_name(self_name: String) -> Self {
768        Self {
769            membership: MembershipStore::new(),
770            app: AppStore::new(),
771            worker: WorkerStore::new(),
772            policy: PolicyStore::new(),
773            rate_limit: RateLimitStore::new(self_name),
774            tree_ops_pending: DashMap::new(),
775            tree_versions: DashMap::new(),
776            tree_generation: Arc::new(AtomicU64::new(0)),
777            tree_configs: DashMap::new(),
778            tenant_delta_inserts: DashMap::new(),
779            tenant_delta_evictions: DashMap::new(),
780        }
781    }
782
783    /// Get the current version for a tree key.
784    pub fn tree_version(&self, key: &str) -> u64 {
785        self.tree_versions
786            .get(key)
787            .map(|v| v.load(Ordering::Acquire))
788            .unwrap_or(0)
789    }
790
791    /// Atomically bump the version for a tree key and the global tree
792    /// generation.  Returns the new version.  This is O(1) with no
793    /// serialization — unlike `policy.update()` which deserializes and
794    /// re-serializes the entire config blob.
795    ///
796    /// On the first call for a given key the counter is seeded from the
797    /// existing PolicyState version (if any) so that local ops don't
798    /// regress the advertised version below a remote/checkpointed baseline.
799    pub fn bump_tree_version(&self, key: &str) -> u64 {
800        let version = self
801            .tree_versions
802            .entry(key.to_string())
803            .or_insert_with(|| {
804                // Seed from the committed tree config version so deltas
805                // start above any existing remote/checkpointed state.
806                let base = self
807                    .tree_configs
808                    .get(key)
809                    .and_then(|bytes| {
810                        super::tree_ops::TreeState::from_bytes(&bytes)
811                            .ok()
812                            .map(|ts| ts.version)
813                    })
814                    .unwrap_or(0);
815                Arc::new(AtomicU64::new(base))
816            })
817            .fetch_add(1, Ordering::Release)
818            + 1;
819        self.tree_generation.fetch_add(1, Ordering::Release);
820        version
821    }
822
823    /// Advance the tree version counter to at least `version`.
824    /// Called after applying remote deltas/full-state updates so that
825    /// subsequent local ops start above the remote baseline.
826    pub fn advance_tree_version(&self, key: &str, version: u64) {
827        self.tree_versions
828            .entry(key.to_string())
829            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
830            .fetch_max(version, Ordering::Release);
831    }
832
833    /// Run garbage collection across all stores, removing tombstoned CRDT
834    /// metadata entries. Returns the total number of entries removed.
835    pub fn gc_tombstones(&self) -> usize {
836        self.membership.gc_tombstones()
837            + self.app.gc_tombstones()
838            + self.worker.gc_tombstones()
839            + self.policy.gc_tombstones()
840    }
841
842    /// Remove stale tree entries that have no pending operations.
843    /// Returns the total number of entries removed across all tree maps.
844    ///
845    /// `tree_versions` entries are never removed during normal operation, so
846    /// using `tree_versions.contains_key()` as a liveness signal is
847    /// ineffective — it always returns true for any key that was ever used.
848    /// Instead, we use pending ops as the primary liveness indicator.
849    pub fn gc_stale_tree_entries(&self) -> usize {
850        let before =
851            self.tree_configs.len() + self.tree_versions.len() + self.tree_ops_pending.len();
852
853        // tree_configs is the authoritative store — only remove entries
854        // for models that are truly gone (no version counter AND no
855        // pending ops).
856        //
857        // An active tree has: tree_configs entry (from checkpoint or
858        // remote apply). Pending ops drain every 10 rounds via
859        // checkpoint, so empty pending does NOT mean the tree is stale.
860
861        // Remove tree_versions for models with no tree_configs AND no
862        // pending ops (model was fully deregistered).
863        self.tree_versions.retain(|k, _| {
864            self.tree_configs.contains_key(k)
865                || self.tree_ops_pending.get(k).is_some_and(|v| !v.is_empty())
866        });
867
868        // Remove empty pending op buffers for models with no tree_configs.
869        self.tree_ops_pending
870            .retain(|k, v| !v.is_empty() || self.tree_configs.contains_key(k));
871
872        // Only remove tree_configs for models with no version counter
873        // AND no pending ops — these are truly orphaned entries.
874        self.tree_configs.retain(|k, _| {
875            self.tree_versions.contains_key(k)
876                || self.tree_ops_pending.get(k).is_some_and(|v| !v.is_empty())
877        });
878
879        let after =
880            self.tree_configs.len() + self.tree_versions.len() + self.tree_ops_pending.len();
881        before.saturating_sub(after)
882    }
883}
884
885impl Default for StateStores {
886    fn default() -> Self {
887        Self::new()
888    }
889}
890
891#[cfg(test)]
892mod tests {
893    use std::collections::BTreeMap;
894
895    use super::*;
896    use crate::service::gossip::NodeStatus;
897
    #[test]
    fn test_membership_store() {
        // Insert → get → remove round-trip through the CRDT-backed store.
        let store = MembershipStore::new();
        let key = "node1".to_string();
        let state = MembershipState {
            name: "node1".to_string(),
            address: "127.0.0.1:8000".to_string(),
            status: NodeStatus::Alive as i32,
            version: 1,
            metadata: BTreeMap::new(),
        };

        let _ = store.insert(key.clone(), state.clone());
        assert_eq!(store.get(&key).unwrap().name, "node1");

        // After removal the key must read as absent.
        store.remove(&key);
        assert!(store.get(&key).is_none());
    }
916
917    #[test]
918    fn test_app_store() {
919        let store = AppStore::new();
920        let key = "app_key1".to_string();
921        let state = AppState {
922            key: "app_key1".to_string(),
923            value: b"app_value".to_vec(),
924            version: 1,
925        };
926
927        let _ = store.insert(key.clone(), state.clone());
928        assert_eq!(store.get(&key).unwrap().key, "app_key1");
929    }
930
931    #[test]
932    fn test_worker_store() {
933        let store = WorkerStore::new();
934        let key = "worker1".to_string();
935        let state = WorkerState {
936            worker_id: "worker1".to_string(),
937            model_id: "model1".to_string(),
938            url: "http://localhost:8000".to_string(),
939            health: true,
940            load: 0.5,
941            version: 1,
942            spec: vec![],
943        };
944
945        let _ = store.insert(key.clone(), state.clone());
946        assert_eq!(store.get(&key).unwrap().worker_id, "worker1");
947    }
948
949    #[test]
950    fn test_policy_store() {
951        let store = PolicyStore::new();
952        let key = "policy:model1".to_string();
953        let state = PolicyState {
954            model_id: "model1".to_string(),
955            policy_type: "cache_aware".to_string(),
956            config: b"config_data".to_vec(),
957            version: 1,
958        };
959
960        let _ = store.insert(key.clone(), state.clone());
961        assert_eq!(store.get(&key).unwrap().model_id, "model1");
962    }
963
964    #[test]
965    fn test_rate_limit_store_update_membership() {
966        let store = RateLimitStore::new("node1".to_string());
967
968        store.update_membership(&[
969            "node1".to_string(),
970            "node2".to_string(),
971            "node3".to_string(),
972        ]);
973
974        let owners = store.get_owners("test_key");
975        assert_eq!(owners.len(), 3);
976        assert!(
977            owners.contains(&"node1".to_string())
978                || owners.contains(&"node2".to_string())
979                || owners.contains(&"node3".to_string())
980        );
981    }
982
983    #[test]
984    fn test_rate_limit_store_is_owner() {
985        let store = RateLimitStore::new("node1".to_string());
986
987        store.update_membership(&["node1".to_string()]);
988
989        let test_key = "test_key".to_string();
990        let is_owner = store.is_owner(&test_key);
991        // node1 should be owner since it's the only node
992        assert!(is_owner);
993    }
994
995    #[test]
996    fn test_rate_limit_store_inc_only_owner() {
997        let store = RateLimitStore::new("node1".to_string());
998
999        store.update_membership(&["node1".to_string()]);
1000
1001        let test_key = "test_key".to_string();
1002        if store.is_owner(&test_key) {
1003            store.inc(test_key.clone(), "node1".to_string(), 5);
1004
1005            let value = store.value(&test_key);
1006            assert_eq!(value, Some(5));
1007        }
1008    }
1009
1010    #[test]
1011    fn test_rate_limit_store_inc_non_owner() {
1012        let store = RateLimitStore::new("node1".to_string());
1013
1014        // Setup membership without node1 as owner
1015        store.update_membership(&["node2".to_string(), "node3".to_string()]);
1016
1017        let test_key = "test_key".to_string();
1018        if !store.is_owner(&test_key) {
1019            store.inc(test_key.clone(), "node1".to_string(), 5);
1020
1021            // Should not increment if not owner
1022            let value = store.value(&test_key);
1023            assert_eq!(value, None);
1024        }
1025    }
1026
1027    #[test]
1028    fn test_rate_limit_store_merge_counter() {
1029        let store1 = RateLimitStore::new("node1".to_string());
1030        let store2 = RateLimitStore::new("node2".to_string());
1031
1032        store1.update_membership(&["node1".to_string()]);
1033        store2.update_membership(&["node2".to_string()]);
1034
1035        let test_key = "test_key".to_string();
1036
1037        // Both nodes increment their counters
1038        if store1.is_owner(&test_key) {
1039            store1.inc(test_key.clone(), "node1".to_string(), 10);
1040        }
1041
1042        if store2.is_owner(&test_key) {
1043            store2.inc(test_key.clone(), "node2".to_string(), 5);
1044        }
1045
1046        // Merge operation log from store2 into store1
1047        let log2 = store2.get_operation_log();
1048        store1.merge(&log2);
1049
1050        // Get aggregated value (if node1 is owner)
1051        if store1.is_owner(&test_key) {
1052            let value = store1.value(&test_key);
1053            assert_eq!(value, Some(15));
1054        }
1055    }
1056
1057    #[test]
1058    fn test_rate_limit_store_check_ownership_transfer() {
1059        let store = RateLimitStore::new("node1".to_string());
1060
1061        store.update_membership(&[
1062            "node1".to_string(),
1063            "node2".to_string(),
1064            "node3".to_string(),
1065        ]);
1066
1067        let test_key = "test_key".to_string();
1068
1069        // Setup a counter (if node1 is owner)
1070        if store.is_owner(&test_key) {
1071            store.inc(test_key.clone(), "node1".to_string(), 10);
1072        }
1073
1074        // Check ownership transfer when node2 fails
1075        let affected = store.check_ownership_transfer(&["node2".to_string()]);
1076        // Should detect if node2 was an owner
1077        let _ = affected;
1078    }
1079
1080    #[test]
1081    fn test_rate_limit_store_keys() {
1082        let store = RateLimitStore::new("node1".to_string());
1083
1084        store.update_membership(&["node1".to_string()]);
1085
1086        let key1 = "key1".to_string();
1087        let key2 = "key2".to_string();
1088
1089        if store.is_owner(&key1) {
1090            store.inc(key1.clone(), "node1".to_string(), 1);
1091        }
1092
1093        if store.is_owner(&key2) {
1094            store.inc(key2.clone(), "node1".to_string(), 1);
1095        }
1096
1097        let keys = store.keys();
1098        // Should contain keys where node1 is owner
1099        let _ = keys;
1100    }
1101
1102    #[test]
1103    fn test_state_stores_new() {
1104        let stores = StateStores::new();
1105        assert_eq!(stores.membership.len(), 0);
1106        assert_eq!(stores.app.len(), 0);
1107        assert_eq!(stores.worker.len(), 0);
1108        assert_eq!(stores.policy.len(), 0);
1109    }
1110
1111    #[test]
1112    fn test_state_stores_with_self_name() {
1113        let stores = StateStores::with_self_name("test_node".to_string());
1114        // Rate limit store should have the self_name
1115        let test_key = "test_key".to_string();
1116        stores
1117            .rate_limit
1118            .update_membership(&["test_node".to_string()]);
1119        assert!(stores.rate_limit.is_owner(&test_key));
1120    }
1121}