Skip to main content

ai_agent/state/
store.rs

1// Source: /data/home/swei/claudecode/openclaudecode/src/state/store.ts
2//! Simple state store implementation
3
4use std::collections::VecDeque;
5use std::fmt::Debug;
6use std::marker::PhantomData;
7
/// Listener callback type: a shared, thread-safe closure invoked with no
/// arguments after every state change.
///
/// Held behind an `Arc` so `set_state` can copy the current listener list and
/// call the listeners after the registry lock has been released.
type Listener = std::sync::Arc<dyn Fn() + Send + Sync>;

/// Callback for state changes, invoked as `on_change(new_state, old_state)`.
type OnChange<T> = Box<dyn Fn(&T, &T) + Send + Sync>;

/// A simple state store.
///
/// Cloning a `Store` yields another handle to the same underlying state and
/// listener registry.
#[derive(Clone)]
pub struct Store<T: Clone + PartialEq + Send + Sync> {
    inner: std::sync::Arc<StoreInner<T>>,
}

/// Data shared by every clone of a [`Store`].
struct StoreInner<T: Clone + PartialEq> {
    /// Current state value.
    state: std::sync::Mutex<T>,
    /// Registered listeners in subscription order, each tagged with the id
    /// that its unsubscribe handle uses to find it again.
    listeners: std::sync::Mutex<VecDeque<(usize, Listener)>>,
    /// Source of unique listener ids.
    next_listener_id: std::sync::atomic::AtomicUsize,
    /// Optional callback receiving `(new, old)` on every change.
    on_change: Option<OnChange<T>>,
}

impl<T: Clone + PartialEq> StoreInner<T> {
    /// Build the shared data for a store with no listeners registered.
    fn new(initial_state: T, on_change: Option<OnChange<T>>) -> Self {
        Self {
            state: std::sync::Mutex::new(initial_state),
            listeners: std::sync::Mutex::new(VecDeque::new()),
            next_listener_id: std::sync::atomic::AtomicUsize::new(0),
            on_change,
        }
    }

    /// Lock the state, reading through a poisoned mutex.
    ///
    /// The only code that can panic while this lock is held (the updater,
    /// `T::clone`, `T::eq`) runs before the state is written, so a poisoned
    /// mutex still contains the last fully committed value.
    fn lock_state(&self) -> std::sync::MutexGuard<'_, T> {
        self.state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Lock the listener registry, reading through a poisoned mutex.
    fn lock_listeners(&self) -> std::sync::MutexGuard<'_, VecDeque<(usize, Listener)>> {
        self.listeners
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

impl<T: Clone + PartialEq + Send + Sync + 'static> Store<T> {
    /// Create a new store with initial state
    pub fn new(initial_state: T) -> Self {
        Self {
            inner: std::sync::Arc::new(StoreInner::new(initial_state, None)),
        }
    }

    /// Create a new store with initial state and change callback
    ///
    /// `on_change` is called as `on_change(new_state, old_state)` after each
    /// update that actually changes the state, before the listeners run.
    pub fn with_on_change(
        initial_state: T,
        on_change: impl Fn(&T, &T) + Send + Sync + 'static,
    ) -> Self {
        Self {
            inner: std::sync::Arc::new(StoreInner::new(
                initial_state,
                Some(Box::new(on_change)),
            )),
        }
    }

    /// Get the current state
    ///
    /// Returns a clone of the stored value.
    pub fn get_state(&self) -> T {
        self.inner.lock_state().clone()
    }

    /// Update the state using an updater function
    ///
    /// `updater` receives a clone of the current state and returns the next
    /// state. If the result equals the current state nothing happens.
    /// Otherwise the new state is stored, then `on_change(new, old)` is
    /// called (if configured), then every listener is called in subscription
    /// order.
    ///
    /// `updater` runs while the state lock is held so the read-modify-write
    /// is atomic; it must not call back into this store. The callbacks run
    /// after every lock has been released, so they may call `get_state`,
    /// `set_state` or `subscribe` on this store. As a consequence, when
    /// several threads update concurrently their callbacks may interleave.
    /// The listener list is copied before notification, so a listener that is
    /// unsubscribed while a notification round is in progress may still be
    /// called once more in that round.
    pub fn set_state(&self, updater: impl FnOnce(T) -> T + Send + Sync) {
        let (next, prev) = {
            let mut state = self.inner.lock_state();
            let next = updater(state.clone());

            if next == *state {
                return; // State didn't change
            }

            let prev = std::mem::replace(&mut *state, next.clone());
            (next, prev)
        }; // state lock released here, before any callback runs

        // Call on_change callback if set
        if let Some(callback) = &self.inner.on_change {
            callback(&next, &prev);
        }

        // Copy the listener list so the registry lock is not held while
        // listeners execute.
        let snapshot: Vec<Listener> = self
            .inner
            .lock_listeners()
            .iter()
            .map(|(_, listener)| std::sync::Arc::clone(listener))
            .collect();

        // Notify all listeners
        for listener in snapshot {
            listener();
        }
    }

    /// Subscribe to state changes
    ///
    /// `listener` is called after every update that changes the state.
    ///
    /// Returns an unsubscribe function. Calling it removes this listener;
    /// calling it again, or after the store has been dropped, does nothing.
    /// Dropping the returned function without calling it leaves the listener
    /// registered.
    pub fn subscribe(&self, listener: impl Fn() + Send + Sync + 'static) -> impl Fn() {
        // Relaxed is sufficient: the counter only has to hand out distinct
        // values, it does not order any other memory access.
        let id = self
            .inner
            .next_listener_id
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let listener: Listener = std::sync::Arc::new(listener);
        self.inner.lock_listeners().push_back((id, listener));

        // A weak handle keeps the unsubscribe function from keeping the
        // store's shared data alive.
        let inner = std::sync::Arc::downgrade(&self.inner);
        move || {
            if let Some(inner) = inner.upgrade() {
                let mut listeners = inner.lock_listeners();
                let index = listeners.iter().position(|(entry_id, _)| *entry_id == id);
                let removed = index.and_then(|i| listeners.remove(i));
                // Release the registry lock before the listener (and anything
                // it captured) is dropped.
                drop(listeners);
                drop(removed);
            }
        }
    }
}
91
92impl<T: Clone + PartialEq + Debug + Send + Sync> Debug for Store<T> {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        f.debug_struct("Store")
95            .field("state", &self.inner.state.lock().unwrap())
96            .finish()
97    }
98}
99
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// A freshly created store returns its initial state.
    #[test]
    fn test_store_basic() {
        let store = Store::new(42i32);
        assert_eq!(store.get_state(), 42);
    }

    /// `set_state` replaces the stored value with the updater's result.
    #[test]
    fn test_store_set_state() {
        let store = Store::new(0i32);
        store.set_state(|_| 10);
        assert_eq!(store.get_state(), 10);
    }

    /// A subscribed listener runs when the state changes.
    #[test]
    fn test_store_subscription() {
        let store = Store::new(0i32);
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = Arc::clone(&called);

        let _unsubscribe = store.subscribe(move || {
            called_clone.store(true, Ordering::SeqCst);
        });

        store.set_state(|_| 5);

        // Listeners are invoked synchronously inside `set_state`, so the flag
        // can be checked immediately.
        assert!(called.load(Ordering::SeqCst));
    }

    /// An update that yields an equal value does not notify listeners.
    #[test]
    fn test_store_no_change() {
        let store = Store::new(42i32);
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count_clone = Arc::clone(&call_count);

        let _unsubscribe = store.subscribe(move || {
            call_count_clone.fetch_add(1, Ordering::SeqCst);
        });

        // Set to same value - should not trigger listener
        store.set_state(|s| s);

        assert_eq!(call_count.load(Ordering::SeqCst), 0);
    }

    /// `on_change` is invoked exactly once with `(new, old)`.
    #[test]
    fn test_store_on_change() {
        let calls = Arc::new(AtomicUsize::new(0));
        let calls_in_callback = Arc::clone(&calls);

        let store = Store::with_on_change(0i32, move |new_val, old_val| {
            assert_eq!(*old_val, 0);
            assert_eq!(*new_val, 42);
            calls_in_callback.fetch_add(1, Ordering::SeqCst);
        });

        store.set_state(|_| 42);
        assert_eq!(store.get_state(), 42);
        // Without this check the test would pass even if the callback were
        // never invoked, because its assertions live inside the callback.
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }
}