1use crate::{ConflictStrategy, Value};
6use std::collections::HashMap;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9#[derive(Debug, Clone)]
11pub struct ParamState {
12 pub value: Value,
14 pub revision: u64,
16 pub writer: String,
18 pub timestamp: u64,
20 pub strategy: ConflictStrategy,
22 pub lock_holder: Option<String>,
24 pub meta: Option<ParamMeta>,
26}
27
28#[derive(Debug, Clone)]
30pub struct ParamMeta {
31 pub unit: Option<String>,
32 pub range: Option<(f64, f64)>,
33 pub default: Option<Value>,
34}
35
36impl ParamState {
37 pub fn new(value: Value, writer: String) -> Self {
39 Self {
40 value,
41 revision: 1,
42 writer,
43 timestamp: current_timestamp(),
44 strategy: ConflictStrategy::Lww,
45 lock_holder: None,
46 meta: None,
47 }
48 }
49
50 pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
52 self.strategy = strategy;
53 self
54 }
55
56 pub fn with_meta(mut self, meta: ParamMeta) -> Self {
58 self.meta = Some(meta);
59 self
60 }
61
62 pub fn try_update(
67 &mut self,
68 new_value: Value,
69 writer: &str,
70 expected_revision: Option<u64>,
71 request_lock: bool,
72 release_lock: bool,
73 ) -> Result<u64, UpdateError> {
74 let timestamp = current_timestamp();
75
76 if let Some(expected) = expected_revision {
78 if expected != self.revision {
79 return Err(UpdateError::RevisionConflict {
80 expected,
81 actual: self.revision,
82 });
83 }
84 }
85
86 if let Some(ref holder) = self.lock_holder {
88 if holder != writer && !release_lock {
89 return Err(UpdateError::LockHeld {
90 holder: holder.clone(),
91 });
92 }
93 }
94
95 if release_lock {
97 if self.lock_holder.as_deref() == Some(writer) {
98 self.lock_holder = None;
99 }
100 }
101
102 let should_update = match self.strategy {
104 ConflictStrategy::Lww => timestamp >= self.timestamp,
105 ConflictStrategy::Max => {
106 match (&new_value, &self.value) {
107 (Value::Float(new), Value::Float(old)) => new > old,
108 (Value::Int(new), Value::Int(old)) => new > old,
109 _ => true, }
111 }
112 ConflictStrategy::Min => match (&new_value, &self.value) {
113 (Value::Float(new), Value::Float(old)) => new < old,
114 (Value::Int(new), Value::Int(old)) => new < old,
115 _ => true,
116 },
117 ConflictStrategy::Lock => {
118 self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
119 }
120 ConflictStrategy::Merge => true, };
122
123 if !should_update {
124 return Err(UpdateError::ConflictRejected);
125 }
126
127 if request_lock {
129 if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
130 return Err(UpdateError::LockHeld {
131 holder: self.lock_holder.clone().unwrap(),
132 });
133 }
134 self.lock_holder = Some(writer.to_string());
135 }
136
137 self.value = new_value;
139 self.revision += 1;
140 self.writer = writer.to_string();
141 self.timestamp = timestamp;
142
143 Ok(self.revision)
144 }
145
146 pub fn validate_range(&self, value: &Value) -> bool {
148 if let Some(meta) = &self.meta {
149 if let Some((min, max)) = meta.range {
150 if let Some(v) = value.as_f64() {
151 return v >= min && v <= max;
152 }
153 }
154 }
155 true
156 }
157}
158
/// Reasons an update to a parameter can be refused.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can assert
/// on an exact error value instead of pattern-matching every time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpdateError {
    /// The caller's expected revision does not match the stored revision.
    RevisionConflict {
        /// Revision the caller expected.
        expected: u64,
        /// Revision actually stored.
        actual: u64,
    },
    /// The parameter is locked by another writer.
    LockHeld {
        /// Identifier of the writer holding the lock.
        holder: String,
    },
    /// The parameter's conflict strategy refused the new value.
    ConflictRejected,
    /// The value lies outside the allowed range.
    /// NOTE(review): nothing in this file constructs this variant — presumably
    /// callers return it after a failed range validation; confirm.
    OutOfRange,
}

impl std::fmt::Display for UpdateError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            UpdateError::RevisionConflict { expected, actual } => {
                write!(f, "Revision conflict: expected {}, got {}", expected, actual)
            }
            UpdateError::LockHeld { holder } => write!(f, "Parameter locked by {}", holder),
            UpdateError::ConflictRejected => f.write_str("Update rejected by conflict strategy"),
            UpdateError::OutOfRange => f.write_str("Value out of allowed range"),
        }
    }
}

/// `UpdateError` carries no underlying cause, so the default trait methods suffice.
impl std::error::Error for UpdateError {}
188
189#[derive(Debug, Default)]
191pub struct StateStore {
192 params: HashMap<String, ParamState>,
193}
194
195impl StateStore {
196 pub fn new() -> Self {
197 Self::default()
198 }
199
200 pub fn get(&self, address: &str) -> Option<&ParamState> {
202 self.params.get(address)
203 }
204
205 pub fn get_value(&self, address: &str) -> Option<&Value> {
207 self.params.get(address).map(|p| &p.value)
208 }
209
210 pub fn set(
212 &mut self,
213 address: &str,
214 value: Value,
215 writer: &str,
216 revision: Option<u64>,
217 lock: bool,
218 unlock: bool,
219 ) -> Result<u64, UpdateError> {
220 if let Some(param) = self.params.get_mut(address) {
221 param.try_update(value, writer, revision, lock, unlock)
222 } else {
223 let mut param = ParamState::new(value, writer.to_string());
225 if lock {
226 param.lock_holder = Some(writer.to_string());
227 }
228 let rev = param.revision;
229 self.params.insert(address.to_string(), param);
230 Ok(rev)
231 }
232 }
233
234 pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
236 use crate::address::glob_match;
237
238 self.params
239 .iter()
240 .filter(|(addr, _)| glob_match(pattern, addr))
241 .map(|(addr, state)| (addr.as_str(), state))
242 .collect()
243 }
244
245 pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
247 self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
248 }
249
250 pub fn len(&self) -> usize {
252 self.params.len()
253 }
254
255 pub fn is_empty(&self) -> bool {
257 self.params.is_empty()
258 }
259
260 pub fn remove(&mut self, address: &str) -> Option<ParamState> {
262 self.params.remove(address)
263 }
264
265 pub fn clear(&mut self) {
267 self.params.clear();
268 }
269}
270
/// Returns the current wall-clock time in microseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The
/// `u128` microsecond count is saturated to `u64::MAX` rather than silently
/// truncated, so an out-of-range value can never wrap around to a small one.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let micros = elapsed.as_micros();
            if micros > u128::from(u64::MAX) {
                u64::MAX
            } else {
                // Lossless: the value was just checked to fit in 64 bits.
                micros as u64
            }
        }
        Err(_) => 0,
    }
}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a float parameter whose first writer is `session1`.
    fn float_param(initial: f64) -> ParamState {
        ParamState::new(Value::Float(initial), "session1".to_string())
    }

    /// A plain write from another session is accepted and fully recorded.
    #[test]
    fn test_basic_update() {
        let mut param = float_param(0.5);

        let outcome = param.try_update(Value::Float(0.75), "session2", None, false, false);

        assert!(outcome.is_ok());
        assert_eq!(param.revision, 2);
        assert_eq!(param.value, Value::Float(0.75));
        assert_eq!(param.writer, "session2");
    }

    /// A write carrying a stale expected revision is refused.
    #[test]
    fn test_revision_conflict() {
        let mut param = float_param(0.5);
        let stale_revision = Some(999);

        let outcome =
            param.try_update(Value::Float(0.75), "session2", stale_revision, false, false);

        assert!(matches!(
            outcome,
            Err(UpdateError::RevisionConflict { .. })
        ));
    }

    /// Once a session takes the lock, other sessions are refused while the
    /// holder can keep writing.
    #[test]
    fn test_locking() {
        let mut param = float_param(0.5);

        // session1 writes and requests the lock in the same call.
        let acquire = param.try_update(Value::Float(0.6), "session1", None, true, false);
        assert!(acquire.is_ok());
        assert_eq!(param.lock_holder, Some("session1".to_string()));

        // A different session is turned away.
        let foreign = param.try_update(Value::Float(0.7), "session2", None, false, false);
        assert!(matches!(foreign, Err(UpdateError::LockHeld { .. })));

        // The holder itself is still allowed through.
        let own = param.try_update(Value::Float(0.8), "session1", None, false, false);
        assert!(own.is_ok());
    }

    /// Under `Max`, larger values win and smaller ones leave the state as is.
    #[test]
    fn test_max_strategy() {
        let mut param = float_param(0.5).with_strategy(ConflictStrategy::Max);

        let higher = param.try_update(Value::Float(0.8), "session2", None, false, false);
        assert!(higher.is_ok());
        assert_eq!(param.value, Value::Float(0.8));

        let lower = param.try_update(Value::Float(0.3), "session3", None, false, false);
        assert!(matches!(lower, Err(UpdateError::ConflictRejected)));
        assert_eq!(param.value, Value::Float(0.8));
    }

    /// The store creates parameters on first write and filters by glob pattern.
    #[test]
    fn test_state_store() {
        let mut store = StateStore::new();

        let writes = [("/test/a", 1.0), ("/test/b", 2.0), ("/other/c", 3.0)];
        for &(address, number) in writes.iter() {
            store
                .set(address, Value::Float(number), "s1", None, false, false)
                .unwrap();
        }

        assert_eq!(store.len(), 3);

        let under_test = store.get_matching("/test/*");
        assert_eq!(under_test.len(), 2);
    }
}