1use crate::channel::{BackpressureChannel, BackpressurePolicy, ReceiverChannel, SenderChannel};
2use crate::dispatcher::Dispatcher;
3use crate::metrics::{CountMetrics, Metrics, MetricsSnapshot};
4use crate::middleware::{MiddlewareFn, MiddlewareFnFactory};
5use crate::subscriber::SubscriberWithId;
6use crate::{DispatchOp, Effect, Reducer, SenderError, Subscriber, Subscription};
7use rusty_pool::ThreadPool;
8use std::sync::{Arc, Mutex};
9use std::thread::JoinHandle;
10use std::time::{Duration, Instant};
11use std::{fmt, thread};
12
13use crate::iterator::{StateIterator, StateIteratorSubscriber};
14use crate::store::{Store, StoreError, DEFAULT_CAPACITY, DEFAULT_STORE_NAME};
15
16const DEFAULT_STOP_TIMEOUT: Duration = Duration::from_secs(5);
17
18#[derive(Clone, PartialEq)]
20pub(crate) enum ActionOp<Action>
21where
22 Action: Send + Sync + Clone + 'static,
23{
24 Action(Action),
26 AddSubscriber,
28 StateFunction,
30 #[allow(dead_code)]
32 Exit(Instant),
33}
34
35impl<Action> fmt::Debug for ActionOp<Action>
36where
37 Action: fmt::Debug + Send + Sync + Clone + 'static,
38{
39 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40 match self {
41 ActionOp::Action(action) => f.debug_tuple("Action").field(action).finish(),
42 ActionOp::AddSubscriber => f.write_str("AddSubscriber"),
43 ActionOp::StateFunction => f.write_str("StateFunction"),
44 ActionOp::Exit(instant) => f.debug_tuple("Exit").field(instant).finish(),
45 }
46 }
47}
48
49#[cfg(feature = "store-log")]
51pub(crate) fn describe_action<Action>(_action: &Action) -> String
52where
53 Action: Send + Sync + Clone + 'static,
54{
55 format!("Action<{}>(..)", std::any::type_name::<Action>())
56}
57
58pub(crate) fn describe_action_op<Action>(action_op: &ActionOp<Action>) -> String
61where
62 Action: Send + Sync + Clone + 'static,
63{
64 match action_op {
65 ActionOp::Action(_) => {
66 format!("Action<{}>(..)", std::any::type_name::<Action>())
67 }
68 ActionOp::AddSubscriber => "AddSubscriber".to_string(),
69 ActionOp::StateFunction => "StateFunction".to_string(),
70 ActionOp::Exit(instant) => format!("Exit({instant:?})"),
71 }
72}
73
74#[allow(clippy::type_complexity)]
80pub struct StoreImpl<State, Action>
81where
82 State: Send + Sync + Clone + 'static,
83 Action: Send + Sync + Clone + 'static,
84{
85 #[allow(dead_code)]
86 pub(crate) name: String,
87 state: Mutex<State>,
88 pub(crate) reducers: Mutex<Vec<Box<dyn Reducer<State, Action> + Send + Sync>>>,
89 pub(crate) subscribers: Arc<Mutex<Vec<SubscriberWithId<State, Action>>>>,
90 adding_subscribers: Arc<Mutex<Vec<SubscriberWithId<State, Action>>>>,
92 state_functions: Arc<Mutex<Vec<Box<dyn FnOnce(&State) + Send + Sync + 'static>>>>,
93 pub(crate) dispatch_tx: Mutex<Option<SenderChannel<Action>>>,
94 middleware_factories: Mutex<Vec<Arc<dyn MiddlewareFnFactory<State, Action> + Send + Sync>>>, pub(crate) metrics: Arc<CountMetrics>,
98 pub(crate) pool: Mutex<Option<ThreadPool>>,
100}
101
102struct SubscriberSubscription {
105 #[allow(dead_code)]
106 subscriber_id: u64, unsubscribe: Box<dyn Fn(u64) + Send + Sync>,
108}
109
110impl Subscription for SubscriberSubscription {
111 fn unsubscribe(&self) {
112 (self.unsubscribe)(self.subscriber_id);
113 }
114}
115
116impl<State, Action> StoreImpl<State, Action>
117where
118 State: Send + Sync + Clone + 'static,
119 Action: Send + Sync + Clone + 'static,
120{
121 pub fn new_with_reducer(
135 state: State,
136 reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
137 ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
138 Self::new_with(
139 state,
140 vec![reducer],
141 DEFAULT_STORE_NAME.into(),
142 DEFAULT_CAPACITY,
143 BackpressurePolicy::default(),
144 vec![],
145 )
146 }
147
148 pub fn new_with_name(
150 state: State,
151 reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
152 name: String,
153 ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
154 Self::new_with(
155 state,
156 vec![reducer],
157 name,
158 DEFAULT_CAPACITY,
159 BackpressurePolicy::default(),
160 vec![],
161 )
162 }
163
164 pub fn new_with(
166 state: State,
167 reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
168 name: String,
169 capacity: usize,
170 policy: BackpressurePolicy<Action>,
171 middlewares: Vec<Arc<dyn MiddlewareFnFactory<State, Action> + Send + Sync>>,
172 ) -> Result<Arc<StoreImpl<State, Action>>, StoreError> {
173 let metrics = Arc::new(CountMetrics::default());
174 let (tx, rx) = BackpressureChannel::<Action>::pair_with(
175 "dispatch",
176 capacity,
177 policy,
178 Some(metrics.clone()),
179 );
180
181 if reducers.is_empty() {
182 return Err(StoreError::InitError(
183 "At least one reducer is required".to_string(),
184 ));
185 }
186
187 let store_impl = StoreImpl {
188 name: name.clone(),
189 state: Mutex::new(state),
190 reducers: Mutex::new(reducers),
191 subscribers: Arc::new(Mutex::new(Vec::default())),
192 adding_subscribers: Arc::new(Mutex::new(Vec::default())),
193 state_functions: Arc::new(Mutex::new(Vec::default())),
194 middleware_factories: Mutex::new(middlewares),
195 dispatch_tx: Mutex::new(Some(tx)),
196 metrics,
197 pool: Mutex::new(Some(
198 rusty_pool::Builder::new().name(format!("{}-pool", name)).build(),
199 )),
200 };
201
202 let rx_store = Arc::new(store_impl);
204 let tx_store = rx_store.clone();
205
206 match tx_store.pool.lock() {
208 Ok(pool) => {
209 if let Some(pool) = pool.as_ref() {
210 pool.execute(move || {
211 StoreImpl::reducer_thread(rx, rx_store);
212 })
213 }
214 }
215 Err(_e) => {
216 #[cfg(feature = "store-log")]
217 eprintln!("store: Error while locking pool: {:?}", _e);
218 return Err(StoreError::InitError(format!(
219 "Error while locking pool: {:?}",
220 _e
221 )));
222 }
223 }
224
225 Ok(tx_store)
226 }
227
228 pub(crate) fn reducer_thread(
230 rx: ReceiverChannel<Action>,
231 store_impl: Arc<StoreImpl<State, Action>>,
232 ) {
233 #[cfg(feature = "store-log")]
234 eprintln!("store: reducer thread started");
235
236 let store_clone = store_impl.clone();
237 let reducer_middleware: MiddlewareFn<State, Action> =
238 Arc::new(move |state: &State, action: &Action| {
239 let started_at = Instant::now();
240
241 let dispatch_op = if store_clone.reducers.lock().unwrap().len() == 1 {
242 store_clone.reducers.lock().unwrap()[0].reduce(state, action)
243 } else {
244 let reducers = store_clone.reducers.lock().unwrap();
245 let mut iter = reducers.iter();
246
247 let mut result = iter.next().unwrap().reduce(state, action);
249
250 for reducer in iter {
252 match result {
253 DispatchOp::Dispatch(current_state, current_effects) => {
254 result = reducer.reduce(¤t_state, action);
255 match result {
257 DispatchOp::Dispatch(s, mut e) => {
258 e.extend(current_effects);
259 result = DispatchOp::Dispatch(s, e);
260 }
261 DispatchOp::Keep(s, mut e) => {
262 e.extend(current_effects);
263 result = DispatchOp::Keep(s, e);
264 }
265 }
266 }
267 DispatchOp::Keep(current_state, current_effects) => {
268 result = reducer.reduce(¤t_state, action);
269 match result {
271 DispatchOp::Dispatch(s, mut e) => {
272 e.extend(current_effects);
273 result = DispatchOp::Dispatch(s, e);
274 }
275 DispatchOp::Keep(s, mut e) => {
276 e.extend(current_effects);
277 result = DispatchOp::Keep(s, e);
278 }
279 }
280 }
281 }
282 }
283 result
284 };
285
286 store_clone.metrics.action_reduced(
287 Some(action),
288 started_at.elapsed(),
289 Instant::now().elapsed(),
290 );
291 Ok(dispatch_op)
292 });
293
294 let mut middleware_deco = reducer_middleware;
295 for middleware in store_impl.middleware_factories.lock().unwrap().iter().rev() {
297 middleware_deco = middleware.create(middleware_deco);
298 }
299
300 let middleware_deco_arc = Arc::new(middleware_deco);
301 while let Some(action_op) = rx.recv() {
302 let action_received_at = Instant::now();
303 store_impl.metrics.action_received(Some(&action_op));
304 #[cfg(feature = "store-log")]
305 eprintln!(
306 "store: dispatch: action: {:?}, remains: {}",
307 describe_action_op(&action_op),
308 rx.len()
309 );
310 match action_op {
311 ActionOp::Action(action) => {
312 let current_state_ref = {
315 let state_guard = store_impl.state.lock().unwrap();
316 state_guard.clone()
317 };
318 let mut effects = vec![];
319 let result = store_impl.do_reduce(
320 ¤t_state_ref,
321 &action,
322 &mut effects,
323 middleware_deco_arc.clone(),
324 );
325
326 match result {
327 Ok(dispatch_op) => {
328 store_impl.do_effect(&mut effects, store_impl.clone());
330 match dispatch_op {
332 DispatchOp::Dispatch(new_state, _) => {
333 *store_impl.state.lock().unwrap() = new_state.clone();
335 store_impl.do_notify(
337 &action,
338 &new_state,
339 store_impl.clone(),
340 action_received_at,
341 );
342 }
343 DispatchOp::Keep(new_state, _) => {
344 *store_impl.state.lock().unwrap() = new_state;
346 }
347 }
348 }
349 Err(_e) => {
350 #[cfg(feature = "store-log")]
351 eprintln!(
352 "store: error do_reduce: action: {}, remains: {}",
353 describe_action(&action),
354 rx.len()
355 );
356 }
357 }
358 }
360 ActionOp::AddSubscriber => {
361 let mut new_subscribers = store_impl.adding_subscribers.lock().unwrap();
362 let new_subscribers_len = new_subscribers.len();
363 if new_subscribers_len > 0 {
364 let current_state = store_impl.state.lock().unwrap().clone();
365 let iter_subscribers = new_subscribers.drain(..);
366
367 store_impl.do_subscribe(current_state, iter_subscribers);
368 }
369
370 #[cfg(feature = "store-log")]
371 eprintln!("store: {} subscribers added", new_subscribers_len);
372 }
373
374 ActionOp::StateFunction => {
380 store_impl.do_state_function();
381 }
382 ActionOp::Exit(_) => {
383 store_impl.on_close(action_received_at);
384 #[cfg(feature = "store-log")]
385 eprintln!("store: reducer loop exit");
386 break;
387 }
388 }
389 store_impl.metrics.action_executed(None, action_received_at.elapsed());
390 }
391
392 store_impl.clear_subscribers();
394
395 #[cfg(feature = "store-log")]
396 eprintln!("store: reducer thread done");
397 }
398
399 pub fn get_state(&self) -> State {
403 self.state.lock().unwrap().clone()
404 }
405
406 pub fn get_metrics(&self) -> MetricsSnapshot {
408 (&(*self.metrics)).into()
409 }
410
411 pub fn add_subscriber(
426 &self,
427 subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
428 ) -> Result<Box<dyn Subscription>, StoreError> {
429 let subscriber_with_id = SubscriberWithId::new(subscriber);
431 let subscriber_id = subscriber_with_id.id;
432
433 self.adding_subscribers.lock().unwrap().push(subscriber_with_id);
435
436 if let Some(tx) = self.dispatch_tx.lock().unwrap().as_ref() {
438 match tx.send(ActionOp::AddSubscriber) {
439 Ok(_) => {}
440 Err(_e) => {
441 #[cfg(feature = "store-log")]
442 eprintln!(
443 "store: Error while sending add subscriber to dispatch channel: {:?}",
444 _e
445 );
446 self.adding_subscribers.lock().unwrap().retain(|s| s.id != subscriber_id);
447 return Err(StoreError::DispatchError(format!(
448 "Error while sending add subscriber to dispatch channel: {:?}",
449 _e
450 )));
451 }
452 }
453 } else {
454 self.adding_subscribers.lock().unwrap().retain(|s| s.id != subscriber_id);
455 return Err(StoreError::DispatchError(
456 "Dispatch channel is closed".to_string(),
457 ));
458 }
459
460 let subscribers = self.subscribers.clone();
462 let adding_subscribers = self.adding_subscribers.clone();
463 let subscription = Box::new(SubscriberSubscription {
464 subscriber_id, unsubscribe: Box::new(move |subscriber_id| {
466 let mut adding = adding_subscribers.lock().unwrap();
471 adding.retain(|s| s.id != subscriber_id); let mut subscribers = subscribers.lock().unwrap();
474 subscribers.retain(|s| {
475 let retain = s.id != subscriber_id; if !retain {
477 s.on_unsubscribe();
478 }
479 retain
480 });
481 }),
482 });
483
484 Ok(subscription)
485 }
486
487 pub(crate) fn clear_subscribers(&self) {
489 #[cfg(feature = "store-log")]
490 eprintln!("store: clear_subscribers");
491 match self.subscribers.lock() {
492 Ok(mut subscribers) => {
493 for subscriber_with_id in subscribers.iter() {
494 subscriber_with_id.on_unsubscribe();
495 }
496 subscribers.clear();
497 }
498 Err(mut e) => {
499 #[cfg(feature = "store-log")]
500 eprintln!("store: Error while locking subscribers: {:?}", e);
501 for subscriber_with_id in e.get_ref().iter() {
502 subscriber_with_id.on_unsubscribe();
503 }
504 e.get_mut().clear();
505 }
506 }
507 }
508
509 pub(crate) fn do_reduce(
521 &self,
522 state: &State,
523 action: &Action,
524 effects: &mut Vec<Effect<Action>>,
525 middleware_deco: Arc<MiddlewareFn<State, Action>>,
526 ) -> Result<DispatchOp<State, Action>, StoreError> {
527 let started_at = Instant::now();
528
529 let mut dispatch_op = middleware_deco(state, action)?;
531
532 match &mut dispatch_op {
534 DispatchOp::Dispatch(_, ref mut result_effects) => {
535 effects.append(result_effects);
536 }
537 DispatchOp::Keep(_, ref mut result_effects) => {
538 effects.append(result_effects);
539 }
540 }
541
542 self.metrics.middleware_executed(Some(action), "", 1, started_at.elapsed());
543
544 Ok(dispatch_op)
545 }
546
547 pub(crate) fn do_effect(
548 &self,
549 effects: &mut Vec<Effect<Action>>,
550 dispatcher: Arc<StoreImpl<State, Action>>,
551 ) {
552 let effect_start = Instant::now();
553 self.metrics.effect_issued(effects.len());
554
555 let effects_total = effects.len();
556 while !effects.is_empty() {
557 let effect = effects.remove(0);
558 match effect {
559 Effect::Action(a) => {
560 dispatcher.dispatch_thunk(Box::new(
561 move |weak_dispatcher| match weak_dispatcher.dispatch(a) {
562 Ok(_) => {}
563 Err(_e) => {
564 #[cfg(feature = "store-log")]
565 eprintln!("Error while dispatching action: {:?}", _e);
566 }
567 },
568 ));
569 }
570 Effect::Task(task) => {
571 dispatcher.dispatch_task(task);
572 }
573 Effect::Thunk(thunk) => {
574 dispatcher.dispatch_thunk(thunk);
575 }
576 Effect::Function(_tok, func) => {
577 let dispatcher_clone = dispatcher.clone();
578 dispatcher.dispatch_task(Box::new(move || {
579 match func() {
581 Ok(result) => {
582 if let Ok(action) = result.downcast::<Action>() {
584 let _ = dispatcher_clone.dispatch(*action);
585 return;
586 }
587 #[cfg(feature = "store-log")]
591 eprintln!("Effect function should be handled by a middleware, did you miss a middleware?");
592 }
593 Err(_e) => {
594 #[cfg(feature = "store-log")]
596 eprintln!("Effect function error: {:?}", _e);
597 }
598 }
599 }));
600 }
601 };
602 }
603
604 let duration = effect_start.elapsed();
605 self.metrics.effect_executed(effects_total, duration);
606 }
607
608 pub(crate) fn do_notify(
609 &self,
610 action: &Action,
611 next_state: &State,
612 _dispatcher: Arc<StoreImpl<State, Action>>,
613 _action_received_at: Instant,
614 ) {
615 let notify_start = Instant::now();
616 self.metrics.state_notified(Some(next_state));
617
618 #[cfg(feature = "store-log")]
619 eprintln!("store: notify: action: {}", describe_action(action));
620
621 let subscribers = self.subscribers.lock().unwrap().clone();
622 let subscriber_count = subscribers.len();
623
624 let action_for_metrics = action.clone();
626
627 for subscriber_with_id in subscribers.iter() {
629 subscriber_with_id.on_notify(next_state, action);
630 }
631
632 let duration = notify_start.elapsed();
633 self.metrics.subscriber_notified(Some(&action_for_metrics), subscriber_count, duration);
634 }
635
636 fn do_subscribe(
637 &self,
638 state: State,
639 new_subscribers: impl Iterator<Item = SubscriberWithId<State, Action>>,
640 ) {
641 let mut subscribers = self.subscribers.lock().unwrap();
642
643 for subscriber_with_id in new_subscribers {
645 subscriber_with_id.on_subscribe(&state);
646
647 subscribers.push(subscriber_with_id);
648 }
649 }
650
651 #[allow(dead_code)]
652 fn do_remove_subscriber(&self, subscriber_id: u64) {
653 let mut adding_subscribers = self.adding_subscribers.lock().unwrap();
655 adding_subscribers.retain(|s| s.id != subscriber_id);
656
657 let mut subscribers = self.subscribers.lock().unwrap();
659 subscribers.retain(|s| {
660 let retain = s.id != subscriber_id;
661 if !retain {
662 s.on_unsubscribe();
663 }
664 retain
665 });
666 }
667
668 fn do_state_function(&self) {
669 let state_ref = match self.state.lock() {
670 Ok(state) => state,
671 Err(_e) => {
672 #[cfg(feature = "store-log")]
673 eprintln!("store: Error while locking state: {:?}", _e);
674 return;
675 }
676 };
677 match self.state_functions.lock() {
678 Ok(mut state_functions) => {
679 for state_function in state_functions.drain(..) {
680 state_function(&state_ref);
681 }
682 }
684 Err(_e) => {
685 #[cfg(feature = "store-log")]
686 eprintln!("store: Error while locking state functions: {:?}", _e);
687 }
688 };
689 }
690
691 fn on_close(&self, action_received_at: Instant) {
692 #[cfg(feature = "store-log")]
693 eprintln!("store: on_close");
694
695 self.metrics.action_executed(None, action_received_at.elapsed());
696 }
697
698 pub fn close(&self) -> Result<(), StoreError> {
706 let dispatch_tx = self.dispatch_tx.lock().map(|mut tx| tx.take());
708 match dispatch_tx {
709 Ok(Some(tx)) => {
710 #[cfg(feature = "store-log")]
711 eprintln!("store: close: sending exit to dispatch channel");
712 match tx.send(ActionOp::Exit(Instant::now())) {
713 Ok(_) => {
714 #[cfg(feature = "store-log")]
715 eprintln!("store: close: dispatch channel sent exit");
716 }
717 Err(_e) => {
718 #[cfg(feature = "store-log")]
719 eprintln!(
720 "store: close: Error while sending exit to dispatch channel: {:?}",
721 _e
722 );
723 return Err(StoreError::DispatchError(format!(
724 "Error while sending exit to dispatch channel, try it later: {:?}",
725 _e
726 )));
727 }
728 }
729 }
730 Ok(None) => {
731 #[cfg(feature = "store-log")]
732 eprintln!("store: close: dispatch channel already closed");
733 return Ok(());
734 }
735 Err(_e) => {
736 #[cfg(feature = "store-log")]
737 eprintln!(
738 "store: close: Error while locking dispatch channel: {:?}",
739 _e
740 );
741 return Err(StoreError::DispatchError(format!(
742 "Error while locking dispatch channel: {:?}",
743 _e
744 )));
745 }
746 }
747
748 #[cfg(feature = "store-log")]
749 eprintln!("store: close: dispatch channel closed");
750 Ok(())
751 }
752
753 pub fn stop(&self) -> Result<(), StoreError> {
759 self.stop_with_timeout(Duration::from_millis(0))
760 }
761
762 pub fn stop_with_timeout(&self, timeout: Duration) -> Result<(), StoreError> {
764 match self.close() {
765 Ok(_) => {}
766 Err(_e) => {
767 #[cfg(feature = "store-log")]
768 eprintln!("store: Error while closing dispatch channel: {:?}", _e);
769 }
772 }
773
774 let pool = self.pool.lock().map(|mut pool_guard| pool_guard.take());
777 match pool {
778 Ok(Some(pool)) => {
779 if timeout.is_zero() {
780 pool.shutdown_join();
781 } else {
782 pool.shutdown_join_timeout(timeout);
783 }
784 #[cfg(feature = "store-log")]
785 eprintln!("store: pool shutdown completed");
786 }
787 Ok(None) => {
788 #[cfg(feature = "store-log")]
789 eprintln!("store: pool already shutdown");
790 }
791 Err(_e) => {
792 #[cfg(feature = "store-log")]
793 eprintln!("store: Error while locking pool: {:?}", _e);
794 return Err(StoreError::DispatchError(format!(
795 "Error while shutting down pool: {:?}",
796 _e
797 )));
798 }
799 }
800
801 #[cfg(feature = "store-log")]
802 eprintln!("store: stopped");
803 Ok(())
804 }
805
806 pub(crate) fn dispatch(&self, action: Action) -> Result<(), StoreError> {
812 let sender = self.dispatch_tx.lock().unwrap();
813 if let Some(tx) = sender.as_ref() {
814 match tx.send(ActionOp::Action(action)) {
816 Ok(remains) => {
817 self.metrics.queue_size(remains as usize);
818 Ok(())
819 }
820 Err(e) => match e {
821 SenderError::SendError(action_op) => {
822 let action_desc = describe_action_op(&action_op);
823 let err = StoreError::DispatchError(format!(
824 "Error while sending '{}' to dispatch channel",
825 action_desc
826 ));
827 self.metrics.error_occurred(&err);
828 Err(err)
829 }
830 SenderError::ChannelClosed => {
831 let err =
832 StoreError::DispatchError("Dispatch channel is closed".to_string());
833 self.metrics.error_occurred(&err);
834 Err(err)
835 }
836 },
837 }
838 } else {
839 let err = StoreError::DispatchError("Dispatch channel is closed".to_string());
840 self.metrics.error_occurred(&err);
841 Err(err)
842 }
843 }
844
845 pub fn query_state<F>(&self, query_fn: F) -> Result<(), StoreError>
857 where
858 F: FnOnce(&State) + Send + Sync + 'static,
859 {
860 self.state_functions.lock().unwrap().push(Box::new(query_fn));
861
862 if let Ok(tx) = self.dispatch_tx.lock() {
863 if let Some(tx) = tx.as_ref() {
864 return match tx.send(ActionOp::StateFunction) {
865 Ok(_) => Ok(()),
866 Err(e) => Err(StoreError::DispatchError(format!(
867 "Error while sending state function to dispatch channel: {:?}",
868 e
869 ))),
870 };
871 }
872 Err(StoreError::DispatchError(
873 "Dispatch channel is closed".to_string(),
874 ))
875 } else {
876 Err(StoreError::DispatchError(
877 "Dispatch channel is closed".to_string(),
878 ))
879 }
880 }
881
882 #[allow(dead_code)]
892 #[doc(hidden)]
893 pub(crate) fn iter(&self) -> Result<impl Iterator<Item = (State, Action)>, StoreError> {
894 self.iter_with(1, BackpressurePolicy::BlockOnFull)
895 }
896
897 #[allow(dead_code)]
903 #[doc(hidden)]
904 pub(crate) fn iter_with(
905 &self,
906 capacity: usize,
907 policy: BackpressurePolicy<(State, Action)>,
908 ) -> Result<impl Iterator<Item = (State, Action)>, StoreError> {
909 let (iter_tx, iter_rx) = BackpressureChannel::<(State, Action)>::pair_with(
910 "store_iter",
911 capacity,
912 policy,
913 Some(self.metrics.clone()),
914 );
915
916 let subscription = self.add_subscriber(Arc::new(StateIteratorSubscriber::new(iter_tx)));
917 Ok(StateIterator::new(iter_rx, subscription?))
918 }
919
920 pub fn subscribed(
929 &self,
930 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
931 ) -> Result<Box<dyn Subscription>, StoreError> {
932 self.subscribed_with(
933 DEFAULT_CAPACITY,
934 BackpressurePolicy::BlockOnFull,
935 subscriber,
936 )
937 }
938
939 pub fn subscribed_with(
948 &self,
949 capacity: usize,
950 policy: BackpressurePolicy<(Instant, State, Action)>,
951 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
952 ) -> Result<Box<dyn Subscription>, StoreError> {
953 let (tx, rx) = BackpressureChannel::<(Instant, State, Action)>::pair_with(
955 format!("{}-channel", self.name).as_str(),
956 capacity,
957 policy,
958 Some(self.metrics.clone()),
959 );
960
961 let thread_name = format!("{}-channeled-subscriber", self.name);
963 let metrics_clone = self.metrics.clone();
964 let builder = thread::Builder::new().name(thread_name.clone());
965 let handle = match builder.spawn(move || {
966 Self::subscribed_loop(thread_name, rx, subscriber, metrics_clone);
968 }) {
969 Ok(h) => h,
970 Err(e) => {
971 #[cfg(feature = "store-log")]
972 eprintln!("store: Error while spawning channel thread: {:?}", e);
973 return Err(StoreError::SubscriptionError(format!(
974 "Error while spawning channel thread: {:?}",
975 e
976 )));
977 }
978 };
979
980 let channel_subscriber = Arc::new(ChanneledSubscriber::new(handle, tx));
982 let subscription = self.add_subscriber(channel_subscriber);
983
984 #[allow(clippy::let_and_return)]
986 subscription
987 }
988
989 fn subscribed_loop(
990 _name: String,
991 rx: ReceiverChannel<(Instant, State, Action)>,
992 subscriber: Box<dyn Subscriber<State, Action>>,
993 metrics: Arc<dyn Metrics>,
994 ) {
995 #[cfg(feature = "store-log")]
996 eprintln!("store: {} channel thread started", _name);
997
998 while let Some(msg) = rx.recv() {
999 match msg {
1000 ActionOp::Action((created_at, state, action)) => {
1001 let started_at = Instant::now();
1002 {
1003 subscriber.on_notify(&state, &action);
1004 }
1005 metrics.subscriber_notified(Some(&action), 1, started_at.elapsed());
1006
1007 metrics.action_executed(Some(&action), created_at.elapsed());
1009 }
1010 ActionOp::AddSubscriber => {
1011 #[cfg(feature = "store-log")]
1014 eprintln!("store: {} received AddSubscriber (ignored)", _name);
1015 }
1016 ActionOp::StateFunction => {
1017 #[cfg(feature = "store-log")]
1018 eprintln!("store: {} received StateFunction (ignored)", _name);
1019 }
1020
1021 ActionOp::Exit(created_at) => {
1022 metrics.action_executed(None, created_at.elapsed());
1023 #[cfg(feature = "store-log")]
1024 eprintln!("store: {} channel thread loop exit", _name);
1025 break;
1026 }
1027 }
1028 }
1029
1030 #[cfg(feature = "store-log")]
1031 eprintln!("store: {} channel thread done", _name);
1032 }
1033}
1034
1035struct ChanneledSubscriber<T>
1037where
1038 T: Send + Sync + Clone + 'static,
1039{
1040 handle: Mutex<Option<JoinHandle<()>>>,
1041 tx: Mutex<Option<SenderChannel<T>>>,
1042}
1043
1044impl<T> ChanneledSubscriber<T>
1045where
1046 T: Send + Sync + Clone + 'static,
1047{
1048 pub(crate) fn new(handle: JoinHandle<()>, tx: SenderChannel<T>) -> Self {
1049 Self {
1050 handle: Mutex::new(Some(handle)),
1051 tx: Mutex::new(Some(tx)),
1052 }
1053 }
1054
1055 fn clear_resource(&self) {
1056 let tx_owned = self.tx.lock().map(|mut tx| tx.take());
1059 match tx_owned {
1060 Ok(Some(tx)) => {
1061 #[cfg(feature = "store-log")]
1062 eprintln!("store: ChanneledSubscriber: clearing resource: sending exit");
1063 let _ = tx.send(ActionOp::Exit(Instant::now()));
1064 drop(tx);
1065 }
1066 Ok(None) => {
1067 #[cfg(feature = "store-log")]
1068 eprintln!("store: ChanneledSubscriber: clearing resource: channel already closed");
1069 }
1070 Err(_e) => {
1071 #[cfg(feature = "store-log")]
1072 eprintln!(
1073 "store: ChanneledSubscriber: clearing resource: Error while locking channel: {:?}",
1074 _e
1075 );
1076 }
1077 }
1078
1079 let handled_owned = self.handle.lock().map(|mut handle| handle.take());
1081 match handled_owned {
1082 Ok(Some(h)) => {
1083 #[cfg(feature = "store-log")]
1084 eprintln!("store: ChanneledSubscriber: clearing resource: joining thread");
1085 let _ = h.join();
1086 }
1087 Ok(None) => {
1088 #[cfg(feature = "store-log")]
1089 eprintln!("store: ChanneledSubscriber: clearing resource: thread already joined");
1090 }
1091 Err(_e) => {
1092 #[cfg(feature = "store-log")]
1093 eprintln!(
1094 "store: ChanneledSubscriber: clearing resource: Error while locking thread handle: {:?}",
1095 _e
1096 );
1097 }
1098 }
1099 }
1100}
1101
1102impl<State, Action> Subscriber<State, Action> for ChanneledSubscriber<(Instant, State, Action)>
1103where
1104 State: Send + Sync + Clone + 'static,
1105 Action: Send + Sync + Clone + 'static,
1106{
1107 fn on_notify(&self, state: &State, action: &Action) {
1108 match self.tx.lock() {
1109 Ok(tx) => {
1110 tx.as_ref().map(|tx| {
1112 tx.send(ActionOp::Action((
1113 Instant::now(),
1114 state.clone(),
1115 action.clone(),
1116 )))
1117 });
1118 }
1119 Err(_e) => {
1120 #[cfg(feature = "store-log")]
1121 eprintln!("store: Error while locking channel: {:?}", _e);
1122 }
1123 }
1124 }
1125
1126 fn on_unsubscribe(&self) {
1127 self.clear_resource();
1128 }
1129}
1130
1131impl<T> Subscription for ChanneledSubscriber<T>
1132where
1133 T: Send + Sync + Clone + 'static,
1134{
1135 fn unsubscribe(&self) {
1136 self.clear_resource();
1137 }
1138}
1139
1140impl<State, Action> Drop for StoreImpl<State, Action>
1143where
1144 State: Send + Sync + Clone + 'static,
1145 Action: Send + Sync + Clone + 'static,
1146{
1147 fn drop(&mut self) {
1148 let _ = self.close();
1149
1150 let _ = self.stop_with_timeout(DEFAULT_STOP_TIMEOUT);
1152 #[cfg(feature = "store-log")]
1159 eprintln!("store: '{}' dropped", self.name);
1160 }
1161}
1162
1163impl<State, Action> Store<State, Action> for StoreImpl<State, Action>
1164where
1165 State: Send + Sync + Clone + 'static,
1166 Action: Send + Sync + Clone + std::fmt::Debug + 'static,
1167{
1168 fn get_state(&self) -> State {
1169 self.get_state()
1170 }
1171
1172 fn dispatch(&self, action: Action) -> Result<(), StoreError> {
1173 self.dispatch(action)
1174 }
1175
1176 fn add_subscriber(
1177 &self,
1178 subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
1179 ) -> Result<Box<dyn Subscription>, StoreError> {
1180 self.add_subscriber(subscriber)
1181 }
1182
1183 fn subscribed(
1184 &self,
1185 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
1186 ) -> Result<Box<dyn Subscription>, StoreError> {
1187 self.subscribed(subscriber)
1188 }
1189
1190 fn subscribed_with(
1191 &self,
1192 capacity: usize,
1193 policy: BackpressurePolicy<(Instant, State, Action)>,
1194 subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
1195 ) -> Result<Box<dyn Subscription>, StoreError> {
1196 self.subscribed_with(capacity, policy, subscriber)
1197 }
1198
1199 fn stop(&self) -> Result<(), StoreError> {
1200 self.stop()
1201 }
1202
1203 fn stop_timeout(&self, timeout: Duration) -> Result<(), StoreError> {
1204 self.stop_with_timeout(timeout)
1205 }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210 use super::*;
1211 use crate::{BackpressurePolicy, Dispatcher, Effect, FnReducer, Reducer, StoreBuilder};
1212 use std::sync::Arc;
1213 use std::thread;
1214 use std::time::Duration;
1215
1216 struct TestChannelSubscriber {
1217 received: Arc<Mutex<Vec<(i32, i32)>>>,
1218 }
1219
1220 impl TestChannelSubscriber {
1221 fn new(received: Arc<Mutex<Vec<(i32, i32)>>>) -> Self {
1222 Self { received }
1223 }
1224 }
1225
1226 impl Subscriber<i32, i32> for TestChannelSubscriber {
1227 fn on_notify(&self, state: &i32, action: &i32) {
1228 self.received.lock().unwrap().push((*state, *action));
1230 }
1231 }
1232
1233 struct TestReducer;
1234
1235 impl Reducer<i32, i32> for TestReducer {
1236 fn reduce(&self, state: &i32, action: &i32) -> DispatchOp<i32, i32> {
1237 DispatchOp::Dispatch(state + action, vec![])
1238 }
1239 }
1240
1241 struct SlowSubscriber {
1242 received: Arc<Mutex<Vec<(i32, i32)>>>,
1243 delay: Duration,
1244 }
1245
1246 impl SlowSubscriber {
1247 fn new(received: Arc<Mutex<Vec<(i32, i32)>>>, delay: Duration) -> Self {
1248 Self { received, delay }
1249 }
1250 }
1251
1252 impl Subscriber<i32, i32> for SlowSubscriber {
1253 fn on_notify(&self, state: &i32, action: &i32) {
1254 std::thread::sleep(self.delay);
1256 self.received.lock().unwrap().push((*state, *action));
1257 }
1258 }
1259
1260 #[test]
1261 fn test_store_subscribed_basic() {
1262 let initial_state = 0;
1264 let reducer = Box::new(TestReducer);
1265 let store_result = StoreImpl::new_with_reducer(initial_state, reducer);
1266 assert!(store_result.is_ok());
1267 let store = store_result.unwrap();
1268
1269 let received_states = Arc::new(Mutex::new(Vec::new()));
1271 let subscriber1 = Box::new(TestChannelSubscriber::new(received_states.clone()));
1272 let subscription =
1274 store.subscribed_with(10, BackpressurePolicy::DropOldestIf(None), subscriber1);
1275
1276 store.dispatch(1).unwrap();
1278 store.dispatch(2).unwrap();
1279
1280 match store.stop() {
1283 Ok(_) => println!("store stopped"),
1284 Err(e) => {
1285 panic!("store stop failed : {:?}", e);
1286 }
1287 }
1288
1289 subscription.unwrap().unsubscribe();
1291
1292 let states = received_states.lock().unwrap();
1294 assert_eq!(states.len(), 2);
1295 assert_eq!(states[0], (1, 1)); assert_eq!(states[1], (3, 2)); }
1298
1299 #[test]
1300 fn test_store_subscribed_backpressure() {
1301 let store_result = StoreImpl::new_with_reducer(0, Box::new(TestReducer));
1302 assert!(store_result.is_ok());
1303 let store = store_result.unwrap();
1304
1305 let received = Arc::new(Mutex::new(Vec::new()));
1306 let received_clone = received.clone();
1307 let subscriber = Box::new(SlowSubscriber::new(
1308 received_clone,
1309 Duration::from_millis(100),
1310 ));
1311 let subscription =
1313 store.subscribed_with(1, BackpressurePolicy::DropOldestIf(None), subscriber);
1314
1315 for i in 0..5 {
1317 let _ = store.dispatch(i).unwrap();
1318 }
1319
1320 thread::sleep(Duration::from_millis(200));
1322 match store.stop() {
1323 Ok(_) => println!("store stopped"),
1324 Err(e) => {
1325 panic!("store stop failed : {:?}", e);
1326 }
1327 }
1328 subscription.unwrap().unsubscribe();
1329
1330 let received = received.lock().unwrap();
1332 assert!(received.len() <= 2); if let Some((state, action)) = received.last() {
1335 assert_eq!(*action, 4); assert!(*state <= 10); }
1338 }
1339
1340 #[test]
1341 fn test_store_subscribed_subscription() {
1342 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1343
1344 let received = Arc::new(Mutex::new(Vec::new()));
1345 let subscriber1 = Box::new(TestChannelSubscriber::new(received.clone()));
1346 let subscription =
1347 store.subscribed_with(10, BackpressurePolicy::DropOldestIf(None), subscriber1);
1348
1349 store.dispatch(1).unwrap();
1351
1352 thread::sleep(Duration::from_millis(100));
1354 assert_eq!(received.lock().unwrap().len(), 1);
1356
1357 subscription.unwrap().unsubscribe();
1359
1360 store.dispatch(2).unwrap();
1362 store.dispatch(3).unwrap();
1363 match store.stop() {
1365 Ok(_) => println!("store stopped"),
1366 Err(e) => {
1367 panic!("store stop failed : {:?}", e);
1368 }
1369 }
1370 assert_eq!(received.lock().unwrap().len(), 1);
1372 }
1373
1374 #[test]
1376 fn test_new_subscriber_receives_latest_state() {
1377 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1378
1379 let received1 = Arc::new(Mutex::new(Vec::new()));
1381 let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
1382 store.add_subscriber(subscriber1).unwrap();
1383
1384 store.dispatch(5).unwrap();
1386 store.dispatch(10).unwrap();
1387
1388 thread::sleep(Duration::from_millis(100));
1390
1391 let received2 = Arc::new(Mutex::new(Vec::new()));
1393 let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
1394 store.add_subscriber(subscriber2).unwrap();
1395
1396 thread::sleep(Duration::from_millis(100));
1398
1399 store.dispatch(20).unwrap();
1401
1402 thread::sleep(Duration::from_millis(100));
1404
1405 let received1 = received1.lock().unwrap();
1407 assert_eq!(received1.len(), 3);
1408 assert_eq!(received1[0], (5, 5));
1409 assert_eq!(received1[1], (15, 10));
1410 assert_eq!(received1[2], (35, 20));
1411
1412 let received2 = received2.lock().unwrap();
1414 assert_eq!(received2.len(), 1);
1415 assert_eq!(received2[0], (35, 20));
1416 }
1417
1418 #[test]
1420 fn test_new_subscriber_on_subscribe_called() {
1421 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1422
1423 store.dispatch(5).unwrap();
1425
1426 let received_states = Arc::new(Mutex::new(Vec::new()));
1428 let subscribe_called = Arc::new(Mutex::new(false));
1429
1430 struct TestSubscribeSubscriber {
1431 received_states: Arc<Mutex<Vec<i32>>>,
1432 subscribe_called: Arc<Mutex<bool>>,
1433 }
1434
1435 impl Subscriber<i32, i32> for TestSubscribeSubscriber {
1436 fn on_subscribe(&self, state: &i32) {
1437 self.received_states.lock().unwrap().push(*state);
1438 *self.subscribe_called.lock().unwrap() = true;
1439 }
1440
1441 fn on_notify(&self, state: &i32, _action: &i32) {
1442 self.received_states.lock().unwrap().push(*state);
1443 }
1444 }
1445
1446 let subscriber = Arc::new(TestSubscribeSubscriber {
1447 received_states: received_states.clone(),
1448 subscribe_called: subscribe_called.clone(),
1449 });
1450
1451 store.add_subscriber(subscriber).unwrap();
1452
1453 thread::sleep(Duration::from_millis(100));
1455
1456 assert!(*subscribe_called.lock().unwrap());
1458
1459 let states = received_states.lock().unwrap();
1461 assert_eq!(states.len(), 1);
1462 assert_eq!(states[0], 5);
1463 }
1464
1465 #[test]
1467 fn test_multiple_subscribers_added_simultaneously() {
1468 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1469
1470 store.dispatch(10).unwrap();
1472 store.dispatch(20).unwrap();
1473
1474 thread::sleep(Duration::from_millis(100));
1476
1477 let subscribers = vec![
1479 Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1480 Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1481 Arc::new(TestChannelSubscriber::new(Arc::new(Mutex::new(Vec::new())))),
1482 ];
1483
1484 for subscriber in &subscribers {
1485 store.add_subscriber(subscriber.clone()).unwrap();
1486 }
1487
1488 thread::sleep(Duration::from_millis(100));
1490
1491 store.dispatch(30).unwrap();
1493
1494 thread::sleep(Duration::from_millis(100));
1496
1497 for subscriber in &subscribers {
1499 let received = subscriber.received.lock().unwrap();
1500 assert_eq!(received.len(), 1);
1501 assert_eq!(received[0], (60, 30)); }
1503 }
1504
1505 #[test]
1507 fn test_subscriber_unsubscribe_after_add() {
1508 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1509
1510 store.dispatch(5).unwrap();
1512
1513 let received = Arc::new(Mutex::new(Vec::new()));
1515 let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
1516 let subscription = store.add_subscriber(subscriber).unwrap();
1517
1518 thread::sleep(Duration::from_millis(100));
1520
1521 subscription.unsubscribe();
1523
1524 store.dispatch(10).unwrap();
1526
1527 thread::sleep(Duration::from_millis(100));
1529
1530 let received = received.lock().unwrap();
1532 assert_eq!(received.len(), 0);
1533 }
1534
1535 #[test]
1537 fn test_add_subscriber_after_store_stop() {
1538 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1539
1540 match store.stop() {
1542 Ok(_) => println!("store stopped"),
1543 Err(e) => {
1544 panic!("store stop failed : {:?}", e);
1545 }
1546 }
1547
1548 let received = Arc::new(Mutex::new(Vec::new()));
1550 let subscriber = Arc::new(TestChannelSubscriber::new(received.clone()));
1551 let subscription = store.add_subscriber(subscriber);
1552 assert!(subscription.is_err());
1553 }
1554
1555 #[test]
1557 fn test_subscriber_modifies_state_in_on_subscribe() {
1558 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1559
1560 store.dispatch(5).unwrap();
1562
1563 struct ModifyingSubscriber {
1564 received_states: Arc<Mutex<Vec<i32>>>,
1565 subscribe_called: Arc<Mutex<bool>>,
1566 }
1567
1568 impl Subscriber<i32, i32> for ModifyingSubscriber {
1569 fn on_subscribe(&self, state: &i32) {
1570 self.received_states.lock().unwrap().push(*state);
1572 *self.subscribe_called.lock().unwrap() = true;
1573 }
1574
1575 fn on_notify(&self, state: &i32, _action: &i32) {
1576 self.received_states.lock().unwrap().push(*state);
1577 }
1578 }
1579
1580 let subscriber = Arc::new(ModifyingSubscriber {
1581 received_states: Arc::new(Mutex::new(Vec::new())),
1582 subscribe_called: Arc::new(Mutex::new(false)),
1583 });
1584
1585 store.add_subscriber(subscriber.clone()).unwrap();
1586
1587 thread::sleep(Duration::from_millis(100));
1589
1590 assert!(*subscriber.subscribe_called.lock().unwrap());
1592
1593 let states = subscriber.received_states.lock().unwrap();
1595 assert_eq!(states.len(), 1);
1596 assert_eq!(states[0], 5);
1597
1598 assert_eq!(store.get_state(), 5);
1600 }
1601
1602 #[test]
1604 fn test_consecutive_add_subscriber_actions() {
1605 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1606
1607 let received1 = Arc::new(Mutex::new(Vec::new()));
1609 let subscriber1 = Arc::new(TestChannelSubscriber::new(received1.clone()));
1610 store.add_subscriber(subscriber1).unwrap();
1611
1612 thread::sleep(Duration::from_millis(50));
1614
1615 let received2 = Arc::new(Mutex::new(Vec::new()));
1617 let subscriber2 = Arc::new(TestChannelSubscriber::new(received2.clone()));
1618 store.add_subscriber(subscriber2).unwrap();
1619
1620 thread::sleep(Duration::from_millis(50));
1622
1623 let received3 = Arc::new(Mutex::new(Vec::new()));
1625 let subscriber3 = Arc::new(TestChannelSubscriber::new(received3.clone()));
1626 store.add_subscriber(subscriber3).unwrap();
1627
1628 thread::sleep(Duration::from_millis(100));
1630
1631 store.dispatch(10).unwrap();
1633
1634 thread::sleep(Duration::from_millis(100));
1636
1637 assert_eq!(received1.lock().unwrap().len(), 1);
1639 assert_eq!(received2.lock().unwrap().len(), 1);
1640 assert_eq!(received3.lock().unwrap().len(), 1);
1641 }
1642
1643 #[test]
1645 fn test_store_iter_basic() {
1646 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1654
1655 let mut iter = store.iter().unwrap();
1657
1658 store.dispatch(10).expect("dispatch should succeed");
1660 store.dispatch(20).expect("dispatch should succeed");
1661 store.dispatch(30).expect("dispatch should succeed");
1662
1663 assert_eq!(iter.next(), Some((10, 10))); assert_eq!(iter.next(), Some((30, 20))); assert_eq!(iter.next(), Some((60, 30))); store.stop().expect("store should stop");
1670 assert_eq!(iter.next(), None);
1671 }
1672
1673 #[test]
1675 fn test_store_iter_no_actions() {
1676 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1678
1679 let mut iter = store.iter().unwrap();
1681
1682 store.stop().expect("store should stop");
1685 assert_eq!(iter.next(), None);
1686 }
1687
1688 #[test]
1690 fn test_store_iter_complex_types() {
1691 #[derive(Debug, Clone, PartialEq)]
1693 struct ComplexState {
1694 value: i32,
1695 name: String,
1696 }
1697
1698 #[allow(dead_code)]
1699 #[derive(Debug, Clone, PartialEq)]
1700 enum ComplexAction {
1701 Add(i32),
1702 SetName(String),
1703 Reset,
1704 }
1705
1706 let store = StoreImpl::new_with_reducer(
1707 ComplexState {
1708 value: 0,
1709 name: "initial".to_string(),
1710 },
1711 Box::new(FnReducer::from(
1712 |state: &ComplexState, action: &ComplexAction| match action {
1713 ComplexAction::Add(n) => DispatchOp::Dispatch(
1714 ComplexState {
1715 value: state.value + n,
1716 name: state.name.clone(),
1717 },
1718 vec![],
1719 ),
1720 ComplexAction::SetName(name) => DispatchOp::Dispatch(
1721 ComplexState {
1722 value: state.value,
1723 name: name.clone(),
1724 },
1725 vec![],
1726 ),
1727 ComplexAction::Reset => DispatchOp::Dispatch(
1728 ComplexState {
1729 value: 0,
1730 name: "reset".to_string(),
1731 },
1732 vec![],
1733 ),
1734 },
1735 )),
1736 )
1737 .unwrap();
1738
1739 let mut iter = store.iter().unwrap();
1741
1742 store.dispatch(ComplexAction::Add(10)).expect("dispatch should succeed");
1743 store
1744 .dispatch(ComplexAction::SetName("test".to_string()))
1745 .expect("dispatch should succeed");
1746 store.dispatch(ComplexAction::Add(5)).expect("dispatch should succeed");
1747
1748 assert_eq!(
1750 iter.next(),
1751 Some((
1752 ComplexState {
1753 value: 10,
1754 name: "initial".to_string(),
1755 },
1756 ComplexAction::Add(10)
1757 ))
1758 );
1759 assert_eq!(
1760 iter.next(),
1761 Some((
1762 ComplexState {
1763 value: 10,
1764 name: "test".to_string(),
1765 },
1766 ComplexAction::SetName("test".to_string())
1767 ))
1768 );
1769 assert_eq!(
1770 iter.next(),
1771 Some((
1772 ComplexState {
1773 value: 15,
1774 name: "test".to_string(),
1775 },
1776 ComplexAction::Add(5)
1777 ))
1778 );
1779
1780 store.stop().expect("store should stop");
1781 assert_eq!(iter.next(), None);
1782 }
1783
1784 #[test]
1786 fn test_store_iter_concurrent_actions() {
1787 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1789
1790 let mut iter = store.iter().unwrap();
1792
1793 for i in 1..=10 {
1795 store.dispatch(i).expect("dispatch should succeed");
1796 }
1797
1798 let mut expected_state = 0;
1800 for i in 1..=10 {
1801 expected_state += i;
1802 assert_eq!(iter.next(), Some((expected_state, i)));
1803 }
1804
1805 store.stop().expect("store should stop");
1806 assert_eq!(iter.next(), None);
1807 }
1808
1809 #[test]
1811 fn test_store_iter_with_middleware() {
1812 struct TestMiddleware;
1814
1815 impl<State, Action> MiddlewareFnFactory<State, Action> for TestMiddleware
1816 where
1817 State: Send + Sync + Clone + 'static,
1818 Action: Send + Sync + Clone + std::fmt::Debug + 'static,
1819 {
1820 fn create(&self, inner: MiddlewareFn<State, Action>) -> MiddlewareFn<State, Action> {
1821 inner
1822 }
1823 }
1824
1825 let store = StoreImpl::new_with(
1826 0,
1827 vec![Box::new(FnReducer::from(|state: &i32, action: &i32| {
1828 DispatchOp::Dispatch(state + action, vec![])
1829 }))],
1830 "test".to_string(),
1831 1,
1832 BackpressurePolicy::default(),
1833 vec![Arc::new(TestMiddleware)],
1834 )
1835 .unwrap();
1836
1837 let mut iter = store.iter().unwrap();
1839
1840 store.dispatch(5).expect("dispatch should succeed");
1841 store.dispatch(10).expect("dispatch should succeed");
1842
1843 assert_eq!(iter.next(), Some((5, 5)));
1845 assert_eq!(iter.next(), Some((15, 10)));
1846
1847 store.stop().expect("store should stop");
1848 assert_eq!(iter.next(), None);
1849 }
1850
1851 #[test]
1853 fn test_store_iter_with_effects() {
1854 let store = StoreImpl::new_with_reducer(
1856 0,
1857 Box::new(FnReducer::from(|state: &i32, action: &i32| {
1858 let new_state = state + action;
1859 let mut effects_vec = vec![];
1860 if *action > 5 {
1861 effects_vec.push(Effect::Task(Box::new(|| {
1862 })));
1864 }
1865 DispatchOp::Dispatch(new_state, effects_vec)
1866 })),
1867 )
1868 .unwrap();
1869
1870 let mut iter = store.iter().unwrap();
1872
1873 store.dispatch(3).expect("dispatch should succeed"); store.dispatch(10).expect("dispatch should succeed"); assert_eq!(iter.next(), Some((3, 3)));
1878 assert_eq!(iter.next(), Some((13, 10)));
1879
1880 store.stop().expect("store should stop");
1881 assert_eq!(iter.next(), None);
1882 }
1883
1884 #[test]
1886 fn test_store_iter_with_multiple_reducers() {
1887 let store = StoreBuilder::new(0)
1891 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
1892 DispatchOp::Dispatch(state + action, vec![])
1893 })))
1894 .with_reducer(Box::new(FnReducer::from(|state: &i32, _action: &i32| {
1895 DispatchOp::Dispatch(state * 2, vec![])
1896 })))
1897 .build()
1898 .unwrap();
1899
1900 store.dispatch(5).expect("dispatch should succeed");
1904 store.dispatch(10).expect("dispatch should succeed");
1905
1906 store.stop().expect("store should stop");
1918 }
1920
1921 #[test]
1923 fn test_store_iter_early_stop() {
1924 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
1926
1927 let mut iter = store.iter().unwrap();
1929
1930 store.dispatch(5).expect("dispatch should succeed");
1931 store.dispatch(10).expect("dispatch should succeed");
1932 store.dispatch(15).expect("dispatch should succeed");
1933
1934 assert_eq!(iter.next(), Some((5, 5)));
1936 assert_eq!(iter.next(), Some((15, 10))); assert_eq!(iter.next(), Some((30, 15))); store.stop().expect("store should stop");
1941 }
1942
1943 #[test]
1945 fn test_store_iter_with_block_on_full() {
1946 let store = StoreImpl::new_with(
1948 0,
1949 vec![Box::new(FnReducer::from(|state: &i32, action: &i32| {
1950 DispatchOp::Dispatch(state + action, vec![])
1951 }))],
1952 "test".to_string(),
1953 2,
1954 BackpressurePolicy::BlockOnFull,
1955 vec![],
1956 )
1957 .unwrap();
1958
1959 let mut iter = store.iter_with(1, BackpressurePolicy::BlockOnFull).unwrap();
1961
1962 store.dispatch(5).expect("dispatch should succeed");
1963 store.dispatch(10).expect("dispatch should succeed");
1964
1965 assert_eq!(iter.next(), Some((5, 5)));
1967 assert_eq!(iter.next(), Some((15, 10)));
1968
1969 store.stop().expect("store should stop");
1970 }
1971
1972 #[test]
2000 fn test_store_iter_string_types() {
2001 let store = StoreImpl::new_with_reducer(
2003 "".to_string(),
2004 Box::new(FnReducer::from(|state: &String, action: &String| {
2005 let new_state = format!("{}{}", state, action);
2006 DispatchOp::Dispatch(new_state, vec![])
2007 })),
2008 )
2009 .unwrap();
2010
2011 let mut iter = store.iter().unwrap();
2013
2014 store.dispatch("hello".to_string()).expect("dispatch should succeed");
2015 store.dispatch(" world".to_string()).expect("dispatch should succeed");
2016
2017 assert_eq!(
2019 iter.next(),
2020 Some(("hello".to_string(), "hello".to_string()))
2021 );
2022 assert_eq!(
2023 iter.next(),
2024 Some(("hello world".to_string(), " world".to_string()))
2025 );
2026
2027 store.stop().expect("store should stop");
2028 assert_eq!(iter.next(), None);
2029 }
2030
2031 #[test]
2033 fn test_store_iter_no_state_change() {
2034 let store = StoreImpl::new_with_reducer(
2036 0,
2037 Box::new(FnReducer::from(|state: &i32, _action: &i32| {
2038 DispatchOp::Dispatch(state.clone(), vec![]) })),
2040 )
2041 .unwrap();
2042 let mut iter = store.iter().unwrap();
2044
2045 store.dispatch(5).expect("dispatch should succeed");
2046 store.dispatch(10).expect("dispatch should succeed");
2047
2048 assert_eq!(iter.next(), Some((0, 5))); assert_eq!(iter.next(), Some((0, 10))); store.stop().expect("store should stop");
2053 assert_eq!(iter.next(), None);
2054 }
2055
2056 #[test]
2058 fn test_query_state_basic() {
2059 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
2061
2062 store.dispatch(5).unwrap();
2064 store.dispatch(10).unwrap();
2065
2066 std::thread::sleep(std::time::Duration::from_millis(100));
2068
2069 let queried_value = std::sync::Arc::new(std::sync::Mutex::new(0));
2071 let queried_value_clone = queried_value.clone();
2072 store
2073 .query_state(move |state| {
2074 *queried_value_clone.lock().unwrap() = *state;
2075 })
2076 .unwrap();
2077
2078 let _ = store.stop();
2079 assert_eq!(*queried_value.lock().unwrap(), 15);
2081 }
2082
2083 #[test]
2085 fn test_query_state_complex() {
2086 #[derive(Debug, Clone, PartialEq)]
2088 struct ComplexState {
2089 value: i32,
2090 name: String,
2091 }
2092
2093 #[derive(Debug, Clone, PartialEq)]
2094 enum ComplexAction {
2095 Add(i32),
2096 SetName(String),
2097 }
2098
2099 let store = StoreImpl::new_with_reducer(
2100 ComplexState {
2101 value: 0,
2102 name: "initial".to_string(),
2103 },
2104 Box::new(FnReducer::from(
2105 |state: &ComplexState, action: &ComplexAction| match action {
2106 ComplexAction::Add(n) => DispatchOp::Dispatch(
2107 ComplexState {
2108 value: state.value + n,
2109 name: state.name.clone(),
2110 },
2111 vec![],
2112 ),
2113 ComplexAction::SetName(name) => DispatchOp::Dispatch(
2114 ComplexState {
2115 value: state.value,
2116 name: name.clone(),
2117 },
2118 vec![],
2119 ),
2120 },
2121 )),
2122 )
2123 .unwrap();
2124
2125 store.dispatch(ComplexAction::Add(10)).unwrap();
2127 store.dispatch(ComplexAction::SetName("test".to_string())).unwrap();
2128 store.dispatch(ComplexAction::Add(5)).unwrap();
2129
2130 let queried_value = std::sync::Arc::new(std::sync::Mutex::new(0));
2132 let queried_name = std::sync::Arc::new(std::sync::Mutex::new(String::new()));
2133 let queried_value_clone = queried_value.clone();
2134 let queried_name_clone = queried_name.clone();
2135 store
2136 .query_state(move |state| {
2137 *queried_value_clone.lock().unwrap() = state.value;
2138 *queried_name_clone.lock().unwrap() = state.name.clone();
2139 })
2140 .unwrap();
2141
2142 store.stop().unwrap();
2143
2144 assert_eq!(*queried_value.lock().unwrap(), 15); assert_eq!(*queried_name.lock().unwrap(), "test");
2147 }
2148
2149 #[test]
2151 fn test_query_state_multiple_queries() {
2152 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
2154
2155 store.dispatch(5).unwrap();
2157 store.dispatch(10).unwrap();
2158
2159 std::thread::sleep(std::time::Duration::from_millis(100));
2161
2162 let query1_result = std::sync::Arc::new(std::sync::Mutex::new(0));
2164 let query2_result = std::sync::Arc::new(std::sync::Mutex::new(0));
2165
2166 let query1_clone = query1_result.clone();
2167 store
2168 .query_state(move |state| {
2169 *query1_clone.lock().unwrap() = *state;
2170 })
2171 .unwrap();
2172
2173 store.dispatch(20).unwrap();
2174
2175 std::thread::sleep(std::time::Duration::from_millis(100));
2177
2178 let query2_clone = query2_result.clone();
2179 store
2180 .query_state(move |state| {
2181 *query2_clone.lock().unwrap() = *state;
2182 })
2183 .unwrap();
2184
2185 store.stop().unwrap();
2186
2187 assert_eq!(*query1_result.lock().unwrap(), 15); assert_eq!(*query2_result.lock().unwrap(), 35); }
2191
2192 #[test]
2194 fn test_query_state_error_handling() {
2195 let store = StoreImpl::new_with_reducer(0, Box::new(TestReducer)).unwrap();
2197
2198 let queried_value = std::sync::Arc::new(std::sync::Mutex::new(0));
2200 let queried_value_clone = queried_value.clone();
2201 let result = store.query_state(move |state| {
2202 *queried_value_clone.lock().unwrap() = *state;
2203 });
2204
2205 assert!(result.is_ok());
2207 assert_eq!(*queried_value.lock().unwrap(), 0);
2208 }
2209}