1use crate::channel::{BackpressureChannel, BackpressurePolicy, ReceiverChannel, SenderChannel};
2use crate::dispatcher::Dispatcher;
3use crate::metrics::{CountMetrics, Metrics, MetricsSnapshot};
4use crate::middleware::Middleware;
5use crate::{DispatchOp, Effect, MiddlewareOp, Reducer, Subscriber, Subscription};
6use fmt::Debug;
7use rusty_pool::ThreadPool;
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10use std::time::{Duration, Instant};
11use std::{fmt, thread};
12
13use crate::iterator::{StateIterator, StateIteratorSubscriber};
14use crate::store::{Store, StoreError, DEFAULT_CAPACITY, DEFAULT_STORE_NAME};
15
16const DEFAULT_STOP_TIMEOUT: Duration = Duration::from_secs(10);
17
18#[derive(Debug, Clone, PartialEq)]
19pub enum ActionOp<Action>
20where
21 Action: Send + Sync + Clone + 'static,
22{
23 Action(Action),
24 #[allow(dead_code)]
25 Exit(Instant),
26}
27
28#[allow(clippy::type_complexity)]
30pub struct StoreImpl<State, Action>
31where
32 State: Send + Sync + Clone + 'static,
33 Action: Send + Sync + Clone + 'static,
34{
35 #[allow(dead_code)]
36 pub(crate) name: String,
37 state: Mutex<State>,
38 pub(crate) reducers: Mutex<Vec<Box<dyn Reducer<State, Action> + Send + Sync>>>,
39 pub(crate) subscribers: Arc<Mutex<Vec<Arc<dyn Subscriber<State, Action> + Send + Sync>>>>,
40 pub(crate) dispatch_tx: Mutex<Option<SenderChannel<Action>>>,
41 middlewares: Mutex<Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>>,
42 pub(crate) metrics: Arc<CountMetrics>,
43 pub(crate) pool: Mutex<Option<ThreadPool>>,
45}
46
47struct SubscriberSubscription {
50 unsubscribe: Box<dyn Fn() + Send + Sync>,
51}
52
53impl Subscription for SubscriberSubscription {
54 fn unsubscribe(&self) {
55 (self.unsubscribe)();
56 }
57}
58
59impl<State, Action> StoreImpl<State, Action>
60where
61 State: Send + Sync + Clone + 'static,
62 Action: Send + Sync + Clone + 'static,
63{
64 pub fn new(state: State) -> Arc<StoreImpl<State, Action>> {
66 Self::new_with(
67 state,
68 vec![],
69 DEFAULT_STORE_NAME.into(),
70 DEFAULT_CAPACITY,
71 BackpressurePolicy::default(),
72 vec![],
73 )
74 .unwrap()
75 }
76
77 pub fn new_with_reducer(
79 state: State,
80 reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
81 ) -> Arc<StoreImpl<State, Action>> {
82 Self::new_with_name(state, reducer, DEFAULT_STORE_NAME.into()).unwrap()
83 }
84
85 pub fn new_with_name(
87 state: State,
88 reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
89 name: String,
90 ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
91 Self::new_with(
92 state,
93 vec![reducer],
94 name,
95 DEFAULT_CAPACITY,
96 BackpressurePolicy::default(),
97 vec![],
98 )
99 }
100
101 pub fn new_with(
103 state: State,
104 reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
105 name: String,
106 capacity: usize,
107 policy: BackpressurePolicy<Action>,
108 middlewares: Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>,
109 ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
110 let metrics = Arc::new(CountMetrics::default());
111 let (tx, rx) = BackpressureChannel::<Action>::pair_with(
112 "dispatch",
113 capacity,
114 policy.clone(),
115 Some(metrics.clone()),
116 );
117
118 let store = StoreImpl {
119 name: name.clone(),
120 state: Mutex::new(state),
121 reducers: Mutex::new(reducers),
122 subscribers: Arc::new(Mutex::new(Vec::default())),
123 middlewares: Mutex::new(middlewares),
124 dispatch_tx: Mutex::new(Some(tx)),
125 metrics,
126 pool: Mutex::new(Some(
127 rusty_pool::Builder::new().name(format!("{}-pool", name)).build(),
128 )),
129 };
130
131 let rx_store = Arc::new(store);
133 let tx_store = rx_store.clone();
134
135 tx_store.pool.lock().unwrap().as_ref().unwrap().execute(move || {
137 #[cfg(feature = "store-log")]
138 eprintln!("store: reducer thread started");
139
140 while let Some(action_op) = rx.recv() {
141 let action_received_at = Instant::now();
142 rx_store.metrics.action_received(Some(&action_op));
143
144 match action_op {
145 ActionOp::Action(action) => {
146 let the_dispatcher = Arc::new(rx_store.clone());
147
148 let current_state = rx_store.state.lock().unwrap().clone();
150 let (need_dispatch, new_state, effects) = rx_store.do_reduce(
151 &action,
152 current_state,
153 the_dispatcher.clone(),
154 action_received_at,
155 );
156 *rx_store.state.lock().unwrap() = new_state.clone();
157
158 if let Some(mut effects) = effects {
160 rx_store.do_effect(
161 &action,
162 &new_state,
163 &mut effects,
164 the_dispatcher.clone(),
165 );
166 }
167
168 if need_dispatch {
170 rx_store.do_notify(
171 &action,
172 &new_state,
173 the_dispatcher.clone(),
174 action_received_at,
175 );
176 }
177
178 rx_store
179 .metrics
180 .action_executed(Some(&action), action_received_at.elapsed());
181 }
182 ActionOp::Exit(_) => {
183 rx_store.on_close(action_received_at);
184 #[cfg(feature = "store-log")]
185 eprintln!("store: reducer loop exit");
186 break;
187 }
188 }
189 }
190
191 rx_store.clear_subscribers();
193
194 #[cfg(feature = "store-log")]
195 eprintln!("store: reducer thread done");
196 });
197
198 Ok(tx_store)
199 }
200
201 pub fn get_state(&self) -> State {
205 self.state.lock().unwrap().clone()
206 }
207
208 pub fn get_metrics(&self) -> MetricsSnapshot {
210 (&(*self.metrics)).into()
211 }
212
213 pub fn add_reducer(&self, reducer: Box<dyn Reducer<State, Action> + Send + Sync>) {
215 self.reducers.lock().unwrap().push(reducer);
216 }
217
218 pub fn add_subscriber(
220 &self,
221 subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
222 ) -> Box<dyn Subscription> {
223 self.subscribers.lock().unwrap().push(subscriber.clone());
225
226 let subscribers = self.subscribers.clone();
228 Box::new(SubscriberSubscription {
229 unsubscribe: Box::new(move || {
230 let mut subscribers = subscribers.lock().unwrap();
231 subscribers.retain(|s| {
232 let retain = !Arc::ptr_eq(s, &subscriber);
233 if !retain {
234 s.on_unsubscribe();
235 }
236 retain
237 });
238 }),
239 })
240 }
241
242 pub(crate) fn clear_subscribers(&self) {
244 #[cfg(feature = "store-log")]
245 eprintln!("store: clear_subscribers");
246 match self.subscribers.lock() {
247 Ok(mut subscribers) => {
248 for subscriber in subscribers.iter() {
249 subscriber.on_unsubscribe();
250 }
251 subscribers.clear();
252 }
253 Err(mut e) => {
254 #[cfg(feature = "store-log")]
255 eprintln!("store: Error while locking subscribers: {:?}", e);
256 for subscriber in e.get_ref().iter() {
257 subscriber.on_unsubscribe();
258 }
259 e.get_mut().clear();
260 }
261 }
262 }
263
264 pub(crate) fn do_reduce(
270 &self,
271 action: &Action,
272 mut state: State,
273 dispatcher: Arc<dyn Dispatcher<Action>>,
274 action_received_at: Instant,
275 ) -> (bool, State, Option<Vec<Effect<Action>>>) {
276 let mut reduce_action = true;
279 if !self.middlewares.lock().unwrap().is_empty() {
280 let middleware_start = Instant::now();
281 let mut middleware_executed = 0;
282 for middleware in self.middlewares.lock().unwrap().iter() {
283 middleware_executed += 1;
284 match middleware.before_reduce(action, &state, dispatcher.clone()) {
285 Ok(MiddlewareOp::ContinueAction) => {
286 }
288 Ok(MiddlewareOp::DoneAction) => {
289 reduce_action = false;
292 }
293 Ok(MiddlewareOp::BreakChain) => {
294 break;
296 }
297 Err(e) => {
298 middleware.on_error(e);
299 }
300 }
301 }
302 let middleware_duration = middleware_start.elapsed();
303 self.metrics.middleware_executed(
304 Some(action),
305 "before_reduce",
306 middleware_executed,
307 middleware_duration,
308 );
309 }
310
311 let mut effects = vec![];
312 let mut need_dispatch = true;
313 if reduce_action {
314 let reducer_start = Instant::now();
315
316 for reducer in self.reducers.lock().unwrap().iter() {
317 match reducer.reduce(&state, action) {
318 DispatchOp::Dispatch(new_state, effect) => {
319 state = new_state;
320 if let Some(effect) = effect {
321 effects.push(effect);
322 }
323 need_dispatch = true;
324 }
325 DispatchOp::Keep(new_state, effect) => {
326 state = new_state;
328 if let Some(effect) = effect {
329 effects.push(effect);
330 }
331 need_dispatch = false;
332 }
333 }
334 }
335
336 let reducer_duration = reducer_start.elapsed();
338 self.metrics.action_reduced(
339 Some(action),
340 reducer_duration,
341 action_received_at.elapsed(),
342 );
343 }
344
345 (need_dispatch, state, Some(effects))
346 }
347
348 pub(crate) fn do_effect(
349 &self,
350 action: &Action,
351 state: &State,
352 effects: &mut Vec<Effect<Action>>,
353 dispatcher: Arc<dyn Dispatcher<Action>>,
354 ) {
355 let effect_start = Instant::now();
356 self.metrics.effect_issued(effects.len());
357
358 if !self.middlewares.lock().unwrap().is_empty() {
359 let middleware_start = Instant::now();
360 let mut middleware_executed = 0;
361 for middleware in self.middlewares.lock().unwrap().iter() {
362 middleware_executed += 1;
363 match middleware.before_effect(action, state, effects, dispatcher.clone()) {
364 Ok(MiddlewareOp::ContinueAction) => {
365 }
367 Ok(MiddlewareOp::DoneAction) => {
368 }
370 Ok(MiddlewareOp::BreakChain) => {
371 break;
373 }
374 Err(e) => {
375 middleware.on_error(e);
376 }
377 }
378 }
379
380 let middleware_duration = middleware_start.elapsed();
381 self.metrics.middleware_executed(
382 Some(action),
383 "before_effect",
384 middleware_executed,
385 middleware_duration,
386 );
387 }
388
389 let effects_total = effects.len();
390 while !effects.is_empty() {
391 let effect = effects.remove(0);
392 match effect {
393 Effect::Action(a) => {
394 dispatcher.dispatch_thunk(Box::new(move |dispatcher| {
395 dispatcher.dispatch(a).expect("no dispatch failed");
396 }));
397 }
398 Effect::Task(task) => {
399 dispatcher.dispatch_task(task);
400 }
401 Effect::Thunk(thunk) => {
402 dispatcher.dispatch_thunk(thunk);
403 }
404 Effect::Function(_tok, func) => {
405 dispatcher.dispatch_task(Box::new(move || {
406 let _ = func();
408 }));
409 }
410 };
411 }
412
413 let duration = effect_start.elapsed();
414 self.metrics.effect_executed(effects_total, duration);
415 }
416
417 pub(crate) fn do_notify(
418 &self,
419 action: &Action,
420 next_state: &State,
421 dispatcher: Arc<dyn Dispatcher<Action>>,
422 _action_received_at: Instant,
423 ) {
424 let _notify_start = Instant::now();
425 self.metrics.state_notified(Some(next_state));
426
427 let mut need_notify = true;
428 if !self.middlewares.lock().unwrap().is_empty() {
429 let middleware_start = Instant::now();
430 let mut middleware_executed = 0;
431 for middleware in self.middlewares.lock().unwrap().iter() {
432 middleware_executed += 1;
433 match middleware.before_dispatch(action, next_state, dispatcher.clone()) {
434 Ok(MiddlewareOp::ContinueAction) => {
435 }
437 Ok(MiddlewareOp::DoneAction) => {
438 need_notify = false;
440 }
441 Ok(MiddlewareOp::BreakChain) => {
442 break;
444 }
445 Err(e) => {
446 middleware.on_error(e);
447 }
448 }
449 }
450 let middleware_duration = middleware_start.elapsed();
451 self.metrics.middleware_executed(
452 Some(action),
453 "before_dispatch",
454 middleware_executed,
455 middleware_duration,
456 );
457 }
458
459 if need_notify {
460 let subscribers = self.subscribers.lock().unwrap().clone();
461 for subscriber in subscribers.iter() {
462 subscriber.on_notify(next_state, action);
463 }
464 let duration = _notify_start.elapsed();
465 self.metrics.subscriber_notified(Some(action), subscribers.len(), duration);
466 }
467 }
468
469 fn on_close(&self, action_received_at: Instant) {
470 #[cfg(feature = "store-log")]
471 eprintln!("store: on_close");
472
473 self.metrics.action_executed(None, action_received_at.elapsed());
474 }
475
476 pub fn close(&self) {
480 match self.dispatch_tx.lock() {
481 Ok(mut tx) => {
482 if let Some(tx) = tx.take() {
483 #[cfg(feature = "store-log")]
484 eprintln!("store: closing dispatch channel");
485 match tx.send(ActionOp::Exit(Instant::now())) {
486 Ok(_) => {
487 #[cfg(feature = "store-log")]
488 eprintln!("store: dispatch channel sent exit");
489 }
490 Err(_e) => {
491 #[cfg(feature = "store-log")]
492 eprintln!("store: Error while closing dispatch channel");
493 }
494 }
495 drop(tx);
496 }
497 }
498 Err(_e) => {
499 #[cfg(feature = "store-log")]
500 eprintln!("store: Error while locking dispatch channel: {:?}", _e);
501 return;
502 }
503 }
504
505 #[cfg(feature = "store-log")]
506 eprintln!("store: dispatch channel closed");
507 }
508
509 pub fn stop(&self) {
511 self.close();
512
513 match self.pool.lock() {
516 Ok(mut pool) => {
517 if let Some(pool) = pool.take() {
518 pool.shutdown_join();
519 }
520 #[cfg(feature = "store-log")]
521 eprintln!("store: shutdown pool");
522 }
523 Err(_e) => {
524 #[cfg(feature = "store-log")]
525 eprintln!("store: Error while locking pool: {:?}", _e);
526 return;
527 }
528 }
529
530 #[cfg(feature = "store-log")]
531 eprintln!("store: Store stopped");
532 }
533
534 pub fn stop_with_timeout(&self, timeout: Duration) {
536 self.close();
537
538 match self.pool.lock() {
541 Ok(mut pool) => {
542 if let Some(pool) = pool.take() {
543 pool.shutdown_join_timeout(timeout);
544 }
545 #[cfg(feature = "store-log")]
546 eprintln!("store: shutdown pool");
547 }
548 Err(_e) => {
549 #[cfg(feature = "store-log")]
550 eprintln!("store: Error while locking pool: {:?}", _e);
551 return;
552 }
553 }
554
555 #[cfg(feature = "store-log")]
556 eprintln!("store: Store stopped");
557 }
558
559 pub fn dispatch(&self, action: Action) -> Result<(), StoreError> {
565 let sender = self.dispatch_tx.lock().unwrap();
566 if let Some(tx) = sender.as_ref() {
567 let remains = tx.send(ActionOp::Action(action)).unwrap_or(0);
569 self.metrics.queue_size(remains as usize);
570 Ok(())
571 } else {
572 let err = StoreError::DispatchError("Dispatch channel is closed".to_string());
573 self.metrics.error_occurred(&err);
574 Err(err)
575 }
576 }
577
578 pub fn add_middleware(&self, middleware: Arc<dyn Middleware<State, Action> + Send + Sync>) {
580 self.middlewares.lock().unwrap().push(middleware.clone());
581 }
582
583 pub fn iter(&self) -> impl Iterator<Item = (State, Action)> {
588 self.iter_with(1, BackpressurePolicy::BlockOnFull)
589 }
590
591 pub(crate) fn iter_with(
597 &self,
598 capacity: usize,
599 policy: BackpressurePolicy<(State, Action)>,
600 ) -> impl Iterator<Item = (State, Action)> {
601 let (iter_tx, iter_rx) = BackpressureChannel::<(State, Action)>::pair_with(
602 "store_iter",
603 capacity,
604 policy,
605 Some(self.metrics.clone()),
606 );
607
608 let subscription = self.add_subscriber(Arc::new(StateIteratorSubscriber::new(iter_tx)));
609 StateIterator::new(iter_rx, subscription)
610 }
611
612 pub fn subscribed(
621 &self,
622 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
623 ) -> Result<Box<dyn Subscription>, StoreError> {
624 self.subscribed_with(
625 DEFAULT_CAPACITY,
626 BackpressurePolicy::BlockOnFull,
627 subscriber,
628 )
629 }
630
631 pub fn subscribed_with(
641 &self,
642 capacity: usize,
643 policy: BackpressurePolicy<(Instant, State, Action)>,
644 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
645 ) -> Result<Box<dyn Subscription>, StoreError> {
646 let (tx, rx) = BackpressureChannel::<(Instant, State, Action)>::pair_with(
648 format!("{}-channel", self.name).as_str(),
649 capacity,
650 policy,
651 Some(self.metrics.clone()),
652 );
653
654 let thread_name = format!("{}-channeled-subscriber", self.name);
656 let metrics_clone = self.metrics.clone();
657 let builder = thread::Builder::new().name(thread_name.clone());
658 let handle = match builder.spawn(move || {
659 Self::subscribed_loop(thread_name, rx, subscriber, metrics_clone);
661 }) {
662 Ok(h) => h,
663 Err(e) => {
664 #[cfg(feature = "store-log")]
665 eprintln!("store: Error while spawning channel thread: {:?}", e);
666 return Err(StoreError::SubscriptionError(format!(
667 "Error while spawning channel thread: {:?}",
668 e
669 )));
670 }
671 };
672
673 let channel_subscriber = Arc::new(ChanneledSubscriber::new(handle, tx));
675 let subscription = self.add_subscriber(channel_subscriber.clone());
676
677 Ok(subscription)
678 }
679
680 fn subscribed_loop(
681 _name: String,
682 rx: ReceiverChannel<(Instant, State, Action)>,
683 subscriber: Box<dyn Subscriber<State, Action>>,
684 metrics: Arc<dyn Metrics>,
685 ) {
686 #[cfg(feature = "store-log")]
687 eprintln!("store: {} channel thread started", _name);
688
689 while let Some(msg) = rx.recv() {
690 match msg {
691 ActionOp::Action((created_at, state, action)) => {
692 let started_at = Instant::now();
693 {
694 subscriber.on_notify(&state, &action);
695 }
696 metrics.subscriber_notified(Some(&action), 1, started_at.elapsed());
697
698 metrics.action_executed(Some(&action), created_at.elapsed());
700 }
701 ActionOp::Exit(created_at) => {
702 metrics.action_executed(None, created_at.elapsed());
703 #[cfg(feature = "store-log")]
704 eprintln!("store: {} channel thread loop exit", _name);
705 break;
706 }
707 }
708 }
709
710 #[cfg(feature = "store-log")]
711 eprintln!("store: {} channel thread done", _name);
712 }
713}
714
715struct ChanneledSubscriber<T>
717where
718 T: Send + Sync + Clone + 'static,
719{
720 handle: Mutex<Option<JoinHandle<()>>>,
721 tx: Mutex<Option<SenderChannel<T>>>,
722}
723
724impl<T> ChanneledSubscriber<T>
725where
726 T: Send + Sync + Clone + 'static,
727{
728 pub(crate) fn new(handle: JoinHandle<()>, tx: SenderChannel<T>) -> Self {
729 Self {
730 handle: Mutex::new(Some(handle)),
731 tx: Mutex::new(Some(tx)),
732 }
733 }
734
735 fn clear_resource(&self) {
736 if let Ok(mut tx_locked) = self.tx.lock() {
738 if let Some(tx) = tx_locked.take() {
739 let _ = tx.send(ActionOp::Exit(Instant::now()));
740 drop(tx);
741 };
742 }
743
744 if let Ok(mut handle) = self.handle.lock() {
746 if let Some(h) = handle.take() {
747 let _ = h.join();
748 }
749 }
750 }
751}
752
753impl<State, Action> Subscriber<State, Action> for ChanneledSubscriber<(Instant, State, Action)>
754where
755 State: Send + Sync + Clone + 'static,
756 Action: Send + Sync + Clone + 'static,
757{
758 fn on_notify(&self, state: &State, action: &Action) {
759 match self.tx.lock() {
760 Ok(tx) => {
761 tx.as_ref().map(|tx| {
762 tx.send(ActionOp::Action((
763 Instant::now(),
764 state.clone(),
765 action.clone(),
766 )))
767 });
768 }
769 Err(_e) => {
770 #[cfg(feature = "store-log")]
771 eprintln!("store: Error while locking channel: {:?}", _e);
772 }
773 }
774 }
775
776 fn on_unsubscribe(&self) {
777 self.clear_resource();
778 }
779}
780
781impl<T> Subscription for ChanneledSubscriber<T>
782where
783 T: Send + Sync + Clone + 'static,
784{
785 fn unsubscribe(&self) {
786 self.clear_resource();
787 }
788}
789
790impl<State, Action> Drop for StoreImpl<State, Action>
793where
794 State: Send + Sync + Clone + 'static,
795 Action: Send + Sync + Clone + 'static,
796{
797 fn drop(&mut self) {
798 self.close();
799
800 self.stop_with_timeout(DEFAULT_STOP_TIMEOUT);
802 #[cfg(feature = "store-log")]
809 eprintln!("store: '{}' Store dropped", self.name);
810 }
811}
812
813impl<State, Action> Store<State, Action> for StoreImpl<State, Action>
814where
815 State: Send + Sync + Clone + 'static,
816 Action: Send + Sync + Clone + 'static,
817{
818 fn get_state(&self) -> State {
819 self.get_state()
820 }
821
822 fn dispatch(&self, action: Action) -> Result<(), StoreError> {
823 self.dispatch(action)
824 }
825
826 fn add_subscriber(
827 &self,
828 subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
829 ) -> Box<dyn Subscription> {
830 self.add_subscriber(subscriber)
831 }
832
833 fn subscribed(
834 &self,
835 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
836 ) -> Result<Box<dyn Subscription>, StoreError> {
837 self.subscribed(subscriber)
838 }
839
840 fn subscribed_with(
841 &self,
842 capacity: usize,
843 policy: BackpressurePolicy<(Instant, State, Action)>,
844 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
845 ) -> Result<Box<dyn Subscription>, StoreError> {
846 self.subscribed_with(capacity, policy, subscriber)
847 }
848
849 fn stop(&self) {
850 self.stop();
851 }
852}
853
854#[cfg(test)]
855mod tests {
856 use super::*;
857 use std::thread;
858
859 struct TestChannelSubscriber {
860 received: Arc<Mutex<Vec<(i32, i32)>>>,
861 }
862
863 impl TestChannelSubscriber {
864 fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
865 Self { received }
866 }
867 }
868
869 impl Subscriber<i32, i32> for TestChannelSubscriber {
870 fn on_notify(&self, state: &i32, action: &i32) {
871 println!("TestChannelSubscriber: state={}, action={}", state, action);
872 self.received.lock().unwrap().push((*state, *action));
873 }
874 }
875
876 struct TestReducer;
877
878 impl Reducer<i32, i32> for TestReducer {
879 fn reduce(&self, state: &i32, action: &i32) -> DispatchOp<i32, i32> {
880 DispatchOp::Dispatch(state + action, None)
881 }
882 }
883
884 struct SlowSubscriber {
885 received: Arc<Mutex<Vec<(i32, i32)>>>,
886 delay: Duration,
887 }
888
889 impl SlowSubscriber {
890 fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
891 Self { received, delay }
892 }
893 }
894
895 impl Subscriber<i32, i32> for SlowSubscriber {
896 fn on_notify(&self, state: &i32, action: &i32) {
897 println!("SlowSubscriber: state={}, action={}", state, action);
898 std::thread::sleep(self.delay);
899 self.received.lock().unwrap().push((*state, *action));
900 }
901 }
902
903 #[test]
904 fn test_store_subscribed_basic() {
905 let initial_state = 0;
907 let reducer = Box::new(TestReducer);
908 let store = StoreImpl::new_with_reducer(initial_state, reducer);
909
910 let received_states = Arc::new(Mutex::new(Vec::new()));
912 let subscriber1 = Box::new(TestChannelSubscriber::new(received_states.clone()));
913 let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);
915
916 store.dispatch(1).unwrap();
918 store.dispatch(2).unwrap();
919
920 store.stop();
923
924 subscription.unwrap().unsubscribe();
926
927 let states = received_states.lock().unwrap();
929 assert_eq!(states.len(), 2);
930 assert_eq!(states[0], (1, 1)); assert_eq!(states[1], (3, 2)); }
933
934 #[test]
935 fn test_store_subscribed_backpressure() {
936 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
937
938 let received = Arc::new(Mutex::new(Vec::new()));
939 let received_clone = received.clone();
940 let subscriber = Box::new(SlowSubscriber::new(
941 received_clone,
942 Duration::from_millis(100),
943 ));
944 let subscription = store.subscribed_with(1, BackpressurePolicy::DropOldest, subscriber);
946
947 for i in 0..5 {
949 store.dispatch(i).unwrap();
950 }
951
952 thread::sleep(Duration::from_millis(200));
954 store.stop();
955 subscription.unwrap().unsubscribe();
956
957 let received = received.lock().unwrap();
959 assert!(received.len() <= 2); if let Some((state, action)) = received.last() {
962 assert_eq!(*action, 4); assert!(*state <= 10); }
965 }
966
967 #[test]
968 fn test_store_subscribed_subscription() {
969 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
970
971 let received = Arc::new(Mutex::new(Vec::new()));
972 let subscriber1 = Box::new(TestChannelSubscriber::new(received.clone()));
973 let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);
974
975 store.dispatch(1).unwrap();
977
978 thread::sleep(Duration::from_millis(100));
980 assert_eq!(received.lock().unwrap().len(), 1);
982
983 subscription.unwrap().unsubscribe();
985
986 store.dispatch(2).unwrap();
988 store.dispatch(3).unwrap();
989 store.stop();
991 assert_eq!(received.lock().unwrap().len(), 1);
993 }
994}