Skip to main content

clasp_core/
state.rs

1//! State management for Clasp params
2//!
3//! Provides conflict resolution and revision tracking for stateful parameters.
4
5use crate::{ConflictStrategy, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9/// Eviction strategy when the state store reaches capacity
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
11pub enum EvictionStrategy {
12    /// Evict least recently accessed entries (default)
13    #[default]
14    Lru,
15    /// Evict oldest entries by creation time
16    OldestFirst,
17    /// Reject new entries when at capacity
18    RejectNew,
19}
20
21/// Configuration for state store limits
22#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24    /// Maximum number of parameters (None = unlimited)
25    pub max_params: Option<usize>,
26    /// Time-to-live for parameters without access (None = never expire)
27    pub param_ttl: Option<Duration>,
28    /// Strategy for eviction when at capacity
29    pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33    fn default() -> Self {
34        Self {
35            max_params: Some(10_000),
36            param_ttl: Some(Duration::from_secs(3600)), // 1 hour
37            eviction: EvictionStrategy::Lru,
38        }
39    }
40}
41
42impl StateStoreConfig {
43    /// Create config with no limits (for backwards compatibility)
44    pub fn unlimited() -> Self {
45        Self {
46            max_params: None,
47            param_ttl: None,
48            eviction: EvictionStrategy::Lru,
49        }
50    }
51
52    /// Create config with custom limits
53    pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54        Self {
55            max_params: Some(max_params),
56            param_ttl: Some(Duration::from_secs(ttl_secs)),
57            eviction: EvictionStrategy::Lru,
58        }
59    }
60}
61
62/// State of a single parameter
63#[derive(Debug, Clone)]
64pub struct ParamState {
65    /// Current value
66    pub value: Value,
67    /// Monotonic revision number
68    pub revision: u64,
69    /// Session ID of last writer
70    pub writer: String,
71    /// Timestamp of last write (microseconds)
72    pub timestamp: u64,
73    /// Timestamp of last access (microseconds) - for TTL eviction
74    pub last_accessed: u64,
75    /// Conflict resolution strategy
76    pub strategy: ConflictStrategy,
77    /// Lock holder (if locked)
78    pub lock_holder: Option<String>,
79    /// Metadata
80    pub meta: Option<ParamMeta>,
81}
82
83/// Parameter metadata
84#[derive(Debug, Clone)]
85pub struct ParamMeta {
86    pub unit: Option<String>,
87    pub range: Option<(f64, f64)>,
88    pub default: Option<Value>,
89}
90
91impl ParamState {
92    /// Create a new param state
93    pub fn new(value: Value, writer: String) -> Self {
94        let now = current_timestamp();
95        Self {
96            value,
97            revision: 1,
98            writer,
99            timestamp: now,
100            last_accessed: now,
101            strategy: ConflictStrategy::Lww,
102            lock_holder: None,
103            meta: None,
104        }
105    }
106
107    /// Update the last_accessed timestamp
108    pub fn touch(&mut self) {
109        self.last_accessed = current_timestamp();
110    }
111
112    /// Create with specific strategy
113    pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
114        self.strategy = strategy;
115        self
116    }
117
118    /// Create with metadata
119    pub fn with_meta(mut self, meta: ParamMeta) -> Self {
120        self.meta = Some(meta);
121        self
122    }
123
124    /// Attempt to update the value
125    ///
126    /// Returns Ok(new_revision) if update was accepted,
127    /// Err with reason if rejected.
128    pub fn try_update(
129        &mut self,
130        new_value: Value,
131        writer: &str,
132        expected_revision: Option<u64>,
133        request_lock: bool,
134        release_lock: bool,
135    ) -> Result<u64, UpdateError> {
136        let timestamp = current_timestamp();
137
138        // Check optimistic lock (if revision specified)
139        if let Some(expected) = expected_revision {
140            if expected != self.revision {
141                return Err(UpdateError::RevisionConflict {
142                    expected,
143                    actual: self.revision,
144                });
145            }
146        }
147
148        // Check lock
149        if let Some(ref holder) = self.lock_holder {
150            if holder != writer && !release_lock {
151                return Err(UpdateError::LockHeld {
152                    holder: holder.clone(),
153                });
154            }
155        }
156
157        // Handle lock release
158        if release_lock {
159            if self.lock_holder.as_deref() == Some(writer) {
160                self.lock_holder = None;
161            }
162        }
163
164        // Apply conflict resolution
165        let should_update = match self.strategy {
166            ConflictStrategy::Lww => timestamp >= self.timestamp,
167            ConflictStrategy::Max => {
168                match (&new_value, &self.value) {
169                    (Value::Float(new), Value::Float(old)) => new > old,
170                    (Value::Int(new), Value::Int(old)) => new > old,
171                    _ => true, // Fall back to LWW for non-numeric
172                }
173            }
174            ConflictStrategy::Min => match (&new_value, &self.value) {
175                (Value::Float(new), Value::Float(old)) => new < old,
176                (Value::Int(new), Value::Int(old)) => new < old,
177                _ => true,
178            },
179            ConflictStrategy::Lock => {
180                self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
181            }
182            ConflictStrategy::Merge => true, // App handles merge
183        };
184
185        if !should_update {
186            return Err(UpdateError::ConflictRejected);
187        }
188
189        // Handle lock request
190        if request_lock {
191            if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
192                return Err(UpdateError::LockHeld {
193                    holder: self.lock_holder.clone().unwrap(),
194                });
195            }
196            self.lock_holder = Some(writer.to_string());
197        }
198
199        // Apply update
200        self.value = new_value;
201        self.revision += 1;
202        self.writer = writer.to_string();
203        self.timestamp = timestamp;
204        self.last_accessed = timestamp;
205
206        Ok(self.revision)
207    }
208
209    /// Check if value is within range (if specified)
210    pub fn validate_range(&self, value: &Value) -> bool {
211        if let Some(meta) = &self.meta {
212            if let Some((min, max)) = meta.range {
213                if let Some(v) = value.as_f64() {
214                    return v >= min && v <= max;
215                }
216            }
217        }
218        true
219    }
220}
221
222/// Errors that can occur during state updates
223#[derive(Debug, Clone)]
224pub enum UpdateError {
225    RevisionConflict { expected: u64, actual: u64 },
226    LockHeld { holder: String },
227    ConflictRejected,
228    OutOfRange,
229    AtCapacity,
230}
231
232impl std::fmt::Display for UpdateError {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        match self {
235            Self::RevisionConflict { expected, actual } => {
236                write!(
237                    f,
238                    "Revision conflict: expected {}, got {}",
239                    expected, actual
240                )
241            }
242            Self::LockHeld { holder } => {
243                write!(f, "Parameter locked by {}", holder)
244            }
245            Self::ConflictRejected => {
246                write!(f, "Update rejected by conflict strategy")
247            }
248            Self::OutOfRange => {
249                write!(f, "Value out of allowed range")
250            }
251            Self::AtCapacity => {
252                write!(f, "State store at capacity")
253            }
254        }
255    }
256}
257
258impl std::error::Error for UpdateError {}
259
260/// Error returned when state store is at capacity
261#[derive(Debug, Clone)]
262pub struct CapacityError;
263
264impl std::fmt::Display for CapacityError {
265    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266        write!(f, "State store at capacity")
267    }
268}
269
270impl std::error::Error for CapacityError {}
271
272/// State store for multiple params
273#[derive(Debug)]
274pub struct StateStore {
275    params: HashMap<String, ParamState>,
276    config: StateStoreConfig,
277}
278
279impl Default for StateStore {
280    fn default() -> Self {
281        Self {
282            params: HashMap::new(),
283            config: StateStoreConfig::unlimited(), // Backwards compatible default
284        }
285    }
286}
287
288impl StateStore {
289    /// Create a new state store with default (unlimited) config
290    pub fn new() -> Self {
291        Self::default()
292    }
293
294    /// Create a new state store with the specified config
295    pub fn with_config(config: StateStoreConfig) -> Self {
296        Self {
297            params: HashMap::new(),
298            config,
299        }
300    }
301
302    /// Get the current configuration
303    pub fn config(&self) -> &StateStoreConfig {
304        &self.config
305    }
306
307    /// Get a param's current state (does not update last_accessed)
308    pub fn get(&self, address: &str) -> Option<&ParamState> {
309        self.params.get(address)
310    }
311
312    /// Get a param's current state and update last_accessed
313    pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
314        let param = self.params.get_mut(address)?;
315        param.touch();
316        Some(param)
317    }
318
319    /// Get a param's current value (does not update last_accessed)
320    pub fn get_value(&self, address: &str) -> Option<&Value> {
321        self.params.get(address).map(|p| &p.value)
322    }
323
324    /// Get a param's current value and update last_accessed
325    pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
326        let param = self.params.get_mut(address)?;
327        param.touch();
328        Some(&param.value)
329    }
330
331    /// Set a param value, creating if necessary
332    pub fn set(
333        &mut self,
334        address: &str,
335        value: Value,
336        writer: &str,
337        revision: Option<u64>,
338        lock: bool,
339        unlock: bool,
340    ) -> Result<u64, UpdateError> {
341        if let Some(param) = self.params.get_mut(address) {
342            param.try_update(value, writer, revision, lock, unlock)
343        } else {
344            // Check capacity before creating new param
345            if let Some(max) = self.config.max_params {
346                if self.params.len() >= max {
347                    match self.config.eviction {
348                        EvictionStrategy::RejectNew => {
349                            return Err(UpdateError::AtCapacity);
350                        }
351                        EvictionStrategy::Lru => {
352                            self.evict_lru();
353                        }
354                        EvictionStrategy::OldestFirst => {
355                            self.evict_oldest();
356                        }
357                    }
358                }
359            }
360
361            // Create new param
362            let mut param = ParamState::new(value, writer.to_string());
363            if lock {
364                param.lock_holder = Some(writer.to_string());
365            }
366            let rev = param.revision;
367            self.params.insert(address.to_string(), param);
368            Ok(rev)
369        }
370    }
371
372    /// Evict the least recently accessed param
373    fn evict_lru(&mut self) {
374        if let Some(oldest_key) = self
375            .params
376            .iter()
377            .min_by_key(|(_, v)| v.last_accessed)
378            .map(|(k, _)| k.clone())
379        {
380            self.params.remove(&oldest_key);
381        }
382    }
383
384    /// Evict the oldest param by creation time (lowest revision is oldest)
385    fn evict_oldest(&mut self) {
386        if let Some(oldest_key) = self
387            .params
388            .iter()
389            .min_by_key(|(_, v)| v.timestamp)
390            .map(|(k, _)| k.clone())
391        {
392            self.params.remove(&oldest_key);
393        }
394    }
395
396    /// Remove params that haven't been accessed within the TTL
397    /// Returns the number of params removed
398    pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
399        let now = current_timestamp();
400        let ttl_micros = ttl.as_micros() as u64;
401        let cutoff = now.saturating_sub(ttl_micros);
402
403        let before = self.params.len();
404        self.params.retain(|_, v| v.last_accessed >= cutoff);
405        before - self.params.len()
406    }
407
408    /// Run cleanup using the configured TTL (if any)
409    /// Returns the number of params removed
410    pub fn cleanup_stale_with_config(&mut self) -> usize {
411        if let Some(ttl) = self.config.param_ttl {
412            self.cleanup_stale(ttl)
413        } else {
414            0
415        }
416    }
417
418    /// Get all params matching a pattern
419    pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
420        use crate::address::glob_match;
421
422        self.params
423            .iter()
424            .filter(|(addr, _)| glob_match(pattern, addr))
425            .map(|(addr, state)| (addr.as_str(), state))
426            .collect()
427    }
428
429    /// Get all params as a snapshot
430    pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
431        self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
432    }
433
434    /// Number of params
435    pub fn len(&self) -> usize {
436        self.params.len()
437    }
438
439    /// Check if empty
440    pub fn is_empty(&self) -> bool {
441        self.params.is_empty()
442    }
443
444    /// Remove a param
445    pub fn remove(&mut self, address: &str) -> Option<ParamState> {
446        self.params.remove(address)
447    }
448
449    /// Clear all params
450    pub fn clear(&mut self) {
451        self.params.clear();
452    }
453}
454
455/// Get current timestamp in microseconds
456fn current_timestamp() -> u64 {
457    SystemTime::now()
458        .duration_since(UNIX_EPOCH)
459        .map(|d| d.as_micros() as u64)
460        .unwrap_or(0)
461}
462
463#[cfg(test)]
464mod tests {
465    use super::*;
466
467    #[test]
468    fn test_basic_update() {
469        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
470
471        let result = state.try_update(Value::Float(0.75), "session2", None, false, false);
472
473        assert!(result.is_ok());
474        assert_eq!(state.revision, 2);
475        assert_eq!(state.value, Value::Float(0.75));
476        assert_eq!(state.writer, "session2");
477    }
478
479    #[test]
480    fn test_revision_conflict() {
481        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
482
483        let result = state.try_update(
484            Value::Float(0.75),
485            "session2",
486            Some(999), // Wrong revision
487            false,
488            false,
489        );
490
491        assert!(matches!(result, Err(UpdateError::RevisionConflict { .. })));
492    }
493
494    #[test]
495    fn test_locking() {
496        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
497
498        // Session 1 takes lock
499        let result = state.try_update(
500            Value::Float(0.6),
501            "session1",
502            None,
503            true, // Request lock
504            false,
505        );
506        assert!(result.is_ok());
507        assert_eq!(state.lock_holder, Some("session1".to_string()));
508
509        // Session 2 tries to update - should fail
510        let result = state.try_update(Value::Float(0.7), "session2", None, false, false);
511        assert!(matches!(result, Err(UpdateError::LockHeld { .. })));
512
513        // Session 1 can still update
514        let result = state.try_update(Value::Float(0.8), "session1", None, false, false);
515        assert!(result.is_ok());
516    }
517
518    #[test]
519    fn test_max_strategy() {
520        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string())
521            .with_strategy(ConflictStrategy::Max);
522
523        // Higher value wins
524        let result = state.try_update(Value::Float(0.8), "session2", None, false, false);
525        assert!(result.is_ok());
526        assert_eq!(state.value, Value::Float(0.8));
527
528        // Lower value rejected
529        let result = state.try_update(Value::Float(0.3), "session3", None, false, false);
530        assert!(matches!(result, Err(UpdateError::ConflictRejected)));
531        assert_eq!(state.value, Value::Float(0.8)); // Unchanged
532    }
533
534    #[test]
535    fn test_state_store() {
536        let mut store = StateStore::new();
537
538        store
539            .set("/test/a", Value::Float(1.0), "s1", None, false, false)
540            .unwrap();
541        store
542            .set("/test/b", Value::Float(2.0), "s1", None, false, false)
543            .unwrap();
544        store
545            .set("/other/c", Value::Float(3.0), "s1", None, false, false)
546            .unwrap();
547
548        assert_eq!(store.len(), 3);
549
550        let matching = store.get_matching("/test/*");
551        assert_eq!(matching.len(), 2);
552    }
553
554    #[test]
555    fn test_state_store_capacity_reject() {
556        let config = StateStoreConfig {
557            max_params: Some(2),
558            param_ttl: None,
559            eviction: EvictionStrategy::RejectNew,
560        };
561        let mut store = StateStore::with_config(config);
562
563        store
564            .set("/test/a", Value::Float(1.0), "s1", None, false, false)
565            .unwrap();
566        store
567            .set("/test/b", Value::Float(2.0), "s1", None, false, false)
568            .unwrap();
569
570        // Third should fail
571        let result = store.set("/test/c", Value::Float(3.0), "s1", None, false, false);
572        assert!(matches!(result, Err(UpdateError::AtCapacity)));
573        assert_eq!(store.len(), 2);
574
575        // Updating existing should still work
576        store
577            .set("/test/a", Value::Float(1.5), "s1", None, false, false)
578            .unwrap();
579        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
580    }
581
582    #[test]
583    fn test_state_store_capacity_lru_eviction() {
584        let config = StateStoreConfig {
585            max_params: Some(2),
586            param_ttl: None,
587            eviction: EvictionStrategy::Lru,
588        };
589        let mut store = StateStore::with_config(config);
590
591        store
592            .set("/test/a", Value::Float(1.0), "s1", None, false, false)
593            .unwrap();
594        std::thread::sleep(std::time::Duration::from_millis(1));
595        store
596            .set("/test/b", Value::Float(2.0), "s1", None, false, false)
597            .unwrap();
598
599        // Access /test/a to make it more recent
600        std::thread::sleep(std::time::Duration::from_millis(1));
601        store.get_mut("/test/a");
602
603        // Third should evict /test/b (least recently accessed)
604        store
605            .set("/test/c", Value::Float(3.0), "s1", None, false, false)
606            .unwrap();
607
608        assert_eq!(store.len(), 2);
609        assert!(store.get("/test/a").is_some());
610        assert!(store.get("/test/b").is_none()); // Evicted
611        assert!(store.get("/test/c").is_some());
612    }
613
614    #[test]
615    fn test_state_store_capacity_oldest_eviction() {
616        let config = StateStoreConfig {
617            max_params: Some(2),
618            param_ttl: None,
619            eviction: EvictionStrategy::OldestFirst,
620        };
621        let mut store = StateStore::with_config(config);
622
623        store
624            .set("/test/a", Value::Float(1.0), "s1", None, false, false)
625            .unwrap();
626        std::thread::sleep(std::time::Duration::from_millis(1));
627        store
628            .set("/test/b", Value::Float(2.0), "s1", None, false, false)
629            .unwrap();
630
631        // Third should evict /test/a (oldest)
632        store
633            .set("/test/c", Value::Float(3.0), "s1", None, false, false)
634            .unwrap();
635
636        assert_eq!(store.len(), 2);
637        assert!(store.get("/test/a").is_none()); // Evicted
638        assert!(store.get("/test/b").is_some());
639        assert!(store.get("/test/c").is_some());
640    }
641
642    #[test]
643    fn test_state_store_cleanup_stale() {
644        let mut store = StateStore::new();
645
646        store
647            .set("/test/a", Value::Float(1.0), "s1", None, false, false)
648            .unwrap();
649        store
650            .set("/test/b", Value::Float(2.0), "s1", None, false, false)
651            .unwrap();
652
653        // Sleep a bit, then access /test/a
654        std::thread::sleep(std::time::Duration::from_millis(10));
655        store.get_mut("/test/a");
656
657        // Cleanup with a very short TTL - should remove /test/b but not /test/a
658        let removed = store.cleanup_stale(Duration::from_millis(5));
659        assert_eq!(removed, 1);
660        assert!(store.get("/test/a").is_some());
661        assert!(store.get("/test/b").is_none());
662    }
663
664    #[test]
665    fn test_state_store_cleanup_stale_with_config() {
666        let config = StateStoreConfig {
667            max_params: None,
668            param_ttl: Some(Duration::from_millis(5)),
669            eviction: EvictionStrategy::Lru,
670        };
671        let mut store = StateStore::with_config(config);
672
673        store
674            .set("/test/a", Value::Float(1.0), "s1", None, false, false)
675            .unwrap();
676
677        // Immediate cleanup should remove nothing
678        let removed = store.cleanup_stale_with_config();
679        assert_eq!(removed, 0);
680
681        // Wait and cleanup
682        std::thread::sleep(std::time::Duration::from_millis(10));
683        let removed = store.cleanup_stale_with_config();
684        assert_eq!(removed, 1);
685        assert!(store.is_empty());
686    }
687
688    #[test]
689    fn test_last_accessed_tracking() {
690        let mut state = ParamState::new(Value::Float(0.5), "session1".to_string());
691        let initial_accessed = state.last_accessed;
692
693        std::thread::sleep(std::time::Duration::from_millis(1));
694        state.touch();
695
696        assert!(state.last_accessed > initial_accessed);
697    }
698}