clasp_core/
state.rs

1//! State management for Clasp params
2//!
3//! Provides conflict resolution and revision tracking for stateful parameters.
4
5use crate::{ConflictStrategy, Value};
6use std::collections::HashMap;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9/// State of a single parameter
10#[derive(Debug, Clone)]
11pub struct ParamState {
12    /// Current value
13    pub value: Value,
14    /// Monotonic revision number
15    pub revision: u64,
16    /// Session ID of last writer
17    pub writer: String,
18    /// Timestamp of last write (microseconds)
19    pub timestamp: u64,
20    /// Conflict resolution strategy
21    pub strategy: ConflictStrategy,
22    /// Lock holder (if locked)
23    pub lock_holder: Option<String>,
24    /// Metadata
25    pub meta: Option<ParamMeta>,
26}
27
28/// Parameter metadata
29#[derive(Debug, Clone)]
30pub struct ParamMeta {
31    pub unit: Option<String>,
32    pub range: Option<(f64, f64)>,
33    pub default: Option<Value>,
34}
35
36impl ParamState {
37    /// Create a new param state
38    pub fn new(value: Value, writer: String) -> Self {
39        Self {
40            value,
41            revision: 1,
42            writer,
43            timestamp: current_timestamp(),
44            strategy: ConflictStrategy::Lww,
45            lock_holder: None,
46            meta: None,
47        }
48    }
49
50    /// Create with specific strategy
51    pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
52        self.strategy = strategy;
53        self
54    }
55
56    /// Create with metadata
57    pub fn with_meta(mut self, meta: ParamMeta) -> Self {
58        self.meta = Some(meta);
59        self
60    }
61
62    /// Attempt to update the value
63    ///
64    /// Returns Ok(new_revision) if update was accepted,
65    /// Err with reason if rejected.
66    pub fn try_update(
67        &mut self,
68        new_value: Value,
69        writer: &str,
70        expected_revision: Option<u64>,
71        request_lock: bool,
72        release_lock: bool,
73    ) -> Result<u64, UpdateError> {
74        let timestamp = current_timestamp();
75
76        // Check optimistic lock (if revision specified)
77        if let Some(expected) = expected_revision {
78            if expected != self.revision {
79                return Err(UpdateError::RevisionConflict {
80                    expected,
81                    actual: self.revision,
82                });
83            }
84        }
85
86        // Check lock
87        if let Some(ref holder) = self.lock_holder {
88            if holder != writer && !release_lock {
89                return Err(UpdateError::LockHeld {
90                    holder: holder.clone(),
91                });
92            }
93        }
94
95        // Handle lock release
96        if release_lock {
97            if self.lock_holder.as_deref() == Some(writer) {
98                self.lock_holder = None;
99            }
100        }
101
102        // Apply conflict resolution
103        let should_update = match self.strategy {
104            ConflictStrategy::Lww => timestamp >= self.timestamp,
105            ConflictStrategy::Max => {
106                match (&new_value, &self.value) {
107                    (Value::Float(new), Value::Float(old)) => new > old,
108                    (Value::Int(new), Value::Int(old)) => new > old,
109                    _ => true, // Fall back to LWW for non-numeric
110                }
111            }
112            ConflictStrategy::Min => match (&new_value, &self.value) {
113                (Value::Float(new), Value::Float(old)) => new < old,
114                (Value::Int(new), Value::Int(old)) => new < old,
115                _ => true,
116            },
117            ConflictStrategy::Lock => {
118                self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
119            }
120            ConflictStrategy::Merge => true, // App handles merge
121        };
122
123        if !should_update {
124            return Err(UpdateError::ConflictRejected);
125        }
126
127        // Handle lock request
128        if request_lock {
129            if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
130                return Err(UpdateError::LockHeld {
131                    holder: self.lock_holder.clone().unwrap(),
132                });
133            }
134            self.lock_holder = Some(writer.to_string());
135        }
136
137        // Apply update
138        self.value = new_value;
139        self.revision += 1;
140        self.writer = writer.to_string();
141        self.timestamp = timestamp;
142
143        Ok(self.revision)
144    }
145
146    /// Check if value is within range (if specified)
147    pub fn validate_range(&self, value: &Value) -> bool {
148        if let Some(meta) = &self.meta {
149            if let Some((min, max)) = meta.range {
150                if let Some(v) = value.as_f64() {
151                    return v >= min && v <= max;
152                }
153            }
154        }
155        true
156    }
157}
158
159/// Errors that can occur during state updates
160#[derive(Debug, Clone)]
161pub enum UpdateError {
162    RevisionConflict { expected: u64, actual: u64 },
163    LockHeld { holder: String },
164    ConflictRejected,
165    OutOfRange,
166}
167
168impl std::fmt::Display for UpdateError {
169    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
170        match self {
171            Self::RevisionConflict { expected, actual } => {
172                write!(f, "Revision conflict: expected {}, got {}", expected, actual)
173            }
174            Self::LockHeld { holder } => {
175                write!(f, "Parameter locked by {}", holder)
176            }
177            Self::ConflictRejected => {
178                write!(f, "Update rejected by conflict strategy")
179            }
180            Self::OutOfRange => {
181                write!(f, "Value out of allowed range")
182            }
183        }
184    }
185}
186
187impl std::error::Error for UpdateError {}
188
189/// State store for multiple params
190#[derive(Debug, Default)]
191pub struct StateStore {
192    params: HashMap<String, ParamState>,
193}
194
195impl StateStore {
196    pub fn new() -> Self {
197        Self::default()
198    }
199
200    /// Get a param's current state
201    pub fn get(&self, address: &str) -> Option<&ParamState> {
202        self.params.get(address)
203    }
204
205    /// Get a param's current value
206    pub fn get_value(&self, address: &str) -> Option<&Value> {
207        self.params.get(address).map(|p| &p.value)
208    }
209
210    /// Set a param value, creating if necessary
211    pub fn set(
212        &mut self,
213        address: &str,
214        value: Value,
215        writer: &str,
216        revision: Option<u64>,
217        lock: bool,
218        unlock: bool,
219    ) -> Result<u64, UpdateError> {
220        if let Some(param) = self.params.get_mut(address) {
221            param.try_update(value, writer, revision, lock, unlock)
222        } else {
223            // Create new param
224            let mut param = ParamState::new(value, writer.to_string());
225            if lock {
226                param.lock_holder = Some(writer.to_string());
227            }
228            let rev = param.revision;
229            self.params.insert(address.to_string(), param);
230            Ok(rev)
231        }
232    }
233
234    /// Get all params matching a pattern
235    pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
236        use crate::address::glob_match;
237
238        self.params
239            .iter()
240            .filter(|(addr, _)| glob_match(pattern, addr))
241            .map(|(addr, state)| (addr.as_str(), state))
242            .collect()
243    }
244
245    /// Get all params as a snapshot
246    pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
247        self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
248    }
249
250    /// Number of params
251    pub fn len(&self) -> usize {
252        self.params.len()
253    }
254
255    /// Check if empty
256    pub fn is_empty(&self) -> bool {
257        self.params.is_empty()
258    }
259
260    /// Remove a param
261    pub fn remove(&mut self, address: &str) -> Option<ParamState> {
262        self.params.remove(address)
263    }
264
265    /// Clear all params
266    pub fn clear(&mut self) {
267        self.params.clear();
268    }
269}
270
/// Microseconds elapsed since the Unix epoch according to the system clock.
///
/// Falls back to 0 when the clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// An unconditional write from another session is accepted and recorded.
    #[test]
    fn test_basic_update() {
        let mut param = ParamState::new(Value::Float(0.5), String::from("session1"));

        let outcome = param.try_update(Value::Float(0.75), "session2", None, false, false);

        assert!(outcome.is_ok());
        assert_eq!(param.revision, 2);
        assert_eq!(param.value, Value::Float(0.75));
        assert_eq!(param.writer, "session2");
    }

    /// A write naming a stale revision is refused.
    #[test]
    fn test_revision_conflict() {
        let mut param = ParamState::new(Value::Float(0.5), String::from("session1"));
        let stale_revision = Some(999);

        let outcome = param.try_update(
            Value::Float(0.75),
            "session2",
            stale_revision,
            false,
            false,
        );

        assert!(matches!(outcome, Err(UpdateError::RevisionConflict { .. })));
    }

    /// The lock holder keeps write access while other sessions are refused.
    #[test]
    fn test_locking() {
        let mut param = ParamState::new(Value::Float(0.5), String::from("session1"));

        // session1 writes and asks for the lock in the same call.
        let acquired = param.try_update(Value::Float(0.6), "session1", None, true, false);
        assert!(acquired.is_ok());
        assert_eq!(param.lock_holder.as_deref(), Some("session1"));

        // session2 is blocked by the lock.
        let blocked = param.try_update(Value::Float(0.7), "session2", None, false, false);
        assert!(matches!(blocked, Err(UpdateError::LockHeld { .. })));

        // session1 is still allowed to write.
        let follow_up = param.try_update(Value::Float(0.8), "session1", None, false, false);
        assert!(follow_up.is_ok());
    }

    /// Under `Max`, only strictly larger values replace the stored one.
    #[test]
    fn test_max_strategy() {
        let mut param = ParamState::new(Value::Float(0.5), String::from("session1"))
            .with_strategy(ConflictStrategy::Max);

        // A larger value is accepted.
        let raised = param.try_update(Value::Float(0.8), "session2", None, false, false);
        assert!(raised.is_ok());
        assert_eq!(param.value, Value::Float(0.8));

        // A smaller value is rejected and leaves the state untouched.
        let lowered = param.try_update(Value::Float(0.3), "session3", None, false, false);
        assert!(matches!(lowered, Err(UpdateError::ConflictRejected)));
        assert_eq!(param.value, Value::Float(0.8));
    }

    /// The store creates params on first write and can filter them by pattern.
    #[test]
    fn test_state_store() {
        let mut store = StateStore::new();

        let seed = [("/test/a", 1.0), ("/test/b", 2.0), ("/other/c", 3.0)];
        for &(address, number) in seed.iter() {
            store
                .set(address, Value::Float(number), "s1", None, false, false)
                .unwrap();
        }

        assert_eq!(store.len(), 3);

        let under_test = store.get_matching("/test/*");
        assert_eq!(under_test.len(), 2);
    }
}