rs_store/
store.rs

1use crate::{BackpressurePolicy, Subscriber, Subscription};
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5/// Default capacity for the channel
6pub const DEFAULT_CAPACITY: usize = 16;
7pub const DEFAULT_STORE_NAME: &str = "store";
8
9/// StoreError represents an error that occurred in the store
10#[derive(Debug, thiserror::Error)]
11pub enum StoreError {
12    #[error("dispatch error: {0}")]
13    DispatchError(String),
14    #[error("reducer error: {0}")]
15    ReducerError(String),
16    #[error("subscription error: {0}")]
17    SubscriptionError(String),
18    #[error("middleware error: {0}")]
19    MiddlewareError(String),
20    #[error("initialization error: {0}")]
21    InitError(String),
22    /// state update failed with context and source
23    #[error("state update failed: {context}, cause: {source}")]
24    StateUpdateError {
25        context: String,
26        source: Box<dyn std::error::Error + Send + Sync>,
27    },
28}
29
30/// Store is a thread-safe and concurrent-safe store for Redux-like state management
31/// It provides a way to manage the state of an application in a thread-safe and concurrent-safe manner
32/// It supports reducers, subscribers, and async actions through Thunk
33pub trait Store<State, Action>: Send + Sync
34where
35    State: Send + Sync + Clone + 'static,
36    Action: Send + Sync + Clone + std::fmt::Debug + 'static,
37{
38    /// Get the current state
39    fn get_state(&self) -> State;
40
41    /// Dispatch an action
42    fn dispatch(&self, action: Action) -> Result<(), StoreError>;
43
44    /// Add a subscriber to the store
45    /// store updates are delivered to the subscriber in same reducer thread
46    fn add_subscriber(
47        &self,
48        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
49    ) -> Result<Box<dyn Subscription>, StoreError>;
50
51    ///// Iterate over the store's state and action pairs
52    //fn iter(&self) -> impl Iterator<Item = (State, Action)>;
53
54    /// subscribe to the store in new context
55    /// store updates are delivered to the subscriber in the new context
56    fn subscribed(
57        &self,
58        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
59    ) -> Result<Box<dyn Subscription>, StoreError>;
60
61    /// subscribe to the store in new context
62    /// store updates are delivered to the subscriber in the new context
63    ///
64    /// ### Parameters
65    /// * capacity: Channel buffer capacity
66    /// * policy: Backpressure policy for when down channel(store to subscriber) is full
67    ///
68    /// ### Return
69    /// * Subscription: Subscription for the store,
70    fn subscribed_with(
71        &self,
72        capacity: usize,
73        policy: BackpressurePolicy<(Instant, State, Action)>,
74        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
75    ) -> Result<Box<dyn Subscription>, StoreError>;
76
77    /// Stop the store
78    /// when the queue is full, the send can be blocked if there is no droppable item
79    fn stop(&self) -> Result<(), StoreError>;
80
81    /// Stop the store with timeout
82    fn stop_timeout(&self, timeout: Duration) -> Result<(), StoreError>;
83}
84
85#[cfg(test)]
86mod tests {
87    use super::*;
88    use crate::builder::StoreBuilder;
89    use crate::BackpressurePolicy;
90    use crate::Reducer;
91    use crate::StoreImpl;
92    use std::sync::Arc;
93    use std::sync::Mutex;
94    use std::thread;
95    use std::time::Duration;
96
97    // Mock implementations for testing
98    #[derive(Debug, Clone, PartialEq)]
99    struct TestState {
100        counter: i32,
101        message: String,
102    }
103
104    impl Default for TestState {
105        fn default() -> Self {
106            TestState {
107                counter: 0,
108                message: String::new(),
109            }
110        }
111    }
112
113    #[derive(Debug, Clone)]
114    enum TestAction {
115        Increment,
116        Decrement,
117        SetMessage(String),
118    }
119
120    struct TestReducer;
121
122    impl Reducer<TestState, TestAction> for TestReducer {
123        fn reduce(
124            &self,
125            state: &TestState,
126            action: &TestAction,
127        ) -> crate::DispatchOp<TestState, TestAction> {
128            match action {
129                TestAction::Increment => {
130                    let mut new_state = state.clone();
131                    new_state.counter += 1;
132                    crate::DispatchOp::Dispatch(new_state, vec![])
133                }
134                TestAction::Decrement => {
135                    let mut new_state = state.clone();
136                    new_state.counter -= 1;
137                    crate::DispatchOp::Dispatch(new_state, vec![])
138                }
139                TestAction::SetMessage(msg) => {
140                    let mut new_state = state.clone();
141                    new_state.message = msg.clone();
142                    crate::DispatchOp::Dispatch(new_state, vec![])
143                }
144            }
145        }
146    }
147
148    fn create_test_store() -> Arc<StoreImpl<TestState, TestAction>> {
149        StoreImpl::new_with(
150            TestState::default(),
151            vec![Box::new(TestReducer)],
152            "test-store".into(),
153            16,
154            BackpressurePolicy::default(),
155            vec![],
156        )
157        .unwrap()
158    }
159
160    struct TestChannneledReducer;
161
162    impl Reducer<i32, i32> for TestChannneledReducer {
163        fn reduce(&self, state: &i32, action: &i32) -> crate::DispatchOp<i32, i32> {
164            crate::DispatchOp::Dispatch(state + action, vec![])
165        }
166    }
167
168    struct TestChannelSubscriber {
169        received: Arc<Mutex<Vec<(i32, i32)>>>,
170    }
171
172    impl TestChannelSubscriber {
173        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
174            Self { received }
175        }
176    }
177
178    impl Subscriber<i32, i32> for TestChannelSubscriber {
179        fn on_notify(&self, state: &i32, action: &i32) {
180            //println!("TestChannelSubscriber: state={}, action={}", state, action);
181            self.received.lock().unwrap().push((*state, *action));
182        }
183    }
184
185    struct SlowSubscriber {
186        received: Arc<Mutex<Vec<(i32, i32)>>>,
187        delay: Duration,
188    }
189
190    impl SlowSubscriber {
191        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
192            Self { received, delay }
193        }
194    }
195
196    impl Subscriber<i32, i32> for SlowSubscriber {
197        fn on_notify(&self, state: &i32, action: &i32) {
198            //println!("SlowSubscriber: state={}, action={}", state, action);
199            std::thread::sleep(self.delay);
200            self.received.lock().unwrap().push((*state, *action));
201        }
202    }
203
204    #[test]
205    fn test_store_get_state() {
206        let store = create_test_store();
207        let initial_state = store.get_state();
208        assert_eq!(initial_state.counter, 0);
209        assert_eq!(initial_state.message, "");
210    }
211
212    #[test]
213    fn test_store_dispatch() {
214        let store = create_test_store();
215
216        // Dispatch increment action
217        store.dispatch(TestAction::Increment).unwrap();
218        thread::sleep(Duration::from_millis(50)); // Wait for async processing
219
220        let state = store.get_state();
221        assert_eq!(state.counter, 1);
222
223        // Dispatch set message action
224        store.dispatch(TestAction::SetMessage("Hello".into())).unwrap();
225        thread::sleep(Duration::from_millis(50));
226
227        let state = store.get_state();
228        assert_eq!(state.message, "Hello");
229
230        match store.stop() {
231            Ok(_) => println!("store stopped"),
232            Err(e) => {
233                panic!("store stop failed  : {:?}", e);
234            }
235        }
236    }
237
238    #[test]
239    fn test_store_multiple_actions() {
240        let store = create_test_store();
241
242        // Dispatch multiple actions
243        store.dispatch(TestAction::Increment).unwrap();
244        store.dispatch(TestAction::Increment).unwrap();
245        store.dispatch(TestAction::SetMessage("Test".into())).unwrap();
246        store.dispatch(TestAction::Decrement).unwrap();
247
248        thread::sleep(Duration::from_millis(100));
249
250        let final_state = store.get_state();
251        assert_eq!(final_state.counter, 1);
252        assert_eq!(final_state.message, "Test");
253
254        match store.stop() {
255            Ok(_) => println!("store stopped"),
256            Err(e) => {
257                panic!("store stop failed  : {:?}", e);
258            }
259        }
260    }
261
262    #[test]
263    fn test_store_after_stop() {
264        let store = create_test_store();
265        match store.stop() {
266            Ok(_) => println!("store stopped"),
267            Err(e) => {
268                panic!("store stop failed  : {:?}", e);
269            }
270        }
271
272        // Dispatch should fail after stop
273        let result = store.dispatch(TestAction::Increment);
274        assert!(result.is_err());
275
276        match result {
277            Err(StoreError::DispatchError(_)) => (),
278            _ => panic!("Expected DispatchError"),
279        }
280    }
281
282    #[test]
283    fn test_store_concurrent_access() {
284        let store = Arc::new(create_test_store());
285        let store_clone = store.clone();
286
287        let handle = thread::spawn(move || {
288            for _ in 0..5 {
289                store_clone.dispatch(TestAction::Increment).unwrap();
290                thread::sleep(Duration::from_millis(10));
291            }
292        });
293
294        for _ in 0..5 {
295            store.dispatch(TestAction::Decrement).unwrap();
296            thread::sleep(Duration::from_millis(10));
297        }
298
299        handle.join().unwrap();
300        thread::sleep(Duration::from_millis(100));
301
302        let final_state = store.get_state();
303        // Final counter should be 0 (5 increments and 5 decrements)
304        assert_eq!(final_state.counter, 0);
305
306        match store.stop() {
307            Ok(_) => println!("store stopped"),
308            Err(e) => {
309                panic!("store stop failed  : {:?}", e);
310            }
311        }
312    }
313
314    #[test]
315    fn test_store_builder_configurations() {
316        #[allow(deprecated)]
317        let store = StoreBuilder::new(TestState::default())
318            .with_reducer(Box::new(TestReducer))
319            .with_name("custom-store".into())
320            .with_capacity(32)
321            .with_policy(BackpressurePolicy::DropLatestIf(None))
322            .build()
323            .unwrap();
324
325        store.dispatch(TestAction::Increment).unwrap();
326        thread::sleep(Duration::from_millis(50));
327
328        let state = store.get_state();
329        assert_eq!(state.counter, 1);
330
331        match store.stop() {
332            Ok(_) => println!("store stopped"),
333            Err(e) => {
334                panic!("store stop failed  : {:?}", e);
335            }
336        }
337    }
338
339    #[test]
340    fn test_store_error_handling() {
341        let store = create_test_store();
342        match store.stop() {
343            Ok(_) => println!("store stopped"),
344            Err(e) => {
345                panic!("store stop failed  : {:?}", e);
346            }
347        }
348
349        // Test various error conditions
350        let dispatch_result = store.dispatch(TestAction::Increment);
351        // println!("dispatch_result: {:?}", dispatch_result);
352        // dispatch_result: Err(DispatchError("Dispatch channel is closed"))
353        assert!(matches!(dispatch_result, Err(StoreError::DispatchError(_))));
354
355        // Test that the store remains in a consistent state after errors
356        let state = store.get_state();
357        assert_eq!(state.counter, 0);
358    }
359
360    // subscribed() 기본 기능 테스트 - 별도 스레드에서 실행되는지 확인
361    #[test]
362    fn test_subscribed_basic_functionality() {
363        let store =
364            StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();
365
366        let received = Arc::new(Mutex::new(Vec::new()));
367        let subscriber = Box::new(TestChannelSubscriber::new(received.clone()));
368
369        // subscribed()로 구독 - 별도 스레드에서 실행됨
370        let subscription = store.subscribed(subscriber).unwrap();
371
372        // 액션들을 dispatch
373        store.dispatch(1).unwrap();
374        store.dispatch(2).unwrap();
375        store.dispatch(3).unwrap();
376
377        // 잠시 대기하여 채널을 통해 메시지가 전달되도록 함
378        thread::sleep(Duration::from_millis(100));
379
380        // store 정지
381        store.stop().unwrap();
382
383        // 구독 해제
384        subscription.unsubscribe();
385
386        // 수신된 메시지 검증
387        let states = received.lock().unwrap();
388        assert_eq!(states.len(), 3);
389        assert_eq!(states[0], (1, 1)); // state=0+1, action=1
390        assert_eq!(states[1], (3, 2)); // state=1+2, action=2
391        assert_eq!(states[2], (6, 3)); // state=3+3, action=3
392    }
393
394    // subscribed() 동시성 테스트 - 여러 subscriber가 동시에 실행되는지 확인
395    #[test]
396    fn test_subscribed_concurrent_subscribers() {
397        let store =
398            StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();
399
400        let received1 = Arc::new(Mutex::new(Vec::new()));
401        let received2 = Arc::new(Mutex::new(Vec::new()));
402        let received3 = Arc::new(Mutex::new(Vec::new()));
403
404        // 3개의 subscriber를 각각 별도 스레드에서 실행
405        let subscription1 =
406            store.subscribed(Box::new(TestChannelSubscriber::new(received1.clone()))).unwrap();
407        let subscription2 =
408            store.subscribed(Box::new(TestChannelSubscriber::new(received2.clone()))).unwrap();
409        let subscription3 =
410            store.subscribed(Box::new(TestChannelSubscriber::new(received3.clone()))).unwrap();
411
412        // 동시에 여러 액션을 dispatch
413        for i in 1..=10 {
414            store.dispatch(i).unwrap();
415        }
416
417        // 잠시 대기
418        thread::sleep(Duration::from_millis(200));
419
420        // store 정지
421        store.stop().unwrap();
422
423        // 모든 구독 해제
424        subscription1.unsubscribe();
425        subscription2.unsubscribe();
426        subscription3.unsubscribe();
427
428        // 모든 subscriber가 동일한 메시지를 받았는지 확인
429        let states1 = received1.lock().unwrap();
430        let states2 = received2.lock().unwrap();
431        let states3 = received3.lock().unwrap();
432
433        assert_eq!(states1.len(), 10);
434        assert_eq!(states2.len(), 10);
435        assert_eq!(states3.len(), 10);
436
437        // 각 subscriber가 동일한 순서로 메시지를 받았는지 확인
438        for i in 0..10 {
439            assert_eq!(states1[i].1, states2[i].1); // action이 동일
440            assert_eq!(states2[i].1, states3[i].1);
441        }
442    }
443
444    // subscribed() 백프레셔 정책 테스트 - DropLatestIf 정책
445    #[test]
446    fn test_subscribed_drop_latest_if_policy() {
447        let store =
448            StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();
449
450        let received = Arc::new(Mutex::new(Vec::new()));
451        let subscriber = Box::new(SlowSubscriber::new(
452            received.clone(),
453            Duration::from_millis(50),
454        ));
455
456        // 작은 용량과 DropLatestIf 정책으로 구독
457        let predicate = Box::new(|(_, _, action): &(Instant, i32, i32)| *action < 5);
458        let policy = BackpressurePolicy::DropLatestIf(Some(predicate));
459        let subscription = store.subscribed_with(2, policy, subscriber).unwrap();
460
461        // 채널을 가득 채우는 액션들을 빠르게 dispatch
462        for i in 1..=10 {
463            store.dispatch(i).unwrap();
464        }
465
466        // 처리 시간 대기
467        thread::sleep(Duration::from_millis(300));
468
469        store.stop().unwrap();
470        subscription.unsubscribe();
471
472        // 백프레셔 정책에 의해 일부 메시지가 드롭되었는지 확인
473        let states = received.lock().unwrap();
474        // 채널 용량이 2이고 SlowSubscriber가 느리므로 일부 메시지만 처리됨
475        assert!(
476            states.len() < 10,
477            "Expected fewer than 10 messages due to backpressure, got {}",
478            states.len()
479        );
480
481        // 최소한 일부 메시지는 받았는지 확인
482        assert!(
483            states.len() > 0,
484            "Expected at least some messages to be received"
485        );
486    }
487
488    // subscribed() 에러 처리 테스트 - 잘못된 capacity로 구독 시도
489    #[test]
490    fn test_subscribed_error_handling() {
491        let store =
492            StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();
493
494        let received = Arc::new(Mutex::new(Vec::new()));
495        let subscriber = Box::new(TestChannelSubscriber::new(received.clone()));
496
497        // 유효한 구독
498        let subscription = store.subscribed(subscriber).unwrap();
499        // subscription은 Box<dyn Subscription>이므로 is_some() 메서드가 없음
500
501        // 구독 해제
502        subscription.unsubscribe();
503
504        // 이미 해제된 구독에 대한 추가 액션은 처리되지 않아야 함
505        store.dispatch(1).unwrap();
506        thread::sleep(Duration::from_millis(50));
507
508        let states = received.lock().unwrap();
509        assert_eq!(states.len(), 0); // 해제 후에는 메시지를 받지 않아야 함
510
511        store.stop().unwrap();
512    }
513
514    // subscribed() 스레드 생명주기 테스트
515    #[test]
516    fn test_subscribed_thread_lifecycle() {
517        let store =
518            StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();
519
520        let received = Arc::new(Mutex::new(Vec::new()));
521        let subscriber = Box::new(TestChannelSubscriber::new(received.clone()));
522
523        // 구독 생성
524        let subscription = store.subscribed(subscriber).unwrap();
525
526        // 액션 dispatch
527        store.dispatch(1).unwrap();
528        thread::sleep(Duration::from_millis(50));
529
530        // 구독 해제 - 스레드가 정상적으로 종료되어야 함
531        subscription.unsubscribe();
532
533        // 추가 액션은 처리되지 않아야 함
534        store.dispatch(2).unwrap();
535        thread::sleep(Duration::from_millis(50));
536
537        let states = received.lock().unwrap();
538        assert_eq!(states.len(), 1); // 첫 번째 액션만 처리됨
539        assert_eq!(states[0], (1, 1));
540
541        store.stop().unwrap();
542    }
543
544    // subscribed()와 add_subscriber() 혼합 사용 테스트
545    #[test]
546    fn test_subscribed_mixed_with_add_subscriber() {
547        let store =
548            StoreBuilder::new_with_reducer(0, Box::new(TestChannneledReducer)).build().unwrap();
549
550        // 일반 subscriber (reducer 스레드에서 실행)
551        let received_main = Arc::new(Mutex::new(Vec::new()));
552        let subscriber_main = Arc::new(TestChannelSubscriber::new(received_main.clone()));
553        let _subscription_main = store.add_subscriber(subscriber_main).unwrap();
554
555        // subscribed subscriber (별도 스레드에서 실행)
556        let received_channeled = Arc::new(Mutex::new(Vec::new()));
557        let subscriber_channeled = Box::new(TestChannelSubscriber::new(received_channeled.clone()));
558        let subscription_channeled = store.subscribed(subscriber_channeled).unwrap();
559
560        // 액션들 dispatch
561        for i in 1..=5 {
562            store.dispatch(i).unwrap();
563        }
564
565        thread::sleep(Duration::from_millis(100));
566
567        store.stop().unwrap();
568        subscription_channeled.unsubscribe();
569
570        // 두 subscriber 모두 동일한 메시지를 받았는지 확인
571        let states_main = received_main.lock().unwrap();
572        let states_channeled = received_channeled.lock().unwrap();
573
574        assert_eq!(states_main.len(), 5);
575        assert_eq!(states_channeled.len(), 5);
576
577        for i in 0..5 {
578            assert_eq!(states_main[i], states_channeled[i]);
579        }
580    }
581}