1use crate::channel::{BackpressureChannel, BackpressurePolicy, ReceiverChannel, SenderChannel};
2use crate::dispatcher::Dispatcher;
3use crate::metrics::{CountMetrics, Metrics, MetricsSnapshot};
4use crate::middleware::Middleware;
5use crate::{DispatchOp, Effect, MiddlewareOp, Reducer, Subscriber, Subscription};
6use fmt::Debug;
7use rusty_pool::ThreadPool;
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10use std::time::{Duration, Instant};
11use std::{fmt, thread};
12
13use crate::iterator::{StateIterator, StateIteratorSubscriber};
14use crate::store::{Store, StoreError, DEFAULT_CAPACITY, DEFAULT_STORE_NAME};
15
16const DEFAULT_STOP_TIMEOUT: Duration = Duration::from_secs(10);
17
/// Envelope for messages travelling through the store's internal channels.
///
/// The dispatch queue carries `ActionOp<Action>`; channeled subscribers carry
/// `ActionOp<(Instant, State, Action)>`.
#[derive(Debug, Clone, PartialEq)]
pub enum ActionOp<Action>
where
    Action: Send + Sync + Clone + 'static,
{
    /// A payload for the receiving loop to process.
    Action(Action),
    /// Asks the reducer loop to activate the subscribers that are waiting to be added.
    AddSubscriber,
    /// Asks the receiving loop to stop; carries the instant the request was created.
    #[allow(dead_code)]
    Exit(Instant),
}
28
/// Default store implementation.
///
/// Holds the current state together with the reducers, middlewares and
/// subscribers acting on it. Actions are queued through `dispatch_tx` and
/// consumed by a reducer loop that `new_with` starts on `pool`.
#[allow(clippy::type_complexity)]
pub struct StoreImpl<State, Action>
where
    State: Send + Sync + Clone + 'static,
    Action: Send + Sync + Clone + 'static,
{
    /// Store name; used to derive pool, thread and channel names.
    #[allow(dead_code)]
    pub(crate) name: String,
    /// Current state, replaced by the reducer loop after each action.
    state: Mutex<State>,
    /// Reducers applied in order to every action that is reduced.
    pub(crate) reducers: Mutex<Vec<Box<dyn Reducer<State, Action> + Send + Sync>>>,
    /// Active subscribers, notified by `do_notify`.
    pub(crate) subscribers: Arc<Mutex<Vec<Arc<dyn Subscriber<State, Action> + Send + Sync>>>>,
    /// Subscribers registered through `add_subscriber` that the reducer loop
    /// has not yet moved into `subscribers`.
    adding_subscribers: Arc<Mutex<Vec<Arc<dyn Subscriber<State, Action> + Send + Sync>>>>,
    /// Sending half of the dispatch queue; `None` once `close` has run.
    pub(crate) dispatch_tx: Mutex<Option<SenderChannel<Action>>>,
    /// Middlewares consulted before the reduce, effect and notify steps.
    middlewares: Mutex<Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>>,
    /// Metrics sink, also handed to the channels this store creates.
    pub(crate) metrics: Arc<CountMetrics>,
    /// Thread pool running the reducer loop; taken (left as `None`) when the store stops.
    pub(crate) pool: Mutex<Option<ThreadPool>>,
}
49
/// Subscription handle returned by `StoreImpl::add_subscriber`.
struct SubscriberSubscription {
    /// Callback that detaches the subscriber from the store.
    unsubscribe: Box<dyn Fn() + Send + Sync>,
}
55
impl Subscription for SubscriberSubscription {
    /// Runs the stored unsubscribe callback.
    fn unsubscribe(&self) {
        (self.unsubscribe)();
    }
}
61
62impl<State, Action> StoreImpl<State, Action>
63where
64 State: Send + Sync + Clone + 'static,
65 Action: Send + Sync + Clone + 'static,
66{
67 pub fn new(state: State) -> Arc<StoreImpl<State, Action>> {
69 Self::new_with(
70 state,
71 vec![],
72 DEFAULT_STORE_NAME.into(),
73 DEFAULT_CAPACITY,
74 BackpressurePolicy::default(),
75 vec![],
76 )
77 .unwrap()
78 }
79
80 pub fn new_with_reducer(
82 state: State,
83 reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
84 ) -> Arc<StoreImpl<State, Action>> {
85 Self::new_with_name(state, reducer, DEFAULT_STORE_NAME.into()).unwrap()
86 }
87
88 pub fn new_with_name(
90 state: State,
91 reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
92 name: String,
93 ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
94 Self::new_with(
95 state,
96 vec![reducer],
97 name,
98 DEFAULT_CAPACITY,
99 BackpressurePolicy::default(),
100 vec![],
101 )
102 }
103
104 pub fn new_with(
106 state: State,
107 reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
108 name: String,
109 capacity: usize,
110 policy: BackpressurePolicy<Action>,
111 middlewares: Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>,
112 ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
113 let metrics = Arc::new(CountMetrics::default());
114 let (tx, rx) = BackpressureChannel::<Action>::pair_with(
115 "dispatch",
116 capacity,
117 policy.clone(),
118 Some(metrics.clone()),
119 );
120
121 let store = StoreImpl {
122 name: name.clone(),
123 state: Mutex::new(state),
124 reducers: Mutex::new(reducers),
125 subscribers: Arc::new(Mutex::new(Vec::default())),
126 adding_subscribers: Arc::new(Mutex::new(Vec::default())),
127 middlewares: Mutex::new(middlewares),
128 dispatch_tx: Mutex::new(Some(tx)),
129 metrics,
130 pool: Mutex::new(Some(
131 rusty_pool::Builder::new().name(format!("{}-pool", name)).build(),
132 )),
133 };
134
135 let rx_store = Arc::new(store);
137 let tx_store = rx_store.clone();
138
139 tx_store.pool.lock().unwrap().as_ref().unwrap().execute(move || {
141 #[cfg(feature = "store-log")]
142 eprintln!("store: reducer thread started");
143
144 while let Some(action_op) = rx.recv() {
145 let action_received_at = Instant::now();
146 rx_store.metrics.action_received(Some(&action_op));
147
148 match action_op {
149 ActionOp::Action(action) => {
150 let the_dispatcher = Arc::new(rx_store.clone());
151
152 let current_state = rx_store.state.lock().unwrap().clone();
154 let (need_dispatch, new_state, effects) = rx_store.do_reduce(
155 &action,
156 current_state,
157 the_dispatcher.clone(),
158 action_received_at,
159 );
160 *rx_store.state.lock().unwrap() = new_state.clone();
161
162 if let Some(mut effects) = effects {
164 rx_store.do_effect(
165 &action,
166 &new_state,
167 &mut effects,
168 the_dispatcher.clone(),
169 );
170 }
171
172 if need_dispatch {
174 rx_store.do_notify(
175 &action,
176 &new_state,
177 the_dispatcher.clone(),
178 action_received_at,
179 );
180 }
181
182 rx_store
183 .metrics
184 .action_executed(Some(&action), action_received_at.elapsed());
185 }
186 ActionOp::AddSubscriber => {
187 let current_state = rx_store.state.lock().unwrap().clone();
188 let mut adding_subscribers = rx_store.adding_subscribers.lock().unwrap();
189 let mut subscribers = rx_store.subscribers.lock().unwrap();
190
191 for subscriber in adding_subscribers.drain(..) {
193 subscriber.on_subscribe(¤t_state);
195 subscribers.push(subscriber);
196 }
197
198 #[cfg(feature = "store-log")]
199 eprintln!("store: new subscribers added");
200 }
201 ActionOp::Exit(_) => {
202 rx_store.on_close(action_received_at);
203 #[cfg(feature = "store-log")]
204 eprintln!("store: reducer loop exit");
205 break;
206 }
207 }
208 }
209
210 rx_store.clear_subscribers();
212
213 #[cfg(feature = "store-log")]
214 eprintln!("store: reducer thread done");
215 });
216
217 Ok(tx_store)
218 }
219
220 pub fn get_state(&self) -> State {
224 self.state.lock().unwrap().clone()
225 }
226
227 pub fn get_metrics(&self) -> MetricsSnapshot {
229 (&(*self.metrics)).into()
230 }
231
232 pub fn add_reducer(&self, reducer: Box<dyn Reducer<State, Action> + Send + Sync>) {
234 self.reducers.lock().unwrap().push(reducer);
235 }
236
237 pub fn add_subscriber(
239 &self,
240 subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
241 ) -> Box<dyn Subscription> {
242 self.adding_subscribers.lock().unwrap().push(subscriber.clone());
244
245 if let Some(tx) = self.dispatch_tx.lock().unwrap().as_ref() {
247 let _ = tx.send(ActionOp::AddSubscriber);
248 }
249
250 let subscribers = self.subscribers.clone();
252 let adding_subscribers = self.adding_subscribers.clone();
253 Box::new(SubscriberSubscription {
254 unsubscribe: Box::new(move || {
255 let mut subscribers = subscribers.lock().unwrap();
256 subscribers.retain(|s| {
257 let retain = !Arc::ptr_eq(s, &subscriber);
258 if !retain {
259 s.on_unsubscribe();
260 }
261 retain
262 });
263
264 let mut adding = adding_subscribers.lock().unwrap();
266 adding.retain(|s| !Arc::ptr_eq(s, &subscriber));
267 }),
268 })
269 }
270
271 pub(crate) fn clear_subscribers(&self) {
273 #[cfg(feature = "store-log")]
274 eprintln!("store: clear_subscribers");
275 match self.subscribers.lock() {
276 Ok(mut subscribers) => {
277 for subscriber in subscribers.iter() {
278 subscriber.on_unsubscribe();
279 }
280 subscribers.clear();
281 }
282 Err(mut e) => {
283 #[cfg(feature = "store-log")]
284 eprintln!("store: Error while locking subscribers: {:?}", e);
285 for subscriber in e.get_ref().iter() {
286 subscriber.on_unsubscribe();
287 }
288 e.get_mut().clear();
289 }
290 }
291 }
292
293 pub(crate) fn do_reduce(
299 &self,
300 action: &Action,
301 mut state: State,
302 dispatcher: Arc<dyn Dispatcher<Action>>,
303 action_received_at: Instant,
304 ) -> (bool, State, Option<Vec<Effect<Action>>>) {
305 let mut reduce_action = true;
308 if !self.middlewares.lock().unwrap().is_empty() {
309 let middleware_start = Instant::now();
310 let mut middleware_executed = 0;
311 for middleware in self.middlewares.lock().unwrap().iter() {
312 middleware_executed += 1;
313 match middleware.before_reduce(action, &state, dispatcher.clone()) {
314 Ok(MiddlewareOp::ContinueAction) => {
315 }
317 Ok(MiddlewareOp::DoneAction) => {
318 reduce_action = false;
321 }
322 Ok(MiddlewareOp::BreakChain) => {
323 break;
325 }
326 Err(e) => {
327 middleware.on_error(e);
328 }
329 }
330 }
331 let middleware_duration = middleware_start.elapsed();
332 self.metrics.middleware_executed(
333 Some(action),
334 "before_reduce",
335 middleware_executed,
336 middleware_duration,
337 );
338 }
339
340 let mut effects = vec![];
341 let mut need_dispatch = true;
342 if reduce_action {
343 let reducer_start = Instant::now();
344
345 for reducer in self.reducers.lock().unwrap().iter() {
346 match reducer.reduce(&state, action) {
347 DispatchOp::Dispatch(new_state, effect) => {
348 state = new_state;
349 if let Some(effect) = effect {
350 effects.push(effect);
351 }
352 need_dispatch = true;
353 }
354 DispatchOp::Keep(new_state, effect) => {
355 state = new_state;
357 if let Some(effect) = effect {
358 effects.push(effect);
359 }
360 need_dispatch = false;
361 }
362 }
363 }
364
365 let reducer_duration = reducer_start.elapsed();
367 self.metrics.action_reduced(
368 Some(action),
369 reducer_duration,
370 action_received_at.elapsed(),
371 );
372 }
373
374 (need_dispatch, state, Some(effects))
375 }
376
377 pub(crate) fn do_effect(
378 &self,
379 action: &Action,
380 state: &State,
381 effects: &mut Vec<Effect<Action>>,
382 dispatcher: Arc<dyn Dispatcher<Action>>,
383 ) {
384 let effect_start = Instant::now();
385 self.metrics.effect_issued(effects.len());
386
387 if !self.middlewares.lock().unwrap().is_empty() {
388 let middleware_start = Instant::now();
389 let mut middleware_executed = 0;
390 for middleware in self.middlewares.lock().unwrap().iter() {
391 middleware_executed += 1;
392 match middleware.before_effect(action, state, effects, dispatcher.clone()) {
393 Ok(MiddlewareOp::ContinueAction) => {
394 }
396 Ok(MiddlewareOp::DoneAction) => {
397 }
399 Ok(MiddlewareOp::BreakChain) => {
400 break;
402 }
403 Err(e) => {
404 middleware.on_error(e);
405 }
406 }
407 }
408
409 let middleware_duration = middleware_start.elapsed();
410 self.metrics.middleware_executed(
411 Some(action),
412 "before_effect",
413 middleware_executed,
414 middleware_duration,
415 );
416 }
417
418 let effects_total = effects.len();
419 while !effects.is_empty() {
420 let effect = effects.remove(0);
421 match effect {
422 Effect::Action(a) => {
423 dispatcher.dispatch_thunk(Box::new(move |dispatcher| {
424 dispatcher.dispatch(a).expect("no dispatch failed");
425 }));
426 }
427 Effect::Task(task) => {
428 dispatcher.dispatch_task(task);
429 }
430 Effect::Thunk(thunk) => {
431 dispatcher.dispatch_thunk(thunk);
432 }
433 Effect::Function(_tok, func) => {
434 dispatcher.dispatch_task(Box::new(move || {
435 let _ = func();
437 }));
438 }
439 };
440 }
441
442 let duration = effect_start.elapsed();
443 self.metrics.effect_executed(effects_total, duration);
444 }
445
446 pub(crate) fn do_notify(
447 &self,
448 action: &Action,
449 next_state: &State,
450 dispatcher: Arc<dyn Dispatcher<Action>>,
451 _action_received_at: Instant,
452 ) {
453 let _notify_start = Instant::now();
454 self.metrics.state_notified(Some(next_state));
455
456 let mut need_notify = true;
457 if !self.middlewares.lock().unwrap().is_empty() {
458 let middleware_start = Instant::now();
459 let mut middleware_executed = 0;
460 for middleware in self.middlewares.lock().unwrap().iter() {
461 middleware_executed += 1;
462 match middleware.before_dispatch(action, next_state, dispatcher.clone()) {
463 Ok(MiddlewareOp::ContinueAction) => {
464 }
466 Ok(MiddlewareOp::DoneAction) => {
467 need_notify = false;
469 }
470 Ok(MiddlewareOp::BreakChain) => {
471 break;
473 }
474 Err(e) => {
475 middleware.on_error(e);
476 }
477 }
478 }
479 let middleware_duration = middleware_start.elapsed();
480 self.metrics.middleware_executed(
481 Some(action),
482 "before_dispatch",
483 middleware_executed,
484 middleware_duration,
485 );
486 }
487
488 if need_notify {
489 let subscribers = self.subscribers.lock().unwrap().clone();
490 for subscriber in subscribers.iter() {
491 subscriber.on_notify(next_state, action);
492 }
493 let duration = _notify_start.elapsed();
494 self.metrics.subscriber_notified(Some(action), subscribers.len(), duration);
495 }
496 }
497
498 fn on_close(&self, action_received_at: Instant) {
499 #[cfg(feature = "store-log")]
500 eprintln!("store: on_close");
501
502 self.metrics.action_executed(None, action_received_at.elapsed());
503 }
504
505 pub fn close(&self) {
509 match self.dispatch_tx.lock() {
510 Ok(mut tx) => {
511 if let Some(tx) = tx.take() {
512 #[cfg(feature = "store-log")]
513 eprintln!("store: closing dispatch channel");
514 match tx.send(ActionOp::Exit(Instant::now())) {
515 Ok(_) => {
516 #[cfg(feature = "store-log")]
517 eprintln!("store: dispatch channel sent exit");
518 }
519 Err(_e) => {
520 #[cfg(feature = "store-log")]
521 eprintln!("store: Error while closing dispatch channel");
522 }
523 }
524 drop(tx);
525 }
526 }
527 Err(_e) => {
528 #[cfg(feature = "store-log")]
529 eprintln!("store: Error while locking dispatch channel: {:?}", _e);
530 return;
531 }
532 }
533
534 #[cfg(feature = "store-log")]
535 eprintln!("store: dispatch channel closed");
536 }
537
538 pub fn stop(&self) {
540 self.close();
541
542 match self.pool.lock() {
545 Ok(mut pool) => {
546 if let Some(pool) = pool.take() {
547 pool.shutdown_join();
548 }
549 #[cfg(feature = "store-log")]
550 eprintln!("store: shutdown pool");
551 }
552 Err(_e) => {
553 #[cfg(feature = "store-log")]
554 eprintln!("store: Error while locking pool: {:?}", _e);
555 return;
556 }
557 }
558
559 #[cfg(feature = "store-log")]
560 eprintln!("store: Store stopped");
561 }
562
563 pub fn stop_with_timeout(&self, timeout: Duration) {
565 self.close();
566
567 match self.pool.lock() {
570 Ok(mut pool) => {
571 if let Some(pool) = pool.take() {
572 pool.shutdown_join_timeout(timeout);
573 }
574 #[cfg(feature = "store-log")]
575 eprintln!("store: shutdown pool");
576 }
577 Err(_e) => {
578 #[cfg(feature = "store-log")]
579 eprintln!("store: Error while locking pool: {:?}", _e);
580 return;
581 }
582 }
583
584 #[cfg(feature = "store-log")]
585 eprintln!("store: Store stopped");
586 }
587
588 pub fn dispatch(&self, action: Action) -> Result<(), StoreError> {
594 let sender = self.dispatch_tx.lock().unwrap();
595 if let Some(tx) = sender.as_ref() {
596 let remains = tx.send(ActionOp::Action(action)).unwrap_or(0);
598 self.metrics.queue_size(remains as usize);
599 Ok(())
600 } else {
601 let err = StoreError::DispatchError("Dispatch channel is closed".to_string());
602 self.metrics.error_occurred(&err);
603 Err(err)
604 }
605 }
606
607 pub fn add_middleware(&self, middleware: Arc<dyn Middleware<State, Action> + Send + Sync>) {
609 self.middlewares.lock().unwrap().push(middleware.clone());
610 }
611
612 pub fn iter(&self) -> impl Iterator<Item = (State, Action)> {
617 self.iter_with(1, BackpressurePolicy::BlockOnFull)
618 }
619
620 pub(crate) fn iter_with(
626 &self,
627 capacity: usize,
628 policy: BackpressurePolicy<(State, Action)>,
629 ) -> impl Iterator<Item = (State, Action)> {
630 let (iter_tx, iter_rx) = BackpressureChannel::<(State, Action)>::pair_with(
631 "store_iter",
632 capacity,
633 policy,
634 Some(self.metrics.clone()),
635 );
636
637 let subscription = self.add_subscriber(Arc::new(StateIteratorSubscriber::new(iter_tx)));
638 StateIterator::new(iter_rx, subscription)
639 }
640
641 pub fn subscribed(
650 &self,
651 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
652 ) -> Result<Box<dyn Subscription>, StoreError> {
653 self.subscribed_with(
654 DEFAULT_CAPACITY,
655 BackpressurePolicy::BlockOnFull,
656 subscriber,
657 )
658 }
659
660 pub fn subscribed_with(
670 &self,
671 capacity: usize,
672 policy: BackpressurePolicy<(Instant, State, Action)>,
673 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
674 ) -> Result<Box<dyn Subscription>, StoreError> {
675 let (tx, rx) = BackpressureChannel::<(Instant, State, Action)>::pair_with(
677 format!("{}-channel", self.name).as_str(),
678 capacity,
679 policy,
680 Some(self.metrics.clone()),
681 );
682
683 let thread_name = format!("{}-channeled-subscriber", self.name);
685 let metrics_clone = self.metrics.clone();
686 let builder = thread::Builder::new().name(thread_name.clone());
687 let handle = match builder.spawn(move || {
688 Self::subscribed_loop(thread_name, rx, subscriber, metrics_clone);
690 }) {
691 Ok(h) => h,
692 Err(e) => {
693 #[cfg(feature = "store-log")]
694 eprintln!("store: Error while spawning channel thread: {:?}", e);
695 return Err(StoreError::SubscriptionError(format!(
696 "Error while spawning channel thread: {:?}",
697 e
698 )));
699 }
700 };
701
702 let channel_subscriber = Arc::new(ChanneledSubscriber::new(handle, tx));
704 let subscription = self.add_subscriber(channel_subscriber.clone());
705
706 Ok(subscription)
707 }
708
709 fn subscribed_loop(
710 _name: String,
711 rx: ReceiverChannel<(Instant, State, Action)>,
712 subscriber: Box<dyn Subscriber<State, Action>>,
713 metrics: Arc<dyn Metrics>,
714 ) {
715 #[cfg(feature = "store-log")]
716 eprintln!("store: {} channel thread started", _name);
717
718 while let Some(msg) = rx.recv() {
719 match msg {
720 ActionOp::Action((created_at, state, action)) => {
721 let started_at = Instant::now();
722 {
723 subscriber.on_notify(&state, &action);
724 }
725 metrics.subscriber_notified(Some(&action), 1, started_at.elapsed());
726
727 metrics.action_executed(Some(&action), created_at.elapsed());
729 }
730 ActionOp::AddSubscriber => {
731 #[cfg(feature = "store-log")]
734 eprintln!("store: {} received AddSubscriber (ignored)", _name);
735 }
736 ActionOp::Exit(created_at) => {
737 metrics.action_executed(None, created_at.elapsed());
738 #[cfg(feature = "store-log")]
739 eprintln!("store: {} channel thread loop exit", _name);
740 break;
741 }
742 }
743 }
744
745 #[cfg(feature = "store-log")]
746 eprintln!("store: {} channel thread done", _name);
747 }
748}
749
/// Subscriber that forwards notifications through a channel to a worker thread.
struct ChanneledSubscriber<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Join handle of the worker thread; taken when resources are cleared.
    handle: Mutex<Option<JoinHandle<()>>>,
    /// Sending half of the channel; taken when resources are cleared.
    tx: Mutex<Option<SenderChannel<T>>>,
}
758
759impl<T> ChanneledSubscriber<T>
760where
761 T: Send + Sync + Clone + 'static,
762{
763 pub(crate) fn new(handle: JoinHandle<()>, tx: SenderChannel<T>) -> Self {
764 Self {
765 handle: Mutex::new(Some(handle)),
766 tx: Mutex::new(Some(tx)),
767 }
768 }
769
770 fn clear_resource(&self) {
771 if let Ok(mut tx_locked) = self.tx.lock() {
773 if let Some(tx) = tx_locked.take() {
774 let _ = tx.send(ActionOp::Exit(Instant::now()));
775 drop(tx);
776 };
777 }
778
779 if let Ok(mut handle) = self.handle.lock() {
781 if let Some(h) = handle.take() {
782 let _ = h.join();
783 }
784 }
785 }
786}
787
788impl<State, Action> Subscriber<State, Action> for ChanneledSubscriber<(Instant, State, Action)>
789where
790 State: Send + Sync + Clone + 'static,
791 Action: Send + Sync + Clone + 'static,
792{
793 fn on_notify(&self, state: &State, action: &Action) {
794 match self.tx.lock() {
795 Ok(tx) => {
796 tx.as_ref().map(|tx| {
797 tx.send(ActionOp::Action((
798 Instant::now(),
799 state.clone(),
800 action.clone(),
801 )))
802 });
803 }
804 Err(_e) => {
805 #[cfg(feature = "store-log")]
806 eprintln!("store: Error while locking channel: {:?}", _e);
807 }
808 }
809 }
810
811 fn on_unsubscribe(&self) {
812 self.clear_resource();
813 }
814}
815
impl<T> Subscription for ChanneledSubscriber<T>
where
    T: Send + Sync + Clone + 'static,
{
    /// Stops the worker thread and releases the channel.
    fn unsubscribe(&self) {
        self.clear_resource();
    }
}
824
/// Shuts the store down when it is dropped.
impl<State, Action> Drop for StoreImpl<State, Action>
where
    State: Send + Sync + Clone + 'static,
    Action: Send + Sync + Clone + 'static,
{
    /// Closes the dispatch channel and stops the pool, waiting at most
    /// `DEFAULT_STOP_TIMEOUT`.
    fn drop(&mut self) {
        // `stop_with_timeout` calls `close` again; the second call finds no
        // sender and does nothing to the channel.
        self.close();

        self.stop_with_timeout(DEFAULT_STOP_TIMEOUT);
        #[cfg(feature = "store-log")]
        eprintln!("store: '{}' Store dropped", self.name);
    }
}
847
848impl<State, Action> Store<State, Action> for StoreImpl<State, Action>
849where
850 State: Send + Sync + Clone + 'static,
851 Action: Send + Sync + Clone + 'static,
852{
853 fn get_state(&self) -> State {
854 self.get_state()
855 }
856
857 fn dispatch(&self, action: Action) -> Result<(), StoreError> {
858 self.dispatch(action)
859 }
860
861 fn add_subscriber(
862 &self,
863 subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
864 ) -> Box<dyn Subscription> {
865 self.add_subscriber(subscriber)
866 }
867
868 fn subscribed(
869 &self,
870 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
871 ) -> Result<Box<dyn Subscription>, StoreError> {
872 self.subscribed(subscriber)
873 }
874
875 fn subscribed_with(
876 &self,
877 capacity: usize,
878 policy: BackpressurePolicy<(Instant, State, Action)>,
879 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
880 ) -> Result<Box<dyn Subscription>, StoreError> {
881 self.subscribed_with(capacity, policy, subscriber)
882 }
883
884 fn stop(&self) {
885 self.stop();
886 }
887}
888
889#[cfg(test)]
890mod tests {
891 use super::*;
892 use std::thread;
893
    /// Test subscriber that records every `(state, action)` pair it is
    /// notified with into a shared vector.
    struct TestChannelSubscriber {
        received: Arc<Mutex<Vec<(i32, i32)>>>,
    }

    impl TestChannelSubscriber {
        /// Creates a subscriber that appends to `received`.
        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
            Self { received }
        }
    }

    impl Subscriber<i32, i32> for TestChannelSubscriber {
        /// Prints the notification and stores it.
        fn on_notify(&self, state: &i32, action: &i32) {
            println!("TestChannelSubscriber: state={}, action={}", state, action);
            self.received.lock().unwrap().push((*state, *action));
        }
    }
910
    /// Reducer that adds the action value to the state and always requests a
    /// notification.
    struct TestReducer;

    impl Reducer<i32, i32> for TestReducer {
        fn reduce(&self, state: &i32, action: &i32) -> DispatchOp<i32, i32> {
            DispatchOp::Dispatch(state + action, None)
        }
    }
918
    /// Test subscriber that sleeps for `delay` before recording each
    /// notification.
    struct SlowSubscriber {
        received: Arc<Mutex<Vec<(i32, i32)>>>,
        delay: Duration,
    }

    impl SlowSubscriber {
        /// Creates a subscriber that appends to `received` after sleeping `delay`.
        fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
            Self { received, delay }
        }
    }

    impl Subscriber<i32, i32> for SlowSubscriber {
        /// Prints the notification, sleeps, then stores it.
        fn on_notify(&self, state: &i32, action: &i32) {
            println!("SlowSubscriber: state={}, action={}", state, action);
            std::thread::sleep(self.delay);
            self.received.lock().unwrap().push((*state, *action));
        }
    }
937
    /// A channeled subscriber receives every dispatched action together with
    /// the state it produced.
    #[test]
    fn test_store_subscribed_basic() {
        let initial_state = 0;
        let reducer = Box::new(TestReducer);
        let store = StoreImpl::new_with_reducer(initial_state, reducer);

        let received_states = Arc::new(Mutex::new(Vec::new()));
        let subscriber1 = Box::new(TestChannelSubscriber::new(received_states.clone()));
        let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);

        store.dispatch(1).unwrap();
        store.dispatch(2).unwrap();

        // Stop the store before inspecting what the subscriber recorded.
        store.stop();

        subscription.unwrap().unsubscribe();

        let states = received_states.lock().unwrap();
        assert_eq!(states.len(), 2);
        // 0 + 1 = 1
        assert_eq!(states[0], (1, 1));
        // 1 + 2 = 3
        assert_eq!(states[1], (3, 2));
    }
968
    /// With a capacity-1 `DropOldest` channel and a slow subscriber, only a
    /// few notifications are expected to get through.
    #[test]
    fn test_store_subscribed_backpressure() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        let received = Arc::new(Mutex::new(Vec::new()));
        let received_clone = received.clone();
        let subscriber = Box::new(SlowSubscriber::new(
            received_clone,
            Duration::from_millis(100),
        ));
        let subscription = store.subscribed_with(1, BackpressurePolicy::DropOldest, subscriber);

        // Dispatch faster than the subscriber can consume.
        for i in 0..5 {
            store.dispatch(i).unwrap();
        }

        thread::sleep(Duration::from_millis(200));
        store.stop();
        subscription.unwrap().unsubscribe();

        let received = received.lock().unwrap();
        assert!(received.len() <= 2);
        if let Some((state, action)) = received.last() {
            // The last dispatched action is 4.
            assert_eq!(*action, 4);
            // 0 + 1 + 2 + 3 + 4 = 10 is the largest state reachable.
            assert!(*state <= 10);
        }
    }
1001
    /// After `unsubscribe`, a channeled subscriber receives no further
    /// notifications.
    #[test]
    fn test_store_subscribed_subscription() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        let received = Arc::new(Mutex::new(Vec::new()));
        let subscriber1 = Box::new(TestChannelSubscriber::new(received.clone()));
        let subscription = store.subscribed_with(10, BackpressurePolicy::DropOldest, subscriber1);

        store.dispatch(1).unwrap();

        // Give the store time to process the action.
        thread::sleep(Duration::from_millis(100));
        assert_eq!(received.lock().unwrap().len(), 1);

        subscription.unwrap().unsubscribe();

        // These actions are dispatched after unsubscribing and must not be recorded.
        store.dispatch(2).unwrap();
        store.dispatch(3).unwrap();
        store.stop();
        assert_eq!(received.lock().unwrap().len(), 1);
    }
1029
    /// A subscriber added later only sees actions dispatched after it was
    /// added, with the state accumulated so far.
    #[test]
    fn test_new_subscriber_receives_latest_state() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        let received1 = Arc::new(Mutex::new(Vec::new()));
        let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
        store.add_subscriber(subscriber1);

        store.dispatch(5).unwrap();
        store.dispatch(10).unwrap();

        thread::sleep(Duration::from_millis(100));

        // Second subscriber joins after two actions were processed.
        let received2 = Arc::new(Mutex::new(Vec::new()));
        let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
        store.add_subscriber(subscriber2);

        thread::sleep(Duration::from_millis(100));

        store.dispatch(20).unwrap();

        thread::sleep(Duration::from_millis(100));

        // The first subscriber saw all three actions.
        let received1 = received1.lock().unwrap();
        assert_eq!(received1.len(), 3);
        assert_eq!(received1[0], (5, 5));
        assert_eq!(received1[1], (15, 10));
        assert_eq!(received1[2], (35, 20));

        // The second subscriber saw only the last one.
        let received2 = received2.lock().unwrap();
        assert_eq!(received2.len(), 1);
        assert_eq!(received2[0], (35, 20));
    }
1073
    /// `on_subscribe` is called with the current state when a subscriber is
    /// added.
    #[test]
    fn test_new_subscriber_on_subscribe_called() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        // Dispatched before the subscriber exists, so it only affects the state.
        store.dispatch(5).unwrap();

        let received_states = Arc::new(Mutex::new(Vec::new()));
        let subscribe_called = Arc::new(Mutex::new(false));

        /// Records states from both `on_subscribe` and `on_notify`, and flags
        /// that `on_subscribe` ran.
        struct TestSubscribeSubscriber {
            received_states: Arc<Mutex<Vec<i32>>>,
            subscribe_called: Arc<Mutex<bool>>,
        }

        impl Subscriber<i32, i32> for TestSubscribeSubscriber {
            fn on_subscribe(&self, state: &i32) {
                self.received_states.lock().unwrap().push(*state);
                *self.subscribe_called.lock().unwrap() = true;
            }

            fn on_notify(&self, state: &i32, _action: &i32) {
                self.received_states.lock().unwrap().push(*state);
            }
        }

        let subscriber = Arc::new(TestSubscribeSubscriber {
            received_states: received_states.clone(),
            subscribe_called: subscribe_called.clone(),
        });

        store.add_subscriber(subscriber);

        thread::sleep(Duration::from_millis(100));

        assert!(*subscribe_called.lock().unwrap());

        // Only the `on_subscribe` call is expected, carrying state 5.
        let states = received_states.lock().unwrap();
        assert_eq!(states.len(), 1);
        assert_eq!(states[0], 5);
    }
1120
    /// Several subscribers added back to back all receive the next action.
    #[test]
    fn test_multiple_subscribers_added_simultaneously() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        store.dispatch(10).unwrap();
        store.dispatch(20).unwrap();

        thread::sleep(Duration::from_millis(100));

        let subscribers = vec![
            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
            Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
        ];

        for subscriber in &subscribers {
            store.add_subscriber(subscriber.clone());
        }

        thread::sleep(Duration::from_millis(100));

        store.dispatch(30).unwrap();

        thread::sleep(Duration::from_millis(100));

        for subscriber in &subscribers {
            let received = subscriber.received.lock().unwrap();
            assert_eq!(received.len(), 1);
            // 10 + 20 + 30 = 60
            assert_eq!(received[0], (60, 30));
        }
    }
1160
    /// A subscriber that unsubscribes before the next dispatch receives
    /// nothing.
    #[test]
    fn test_subscriber_unsubscribe_after_add() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        store.dispatch(5).unwrap();

        let received = Arc::new(Mutex::new(Vec::new()));
        let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
        let subscription = store.add_subscriber(subscriber);

        thread::sleep(Duration::from_millis(100));

        subscription.unsubscribe();

        store.dispatch(10).unwrap();

        thread::sleep(Duration::from_millis(100));

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 0);
    }
1190
    /// Adding a subscriber to a stopped store does not panic and delivers no
    /// notifications.
    #[test]
    fn test_add_subscriber_after_store_stop() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        store.stop();

        let received = Arc::new(Mutex::new(Vec::new()));
        let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
        let _subscription = store.add_subscriber(subscriber);

        thread::sleep(Duration::from_millis(100));

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 0);
    }
1211
    /// `on_subscribe` receives the current state, and the store's state is
    /// unchanged afterwards.
    #[test]
    fn test_subscriber_modifies_state_in_on_subscribe() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        store.dispatch(5).unwrap();

        /// Records states from both `on_subscribe` and `on_notify`, and flags
        /// that `on_subscribe` ran.
        struct ModifyingSubscriber {
            received_states: Arc<Mutex<Vec<i32>>>,
            subscribe_called: Arc<Mutex<bool>>,
        }

        impl Subscriber<i32, i32> for ModifyingSubscriber {
            fn on_subscribe(&self, state: &i32) {
                self.received_states.lock().unwrap().push(*state);
                *self.subscribe_called.lock().unwrap() = true;
            }

            fn on_notify(&self, state: &i32, _action: &i32) {
                self.received_states.lock().unwrap().push(*state);
            }
        }

        let subscriber = Arc::new(ModifyingSubscriber {
            received_states: Arc::new(Mutex::new(Vec::new())),
            subscribe_called: Arc::new(Mutex::new(false)),
        });

        store.add_subscriber(subscriber.clone());

        thread::sleep(Duration::from_millis(100));

        assert!(*subscriber.subscribe_called.lock().unwrap());

        let states = subscriber.received_states.lock().unwrap();
        assert_eq!(states.len(), 1);
        assert_eq!(states[0], 5);

        // The store still holds the state produced by the single dispatch.
        assert_eq!(store.get_state(), 5);
    }
1258
    /// Subscribers added one after another, with pauses in between, all
    /// receive the next dispatched action.
    #[test]
    fn test_consecutive_add_subscriber_actions() {
        let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer));

        let received1 = Arc::new(Mutex::new(Vec::new()));
        let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
        store.add_subscriber(subscriber1);

        thread::sleep(Duration::from_millis(50));

        let received2 = Arc::new(Mutex::new(Vec::new()));
        let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
        store.add_subscriber(subscriber2);

        thread::sleep(Duration::from_millis(50));

        let received3 = Arc::new(Mutex::new(Vec::new()));
        let subscriber3 = Arc::new(TestChannelSubscriber::new(received3.clone()));
        store.add_subscriber(subscriber3);

        thread::sleep(Duration::from_millis(100));

        store.dispatch(10).unwrap();

        thread::sleep(Duration::from_millis(100));

        assert_eq!(received1.lock().unwrap().len(), 1);
        assert_eq!(received2.lock().unwrap().len(), 1);
        assert_eq!(received3.lock().unwrap().len(), 1);
    }
1299}