rs_store/
store_impl.rs

1use crate::channel::{BackpressureChannel, BackpressurePolicy, ReceiverChannel, SenderChannel};
2use crate::dispatcher::Dispatcher;
3use crate::metrics::{CountMetrics, Metrics, MetricsSnapshot};
4use crate::middleware::Middleware;
5use crate::{DispatchOp, Effect, MiddlewareOp, Reducer, Subscriber, Subscription};
6use fmt::Debug;
7use rusty_pool::ThreadPool;
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10use std::time::{Duration, Instant};
11use std::{fmt, thread};
12
13use crate::iterator::{StateIterator, StateIteratorSubscriber};
14use crate::store::{Store, StoreError, DEFAULT_CAPACITY, DEFAULT_STORE_NAME};
15
16const DEFAULT_STOP_TIMEOUT: Duration = Duration::from_secs(10);
17
18#[derive(Debug, Clone, PartialEq)]
19pub enum ActionOp<Action>
20where
21    Action: Send + Sync + Clone + 'static,
22{
23    Action(Action),
24    #[allow(dead_code)]
25    Exit(Instant),
26}
27
28/// StoreImpl is the default implementation of a Redux store
29#[allow(clippy::type_complexity)]
30pub struct StoreImpl<State, Action>
31where
32    State: Send + Sync + Clone + 'static,
33    Action: Send + Sync + Clone + 'static,
34{
35    #[allow(dead_code)]
36    pub(crate) name: String,
37    state: Mutex<State>,
38    pub(crate) reducers: Mutex<Vec<Box<dyn Reducer<State, Action> + Send + Sync>>>,
39    pub(crate) subscribers: Arc<Mutex<Vec<Arc<dyn Subscriber<State, Action> + Send + Sync>>>>,
40    pub(crate) dispatch_tx: Mutex<Option<SenderChannel<Action>>>,
41    middlewares: Mutex<Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>>,
42    pub(crate) metrics: Arc<CountMetrics>,
43    /// thread pool for the store
44    pub(crate) pool: Mutex<Option<ThreadPool>>,
45}
46
47/// Subscription for a subscriber
48/// the subscriber can use it to unsubscribe from the store
49struct SubscriberSubscription {
50    unsubscribe: Box<dyn Fn() + Send + Sync>,
51}
52
53impl Subscription for SubscriberSubscription {
54    fn unsubscribe(&self) {
55        (self.unsubscribe)();
56    }
57}
58
59impl<State, Action> StoreImpl<State, Action>
60where
61    State: Send + Sync + Clone + 'static,
62    Action: Send + Sync + Clone + 'static,
63{
64    /// create a new store with an initial state
65    pub fn new(state: State) -> Arc<StoreImpl<State, Action>> {
66        Self::new_with(
67            state,
68            vec![],
69            DEFAULT_STORE_NAME.into(),
70            DEFAULT_CAPACITY,
71            BackpressurePolicy::default(),
72            vec![],
73        )
74        .unwrap()
75    }
76
77    /// create a new store with a reducer and an initial state
78    pub fn new_with_reducer(
79        state: State,
80        reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
81    ) -> Arc<StoreImpl<State, Action>> {
82        Self::new_with_name(state, reducer, DEFAULT_STORE_NAME.into()).unwrap()
83    }
84
85    /// create a new store with name
86    pub fn new_with_name(
87        state: State,
88        reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
89        name: String,
90    ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
91        Self::new_with(
92            state,
93            vec![reducer],
94            name,
95            DEFAULT_CAPACITY,
96            BackpressurePolicy::default(),
97            vec![],
98        )
99    }
100
101    /// create a new store
102    pub fn new_with(
103        state: State,
104        reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
105        name: String,
106        capacity: usize,
107        policy: BackpressurePolicy<Action>,
108        middlewares: Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>,
109    ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
110        let metrics = Arc::new(CountMetrics::default());
111        let (tx, rx) = BackpressureChannel::<Action>::pair_with(
112            "dispatch",
113            capacity,
114            policy.clone(),
115            Some(metrics.clone()),
116        );
117
118        let store = StoreImpl {
119            name: name.clone(),
120            state: Mutex::new(state),
121            reducers: Mutex::new(reducers),
122            subscribers: Arc::new(Mutex::new(Vec::default())),
123            middlewares: Mutex::new(middlewares),
124            dispatch_tx: Mutex::new(Some(tx)),
125            metrics,
126            pool: Mutex::new(Some(
127                rusty_pool::Builder::new().name(format!("{}-pool", name)).build(),
128            )),
129        };
130
131        // start a thread in which the store will listen for actions
132        let rx_store = Arc::new(store);
133        let tx_store = rx_store.clone();
134
135        // reducer 스레드
136        tx_store.pool.lock().unwrap().as_ref().unwrap().execute(move || {
137            #[cfg(feature = "store-log")]
138            eprintln!("store: reducer thread started");
139
140            while let Some(action_op) = rx.recv() {
141                let action_received_at = Instant::now();
142                rx_store.metrics.action_received(Some(&action_op));
143
144                match action_op {
145                    ActionOp::Action(action) => {
146                        let the_dispatcher = Arc::new(rx_store.clone());
147
148                        // do reduce
149                        let current_state = rx_store.state.lock().unwrap().clone();
150                        let (need_dispatch, new_state, effects) = rx_store.do_reduce(
151                            &action,
152                            current_state,
153                            the_dispatcher.clone(),
154                            action_received_at,
155                        );
156                        *rx_store.state.lock().unwrap() = new_state.clone();
157
158                        // do effects remain
159                        if let Some(mut effects) = effects {
160                            rx_store.do_effect(
161                                &action,
162                                &new_state,
163                                &mut effects,
164                                the_dispatcher.clone(),
165                            );
166                        }
167
168                        // do notify subscribers
169                        if need_dispatch {
170                            rx_store.do_notify(
171                                &action,
172                                &new_state,
173                                the_dispatcher.clone(),
174                                action_received_at,
175                            );
176                        }
177
178                        rx_store
179                            .metrics
180                            .action_executed(Some(&action), action_received_at.elapsed());
181                    }
182                    ActionOp::Exit(_) => {
183                        rx_store.on_close(action_received_at);
184                        #[cfg(feature = "store-log")]
185                        eprintln!("store: reducer loop exit");
186                        break;
187                    }
188                }
189            }
190
191            // drop all subscribers
192            rx_store.clear_subscribers();
193
194            #[cfg(feature = "store-log")]
195            eprintln!("store: reducer thread done");
196        });
197
198        Ok(tx_store)
199    }
200
201    /// get the latest state(for debugging)
202    ///
203    /// prefer to use `subscribe` to get the state
204    pub fn get_state(&self) -> State {
205        self.state.lock().unwrap().clone()
206    }
207
208    /// get the metrics
209    pub fn get_metrics(&self) -> MetricsSnapshot {
210        (&(*self.metrics)).into()
211    }
212
213    /// add a reducer to the store
214    pub fn add_reducer(&self, reducer: Box<dyn Reducer<State, Action> + Send + Sync>) {
215        self.reducers.lock().unwrap().push(reducer);
216    }
217
218    /// add a subscriber to the store
219    pub fn add_subscriber(
220        &self,
221        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
222    ) -> Box<dyn Subscription> {
223        // append a subscriber
224        self.subscribers.lock().unwrap().push(subscriber.clone());
225
226        // disposer for the subscriber
227        let subscribers = self.subscribers.clone();
228        Box::new(SubscriberSubscription {
229            unsubscribe: Box::new(move || {
230                let mut subscribers = subscribers.lock().unwrap();
231                subscribers.retain(|s| {
232                    let retain = !Arc::ptr_eq(s, &subscriber);
233                    if !retain {
234                        s.on_unsubscribe();
235                    }
236                    retain
237                });
238            }),
239        })
240    }
241
242    /// clear all subscribers
243    pub(crate) fn clear_subscribers(&self) {
244        #[cfg(feature = "store-log")]
245        eprintln!("store: clear_subscribers");
246        match self.subscribers.lock() {
247            Ok(mut subscribers) => {
248                for subscriber in subscribers.iter() {
249                    subscriber.on_unsubscribe();
250                }
251                subscribers.clear();
252            }
253            Err(mut e) => {
254                #[cfg(feature = "store-log")]
255                eprintln!("store: Error while locking subscribers: {:?}", e);
256                for subscriber in e.get_ref().iter() {
257                    subscriber.on_unsubscribe();
258                }
259                e.get_mut().clear();
260            }
261        }
262    }
263
264    /// do reduce
265    ///
266    /// ### Return
267    /// * bool : true if the state to be dispatched
268    /// * effects : side effects
269    pub(crate) fn do_reduce(
270        &self,
271        action: &Action,
272        mut state: State,
273        dispatcher: Arc<dyn Dispatcher<Action>>,
274        action_received_at: Instant,
275    ) -> (bool, State, Option<Vec<Effect<Action>>>) {
276        //let state = self.state.lock().unwrap().clone();
277
278        let mut reduce_action = true;
279        if !self.middlewares.lock().unwrap().is_empty() {
280            let middleware_start = Instant::now();
281            let mut middleware_executed = 0;
282            for middleware in self.middlewares.lock().unwrap().iter() {
283                middleware_executed += 1;
284                match middleware.before_reduce(action, &state, dispatcher.clone()) {
285                    Ok(MiddlewareOp::ContinueAction) => {
286                        // continue dispatching the action
287                    }
288                    Ok(MiddlewareOp::DoneAction) => {
289                        // stop dispatching the action
290                        // last middleware wins
291                        reduce_action = false;
292                    }
293                    Ok(MiddlewareOp::BreakChain) => {
294                        // break the middleware chain
295                        break;
296                    }
297                    Err(e) => {
298                        middleware.on_error(e);
299                    }
300                }
301            }
302            let middleware_duration = middleware_start.elapsed();
303            self.metrics.middleware_executed(
304                Some(action),
305                "before_reduce",
306                middleware_executed,
307                middleware_duration,
308            );
309        }
310
311        let mut effects = vec![];
312        let mut need_dispatch = true;
313        if reduce_action {
314            let reducer_start = Instant::now();
315
316            for reducer in self.reducers.lock().unwrap().iter() {
317                match reducer.reduce(&state, action) {
318                    DispatchOp::Dispatch(new_state, effect) => {
319                        state = new_state;
320                        if let Some(effect) = effect {
321                            effects.push(effect);
322                        }
323                        need_dispatch = true;
324                    }
325                    DispatchOp::Keep(new_state, effect) => {
326                        // keep the state but do not dispatch
327                        state = new_state;
328                        if let Some(effect) = effect {
329                            effects.push(effect);
330                        }
331                        need_dispatch = false;
332                    }
333                }
334            }
335
336            // reducer 실행 시간 측정 종료 및 기록
337            let reducer_duration = reducer_start.elapsed();
338            self.metrics.action_reduced(
339                Some(action),
340                reducer_duration,
341                action_received_at.elapsed(),
342            );
343        }
344
345        (need_dispatch, state, Some(effects))
346    }
347
348    pub(crate) fn do_effect(
349        &self,
350        action: &Action,
351        state: &State,
352        effects: &mut Vec<Effect<Action>>,
353        dispatcher: Arc<dyn Dispatcher<Action>>,
354    ) {
355        let effect_start = Instant::now();
356        self.metrics.effect_issued(effects.len());
357
358        if !self.middlewares.lock().unwrap().is_empty() {
359            let middleware_start = Instant::now();
360            let mut middleware_executed = 0;
361            for middleware in self.middlewares.lock().unwrap().iter() {
362                middleware_executed += 1;
363                match middleware.before_effect(action, state, effects, dispatcher.clone()) {
364                    Ok(MiddlewareOp::ContinueAction) => {
365                        // do nothing
366                    }
367                    Ok(MiddlewareOp::DoneAction) => {
368                        // do nothing
369                    }
370                    Ok(MiddlewareOp::BreakChain) => {
371                        // break the middleware chain
372                        break;
373                    }
374                    Err(e) => {
375                        middleware.on_error(e);
376                    }
377                }
378            }
379
380            let middleware_duration = middleware_start.elapsed();
381            self.metrics.middleware_executed(
382                Some(action),
383                "before_effect",
384                middleware_executed,
385                middleware_duration,
386            );
387        }
388
389        let effects_total = effects.len();
390        while !effects.is_empty() {
391            let effect = effects.remove(0);
392            match effect {
393                Effect::Action(a) => {
394                    dispatcher.dispatch_thunk(Box::new(move |dispatcher| {
395                        dispatcher.dispatch(a).expect("no dispatch failed");
396                    }));
397                }
398                Effect::Task(task) => {
399                    dispatcher.dispatch_task(task);
400                }
401                Effect::Thunk(thunk) => {
402                    dispatcher.dispatch_thunk(thunk);
403                }
404                Effect::Function(_tok, func) => {
405                    dispatcher.dispatch_task(Box::new(move || {
406                        // when the result of the function needs to be handled, it should be done in middleware
407                        let _ = func();
408                    }));
409                }
410            };
411        }
412
413        let duration = effect_start.elapsed();
414        self.metrics.effect_executed(effects_total, duration);
415    }
416
417    pub(crate) fn do_notify(
418        &self,
419        action: &Action,
420        next_state: &State,
421        dispatcher: Arc<dyn Dispatcher<Action>>,
422        _action_received_at: Instant,
423    ) {
424        let _notify_start = Instant::now();
425        self.metrics.state_notified(Some(next_state));
426
427        let mut need_notify = true;
428        if !self.middlewares.lock().unwrap().is_empty() {
429            let middleware_start = Instant::now();
430            let mut middleware_executed = 0;
431            for middleware in self.middlewares.lock().unwrap().iter() {
432                middleware_executed += 1;
433                match middleware.before_dispatch(action, next_state, dispatcher.clone()) {
434                    Ok(MiddlewareOp::ContinueAction) => {
435                        // do nothing
436                    }
437                    Ok(MiddlewareOp::DoneAction) => {
438                        // last win
439                        need_notify = false;
440                    }
441                    Ok(MiddlewareOp::BreakChain) => {
442                        // break the middleware chain
443                        break;
444                    }
445                    Err(e) => {
446                        middleware.on_error(e);
447                    }
448                }
449            }
450            let middleware_duration = middleware_start.elapsed();
451            self.metrics.middleware_executed(
452                Some(action),
453                "before_dispatch",
454                middleware_executed,
455                middleware_duration,
456            );
457        }
458
459        if need_notify {
460            let subscribers = self.subscribers.lock().unwrap().clone();
461            for subscriber in subscribers.iter() {
462                subscriber.on_notify(next_state, action);
463            }
464            let duration = _notify_start.elapsed();
465            self.metrics.subscriber_notified(Some(action), subscribers.len(), duration);
466        }
467    }
468
469    fn on_close(&self, action_received_at: Instant) {
470        #[cfg(feature = "store-log")]
471        eprintln!("store: on_close");
472
473        self.metrics.action_executed(None, action_received_at.elapsed());
474    }
475
476    /// close the store
477    ///
478    /// send an exit action to the store and drop the dispatch channel
479    pub fn close(&self) {
480        match self.dispatch_tx.lock() {
481            Ok(mut tx) => {
482                if let Some(tx) = tx.take() {
483                    #[cfg(feature = "store-log")]
484                    eprintln!("store: closing dispatch channel");
485                    match tx.send(ActionOp::Exit(Instant::now())) {
486                        Ok(_) => {
487                            #[cfg(feature = "store-log")]
488                            eprintln!("store: dispatch channel sent exit");
489                        }
490                        Err(_e) => {
491                            #[cfg(feature = "store-log")]
492                            eprintln!("store: Error while closing dispatch channel");
493                        }
494                    }
495                    drop(tx);
496                }
497            }
498            Err(_e) => {
499                #[cfg(feature = "store-log")]
500                eprintln!("store: Error while locking dispatch channel: {:?}", _e);
501                return;
502            }
503        }
504
505        #[cfg(feature = "store-log")]
506        eprintln!("store: dispatch channel closed");
507    }
508
509    /// close the store and wait for the dispatcher to finish
510    pub fn stop(&self) {
511        self.close();
512
513        // Shutdown the thread pool with timeout
514        // lock pool
515        match self.pool.lock() {
516            Ok(mut pool) => {
517                if let Some(pool) = pool.take() {
518                    pool.shutdown_join();
519                }
520                #[cfg(feature = "store-log")]
521                eprintln!("store: shutdown pool");
522            }
523            Err(_e) => {
524                #[cfg(feature = "store-log")]
525                eprintln!("store: Error while locking pool: {:?}", _e);
526                return;
527            }
528        }
529
530        #[cfg(feature = "store-log")]
531        eprintln!("store: Store stopped");
532    }
533
534    /// close the store and wait for the dispatcher to finish
535    pub fn stop_with_timeout(&self, timeout: Duration) {
536        self.close();
537
538        // Shutdown the thread pool with timeout
539        // lock pool
540        match self.pool.lock() {
541            Ok(mut pool) => {
542                if let Some(pool) = pool.take() {
543                    pool.shutdown_join_timeout(timeout);
544                }
545                #[cfg(feature = "store-log")]
546                eprintln!("store: shutdown pool");
547            }
548            Err(_e) => {
549                #[cfg(feature = "store-log")]
550                eprintln!("store: Error while locking pool: {:?}", _e);
551                return;
552            }
553        }
554
555        #[cfg(feature = "store-log")]
556        eprintln!("store: Store stopped");
557    }
558
559    /// dispatch an action
560    ///
561    /// ### Return
562    /// * Ok(()) : if the action is dispatched
563    /// * Err(StoreError) : if the dispatch channel is closed
564    pub fn dispatch(&self, action: Action) -> Result<(), StoreError> {
565        let sender = self.dispatch_tx.lock().unwrap();
566        if let Some(tx) = sender.as_ref() {
567            // the number of remaining actions in the channel
568            let remains = tx.send(ActionOp::Action(action)).unwrap_or(0);
569            self.metrics.queue_size(remains as usize);
570            Ok(())
571        } else {
572            let err = StoreError::DispatchError("Dispatch channel is closed".to_string());
573            self.metrics.error_occurred(&err);
574            Err(err)
575        }
576    }
577
578    /// Add middleware
579    pub fn add_middleware(&self, middleware: Arc<dyn Middleware<State, Action> + Send + Sync>) {
580        self.middlewares.lock().unwrap().push(middleware.clone());
581    }
582
583    /// Iterator for the state
584    ///
585    /// it uses a channel to subscribe to the state changes
586    /// the channel is rendezvous(capacity 1), the store will block on the channel until the subscriber consumes the state
587    pub fn iter(&self) -> impl Iterator<Item = (State, Action)> {
588        self.iter_with(1, BackpressurePolicy::BlockOnFull)
589    }
590
591    /// Iterator for the state
592    ///  
593    /// ### Parameters
594    /// * capacity: the capacity of the channel
595    /// * policy: the backpressure policy
596    pub(crate) fn iter_with(
597        &self,
598        capacity: usize,
599        policy: BackpressurePolicy<(State, Action)>,
600    ) -> impl Iterator<Item = (State, Action)> {
601        let (iter_tx, iter_rx) = BackpressureChannel::<(State, Action)>::pair_with(
602            "store_iter",
603            capacity,
604            policy,
605            Some(self.metrics.clone()),
606        );
607
608        let subscription = self.add_subscriber(Arc::new(StateIteratorSubscriber::new(iter_tx)));
609        StateIterator::new(iter_rx, subscription)
610    }
611
612    /// subscribing to store updates in new context
613    /// with default capacity and `BlockOnFull` policy when the channel is full
614    ///
615    /// ## Parameters
616    /// * subscriber: The subscriber to subscribe to the store
617    ///
618    /// ## Return
619    /// * Subscription: Subscription for the store,
620    pub fn subscribed(
621        &self,
622        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
623    ) -> Result<Box<dyn Subscription>, StoreError> {
624        self.subscribed_with(
625            DEFAULT_CAPACITY,
626            BackpressurePolicy::BlockOnFull,
627            subscriber,
628        )
629    }
630
631    /// subscribing to store updates in new context
632    ///
633    /// ### Parameters
634    /// * capacity: Channel buffer capacity
635    /// * policy: Backpressure policy for when channel is full,
636    ///     `BlockOnFull` or `DropLatestIf` is supported to prevent from dropping the ActionOp::Exit
637    ///
638    /// ### Return
639    /// * Subscription: Subscription for the store,
640    pub fn subscribed_with(
641        &self,
642        capacity: usize,
643        policy: BackpressurePolicy<(Instant, State, Action)>,
644        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
645    ) -> Result<Box<dyn Subscription>, StoreError> {
646        // spsc channel
647        let (tx, rx) = BackpressureChannel::<(Instant, State, Action)>::pair_with(
648            format!("{}-channel", self.name).as_str(),
649            capacity,
650            policy,
651            Some(self.metrics.clone()),
652        );
653
654        // channeled thread
655        let thread_name = format!("{}-channeled-subscriber", self.name);
656        let metrics_clone = self.metrics.clone();
657        let builder = thread::Builder::new().name(thread_name.clone());
658        let handle = match builder.spawn(move || {
659            // subscribe to the store
660            Self::subscribed_loop(thread_name, rx, subscriber, metrics_clone);
661        }) {
662            Ok(h) => h,
663            Err(e) => {
664                #[cfg(feature = "store-log")]
665                eprintln!("store: Error while spawning channel thread: {:?}", e);
666                return Err(StoreError::SubscriptionError(format!(
667                    "Error while spawning channel thread: {:?}",
668                    e
669                )));
670            }
671        };
672
673        // subscribe to the store
674        let channel_subscriber = Arc::new(ChanneledSubscriber::new(handle, tx));
675        let subscription = self.add_subscriber(channel_subscriber.clone());
676
677        Ok(subscription)
678    }
679
680    fn subscribed_loop(
681        _name: String,
682        rx: ReceiverChannel<(Instant, State, Action)>,
683        subscriber: Box<dyn Subscriber<State, Action>>,
684        metrics: Arc<dyn Metrics>,
685    ) {
686        #[cfg(feature = "store-log")]
687        eprintln!("store: {} channel thread started", _name);
688
689        while let Some(msg) = rx.recv() {
690            match msg {
691                ActionOp::Action((created_at, state, action)) => {
692                    let started_at = Instant::now();
693                    {
694                        subscriber.on_notify(&state, &action);
695                    }
696                    metrics.subscriber_notified(Some(&action), 1, started_at.elapsed());
697
698                    // action executed
699                    metrics.action_executed(Some(&action), created_at.elapsed());
700                }
701                ActionOp::Exit(created_at) => {
702                    metrics.action_executed(None, created_at.elapsed());
703                    #[cfg(feature = "store-log")]
704                    eprintln!("store: {} channel thread loop exit", _name);
705                    break;
706                }
707            }
708        }
709
710        #[cfg(feature = "store-log")]
711        eprintln!("store: {} channel thread done", _name);
712    }
713}
714
715/// Subscriber implementation that forwards store updates to a channel
716struct ChanneledSubscriber<T>
717where
718    T: Send + Sync + Clone + 'static,
719{
720    handle: Mutex<Option<JoinHandle<()>>>,
721    tx: Mutex<Option<SenderChannel<T>>>,
722}
723
724impl<T> ChanneledSubscriber<T>
725where
726    T: Send + Sync + Clone + 'static,
727{
728    pub(crate) fn new(handle: JoinHandle<()>, tx: SenderChannel<T>) -> Self {
729        Self {
730            handle: Mutex::new(Some(handle)),
731            tx: Mutex::new(Some(tx)),
732        }
733    }
734
735    fn clear_resource(&self) {
736        // drop channel
737        if let Ok(mut tx_locked) = self.tx.lock() {
738            if let Some(tx) = tx_locked.take() {
739                let _ = tx.send(ActionOp::Exit(Instant::now()));
740                drop(tx);
741            };
742        }
743
744        // join the thread
745        if let Ok(mut handle) = self.handle.lock() {
746            if let Some(h) = handle.take() {
747                let _ = h.join();
748            }
749        }
750    }
751}
752
753impl<State, Action> Subscriber<State, Action> for ChanneledSubscriber<(Instant, State, Action)>
754where
755    State: Send + Sync + Clone + 'static,
756    Action: Send + Sync + Clone + 'static,
757{
758    fn on_notify(&self, state: &State, action: &Action) {
759        match self.tx.lock() {
760            Ok(tx) => {
761                tx.as_ref().map(|tx| {
762                    tx.send(ActionOp::Action((
763                        Instant::now(),
764                        state.clone(),
765                        action.clone(),
766                    )))
767                });
768            }
769            Err(_e) => {
770                #[cfg(feature = "store-log")]
771                eprintln!("store: Error while locking channel: {:?}", _e);
772            }
773        }
774    }
775
776    fn on_unsubscribe(&self) {
777        self.clear_resource();
778    }
779}
780
781impl<T> Subscription for ChanneledSubscriber<T>
782where
783    T: Send + Sync + Clone + 'static,
784{
785    fn unsubscribe(&self) {
786        self.clear_resource();
787    }
788}
789
790/// close tx channel when the store is dropped, but not the dispatcher
791/// if you want to stop the dispatcher, call the stop method
792impl<State, Action> Drop for StoreImpl<State, Action>
793where
794    State: Send + Sync + Clone + 'static,
795    Action: Send + Sync + Clone + 'static,
796{
797    fn drop(&mut self) {
798        self.close();
799
800        // Shutdown the thread pool with timeout
801        self.stop_with_timeout(DEFAULT_STOP_TIMEOUT);
802        // if let Ok(mut lk) = self.pool.lock() {
803        //     if let Some(pool) = lk.take() {
804        //         pool.shutdown_join_timeout(Duration::from_secs(3));
805        //     }
806        // }
807
808        #[cfg(feature = "store-log")]
809        eprintln!("store: '{}' Store dropped", self.name);
810    }
811}
812
813impl<State, Action> Store<State, Action> for StoreImpl<State, Action>
814where
815    State: Send + Sync + Clone + 'static,
816    Action: Send + Sync + Clone + 'static,
817{
818    fn get_state(&self) -> State {
819        self.get_state()
820    }
821
822    fn dispatch(&self, action: Action) -> Result<(), StoreError> {
823        self.dispatch(action)
824    }
825
826    fn add_subscriber(
827        &self,
828        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
829    ) -> Box<dyn Subscription> {
830        self.add_subscriber(subscriber)
831    }
832
833    fn subscribed(
834        &self,
835        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
836    ) -> Result<Box<dyn Subscription>, StoreError> {
837        self.subscribed(subscriber)
838    }
839
840    fn subscribed_with(
841        &self,
842        capacity: usize,
843        policy: BackpressurePolicy<(Instant, State, Action)>,
844        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
845    ) -> Result<Box<dyn Subscription>, StoreError> {
846        self.subscribed_with(capacity, policy, subscriber)
847    }
848
849    fn stop(&self) {
850        self.stop();
851    }
852}
853
854#[cfg(test)]
855mod tests {
856    use super::*;
857    use std::thread;
858
859    struct TestChannelSubscriber {
860        received: Arc<Mutex<Vec<(i32, i32)>>>,
861    }
862
863    impl TestChannelSubscriber {
864        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
865            Self { received }
866        }
867    }
868
869    impl Subscriber<i32, i32> for TestChannelSubscriber {
870        fn on_notify(&self, state: &i32, action: &i32) {
871            println!("TestChannelSubscriber: state={}, action={}", state, action);
872            self.received.lock().unwrap().push((*state, *action));
873        }
874    }
875
876    struct TestReducer;
877
878    impl Reducer<i32, i32> for TestReducer {
879        fn reduce(&self, state: &i32, action: &i32) -> DispatchOp<i32, i32> {
880            DispatchOp::Dispatch(state + action, None)
881        }
882    }
883
884    struct SlowSubscriber {
885        received: Arc<Mutex<Vec<(i32, i32)>>>,
886        delay: Duration,
887    }
888
889    impl SlowSubscriber {
890        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
891            Self { received, delay }
892        }
893    }
894
895    impl Subscriber<i32, i32> for SlowSubscriber {
896        fn on_notify(&self, state: &i32, action: &i32) {
897            println!("SlowSubscriber: state={}, action={}", state, action);
898            std::thread::sleep(self.delay);
899            self.received.lock().unwrap().push((*state, *action));
900        }
901    }
902
903    #[test]
904    fn test_store_subscribed_basic() {
905        // Setup store with a simple counter
906        let initial_state = 0;
907        let reducer = Box::new(TestReducer);
908        let store = StoreImpl::new_with_reducer(initial_state, reducer);
909
910        // Create subscriber to receive updates
911        let received_states = Arc::new(Mutex::new(Vec::new()));
912        let subscriber1 = Box::new(TestChannelSubscriber::new(received_states.clone()));
913        // Create channel
914        let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);
915
916        // Dispatch some actions
917        store.dispatch(1).unwrap();
918        store.dispatch(2).unwrap();
919
920        // Give some time for processing
921        // thread::sleep(Duration::from_millis(100));
922        store.stop();
923
924        // unsubscribe from the channel
925        subscription.unwrap().unsubscribe();
926
927        // Verify received updates
928        let states = received_states.lock().unwrap();
929        assert_eq!(states.len(), 2);
930        assert_eq!(states[0], (1, 1)); // (state, action)
931        assert_eq!(states[1], (3, 2)); // state=1+2, action=2
932    }
933
934    #[test]
935    fn test_store_subscribed_backpressure() {
936        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
937
938        let received = Arc::new(Mutex::new(Vec::new()));
939        let received_clone = received.clone();
940        let subscriber = Box::new(SlowSubscriber::new(
941            received_clone,
942            Duration::from_millis(100),
943        ));
944        // Create channel with small capacity
945        let subscription = store.subscribed_with(1, BackpressurePolicy::DropOldest, subscriber);
946
947        // Fill the channel
948        for i in 0..5 {
949            store.dispatch(i).unwrap();
950        }
951
952        // Give some time for having channel thread to process
953        thread::sleep(Duration::from_millis(200));
954        store.stop();
955        subscription.unwrap().unsubscribe();
956
957        // Should only receive the latest updates due to backpressure
958        let received = received.lock().unwrap();
959        assert!(received.len() <= 2); // Some messages should be dropped
960
961        if let Some((state, action)) = received.last() {
962            assert_eq!(*action, 4); // Last action should be received
963            assert!(*state <= 10); // Final state should be sum of 0..5
964        }
965    }
966
967    #[test]
968    fn test_store_subscribed_subscription() {
969        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
970
971        let received = Arc::new(Mutex::new(Vec::new()));
972        let subscriber1 = Box::new(TestChannelSubscriber::new(received.clone()));
973        let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);
974
975        // Dispatch some actions
976        store.dispatch(1).unwrap();
977
978        // give some time for processing
979        thread::sleep(Duration::from_millis(100));
980        // subscriber should receive the state
981        assert_eq!(received.lock().unwrap().len(), 1);
982
983        // unsubscribe
984        subscription.unwrap().unsubscribe();
985
986        // dispatch more actions
987        store.dispatch(2).unwrap();
988        store.dispatch(3).unwrap();
989        // give some time for processing
990        store.stop();
991        // subscriber should not receive the state
992        assert_eq!(received.lock().unwrap().len(), 1);
993    }
994}