1use crate::{ConflictStrategy, Value};
6use std::collections::HashMap;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
/// Policy applied when a new parameter would exceed the store's capacity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionStrategy {
    /// Drop the entry with the smallest `last_accessed` stamp (the default).
    #[default]
    Lru,
    /// Drop the entry with the smallest write `timestamp`.
    OldestFirst,
    /// Keep existing entries and refuse the new one.
    RejectNew,
}
20
21#[derive(Debug, Clone)]
23pub struct StateStoreConfig {
24 pub max_params: Option<usize>,
26 pub param_ttl: Option<Duration>,
28 pub eviction: EvictionStrategy,
30}
31
32impl Default for StateStoreConfig {
33 fn default() -> Self {
34 Self {
35 max_params: Some(10_000),
36 param_ttl: Some(Duration::from_secs(3600)), eviction: EvictionStrategy::Lru,
38 }
39 }
40}
41
42impl StateStoreConfig {
43 pub fn unlimited() -> Self {
45 Self {
46 max_params: None,
47 param_ttl: None,
48 eviction: EvictionStrategy::Lru,
49 }
50 }
51
52 pub fn with_limits(max_params: usize, ttl_secs: u64) -> Self {
54 Self {
55 max_params: Some(max_params),
56 param_ttl: Some(Duration::from_secs(ttl_secs)),
57 eviction: EvictionStrategy::Lru,
58 }
59 }
60}
61
62#[derive(Debug, Clone)]
64pub struct ParamState {
65 pub value: Value,
67 pub revision: u64,
69 pub writer: String,
71 pub timestamp: u64,
73 pub last_accessed: u64,
75 pub strategy: ConflictStrategy,
77 pub lock_holder: Option<String>,
79 pub meta: Option<ParamMeta>,
81 pub origin: Option<String>,
83}
84
85#[derive(Debug, Clone)]
87pub struct ParamMeta {
88 pub unit: Option<String>,
89 pub range: Option<(f64, f64)>,
90 pub default: Option<Value>,
91}
92
93impl ParamState {
94 pub fn new(value: Value, writer: String) -> Self {
96 let now = current_timestamp();
97 Self {
98 value,
99 revision: 1,
100 writer,
101 timestamp: now,
102 last_accessed: now,
103 strategy: ConflictStrategy::Lww,
104 lock_holder: None,
105 meta: None,
106 origin: None,
107 }
108 }
109
110 pub fn touch(&mut self) {
112 self.last_accessed = current_timestamp();
113 }
114
115 pub fn with_strategy(mut self, strategy: ConflictStrategy) -> Self {
117 self.strategy = strategy;
118 self
119 }
120
121 pub fn with_meta(mut self, meta: ParamMeta) -> Self {
123 self.meta = Some(meta);
124 self
125 }
126
127 pub fn try_update(
132 &mut self,
133 new_value: Value,
134 writer: &str,
135 expected_revision: Option<u64>,
136 request_lock: bool,
137 release_lock: bool,
138 ) -> Result<u64, UpdateError> {
139 let timestamp = current_timestamp();
140
141 if let Some(expected) = expected_revision {
143 if expected != self.revision {
144 return Err(UpdateError::RevisionConflict {
145 expected,
146 actual: self.revision,
147 });
148 }
149 }
150
151 if let Some(ref holder) = self.lock_holder {
153 if holder != writer && !release_lock {
154 return Err(UpdateError::LockHeld {
155 holder: holder.clone(),
156 });
157 }
158 }
159
160 if release_lock && self.lock_holder.as_deref() == Some(writer) {
162 self.lock_holder = None;
163 }
164
165 let should_update = match self.strategy {
167 ConflictStrategy::Lww => timestamp >= self.timestamp,
168 ConflictStrategy::Max => {
169 match (&new_value, &self.value) {
170 (Value::Float(new), Value::Float(old)) => new > old,
171 (Value::Int(new), Value::Int(old)) => new > old,
172 _ => true, }
174 }
175 ConflictStrategy::Min => match (&new_value, &self.value) {
176 (Value::Float(new), Value::Float(old)) => new < old,
177 (Value::Int(new), Value::Int(old)) => new < old,
178 _ => true,
179 },
180 ConflictStrategy::Lock => {
181 self.lock_holder.is_none() || self.lock_holder.as_deref() == Some(writer)
182 }
183 ConflictStrategy::Merge => true, };
185
186 if !should_update {
187 return Err(UpdateError::ConflictRejected);
188 }
189
190 if request_lock {
192 if self.lock_holder.is_some() && self.lock_holder.as_deref() != Some(writer) {
193 return Err(UpdateError::LockHeld {
194 holder: self.lock_holder.clone().unwrap(),
195 });
196 }
197 self.lock_holder = Some(writer.to_string());
198 }
199
200 self.value = new_value;
202 self.revision += 1;
203 self.writer = writer.to_string();
204 self.timestamp = timestamp;
205 self.last_accessed = timestamp;
206
207 Ok(self.revision)
208 }
209
210 pub fn validate_range(&self, value: &Value) -> bool {
212 if let Some(meta) = &self.meta {
213 if let Some((min, max)) = meta.range {
214 if let Some(v) = value.as_f64() {
215 return v >= min && v <= max;
216 }
217 }
218 }
219 true
220 }
221}
222
/// Reasons an update to the state store can be refused.
#[derive(Debug, Clone)]
pub enum UpdateError {
    /// The caller's expected revision did not match the stored one.
    RevisionConflict { expected: u64, actual: u64 },
    /// Another writer holds the lock on the parameter.
    LockHeld { holder: String },
    /// The parameter's conflict strategy rejected the new value.
    ConflictRejected,
    /// The value falls outside the permitted range.
    OutOfRange,
    /// The store is full and cannot accept a new parameter.
    AtCapacity,
}

impl std::fmt::Display for UpdateError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RevisionConflict { expected, actual } => write!(
                f,
                "Revision conflict: expected {}, got {}",
                expected, actual
            ),
            Self::LockHeld { holder } => write!(f, "Parameter locked by {}", holder),
            Self::ConflictRejected => f.write_str("Update rejected by conflict strategy"),
            Self::OutOfRange => f.write_str("Value out of allowed range"),
            Self::AtCapacity => f.write_str("State store at capacity"),
        }
    }
}

impl std::error::Error for UpdateError {}
260
/// Standalone error signalling that the state store is full.
#[derive(Debug, Clone)]
pub struct CapacityError;

impl std::fmt::Display for CapacityError {
    /// Writes the fixed capacity message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("State store at capacity")
    }
}

impl std::error::Error for CapacityError {}
272
273#[derive(Debug)]
275pub struct StateStore {
276 params: HashMap<String, ParamState>,
277 config: StateStoreConfig,
278}
279
280impl Default for StateStore {
281 fn default() -> Self {
282 Self {
283 params: HashMap::new(),
284 config: StateStoreConfig::unlimited(), }
286 }
287}
288
289impl StateStore {
290 pub fn new() -> Self {
292 Self::default()
293 }
294
295 pub fn with_config(config: StateStoreConfig) -> Self {
297 Self {
298 params: HashMap::new(),
299 config,
300 }
301 }
302
303 pub fn config(&self) -> &StateStoreConfig {
305 &self.config
306 }
307
308 pub fn get(&self, address: &str) -> Option<&ParamState> {
310 self.params.get(address)
311 }
312
313 pub fn get_mut(&mut self, address: &str) -> Option<&mut ParamState> {
315 let param = self.params.get_mut(address)?;
316 param.touch();
317 Some(param)
318 }
319
320 pub fn get_value(&self, address: &str) -> Option<&Value> {
322 self.params.get(address).map(|p| &p.value)
323 }
324
325 pub fn get_value_mut(&mut self, address: &str) -> Option<&Value> {
327 let param = self.params.get_mut(address)?;
328 param.touch();
329 Some(¶m.value)
330 }
331
332 pub fn set(
334 &mut self,
335 address: &str,
336 value: Value,
337 writer: &str,
338 revision: Option<u64>,
339 lock: bool,
340 unlock: bool,
341 ) -> Result<u64, UpdateError> {
342 if let Some(param) = self.params.get_mut(address) {
343 param.try_update(value, writer, revision, lock, unlock)
344 } else {
345 if let Some(max) = self.config.max_params {
347 if self.params.len() >= max {
348 match self.config.eviction {
349 EvictionStrategy::RejectNew => {
350 return Err(UpdateError::AtCapacity);
351 }
352 EvictionStrategy::Lru => {
353 self.evict_lru();
354 }
355 EvictionStrategy::OldestFirst => {
356 self.evict_oldest();
357 }
358 }
359 }
360 }
361
362 let mut param = ParamState::new(value, writer.to_string());
364 if lock {
365 param.lock_holder = Some(writer.to_string());
366 }
367 let rev = param.revision;
368 self.params.insert(address.to_string(), param);
369 Ok(rev)
370 }
371 }
372
373 fn evict_lru(&mut self) {
375 if let Some(oldest_key) = self
376 .params
377 .iter()
378 .min_by_key(|(_, v)| v.last_accessed)
379 .map(|(k, _)| k.clone())
380 {
381 self.params.remove(&oldest_key);
382 }
383 }
384
385 fn evict_oldest(&mut self) {
387 if let Some(oldest_key) = self
388 .params
389 .iter()
390 .min_by_key(|(_, v)| v.timestamp)
391 .map(|(k, _)| k.clone())
392 {
393 self.params.remove(&oldest_key);
394 }
395 }
396
397 pub fn cleanup_stale(&mut self, ttl: Duration) -> usize {
400 let now = current_timestamp();
401 let ttl_micros = ttl.as_micros() as u64;
402 let cutoff = now.saturating_sub(ttl_micros);
403
404 let before = self.params.len();
405 self.params.retain(|_, v| v.last_accessed >= cutoff);
406 before - self.params.len()
407 }
408
409 pub fn cleanup_stale_with_config(&mut self) -> usize {
412 if let Some(ttl) = self.config.param_ttl {
413 self.cleanup_stale(ttl)
414 } else {
415 0
416 }
417 }
418
419 pub fn get_matching(&self, pattern: &str) -> Vec<(&str, &ParamState)> {
421 use crate::address::glob_match;
422
423 self.params
424 .iter()
425 .filter(|(addr, _)| glob_match(pattern, addr))
426 .map(|(addr, state)| (addr.as_str(), state))
427 .collect()
428 }
429
430 pub fn snapshot(&self) -> Vec<(&str, &ParamState)> {
432 self.params.iter().map(|(k, v)| (k.as_str(), v)).collect()
433 }
434
435 pub fn len(&self) -> usize {
437 self.params.len()
438 }
439
440 pub fn is_empty(&self) -> bool {
442 self.params.is_empty()
443 }
444
445 pub fn remove(&mut self, address: &str) -> Option<ParamState> {
447 self.params.remove(address)
448 }
449
450 pub fn clear(&mut self) {
452 self.params.clear();
453 }
454}
455
/// Returns the wall-clock time in microseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as u64,
        Err(_) => 0,
    }
}
463
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh state holding `Value::Float(0.5)`, created by writer "session1".
    fn fresh_state() -> ParamState {
        ParamState::new(Value::Float(0.5), "session1".to_string())
    }

    /// Writes `value` to `address` as writer "s1", with no revision check and
    /// no lock flags.
    fn put(store: &mut StateStore, address: &str, value: Value) -> Result<u64, UpdateError> {
        store.set(address, value, "s1", None, false, false)
    }

    /// Store limited to two parameters, without TTL, using `eviction`.
    fn two_slot_store(eviction: EvictionStrategy) -> StateStore {
        StateStore::with_config(StateStoreConfig {
            max_params: Some(2),
            param_ttl: None,
            eviction,
        })
    }

    /// Sleeps so that later microsecond timestamps are strictly greater.
    fn pause(millis: u64) {
        std::thread::sleep(Duration::from_millis(millis));
    }

    #[test]
    fn test_basic_update() {
        let mut state = fresh_state();

        let outcome = state.try_update(Value::Float(0.75), "session2", None, false, false);

        assert!(outcome.is_ok());
        assert_eq!(state.revision, 2);
        assert_eq!(state.value, Value::Float(0.75));
        assert_eq!(state.writer, "session2");
    }

    #[test]
    fn test_revision_conflict() {
        let mut state = fresh_state();

        // The state is at revision 1, so expecting 999 must fail.
        let outcome = state.try_update(Value::Float(0.75), "session2", Some(999), false, false);

        assert!(matches!(outcome, Err(UpdateError::RevisionConflict { .. })));
    }

    #[test]
    fn test_locking() {
        let mut state = fresh_state();

        // session1 writes and takes the lock.
        let locked = state.try_update(Value::Float(0.6), "session1", None, true, false);
        assert!(locked.is_ok());
        assert_eq!(state.lock_holder, Some("session1".to_string()));

        // Another session is refused while the lock is held.
        let intruder = state.try_update(Value::Float(0.7), "session2", None, false, false);
        assert!(matches!(intruder, Err(UpdateError::LockHeld { .. })));

        // The holder can still write.
        let holder = state.try_update(Value::Float(0.8), "session1", None, false, false);
        assert!(holder.is_ok());
    }

    #[test]
    fn test_max_strategy() {
        let mut state = fresh_state().with_strategy(ConflictStrategy::Max);

        // A larger value is accepted.
        let raised = state.try_update(Value::Float(0.8), "session2", None, false, false);
        assert!(raised.is_ok());
        assert_eq!(state.value, Value::Float(0.8));

        // A smaller value is rejected and the stored value is unchanged.
        let lowered = state.try_update(Value::Float(0.3), "session3", None, false, false);
        assert!(matches!(lowered, Err(UpdateError::ConflictRejected)));
        assert_eq!(state.value, Value::Float(0.8));
    }

    #[test]
    fn test_state_store() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();
        put(&mut store, "/other/c", Value::Float(3.0)).unwrap();

        assert_eq!(store.len(), 3);

        let under_test = store.get_matching("/test/*");
        assert_eq!(under_test.len(), 2);
    }

    #[test]
    fn test_state_store_capacity_reject() {
        let mut store = two_slot_store(EvictionStrategy::RejectNew);

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // A third parameter is refused.
        let overflow = put(&mut store, "/test/c", Value::Float(3.0));
        assert!(matches!(overflow, Err(UpdateError::AtCapacity)));
        assert_eq!(store.len(), 2);

        // Updating an existing parameter still works at capacity.
        put(&mut store, "/test/a", Value::Float(1.5)).unwrap();
        assert_eq!(store.get_value("/test/a"), Some(&Value::Float(1.5)));
    }

    #[test]
    fn test_state_store_capacity_lru_eviction() {
        let mut store = two_slot_store(EvictionStrategy::Lru);

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        pause(1);
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // Touch "a" so that "b" becomes the least recently used entry.
        pause(1);
        store.get_mut("/test/a");

        put(&mut store, "/test/c", Value::Float(3.0)).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_capacity_oldest_eviction() {
        let mut store = two_slot_store(EvictionStrategy::OldestFirst);

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        pause(1);
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // "a" was written first, so it is the one evicted.
        put(&mut store, "/test/c", Value::Float(3.0)).unwrap();

        assert_eq!(store.len(), 2);
        assert!(store.get("/test/a").is_none());
        assert!(store.get("/test/b").is_some());
        assert!(store.get("/test/c").is_some());
    }

    #[test]
    fn test_state_store_cleanup_stale() {
        let mut store = StateStore::new();

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();
        put(&mut store, "/test/b", Value::Float(2.0)).unwrap();

        // Let both entries age past the TTL, then refresh only "a".
        pause(10);
        store.get_mut("/test/a");

        let removed = store.cleanup_stale(Duration::from_millis(5));
        assert_eq!(removed, 1);
        assert!(store.get("/test/a").is_some());
        assert!(store.get("/test/b").is_none());
    }

    #[test]
    fn test_state_store_cleanup_stale_with_config() {
        let mut store = StateStore::with_config(StateStoreConfig {
            max_params: None,
            param_ttl: Some(Duration::from_millis(5)),
            eviction: EvictionStrategy::Lru,
        });

        put(&mut store, "/test/a", Value::Float(1.0)).unwrap();

        // Nothing is stale right after the write.
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 0);

        // Once the TTL has elapsed the entry is dropped.
        pause(10);
        let removed = store.cleanup_stale_with_config();
        assert_eq!(removed, 1);
        assert!(store.is_empty());
    }

    #[test]
    fn test_last_accessed_tracking() {
        let mut state = fresh_state();
        let created_at = state.last_accessed;

        pause(1);
        state.touch();

        assert!(state.last_accessed > created_at);
    }
}