Skip to main content

clasp_core/
state.rs

1//! State management for Clasp params
2//!
3//! Provides conflict resolution and revision tracking for stateful parameters.
4
5use crate::{ConflictStrategy, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
/// Policy the state store follows when a new param arrives and the store is already full.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EvictionStrategy {
    /// Drop the entry with the oldest `last_accessed` time. This is the default.
    #[default]
    Lru,
    /// Drop the entry with the oldest write `timestamp`.
    OldestFirst,
    /// Keep every existing entry and refuse the new one.
    RejectNew,
}
20
21/// Configuration for state store limits
22#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24    /// Maximum number of parameters (None = unlimited)
25    pub max_params: Option<usize>,
26    /// Time-to-live for parameters without access (None = never expire)
27    pub param_ttl: Option<Duration>,
28    /// Strategy for eviction when at capacity
29    pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33    fn default() -> Self {
34        Self {
35            max_params: Some(10_000),
36            param_ttl: Some(Duration::from_secs(3600)), // 1 hour
37            eviction: EvictionStrategy::Lru,
38        }
39    }
40}
41
42impl StateStoreConfig {
43    /// Create config with no limits (for backwards compatibility)
44    pub fn unlimited() -> Self {
45        Self {
46            max_params: None,
47            param_ttl: None,
48            eviction: EvictionStrategy::Lru,
49        }
50    }
51
52    /// Create config with custom limits
53    pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54        Self {
55            max_params: Some(max_params),
56            param_ttl: Some(Duration::from_secs(ttl_secs)),
57            eviction: EvictionStrategy::Lru,
58        }
59    }
60}
61
62/// State of a single parameter
63#[derive(Debug, Clone)]
64pub struct ParamState {
65    /// Current value
66    pub value: Value,
67    /// Monotonic revision number
68    pub revision: u64,
69    /// Session ID of last writer
70    pub writer: String,
71    /// Timestamp of last write (microseconds)
72    pub timestamp: u64,
73    /// Timestamp of last access (microseconds) - for TTL eviction
74    pub last_accessed: u64,
75    /// Conflict resolution strategy
76    pub strategy: ConflictStrategy,
77    /// Lock holder (if locked)
78    pub lock_holder: Option<String>,
79    /// Metadata
80    pub meta: Option<ParamMeta>,
81    /// Origin router ID (for federation loop prevention)
82    pub origin: Option<String>,
83}
84
85/// Parameter metadata
86#[derive(Debug, Clone)]
87pub struct ParamMeta {
88    pub unit: Option<String>,
89    pub range: Option<(f64, f64)>,
90    pub default: Option<Value>,
91}
92
93impl ParamState {
94    /// Create a new param state
95    pub fn new(value: Value, writer: String) -> Self {
96        let now = current_timestamp();
97        Self {
98            value,
99            revision: 1,
100            writer,
101            timestamp: now,
102            last_accessed: now,
103            strategy: ConflictStrategy::Lww,
104            lock_holder: None,
105            meta: None,
106            origin: None,
107        }
108    }
109
110    /// Update the last_accessed timestamp
111    pub fn touch(&mut self) {
112        self.last_accessed = current_timestamp();
113    }
114
115    /// Create with specific strategy
116    pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
117        self.strategy = strategy;
118        self
119    }
120
121    /// Create with metadata
122    pub fn with_meta(mut self, meta: ParamMeta) -> Self {
123        self.meta = Some(meta);
124        self
125    }
126
127    /// Attempt to update the value
128    ///
129    /// Returns Ok(new_revision) if update was accepted,
130    /// Err with reason if rejected.
131    pub fn try_update(
132        &mut self,
133        new_value: Value,
134        writer: &str,
135        expected_revision: Option<u64>,
136        request_lock: bool,
137        release_lock: bool,
138    ) -> Result<u64, UpdateError> {
139        let timestamp = current_timestamp();
140
141        // Check optimistic lock (if revision specified)
142        if let Some(expected) = expected_revision {
143            if expected != self.revision {
144                return Err(UpdateError::RevisionConflict {
145                    expected,
146                    actual: self.revision,
147                });
148            }
149        }
150
151        // Check lock
152        if let Some(ref holder) = self.lock_holder {
153            if holder != writer && !release_lock {
154                return Err(UpdateError::LockHeld {
155                    holder: holder.clone(),
156                });
157            }
158        }
159
160        // Handle lock release
161        if release_lock && self.lock_holder.as_deref() == Some(writer) {
162            self.lock_holder = None;
163        }
164
165        // Apply conflict resolution
166        let should_update = match self.strategy {
167            ConflictStrategy::Lww => timestamp >= self.timestamp,
168            ConflictStrategy::Max => {
169                match (&new_value, &self.value) {
170                    (Value::Float(new), Value::Float(old)) => new > old,
171                    (Value::Int(new), Value::Int(old)) => new > old,
172                    _ => true, // Fall back to LWW for non-numeric
173                }
174            }
175            ConflictStrategy::Min => match (&new_value, &self.value) {
176                (Value::Float(new), Value::Float(old)) => new < old,
177                (Value::Int(new), Value::Int(old)) => new < old,
178                _ => true,
179            },
180            ConflictStrategy::Lock => {
181                self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
182            }
183            ConflictStrategy::Merge => true, // App handles merge
184        };
185
186        if !should_update {
187            return Err(UpdateError::ConflictRejected);
188        }
189
190        // Handle lock request
191        if request_lock {
192            if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
193                return Err(UpdateError::LockHeld {
194                    holder: self.lock_holder.clone().unwrap(),
195                });
196            }
197            self.lock_holder = Some(writer.to_string());
198        }
199
200        // Apply update
201        self.value = new_value;
202        self.revision += 1;
203        self.writer = writer.to_string();
204        self.timestamp = timestamp;
205        self.last_accessed = timestamp;
206
207        Ok(self.revision)
208    }
209
210    /// Check if value is within range (if specified)
211    pub fn validate_range(&self, value: &Value) -> bool {
212        if let Some(meta) = &self.meta {
213            if let Some((min, max)) = meta.range {
214                if let Some(v) = value.as_f64() {
215                    return v >= min && v <= max;
216                }
217            }
218        }
219        true
220    }
221}
222
/// Reasons a parameter write can be refused.
#[derive(Clone, Debug)]
pub enum UpdateError {
    /// The revision the caller expected differs from the stored one.
    RevisionConflict { expected: u64, actual: u64 },
    /// Another writer (`holder`) owns the lock.
    LockHeld { holder: String },
    /// The parameter's conflict strategy refused the value.
    ConflictRejected,
    /// The value lies outside the allowed range.
    OutOfRange,
    /// The store is full and may not accept a new parameter.
    AtCapacity,
}

impl std::fmt::Display for UpdateError {
    /// Human-readable description of the rejection.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants carrying data format it directly; the rest share one write below.
        let fixed_message = match self {
            Self::RevisionConflict { expected, actual } => {
                return write!(
                    f,
                    "Revision conflict: expected {}, got {}",
                    expected, actual
                );
            }
            Self::LockHeld { holder } => {
                return write!(f, "Parameter locked by {}", holder);
            }
            Self::ConflictRejected => "Update rejected by conflict strategy",
            Self::OutOfRange => "Value out of allowed range",
            Self::AtCapacity => "State store at capacity",
        };
        f.write_str(fixed_message)
    }
}

impl std::error::Error for UpdateError {}
260
/// Unit error signalling that the state store has no room left.
#[derive(Clone, Debug)]
pub struct CapacityError;

impl std::fmt::Display for CapacityError {
    /// Writes the fixed capacity message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("State store at capacity")
    }
}

impl std::error::Error for CapacityError {}
272
273/// State store for multiple params
274#[derive(Debug)]
275pub struct StateStore {
276    params: HashMap<String, ParamState>,
277    config: StateStoreConfig,
278}
279
280impl Default for StateStore {
281    fn default() -> Self {
282        Self {
283            params: HashMap::new(),
284            config: StateStoreConfig::unlimited(), // Backwards compatible default
285        }
286    }
287}
288
289impl StateStore {
290    /// Create a new state store with default (unlimited) config
291    pub fn new() -> Self {
292        Self::default()
293    }
294
295    /// Create a new state store with the specified config
296    pub fn with_config(config: StateStoreConfig) -> Self {
297        Self {
298            params: HashMap::new(),
299            config,
300        }
301    }
302
303    /// Get the current configuration
304    pub fn config(&self) -> &StateStoreConfig {
305        &self.config
306    }
307
308    /// Get a param's current state (does not update last_accessed)
309    pub fn get(&self, address: &str) -> Option<&ParamState> {
310        self.params.get(address)
311    }
312
313    /// Get a param's current state and update last_accessed
314    pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
315        let param = self.params.get_mut(address)?;
316        param.touch();
317        Some(param)
318    }
319
320    /// Get a param's current value (does not update last_accessed)
321    pub fn get_value(&self, address: &str) -> Option<&Value> {
322        self.params.get(address).map(|p| &p.value)
323    }
324
325    /// Get a param's current value and update last_accessed
326    pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
327        let param = self.params.get_mut(address)?;
328        param.touch();
329        Some(&param.value)
330    }
331
332    /// Set a param value, creating if necessary
333    pub fn set(
334        &mut self,
335        address: &str,
336        value: Value,
337        writer: &str,
338        revision: Option<u64>,
339        lock: bool,
340        unlock: bool,
341    ) -> Result<u64, UpdateError> {
342        if let Some(param) = self.params.get_mut(address) {
343            param.try_update(value, writer, revision, lock, unlock)
344        } else {
345            // Check capacity before creating new param
346            if let Some(max) = self.config.max_params {
347                if self.params.len() >= max {
348                    match self.config.eviction {
349                        EvictionStrategy::RejectNew => {
350                            return Err(UpdateError::AtCapacity);
351                        }
352                        EvictionStrategy::Lru => {
353                            self.evict_lru();
354                        }
355                        EvictionStrategy::OldestFirst => {
356                            self.evict_oldest();
357                        }
358                    }
359                }
360            }
361
362            // Create new param
363            let mut param = ParamState::new(value, writer.to_string());
364            if lock {
365                param.lock_holder = Some(writer.to_string());
366            }
367            let rev = param.revision;
368            self.params.insert(address.to_string(), param);
369            Ok(rev)
370        }
371    }
372
373    /// Evict the least recently accessed param
374    fn evict_lru(&mut self) {
375        if let Some(oldest_key) = self
376            .params
377            .iter()
378            .min_by_key(|(_, v)| v.last_accessed)
379            .map(|(k, _)| k.clone())
380        {
381            self.params.remove(&oldest_key);
382        }
383    }
384
385    /// Evict the oldest param by creation time (lowest revision is oldest)
386    fn evict_oldest(&mut self) {
387        if let Some(oldest_key) = self
388            .params
389            .iter()
390            .min_by_key(|(_, v)| v.timestamp)
391            .map(|(k, _)| k.clone())
392        {
393            self.params.remove(&oldest_key);
394        }
395    }
396
397    /// Remove params that haven't been accessed within the TTL
398    /// Returns the number of params removed
399    pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
400        let now = current_timestamp();
401        let ttl_micros = ttl.as_micros() as u64;
402        let cutoff = now.saturating_sub(ttl_micros);
403
404        let before = self.params.len();
405        self.params.retain(|_, v| v.last_accessed >= cutoff);
406        before - self.params.len()
407    }
408
409    /// Run cleanup using the configured TTL (if any)
410    /// Returns the number of params removed
411    pub fn cleanup_stale_with_config(&mut self) -> usize {
412        if let Some(ttl) = self.config.param_ttl {
413            self.cleanup_stale(ttl)
414        } else {
415            0
416        }
417    }
418
419    /// Get all params matching a pattern
420    pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
421        use crate::address::glob_match;
422
423        self.params
424            .iter()
425            .filter(|(addr, _)| glob_match(pattern, addr))
426            .map(|(addr, state)| (addr.as_str(), state))
427            .collect()
428    }
429
430    /// Get all params as a snapshot
431    pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
432        self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
433    }
434
435    /// Number of params
436    pub fn len(&self) -> usize {
437        self.params.len()
438    }
439
440    /// Check if empty
441    pub fn is_empty(&self) -> bool {
442        self.params.is_empty()
443    }
444
445    /// Remove a param
446    pub fn remove(&mut self, address: &str) -> Option<ParamState> {
447        self.params.remove(address)
448    }
449
450    /// Clear all params
451    pub fn clear(&mut self) {
452        self.params.clear();
453    }
454}
455
/// Microseconds since the Unix epoch according to the system clock.
///
/// Yields 0 when the clock reads earlier than the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh parameter holding 0.5, written by "session1".
    fn seeded_state() -> ParamState {
        ParamState::new(Value::Float(0.5), "session1".to_string())
    }

    /// Unconditional write: no expected revision and no lock flags.
    fn write(state: &mut ParamState, v: f64, who: &str) -> Result<u64, UpdateError> {
        state.try_update(Value::Float(v), who, None, false, false)
    }

    /// Plain float write as writer "s1" with no revision check or lock flags.
    fn put(store: &mut StateStore, address: &str, v: f64) -> Result<u64, UpdateError> {
        store.set(address, Value::Float(v), "s1", None, false, false)
    }

    /// Store limited to two params, without TTL, using `eviction`.
    fn two_slot_store(eviction: EvictionStrategy) -> StateStore {
        StateStore::with_config(StateStoreConfig {
            max_params: Some(2),
            param_ttl: None,
            eviction,
        })
    }

    /// Sleep long enough for microsecond timestamps to differ.
    fn pause(ms: u64) {
        std::thread::sleep(Duration::from_millis(ms));
    }

    #[test]
    fn test_basic_update() {
        let mut state = seeded_state();

        let outcome = write(&mut state, 0.75, "session2");

        assert!(outcome.is_ok());
        assert_eq!(state.revision, 2);
        assert_eq!(state.value, Value::Float(0.75));
        assert_eq!(state.writer, "session2");
    }

    #[test]
    fn test_revision_conflict() {
        let mut state = seeded_state();

        // 999 is not the current revision.
        let outcome = state.try_update(Value::Float(0.75), "session2", Some(999), false, false);

        assert!(matches!(outcome, Err(UpdateError::RevisionConflict { .. })));
    }

    #[test]
    fn test_locking() {
        let mut state = seeded_state();

        // session1 writes and asks for the lock.
        let locked = state.try_update(Value::Float(0.6), "session1", None, true, false);
        assert!(locked.is_ok());
        assert_eq!(state.lock_holder, Some("session1".to_string()));

        // session2 is shut out while the lock is held.
        let blocked = write(&mut state, 0.7, "session2");
        assert!(matches!(blocked, Err(UpdateError::LockHeld { .. })));

        // The holder itself can keep writing.
        let again = write(&mut state, 0.8, "session1");
        assert!(again.is_ok());
    }

    #[test]
    fn test_max_strategy() {
        let mut state = seeded_state().with_strategy(ConflictStrategy::Max);

        // A larger value is accepted.
        let higher = write(&mut state, 0.8, "session2");
        assert!(higher.is_ok());
        assert_eq!(state.value, Value::Float(0.8));

        // A smaller value is refused and leaves the state untouched.
        let lower = write(&mut state, 0.3, "session3");
        assert!(matches!(lower, Err(UpdateError::ConflictRejected)));
        assert_eq!(state.value, Value::Float(0.8));
    }

    #[test]
    fn test_state_store() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", 1.0).unwrap();
        put(&mut store, "/test/b", 2.0).unwrap();
        put(&mut store, "/other/c", 3.0).unwrap();

        assert_eq!(store.len(), 3);

        let matching = store.get_matching("/test/*");
        assert_eq!(matching.len(), 2);
    }

    #[test]
    fn test_state_store_capacity_reject() {
        let mut store = two_slot_store(EvictionStrategy::RejectNew);

        put(&mut store, "/test/a", 1.0).unwrap();
        put(&mut store, "/test/b", 2.0).unwrap();

        // A third, new address does not fit.
        let overflow = put(&mut store, "/test/c", 3.0);
        assert!(matches!(overflow, Err(UpdateError::AtCapacity)));
        assert_eq!(store.len(), 2);

        // Writing to an address that already exists is still allowed.
        put(&mut store, "/test/a", 1.5).unwrap();
        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
    }

    #[test]
    fn test_state_store_capacity_lru_eviction() {
        let mut store = two_slot_store(EvictionStrategy::Lru);

        put(&mut store, "/test/a", 1.0).unwrap();
        pause(1);
        put(&mut store, "/test/b", 2.0).unwrap();

        // Touch /test/a so that /test/b becomes the least recently accessed.
        pause(1);
        store.get_mut("/test/a");

        // The new entry pushes out /test/b.
        put(&mut store, "/test/c", 3.0).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_capacity_oldest_eviction() {
        let mut store = two_slot_store(EvictionStrategy::OldestFirst);

        put(&mut store, "/test/a", 1.0).unwrap();
        pause(1);
        put(&mut store, "/test/b", 2.0).unwrap();

        // The new entry pushes out /test/a, the oldest one.
        put(&mut store, "/test/c", 3.0).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_none());
        assert!(store.get("/test/b").is_some());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_cleanup_stale() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", 1.0).unwrap();
        put(&mut store, "/test/b", 2.0).unwrap();

        // Let both entries age, then refresh only /test/a.
        pause(10);
        store.get_mut("/test/a");

        // With a TTL shorter than the pause, only /test/b is stale.
        let removed = store.cleanup_stale(Duration::from_millis(5));
        assert_eq!(removed, 1);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
    }

    #[test]
    fn test_state_store_cleanup_stale_with_config() {
        let mut store = StateStore::with_config(StateStoreConfig {
            max_params: None,
            param_ttl: Some(Duration::from_millis(5)),
            eviction: EvictionStrategy::Lru,
        });

        put(&mut store, "/test/a", 1.0).unwrap();

        // Nothing is stale right after the write.
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 0);

        // Once the TTL has elapsed the entry goes away.
        pause(10);
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 1);
        assert!(store.is_empty());
    }

    #[test]
    fn test_last_accessed_tracking() {
        let mut state = seeded_state();
        let before = state.last_accessed;

        pause(1);
        state.touch();

        assert!(state.last_accessed > before);
    }
}