1use crate::{BackpressurePolicy, Subscriber, Subscription};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
/// Default capacity value (`16`).
///
/// NOTE(review): presumably the queue/channel capacity used when a store is
/// built without an explicit capacity — confirm against the builder.
pub const DEFAULT_CAPACITY: usize = 16;
/// Default store name (`"store"`).
///
/// NOTE(review): presumably used when no name is supplied at construction —
/// confirm against the builder.
pub const DEFAULT_STORE_NAME: &str = "store";
8
/// Errors reported by [`Store`] operations.
///
/// `Display` is generated by `thiserror` from the `#[error(...)]` attribute on
/// each variant; the `String` payload of the tuple variants is interpolated
/// into the message as `{0}`.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Displayed as `dispatch error: <message>`.
    #[error("dispatch error: {0}")]
    DispatchError(String),
    /// Displayed as `reducer error: <message>`.
    #[error("reducer error: {0}")]
    ReducerError(String),
    /// Displayed as `subscription error: <message>`.
    #[error("subscription error: {0}")]
    SubscriptionError(String),
    /// Displayed as `middleware error: <message>`.
    #[error("middleware error: {0}")]
    MiddlewareError(String),
    /// Displayed as `initialization error: <message>`.
    #[error("initialization error: {0}")]
    InitError(String),
    /// A state update failed; the message shows both `context` and the
    /// underlying `source` error.
    ///
    /// NOTE(review): `thiserror` treats a field named `source` as the error's
    /// `Error::source()` — confirm against the thiserror version in use.
    #[error("state update failed: {context}, cause: {source}")]
    StateUpdateError {
        /// Description of what was being updated when the failure occurred.
        context: String,
        /// The underlying error.
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}
29
30pub trait Store<State, Action>: Send + Sync
34where
35 State: Send + Sync + Clone + 'static,
36 Action: Send + Sync + Clone + std::fmt::Debug + 'static,
37{
38 fn get_state(&self) -> State;
40
41 fn dispatch(&self, action: Action) -> Result<(), StoreError>;
43
44 fn add_subscriber(
47 &self,
48 subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
49 ) -> Result<Box<dyn Subscription>, StoreError>;
50
51 fn subscribed(
57 &self,
58 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
59 ) -> Result<Box<dyn Subscription>, StoreError>;
60
61 fn subscribed_with(
71 &self,
72 capacity: usize,
73 policy: BackpressurePolicy<(Instant, State, Action)>,
74 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
75 ) -> Result<Box<dyn Subscription>, StoreError>;
76
77 fn stop(&self) -> Result<(), StoreError>;
80
81 fn stop_timeout(&self, timeout: Duration) -> Result<(), StoreError>;
83}
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use crate::builder::StoreBuilder;
89 use crate::BackpressurePolicy;
90 use crate::Reducer;
91 use crate::StoreImpl;
92 use std::sync::Arc;
93 use std::sync::Mutex;
94 use std::thread;
95 use std::time::Duration;
96
/// State used by the store tests: a counter plus a free-form message.
///
/// `Default` is derived: `counter` starts at `0` and `message` starts empty,
/// exactly what the previous hand-written impl produced.
#[derive(Debug, Clone, Default, PartialEq)]
struct TestState {
    counter: i32,
    message: String,
}
112
/// Actions understood by `TestReducer`.
#[derive(Clone, Debug)]
enum TestAction {
    /// Adds one to the counter.
    Increment,
    /// Subtracts one from the counter.
    Decrement,
    /// Replaces the message with the carried string.
    SetMessage(String),
}
119
120 struct TestReducer;
121
122 impl Reducer<TestState, TestAction> for TestReducer {
123 fn reduce(
124 &self,
125 state: &TestState,
126 action: &TestAction,
127 ) -> crate::DispatchOp<TestState, TestAction> {
128 match action {
129 TestAction::Increment => {
130 let mut new_state = state.clone();
131 new_state.counter += 1;
132 crate::DispatchOp::Dispatch(new_state, vec![])
133 }
134 TestAction::Decrement => {
135 let mut new_state = state.clone();
136 new_state.counter -= 1;
137 crate::DispatchOp::Dispatch(new_state, vec![])
138 }
139 TestAction::SetMessage(msg) => {
140 let mut new_state = state.clone();
141 new_state.message = msg.clone();
142 crate::DispatchOp::Dispatch(new_state, vec![])
143 }
144 }
145 }
146 }
147
148 fn create_test_store() -> Arc<StoreImpl<TestState, TestAction>> {
149 StoreImpl::new_with(
150 TestState::default(),
151 vec![Box::new(TestReducer)],
152 "test-store".into(),
153 16,
154 BackpressurePolicy::default(),
155 vec![],
156 )
157 .unwrap()
158 }
159
160 struct TestChannneledReducer;
161
162 impl Reducer<i32, i32> for TestChannneledReducer {
163 fn reduce(&self, state: &i32, action: &i32) -> crate::DispatchOp<i32, i32> {
164 crate::DispatchOp::Dispatch(state + action, vec![])
165 }
166 }
167
/// Subscriber that records every `(state, action)` pair it is notified about
/// into a shared, mutex-protected vector.
struct TestChannelSubscriber {
    received: Arc<Mutex<Vec<(i32, i32)>>>,
}

impl TestChannelSubscriber {
    /// Wraps the shared log; the caller keeps its own `Arc` to inspect it.
    fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
        TestChannelSubscriber { received }
    }
}
177
178 impl Subscriber<i32, i32> for TestChannelSubscriber {
179 fn on_notify(&self, state: &i32, action: &i32) {
180 self.received.lock().unwrap().push((*state, *action));
182 }
183 }
184
/// Subscriber that records `(state, action)` pairs like
/// `TestChannelSubscriber`, but carries a `delay` used to slow each
/// notification down.
struct SlowSubscriber {
    received: Arc<Mutex<Vec<(i32, i32)>>>,
    delay: Duration,
}

impl SlowSubscriber {
    /// Stores the shared log and the per-notification delay.
    fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
        SlowSubscriber { received, delay }
    }
}
195
196 impl Subscriber<i32, i32> for SlowSubscriber {
197 fn on_notify(&self, state: &i32, action: &i32) {
198 std::thread::sleep(self.delay);
200 self.received.lock().unwrap().push((*state, *action));
201 }
202 }
203
/// A freshly created store exposes the default state.
#[test]
fn test_store_get_state() {
    let store = create_test_store();
    let TestState { counter, message } = store.get_state();
    assert_eq!(counter, 0);
    assert_eq!(message, "");
}
211
/// Dispatching single actions is reflected in the state after a short wait.
#[test]
fn test_store_dispatch() {
    let store = create_test_store();
    // Time given to the store to process each action before the state is read.
    let settle = Duration::from_millis(50);

    store.dispatch(TestAction::Increment).unwrap();
    thread::sleep(settle);
    assert_eq!(store.get_state().counter, 1);

    store.dispatch(TestAction::SetMessage("Hello".into())).unwrap();
    thread::sleep(settle);
    assert_eq!(store.get_state().message, "Hello");

    if let Err(e) = store.stop() {
        panic!("store stop failed : {:?}", e);
    }
    println!("store stopped");
}
237
/// A sequence of actions is applied in order: +1, +1, set message, -1.
#[test]
fn test_store_multiple_actions() {
    let store = create_test_store();

    let actions = vec![
        TestAction::Increment,
        TestAction::Increment,
        TestAction::SetMessage("Test".into()),
        TestAction::Decrement,
    ];
    for action in actions {
        store.dispatch(action).unwrap();
    }

    thread::sleep(Duration::from_millis(100));

    let TestState { counter, message } = store.get_state();
    assert_eq!(counter, 1);
    assert_eq!(message, "Test");

    if let Err(e) = store.stop() {
        panic!("store stop failed : {:?}", e);
    }
    println!("store stopped");
}
261
/// Dispatching on a stopped store fails with `StoreError::DispatchError`.
#[test]
fn test_store_after_stop() {
    let store = create_test_store();
    if let Err(e) = store.stop() {
        panic!("store stop failed : {:?}", e);
    }
    println!("store stopped");

    let outcome = store.dispatch(TestAction::Increment);
    assert!(outcome.is_err());

    if !matches!(outcome, Err(StoreError::DispatchError(_))) {
        panic!("Expected DispatchError");
    }
}
281
/// Two threads dispatch opposite actions concurrently; five increments and
/// five decrements must cancel out.
#[test]
fn test_store_concurrent_access() {
    // `create_test_store` already returns an `Arc`, so it is shared directly
    // instead of being wrapped in a second `Arc`.
    let store = create_test_store();
    let worker_store = Arc::clone(&store);

    let handle = thread::spawn(move || {
        for _ in 0..5 {
            worker_store.dispatch(TestAction::Increment).unwrap();
            thread::sleep(Duration::from_millis(10));
        }
    });

    for _ in 0..5 {
        store.dispatch(TestAction::Decrement).unwrap();
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().unwrap();
    // Give the store time to process the queued actions before reading state.
    thread::sleep(Duration::from_millis(100));

    let final_state = store.get_state();
    assert_eq!(final_state.counter, 0);

    match store.stop() {
        Ok(_) => println!("store stopped"),
        Err(e) => {
            panic!("store stop failed : {:?}", e);
        }
    }
}
313
/// A store built through `StoreBuilder` with custom name, capacity and policy
/// still processes actions.
#[test]
fn test_store_builder_configurations() {
    // NOTE(review): presumably one of the builder calls below is deprecated;
    // the attribute only covers this single statement, so the chain must not
    // be split — confirm which call it is.
    #[allow(deprecated)]
    let store = StoreBuilder::new(TestState::default())
        .with_reducer(Box::new(TestReducer))
        .with_name("custom-store".into())
        .with_capacity(32)
        .with_policy(BackpressurePolicy::DropLatestIf(None))
        .build()
        .unwrap();

    store.dispatch(TestAction::Increment).unwrap();
    thread::sleep(Duration::from_millis(50));

    assert_eq!(store.get_state().counter, 1);

    if let Err(e) = store.stop() {
        panic!("store stop failed : {:?}", e);
    }
    println!("store stopped");
}
338
/// After `stop`, a dispatch is rejected and the state stays untouched.
#[test]
fn test_store_error_handling() {
    let store = create_test_store();
    if let Err(e) = store.stop() {
        panic!("store stop failed : {:?}", e);
    }
    println!("store stopped");

    let dispatch_result = store.dispatch(TestAction::Increment);
    assert!(matches!(dispatch_result, Err(StoreError::DispatchError(_))));

    // The rejected action must not have been applied.
    assert_eq!(store.get_state().counter, 0);
}
359
/// A `subscribed` subscriber sees every `(state, action)` pair in order.
#[test]
fn test_subscribed_basic_functionality() {
    let store =
        StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let subscription = store
        .subscribed(Box::new(TestChannelSubscriber::new(received.clone())))
        .unwrap();

    for action in 1..=3 {
        store.dispatch(action).unwrap();
    }

    thread::sleep(Duration::from_millis(100));

    store.stop().unwrap();

    subscription.unsubscribe();

    let states = received.lock().unwrap();
    assert_eq!(states.len(), 3);
    // Running sums of 1, 2, 3 paired with the action that produced them.
    let expected = [(1, 1), (3, 2), (6, 3)];
    for (got, want) in states.iter().zip(expected.iter()) {
        assert_eq!(got, want);
    }
}
393
/// Three `subscribed` subscribers each receive all ten actions, in the same
/// order.
#[test]
fn test_subscribed_concurrent_subscribers() {
    let store =
        StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();

    let received1 = Arc::new(Mutex::new(Vec::new()));
    let received2 = Arc::new(Mutex::new(Vec::new()));
    let received3 = Arc::new(Mutex::new(Vec::new()));

    let subscription1 =
        store.subscribed(Box::new(TestChannelSubscriber::new(received1.clone()))).unwrap();
    let subscription2 =
        store.subscribed(Box::new(TestChannelSubscriber::new(received2.clone()))).unwrap();
    let subscription3 =
        store.subscribed(Box::new(TestChannelSubscriber::new(received3.clone()))).unwrap();

    (1..=10).for_each(|action| store.dispatch(action).unwrap());

    thread::sleep(Duration::from_millis(200));

    store.stop().unwrap();

    subscription1.unsubscribe();
    subscription2.unsubscribe();
    subscription3.unsubscribe();

    let states1 = received1.lock().unwrap();
    let states2 = received2.lock().unwrap();
    let states3 = received3.lock().unwrap();

    assert_eq!(states1.len(), 10);
    assert_eq!(states2.len(), 10);
    assert_eq!(states3.len(), 10);

    // Compare the action component of every entry across the three logs.
    let rows = states1.iter().zip(states2.iter()).zip(states3.iter());
    for ((first, second), third) in rows {
        assert_eq!(first.1, second.1);
        assert_eq!(second.1, third.1);
    }
}
443
/// With a slow subscriber, a capacity of 2 and a `DropLatestIf` predicate,
/// some but not all of the ten actions reach the subscriber.
#[test]
fn test_subscribed_drop_latest_if_policy() {
    let store =
        StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let slow = SlowSubscriber::new(received.clone(), Duration::from_millis(50));

    // The predicate matches items whose action component is below 5.
    let predicate = Box::new(|item: &(Instant, i32, i32)| item.2 < 5);
    let policy = BackpressurePolicy::DropLatestIf(Some(predicate));
    let subscription = store.subscribed_with(2, policy, Box::new(slow)).unwrap();

    (1..=10).for_each(|action| store.dispatch(action).unwrap());

    thread::sleep(Duration::from_millis(300));

    store.stop().unwrap();
    subscription.unsubscribe();

    let states = received.lock().unwrap();
    let delivered = states.len();
    assert!(
        delivered < 10,
        "Expected fewer than 10 messages due to backpressure, got {}",
        delivered
    );

    assert!(
        !states.is_empty(),
        "Expected at least some messages to be received"
    );
}
487
/// A subscriber that unsubscribes before any dispatch receives nothing.
#[test]
fn test_subscribed_error_handling() {
    let store =
        StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let subscriber = Box::new(TestChannelSubscriber::new(received.clone()));

    let subscription = store.subscribed(subscriber).unwrap();
    subscription.unsubscribe();

    store.dispatch(1).unwrap();
    thread::sleep(Duration::from_millis(50));

    // The guard is a temporary released at the end of this statement, so the
    // log is not locked while the store shuts down; a late `on_notify` can
    // then never block `stop()` and hang the test.
    assert_eq!(received.lock().unwrap().len(), 0);

    store.stop().unwrap();
}
513
/// A subscriber receives actions dispatched before `unsubscribe` and none
/// dispatched after it.
#[test]
fn test_subscribed_thread_lifecycle() {
    let store =
        StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let subscriber = Box::new(TestChannelSubscriber::new(received.clone()));

    let subscription = store.subscribed(subscriber).unwrap();

    store.dispatch(1).unwrap();
    thread::sleep(Duration::from_millis(50));

    subscription.unsubscribe();

    store.dispatch(2).unwrap();
    thread::sleep(Duration::from_millis(50));

    // Scope the guard so the log is unlocked before the store shuts down; a
    // late `on_notify` can then never block `stop()` and hang the test.
    {
        let states = received.lock().unwrap();
        assert_eq!(states.len(), 1);
        assert_eq!(states[0], (1, 1));
    }

    store.stop().unwrap();
}
543
/// A subscriber added via `add_subscriber` and one added via `subscribed`
/// observe the same five notifications.
#[test]
fn test_subscribed_mixed_with_add_subscriber() {
    let store =
        StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();

    let received_main = Arc::new(Mutex::new(Vec::new()));
    // Kept alive (never unsubscribed) for the whole test.
    let _subscription_main = store
        .add_subscriber(Arc::new(TestChannelSubscriber::new(received_main.clone())))
        .unwrap();

    let received_channeled = Arc::new(Mutex::new(Vec::new()));
    let subscription_channeled = store
        .subscribed(Box::new(TestChannelSubscriber::new(received_channeled.clone())))
        .unwrap();

    (1..=5).for_each(|action| store.dispatch(action).unwrap());

    thread::sleep(Duration::from_millis(100));

    store.stop().unwrap();
    subscription_channeled.unsubscribe();

    let states_main = received_main.lock().unwrap();
    let states_channeled = received_channeled.lock().unwrap();

    assert_eq!(states_main.len(), 5);
    assert_eq!(states_channeled.len(), 5);

    for (direct, queued) in states_main.iter().zip(states_channeled.iter()) {
        assert_eq!(direct, queued);
    }
}
581}