Skip to main content

clasp_core/
state.rs

1//! State management for Clasp params
2//!
3//! Provides conflict resolution and revision tracking for stateful parameters.
4
5use crate::{ConflictStrategy, Ttl, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9/// Eviction strategy when the state store reaches capacity
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
11pub enum EvictionStrategy {
12    /// Evict least recently accessed entries (default)
13    #[default]
14    Lru,
15    /// Evict oldest entries by creation time
16    OldestFirst,
17    /// Reject new entries when at capacity
18    RejectNew,
19}
20
21/// Configuration for state store limits
22#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24    /// Maximum number of parameters (None = unlimited)
25    pub max_params: Option<usize>,
26    /// Time-to-live for parameters without access (None = never expire)
27    pub param_ttl: Option<Duration>,
28    /// Strategy for eviction when at capacity
29    pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33    fn default() -> Self {
34        Self {
35            max_params: Some(10_000),
36            param_ttl: Some(Duration::from_secs(3600)), // 1 hour
37            eviction: EvictionStrategy::Lru,
38        }
39    }
40}
41
42impl StateStoreConfig {
43    /// Create config with no limits (for backwards compatibility)
44    pub fn unlimited() -> Self {
45        Self {
46            max_params: None,
47            param_ttl: None,
48            eviction: EvictionStrategy::Lru,
49        }
50    }
51
52    /// Create config with custom limits
53    pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54        Self {
55            max_params: Some(max_params),
56            param_ttl: Some(Duration::from_secs(ttl_secs)),
57            eviction: EvictionStrategy::Lru,
58        }
59    }
60}
61
62/// State of a single parameter
63#[derive(Debug, Clone)]
64pub struct ParamState {
65    /// Current value
66    pub value: Value,
67    /// Monotonic revision number
68    pub revision: u64,
69    /// Session ID of last writer
70    pub writer: String,
71    /// Timestamp of last write (microseconds)
72    pub timestamp: u64,
73    /// Timestamp of last access (microseconds) - for TTL eviction
74    pub last_accessed: u64,
75    /// Conflict resolution strategy
76    pub strategy: ConflictStrategy,
77    /// Lock holder (if locked)
78    pub lock_holder: Option<String>,
79    /// Metadata
80    pub meta: Option<ParamMeta>,
81    /// Origin router ID (for federation loop prevention)
82    pub origin: Option<String>,
83    /// Per-param TTL (overrides server-wide TTL when set)
84    pub ttl: Option<Ttl>,
85}
86
87/// Parameter metadata
88#[derive(Debug, Clone)]
89pub struct ParamMeta {
90    pub unit: Option<String>,
91    pub range: Option<(f64, f64)>,
92    pub default: Option<Value>,
93}
94
95impl ParamState {
96    /// Create a new param state
97    pub fn new(value: Value, writer: String) -> Self {
98        let now = current_timestamp();
99        Self {
100            value,
101            revision: 1,
102            writer,
103            timestamp: now,
104            last_accessed: now,
105            strategy: ConflictStrategy::Lww,
106            lock_holder: None,
107            meta: None,
108            origin: None,
109            ttl: None,
110        }
111    }
112
113    /// Update the last_accessed timestamp
114    pub fn touch(&mut self) {
115        self.last_accessed = current_timestamp();
116    }
117
118    /// Create with specific strategy
119    pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
120        self.strategy = strategy;
121        self
122    }
123
124    /// Create with metadata
125    pub fn with_meta(mut self, meta: ParamMeta) -> Self {
126        self.meta = Some(meta);
127        self
128    }
129
130    /// Attempt to update the value
131    ///
132    /// Returns Ok(new_revision) if update was accepted,
133    /// Err with reason if rejected.
134    pub fn try_update(
135        &mut self,
136        new_value: Value,
137        writer: &str,
138        expected_revision: Option<u64>,
139        request_lock: bool,
140        release_lock: bool,
141        ttl: Option<Ttl>,
142    ) -> Result<u64, UpdateError> {
143        let timestamp = current_timestamp();
144
145        // Check optimistic lock (if revision specified)
146        if let Some(expected) = expected_revision {
147            if expected != self.revision {
148                return Err(UpdateError::RevisionConflict {
149                    expected,
150                    actual: self.revision,
151                });
152            }
153        }
154
155        // Check lock
156        if let Some(ref holder) = self.lock_holder {
157            if holder != writer && !release_lock {
158                return Err(UpdateError::LockHeld {
159                    holder: holder.clone(),
160                });
161            }
162        }
163
164        // Handle lock release
165        if release_lock && self.lock_holder.as_deref() == Some(writer) {
166            self.lock_holder = None;
167        }
168
169        // Apply conflict resolution
170        let should_update = match self.strategy {
171            ConflictStrategy::Lww => timestamp >= self.timestamp,
172            ConflictStrategy::Max => {
173                match (&new_value, &self.value) {
174                    (Value::Float(new), Value::Float(old)) => new > old,
175                    (Value::Int(new), Value::Int(old)) => new > old,
176                    _ => true, // Fall back to LWW for non-numeric
177                }
178            }
179            ConflictStrategy::Min => match (&new_value, &self.value) {
180                (Value::Float(new), Value::Float(old)) => new < old,
181                (Value::Int(new), Value::Int(old)) => new < old,
182                _ => true,
183            },
184            ConflictStrategy::Lock => {
185                self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
186            }
187            ConflictStrategy::Merge => true, // App handles merge
188        };
189
190        if !should_update {
191            return Err(UpdateError::ConflictRejected);
192        }
193
194        // Handle lock request
195        if request_lock {
196            if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
197                return Err(UpdateError::LockHeld {
198                    holder: self.lock_holder.clone().unwrap(),
199                });
200            }
201            self.lock_holder = Some(writer.to_string());
202        }
203
204        // Apply update
205        self.value = new_value;
206        self.revision += 1;
207        self.writer = writer.to_string();
208        self.timestamp = timestamp;
209        self.last_accessed = timestamp;
210
211        // Update per-param TTL if provided
212        if let Some(t) = ttl {
213            self.ttl = Some(t);
214        }
215
216        Ok(self.revision)
217    }
218
219    /// Check if value is within range (if specified)
220    pub fn validate_range(&self, value: &Value) -> bool {
221        if let Some(meta) = &self.meta {
222            if let Some((min, max)) = meta.range {
223                if let Some(v) = value.as_f64() {
224                    return v >= min && v <= max;
225                }
226            }
227        }
228        true
229    }
230}
231
232/// Errors that can occur during state updates
233#[derive(Debug, Clone)]
234pub enum UpdateError {
235    RevisionConflict { expected: u64, actual: u64 },
236    LockHeld { holder: String },
237    ConflictRejected,
238    OutOfRange,
239    AtCapacity,
240}
241
242impl std::fmt::Display for UpdateError {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        match self {
245            Self::RevisionConflict { expected, actual } => {
246                write!(
247                    f,
248                    "Revision conflict: expected {}, got {}",
249                    expected, actual
250                )
251            }
252            Self::LockHeld { holder } => {
253                write!(f, "Parameter locked by {}", holder)
254            }
255            Self::ConflictRejected => {
256                write!(f, "Update rejected by conflict strategy")
257            }
258            Self::OutOfRange => {
259                write!(f, "Value out of allowed range")
260            }
261            Self::AtCapacity => {
262                write!(f, "State store at capacity")
263            }
264        }
265    }
266}
267
268impl std::error::Error for UpdateError {}
269
270/// Error returned when state store is at capacity
271#[derive(Debug, Clone)]
272pub struct CapacityError;
273
274impl std::fmt::Display for CapacityError {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        write!(f, "State store at capacity")
277    }
278}
279
280impl std::error::Error for CapacityError {}
281
282/// State store for multiple params
283#[derive(Debug)]
284pub struct StateStore {
285    params: HashMap<String, ParamState>,
286    config: StateStoreConfig,
287}
288
289impl Default for StateStore {
290    fn default() -> Self {
291        Self {
292            params: HashMap::new(),
293            config: StateStoreConfig::unlimited(), // Backwards compatible default
294        }
295    }
296}
297
298impl StateStore {
299    /// Create a new state store with default (unlimited) config
300    pub fn new() -> Self {
301        Self::default()
302    }
303
304    /// Create a new state store with the specified config
305    pub fn with_config(config: StateStoreConfig) -> Self {
306        Self {
307            params: HashMap::new(),
308            config,
309        }
310    }
311
312    /// Get the current configuration
313    pub fn config(&self) -> &StateStoreConfig {
314        &self.config
315    }
316
317    /// Get a param's current state (does not update last_accessed)
318    pub fn get(&self, address: &str) -> Option<&ParamState> {
319        self.params.get(address)
320    }
321
322    /// Get a param's current state and update last_accessed
323    pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
324        let param = self.params.get_mut(address)?;
325        param.touch();
326        Some(param)
327    }
328
329    /// Get a param's current value (does not update last_accessed)
330    pub fn get_value(&self, address: &str) -> Option<&Value> {
331        self.params.get(address).map(|p| &p.value)
332    }
333
334    /// Get a param's current value and update last_accessed
335    pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
336        let param = self.params.get_mut(address)?;
337        param.touch();
338        Some(&param.value)
339    }
340
341    /// Set a param value, creating if necessary
342    pub fn set(
343        &mut self,
344        address: &str,
345        value: Value,
346        writer: &str,
347        revision: Option<u64>,
348        lock: bool,
349        unlock: bool,
350        ttl: Option<Ttl>,
351    ) -> Result<u64, UpdateError> {
352        if let Some(param) = self.params.get_mut(address) {
353            param.try_update(value, writer, revision, lock, unlock, ttl)
354        } else {
355            // Check capacity before creating new param
356            if let Some(max) = self.config.max_params {
357                if self.params.len() >= max {
358                    match self.config.eviction {
359                        EvictionStrategy::RejectNew => {
360                            return Err(UpdateError::AtCapacity);
361                        }
362                        EvictionStrategy::Lru => {
363                            self.evict_lru();
364                        }
365                        EvictionStrategy::OldestFirst => {
366                            self.evict_oldest();
367                        }
368                    }
369                }
370            }
371
372            // Create new param
373            let mut param = ParamState::new(value, writer.to_string());
374            if lock {
375                param.lock_holder = Some(writer.to_string());
376            }
377            param.ttl = ttl;
378            let rev = param.revision;
379            self.params.insert(address.to_string(), param);
380            Ok(rev)
381        }
382    }
383
384    /// Evict the least recently accessed param
385    fn evict_lru(&mut self) {
386        if let Some(oldest_key) = self
387            .params
388            .iter()
389            .min_by_key(|(_, v)| v.last_accessed)
390            .map(|(k, _)| k.clone())
391        {
392            self.params.remove(&oldest_key);
393        }
394    }
395
396    /// Evict the oldest param by creation time (lowest revision is oldest)
397    fn evict_oldest(&mut self) {
398        if let Some(oldest_key) = self
399            .params
400            .iter()
401            .min_by_key(|(_, v)| v.timestamp)
402            .map(|(k, _)| k.clone())
403        {
404            self.params.remove(&oldest_key);
405        }
406    }
407
408    /// Remove params that haven't been accessed within the TTL
409    /// Returns the number of params removed
410    pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
411        let now = current_timestamp();
412        let global_ttl_micros = ttl.as_micros() as u64;
413
414        let before = self.params.len();
415        self.params.retain(|_, v| match v.ttl {
416            Some(Ttl::Never) => true,
417            Some(Ttl::Sliding(secs)) => {
418                let cutoff = now.saturating_sub(secs as u64 * 1_000_000);
419                v.last_accessed >= cutoff
420            }
421            Some(Ttl::Absolute(secs)) => {
422                let expires_at = v.timestamp.saturating_add(secs as u64 * 1_000_000);
423                now < expires_at
424            }
425            None => {
426                let cutoff = now.saturating_sub(global_ttl_micros);
427                v.last_accessed >= cutoff
428            }
429        });
430        before - self.params.len()
431    }
432
433    /// Run cleanup using the configured TTL (if any)
434    /// Returns the number of params removed
435    pub fn cleanup_stale_with_config(&mut self) -> usize {
436        if let Some(ttl) = self.config.param_ttl {
437            self.cleanup_stale(ttl)
438        } else {
439            0
440        }
441    }
442
443    /// Get all params matching a pattern
444    pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
445        use crate::address::glob_match;
446
447        self.params
448            .iter()
449            .filter(|(addr, _)| glob_match(pattern, addr))
450            .map(|(addr, state)| (addr.as_str(), state))
451            .collect()
452    }
453
454    /// Get all params as a snapshot
455    pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
456        self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
457    }
458
459    /// Number of params
460    pub fn len(&self) -> usize {
461        self.params.len()
462    }
463
464    /// Check if empty
465    pub fn is_empty(&self) -> bool {
466        self.params.is_empty()
467    }
468
469    /// Remove a param
470    pub fn remove(&mut self, address: &str) -> Option<ParamState> {
471        self.params.remove(address)
472    }
473
474    /// Clear all params
475    pub fn clear(&mut self) {
476        self.params.clear();
477    }
478}
479
480/// Get current timestamp in microseconds
481fn current_timestamp() -> u64 {
482    SystemTime::now()
483        .duration_since(UNIX_EPOCH)
484        .map(|d| d.as_micros() as u64)
485        .unwrap_or(0)
486}
487
488#[cfg(test)]
489mod tests {
490    use super::*;
491
492    #[test]
493    fn test_basic_update() {
494        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
495
496        let result = state.try_update(Value::Float(0.75), "session2", None, false, false, None);
497
498        assert!(result.is_ok());
499        assert_eq!(state.revision, 2);
500        assert_eq!(state.value, Value::Float(0.75));
501        assert_eq!(state.writer, "session2");
502    }
503
504    #[test]
505    fn test_revision_conflict() {
506        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
507
508        let result = state.try_update(
509            Value::Float(0.75),
510            "session2",
511            Some(999), // Wrong revision
512            false,
513            false,
514            None,
515        );
516
517        assert!(matches!(result, Err(UpdateError::RevisionConflict { .. })));
518    }
519
520    #[test]
521    fn test_locking() {
522        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
523
524        // Session 1 takes lock
525        let result = state.try_update(
526            Value::Float(0.6),
527            "session1",
528            None,
529            true, // Request lock
530            false,
531            None,
532        );
533        assert!(result.is_ok());
534        assert_eq!(state.lock_holder, Some("session1".to_string()));
535
536        // Session 2 tries to update - should fail
537        let result = state.try_update(Value::Float(0.7), "session2", None, false, false, None);
538        assert!(matches!(result, Err(UpdateError::LockHeld { .. })));
539
540        // Session 1 can still update
541        let result = state.try_update(Value::Float(0.8), "session1", None, false, false, None);
542        assert!(result.is_ok());
543    }
544
545    #[test]
546    fn test_max_strategy() {
547        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string())
548            .with_strategy(ConflictStrategy::Max);
549
550        // Higher value wins
551        let result = state.try_update(Value::Float(0.8), "session2", None, false, false, None);
552        assert!(result.is_ok());
553        assert_eq!(state.value, Value::Float(0.8));
554
555        // Lower value rejected
556        let result = state.try_update(Value::Float(0.3), "session3", None, false, false, None);
557        assert!(matches!(result, Err(UpdateError::ConflictRejected)));
558        assert_eq!(state.value, Value::Float(0.8)); // Unchanged
559    }
560
561    #[test]
562    fn test_state_store() {
563        let mut store = StateStore::new();
564
565        store
566            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
567            .unwrap();
568        store
569            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
570            .unwrap();
571        store
572            .set(
573                "/other/c",
574                Value::Float(3.0),
575                "s1",
576                None,
577                false,
578                false,
579                None,
580            )
581            .unwrap();
582
583        assert_eq!(store.len(), 3);
584
585        let matching = store.get_matching("/test/*");
586        assert_eq!(matching.len(), 2);
587    }
588
589    #[test]
590    fn test_state_store_capacity_reject() {
591        let config = StateStoreConfig {
592            max_params: Some(2),
593            param_ttl: None,
594            eviction: EvictionStrategy::RejectNew,
595        };
596        let mut store = StateStore::with_config(config);
597
598        store
599            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
600            .unwrap();
601        store
602            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
603            .unwrap();
604
605        // Third should fail
606        let result = store.set("/test/c", Value::Float(3.0), "s1", None, false, false, None);
607        assert!(matches!(result, Err(UpdateError::AtCapacity)));
608        assert_eq!(store.len(), 2);
609
610        // Updating existing should still work
611        store
612            .set("/test/a", Value::Float(1.5), "s1", None, false, false, None)
613            .unwrap();
614        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
615    }
616
617    #[test]
618    fn test_state_store_capacity_lru_eviction() {
619        let config = StateStoreConfig {
620            max_params: Some(2),
621            param_ttl: None,
622            eviction: EvictionStrategy::Lru,
623        };
624        let mut store = StateStore::with_config(config);
625
626        store
627            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
628            .unwrap();
629        std::thread::sleep(std::time::Duration::from_millis(1));
630        store
631            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
632            .unwrap();
633
634        // Access /test/a to make it more recent
635        std::thread::sleep(std::time::Duration::from_millis(1));
636        store.get_mut("/test/a");
637
638        // Third should evict /test/b (least recently accessed)
639        store
640            .set("/test/c", Value::Float(3.0), "s1", None, false, false, None)
641            .unwrap();
642
643        assert_eq!(store.len(), 2);
644        assert!(store.get("/test/a").is_some());
645        assert!(store.get("/test/b").is_none()); // Evicted
646        assert!(store.get("/test/c").is_some());
647    }
648
649    #[test]
650    fn test_state_store_capacity_oldest_eviction() {
651        let config = StateStoreConfig {
652            max_params: Some(2),
653            param_ttl: None,
654            eviction: EvictionStrategy::OldestFirst,
655        };
656        let mut store = StateStore::with_config(config);
657
658        store
659            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
660            .unwrap();
661        std::thread::sleep(std::time::Duration::from_millis(1));
662        store
663            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
664            .unwrap();
665
666        // Third should evict /test/a (oldest)
667        store
668            .set("/test/c", Value::Float(3.0), "s1", None, false, false, None)
669            .unwrap();
670
671        assert_eq!(store.len(), 2);
672        assert!(store.get("/test/a").is_none()); // Evicted
673        assert!(store.get("/test/b").is_some());
674        assert!(store.get("/test/c").is_some());
675    }
676
677    #[test]
678    fn test_state_store_cleanup_stale() {
679        let mut store = StateStore::new();
680
681        store
682            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
683            .unwrap();
684        store
685            .set("/test/b", Value::Float(2.0), "s1", None, false, false, None)
686            .unwrap();
687
688        // Sleep a bit, then access /test/a
689        std::thread::sleep(std::time::Duration::from_millis(10));
690        store.get_mut("/test/a");
691
692        // Cleanup with a very short TTL - should remove /test/b but not /test/a
693        let removed = store.cleanup_stale(Duration::from_millis(5));
694        assert_eq!(removed, 1);
695        assert!(store.get("/test/a").is_some());
696        assert!(store.get("/test/b").is_none());
697    }
698
699    #[test]
700    fn test_state_store_cleanup_stale_with_config() {
701        let config = StateStoreConfig {
702            max_params: None,
703            param_ttl: Some(Duration::from_millis(5)),
704            eviction: EvictionStrategy::Lru,
705        };
706        let mut store = StateStore::with_config(config);
707
708        store
709            .set("/test/a", Value::Float(1.0), "s1", None, false, false, None)
710            .unwrap();
711
712        // Immediate cleanup should remove nothing
713        let removed = store.cleanup_stale_with_config();
714        assert_eq!(removed, 0);
715
716        // Wait and cleanup
717        std::thread::sleep(std::time::Duration::from_millis(10));
718        let removed = store.cleanup_stale_with_config();
719        assert_eq!(removed, 1);
720        assert!(store.is_empty());
721    }
722
723    #[test]
724    fn test_last_accessed_tracking() {
725        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
726        let initial_accessed = state.last_accessed;
727
728        std::thread::sleep(std::time::Duration::from_millis(1));
729        state.touch();
730
731        assert!(state.last_accessed > initial_accessed);
732    }
733}