rs_store/
store_impl.rs

1use crate::channel::{BackpressureChannel, BackpressurePolicy, ReceiverChannel, SenderChannel};
2use crate::dispatcher::Dispatcher;
3use crate::metrics::{CountMetrics, Metrics, MetricsSnapshot};
4use crate::middleware::{MiddlewareFn, MiddlewareFnFactory};
5use crate::subscriber::SubscriberWithId;
6use crate::{DispatchOp, Effect, Reducer, SenderError, Subscriber, Subscription};
7use rusty_pool::ThreadPool;
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10use std::time::{Duration, Instant};
11use std::{fmt, thread};
12
13use crate::iterator::{StateIterator, StateIteratorSubscriber};
14use crate::store::{Store, StoreError, DEFAULT_CAPACITY, DEFAULT_STORE_NAME};
15
16const DEFAULT_STOP_TIMEOUT: Duration = Duration::from_secs(5);
17
18/// ActionOp is used to dispatch an action to the store
19#[derive(Clone, PartialEq)]
20pub(crate) enum ActionOp<Action>
21where
22    Action: Send + Sync + Clone + 'static,
23{
24    /// Action is used to dispatch an action to the store
25    Action(Action),
26    /// AddSubscriber is used to add a subscriber to the store
27    AddSubscriber,
28    /// StateFunction is used to execute a function with the current state
29    StateFunction,
30    /// Exit is used to exit the store and should not be dropped
31    #[allow(dead_code)]
32    Exit(Instant),
33}
34
35impl<Action> fmt::Debug for ActionOp<Action>
36where
37    Action: fmt::Debug + Send + Sync + Clone + 'static,
38{
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        match self {
41            ActionOp::Action(action) => f.debug_tuple("Action").field(action).finish(),
42            ActionOp::AddSubscriber => f.write_str("AddSubscriber"),
43            ActionOp::StateFunction => f.write_str("StateFunction"),
44            ActionOp::Exit(instant) => f.debug_tuple("Exit").field(instant).finish(),
45        }
46    }
47}
48
49// format Action without Debug
50#[cfg(feature = "store-log")]
51pub(crate) fn describe_action<Action>(_action: &Action) -> String
52where
53    Action: Send + Sync + Clone + 'static,
54{
55    format!("Action<{}>(..)", std::any::type_name::<Action>())
56}
57
58// format ActionOp<Action> in which Action is not bound to Debug
59// #[cfg(feature = "store-log")]
60pub(crate) fn describe_action_op<Action>(action_op: &ActionOp<Action>) -> String
61where
62    Action: Send + Sync + Clone + 'static,
63{
64    match action_op {
65        ActionOp::Action(_) => {
66            format!("Action<{}>(..)", std::any::type_name::<Action>())
67        }
68        ActionOp::AddSubscriber => "AddSubscriber".to_string(),
69        ActionOp::StateFunction => "StateFunction".to_string(),
70        ActionOp::Exit(instant) => format!("Exit({instant:?})"),
71    }
72}
73
74/// StoreImpl is the default implementation of a Redux store.
75///
76/// ## Caution
77/// [`StoreImpl`] is the default implementation of the [`Store`] trait, and its interface can be changed in the future.
78/// [`Store`] is the stable interface for the store that user code should depend on.
79#[allow(clippy::type_complexity)]
80pub struct StoreImpl<State, Action>
81where
82    State: Send + Sync + Clone + 'static,
83    Action: Send + Sync + Clone + 'static,
84{
85    #[allow(dead_code)]
86    pub(crate) name: String,
87    state: Mutex<State>,
88    pub(crate) reducers: Mutex<Vec<Box<dyn Reducer<State, Action> + Send + Sync>>>,
89    pub(crate) subscribers: Arc<Mutex<Vec<SubscriberWithId<State, Action>>>>,
90    /// temporary vector to store subscribers to be added
91    adding_subscribers: Arc<Mutex<Vec<SubscriberWithId<State, Action>>>>,
92    state_functions: Arc<Mutex<Vec<Box<dyn FnOnce(&State) + Send + Sync + 'static>>>>,
93    pub(crate) dispatch_tx: Mutex<Option<SenderChannel<Action>>>,
94    /// middleware factories
95    middleware_factories: Mutex<Vec<Arc<dyn MiddlewareFnFactory<State, Action> + Send + Sync>>>, // New middleware chain
96
97    pub(crate) metrics: Arc<CountMetrics>,
98    /// thread pool for the store
99    pub(crate) pool: Mutex<Option<ThreadPool>>,
100}
101
102/// Subscription for a subscriber
103/// the subscriber can use it to unsubscribe from the store
104struct SubscriberSubscription {
105    #[allow(dead_code)]
106    subscriber_id: u64, // Store subscriber ID instead of Arc reference
107    unsubscribe: Box<dyn Fn(u64) + Send + Sync>,
108}
109
110impl Subscription for SubscriberSubscription {
111    fn unsubscribe(&self) {
112        (self.unsubscribe)(self.subscriber_id);
113    }
114}
115
116impl<State, Action> StoreImpl<State, Action>
117where
118    State: Send + Sync + Clone + 'static,
119    Action: Send + Sync + Clone + 'static,
120{
121    // /// create a new store with an initial state
122    // pub(crate) fn new(state: State) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
123    //     Self::new_with(
124    //         state,
125    //         vec![],
126    //         DEFAULT_STORE_NAME.into(),
127    //         DEFAULT_CAPACITY,
128    //         BackpressurePolicy::default(),
129    //         vec![],
130    //     )
131    // }
132
133    /// create a new store with a reducer and an initial state
134    pub fn new_with_reducer(
135        state: State,
136        reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
137    ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
138        Self::new_with(
139            state,
140            vec![reducer],
141            DEFAULT_STORE_NAME.into(),
142            DEFAULT_CAPACITY,
143            BackpressurePolicy::default(),
144            vec![],
145        )
146    }
147
148    /// create a new store with name
149    pub fn new_with_name(
150        state: State,
151        reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
152        name: String,
153    ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
154        Self::new_with(
155            state,
156            vec![reducer],
157            name,
158            DEFAULT_CAPACITY,
159            BackpressurePolicy::default(),
160            vec![],
161        )
162    }
163
164    /// create a new store
165    pub fn new_with(
166        state: State,
167        reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
168        name: String,
169        capacity: usize,
170        policy: BackpressurePolicy<Action>,
171        middlewares: Vec<Arc<dyn MiddlewareFnFactory<State, Action> + Send + Sync>>,
172    ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
173        let metrics = Arc::new(CountMetrics::default());
174        let (tx, rx) = BackpressureChannel::<Action>::pair_with(
175            "dispatch",
176            capacity,
177            policy,
178            Some(metrics.clone()),
179        );
180
181        if reducers.is_empty() {
182            return Err(StoreError::InitError(
183                "At least one reducer is required".to_string(),
184            ));
185        }
186
187        let store_impl = StoreImpl {
188            name: name.clone(),
189            state: Mutex::new(state),
190            reducers: Mutex::new(reducers),
191            subscribers: Arc::new(Mutex::new(Vec::default())),
192            adding_subscribers: Arc::new(Mutex::new(Vec::default())),
193            state_functions: Arc::new(Mutex::new(Vec::default())),
194            middleware_factories: Mutex::new(middlewares),
195            dispatch_tx: Mutex::new(Some(tx)),
196            metrics,
197            pool: Mutex::new(Some(
198                rusty_pool::Builder::new().name(format!("{}-pool", name)).build(),
199            )),
200        };
201
202        // start a thread in which the store will listen for actions
203        let rx_store = Arc::new(store_impl);
204        let tx_store = rx_store.clone();
205
206        // reducer thread
207        match tx_store.pool.lock() {
208            Ok(pool) => {
209                if let Some(pool) = pool.as_ref() {
210                    pool.execute(move || {
211                        StoreImpl::reducer_thread(rx, rx_store);
212                    })
213                }
214            }
215            Err(_e) => {
216                #[cfg(feature = "store-log")]
217                eprintln!("store: Error while locking pool: {:?}", _e);
218                return Err(StoreError::InitError(format!(
219                    "Error while locking pool: {:?}",
220                    _e
221                )));
222            }
223        }
224
225        Ok(tx_store)
226    }
227
228    // reducer thread
229    pub(crate) fn reducer_thread(
230        rx: ReceiverChannel<Action>,
231        store_impl: Arc<StoreImpl<State, Action>>,
232    ) {
233        #[cfg(feature = "store-log")]
234        eprintln!("store: reducer thread started");
235
236        let store_clone = store_impl.clone();
237        let reducer_middleware: MiddlewareFn<State, Action> =
238            Arc::new(move |state: &State, action: &Action| {
239                let started_at = Instant::now();
240
241                let dispatch_op = if store_clone.reducers.lock().unwrap().len() == 1 {
242                    store_clone.reducers.lock().unwrap()[0].reduce(state, action)
243                } else {
244                    let reducers = store_clone.reducers.lock().unwrap();
245                    let mut iter = reducers.iter();
246
247                    // First reducer uses the input references directly
248                    let mut result = iter.next().unwrap().reduce(state, action);
249
250                    // Remaining reducers use the result from previous reducer
251                    for reducer in iter {
252                        match result {
253                            DispatchOp::Dispatch(current_state, current_effects) => {
254                                result = reducer.reduce(&current_state, action);
255                                // Merge effects from both reducers
256                                match result {
257                                    DispatchOp::Dispatch(s, mut e) => {
258                                        e.extend(current_effects);
259                                        result = DispatchOp::Dispatch(s, e);
260                                    }
261                                    DispatchOp::Keep(s, mut e) => {
262                                        e.extend(current_effects);
263                                        result = DispatchOp::Keep(s, e);
264                                    }
265                                }
266                            }
267                            DispatchOp::Keep(current_state, current_effects) => {
268                                result = reducer.reduce(&current_state, action);
269                                // Merge effects from both reducers
270                                match result {
271                                    DispatchOp::Dispatch(s, mut e) => {
272                                        e.extend(current_effects);
273                                        result = DispatchOp::Dispatch(s, e);
274                                    }
275                                    DispatchOp::Keep(s, mut e) => {
276                                        e.extend(current_effects);
277                                        result = DispatchOp::Keep(s, e);
278                                    }
279                                }
280                            }
281                        }
282                    }
283                    result
284                };
285
286                store_clone.metrics.action_reduced(
287                    Some(action),
288                    started_at.elapsed(),
289                    Instant::now().elapsed(),
290                );
291                Ok(dispatch_op)
292            });
293
294        let mut middleware_deco = reducer_middleware;
295        // chain middlewares in **REVERSE** order
296        for middleware in store_impl.middleware_factories.lock().unwrap().iter().rev() {
297            middleware_deco = middleware.create(middleware_deco);
298        }
299
300        let middleware_deco_arc = Arc::new(middleware_deco);
301        while let Some(action_op) = rx.recv() {
302            let action_received_at = Instant::now();
303            store_impl.metrics.action_received(Some(&action_op));
304            #[cfg(feature = "store-log")]
305            eprintln!(
306                "store: dispatch: action: {:?}, remains: {}",
307                describe_action_op(&action_op),
308                rx.len()
309            );
310            match action_op {
311                ActionOp::Action(action) => {
312                    // do reduce
313                    // Get current state reference while holding lock for minimal time
314                    let current_state_ref = {
315                        let state_guard = store_impl.state.lock().unwrap();
316                        state_guard.clone()
317                    };
318                    let mut effects = vec![];
319                    let result = store_impl.do_reduce(
320                        &current_state_ref,
321                        &action,
322                        &mut effects,
323                        middleware_deco_arc.clone(),
324                    );
325
326                    match result {
327                        Ok(dispatch_op) => {
328                            // do effects remain
329                            store_impl.do_effect(&mut effects, store_impl.clone());
330                            // do notify subscribers and update store state
331                            match dispatch_op {
332                                DispatchOp::Dispatch(new_state, _) => {
333                                    // Update store state
334                                    *store_impl.state.lock().unwrap() = new_state.clone();
335                                    // Notify subscribers with refs
336                                    store_impl.do_notify(
337                                        &action,
338                                        &new_state,
339                                        store_impl.clone(),
340                                        action_received_at,
341                                    );
342                                }
343                                DispatchOp::Keep(new_state, _) => {
344                                    // Update store state even if not dispatching
345                                    *store_impl.state.lock().unwrap() = new_state;
346                                }
347                            }
348                        }
349                        Err(_e) => {
350                            #[cfg(feature = "store-log")]
351                            eprintln!(
352                                "store: error do_reduce: action: {}, remains: {}",
353                                describe_action(&action),
354                                rx.len()
355                            );
356                        }
357                    }
358                    // store_impl.metrics.action_executed(Some(&action), action_received_at.elapsed());
359                }
360                ActionOp::AddSubscriber => {
361                    let mut new_subscribers = store_impl.adding_subscribers.lock().unwrap();
362                    let new_subscribers_len = new_subscribers.len();
363                    if new_subscribers_len > 0 {
364                        let current_state = store_impl.state.lock().unwrap().clone();
365                        let iter_subscribers = new_subscribers.drain(..);
366
367                        store_impl.do_subscribe(current_state, iter_subscribers);
368                    }
369
370                    #[cfg(feature = "store-log")]
371                    eprintln!("store: {} subscribers added", new_subscribers_len);
372                }
373
374                // ActionOp::RemoveSubscriber(subscriber_id) => {
375                //     rx_store.do_remove_subscriber(subscriber_id);
376                //     #[cfg(feature = "store-log")]
377                //     eprintln!("store: {} subscribers removed", subscriber_id);
378                // }
379                ActionOp::StateFunction => {
380                    store_impl.do_state_function();
381                }
382                ActionOp::Exit(_) => {
383                    store_impl.on_close(action_received_at);
384                    #[cfg(feature = "store-log")]
385                    eprintln!("store: reducer loop exit");
386                    break;
387                }
388            }
389            store_impl.metrics.action_executed(None, action_received_at.elapsed());
390        }
391
392        // drop all subscribers
393        store_impl.clear_subscribers();
394
395        #[cfg(feature = "store-log")]
396        eprintln!("store: reducer thread done");
397    }
398
399    /// get the latest state(for debugging)
400    ///
401    /// prefer to use `subscribe` to get the state
402    pub fn get_state(&self) -> State {
403        self.state.lock().unwrap().clone()
404    }
405
406    /// get the metrics
407    pub fn get_metrics(&self) -> MetricsSnapshot {
408        (&(*self.metrics)).into()
409    }
410
411    // /// add a reducer to the store
412    // pub(crate) fn add_reducer(&self, reducer: Box<dyn Reducer<State, Action> + Send + Sync>) {
413    //     let reducer_arc = Arc::from(reducer);
414    //     let mut chain_guard = self.reducer_chain.lock().unwrap();
415    //     if let Some(reducer_chain) = chain_guard.take() {
416    //         // Chain the new reducer to the existing chain
417    //         *chain_guard = Some(reducer_chain.chain(reducer_arc));
418    //     } else {
419    //         // Create new chain if none exists
420    //         *chain_guard = Some(ReducerChain::new(reducer_arc));
421    //     }
422    // }
423
424    /// add a subscriber to the store
425    pub fn add_subscriber(
426        &self,
427        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
428    ) -> Result<Box<dyn Subscription>, StoreError> {
429        // SubscriberWithId로 래핑하여 unique ID 할당
430        let subscriber_with_id = SubscriberWithId::new(subscriber);
431        let subscriber_id = subscriber_with_id.id;
432
433        // 새로운 subscriber를 adding_subscribers에 추가
434        self.adding_subscribers.lock().unwrap().push(subscriber_with_id);
435
436        // ActionOp::AddSubscriber 액션을 전달하여 reducer에서 처리하도록 함
437        if let Some(tx) = self.dispatch_tx.lock().unwrap().as_ref() {
438            match tx.send(ActionOp::AddSubscriber) {
439                Ok(_) => {}
440                Err(_e) => {
441                    #[cfg(feature = "store-log")]
442                    eprintln!(
443                        "store: Error while sending add subscriber to dispatch channel: {:?}",
444                        _e
445                    );
446                    self.adding_subscribers.lock().unwrap().retain(|s| s.id != subscriber_id);
447                    return Err(StoreError::DispatchError(format!(
448                        "Error while sending add subscriber to dispatch channel: {:?}",
449                        _e
450                    )));
451                }
452            }
453        } else {
454            self.adding_subscribers.lock().unwrap().retain(|s| s.id != subscriber_id);
455            return Err(StoreError::DispatchError(
456                "Dispatch channel is closed".to_string(),
457            ));
458        }
459
460        // disposer for the subscriber
461        let subscribers = self.subscribers.clone();
462        let adding_subscribers = self.adding_subscribers.clone();
463        let subscription = Box::new(SubscriberSubscription {
464            subscriber_id, // Store the ID for comparison
465            unsubscribe: Box::new(move |subscriber_id| {
466                // dispacher는 Arc<StoreImpl<State, Action>> 이므로 RemoveSubscriber action 을 사용할 수 없는 이유
467                // 그래서 직접 vector에서 제거한다.
468
469                // remove from adding_subscribers
470                let mut adding = adding_subscribers.lock().unwrap();
471                adding.retain(|s| s.id != subscriber_id); // Compare by ID
472
473                let mut subscribers = subscribers.lock().unwrap();
474                subscribers.retain(|s| {
475                    let retain = s.id != subscriber_id; // Compare by ID
476                    if !retain {
477                        s.on_unsubscribe();
478                    }
479                    retain
480                });
481            }),
482        });
483
484        Ok(subscription)
485    }
486
487    /// clear all subscribers
488    pub(crate) fn clear_subscribers(&self) {
489        #[cfg(feature = "store-log")]
490        eprintln!("store: clear_subscribers");
491        match self.subscribers.lock() {
492            Ok(mut subscribers) => {
493                for subscriber_with_id in subscribers.iter() {
494                    subscriber_with_id.on_unsubscribe();
495                }
496                subscribers.clear();
497            }
498            Err(mut e) => {
499                #[cfg(feature = "store-log")]
500                eprintln!("store: Error while locking subscribers: {:?}", e);
501                for subscriber_with_id in e.get_ref().iter() {
502                    subscriber_with_id.on_unsubscribe();
503                }
504                e.get_mut().clear();
505            }
506        }
507    }
508
509    /// Run middleware + reducers for a single action.
510    ///
511    /// ### Parameters
512    /// * `state`: Current state reference
513    /// * `action`: Action reference
514    /// * `effects`: Mutable reference to an effects buffer that will be extended with all emitted effects
515    /// * `middleware_deco`: Middleware chain entry point
516    ///
517    /// ### Returns
518    /// * `Ok(DispatchOp<State, Action>)`: dispatch operation containing the next state and effects
519    /// * `Err(StoreError)`: if an error occurs in middleware or reducers
520    pub(crate) fn do_reduce(
521        &self,
522        state: &State,
523        action: &Action,
524        effects: &mut Vec<Effect<Action>>,
525        middleware_deco: Arc<MiddlewareFn<State, Action>>,
526    ) -> Result<DispatchOp<State, Action>, StoreError> {
527        let started_at = Instant::now();
528
529        // call middleware chain
530        let mut dispatch_op = middleware_deco(state, action)?;
531
532        // Extract effects from DispatchOp and add to effects vector (mutable parameter)
533        match &mut dispatch_op {
534            DispatchOp::Dispatch(_, ref mut result_effects) => {
535                effects.append(result_effects);
536            }
537            DispatchOp::Keep(_, ref mut result_effects) => {
538                effects.append(result_effects);
539            }
540        }
541
542        self.metrics.middleware_executed(Some(action), "", 1, started_at.elapsed());
543
544        Ok(dispatch_op)
545    }
546
547    pub(crate) fn do_effect(
548        &self,
549        effects: &mut Vec<Effect<Action>>,
550        dispatcher: Arc<StoreImpl<State, Action>>,
551    ) {
552        let effect_start = Instant::now();
553        self.metrics.effect_issued(effects.len());
554
555        let effects_total = effects.len();
556        while !effects.is_empty() {
557            let effect = effects.remove(0);
558            match effect {
559                Effect::Action(a) => {
560                    dispatcher.dispatch_thunk(Box::new(
561                        move |weak_dispatcher| match weak_dispatcher.dispatch(a) {
562                            Ok(_) => {}
563                            Err(_e) => {
564                                #[cfg(feature = "store-log")]
565                                eprintln!("Error while dispatching action: {:?}", _e);
566                            }
567                        },
568                    ));
569                }
570                Effect::Task(task) => {
571                    dispatcher.dispatch_task(task);
572                }
573                Effect::Thunk(thunk) => {
574                    dispatcher.dispatch_thunk(thunk);
575                }
576                Effect::Function(_tok, func) => {
577                    let dispatcher_clone = dispatcher.clone();
578                    dispatcher.dispatch_task(Box::new(move || {
579                        // Execute the function and try to convert result to Action
580                        match func() {
581                            Ok(result) => {
582                                // Try to downcast the result to Action type first
583                                if let Ok(action) = result.downcast::<Action>() {
584                                    let _ = dispatcher_clone.dispatch(*action);
585                                    return;
586                                }
587                                // Note: Generic conversion from result to Action is not type-safe
588                                // Effect::Function results should be handled by middleware or
589                                // use Effect::Thunk/Effect::Action instead for type safety
590                                #[cfg(feature = "store-log")]
591                                eprintln!("Effect function should be handled by a middleware, did you miss a middleware?");
592                            }
593                            Err(_e) => {
594                                // Error in effect function, ignore
595                                #[cfg(feature = "store-log")]
596                                eprintln!("Effect function error: {:?}", _e);
597                            }
598                        }
599                    }));
600                }
601            };
602        }
603
604        let duration = effect_start.elapsed();
605        self.metrics.effect_executed(effects_total, duration);
606    }
607
608    pub(crate) fn do_notify(
609        &self,
610        action: &Action,
611        next_state: &State,
612        _dispatcher: Arc<StoreImpl<State, Action>>,
613        _action_received_at: Instant,
614    ) {
615        let notify_start = Instant::now();
616        self.metrics.state_notified(Some(next_state));
617
618        #[cfg(feature = "store-log")]
619        eprintln!("store: notify: action: {}", describe_action(action));
620
621        let subscribers = self.subscribers.lock().unwrap().clone();
622        let subscriber_count = subscribers.len();
623
624        // Clone action for metrics
625        let action_for_metrics = action.clone();
626
627        // Notify all subscribers with refs (no clone needed)
628        for subscriber_with_id in subscribers.iter() {
629            subscriber_with_id.on_notify(next_state, action);
630        }
631
632        let duration = notify_start.elapsed();
633        self.metrics.subscriber_notified(Some(&action_for_metrics), subscriber_count, duration);
634    }
635
636    fn do_subscribe(
637        &self,
638        state: State,
639        new_subscribers: impl Iterator<Item = SubscriberWithId<State, Action>>,
640    ) {
641        let mut subscribers = self.subscribers.lock().unwrap();
642
643        // notify new subscribers with the latest state and add to subscribers
644        for subscriber_with_id in new_subscribers {
645            subscriber_with_id.on_subscribe(&state);
646
647            subscribers.push(subscriber_with_id);
648        }
649    }
650
651    #[allow(dead_code)]
652    fn do_remove_subscriber(&self, subscriber_id: u64) {
653        // remove from adding_subscribers
654        let mut adding_subscribers = self.adding_subscribers.lock().unwrap();
655        adding_subscribers.retain(|s| s.id != subscriber_id);
656
657        // remove from subscribers
658        let mut subscribers = self.subscribers.lock().unwrap();
659        subscribers.retain(|s| {
660            let retain = s.id != subscriber_id;
661            if !retain {
662                s.on_unsubscribe();
663            }
664            retain
665        });
666    }
667
668    fn do_state_function(&self) {
669        let state_ref = match self.state.lock() {
670            Ok(state) => state,
671            Err(_e) => {
672                #[cfg(feature = "store-log")]
673                eprintln!("store: Error while locking state: {:?}", _e);
674                return;
675            }
676        };
677        match self.state_functions.lock() {
678            Ok(mut state_functions) => {
679                for state_function in state_functions.drain(..) {
680                    state_function(&state_ref);
681                }
682                //state_functions.clear();
683            }
684            Err(_e) => {
685                #[cfg(feature = "store-log")]
686                eprintln!("store: Error while locking state functions: {:?}", _e);
687            }
688        };
689    }
690
691    fn on_close(&self, action_received_at: Instant) {
692        #[cfg(feature = "store-log")]
693        eprintln!("store: on_close");
694
695        self.metrics.action_executed(None, action_received_at.elapsed());
696    }
697
698    /// close the store
699    ///
700    /// send an exit action to the store and drop the dispatch channel
701    ///
702    /// ## Return
703    /// * Ok(()) : if the store is closed
704    /// * Err(StoreError) : if the store is not closed, this can be happened when the queue is full
705    pub fn close(&self) -> Result<(), StoreError> {
706        // take the ownership and release the lock to avoid deadlock
707        let dispatch_tx = self.dispatch_tx.lock().map(|mut tx| tx.take());
708        match dispatch_tx {
709            Ok(Some(tx)) => {
710                #[cfg(feature = "store-log")]
711                eprintln!("store: close: sending exit to dispatch channel");
712                match tx.send(ActionOp::Exit(Instant::now())) {
713                    Ok(_) => {
714                        #[cfg(feature = "store-log")]
715                        eprintln!("store: close: dispatch channel sent exit");
716                    }
717                    Err(_e) => {
718                        #[cfg(feature = "store-log")]
719                        eprintln!(
720                            "store: close: Error while sending exit to dispatch channel: {:?}",
721                            _e
722                        );
723                        return Err(StoreError::DispatchError(format!(
724                            "Error while sending exit to dispatch channel, try it later: {:?}",
725                            _e
726                        )));
727                    }
728                }
729            }
730            Ok(None) => {
731                #[cfg(feature = "store-log")]
732                eprintln!("store: close: dispatch channel already closed");
733                return Ok(());
734            }
735            Err(_e) => {
736                #[cfg(feature = "store-log")]
737                eprintln!(
738                    "store: close: Error while locking dispatch channel: {:?}",
739                    _e
740                );
741                return Err(StoreError::DispatchError(format!(
742                    "Error while locking dispatch channel: {:?}",
743                    _e
744                )));
745            }
746        }
747
748        #[cfg(feature = "store-log")]
749        eprintln!("store: close: dispatch channel closed");
750        Ok(())
751    }
752
753    /// close the store and wait for the dispatcher to finish
754    ///
755    /// ## Return
756    /// * Ok(()) : if the store is closed
757    /// * Err(StoreError) : if the store is not closed, this can be happened when the queue is full
758    pub fn stop(&self) -> Result<(), StoreError> {
759        self.stop_with_timeout(Duration::from_millis(0))
760    }
761
762    /// close the store and wait for the dispatcher to finish
763    pub fn stop_with_timeout(&self, timeout: Duration) -> Result<(), StoreError> {
764        match self.close() {
765            Ok(_) => {}
766            Err(_e) => {
767                #[cfg(feature = "store-log")]
768                eprintln!("store: Error while closing dispatch channel: {:?}", _e);
769                // fall through
770                //return Err(_e);
771            }
772        }
773
774        // Shutdown the thread pool with timeout
775        // take the ownership and release the lock to avoid deadlock
776        let pool = self.pool.lock().map(|mut pool_guard| pool_guard.take());
777        match pool {
778            Ok(Some(pool)) => {
779                if timeout.is_zero() {
780                    pool.shutdown_join();
781                } else {
782                    pool.shutdown_join_timeout(timeout);
783                }
784                #[cfg(feature = "store-log")]
785                eprintln!("store: pool shutdown completed");
786            }
787            Ok(None) => {
788                #[cfg(feature = "store-log")]
789                eprintln!("store: pool already shutdown");
790            }
791            Err(_e) => {
792                #[cfg(feature = "store-log")]
793                eprintln!("store: Error while locking pool: {:?}", _e);
794                return Err(StoreError::DispatchError(format!(
795                    "Error while shutting down pool: {:?}",
796                    _e
797                )));
798            }
799        }
800
801        #[cfg(feature = "store-log")]
802        eprintln!("store: stopped");
803        Ok(())
804    }
805
806    /// dispatch an action
807    ///
808    /// ### Return
809    /// * Ok(()) : if the action is dispatched
810    /// * Err(StoreError) : if the dispatch channel is closed
811    pub(crate) fn dispatch(&self, action: Action) -> Result<(), StoreError> {
812        let sender = self.dispatch_tx.lock().unwrap();
813        if let Some(tx) = sender.as_ref() {
814            // the number of remaining actions in the channel
815            match tx.send(ActionOp::Action(action)) {
816                Ok(remains) => {
817                    self.metrics.queue_size(remains as usize);
818                    Ok(())
819                }
820                Err(e) => match e {
821                    SenderError::SendError(action_op) => {
822                        let action_desc = describe_action_op(&action_op);
823                        let err = StoreError::DispatchError(format!(
824                            "Error while sending '{}' to dispatch channel",
825                            action_desc
826                        ));
827                        self.metrics.error_occurred(&err);
828                        Err(err)
829                    }
830                    SenderError::ChannelClosed => {
831                        let err =
832                            StoreError::DispatchError("Dispatch channel is closed".to_string());
833                        self.metrics.error_occurred(&err);
834                        Err(err)
835                    }
836                },
837            }
838        } else {
839            let err = StoreError::DispatchError("Dispatch channel is closed".to_string());
840            self.metrics.error_occurred(&err);
841            Err(err)
842        }
843    }
844
845    /// Query the current state with a function.
846    ///
847    /// The function will be executed in the store thread with the current state *moved* into it.
848    /// This is useful for read‑only inspections or aggregations that should observe a consistent snapshot.
849    ///
850    /// ### Parameters
851    /// * `query_fn`: A function that receives the current state by value (`State`)
852    ///
853    /// ### Returns
854    /// * `Ok(())` : if the query is scheduled successfully
855    /// * `Err(StoreError)` : if the store is not available
856    pub fn query_state<F>(&self, query_fn: F) -> Result<(), StoreError>
857    where
858        F: FnOnce(&State) + Send + Sync + 'static,
859    {
860        self.state_functions.lock().unwrap().push(Box::new(query_fn));
861
862        if let Ok(tx) = self.dispatch_tx.lock() {
863            if let Some(tx) = tx.as_ref() {
864                return match tx.send(ActionOp::StateFunction) {
865                    Ok(_) => Ok(()),
866                    Err(e) => Err(StoreError::DispatchError(format!(
867                        "Error while sending state function to dispatch channel: {:?}",
868                        e
869                    ))),
870                };
871            }
872            Err(StoreError::DispatchError(
873                "Dispatch channel is closed".to_string(),
874            ))
875        } else {
876            Err(StoreError::DispatchError(
877                "Dispatch channel is closed".to_string(),
878            ))
879        }
880    }
881
882    // /// Add middleware
883    // pub(crate) fn add_middleware(&self, middleware: Arc<dyn NewMiddlewareFnFactory<State, Action> + Send + Sync>) {
884    //     self.middleware_factories.lock().unwrap().push(middleware);
885    // }
886
887    /// Iterator for the state
888    ///
889    /// it uses a channel to subscribe to the state changes
890    /// the channel is rendezvous(capacity 1), the store will block on the channel until the subscriber consumes the state
891    #[allow(dead_code)]
892    #[doc(hidden)]
893    pub(crate) fn iter(&self) -> Result<impl Iterator<Item = (State, Action)>, StoreError> {
894        self.iter_with(1, BackpressurePolicy::BlockOnFull)
895    }
896
897    /// Iterator for the state
898    ///  
899    /// ### Parameters
900    /// * capacity: the capacity of the channel
901    /// * policy: the backpressure policy
902    #[allow(dead_code)]
903    #[doc(hidden)]
904    pub(crate) fn iter_with(
905        &self,
906        capacity: usize,
907        policy: BackpressurePolicy<(State, Action)>,
908    ) -> Result<impl Iterator<Item = (State, Action)>, StoreError> {
909        let (iter_tx, iter_rx) = BackpressureChannel::<(State, Action)>::pair_with(
910            "store_iter",
911            capacity,
912            policy,
913            Some(self.metrics.clone()),
914        );
915
916        let subscription = self.add_subscriber(Arc::new(StateIteratorSubscriber::new(iter_tx)));
917        Ok(StateIterator::new(iter_rx, subscription?))
918    }
919
920    /// subscribing to store updates in new context
921    /// with default capacity and `BlockOnFull` policy when the channel is full
922    ///
923    /// ## Parameters
924    /// * subscriber: The subscriber to subscribe to the store
925    ///
926    /// ## Return
927    /// * Subscription: Subscription for the store,
928    pub fn subscribed(
929        &self,
930        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
931    ) -> Result<Box<dyn Subscription>, StoreError> {
932        self.subscribed_with(
933            DEFAULT_CAPACITY,
934            BackpressurePolicy::BlockOnFull,
935            subscriber,
936        )
937    }
938
939    /// subscribing to store updates in new context
940    ///
941    /// ### Parameters
942    /// * capacity: Channel buffer capacity
943    /// * policy: Backpressure policy for when down channel(store to subscriber) is full
944    ///
945    /// ### Return
946    /// * Subscription: Subscription for the store,
947    pub fn subscribed_with(
948        &self,
949        capacity: usize,
950        policy: BackpressurePolicy<(Instant, State, Action)>,
951        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
952    ) -> Result<Box<dyn Subscription>, StoreError> {
953        // spsc channel
954        let (tx, rx) = BackpressureChannel::<(Instant, State, Action)>::pair_with(
955            format!("{}-channel", self.name).as_str(),
956            capacity,
957            policy,
958            Some(self.metrics.clone()),
959        );
960
961        // channeled thread
962        let thread_name = format!("{}-channeled-subscriber", self.name);
963        let metrics_clone = self.metrics.clone();
964        let builder = thread::Builder::new().name(thread_name.clone());
965        let handle = match builder.spawn(move || {
966            // subscribe to the store
967            Self::subscribed_loop(thread_name, rx, subscriber, metrics_clone);
968        }) {
969            Ok(h) => h,
970            Err(e) => {
971                #[cfg(feature = "store-log")]
972                eprintln!("store: Error while spawning channel thread: {:?}", e);
973                return Err(StoreError::SubscriptionError(format!(
974                    "Error while spawning channel thread: {:?}",
975                    e
976                )));
977            }
978        };
979
980        // subscribe to the store
981        let channel_subscriber = Arc::new(ChanneledSubscriber::new(handle, tx));
982        let subscription = self.add_subscriber(channel_subscriber);
983
984        // return subscription
985        #[allow(clippy::let_and_return)]
986        subscription
987    }
988
989    fn subscribed_loop(
990        _name: String,
991        rx: ReceiverChannel<(Instant, State, Action)>,
992        subscriber: Box<dyn Subscriber<State, Action>>,
993        metrics: Arc<dyn Metrics>,
994    ) {
995        #[cfg(feature = "store-log")]
996        eprintln!("store: {} channel thread started", _name);
997
998        while let Some(msg) = rx.recv() {
999            match msg {
1000                ActionOp::Action((created_at, state, action)) => {
1001                    let started_at = Instant::now();
1002                    {
1003                        subscriber.on_notify(&state, &action);
1004                    }
1005                    metrics.subscriber_notified(Some(&action), 1, started_at.elapsed());
1006
1007                    // action executed
1008                    metrics.action_executed(Some(&action), created_at.elapsed());
1009                }
1010                ActionOp::AddSubscriber => {
1011                    // AddSubscriber는 채널된 subscriber에서는 처리하지 않음
1012                    // 이는 메인 reducer 스레드에서만 처리됨
1013                    #[cfg(feature = "store-log")]
1014                    eprintln!("store: {} received AddSubscriber (ignored)", _name);
1015                }
1016                ActionOp::StateFunction => {
1017                    #[cfg(feature = "store-log")]
1018                    eprintln!("store: {} received StateFunction (ignored)", _name);
1019                }
1020
1021                ActionOp::Exit(created_at) => {
1022                    metrics.action_executed(None, created_at.elapsed());
1023                    #[cfg(feature = "store-log")]
1024                    eprintln!("store: {} channel thread loop exit", _name);
1025                    break;
1026                }
1027            }
1028        }
1029
1030        #[cfg(feature = "store-log")]
1031        eprintln!("store: {} channel thread done", _name);
1032    }
1033}
1034
1035/// Subscriber implementation that forwards store updates to a channel
1036struct ChanneledSubscriber<T>
1037where
1038    T: Send + Sync + Clone + 'static,
1039{
1040    handle: Mutex<Option<JoinHandle<()>>>,
1041    tx: Mutex<Option<SenderChannel<T>>>,
1042}
1043
1044impl<T> ChanneledSubscriber<T>
1045where
1046    T: Send + Sync + Clone + 'static,
1047{
1048    pub(crate) fn new(handle: JoinHandle<()>, tx: SenderChannel<T>) -> Self {
1049        Self {
1050            handle: Mutex::new(Some(handle)),
1051            tx: Mutex::new(Some(tx)),
1052        }
1053    }
1054
1055    fn clear_resource(&self) {
1056        // drop channel
1057        // take the ownership and release the lock to avoid deadlock
1058        let tx_owned = self.tx.lock().map(|mut tx| tx.take());
1059        match tx_owned {
1060            Ok(Some(tx)) => {
1061                #[cfg(feature = "store-log")]
1062                eprintln!("store: ChanneledSubscriber: clearing resource: sending exit");
1063                let _ = tx.send(ActionOp::Exit(Instant::now()));
1064                drop(tx);
1065            }
1066            Ok(None) => {
1067                #[cfg(feature = "store-log")]
1068                eprintln!("store: ChanneledSubscriber: clearing resource: channel already closed");
1069            }
1070            Err(_e) => {
1071                #[cfg(feature = "store-log")]
1072                eprintln!(
1073                    "store: ChanneledSubscriber: clearing resource: Error while locking channel: {:?}",
1074                    _e
1075                );
1076            }
1077        }
1078
1079        // join the thread
1080        let handled_owned = self.handle.lock().map(|mut handle| handle.take());
1081        match handled_owned {
1082            Ok(Some(h)) => {
1083                #[cfg(feature = "store-log")]
1084                eprintln!("store: ChanneledSubscriber: clearing resource: joining thread");
1085                let _ = h.join();
1086            }
1087            Ok(None) => {
1088                #[cfg(feature = "store-log")]
1089                eprintln!("store: ChanneledSubscriber: clearing resource: thread already joined");
1090            }
1091            Err(_e) => {
1092                #[cfg(feature = "store-log")]
1093                eprintln!(
1094                    "store: ChanneledSubscriber: clearing resource: Error while locking thread handle: {:?}",
1095                    _e
1096                );
1097            }
1098        }
1099    }
1100}
1101
1102impl<State, Action> Subscriber<State, Action> for ChanneledSubscriber<(Instant, State, Action)>
1103where
1104    State: Send + Sync + Clone + 'static,
1105    Action: Send + Sync + Clone + 'static,
1106{
1107    fn on_notify(&self, state: &State, action: &Action) {
1108        match self.tx.lock() {
1109            Ok(tx) => {
1110                // Clone needed for sending through channel
1111                tx.as_ref().map(|tx| {
1112                    tx.send(ActionOp::Action((
1113                        Instant::now(),
1114                        state.clone(),
1115                        action.clone(),
1116                    )))
1117                });
1118            }
1119            Err(_e) => {
1120                #[cfg(feature = "store-log")]
1121                eprintln!("store: Error while locking channel: {:?}", _e);
1122            }
1123        }
1124    }
1125
1126    fn on_unsubscribe(&self) {
1127        self.clear_resource();
1128    }
1129}
1130
1131impl<T> Subscription for ChanneledSubscriber<T>
1132where
1133    T: Send + Sync + Clone + 'static,
1134{
1135    fn unsubscribe(&self) {
1136        self.clear_resource();
1137    }
1138}
1139
1140/// close tx channel when the store is dropped, but not the dispatcher
1141/// if you want to stop the dispatcher, call the stop method
1142impl<State, Action> Drop for StoreImpl<State, Action>
1143where
1144    State: Send + Sync + Clone + 'static,
1145    Action: Send + Sync + Clone + 'static,
1146{
1147    fn drop(&mut self) {
1148        let _ = self.close();
1149
1150        // Shutdown the thread pool with timeout
1151        let _ = self.stop_with_timeout(DEFAULT_STOP_TIMEOUT);
1152        // if let Ok(mut lk) = self.pool.lock() {
1153        //     if let Some(pool) = lk.take() {
1154        //         pool.shutdown_join_timeout(Duration::from_secs(3));
1155        //     }
1156        // }
1157
1158        #[cfg(feature = "store-log")]
1159        eprintln!("store: '{}' dropped", self.name);
1160    }
1161}
1162
1163impl<State, Action> Store<State, Action> for StoreImpl<State, Action>
1164where
1165    State: Send + Sync + Clone + 'static,
1166    Action: Send + Sync + Clone + std::fmt::Debug + 'static,
1167{
1168    fn get_state(&self) -> State {
1169        self.get_state()
1170    }
1171
1172    fn dispatch(&self, action: Action) -> Result<(), StoreError> {
1173        self.dispatch(action)
1174    }
1175
1176    fn add_subscriber(
1177        &self,
1178        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
1179    ) -> Result<Box<dyn Subscription>, StoreError> {
1180        self.add_subscriber(subscriber)
1181    }
1182
1183    fn subscribed(
1184        &self,
1185        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
1186    ) -> Result<Box<dyn Subscription>, StoreError> {
1187        self.subscribed(subscriber)
1188    }
1189
1190    fn subscribed_with(
1191        &self,
1192        capacity: usize,
1193        policy: BackpressurePolicy<(Instant, State, Action)>,
1194        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
1195    ) -> Result<Box<dyn Subscription>, StoreError> {
1196        self.subscribed_with(capacity, policy, subscriber)
1197    }
1198
1199    fn stop(&self) -> Result<(), StoreError> {
1200        self.stop()
1201    }
1202
1203    fn stop_timeout(&self, timeout: Duration) -> Result<(), StoreError> {
1204        self.stop_with_timeout(timeout)
1205    }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211    use crate::{BackpressurePolicy, Dispatcher, Effect, FnReducer, Reducer, StoreBuilder};
1212    use std::sync::Arc;
1213    use std::thread;
1214    use std::time::Duration;
1215
1216    struct TestChannelSubscriber {
1217        received: Arc<Mutex<Vec<(i32, i32)>>>,
1218    }
1219
1220    impl TestChannelSubscriber {
1221        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
1222            Self { received }
1223        }
1224    }
1225
1226    impl Subscriber<i32, i32> for TestChannelSubscriber {
1227        fn on_notify(&self, state: &i32, action: &i32) {
1228            //println!("TestChannelSubscriber: state={}, action={}", state, action);
1229            self.received.lock().unwrap().push((*state, *action));
1230        }
1231    }
1232
1233    struct TestReducer;
1234
1235    impl Reducer<i32, i32> for TestReducer {
1236        fn reduce(&self, state: &i32, action: &i32) -> DispatchOp<i32, i32> {
1237            DispatchOp::Dispatch(state + action, vec![])
1238        }
1239    }
1240
1241    struct SlowSubscriber {
1242        received: Arc<Mutex<Vec<(i32, i32)>>>,
1243        delay: Duration,
1244    }
1245
1246    impl SlowSubscriber {
1247        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
1248            Self { received, delay }
1249        }
1250    }
1251
1252    impl Subscriber<i32, i32> for SlowSubscriber {
1253        fn on_notify(&self, state: &i32, action: &i32) {
1254            //println!("SlowSubscriber: state={}, action={}", state, action);
1255            std::thread::sleep(self.delay);
1256            self.received.lock().unwrap().push((*state, *action));
1257        }
1258    }
1259
1260    #[test]
1261    fn test_store_subscribed_basic() {
1262        // Setup store with a simple counter
1263        let initial_state = 0;
1264        let reducer = Box::new(TestReducer);
1265        let store_result = StoreImpl::new_with_reducer(initial_state, reducer);
1266        assert!(store_result.is_ok());
1267        let store = store_result.unwrap();
1268
1269        // Create subscriber to receive updates
1270        let received_states = Arc::new(Mutex::new(Vec::new()));
1271        let subscriber1 = Box::new(TestChannelSubscriber::new(received_states.clone()));
1272        // Create channel
1273        let subscription =
1274            store.subscribed_with(10, BackpressurePolicy::DropOldestIf(None), subscriber1);
1275
1276        // Dispatch some actions
1277        store.dispatch(1).unwrap();
1278        store.dispatch(2).unwrap();
1279
1280        // Give some time for processing
1281        // thread::sleep(Duration::from_millis(100));
1282        match store.stop() {
1283            Ok(_) => println!("store stopped"),
1284            Err(e) => {
1285                panic!("store stop failed  : {:?}", e);
1286            }
1287        }
1288
1289        // unsubscribe from the channel
1290        subscription.unwrap().unsubscribe();
1291
1292        // Verify received updates
1293        let states = received_states.lock().unwrap();
1294        assert_eq!(states.len(), 2);
1295        assert_eq!(states[0], (1, 1)); // (state, action)
1296        assert_eq!(states[1], (3, 2)); // state=1+2, action=2
1297    }
1298
1299    #[test]
1300    fn test_store_subscribed_backpressure() {
1301        let store_result = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1302        assert!(store_result.is_ok());
1303        let store = store_result.unwrap();
1304
1305        let received = Arc::new(Mutex::new(Vec::new()));
1306        let received_clone = received.clone();
1307        let subscriber = Box::new(SlowSubscriber::new(
1308            received_clone,
1309            Duration::from_millis(100),
1310        ));
1311        // Create channel with small capacity
1312        let subscription =
1313            store.subscribed_with(1, BackpressurePolicy::DropOldestIf(None), subscriber);
1314
1315        // Fill the channel
1316        for i in 0..5 {
1317            let _ = store.dispatch(i).unwrap();
1318        }
1319
1320        // Give some time for having channel thread to process
1321        thread::sleep(Duration::from_millis(200));
1322        match store.stop() {
1323            Ok(_) => println!("store stopped"),
1324            Err(e) => {
1325                panic!("store stop failed  : {:?}", e);
1326            }
1327        }
1328        subscription.unwrap().unsubscribe();
1329
1330        // Should only receive the latest updates due to backpressure
1331        let received = received.lock().unwrap();
1332        assert!(received.len() <= 2); // Some messages should be dropped
1333
1334        if let Some((state, action)) = received.last() {
1335            assert_eq!(*action, 4); // Last action should be received
1336            assert!(*state <= 10); // Final state should be sum of 0..5
1337        }
1338    }
1339
1340    #[test]
1341    fn test_store_subscribed_subscription() {
1342        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1343
1344        let received = Arc::new(Mutex::new(Vec::new()));
1345        let subscriber1 = Box::new(TestChannelSubscriber::new(received.clone()));
1346        let subscription =
1347            store.subscribed_with(10, BackpressurePolicy::DropOldestIf(None), subscriber1);
1348
1349        // Dispatch some actions
1350        store.dispatch(1).unwrap();
1351
1352        // give some time for processing
1353        thread::sleep(Duration::from_millis(100));
1354        // subscriber should receive the state
1355        assert_eq!(received.lock().unwrap().len(), 1);
1356
1357        // unsubscribe
1358        subscription.unwrap().unsubscribe();
1359
1360        // dispatch more actions
1361        store.dispatch(2).unwrap();
1362        store.dispatch(3).unwrap();
1363        // give some time for processing
1364        match store.stop() {
1365            Ok(_) => println!("store stopped"),
1366            Err(e) => {
1367                panic!("store stop failed  : {:?}", e);
1368            }
1369        }
1370        // subscriber should not receive the state
1371        assert_eq!(received.lock().unwrap().len(), 1);
1372    }
1373
1374    // 새로운 subscriber가 추가될 때 최신 상태를 받는지 테스트
1375    #[test]
1376    fn test_new_subscriber_receives_latest_state() {
1377        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1378
1379        // 첫 번째 subscriber 추가
1380        let received1 = Arc::new(Mutex::new(Vec::new()));
1381        let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
1382        store.add_subscriber(subscriber1).unwrap();
1383
1384        // 액션을 dispatch하여 상태 변경
1385        store.dispatch(5).unwrap();
1386        store.dispatch(10).unwrap();
1387
1388        // 잠시 대기하여 액션이 처리되도록 함
1389        thread::sleep(Duration::from_millis(100));
1390
1391        // 두 번째 subscriber 추가 (현재 상태는 15)
1392        let received2 = Arc::new(Mutex::new(Vec::new()));
1393        let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
1394        store.add_subscriber(subscriber2).unwrap();
1395
1396        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1397        thread::sleep(Duration::from_millis(100));
1398
1399        // 새로운 액션을 dispatch
1400        store.dispatch(20).unwrap();
1401
1402        // 잠시 대기하여 액션이 처리되도록 함
1403        thread::sleep(Duration::from_millis(100));
1404
1405        // 첫 번째 subscriber는 모든 상태 변경을 받아야 함
1406        let received1 = received1.lock().unwrap();
1407        assert_eq!(received1.len(), 3);
1408        assert_eq!(received1[0], (5, 5));
1409        assert_eq!(received1[1], (15, 10));
1410        assert_eq!(received1[2], (35, 20));
1411
1412        // 두 번째 subscriber는 추가된 후의 상태 변경만 받아야 함
1413        let received2 = received2.lock().unwrap();
1414        assert_eq!(received2.len(), 1);
1415        assert_eq!(received2[0], (35, 20));
1416    }
1417
1418    // 새로운 subscriber가 추가될 때 on_subscribe가 호출되는지 테스트
1419    #[test]
1420    fn test_new_subscriber_on_subscribe_called() {
1421        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1422
1423        // 액션을 dispatch하여 상태 변경
1424        store.dispatch(5).unwrap();
1425
1426        // on_subscribe를 구현한 subscriber 추가
1427        let received_states = Arc::new(Mutex::new(Vec::new()));
1428        let subscribe_called = Arc::new(Mutex::new(false));
1429
1430        struct TestSubscribeSubscriber {
1431            received_states: Arc<Mutex<Vec<i32>>>,
1432            subscribe_called: Arc<Mutex<bool>>,
1433        }
1434
1435        impl Subscriber<i32, i32> for TestSubscribeSubscriber {
1436            fn on_subscribe(&self, state: &i32) {
1437                self.received_states.lock().unwrap().push(*state);
1438                *self.subscribe_called.lock().unwrap() = true;
1439            }
1440
1441            fn on_notify(&self, state: &i32, _action: &i32) {
1442                self.received_states.lock().unwrap().push(*state);
1443            }
1444        }
1445
1446        let subscriber = Arc::new(TestSubscribeSubscriber {
1447            received_states: received_states.clone(),
1448            subscribe_called: subscribe_called.clone(),
1449        });
1450
1451        store.add_subscriber(subscriber).unwrap();
1452
1453        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1454        thread::sleep(Duration::from_millis(100));
1455
1456        // on_subscribe가 호출되었는지 확인
1457        assert!(*subscribe_called.lock().unwrap());
1458
1459        // 최신 상태(5)를 받았는지 확인
1460        let states = received_states.lock().unwrap();
1461        assert_eq!(states.len(), 1);
1462        assert_eq!(states[0], 5);
1463    }
1464
1465    // 여러 subscriber가 동시에 추가될 때 테스트
1466    #[test]
1467    fn test_multiple_subscribers_added_simultaneously() {
1468        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1469
1470        // 액션을 dispatch하여 상태 변경
1471        store.dispatch(10).unwrap();
1472        store.dispatch(20).unwrap();
1473
1474        // 잠시 대기하여 액션이 처리되도록 함
1475        thread::sleep(Duration::from_millis(100));
1476
1477        // 여러 subscriber를 동시에 추가
1478        let subscribers = vec![
1479            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1480            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1481            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1482        ];
1483
1484        for subscriber in &subscribers {
1485            store.add_subscriber(subscriber.clone()).unwrap();
1486        }
1487
1488        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1489        thread::sleep(Duration::from_millis(100));
1490
1491        // 새로운 액션을 dispatch
1492        store.dispatch(30).unwrap();
1493
1494        // 잠시 대기하여 액션이 처리되도록 함
1495        thread::sleep(Duration::from_millis(100));
1496
1497        // 모든 subscriber가 새로운 액션을 받았는지 확인
1498        for subscriber in &subscribers {
1499            let received = subscriber.received.lock().unwrap();
1500            assert_eq!(received.len(), 1);
1501            assert_eq!(received[0], (60, 30)); // state: 30+30, action: 30
1502        }
1503    }
1504
1505    // subscriber 추가 후 즉시 unsubscribe하는 테스트
1506    #[test]
1507    fn test_subscriber_unsubscribe_after_add() {
1508        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1509
1510        // 액션을 dispatch하여 상태 변경
1511        store.dispatch(5).unwrap();
1512
1513        // subscriber 추가
1514        let received = Arc::new(Mutex::new(Vec::new()));
1515        let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
1516        let subscription = store.add_subscriber(subscriber).unwrap();
1517
1518        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1519        thread::sleep(Duration::from_millis(100));
1520
1521        // 즉시 unsubscribe
1522        subscription.unsubscribe();
1523
1524        // 새로운 액션을 dispatch
1525        store.dispatch(10).unwrap();
1526
1527        // 잠시 대기하여 액션이 처리되도록 함
1528        thread::sleep(Duration::from_millis(100));
1529
1530        // subscriber가 새로운 액션을 받지 않았는지 확인
1531        let received = received.lock().unwrap();
1532        assert_eq!(received.len(), 0);
1533    }
1534
1535    // store가 중지된 후 subscriber를 추가하는 테스트
1536    #[test]
1537    fn test_add_subscriber_after_store_stop() {
1538        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1539
1540        // store 중지
1541        match store.stop() {
1542            Ok(_) => println!("store stopped"),
1543            Err(e) => {
1544                panic!("store stop failed  : {:?}", e);
1545            }
1546        }
1547
1548        // subscriber 추가 시도
1549        let received = Arc::new(Mutex::new(Vec::new()));
1550        let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
1551        let subscription = store.add_subscriber(subscriber);
1552        assert!(subscription.is_err());
1553    }
1554
1555    // on_subscribe에서 상태를 수정하는 subscriber 테스트
1556    #[test]
1557    fn test_subscriber_modifies_state_in_on_subscribe() {
1558        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1559
1560        // 액션을 dispatch하여 상태 변경
1561        store.dispatch(5).unwrap();
1562
1563        struct ModifyingSubscriber {
1564            received_states: Arc<Mutex<Vec<i32>>>,
1565            subscribe_called: Arc<Mutex<bool>>,
1566        }
1567
1568        impl Subscriber<i32, i32> for ModifyingSubscriber {
1569            fn on_subscribe(&self, state: &i32) {
1570                // on_subscribe에서 상태를 수정해도 store의 상태는 변경되지 않음
1571                self.received_states.lock().unwrap().push(*state);
1572                *self.subscribe_called.lock().unwrap() = true;
1573            }
1574
1575            fn on_notify(&self, state: &i32, _action: &i32) {
1576                self.received_states.lock().unwrap().push(*state);
1577            }
1578        }
1579
1580        let subscriber = Arc::new(ModifyingSubscriber {
1581            received_states: Arc::new(Mutex::new(Vec::new())),
1582            subscribe_called: Arc::new(Mutex::new(false)),
1583        });
1584
1585        store.add_subscriber(subscriber.clone()).unwrap();
1586
1587        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1588        thread::sleep(Duration::from_millis(100));
1589
1590        // on_subscribe가 호출되었는지 확인
1591        assert!(*subscriber.subscribe_called.lock().unwrap());
1592
1593        // 최신 상태(5)를 받았는지 확인
1594        let states = subscriber.received_states.lock().unwrap();
1595        assert_eq!(states.len(), 1);
1596        assert_eq!(states[0], 5);
1597
1598        // store의 상태가 변경되지 않았는지 확인
1599        assert_eq!(store.get_state(), 5);
1600    }
1601
1602    // 여러 번의 AddSubscriber 액션이 연속으로 발생하는 테스트
1603    #[test]
1604    fn test_consecutive_add_subscriber_actions() {
1605        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1606
1607        // 첫 번째 subscriber 추가
1608        let received1 = Arc::new(Mutex::new(Vec::new()));
1609        let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
1610        store.add_subscriber(subscriber1).unwrap();
1611
1612        // 잠시 대기
1613        thread::sleep(Duration::from_millis(50));
1614
1615        // 두 번째 subscriber 추가
1616        let received2 = Arc::new(Mutex::new(Vec::new()));
1617        let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
1618        store.add_subscriber(subscriber2).unwrap();
1619
1620        // 잠시 대기
1621        thread::sleep(Duration::from_millis(50));
1622
1623        // 세 번째 subscriber 추가
1624        let received3 = Arc::new(Mutex::new(Vec::new()));
1625        let subscriber3 = Arc::new(TestChannelSubscriber::new(received3.clone()));
1626        store.add_subscriber(subscriber3).unwrap();
1627
1628        // 잠시 대기하여 모든 AddSubscriber 액션이 처리되도록 함
1629        thread::sleep(Duration::from_millis(100));
1630
1631        // 새로운 액션을 dispatch
1632        store.dispatch(10).unwrap();
1633
1634        // 잠시 대기하여 액션이 처리되도록 함
1635        thread::sleep(Duration::from_millis(100));
1636
1637        // 모든 subscriber가 새로운 액션을 받았는지 확인
1638        assert_eq!(received1.lock().unwrap().len(), 1);
1639        assert_eq!(received2.lock().unwrap().len(), 1);
1640        assert_eq!(received3.lock().unwrap().len(), 1);
1641    }
1642
1643    /// Test basic iterator functionality
1644    #[test]
1645    fn test_store_iter_basic() {
1646        // given: store with reducer
1647        // let store = StoreBuilder::new(0)
1648        //     .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
1649        //         DispatchOp::Dispatch(state + action, None)
1650        //     })))
1651        //     .build()
1652        //     .unwrap();
1653        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1654
1655        // when: create iterator and dispatch actions
1656        let mut iter = store.iter().unwrap();
1657
1658        // dispatch actions
1659        store.dispatch(10).expect("dispatch should succeed");
1660        store.dispatch(20).expect("dispatch should succeed");
1661        store.dispatch(30).expect("dispatch should succeed");
1662
1663        // then: iterator should return state and action pairs
1664        assert_eq!(iter.next(), Some((10, 10))); // state: 0+10=10, action: 10
1665        assert_eq!(iter.next(), Some((30, 20))); // state: 10+20=30, action: 20
1666        assert_eq!(iter.next(), Some((60, 30))); // state: 30+30=60, action: 30
1667
1668        // stop store and verify iterator ends
1669        store.stop().expect("store should stop");
1670        assert_eq!(iter.next(), None);
1671    }
1672
1673    /// Test iterator with no actions dispatched
1674    #[test]
1675    fn test_store_iter_no_actions() {
1676        // given: store with reducer
1677        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1678
1679        // when: create iterator without dispatching actions
1680        let mut iter = store.iter().unwrap();
1681
1682        // then: iterator should return None immediately
1683        // since no actions were dispatched, no state changes occurred
1684        store.stop().expect("store should stop");
1685        assert_eq!(iter.next(), None);
1686    }
1687
1688    /// Test iterator with complex state and action types
1689    #[test]
1690    fn test_store_iter_complex_types() {
1691        // given: store with complex state and action
1692        #[derive(Debug, Clone, PartialEq)]
1693        struct ComplexState {
1694            value: i32,
1695            name: String,
1696        }
1697
1698        #[allow(dead_code)]
1699        #[derive(Debug, Clone, PartialEq)]
1700        enum ComplexAction {
1701            Add(i32),
1702            SetName(String),
1703            Reset,
1704        }
1705
1706        let store = StoreImpl::new_with_reducer(
1707            ComplexState {
1708                value: 0,
1709                name: "initial".to_string(),
1710            },
1711            Box::new(FnReducer::from(
1712                |state: &ComplexState, action: &ComplexAction| match action {
1713                    ComplexAction::Add(n) => DispatchOp::Dispatch(
1714                        ComplexState {
1715                            value: state.value + n,
1716                            name: state.name.clone(),
1717                        },
1718                        vec![],
1719                    ),
1720                    ComplexAction::SetName(name) => DispatchOp::Dispatch(
1721                        ComplexState {
1722                            value: state.value,
1723                            name: name.clone(),
1724                        },
1725                        vec![],
1726                    ),
1727                    ComplexAction::Reset => DispatchOp::Dispatch(
1728                        ComplexState {
1729                            value: 0,
1730                            name: "reset".to_string(),
1731                        },
1732                        vec![],
1733                    ),
1734                },
1735            )),
1736        )
1737        .unwrap();
1738
1739        // when: create iterator and dispatch actions
1740        let mut iter = store.iter().unwrap();
1741
1742        store.dispatch(ComplexAction::Add(10)).expect("dispatch should succeed");
1743        store
1744            .dispatch(ComplexAction::SetName("test".to_string()))
1745            .expect("dispatch should succeed");
1746        store.dispatch(ComplexAction::Add(5)).expect("dispatch should succeed");
1747
1748        // then: iterator should return correct state and action pairs
1749        assert_eq!(
1750            iter.next(),
1751            Some((
1752                ComplexState {
1753                    value: 10,
1754                    name: "initial".to_string(),
1755                },
1756                ComplexAction::Add(10)
1757            ))
1758        );
1759        assert_eq!(
1760            iter.next(),
1761            Some((
1762                ComplexState {
1763                    value: 10,
1764                    name: "test".to_string(),
1765                },
1766                ComplexAction::SetName("test".to_string())
1767            ))
1768        );
1769        assert_eq!(
1770            iter.next(),
1771            Some((
1772                ComplexState {
1773                    value: 15,
1774                    name: "test".to_string(),
1775                },
1776                ComplexAction::Add(5)
1777            ))
1778        );
1779
1780        store.stop().expect("store should stop");
1781        assert_eq!(iter.next(), None);
1782    }
1783
1784    /// Test iterator with multiple concurrent actions
1785    #[test]
1786    fn test_store_iter_concurrent_actions() {
1787        // given: store with reducer
1788        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1789
1790        // when: create iterator and dispatch many actions quickly
1791        let mut iter = store.iter().unwrap();
1792
1793        // dispatch multiple actions
1794        for i in 1..=10 {
1795            store.dispatch(i).expect("dispatch should succeed");
1796        }
1797
1798        // then: iterator should return all state and action pairs in order
1799        let mut expected_state = 0;
1800        for i in 1..=10 {
1801            expected_state += i;
1802            assert_eq!(iter.next(), Some((expected_state, i)));
1803        }
1804
1805        store.stop().expect("store should stop");
1806        assert_eq!(iter.next(), None);
1807    }
1808
1809    /// Test iterator with store that has middleware
1810    #[test]
1811    fn test_store_iter_with_middleware() {
1812        // given: store with middleware
1813        struct TestMiddleware;
1814
1815        impl<State, Action> MiddlewareFnFactory<State, Action> for TestMiddleware
1816        where
1817            State: Send + Sync + Clone + 'static,
1818            Action: Send + Sync + Clone + std::fmt::Debug + 'static,
1819        {
1820            fn create(&self, inner: MiddlewareFn<State, Action>) -> MiddlewareFn<State, Action> {
1821                inner
1822            }
1823        }
1824
1825        let store = StoreImpl::new_with(
1826            0,
1827            vec![Box::new(FnReducer::from(|state: &i32, action: &i32| {
1828                DispatchOp::Dispatch(state + action, vec![])
1829            }))],
1830            "test".to_string(),
1831            1,
1832            BackpressurePolicy::default(),
1833            vec![Arc::new(TestMiddleware)],
1834        )
1835        .unwrap();
1836
1837        // when: create iterator and dispatch actions
1838        let mut iter = store.iter().unwrap();
1839
1840        store.dispatch(5).expect("dispatch should succeed");
1841        store.dispatch(10).expect("dispatch should succeed");
1842
1843        // then: iterator should work correctly with middleware
1844        assert_eq!(iter.next(), Some((5, 5)));
1845        assert_eq!(iter.next(), Some((15, 10)));
1846
1847        store.stop().expect("store should stop");
1848        assert_eq!(iter.next(), None);
1849    }
1850
1851    /// Test iterator with store that has effects
1852    #[test]
1853    fn test_store_iter_with_effects() {
1854        // given: store with reducer that produces effects
1855        let store = StoreImpl::new_with_reducer(
1856            0,
1857            Box::new(FnReducer::from(|state: &i32, action: &i32| {
1858                let new_state = state + action;
1859                let mut effects_vec = vec![];
1860                if *action > 5 {
1861                    effects_vec.push(Effect::Task(Box::new(|| {
1862                        // effect that does nothing
1863                    })));
1864                }
1865                DispatchOp::Dispatch(new_state, effects_vec)
1866            })),
1867        )
1868        .unwrap();
1869
1870        // when: create iterator and dispatch actions
1871        let mut iter = store.iter().unwrap();
1872
1873        store.dispatch(3).expect("dispatch should succeed"); // no effect
1874        store.dispatch(10).expect("dispatch should succeed"); // with effect
1875
1876        // then: iterator should work correctly with effects
1877        assert_eq!(iter.next(), Some((3, 3)));
1878        assert_eq!(iter.next(), Some((13, 10)));
1879
1880        store.stop().expect("store should stop");
1881        assert_eq!(iter.next(), None);
1882    }
1883
1884    /// Test iterator with store that has multiple reducers
1885    #[test]
1886    fn test_store_iter_with_multiple_reducers() {
1887        // given: store with multiple reducers
1888        // StoreBuilder의 with_reducer는 기존 리듀서를 대체하므로
1889        // 실제로는 마지막 리듀서만 사용됩니다
1890        let store = StoreBuilder::new(0)
1891            .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
1892                DispatchOp::Dispatch(state + action, vec![])
1893            })))
1894            .with_reducer(Box::new(FnReducer::from(|state: &i32, _action: &i32| {
1895                DispatchOp::Dispatch(state * 2, vec![])
1896            })))
1897            .build()
1898            .unwrap();
1899
1900        // when: create iterator and dispatch actions
1901        // let mut iter = store.iter().unwrap();
1902
1903        store.dispatch(5).expect("dispatch should succeed");
1904        store.dispatch(10).expect("dispatch should succeed");
1905
1906        // // then: iterator should work with multiple reducers
1907        // // 실제로는 마지막 리듀서만 사용되므로: 0 * 2 = 0, 0 * 2 = 0
1908        // let first_result = iter.next();
1909        // println!("First result: {:?}", first_result);
1910        // let second_result = iter.next();
1911        // println!("Second result: {:?}", second_result);
1912
1913        // // 마지막 리듀서만 사용되므로 상태는 항상 0
1914        // assert_eq!(first_result, Some((0, 5)));
1915        // assert_eq!(second_result, Some((0, 10)));
1916
1917        store.stop().expect("store should stop");
1918        // assert_eq!(iter.next(), None);
1919    }
1920
1921    /// Test iterator behavior when store is stopped before consuming all items
1922    #[test]
1923    fn test_store_iter_early_stop() {
1924        // given: store with reducer
1925        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1926
1927        // when: create iterator, dispatch actions, but stop store early
1928        let mut iter = store.iter().unwrap();
1929
1930        store.dispatch(5).expect("dispatch should succeed");
1931        store.dispatch(10).expect("dispatch should succeed");
1932        store.dispatch(15).expect("dispatch should succeed");
1933
1934        // consume all items before stopping store
1935        assert_eq!(iter.next(), Some((5, 5)));
1936        assert_eq!(iter.next(), Some((15, 10))); // 5 + 10 = 15
1937        assert_eq!(iter.next(), Some((30, 15))); // 15 + 15 = 30
1938
1939        // stop store after consuming all items
1940        store.stop().expect("store should stop");
1941    }
1942
1943    /// Test iterator with different backpressure policies
1944    #[test]
1945    fn test_store_iter_with_block_on_full() {
1946        // given: store with different backpressure policies
1947        let store = StoreImpl::new_with(
1948            0,
1949            vec![Box::new(FnReducer::from(|state: &i32, action: &i32| {
1950                DispatchOp::Dispatch(state + action, vec![])
1951            }))],
1952            "test".to_string(),
1953            2,
1954            BackpressurePolicy::BlockOnFull,
1955            vec![],
1956        )
1957        .unwrap();
1958
1959        // when: create iterator with different capacity and policy
1960        let mut iter = store.iter_with(1, BackpressurePolicy::BlockOnFull).unwrap();
1961
1962        store.dispatch(5).expect("dispatch should succeed");
1963        store.dispatch(10).expect("dispatch should succeed");
1964
1965        // then: iterator should work with custom capacity and policy
1966        assert_eq!(iter.next(), Some((5, 5)));
1967        assert_eq!(iter.next(), Some((15, 10)));
1968
1969        store.stop().expect("store should stop");
1970    }
1971
1972    // #[test]
1973    // fn test_store_iter_with_different_policies() {
1974    //     // given: store with different backpressure policies
1975    //     let store = StoreBuilder::new(0)
1976    //         .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
1977    //             DispatchOp::Dispatch(state + action, None)
1978    //         })))
1979    //         .with_capacity(2)
1980    //         .with_policy(BackpressurePolicy::DropOldestIf(None))
1981    //         .build()
1982    //         .unwrap();
1983    //
1984    //     // when: create iterator with different capacity and policy
1985    //     let mut iter = store.iter_with(1, BackpressurePolicy::BlockOnFull);
1986    //
1987    //     store.dispatch(5).expect("dispat
1988    // ch should succeed");
1989    //     store.dispatch(10).expect("dispatch should succeed");
1990    //
1991    //     // then: iterator should work with custom capacity and policy
1992    //     assert_eq!(iter.next(), Some((5, 5)));
1993    //     assert_eq!(iter.next(), Some((15, 10)));
1994    //
1995    //     store.stop().expect("store should stop");
1996    // }
1997
1998    /// Test iterator with string state and action
1999    #[test]
2000    fn test_store_iter_string_types() {
2001        // given: store with string state and action
2002        let store = StoreImpl::new_with_reducer(
2003            "".to_string(),
2004            Box::new(FnReducer::from(|state: &String, action: &String| {
2005                let new_state = format!("{}{}", state, action);
2006                DispatchOp::Dispatch(new_state, vec![])
2007            })),
2008        )
2009        .unwrap();
2010
2011        // when: create iterator and dispatch string actions
2012        let mut iter = store.iter().unwrap();
2013
2014        store.dispatch("hello".to_string()).expect("dispatch should succeed");
2015        store.dispatch(" world".to_string()).expect("dispatch should succeed");
2016
2017        // then: iterator should work with string types
2018        assert_eq!(
2019            iter.next(),
2020            Some(("hello".to_string(), "hello".to_string()))
2021        );
2022        assert_eq!(
2023            iter.next(),
2024            Some(("hello world".to_string(), " world".to_string()))
2025        );
2026
2027        store.stop().expect("store should stop");
2028        assert_eq!(iter.next(), None);
2029    }
2030
2031    /// Test iterator with empty state changes (reducer returns same state)
2032    #[test]
2033    fn test_store_iter_no_state_change() {
2034        // given: store with reducer that doesn't change state
2035        let store = StoreImpl::new_with_reducer(
2036            0,
2037            Box::new(FnReducer::from(|state: &i32, _action: &i32| {
2038                DispatchOp::Dispatch(state.clone(), vec![]) // return same state
2039            })),
2040        )
2041        .unwrap();
2042        // when: create iterator and dispatch actions
2043        let mut iter = store.iter().unwrap();
2044
2045        store.dispatch(5).expect("dispatch should succeed");
2046        store.dispatch(10).expect("dispatch should succeed");
2047
2048        // then: iterator should still return state and action pairs
2049        assert_eq!(iter.next(), Some((0, 5))); // state remains 0
2050        assert_eq!(iter.next(), Some((0, 10))); // state remains 0
2051
2052        store.stop().expect("store should stop");
2053        assert_eq!(iter.next(), None);
2054    }
2055
2056    /// Test query_state functionality
2057    #[test]
2058    fn test_query_state_basic() {
2059        // given: store with a simple counter
2060        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
2061
2062        // dispatch some actions to change state
2063        store.dispatch(5).unwrap();
2064        store.dispatch(10).unwrap();
2065
2066        // wait for actions to be processed
2067        std::thread::sleep(std::time::Duration::from_millis(100));
2068
2069        // when: query the current state
2070        let queried_value = std::sync::Arc::new(std::sync::Mutex::new(0));
2071        let queried_value_clone = queried_value.clone();
2072        store
2073            .query_state(move |state| {
2074                *queried_value_clone.lock().unwrap() = *state;
2075            })
2076            .unwrap();
2077
2078        let _ = store.stop();
2079        // then: should get the current state (0 + 5 + 10 = 15)
2080        assert_eq!(*queried_value.lock().unwrap(), 15);
2081    }
2082
2083    /// Test query_state with complex state
2084    #[test]
2085    fn test_query_state_complex() {
2086        // given: store with complex state
2087        #[derive(Debug, Clone, PartialEq)]
2088        struct ComplexState {
2089            value: i32,
2090            name: String,
2091        }
2092
2093        #[derive(Debug, Clone, PartialEq)]
2094        enum ComplexAction {
2095            Add(i32),
2096            SetName(String),
2097        }
2098
2099        let store = StoreImpl::new_with_reducer(
2100            ComplexState {
2101                value: 0,
2102                name: "initial".to_string(),
2103            },
2104            Box::new(FnReducer::from(
2105                |state: &ComplexState, action: &ComplexAction| match action {
2106                    ComplexAction::Add(n) => DispatchOp::Dispatch(
2107                        ComplexState {
2108                            value: state.value + n,
2109                            name: state.name.clone(),
2110                        },
2111                        vec![],
2112                    ),
2113                    ComplexAction::SetName(name) => DispatchOp::Dispatch(
2114                        ComplexState {
2115                            value: state.value,
2116                            name: name.clone(),
2117                        },
2118                        vec![],
2119                    ),
2120                },
2121            )),
2122        )
2123        .unwrap();
2124
2125        // dispatch some actions
2126        store.dispatch(ComplexAction::Add(10)).unwrap();
2127        store.dispatch(ComplexAction::SetName("test".to_string())).unwrap();
2128        store.dispatch(ComplexAction::Add(5)).unwrap();
2129
2130        // when: query the current state
2131        let queried_value = std::sync::Arc::new(std::sync::Mutex::new(0));
2132        let queried_name = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2133        let queried_value_clone = queried_value.clone();
2134        let queried_name_clone = queried_name.clone();
2135        store
2136            .query_state(move |state| {
2137                *queried_value_clone.lock().unwrap() = state.value;
2138                *queried_name_clone.lock().unwrap() = state.name.clone();
2139            })
2140            .unwrap();
2141
2142        store.stop().unwrap();
2143
2144        // then: should get the current state
2145        assert_eq!(*queried_value.lock().unwrap(), 15); // 0 + 10 + 5
2146        assert_eq!(*queried_name.lock().unwrap(), "test");
2147    }
2148
2149    /// Test query_state with multiple queries
2150    #[test]
2151    fn test_query_state_multiple_queries() {
2152        // given: store with a simple counter
2153        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
2154
2155        // dispatch some actions
2156        store.dispatch(5).unwrap();
2157        store.dispatch(10).unwrap();
2158
2159        // wait for actions to be processed
2160        std::thread::sleep(std::time::Duration::from_millis(100));
2161
2162        // when: query the state multiple times
2163        let query1_result = std::sync::Arc::new(std::sync::Mutex::new(0));
2164        let query2_result = std::sync::Arc::new(std::sync::Mutex::new(0));
2165
2166        let query1_clone = query1_result.clone();
2167        store
2168            .query_state(move |state| {
2169                *query1_clone.lock().unwrap() = *state;
2170            })
2171            .unwrap();
2172
2173        store.dispatch(20).unwrap();
2174
2175        // wait for action to be processed
2176        std::thread::sleep(std::time::Duration::from_millis(100));
2177
2178        let query2_clone = query2_result.clone();
2179        store
2180            .query_state(move |state| {
2181                *query2_clone.lock().unwrap() = *state;
2182            })
2183            .unwrap();
2184
2185        store.stop().unwrap();
2186
2187        // then: should get the correct states
2188        assert_eq!(*query1_result.lock().unwrap(), 15); // 0 + 5 + 10
2189        assert_eq!(*query2_result.lock().unwrap(), 35); // 15 + 20
2190    }
2191
2192    /// Test query_state with error handling
2193    #[test]
2194    fn test_query_state_error_handling() {
2195        // given: store with a simple counter
2196        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
2197
2198        // when: query the state (should succeed)
2199        let queried_value = std::sync::Arc::new(std::sync::Mutex::new(0));
2200        let queried_value_clone = queried_value.clone();
2201        let result = store.query_state(move |state| {
2202            *queried_value_clone.lock().unwrap() = *state;
2203        });
2204
2205        // then: should succeed and get the initial state
2206        assert!(result.is_ok());
2207        assert_eq!(*queried_value.lock().unwrap(), 0);
2208    }
2209}