rs_store/
store.rs

1use crate::{BackpressurePolicy, Subscriber, Subscription};
2use std::sync::Arc;
3use std::time::Instant;
4
5/// Default capacity for the channel
6pub const DEFAULT_CAPACITY: usize = 16;
7pub const DEFAULT_STORE_NAME: &str = "store";
8
9/// StoreError represents an error that occurred in the store
10#[derive(Debug, thiserror::Error)]
11pub enum StoreError {
12    #[error("dispatch error: {0}")]
13    DispatchError(String),
14    #[error("reducer error: {0}")]
15    ReducerError(String),
16    #[error("subscription error: {0}")]
17    SubscriptionError(String),
18    #[error("middleware error: {0}")]
19    MiddlewareError(String),
20    #[error("initialization error: {0}")]
21    InitError(String),
22    /// state update failed with context and source
23    #[error("state update failed: {context}, cause: {source}")]
24    StateUpdateError {
25        context: String,
26        source: Box<dyn std::error::Error + Send + Sync>,
27    },
28}
29
30/// Store trait defines the interface for a Redux-like store
31pub trait Store<State, Action>: Send + Sync
32where
33    State: Send + Sync + Clone + 'static,
34    Action: Send + Sync + Clone + 'static,
35{
36    /// Get the current state
37    fn get_state(&self) -> State;
38
39    /// Dispatch an action
40    fn dispatch(&self, action: Action) -> Result<(), StoreError>;
41
42    /// Add a subscriber to the store
43    /// store updates are delivered to the subscriber in same reducer thread
44    fn add_subscriber(
45        &self,
46        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
47    ) -> Box<dyn Subscription>;
48
49    /// Iterate over the store's state and action pairs
50    //fn iter(&self) -> impl Iterator<Item = (State, Action)>;
51
52    /// subscribe to the store in new context
53    /// store updates are delivered to the subscriber in the new context
54    fn subscribed(
55        &self,
56        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
57    ) -> Result<Box<dyn Subscription>, StoreError>;
58
59    /// subscribe to the store in new context
60    /// store updates are delivered to the subscriber in the new context
61    ///
62    /// ### Parameters
63    /// * capacity: Channel buffer capacity
64    /// * policy: Backpressure policy for when channel is full,
65    ///     `BlockOnFull` or `DropLatestIf` is supported to prevent from dropping the ActionOp::Exit
66    ///
67    /// ### Return
68    /// * Subscription: Subscription for the store,
69    fn subscribed_with(
70        &self,
71        capacity: usize,
72        policy: BackpressurePolicy<(Instant, State, Action)>,
73        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
74    ) -> Result<Box<dyn Subscription>, StoreError>;
75
76    /// Stop the store
77    fn stop(&self);
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83    use crate::builder::StoreBuilder;
84    use crate::store_droppable::DroppableStore;
85    use crate::BackpressurePolicy;
86    use crate::DispatchOp;
87    use crate::Reducer;
88    use std::sync::Arc;
89    use std::thread;
90    use std::time::Duration;
91
92    // Mock implementations for testing
93    #[derive(Debug, Clone, PartialEq)]
94    struct TestState {
95        counter: i32,
96        message: String,
97    }
98
99    impl Default for TestState {
100        fn default() -> Self {
101            TestState {
102                counter: 0,
103                message: String::new(),
104            }
105        }
106    }
107
108    #[derive(Debug, Clone)]
109    enum TestAction {
110        Increment,
111        Decrement,
112        SetMessage(String),
113    }
114
115    struct TestReducer;
116
117    impl Reducer<TestState, TestAction> for TestReducer {
118        fn reduce(
119            &self,
120            state: &TestState,
121            action: &TestAction,
122        ) -> DispatchOp<TestState, TestAction> {
123            match action {
124                TestAction::Increment => {
125                    let mut new_state = state.clone();
126                    new_state.counter += 1;
127                    DispatchOp::Dispatch(new_state, None)
128                }
129                TestAction::Decrement => {
130                    let mut new_state = state.clone();
131                    new_state.counter -= 1;
132                    DispatchOp::Dispatch(new_state, None)
133                }
134                TestAction::SetMessage(msg) => {
135                    let mut new_state = state.clone();
136                    new_state.message = msg.clone();
137                    DispatchOp::Dispatch(new_state, None)
138                }
139            }
140        }
141    }
142
143    fn create_test_store() -> DroppableStore<TestState, TestAction> {
144        DroppableStore::new(
145            StoreBuilder::new(TestState::default())
146                .with_reducer(Box::new(TestReducer))
147                .with_name("test-store".into())
148                .build()
149                .unwrap(),
150        )
151    }
152
153    #[test]
154    fn test_store_get_state() {
155        let store = create_test_store();
156        let initial_state = store.get_state();
157        assert_eq!(initial_state.counter, 0);
158        assert_eq!(initial_state.message, "");
159    }
160
161    #[test]
162    fn test_store_dispatch() {
163        let store = create_test_store();
164
165        // Dispatch increment action
166        store.dispatch(TestAction::Increment).unwrap();
167        thread::sleep(Duration::from_millis(50)); // Wait for async processing
168
169        let state = store.get_state();
170        assert_eq!(state.counter, 1);
171
172        // Dispatch set message action
173        store.dispatch(TestAction::SetMessage("Hello".into())).unwrap();
174        thread::sleep(Duration::from_millis(50));
175
176        let state = store.get_state();
177        assert_eq!(state.message, "Hello");
178
179        store.stop();
180    }
181
182    #[test]
183    fn test_store_multiple_actions() {
184        let store = create_test_store();
185
186        // Dispatch multiple actions
187        store.dispatch(TestAction::Increment).unwrap();
188        store.dispatch(TestAction::Increment).unwrap();
189        store.dispatch(TestAction::SetMessage("Test".into())).unwrap();
190        store.dispatch(TestAction::Decrement).unwrap();
191
192        thread::sleep(Duration::from_millis(100));
193
194        let final_state = store.get_state();
195        assert_eq!(final_state.counter, 1);
196        assert_eq!(final_state.message, "Test");
197
198        store.stop();
199    }
200
201    #[test]
202    fn test_store_after_stop() {
203        let store = create_test_store();
204        store.stop();
205
206        // Dispatch should fail after stop
207        let result = store.dispatch(TestAction::Increment);
208        assert!(result.is_err());
209
210        match result {
211            Err(StoreError::DispatchError(_)) => (),
212            _ => panic!("Expected DispatchError"),
213        }
214    }
215
216    #[test]
217    fn test_store_concurrent_access() {
218        let store = Arc::new(create_test_store());
219        let store_clone = store.clone();
220
221        let handle = thread::spawn(move || {
222            for _ in 0..5 {
223                store_clone.dispatch(TestAction::Increment).unwrap();
224                thread::sleep(Duration::from_millis(10));
225            }
226        });
227
228        for _ in 0..5 {
229            store.dispatch(TestAction::Decrement).unwrap();
230            thread::sleep(Duration::from_millis(10));
231        }
232
233        handle.join().unwrap();
234        thread::sleep(Duration::from_millis(100));
235
236        let final_state = store.get_state();
237        // Final counter should be 0 (5 increments and 5 decrements)
238        assert_eq!(final_state.counter, 0);
239
240        store.stop();
241    }
242
243    #[test]
244    fn test_store_builder_configurations() {
245        let store = StoreBuilder::new(TestState::default())
246            .with_reducer(Box::new(TestReducer))
247            .with_name("custom-store".into())
248            .with_capacity(32)
249            .with_policy(BackpressurePolicy::DropLatest)
250            .build()
251            .unwrap();
252
253        store.dispatch(TestAction::Increment).unwrap();
254        thread::sleep(Duration::from_millis(50));
255
256        let state = store.get_state();
257        assert_eq!(state.counter, 1);
258
259        store.stop();
260    }
261
262    #[test]
263    fn test_store_error_handling() {
264        let store = create_test_store();
265        store.stop();
266
267        // Test various error conditions
268        let dispatch_result = store.dispatch(TestAction::Increment);
269        // println!("dispatch_result: {:?}", dispatch_result);
270        // dispatch_result: Err(DispatchError("Dispatch channel is closed"))
271        assert!(matches!(dispatch_result, Err(StoreError::DispatchError(_))));
272
273        // Test that the store remains in a consistent state after errors
274        let state = store.get_state();
275        assert_eq!(state.counter, 0);
276    }
277}