rs_store/
store_impl.rs

1use crate::channel::{BackpressureChannel, BackpressurePolicy, ReceiverChannel, SenderChannel};
2use crate::dispatcher::Dispatcher;
3use crate::metrics::{CountMetrics, Metrics, MetricsSnapshot};
4use crate::middleware::Middleware;
5use crate::{DispatchOp, Effect, MiddlewareOp, Reducer, Subscriber, Subscription};
6use fmt::Debug;
7use rusty_pool::ThreadPool;
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10use std::time::{Duration, Instant};
11use std::{fmt, thread};
12
13use crate::iterator::{StateIterator, StateIteratorSubscriber};
14use crate::store::{Store, StoreError, DEFAULT_CAPACITY, DEFAULT_STORE_NAME};
15
16const DEFAULT_STOP_TIMEOUT: Duration = Duration::from_secs(10);
17
18#[derive(Debug, Clone, PartialEq)]
19pub enum ActionOp<Action>
20where
21    Action: Send + Sync + Clone + 'static,
22{
23    Action(Action),
24    AddSubscriber,
25    #[allow(dead_code)]
26    Exit(Instant),
27}
28
29/// StoreImpl is the default implementation of a Redux store
30#[allow(clippy::type_complexity)]
31pub struct StoreImpl<State, Action>
32where
33    State: Send + Sync + Clone + 'static,
34    Action: Send + Sync + Clone + 'static,
35{
36    #[allow(dead_code)]
37    pub(crate) name: String,
38    state: Mutex<State>,
39    pub(crate) reducers: Mutex<Vec<Box<dyn Reducer<State, Action> + Send + Sync>>>,
40    pub(crate) subscribers: Arc<Mutex<Vec<Arc<dyn Subscriber<State, Action> + Send + Sync>>>>,
41    /// 임시로 추가될 subscriber들을 저장하는 벡터
42    adding_subscribers: Arc<Mutex<Vec<Arc<dyn Subscriber<State, Action> + Send + Sync>>>>,
43    pub(crate) dispatch_tx: Mutex<Option<SenderChannel<Action>>>,
44    middlewares: Mutex<Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>>,
45    pub(crate) metrics: Arc<CountMetrics>,
46    /// thread pool for the store
47    pub(crate) pool: Mutex<Option<ThreadPool>>,
48}
49
50/// Subscription for a subscriber
51/// the subscriber can use it to unsubscribe from the store
52struct SubscriberSubscription {
53    unsubscribe: Box<dyn Fn() + Send + Sync>,
54}
55
56impl Subscription for SubscriberSubscription {
57    fn unsubscribe(&self) {
58        (self.unsubscribe)();
59    }
60}
61
62impl<State, Action> StoreImpl<State, Action>
63where
64    State: Send + Sync + Clone + 'static,
65    Action: Send + Sync + Clone + 'static,
66{
67    /// create a new store with an initial state
68    pub fn new(state: State) -> Arc<StoreImpl<State, Action>> {
69        Self::new_with(
70            state,
71            vec![],
72            DEFAULT_STORE_NAME.into(),
73            DEFAULT_CAPACITY,
74            BackpressurePolicy::default(),
75            vec![],
76        )
77        .unwrap()
78    }
79
80    /// create a new store with a reducer and an initial state
81    pub fn new_with_reducer(
82        state: State,
83        reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
84    ) -> Arc<StoreImpl<State, Action>> {
85        Self::new_with_name(state, reducer, DEFAULT_STORE_NAME.into()).unwrap()
86    }
87
88    /// create a new store with name
89    pub fn new_with_name(
90        state: State,
91        reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
92        name: String,
93    ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
94        Self::new_with(
95            state,
96            vec![reducer],
97            name,
98            DEFAULT_CAPACITY,
99            BackpressurePolicy::default(),
100            vec![],
101        )
102    }
103
104    /// create a new store
105    pub fn new_with(
106        state: State,
107        reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
108        name: String,
109        capacity: usize,
110        policy: BackpressurePolicy<Action>,
111        middlewares: Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>,
112    ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
113        let metrics = Arc::new(CountMetrics::default());
114        let (tx, rx) = BackpressureChannel::<Action>::pair_with(
115            "dispatch",
116            capacity,
117            policy.clone(),
118            Some(metrics.clone()),
119        );
120
121        let store = StoreImpl {
122            name: name.clone(),
123            state: Mutex::new(state),
124            reducers: Mutex::new(reducers),
125            subscribers: Arc::new(Mutex::new(Vec::default())),
126            adding_subscribers: Arc::new(Mutex::new(Vec::default())),
127            middlewares: Mutex::new(middlewares),
128            dispatch_tx: Mutex::new(Some(tx)),
129            metrics,
130            pool: Mutex::new(Some(
131                rusty_pool::Builder::new().name(format!("{}-pool", name)).build(),
132            )),
133        };
134
135        // start a thread in which the store will listen for actions
136        let rx_store = Arc::new(store);
137        let tx_store = rx_store.clone();
138
139        // reducer 스레드
140        tx_store.pool.lock().unwrap().as_ref().unwrap().execute(move || {
141            #[cfg(feature = "store-log")]
142            eprintln!("store: reducer thread started");
143
144            while let Some(action_op) = rx.recv() {
145                let action_received_at = Instant::now();
146                rx_store.metrics.action_received(Some(&action_op));
147
148                match action_op {
149                    ActionOp::Action(action) => {
150                        let the_dispatcher = Arc::new(rx_store.clone());
151
152                        // do reduce
153                        let current_state = rx_store.state.lock().unwrap().clone();
154                        let (need_dispatch, new_state, effects) = rx_store.do_reduce(
155                            &action,
156                            current_state,
157                            the_dispatcher.clone(),
158                            action_received_at,
159                        );
160                        *rx_store.state.lock().unwrap() = new_state.clone();
161
162                        // do effects remain
163                        if let Some(mut effects) = effects {
164                            rx_store.do_effect(
165                                &action,
166                                &new_state,
167                                &mut effects,
168                                the_dispatcher.clone(),
169                            );
170                        }
171
172                        // do notify subscribers
173                        if need_dispatch {
174                            rx_store.do_notify(
175                                &action,
176                                &new_state,
177                                the_dispatcher.clone(),
178                                action_received_at,
179                            );
180                        }
181
182                        rx_store
183                            .metrics
184                            .action_executed(Some(&action), action_received_at.elapsed());
185                    }
186                    ActionOp::AddSubscriber => {
187                        let current_state = rx_store.state.lock().unwrap().clone();
188                        let mut adding_subscribers = rx_store.adding_subscribers.lock().unwrap();
189                        let mut subscribers = rx_store.subscribers.lock().unwrap();
190
191                        // 새로운 subscriber들에게 최신 상태를 전달하고 subscribers에 추가
192                        for subscriber in adding_subscribers.drain(..) {
193                            // 새로운 subscriber에게 최신 상태를 전달
194                            subscriber.on_subscribe(&current_state);
195                            subscribers.push(subscriber);
196                        }
197
198                        #[cfg(feature = "store-log")]
199                        eprintln!("store: new subscribers added");
200                    }
201                    ActionOp::Exit(_) => {
202                        rx_store.on_close(action_received_at);
203                        #[cfg(feature = "store-log")]
204                        eprintln!("store: reducer loop exit");
205                        break;
206                    }
207                }
208            }
209
210            // drop all subscribers
211            rx_store.clear_subscribers();
212
213            #[cfg(feature = "store-log")]
214            eprintln!("store: reducer thread done");
215        });
216
217        Ok(tx_store)
218    }
219
220    /// get the latest state(for debugging)
221    ///
222    /// prefer to use `subscribe` to get the state
223    pub fn get_state(&self) -> State {
224        self.state.lock().unwrap().clone()
225    }
226
227    /// get the metrics
228    pub fn get_metrics(&self) -> MetricsSnapshot {
229        (&(*self.metrics)).into()
230    }
231
232    /// add a reducer to the store
233    pub fn add_reducer(&self, reducer: Box<dyn Reducer<State, Action> + Send + Sync>) {
234        self.reducers.lock().unwrap().push(reducer);
235    }
236
237    /// add a subscriber to the store
238    pub fn add_subscriber(
239        &self,
240        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
241    ) -> Box<dyn Subscription> {
242        // 새로운 subscriber를 adding_subscribers에 추가
243        self.adding_subscribers.lock().unwrap().push(subscriber.clone());
244
245        // ActionOp::AddSubscriber 액션을 전달하여 reducer에서 처리하도록 함
246        if let Some(tx) = self.dispatch_tx.lock().unwrap().as_ref() {
247            let _ = tx.send(ActionOp::AddSubscriber);
248        }
249
250        // disposer for the subscriber
251        let subscribers = self.subscribers.clone();
252        let adding_subscribers = self.adding_subscribers.clone();
253        Box::new(SubscriberSubscription {
254            unsubscribe: Box::new(move || {
255                let mut subscribers = subscribers.lock().unwrap();
256                subscribers.retain(|s| {
257                    let retain = !Arc::ptr_eq(s, &subscriber);
258                    if !retain {
259                        s.on_unsubscribe();
260                    }
261                    retain
262                });
263
264                // adding_subscribers에서도 제거
265                let mut adding = adding_subscribers.lock().unwrap();
266                adding.retain(|s| !Arc::ptr_eq(s, &subscriber));
267            }),
268        })
269    }
270
271    /// clear all subscribers
272    pub(crate) fn clear_subscribers(&self) {
273        #[cfg(feature = "store-log")]
274        eprintln!("store: clear_subscribers");
275        match self.subscribers.lock() {
276            Ok(mut subscribers) => {
277                for subscriber in subscribers.iter() {
278                    subscriber.on_unsubscribe();
279                }
280                subscribers.clear();
281            }
282            Err(mut e) => {
283                #[cfg(feature = "store-log")]
284                eprintln!("store: Error while locking subscribers: {:?}", e);
285                for subscriber in e.get_ref().iter() {
286                    subscriber.on_unsubscribe();
287                }
288                e.get_mut().clear();
289            }
290        }
291    }
292
293    /// do reduce
294    ///
295    /// ### Return
296    /// * bool : true if the state to be dispatched
297    /// * effects : side effects
298    pub(crate) fn do_reduce(
299        &self,
300        action: &Action,
301        mut state: State,
302        dispatcher: Arc<dyn Dispatcher<Action>>,
303        action_received_at: Instant,
304    ) -> (bool, State, Option<Vec<Effect<Action>>>) {
305        //let state = self.state.lock().unwrap().clone();
306
307        let mut reduce_action = true;
308        if !self.middlewares.lock().unwrap().is_empty() {
309            let middleware_start = Instant::now();
310            let mut middleware_executed = 0;
311            for middleware in self.middlewares.lock().unwrap().iter() {
312                middleware_executed += 1;
313                match middleware.before_reduce(action, &state, dispatcher.clone()) {
314                    Ok(MiddlewareOp::ContinueAction) => {
315                        // continue dispatching the action
316                    }
317                    Ok(MiddlewareOp::DoneAction) => {
318                        // stop dispatching the action
319                        // last middleware wins
320                        reduce_action = false;
321                    }
322                    Ok(MiddlewareOp::BreakChain) => {
323                        // break the middleware chain
324                        break;
325                    }
326                    Err(e) => {
327                        middleware.on_error(e);
328                    }
329                }
330            }
331            let middleware_duration = middleware_start.elapsed();
332            self.metrics.middleware_executed(
333                Some(action),
334                "before_reduce",
335                middleware_executed,
336                middleware_duration,
337            );
338        }
339
340        let mut effects = vec![];
341        let mut need_dispatch = true;
342        if reduce_action {
343            let reducer_start = Instant::now();
344
345            for reducer in self.reducers.lock().unwrap().iter() {
346                match reducer.reduce(&state, action) {
347                    DispatchOp::Dispatch(new_state, effect) => {
348                        state = new_state;
349                        if let Some(effect) = effect {
350                            effects.push(effect);
351                        }
352                        need_dispatch = true;
353                    }
354                    DispatchOp::Keep(new_state, effect) => {
355                        // keep the state but do not dispatch
356                        state = new_state;
357                        if let Some(effect) = effect {
358                            effects.push(effect);
359                        }
360                        need_dispatch = false;
361                    }
362                }
363            }
364
365            // reducer 실행 시간 측정 종료 및 기록
366            let reducer_duration = reducer_start.elapsed();
367            self.metrics.action_reduced(
368                Some(action),
369                reducer_duration,
370                action_received_at.elapsed(),
371            );
372        }
373
374        (need_dispatch, state, Some(effects))
375    }
376
377    pub(crate) fn do_effect(
378        &self,
379        action: &Action,
380        state: &State,
381        effects: &mut Vec<Effect<Action>>,
382        dispatcher: Arc<dyn Dispatcher<Action>>,
383    ) {
384        let effect_start = Instant::now();
385        self.metrics.effect_issued(effects.len());
386
387        if !self.middlewares.lock().unwrap().is_empty() {
388            let middleware_start = Instant::now();
389            let mut middleware_executed = 0;
390            for middleware in self.middlewares.lock().unwrap().iter() {
391                middleware_executed += 1;
392                match middleware.before_effect(action, state, effects, dispatcher.clone()) {
393                    Ok(MiddlewareOp::ContinueAction) => {
394                        // do nothing
395                    }
396                    Ok(MiddlewareOp::DoneAction) => {
397                        // do nothing
398                    }
399                    Ok(MiddlewareOp::BreakChain) => {
400                        // break the middleware chain
401                        break;
402                    }
403                    Err(e) => {
404                        middleware.on_error(e);
405                    }
406                }
407            }
408
409            let middleware_duration = middleware_start.elapsed();
410            self.metrics.middleware_executed(
411                Some(action),
412                "before_effect",
413                middleware_executed,
414                middleware_duration,
415            );
416        }
417
418        let effects_total = effects.len();
419        while !effects.is_empty() {
420            let effect = effects.remove(0);
421            match effect {
422                Effect::Action(a) => {
423                    dispatcher.dispatch_thunk(Box::new(move |dispatcher| {
424                        dispatcher.dispatch(a).expect("no dispatch failed");
425                    }));
426                }
427                Effect::Task(task) => {
428                    dispatcher.dispatch_task(task);
429                }
430                Effect::Thunk(thunk) => {
431                    dispatcher.dispatch_thunk(thunk);
432                }
433                Effect::Function(_tok, func) => {
434                    dispatcher.dispatch_task(Box::new(move || {
435                        // when the result of the function needs to be handled, it should be done in middleware
436                        let _ = func();
437                    }));
438                }
439            };
440        }
441
442        let duration = effect_start.elapsed();
443        self.metrics.effect_executed(effects_total, duration);
444    }
445
446    pub(crate) fn do_notify(
447        &self,
448        action: &Action,
449        next_state: &State,
450        dispatcher: Arc<dyn Dispatcher<Action>>,
451        _action_received_at: Instant,
452    ) {
453        let _notify_start = Instant::now();
454        self.metrics.state_notified(Some(next_state));
455
456        let mut need_notify = true;
457        if !self.middlewares.lock().unwrap().is_empty() {
458            let middleware_start = Instant::now();
459            let mut middleware_executed = 0;
460            for middleware in self.middlewares.lock().unwrap().iter() {
461                middleware_executed += 1;
462                match middleware.before_dispatch(action, next_state, dispatcher.clone()) {
463                    Ok(MiddlewareOp::ContinueAction) => {
464                        // do nothing
465                    }
466                    Ok(MiddlewareOp::DoneAction) => {
467                        // last win
468                        need_notify = false;
469                    }
470                    Ok(MiddlewareOp::BreakChain) => {
471                        // break the middleware chain
472                        break;
473                    }
474                    Err(e) => {
475                        middleware.on_error(e);
476                    }
477                }
478            }
479            let middleware_duration = middleware_start.elapsed();
480            self.metrics.middleware_executed(
481                Some(action),
482                "before_dispatch",
483                middleware_executed,
484                middleware_duration,
485            );
486        }
487
488        if need_notify {
489            let subscribers = self.subscribers.lock().unwrap().clone();
490            for subscriber in subscribers.iter() {
491                subscriber.on_notify(next_state, action);
492            }
493            let duration = _notify_start.elapsed();
494            self.metrics.subscriber_notified(Some(action), subscribers.len(), duration);
495        }
496    }
497
498    fn on_close(&self, action_received_at: Instant) {
499        #[cfg(feature = "store-log")]
500        eprintln!("store: on_close");
501
502        self.metrics.action_executed(None, action_received_at.elapsed());
503    }
504
505    /// close the store
506    ///
507    /// send an exit action to the store and drop the dispatch channel
508    pub fn close(&self) {
509        match self.dispatch_tx.lock() {
510            Ok(mut tx) => {
511                if let Some(tx) = tx.take() {
512                    #[cfg(feature = "store-log")]
513                    eprintln!("store: closing dispatch channel");
514                    match tx.send(ActionOp::Exit(Instant::now())) {
515                        Ok(_) => {
516                            #[cfg(feature = "store-log")]
517                            eprintln!("store: dispatch channel sent exit");
518                        }
519                        Err(_e) => {
520                            #[cfg(feature = "store-log")]
521                            eprintln!("store: Error while closing dispatch channel");
522                        }
523                    }
524                    drop(tx);
525                }
526            }
527            Err(_e) => {
528                #[cfg(feature = "store-log")]
529                eprintln!("store: Error while locking dispatch channel: {:?}", _e);
530                return;
531            }
532        }
533
534        #[cfg(feature = "store-log")]
535        eprintln!("store: dispatch channel closed");
536    }
537
538    /// close the store and wait for the dispatcher to finish
539    pub fn stop(&self) {
540        self.close();
541
542        // Shutdown the thread pool with timeout
543        // lock pool
544        match self.pool.lock() {
545            Ok(mut pool) => {
546                if let Some(pool) = pool.take() {
547                    pool.shutdown_join();
548                }
549                #[cfg(feature = "store-log")]
550                eprintln!("store: shutdown pool");
551            }
552            Err(_e) => {
553                #[cfg(feature = "store-log")]
554                eprintln!("store: Error while locking pool: {:?}", _e);
555                return;
556            }
557        }
558
559        #[cfg(feature = "store-log")]
560        eprintln!("store: Store stopped");
561    }
562
563    /// close the store and wait for the dispatcher to finish
564    pub fn stop_with_timeout(&self, timeout: Duration) {
565        self.close();
566
567        // Shutdown the thread pool with timeout
568        // lock pool
569        match self.pool.lock() {
570            Ok(mut pool) => {
571                if let Some(pool) = pool.take() {
572                    pool.shutdown_join_timeout(timeout);
573                }
574                #[cfg(feature = "store-log")]
575                eprintln!("store: shutdown pool");
576            }
577            Err(_e) => {
578                #[cfg(feature = "store-log")]
579                eprintln!("store: Error while locking pool: {:?}", _e);
580                return;
581            }
582        }
583
584        #[cfg(feature = "store-log")]
585        eprintln!("store: Store stopped");
586    }
587
588    /// dispatch an action
589    ///
590    /// ### Return
591    /// * Ok(()) : if the action is dispatched
592    /// * Err(StoreError) : if the dispatch channel is closed
593    pub fn dispatch(&self, action: Action) -> Result<(), StoreError> {
594        let sender = self.dispatch_tx.lock().unwrap();
595        if let Some(tx) = sender.as_ref() {
596            // the number of remaining actions in the channel
597            let remains = tx.send(ActionOp::Action(action)).unwrap_or(0);
598            self.metrics.queue_size(remains as usize);
599            Ok(())
600        } else {
601            let err = StoreError::DispatchError("Dispatch channel is closed".to_string());
602            self.metrics.error_occurred(&err);
603            Err(err)
604        }
605    }
606
607    /// Add middleware
608    pub fn add_middleware(&self, middleware: Arc<dyn Middleware<State, Action> + Send + Sync>) {
609        self.middlewares.lock().unwrap().push(middleware.clone());
610    }
611
612    /// Iterator for the state
613    ///
614    /// it uses a channel to subscribe to the state changes
615    /// the channel is rendezvous(capacity 1), the store will block on the channel until the subscriber consumes the state
616    pub fn iter(&self) -> impl Iterator<Item = (State, Action)> {
617        self.iter_with(1, BackpressurePolicy::BlockOnFull)
618    }
619
620    /// Iterator for the state
621    ///  
622    /// ### Parameters
623    /// * capacity: the capacity of the channel
624    /// * policy: the backpressure policy
625    pub(crate) fn iter_with(
626        &self,
627        capacity: usize,
628        policy: BackpressurePolicy<(State, Action)>,
629    ) -> impl Iterator<Item = (State, Action)> {
630        let (iter_tx, iter_rx) = BackpressureChannel::<(State, Action)>::pair_with(
631            "store_iter",
632            capacity,
633            policy,
634            Some(self.metrics.clone()),
635        );
636
637        let subscription = self.add_subscriber(Arc::new(StateIteratorSubscriber::new(iter_tx)));
638        StateIterator::new(iter_rx, subscription)
639    }
640
641    /// subscribing to store updates in new context
642    /// with default capacity and `BlockOnFull` policy when the channel is full
643    ///
644    /// ## Parameters
645    /// * subscriber: The subscriber to subscribe to the store
646    ///
647    /// ## Return
648    /// * Subscription: Subscription for the store,
649    pub fn subscribed(
650        &self,
651        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
652    ) -> Result<Box<dyn Subscription>, StoreError> {
653        self.subscribed_with(
654            DEFAULT_CAPACITY,
655            BackpressurePolicy::BlockOnFull,
656            subscriber,
657        )
658    }
659
660    /// subscribing to store updates in new context
661    ///
662    /// ### Parameters
663    /// * capacity: Channel buffer capacity
664    /// * policy: Backpressure policy for when channel is full,
665    ///     `BlockOnFull` or `DropLatestIf` is supported to prevent from dropping the ActionOp::Exit
666    ///
667    /// ### Return
668    /// * Subscription: Subscription for the store,
669    pub fn subscribed_with(
670        &self,
671        capacity: usize,
672        policy: BackpressurePolicy<(Instant, State, Action)>,
673        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
674    ) -> Result<Box<dyn Subscription>, StoreError> {
675        // spsc channel
676        let (tx, rx) = BackpressureChannel::<(Instant, State, Action)>::pair_with(
677            format!("{}-channel", self.name).as_str(),
678            capacity,
679            policy,
680            Some(self.metrics.clone()),
681        );
682
683        // channeled thread
684        let thread_name = format!("{}-channeled-subscriber", self.name);
685        let metrics_clone = self.metrics.clone();
686        let builder = thread::Builder::new().name(thread_name.clone());
687        let handle = match builder.spawn(move || {
688            // subscribe to the store
689            Self::subscribed_loop(thread_name, rx, subscriber, metrics_clone);
690        }) {
691            Ok(h) => h,
692            Err(e) => {
693                #[cfg(feature = "store-log")]
694                eprintln!("store: Error while spawning channel thread: {:?}", e);
695                return Err(StoreError::SubscriptionError(format!(
696                    "Error while spawning channel thread: {:?}",
697                    e
698                )));
699            }
700        };
701
702        // subscribe to the store
703        let channel_subscriber = Arc::new(ChanneledSubscriber::new(handle, tx));
704        let subscription = self.add_subscriber(channel_subscriber.clone());
705
706        Ok(subscription)
707    }
708
709    fn subscribed_loop(
710        _name: String,
711        rx: ReceiverChannel<(Instant, State, Action)>,
712        subscriber: Box<dyn Subscriber<State, Action>>,
713        metrics: Arc<dyn Metrics>,
714    ) {
715        #[cfg(feature = "store-log")]
716        eprintln!("store: {} channel thread started", _name);
717
718        while let Some(msg) = rx.recv() {
719            match msg {
720                ActionOp::Action((created_at, state, action)) => {
721                    let started_at = Instant::now();
722                    {
723                        subscriber.on_notify(&state, &action);
724                    }
725                    metrics.subscriber_notified(Some(&action), 1, started_at.elapsed());
726
727                    // action executed
728                    metrics.action_executed(Some(&action), created_at.elapsed());
729                }
730                ActionOp::AddSubscriber => {
731                    // AddSubscriber는 채널된 subscriber에서는 처리하지 않음
732                    // 이는 메인 reducer 스레드에서만 처리됨
733                    #[cfg(feature = "store-log")]
734                    eprintln!("store: {} received AddSubscriber (ignored)", _name);
735                }
736                ActionOp::Exit(created_at) => {
737                    metrics.action_executed(None, created_at.elapsed());
738                    #[cfg(feature = "store-log")]
739                    eprintln!("store: {} channel thread loop exit", _name);
740                    break;
741                }
742            }
743        }
744
745        #[cfg(feature = "store-log")]
746        eprintln!("store: {} channel thread done", _name);
747    }
748}
749
750/// Subscriber implementation that forwards store updates to a channel
751struct ChanneledSubscriber<T>
752where
753    T: Send + Sync + Clone + 'static,
754{
755    handle: Mutex<Option<JoinHandle<()>>>,
756    tx: Mutex<Option<SenderChannel<T>>>,
757}
758
759impl<T> ChanneledSubscriber<T>
760where
761    T: Send + Sync + Clone + 'static,
762{
763    pub(crate) fn new(handle: JoinHandle<()>, tx: SenderChannel<T>) -> Self {
764        Self {
765            handle: Mutex::new(Some(handle)),
766            tx: Mutex::new(Some(tx)),
767        }
768    }
769
770    fn clear_resource(&self) {
771        // drop channel
772        if let Ok(mut tx_locked) = self.tx.lock() {
773            if let Some(tx) = tx_locked.take() {
774                let _ = tx.send(ActionOp::Exit(Instant::now()));
775                drop(tx);
776            };
777        }
778
779        // join the thread
780        if let Ok(mut handle) = self.handle.lock() {
781            if let Some(h) = handle.take() {
782                let _ = h.join();
783            }
784        }
785    }
786}
787
788impl<State, Action> Subscriber<State, Action> for ChanneledSubscriber<(Instant, State, Action)>
789where
790    State: Send + Sync + Clone + 'static,
791    Action: Send + Sync + Clone + 'static,
792{
793    fn on_notify(&self, state: &State, action: &Action) {
794        match self.tx.lock() {
795            Ok(tx) => {
796                tx.as_ref().map(|tx| {
797                    tx.send(ActionOp::Action((
798                        Instant::now(),
799                        state.clone(),
800                        action.clone(),
801                    )))
802                });
803            }
804            Err(_e) => {
805                #[cfg(feature = "store-log")]
806                eprintln!("store: Error while locking channel: {:?}", _e);
807            }
808        }
809    }
810
811    fn on_unsubscribe(&self) {
812        self.clear_resource();
813    }
814}
815
816impl<T> Subscription for ChanneledSubscriber<T>
817where
818    T: Send + Sync + Clone + 'static,
819{
820    fn unsubscribe(&self) {
821        self.clear_resource();
822    }
823}
824
825/// close tx channel when the store is dropped, but not the dispatcher
826/// if you want to stop the dispatcher, call the stop method
827impl<State, Action> Drop for StoreImpl<State, Action>
828where
829    State: Send + Sync + Clone + 'static,
830    Action: Send + Sync + Clone + 'static,
831{
832    fn drop(&mut self) {
833        self.close();
834
835        // Shutdown the thread pool with timeout
836        self.stop_with_timeout(DEFAULT_STOP_TIMEOUT);
837        // if let Ok(mut lk) = self.pool.lock() {
838        //     if let Some(pool) = lk.take() {
839        //         pool.shutdown_join_timeout(Duration::from_secs(3));
840        //     }
841        // }
842
843        #[cfg(feature = "store-log")]
844        eprintln!("store: '{}' Store dropped", self.name);
845    }
846}
847
848impl<State, Action> Store<State, Action> for StoreImpl<State, Action>
849where
850    State: Send + Sync + Clone + 'static,
851    Action: Send + Sync + Clone + 'static,
852{
853    fn get_state(&self) -> State {
854        self.get_state()
855    }
856
857    fn dispatch(&self, action: Action) -> Result<(), StoreError> {
858        self.dispatch(action)
859    }
860
861    fn add_subscriber(
862        &self,
863        subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
864    ) -> Box<dyn Subscription> {
865        self.add_subscriber(subscriber)
866    }
867
868    fn subscribed(
869        &self,
870        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
871    ) -> Result<Box<dyn Subscription>, StoreError> {
872        self.subscribed(subscriber)
873    }
874
875    fn subscribed_with(
876        &self,
877        capacity: usize,
878        policy: BackpressurePolicy<(Instant, State, Action)>,
879        subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
880    ) -> Result<Box<dyn Subscription>, StoreError> {
881        self.subscribed_with(capacity, policy, subscriber)
882    }
883
884    fn stop(&self) {
885        self.stop();
886    }
887}
888
889#[cfg(test)]
890mod tests {
891    use super::*;
892    use std::thread;
893
894    struct TestChannelSubscriber {
895        received: Arc<Mutex<Vec<(i32, i32)>>>,
896    }
897
898    impl TestChannelSubscriber {
899        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
900            Self { received }
901        }
902    }
903
904    impl Subscriber<i32, i32> for TestChannelSubscriber {
905        fn on_notify(&self, state: &i32, action: &i32) {
906            println!("TestChannelSubscriber: state={}, action={}", state, action);
907            self.received.lock().unwrap().push((*state, *action));
908        }
909    }
910
911    struct TestReducer;
912
913    impl Reducer<i32, i32> for TestReducer {
914        fn reduce(&self, state: &i32, action: &i32) -> DispatchOp<i32, i32> {
915            DispatchOp::Dispatch(state + action, None)
916        }
917    }
918
919    struct SlowSubscriber {
920        received: Arc<Mutex<Vec<(i32, i32)>>>,
921        delay: Duration,
922    }
923
924    impl SlowSubscriber {
925        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
926            Self { received, delay }
927        }
928    }
929
930    impl Subscriber<i32, i32> for SlowSubscriber {
931        fn on_notify(&self, state: &i32, action: &i32) {
932            println!("SlowSubscriber: state={}, action={}", state, action);
933            std::thread::sleep(self.delay);
934            self.received.lock().unwrap().push((*state, *action));
935        }
936    }
937
938    #[test]
939    fn test_store_subscribed_basic() {
940        // Setup store with a simple counter
941        let initial_state = 0;
942        let reducer = Box::new(TestReducer);
943        let store = StoreImpl::new_with_reducer(initial_state, reducer);
944
945        // Create subscriber to receive updates
946        let received_states = Arc::new(Mutex::new(Vec::new()));
947        let subscriber1 = Box::new(TestChannelSubscriber::new(received_states.clone()));
948        // Create channel
949        let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);
950
951        // Dispatch some actions
952        store.dispatch(1).unwrap();
953        store.dispatch(2).unwrap();
954
955        // Give some time for processing
956        // thread::sleep(Duration::from_millis(100));
957        store.stop();
958
959        // unsubscribe from the channel
960        subscription.unwrap().unsubscribe();
961
962        // Verify received updates
963        let states = received_states.lock().unwrap();
964        assert_eq!(states.len(), 2);
965        assert_eq!(states[0], (1, 1)); // (state, action)
966        assert_eq!(states[1], (3, 2)); // state=1+2, action=2
967    }
968
969    #[test]
970    fn test_store_subscribed_backpressure() {
971        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
972
973        let received = Arc::new(Mutex::new(Vec::new()));
974        let received_clone = received.clone();
975        let subscriber = Box::new(SlowSubscriber::new(
976            received_clone,
977            Duration::from_millis(100),
978        ));
979        // Create channel with small capacity
980        let subscription = store.subscribed_with(1, BackpressurePolicy::DropOldest, subscriber);
981
982        // Fill the channel
983        for i in 0..5 {
984            store.dispatch(i).unwrap();
985        }
986
987        // Give some time for having channel thread to process
988        thread::sleep(Duration::from_millis(200));
989        store.stop();
990        subscription.unwrap().unsubscribe();
991
992        // Should only receive the latest updates due to backpressure
993        let received = received.lock().unwrap();
994        assert!(received.len() <= 2); // Some messages should be dropped
995
996        if let Some((state, action)) = received.last() {
997            assert_eq!(*action, 4); // Last action should be received
998            assert!(*state <= 10); // Final state should be sum of 0..5
999        }
1000    }
1001
1002    #[test]
1003    fn test_store_subscribed_subscription() {
1004        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1005
1006        let received = Arc::new(Mutex::new(Vec::new()));
1007        let subscriber1 = Box::new(TestChannelSubscriber::new(received.clone()));
1008        let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);
1009
1010        // Dispatch some actions
1011        store.dispatch(1).unwrap();
1012
1013        // give some time for processing
1014        thread::sleep(Duration::from_millis(100));
1015        // subscriber should receive the state
1016        assert_eq!(received.lock().unwrap().len(), 1);
1017
1018        // unsubscribe
1019        subscription.unwrap().unsubscribe();
1020
1021        // dispatch more actions
1022        store.dispatch(2).unwrap();
1023        store.dispatch(3).unwrap();
1024        // give some time for processing
1025        store.stop();
1026        // subscriber should not receive the state
1027        assert_eq!(received.lock().unwrap().len(), 1);
1028    }
1029
1030    // 새로운 subscriber가 추가될 때 최신 상태를 받는지 테스트
1031    #[test]
1032    fn test_new_subscriber_receives_latest_state() {
1033        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1034
1035        // 첫 번째 subscriber 추가
1036        let received1 = Arc::new(Mutex::new(Vec::new()));
1037        let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
1038        store.add_subscriber(subscriber1);
1039
1040        // 액션을 dispatch하여 상태 변경
1041        store.dispatch(5).unwrap();
1042        store.dispatch(10).unwrap();
1043
1044        // 잠시 대기하여 액션이 처리되도록 함
1045        thread::sleep(Duration::from_millis(100));
1046
1047        // 두 번째 subscriber 추가 (현재 상태는 15)
1048        let received2 = Arc::new(Mutex::new(Vec::new()));
1049        let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
1050        store.add_subscriber(subscriber2);
1051
1052        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1053        thread::sleep(Duration::from_millis(100));
1054
1055        // 새로운 액션을 dispatch
1056        store.dispatch(20).unwrap();
1057
1058        // 잠시 대기하여 액션이 처리되도록 함
1059        thread::sleep(Duration::from_millis(100));
1060
1061        // 첫 번째 subscriber는 모든 상태 변경을 받아야 함
1062        let received1 = received1.lock().unwrap();
1063        assert_eq!(received1.len(), 3);
1064        assert_eq!(received1[0], (5, 5));
1065        assert_eq!(received1[1], (15, 10));
1066        assert_eq!(received1[2], (35, 20));
1067
1068        // 두 번째 subscriber는 추가된 후의 상태 변경만 받아야 함
1069        let received2 = received2.lock().unwrap();
1070        assert_eq!(received2.len(), 1);
1071        assert_eq!(received2[0], (35, 20));
1072    }
1073
1074    // 새로운 subscriber가 추가될 때 on_subscribe가 호출되는지 테스트
1075    #[test]
1076    fn test_new_subscriber_on_subscribe_called() {
1077        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1078
1079        // 액션을 dispatch하여 상태 변경
1080        store.dispatch(5).unwrap();
1081
1082        // on_subscribe를 구현한 subscriber 추가
1083        let received_states = Arc::new(Mutex::new(Vec::new()));
1084        let subscribe_called = Arc::new(Mutex::new(false));
1085
1086        struct TestSubscribeSubscriber {
1087            received_states: Arc<Mutex<Vec<i32>>>,
1088            subscribe_called: Arc<Mutex<bool>>,
1089        }
1090
1091        impl Subscriber<i32, i32> for TestSubscribeSubscriber {
1092            fn on_subscribe(&self, state: &i32) {
1093                self.received_states.lock().unwrap().push(*state);
1094                *self.subscribe_called.lock().unwrap() = true;
1095            }
1096
1097            fn on_notify(&self, state: &i32, _action: &i32) {
1098                self.received_states.lock().unwrap().push(*state);
1099            }
1100        }
1101
1102        let subscriber = Arc::new(TestSubscribeSubscriber {
1103            received_states: received_states.clone(),
1104            subscribe_called: subscribe_called.clone(),
1105        });
1106
1107        store.add_subscriber(subscriber);
1108
1109        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1110        thread::sleep(Duration::from_millis(100));
1111
1112        // on_subscribe가 호출되었는지 확인
1113        assert!(*subscribe_called.lock().unwrap());
1114
1115        // 최신 상태(5)를 받았는지 확인
1116        let states = received_states.lock().unwrap();
1117        assert_eq!(states.len(), 1);
1118        assert_eq!(states[0], 5);
1119    }
1120
1121    // 여러 subscriber가 동시에 추가될 때 테스트
1122    #[test]
1123    fn test_multiple_subscribers_added_simultaneously() {
1124        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1125
1126        // 액션을 dispatch하여 상태 변경
1127        store.dispatch(10).unwrap();
1128        store.dispatch(20).unwrap();
1129
1130        // 잠시 대기하여 액션이 처리되도록 함
1131        thread::sleep(Duration::from_millis(100));
1132
1133        // 여러 subscriber를 동시에 추가
1134        let subscribers = vec![
1135            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1136            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1137            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1138        ];
1139
1140        for subscriber in &subscribers {
1141            store.add_subscriber(subscriber.clone());
1142        }
1143
1144        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1145        thread::sleep(Duration::from_millis(100));
1146
1147        // 새로운 액션을 dispatch
1148        store.dispatch(30).unwrap();
1149
1150        // 잠시 대기하여 액션이 처리되도록 함
1151        thread::sleep(Duration::from_millis(100));
1152
1153        // 모든 subscriber가 새로운 액션을 받았는지 확인
1154        for subscriber in &subscribers {
1155            let received = subscriber.received.lock().unwrap();
1156            assert_eq!(received.len(), 1);
1157            assert_eq!(received[0], (60, 30)); // state: 30+30, action: 30
1158        }
1159    }
1160
1161    // subscriber 추가 후 즉시 unsubscribe하는 테스트
1162    #[test]
1163    fn test_subscriber_unsubscribe_after_add() {
1164        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1165
1166        // 액션을 dispatch하여 상태 변경
1167        store.dispatch(5).unwrap();
1168
1169        // subscriber 추가
1170        let received = Arc::new(Mutex::new(Vec::new()));
1171        let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
1172        let subscription = store.add_subscriber(subscriber);
1173
1174        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1175        thread::sleep(Duration::from_millis(100));
1176
1177        // 즉시 unsubscribe
1178        subscription.unsubscribe();
1179
1180        // 새로운 액션을 dispatch
1181        store.dispatch(10).unwrap();
1182
1183        // 잠시 대기하여 액션이 처리되도록 함
1184        thread::sleep(Duration::from_millis(100));
1185
1186        // subscriber가 새로운 액션을 받지 않았는지 확인
1187        let received = received.lock().unwrap();
1188        assert_eq!(received.len(), 0);
1189    }
1190
1191    // store가 중지된 후 subscriber를 추가하는 테스트
1192    #[test]
1193    fn test_add_subscriber_after_store_stop() {
1194        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1195
1196        // store 중지
1197        store.stop();
1198
1199        // subscriber 추가 시도
1200        let received = Arc::new(Mutex::new(Vec::new()));
1201        let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
1202        let _subscription = store.add_subscriber(subscriber);
1203
1204        // 잠시 대기
1205        thread::sleep(Duration::from_millis(100));
1206
1207        // subscriber가 추가되었지만 store가 중지되어 있으므로 액션을 받지 않음
1208        let received = received.lock().unwrap();
1209        assert_eq!(received.len(), 0);
1210    }
1211
1212    // on_subscribe에서 상태를 수정하는 subscriber 테스트
1213    #[test]
1214    fn test_subscriber_modifies_state_in_on_subscribe() {
1215        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1216
1217        // 액션을 dispatch하여 상태 변경
1218        store.dispatch(5).unwrap();
1219
1220        struct ModifyingSubscriber {
1221            received_states: Arc<Mutex<Vec<i32>>>,
1222            subscribe_called: Arc<Mutex<bool>>,
1223        }
1224
1225        impl Subscriber<i32, i32> for ModifyingSubscriber {
1226            fn on_subscribe(&self, state: &i32) {
1227                // on_subscribe에서 상태를 수정해도 store의 상태는 변경되지 않음
1228                self.received_states.lock().unwrap().push(*state);
1229                *self.subscribe_called.lock().unwrap() = true;
1230            }
1231
1232            fn on_notify(&self, state: &i32, _action: &i32) {
1233                self.received_states.lock().unwrap().push(*state);
1234            }
1235        }
1236
1237        let subscriber = Arc::new(ModifyingSubscriber {
1238            received_states: Arc::new(Mutex::new(Vec::new())),
1239            subscribe_called: Arc::new(Mutex::new(false)),
1240        });
1241
1242        store.add_subscriber(subscriber.clone());
1243
1244        // 잠시 대기하여 AddSubscriber 액션이 처리되도록 함
1245        thread::sleep(Duration::from_millis(100));
1246
1247        // on_subscribe가 호출되었는지 확인
1248        assert!(*subscriber.subscribe_called.lock().unwrap());
1249
1250        // 최신 상태(5)를 받았는지 확인
1251        let states = subscriber.received_states.lock().unwrap();
1252        assert_eq!(states.len(), 1);
1253        assert_eq!(states[0], 5);
1254
1255        // store의 상태가 변경되지 않았는지 확인
1256        assert_eq!(store.get_state(), 5);
1257    }
1258
1259    // 여러 번의 AddSubscriber 액션이 연속으로 발생하는 테스트
1260    #[test]
1261    fn test_consecutive_add_subscriber_actions() {
1262        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1263
1264        // 첫 번째 subscriber 추가
1265        let received1 = Arc::new(Mutex::new(Vec::new()));
1266        let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
1267        store.add_subscriber(subscriber1);
1268
1269        // 잠시 대기
1270        thread::sleep(Duration::from_millis(50));
1271
1272        // 두 번째 subscriber 추가
1273        let received2 = Arc::new(Mutex::new(Vec::new()));
1274        let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
1275        store.add_subscriber(subscriber2);
1276
1277        // 잠시 대기
1278        thread::sleep(Duration::from_millis(50));
1279
1280        // 세 번째 subscriber 추가
1281        let received3 = Arc::new(Mutex::new(Vec::new()));
1282        let subscriber3 = Arc::new(TestChannelSubscriber::new(received3.clone()));
1283        store.add_subscriber(subscriber3);
1284
1285        // 잠시 대기하여 모든 AddSubscriber 액션이 처리되도록 함
1286        thread::sleep(Duration::from_millis(100));
1287
1288        // 새로운 액션을 dispatch
1289        store.dispatch(10).unwrap();
1290
1291        // 잠시 대기하여 액션이 처리되도록 함
1292        thread::sleep(Duration::from_millis(100));
1293
1294        // 모든 subscriber가 새로운 액션을 받았는지 확인
1295        assert_eq!(received1.lock().unwrap().len(), 1);
1296        assert_eq!(received2.lock().unwrap().len(), 1);
1297        assert_eq!(received3.lock().unwrap().len(), 1);
1298    }
1299}