pub struct StoreImpl<State, Action>{ /* private fields */ }Expand description
StoreImpl is the default implementation of a Redux store
Implementations§
Source§impl<State, Action> StoreImpl<State, Action>
impl<State, Action> StoreImpl<State, Action>
Sourcepub fn new(state: State) -> Arc<StoreImpl<State, Action>>
pub fn new(state: State) -> Arc<StoreImpl<State, Action>>
create a new store with an initial state
Sourcepub fn new_with_reducer(
state: State,
reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
) -> Arc<StoreImpl<State, Action>>
pub fn new_with_reducer( state: State, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, ) -> Arc<StoreImpl<State, Action>>
create a new store with a reducer and an initial state
Examples found in repository?
93pub fn main() {
94 println!("Hello, Basic!");
95
96 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97 CalcState::default(),
98 Box::new(CalcReducer::default()),
99 );
100
101 println!("add subscriber");
102 store.add_subscriber(Arc::new(CalcSubscriber::default()));
103 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106 // stop the store
107 store.stop();
108
109 assert_eq!(store.get_state().count, 0);
110}More examples
3fn main() {
4 // new store with reducer
5 let store = StoreImpl::new_with_reducer(
6 0,
7 Box::new(FnReducer::from(|state: &i32, action: &i32| {
8 println!("reducer: {} + {}", state, action);
9 DispatchOp::Dispatch(state + action, None)
10 })),
11 );
12
13 let mut iter = store.iter();
14
15 // dispatch actions
16 let _ = store.dispatch(41);
17 let _ = store.dispatch(1);
18
19 assert_eq!(iter.next(), Some((41, 41)));
20 assert_eq!(iter.next(), Some((42, 1)));
21
22 // stop the store
23 store.stop();
24
25 assert_eq!(iter.next(), None);
26 eprintln!("exit");
27}Sourcepub fn new_with_name(
state: State,
reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
name: String,
) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
pub fn new_with_name( state: State, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, name: String, ) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
Create a new store with an initial state, a reducer, and a name.
Sourcepub fn new_with(
state: State,
reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
name: String,
capacity: usize,
policy: BackpressurePolicy<Action>,
middlewares: Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>,
) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
pub fn new_with( state: State, reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>, name: String, capacity: usize, policy: BackpressurePolicy<Action>, middlewares: Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>, ) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
Create a new store with an initial state, reducers, a name, a capacity, a backpressure policy, and middlewares.
Sourcepub fn get_state(&self) -> State
pub fn get_state(&self) -> State
Get the latest state (intended for debugging).
Prefer using a subscriber to get the state.
Examples found in repository?
93pub fn main() {
94 println!("Hello, Basic!");
95
96 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97 CalcState::default(),
98 Box::new(CalcReducer::default()),
99 );
100
101 println!("add subscriber");
102 store.add_subscriber(Arc::new(CalcSubscriber::default()));
103 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106 // stop the store
107 store.stop();
108
109 assert_eq!(store.get_state().count, 0);
110}More examples
4pub fn main() {
5 // new store with reducer
6 let store = StoreBuilder::new(0)
7 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
8 println!("reducer: {} + {}", state, action);
9 DispatchOp::Dispatch(state + action, None)
10 })))
11 .build()
12 .unwrap();
13
14 // add subscriber
15 store.add_subscriber(Arc::new(FnSubscriber::from(
16 |state: &i32, _action: &i32| {
17 println!("subscriber: state: {}", state);
18 },
19 )));
20
21 // dispatch actions
22 store.dispatch(41).expect("no error");
23 store.dispatch(1).expect("no error");
24
25 // stop the store
26 store.stop();
27
28 assert_eq!(store.get_state(), 42);
29}94pub fn main() {
95 println!("Hello, Builder!");
96
97 let store =
98 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99 .with_name("calc".to_string())
100 .build()
101 .unwrap();
102 println!("add subscriber");
103 store.add_subscriber(Arc::new(CalcSubscriber::default()));
104 let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106 thread::sleep(std::time::Duration::from_secs(1));
107 println!("add more subscriber");
108 store.add_subscriber(Arc::new(CalcSubscriber::default()));
109 let _ = store.dispatch(CalcAction::Subtract(1));
110
111 // stop the store
112 store.stop();
113
114 assert_eq!(store.get_state().count, 0);
115}4fn main() {
5 // predicate 기반 drop 정책 사용 예시
6 // 작은 값들을 우선적으로 drop하는 predicate
7 let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8 match action_op {
9 rs_store::ActionOp::Action(value) => *value < 5, // 5보다 작은 값들은 drop
10 rs_store::ActionOp::Exit(_) => false, // Exit는 drop하지 않음
11 }
12 });
13
14 let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16 // 매우 작은 capacity로 store 생성하여 backpressure 상황 시뮬레이션
17 let store = StoreBuilder::new(0)
18 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19 // reducer에서 지연을 추가하여 backpressure 상황 생성
20 std::thread::sleep(std::time::Duration::from_millis(200));
21 println!("reducer: {} + {}", state, action);
22 DispatchOp::Dispatch(state + action, None)
23 })))
24 .with_capacity(2) // 매우 작은 capacity로 설정
25 .with_policy(policy)
26 .build()
27 .unwrap();
28
29 // subscriber 추가
30 store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31 println!("subscriber: state: {}, action: {}", state, action);
32 })));
33
34 println!("=== Predicate 기반 Backpressure 테스트 ===");
35 println!("채널 capacity: 2");
36 println!("predicate: 5보다 작은 값들은 drop");
37 println!("reducer 지연: 200ms");
38 println!();
39
40 // 다양한 값들을 빠르게 dispatch
41 let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43 for action in actions {
44 println!("dispatch: {}", action);
45 match store.dispatch(action) {
46 Ok(_) => println!(" -> 성공"),
47 Err(e) => println!(" -> 실패: {:?}", e),
48 }
49
50 // 빠른 dispatch로 backpressure 상황 생성
51 std::thread::sleep(std::time::Duration::from_millis(50));
52 }
53
54 // 처리 완료 대기
55 std::thread::sleep(std::time::Duration::from_millis(3000));
56
57 // 최종 상태 확인
58 let final_state = store.get_state();
59 println!();
60 println!("최종 상태: {}", final_state);
61
62 // 메트릭 확인
63 let metrics = store.get_metrics();
64 println!("메트릭:");
65 println!(" - 받은 액션: {}", metrics.action_received);
66 println!(" - 처리된 액션: {}", metrics.action_reduced);
67 println!(" - drop된 액션: {}", metrics.action_dropped);
68
69 store.stop();
70}Sourcepub fn get_metrics(&self) -> MetricsSnapshot
pub fn get_metrics(&self) -> MetricsSnapshot
get the metrics
Examples found in repository?
4fn main() {
5 // predicate 기반 drop 정책 사용 예시
6 // 작은 값들을 우선적으로 drop하는 predicate
7 let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8 match action_op {
9 rs_store::ActionOp::Action(value) => *value < 5, // 5보다 작은 값들은 drop
10 rs_store::ActionOp::Exit(_) => false, // Exit는 drop하지 않음
11 }
12 });
13
14 let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16 // 매우 작은 capacity로 store 생성하여 backpressure 상황 시뮬레이션
17 let store = StoreBuilder::new(0)
18 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19 // reducer에서 지연을 추가하여 backpressure 상황 생성
20 std::thread::sleep(std::time::Duration::from_millis(200));
21 println!("reducer: {} + {}", state, action);
22 DispatchOp::Dispatch(state + action, None)
23 })))
24 .with_capacity(2) // 매우 작은 capacity로 설정
25 .with_policy(policy)
26 .build()
27 .unwrap();
28
29 // subscriber 추가
30 store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31 println!("subscriber: state: {}, action: {}", state, action);
32 })));
33
34 println!("=== Predicate 기반 Backpressure 테스트 ===");
35 println!("채널 capacity: 2");
36 println!("predicate: 5보다 작은 값들은 drop");
37 println!("reducer 지연: 200ms");
38 println!();
39
40 // 다양한 값들을 빠르게 dispatch
41 let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43 for action in actions {
44 println!("dispatch: {}", action);
45 match store.dispatch(action) {
46 Ok(_) => println!(" -> 성공"),
47 Err(e) => println!(" -> 실패: {:?}", e),
48 }
49
50 // 빠른 dispatch로 backpressure 상황 생성
51 std::thread::sleep(std::time::Duration::from_millis(50));
52 }
53
54 // 처리 완료 대기
55 std::thread::sleep(std::time::Duration::from_millis(3000));
56
57 // 최종 상태 확인
58 let final_state = store.get_state();
59 println!();
60 println!("최종 상태: {}", final_state);
61
62 // 메트릭 확인
63 let metrics = store.get_metrics();
64 println!("메트릭:");
65 println!(" - 받은 액션: {}", metrics.action_received);
66 println!(" - 처리된 액션: {}", metrics.action_reduced);
67 println!(" - drop된 액션: {}", metrics.action_dropped);
68
69 store.stop();
70}Sourcepub fn add_reducer(
&self,
reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
)
pub fn add_reducer( &self, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, )
add a reducer to the store
Sourcepub fn add_subscriber(
&self,
subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
) -> Box<dyn Subscription>
pub fn add_subscriber( &self, subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>, ) -> Box<dyn Subscription>
add a subscriber to the store
Examples found in repository?
124pub fn main() {
125 println!("Hello, Effect!");
126
127 let store =
128 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
129 .with_name("store-effect".into())
130 .build()
131 .unwrap();
132
133 store.add_subscriber(Arc::new(CalcSubscriber::default()));
134 let _ = store.dispatch(CalcAction::AddWillProduceThunk(1));
135
136 store.stop();
137}More examples
93pub fn main() {
94 println!("Hello, Basic!");
95
96 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97 CalcState::default(),
98 Box::new(CalcReducer::default()),
99 );
100
101 println!("add subscriber");
102 store.add_subscriber(Arc::new(CalcSubscriber::default()));
103 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106 // stop the store
107 store.stop();
108
109 assert_eq!(store.get_state().count, 0);
110}58pub fn main() {
59 println!("Hello, reduce function !");
60
61 let store = StoreBuilder::new_with_reducer(
62 CalcState::default(),
63 Box::new(FnReducer::from(calc_reducer)),
64 )
65 .with_name("store-reduce-fn".into())
66 .build()
67 .unwrap();
68
69 store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
70 let _ = store.dispatch(CalcAction::Add(1));
71
72 thread::sleep(std::time::Duration::from_secs(1));
73 store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
74 let _ = store.dispatch(CalcAction::Subtract(1));
75
76 store.stop();
77}4pub fn main() {
5 // new store with reducer
6 let store = StoreBuilder::new(0)
7 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
8 println!("reducer: {} + {}", state, action);
9 DispatchOp::Dispatch(state + action, None)
10 })))
11 .build()
12 .unwrap();
13
14 // add subscriber
15 store.add_subscriber(Arc::new(FnSubscriber::from(
16 |state: &i32, _action: &i32| {
17 println!("subscriber: state: {}", state);
18 },
19 )));
20
21 // dispatch actions
22 store.dispatch(41).expect("no error");
23 store.dispatch(1).expect("no error");
24
25 // stop the store
26 store.stop();
27
28 assert_eq!(store.get_state(), 42);
29}94pub fn main() {
95 println!("Hello, Builder!");
96
97 let store =
98 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99 .with_name("calc".to_string())
100 .build()
101 .unwrap();
102 println!("add subscriber");
103 store.add_subscriber(Arc::new(CalcSubscriber::default()));
104 let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106 thread::sleep(std::time::Duration::from_secs(1));
107 println!("add more subscriber");
108 store.add_subscriber(Arc::new(CalcSubscriber::default()));
109 let _ = store.dispatch(CalcAction::Subtract(1));
110
111 // stop the store
112 store.stop();
113
114 assert_eq!(store.get_state().count, 0);
115}107pub fn main() {
108 println!("Hello, Concurrent!");
109
110 let store =
111 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
112 .with_name("store-concurrent".into())
113 .build()
114 .unwrap();
115
116 println!("add subscriber");
117 store.add_subscriber(Arc::new(CalcSubscriber::default()));
118 let _ = store.dispatch(CalcAction::Add(1));
119
120 let store_clone = store.clone();
121 thread::spawn(move || {
122 thread::sleep(std::time::Duration::from_secs(1));
123
124 println!("add more subscriber");
125 store_clone.add_subscriber(Arc::new(CalcSubscriber::new(1)));
126 let _ = store_clone.dispatch(CalcAction::Subtract(1));
127 })
128 .join()
129 .unwrap();
130
131 store.stop();
132}Sourcepub fn close(&self)
pub fn close(&self)
close the store
send an exit action to the store and drop the dispatch channel
Sourcepub fn stop(&self)
pub fn stop(&self)
close the store and wait for the dispatcher to finish
Examples found in repository?
124pub fn main() {
125 println!("Hello, Effect!");
126
127 let store =
128 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
129 .with_name("store-effect".into())
130 .build()
131 .unwrap();
132
133 store.add_subscriber(Arc::new(CalcSubscriber::default()));
134 let _ = store.dispatch(CalcAction::AddWillProduceThunk(1));
135
136 store.stop();
137}More examples
93pub fn main() {
94 println!("Hello, Basic!");
95
96 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97 CalcState::default(),
98 Box::new(CalcReducer::default()),
99 );
100
101 println!("add subscriber");
102 store.add_subscriber(Arc::new(CalcSubscriber::default()));
103 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106 // stop the store
107 store.stop();
108
109 assert_eq!(store.get_state().count, 0);
110}58pub fn main() {
59 println!("Hello, reduce function !");
60
61 let store = StoreBuilder::new_with_reducer(
62 CalcState::default(),
63 Box::new(FnReducer::from(calc_reducer)),
64 )
65 .with_name("store-reduce-fn".into())
66 .build()
67 .unwrap();
68
69 store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
70 let _ = store.dispatch(CalcAction::Add(1));
71
72 thread::sleep(std::time::Duration::from_secs(1));
73 store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
74 let _ = store.dispatch(CalcAction::Subtract(1));
75
76 store.stop();
77}3fn main() {
4 // new store with reducer
5 let store = StoreImpl::new_with_reducer(
6 0,
7 Box::new(FnReducer::from(|state: &i32, action: &i32| {
8 println!("reducer: {} + {}", state, action);
9 DispatchOp::Dispatch(state + action, None)
10 })),
11 );
12
13 let mut iter = store.iter();
14
15 // dispatch actions
16 let _ = store.dispatch(41);
17 let _ = store.dispatch(1);
18
19 assert_eq!(iter.next(), Some((41, 41)));
20 assert_eq!(iter.next(), Some((42, 1)));
21
22 // stop the store
23 store.stop();
24
25 assert_eq!(iter.next(), None);
26 eprintln!("exit");
27}4pub fn main() {
5 // new store with reducer
6 let store = StoreBuilder::new(0)
7 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
8 println!("reducer: {} + {}", state, action);
9 DispatchOp::Dispatch(state + action, None)
10 })))
11 .build()
12 .unwrap();
13
14 // add subscriber
15 store.add_subscriber(Arc::new(FnSubscriber::from(
16 |state: &i32, _action: &i32| {
17 println!("subscriber: state: {}", state);
18 },
19 )));
20
21 // dispatch actions
22 store.dispatch(41).expect("no error");
23 store.dispatch(1).expect("no error");
24
25 // stop the store
26 store.stop();
27
28 assert_eq!(store.get_state(), 42);
29}94pub fn main() {
95 println!("Hello, Builder!");
96
97 let store =
98 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99 .with_name("calc".to_string())
100 .build()
101 .unwrap();
102 println!("add subscriber");
103 store.add_subscriber(Arc::new(CalcSubscriber::default()));
104 let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106 thread::sleep(std::time::Duration::from_secs(1));
107 println!("add more subscriber");
108 store.add_subscriber(Arc::new(CalcSubscriber::default()));
109 let _ = store.dispatch(CalcAction::Subtract(1));
110
111 // stop the store
112 store.stop();
113
114 assert_eq!(store.get_state().count, 0);
115}Sourcepub fn stop_with_timeout(&self, timeout: Duration)
pub fn stop_with_timeout(&self, timeout: Duration)
close the store and wait for the dispatcher to finish
Sourcepub fn dispatch(&self, action: Action) -> Result<(), StoreError>
pub fn dispatch(&self, action: Action) -> Result<(), StoreError>
dispatch an action
§Return
- `Ok(())`: if the action is dispatched
- `Err(StoreError)`: if the dispatch channel is closed
Examples found in repository?
58pub fn main() {
59 println!("Hello, reduce function !");
60
61 let store = StoreBuilder::new_with_reducer(
62 CalcState::default(),
63 Box::new(FnReducer::from(calc_reducer)),
64 )
65 .with_name("store-reduce-fn".into())
66 .build()
67 .unwrap();
68
69 store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
70 let _ = store.dispatch(CalcAction::Add(1));
71
72 thread::sleep(std::time::Duration::from_secs(1));
73 store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
74 let _ = store.dispatch(CalcAction::Subtract(1));
75
76 store.stop();
77}More examples
3fn main() {
4 // new store with reducer
5 let store = StoreImpl::new_with_reducer(
6 0,
7 Box::new(FnReducer::from(|state: &i32, action: &i32| {
8 println!("reducer: {} + {}", state, action);
9 DispatchOp::Dispatch(state + action, None)
10 })),
11 );
12
13 let mut iter = store.iter();
14
15 // dispatch actions
16 let _ = store.dispatch(41);
17 let _ = store.dispatch(1);
18
19 assert_eq!(iter.next(), Some((41, 41)));
20 assert_eq!(iter.next(), Some((42, 1)));
21
22 // stop the store
23 store.stop();
24
25 assert_eq!(iter.next(), None);
26 eprintln!("exit");
27}94pub fn main() {
95 println!("Hello, Builder!");
96
97 let store =
98 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99 .with_name("calc".to_string())
100 .build()
101 .unwrap();
102 println!("add subscriber");
103 store.add_subscriber(Arc::new(CalcSubscriber::default()));
104 let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106 thread::sleep(std::time::Duration::from_secs(1));
107 println!("add more subscriber");
108 store.add_subscriber(Arc::new(CalcSubscriber::default()));
109 let _ = store.dispatch(CalcAction::Subtract(1));
110
111 // stop the store
112 store.stop();
113
114 assert_eq!(store.get_state().count, 0);
115}107pub fn main() {
108 println!("Hello, Concurrent!");
109
110 let store =
111 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
112 .with_name("store-concurrent".into())
113 .build()
114 .unwrap();
115
116 println!("add subscriber");
117 store.add_subscriber(Arc::new(CalcSubscriber::default()));
118 let _ = store.dispatch(CalcAction::Add(1));
119
120 let store_clone = store.clone();
121 thread::spawn(move || {
122 thread::sleep(std::time::Duration::from_secs(1));
123
124 println!("add more subscriber");
125 store_clone.add_subscriber(Arc::new(CalcSubscriber::new(1)));
126 let _ = store_clone.dispatch(CalcAction::Subtract(1));
127 })
128 .join()
129 .unwrap();
130
131 store.stop();
132}107pub fn main() {
108 println!("Hello, Unsubscribe!");
109
110 let store =
111 StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
112 .with_name("store-unsubscribe".into())
113 .build()
114 .unwrap();
115
116 println!("add subscriber");
117 store.add_subscriber(Arc::new(CalcSubscriber::default()));
118 let _ = store.dispatch(CalcAction::Add(1));
119
120 let store_clone = store.clone();
121 let handle = thread::spawn(move || {
122 thread::sleep(std::time::Duration::from_secs(1));
123
124 // subscribe
125 println!("add more subscriber");
126 let subscription = store_clone.add_subscriber(Arc::new(CalcSubscriber::new(1)));
127 let _ = store_clone.dispatch(CalcAction::Subtract(1));
128 subscription
129 });
130
131 let subscription = handle.join().unwrap();
132
133 println!("Unsubscribing...");
134 subscription.unsubscribe();
135
136 println!("Send 42...");
137 let _ = store.dispatch(CalcAction::Add(42));
138
139 store.stop();
140
141 println!("Done!");
142}4fn main() {
5 // predicate 기반 drop 정책 사용 예시
6 // 작은 값들을 우선적으로 drop하는 predicate
7 let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8 match action_op {
9 rs_store::ActionOp::Action(value) => *value < 5, // 5보다 작은 값들은 drop
10 rs_store::ActionOp::Exit(_) => false, // Exit는 drop하지 않음
11 }
12 });
13
14 let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16 // 매우 작은 capacity로 store 생성하여 backpressure 상황 시뮬레이션
17 let store = StoreBuilder::new(0)
18 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19 // reducer에서 지연을 추가하여 backpressure 상황 생성
20 std::thread::sleep(std::time::Duration::from_millis(200));
21 println!("reducer: {} + {}", state, action);
22 DispatchOp::Dispatch(state + action, None)
23 })))
24 .with_capacity(2) // 매우 작은 capacity로 설정
25 .with_policy(policy)
26 .build()
27 .unwrap();
28
29 // subscriber 추가
30 store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31 println!("subscriber: state: {}, action: {}", state, action);
32 })));
33
34 println!("=== Predicate 기반 Backpressure 테스트 ===");
35 println!("채널 capacity: 2");
36 println!("predicate: 5보다 작은 값들은 drop");
37 println!("reducer 지연: 200ms");
38 println!();
39
40 // 다양한 값들을 빠르게 dispatch
41 let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43 for action in actions {
44 println!("dispatch: {}", action);
45 match store.dispatch(action) {
46 Ok(_) => println!(" -> 성공"),
47 Err(e) => println!(" -> 실패: {:?}", e),
48 }
49
50 // 빠른 dispatch로 backpressure 상황 생성
51 std::thread::sleep(std::time::Duration::from_millis(50));
52 }
53
54 // 처리 완료 대기
55 std::thread::sleep(std::time::Duration::from_millis(3000));
56
57 // 최종 상태 확인
58 let final_state = store.get_state();
59 println!();
60 println!("최종 상태: {}", final_state);
61
62 // 메트릭 확인
63 let metrics = store.get_metrics();
64 println!("메트릭:");
65 println!(" - 받은 액션: {}", metrics.action_received);
66 println!(" - 처리된 액션: {}", metrics.action_reduced);
67 println!(" - drop된 액션: {}", metrics.action_dropped);
68
69 store.stop();
70}Sourcepub fn add_middleware(
&self,
middleware: Arc<dyn Middleware<State, Action> + Send + Sync>,
)
pub fn add_middleware( &self, middleware: Arc<dyn Middleware<State, Action> + Send + Sync>, )
Add middleware
Sourcepub fn iter(&self) -> impl Iterator<Item = (State, Action)>
pub fn iter(&self) -> impl Iterator<Item = (State, Action)>
Iterator over the state changes, yielding `(State, Action)` pairs.
It uses a channel to subscribe to the state changes. The channel is a rendezvous channel (capacity 1), so the store will block on the channel until the subscriber consumes the state.
Examples found in repository?
3fn main() {
4 // new store with reducer
5 let store = StoreImpl::new_with_reducer(
6 0,
7 Box::new(FnReducer::from(|state: &i32, action: &i32| {
8 println!("reducer: {} + {}", state, action);
9 DispatchOp::Dispatch(state + action, None)
10 })),
11 );
12
13 let mut iter = store.iter();
14
15 // dispatch actions
16 let _ = store.dispatch(41);
17 let _ = store.dispatch(1);
18
19 assert_eq!(iter.next(), Some((41, 41)));
20 assert_eq!(iter.next(), Some((42, 1)));
21
22 // stop the store
23 store.stop();
24
25 assert_eq!(iter.next(), None);
26 eprintln!("exit");
27}Sourcepub fn subscribed(
&self,
subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
) -> Result<Box<dyn Subscription>, StoreError>
pub fn subscribed( &self, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>
Sourcepub fn subscribed_with(
&self,
capacity: usize,
policy: BackpressurePolicy<(Instant, State, Action)>,
subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
) -> Result<Box<dyn Subscription>, StoreError>
pub fn subscribed_with( &self, capacity: usize, policy: BackpressurePolicy<(Instant, State, Action)>, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>
Trait Implementations§
Source§impl<State, Action> Drop for StoreImpl<State, Action>
Closes the tx channel when the store is dropped, but does not stop the dispatcher.
If you want to stop the dispatcher, call the `stop` method.
impl<State, Action> Drop for StoreImpl<State, Action>
Closes the tx channel when the store is dropped, but does not stop the dispatcher. If you want to stop the dispatcher, call the `stop` method.