Struct StoreImpl

Source
pub struct StoreImpl<State, Action>
where State: Send + Sync + Clone + 'static, Action: Send + Sync + Clone + 'static,
{ /* private fields */ }
Expand description

StoreImpl is the default implementation of a Redux store

Implementations§

Source§

impl<State, Action> StoreImpl<State, Action>
where State: Send + Sync + Clone + 'static, Action: Send + Sync + Clone + 'static,

Source

pub fn new(state: State) -> Arc<StoreImpl<State, Action>>

create a new store with an initial state

Source

pub fn new_with_reducer( state: State, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, ) -> Arc<StoreImpl<State, Action>>

create a new store with a reducer and an initial state

Examples found in repository?
examples/calc_basic.rs (lines 96-99)
93pub fn main() {
94    println!("Hello, Basic!");
95
96    let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97        CalcState::default(),
98        Box::new(CalcReducer::default()),
99    );
100
101    println!("add subscriber");
102    store.add_subscriber(Arc::new(CalcSubscriber::default()));
103    store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104    store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106    // stop the store
107    store.stop();
108
109    assert_eq!(store.get_state().count, 0);
110}
More examples
Hide additional examples
examples/iter_state.rs (lines 5-11)
3fn main() {
4    // new store with reducer
5    let store = StoreImpl::new_with_reducer(
6        0,
7        Box::new(FnReducer::from(|state: &i32, action: &i32| {
8            println!("reducer: {} + {}", state, action);
9            DispatchOp::Dispatch(state + action, None)
10        })),
11    );
12
13    let mut iter = store.iter();
14
15    // dispatch actions
16    let _ = store.dispatch(41);
17    let _ = store.dispatch(1);
18
19    assert_eq!(iter.next(), Some((41, 41)));
20    assert_eq!(iter.next(), Some((42, 1)));
21
22    // stop the store
23    store.stop();
24
25    assert_eq!(iter.next(), None);
26    eprintln!("exit");
27}
Source

pub fn new_with_name( state: State, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, name: String, ) -> Result<Arc<StoreImpl<State, Action>>, StoreError>

create a new store with a reducer, an initial state, and a name

Source

pub fn new_with( state: State, reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>, name: String, capacity: usize, policy: BackpressurePolicy<Action>, middlewares: Vec<Arc<dyn Middleware<State, Action> + Send + Sync>>, ) -> Result<Arc<StoreImpl<State, Action>>, StoreError>

create a new store

Source

pub fn get_state(&self) -> State

get the latest state (for debugging)

prefer to use subscribe to get the state

Examples found in repository?
examples/calc_basic.rs (line 109)
93pub fn main() {
94    println!("Hello, Basic!");
95
96    let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97        CalcState::default(),
98        Box::new(CalcReducer::default()),
99    );
100
101    println!("add subscriber");
102    store.add_subscriber(Arc::new(CalcSubscriber::default()));
103    store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104    store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106    // stop the store
107    store.stop();
108
109    assert_eq!(store.get_state().count, 0);
110}
More examples
Hide additional examples
examples/simple.rs (line 28)
4pub fn main() {
5    // new store with reducer
6    let store = StoreBuilder::new(0)
7        .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
8            println!("reducer: {} + {}", state, action);
9            DispatchOp::Dispatch(state + action, None)
10        })))
11        .build()
12        .unwrap();
13
14    // add subscriber
15    store.add_subscriber(Arc::new(FnSubscriber::from(
16        |state: &i32, _action: &i32| {
17            println!("subscriber: state: {}", state);
18        },
19    )));
20
21    // dispatch actions
22    store.dispatch(41).expect("no error");
23    store.dispatch(1).expect("no error");
24
25    // stop the store
26    store.stop();
27
28    assert_eq!(store.get_state(), 42);
29}
examples/calc_basic_builder.rs (line 114)
94pub fn main() {
95    println!("Hello, Builder!");
96
97    let store =
98        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99            .with_name("calc".to_string())
100            .build()
101            .unwrap();
102    println!("add subscriber");
103    store.add_subscriber(Arc::new(CalcSubscriber::default()));
104    let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106    thread::sleep(std::time::Duration::from_secs(1));
107    println!("add more subscriber");
108    store.add_subscriber(Arc::new(CalcSubscriber::default()));
109    let _ = store.dispatch(CalcAction::Subtract(1));
110
111    // stop the store
112    store.stop();
113
114    assert_eq!(store.get_state().count, 0);
115}
examples/dropoldest_if.rs (line 58)
4fn main() {
5    // predicate 기반 drop 정책 사용 예시
6    // 작은 값들을 우선적으로 drop하는 predicate
7    let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8        match action_op {
9            rs_store::ActionOp::Action(value) => *value < 5, // 5보다 작은 값들은 drop
10            rs_store::ActionOp::Exit(_) => false,            // Exit는 drop하지 않음
11        }
12    });
13
14    let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16    // 매우 작은 capacity로 store 생성하여 backpressure 상황 시뮬레이션
17    let store = StoreBuilder::new(0)
18        .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19            // reducer에서 지연을 추가하여 backpressure 상황 생성
20            std::thread::sleep(std::time::Duration::from_millis(200));
21            println!("reducer: {} + {}", state, action);
22            DispatchOp::Dispatch(state + action, None)
23        })))
24        .with_capacity(2) // 매우 작은 capacity로 설정
25        .with_policy(policy)
26        .build()
27        .unwrap();
28
29    // subscriber 추가
30    store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31        println!("subscriber: state: {}, action: {}", state, action);
32    })));
33
34    println!("=== Predicate 기반 Backpressure 테스트 ===");
35    println!("채널 capacity: 2");
36    println!("predicate: 5보다 작은 값들은 drop");
37    println!("reducer 지연: 200ms");
38    println!();
39
40    // 다양한 값들을 빠르게 dispatch
41    let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43    for action in actions {
44        println!("dispatch: {}", action);
45        match store.dispatch(action) {
46            Ok(_) => println!("  -> 성공"),
47            Err(e) => println!("  -> 실패: {:?}", e),
48        }
49
50        // 빠른 dispatch로 backpressure 상황 생성
51        std::thread::sleep(std::time::Duration::from_millis(50));
52    }
53
54    // 처리 완료 대기
55    std::thread::sleep(std::time::Duration::from_millis(3000));
56
57    // 최종 상태 확인
58    let final_state = store.get_state();
59    println!();
60    println!("최종 상태: {}", final_state);
61
62    // 메트릭 확인
63    let metrics = store.get_metrics();
64    println!("메트릭:");
65    println!("  - 받은 액션: {}", metrics.action_received);
66    println!("  - 처리된 액션: {}", metrics.action_reduced);
67    println!("  - drop된 액션: {}", metrics.action_dropped);
68
69    store.stop();
70}
Source

pub fn get_metrics(&self) -> MetricsSnapshot

get the metrics

Examples found in repository?
examples/dropoldest_if.rs (line 63)
4fn main() {
5    // predicate 기반 drop 정책 사용 예시
6    // 작은 값들을 우선적으로 drop하는 predicate
7    let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8        match action_op {
9            rs_store::ActionOp::Action(value) => *value < 5, // 5보다 작은 값들은 drop
10            rs_store::ActionOp::Exit(_) => false,            // Exit는 drop하지 않음
11        }
12    });
13
14    let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16    // 매우 작은 capacity로 store 생성하여 backpressure 상황 시뮬레이션
17    let store = StoreBuilder::new(0)
18        .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19            // reducer에서 지연을 추가하여 backpressure 상황 생성
20            std::thread::sleep(std::time::Duration::from_millis(200));
21            println!("reducer: {} + {}", state, action);
22            DispatchOp::Dispatch(state + action, None)
23        })))
24        .with_capacity(2) // 매우 작은 capacity로 설정
25        .with_policy(policy)
26        .build()
27        .unwrap();
28
29    // subscriber 추가
30    store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31        println!("subscriber: state: {}, action: {}", state, action);
32    })));
33
34    println!("=== Predicate 기반 Backpressure 테스트 ===");
35    println!("채널 capacity: 2");
36    println!("predicate: 5보다 작은 값들은 drop");
37    println!("reducer 지연: 200ms");
38    println!();
39
40    // 다양한 값들을 빠르게 dispatch
41    let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43    for action in actions {
44        println!("dispatch: {}", action);
45        match store.dispatch(action) {
46            Ok(_) => println!("  -> 성공"),
47            Err(e) => println!("  -> 실패: {:?}", e),
48        }
49
50        // 빠른 dispatch로 backpressure 상황 생성
51        std::thread::sleep(std::time::Duration::from_millis(50));
52    }
53
54    // 처리 완료 대기
55    std::thread::sleep(std::time::Duration::from_millis(3000));
56
57    // 최종 상태 확인
58    let final_state = store.get_state();
59    println!();
60    println!("최종 상태: {}", final_state);
61
62    // 메트릭 확인
63    let metrics = store.get_metrics();
64    println!("메트릭:");
65    println!("  - 받은 액션: {}", metrics.action_received);
66    println!("  - 처리된 액션: {}", metrics.action_reduced);
67    println!("  - drop된 액션: {}", metrics.action_dropped);
68
69    store.stop();
70}
Source

pub fn add_reducer( &self, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, )

add a reducer to the store

Source

pub fn add_subscriber( &self, subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>, ) -> Box<dyn Subscription>

add a subscriber to the store

Examples found in repository?
examples/calc_effect.rs (line 133)
124pub fn main() {
125    println!("Hello, Effect!");
126
127    let store =
128        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
129            .with_name("store-effect".into())
130            .build()
131            .unwrap();
132
133    store.add_subscriber(Arc::new(CalcSubscriber::default()));
134    let _ = store.dispatch(CalcAction::AddWillProduceThunk(1));
135
136    store.stop();
137}
More examples
Hide additional examples
examples/calc_basic.rs (line 102)
93pub fn main() {
94    println!("Hello, Basic!");
95
96    let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97        CalcState::default(),
98        Box::new(CalcReducer::default()),
99    );
100
101    println!("add subscriber");
102    store.add_subscriber(Arc::new(CalcSubscriber::default()));
103    store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104    store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106    // stop the store
107    store.stop();
108
109    assert_eq!(store.get_state().count, 0);
110}
examples/calc_fn_basic.rs (line 69)
58pub fn main() {
59    println!("Hello, reduce function !");
60
61    let store = StoreBuilder::new_with_reducer(
62        CalcState::default(),
63        Box::new(FnReducer::from(calc_reducer)),
64    )
65    .with_name("store-reduce-fn".into())
66    .build()
67    .unwrap();
68
69    store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
70    let _ = store.dispatch(CalcAction::Add(1));
71
72    thread::sleep(std::time::Duration::from_secs(1));
73    store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
74    let _ = store.dispatch(CalcAction::Subtract(1));
75
76    store.stop();
77}
examples/simple.rs (lines 15-19)
4pub fn main() {
5    // new store with reducer
6    let store = StoreBuilder::new(0)
7        .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
8            println!("reducer: {} + {}", state, action);
9            DispatchOp::Dispatch(state + action, None)
10        })))
11        .build()
12        .unwrap();
13
14    // add subscriber
15    store.add_subscriber(Arc::new(FnSubscriber::from(
16        |state: &i32, _action: &i32| {
17            println!("subscriber: state: {}", state);
18        },
19    )));
20
21    // dispatch actions
22    store.dispatch(41).expect("no error");
23    store.dispatch(1).expect("no error");
24
25    // stop the store
26    store.stop();
27
28    assert_eq!(store.get_state(), 42);
29}
examples/calc_basic_builder.rs (line 103)
94pub fn main() {
95    println!("Hello, Builder!");
96
97    let store =
98        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99            .with_name("calc".to_string())
100            .build()
101            .unwrap();
102    println!("add subscriber");
103    store.add_subscriber(Arc::new(CalcSubscriber::default()));
104    let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106    thread::sleep(std::time::Duration::from_secs(1));
107    println!("add more subscriber");
108    store.add_subscriber(Arc::new(CalcSubscriber::default()));
109    let _ = store.dispatch(CalcAction::Subtract(1));
110
111    // stop the store
112    store.stop();
113
114    assert_eq!(store.get_state().count, 0);
115}
examples/calc_concurrent.rs (line 117)
107pub fn main() {
108    println!("Hello, Concurrent!");
109
110    let store =
111        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
112            .with_name("store-concurrent".into())
113            .build()
114            .unwrap();
115
116    println!("add subscriber");
117    store.add_subscriber(Arc::new(CalcSubscriber::default()));
118    let _ = store.dispatch(CalcAction::Add(1));
119
120    let store_clone = store.clone();
121    thread::spawn(move || {
122        thread::sleep(std::time::Duration::from_secs(1));
123
124        println!("add more subscriber");
125        store_clone.add_subscriber(Arc::new(CalcSubscriber::new(1)));
126        let _ = store_clone.dispatch(CalcAction::Subtract(1));
127    })
128    .join()
129    .unwrap();
130
131    store.stop();
132}
Source

pub fn close(&self)

close the store

send an exit action to the store and drop the dispatch channel

Source

pub fn stop(&self)

close the store and wait for the dispatcher to finish

Examples found in repository?
examples/calc_effect.rs (line 136)
124pub fn main() {
125    println!("Hello, Effect!");
126
127    let store =
128        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
129            .with_name("store-effect".into())
130            .build()
131            .unwrap();
132
133    store.add_subscriber(Arc::new(CalcSubscriber::default()));
134    let _ = store.dispatch(CalcAction::AddWillProduceThunk(1));
135
136    store.stop();
137}
More examples
Hide additional examples
examples/calc_basic.rs (line 107)
93pub fn main() {
94    println!("Hello, Basic!");
95
96    let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
97        CalcState::default(),
98        Box::new(CalcReducer::default()),
99    );
100
101    println!("add subscriber");
102    store.add_subscriber(Arc::new(CalcSubscriber::default()));
103    store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
104    store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
105
106    // stop the store
107    store.stop();
108
109    assert_eq!(store.get_state().count, 0);
110}
examples/calc_fn_basic.rs (line 76)
58pub fn main() {
59    println!("Hello, reduce function !");
60
61    let store = StoreBuilder::new_with_reducer(
62        CalcState::default(),
63        Box::new(FnReducer::from(calc_reducer)),
64    )
65    .with_name("store-reduce-fn".into())
66    .build()
67    .unwrap();
68
69    store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
70    let _ = store.dispatch(CalcAction::Add(1));
71
72    thread::sleep(std::time::Duration::from_secs(1));
73    store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
74    let _ = store.dispatch(CalcAction::Subtract(1));
75
76    store.stop();
77}
examples/iter_state.rs (line 23)
3fn main() {
4    // new store with reducer
5    let store = StoreImpl::new_with_reducer(
6        0,
7        Box::new(FnReducer::from(|state: &i32, action: &i32| {
8            println!("reducer: {} + {}", state, action);
9            DispatchOp::Dispatch(state + action, None)
10        })),
11    );
12
13    let mut iter = store.iter();
14
15    // dispatch actions
16    let _ = store.dispatch(41);
17    let _ = store.dispatch(1);
18
19    assert_eq!(iter.next(), Some((41, 41)));
20    assert_eq!(iter.next(), Some((42, 1)));
21
22    // stop the store
23    store.stop();
24
25    assert_eq!(iter.next(), None);
26    eprintln!("exit");
27}
examples/simple.rs (line 26)
4pub fn main() {
5    // new store with reducer
6    let store = StoreBuilder::new(0)
7        .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
8            println!("reducer: {} + {}", state, action);
9            DispatchOp::Dispatch(state + action, None)
10        })))
11        .build()
12        .unwrap();
13
14    // add subscriber
15    store.add_subscriber(Arc::new(FnSubscriber::from(
16        |state: &i32, _action: &i32| {
17            println!("subscriber: state: {}", state);
18        },
19    )));
20
21    // dispatch actions
22    store.dispatch(41).expect("no error");
23    store.dispatch(1).expect("no error");
24
25    // stop the store
26    store.stop();
27
28    assert_eq!(store.get_state(), 42);
29}
examples/calc_basic_builder.rs (line 112)
94pub fn main() {
95    println!("Hello, Builder!");
96
97    let store =
98        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99            .with_name("calc".to_string())
100            .build()
101            .unwrap();
102    println!("add subscriber");
103    store.add_subscriber(Arc::new(CalcSubscriber::default()));
104    let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106    thread::sleep(std::time::Duration::from_secs(1));
107    println!("add more subscriber");
108    store.add_subscriber(Arc::new(CalcSubscriber::default()));
109    let _ = store.dispatch(CalcAction::Subtract(1));
110
111    // stop the store
112    store.stop();
113
114    assert_eq!(store.get_state().count, 0);
115}
Source

pub fn stop_with_timeout(&self, timeout: Duration)

close the store and wait for the dispatcher to finish, bounded by the given timeout

Source

pub fn dispatch(&self, action: Action) -> Result<(), StoreError>

dispatch an action

§Return
  • Ok(()) : if the action is dispatched
  • Err(StoreError) : if the dispatch channel is closed
Examples found in repository?
examples/calc_fn_basic.rs (line 70)
58pub fn main() {
59    println!("Hello, reduce function !");
60
61    let store = StoreBuilder::new_with_reducer(
62        CalcState::default(),
63        Box::new(FnReducer::from(calc_reducer)),
64    )
65    .with_name("store-reduce-fn".into())
66    .build()
67    .unwrap();
68
69    store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
70    let _ = store.dispatch(CalcAction::Add(1));
71
72    thread::sleep(std::time::Duration::from_secs(1));
73    store.add_subscriber(Arc::new(FnSubscriber::from(calc_subscriber)));
74    let _ = store.dispatch(CalcAction::Subtract(1));
75
76    store.stop();
77}
More examples
Hide additional examples
examples/iter_state.rs (line 16)
3fn main() {
4    // new store with reducer
5    let store = StoreImpl::new_with_reducer(
6        0,
7        Box::new(FnReducer::from(|state: &i32, action: &i32| {
8            println!("reducer: {} + {}", state, action);
9            DispatchOp::Dispatch(state + action, None)
10        })),
11    );
12
13    let mut iter = store.iter();
14
15    // dispatch actions
16    let _ = store.dispatch(41);
17    let _ = store.dispatch(1);
18
19    assert_eq!(iter.next(), Some((41, 41)));
20    assert_eq!(iter.next(), Some((42, 1)));
21
22    // stop the store
23    store.stop();
24
25    assert_eq!(iter.next(), None);
26    eprintln!("exit");
27}
examples/calc_basic_builder.rs (line 104)
94pub fn main() {
95    println!("Hello, Builder!");
96
97    let store =
98        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
99            .with_name("calc".to_string())
100            .build()
101            .unwrap();
102    println!("add subscriber");
103    store.add_subscriber(Arc::new(CalcSubscriber::default()));
104    let _ = store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
105
106    thread::sleep(std::time::Duration::from_secs(1));
107    println!("add more subscriber");
108    store.add_subscriber(Arc::new(CalcSubscriber::default()));
109    let _ = store.dispatch(CalcAction::Subtract(1));
110
111    // stop the store
112    store.stop();
113
114    assert_eq!(store.get_state().count, 0);
115}
examples/calc_concurrent.rs (line 118)
107pub fn main() {
108    println!("Hello, Concurrent!");
109
110    let store =
111        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
112            .with_name("store-concurrent".into())
113            .build()
114            .unwrap();
115
116    println!("add subscriber");
117    store.add_subscriber(Arc::new(CalcSubscriber::default()));
118    let _ = store.dispatch(CalcAction::Add(1));
119
120    let store_clone = store.clone();
121    thread::spawn(move || {
122        thread::sleep(std::time::Duration::from_secs(1));
123
124        println!("add more subscriber");
125        store_clone.add_subscriber(Arc::new(CalcSubscriber::new(1)));
126        let _ = store_clone.dispatch(CalcAction::Subtract(1));
127    })
128    .join()
129    .unwrap();
130
131    store.stop();
132}
examples/calc_unsubscribe.rs (line 118)
107pub fn main() {
108    println!("Hello, Unsubscribe!");
109
110    let store =
111        StoreBuilder::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
112            .with_name("store-unsubscribe".into())
113            .build()
114            .unwrap();
115
116    println!("add subscriber");
117    store.add_subscriber(Arc::new(CalcSubscriber::default()));
118    let _ = store.dispatch(CalcAction::Add(1));
119
120    let store_clone = store.clone();
121    let handle = thread::spawn(move || {
122        thread::sleep(std::time::Duration::from_secs(1));
123
124        // subscribe
125        println!("add more subscriber");
126        let subscription = store_clone.add_subscriber(Arc::new(CalcSubscriber::new(1)));
127        let _ = store_clone.dispatch(CalcAction::Subtract(1));
128        subscription
129    });
130
131    let subscription = handle.join().unwrap();
132
133    println!("Unsubscribing...");
134    subscription.unsubscribe();
135
136    println!("Send 42...");
137    let _ = store.dispatch(CalcAction::Add(42));
138
139    store.stop();
140
141    println!("Done!");
142}
examples/dropoldest_if.rs (line 45)
4fn main() {
5    // predicate 기반 drop 정책 사용 예시
6    // 작은 값들을 우선적으로 drop하는 predicate
7    let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8        match action_op {
9            rs_store::ActionOp::Action(value) => *value < 5, // 5보다 작은 값들은 drop
10            rs_store::ActionOp::Exit(_) => false,            // Exit는 drop하지 않음
11        }
12    });
13
14    let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16    // 매우 작은 capacity로 store 생성하여 backpressure 상황 시뮬레이션
17    let store = StoreBuilder::new(0)
18        .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19            // reducer에서 지연을 추가하여 backpressure 상황 생성
20            std::thread::sleep(std::time::Duration::from_millis(200));
21            println!("reducer: {} + {}", state, action);
22            DispatchOp::Dispatch(state + action, None)
23        })))
24        .with_capacity(2) // 매우 작은 capacity로 설정
25        .with_policy(policy)
26        .build()
27        .unwrap();
28
29    // subscriber 추가
30    store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31        println!("subscriber: state: {}, action: {}", state, action);
32    })));
33
34    println!("=== Predicate 기반 Backpressure 테스트 ===");
35    println!("채널 capacity: 2");
36    println!("predicate: 5보다 작은 값들은 drop");
37    println!("reducer 지연: 200ms");
38    println!();
39
40    // 다양한 값들을 빠르게 dispatch
41    let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43    for action in actions {
44        println!("dispatch: {}", action);
45        match store.dispatch(action) {
46            Ok(_) => println!("  -> 성공"),
47            Err(e) => println!("  -> 실패: {:?}", e),
48        }
49
50        // 빠른 dispatch로 backpressure 상황 생성
51        std::thread::sleep(std::time::Duration::from_millis(50));
52    }
53
54    // 처리 완료 대기
55    std::thread::sleep(std::time::Duration::from_millis(3000));
56
57    // 최종 상태 확인
58    let final_state = store.get_state();
59    println!();
60    println!("최종 상태: {}", final_state);
61
62    // 메트릭 확인
63    let metrics = store.get_metrics();
64    println!("메트릭:");
65    println!("  - 받은 액션: {}", metrics.action_received);
66    println!("  - 처리된 액션: {}", metrics.action_reduced);
67    println!("  - drop된 액션: {}", metrics.action_dropped);
68
69    store.stop();
70}
Source

pub fn add_middleware( &self, middleware: Arc<dyn Middleware<State, Action> + Send + Sync>, )

Add middleware

Source

pub fn iter(&self) -> impl Iterator<Item = (State, Action)>

Iterator for the state

It uses a channel to subscribe to the state changes. The channel is rendezvous (capacity 1): the store will block on the channel until the subscriber consumes the state.

Examples found in repository?
examples/iter_state.rs (line 13)
3fn main() {
4    // new store with reducer
5    let store = StoreImpl::new_with_reducer(
6        0,
7        Box::new(FnReducer::from(|state: &i32, action: &i32| {
8            println!("reducer: {} + {}", state, action);
9            DispatchOp::Dispatch(state + action, None)
10        })),
11    );
12
13    let mut iter = store.iter();
14
15    // dispatch actions
16    let _ = store.dispatch(41);
17    let _ = store.dispatch(1);
18
19    assert_eq!(iter.next(), Some((41, 41)));
20    assert_eq!(iter.next(), Some((42, 1)));
21
22    // stop the store
23    store.stop();
24
25    assert_eq!(iter.next(), None);
26    eprintln!("exit");
27}
Source

pub fn subscribed( &self, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>

Subscribe to store updates in a new context, using the default capacity and the BlockOnFull policy when the channel is full.

§Parameters
  • subscriber: The subscriber to subscribe to the store
§Return
  • Subscription: Subscription for the store
Source

pub fn subscribed_with( &self, capacity: usize, policy: BackpressurePolicy<(Instant, State, Action)>, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>

subscribing to store updates in new context

§Parameters
  • capacity: Channel buffer capacity
  • policy: Backpressure policy applied when the channel is full. Only BlockOnFull and DropLatestIf are supported, to prevent the ActionOp::Exit from being dropped
§Return
  • Subscription: Subscription for the store

Trait Implementations§

Source§

impl<State, Action> Drop for StoreImpl<State, Action>
where State: Send + Sync + Clone + 'static, Action: Send + Sync + Clone + 'static,

Closes the tx channel when the store is dropped, but does not stop the dispatcher. If you want to stop the dispatcher, call the stop method.

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl<State, Action> Store<State, Action> for StoreImpl<State, Action>
where State: Send + Sync + Clone + 'static, Action: Send + Sync + Clone + 'static,

Source§

fn get_state(&self) -> State

Get the current state
Source§

fn dispatch(&self, action: Action) -> Result<(), StoreError>

Dispatch an action
Source§

fn add_subscriber( &self, subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>, ) -> Box<dyn Subscription>

Add a subscriber to the store. Store updates are delivered to the subscriber in the same reducer thread.
Source§

fn subscribed( &self, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>

Subscribe to the store in a new context. The store’s updates (state and action pairs) are delivered to the subscriber in the new context.
Source§

fn subscribed_with( &self, capacity: usize, policy: BackpressurePolicy<(Instant, State, Action)>, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>

Subscribe to the store in a new context. Store updates are delivered to the subscriber in the new context. Read more
Source§

fn stop(&self)

Stop the store

Auto Trait Implementations§

§

impl<State, Action> !Freeze for StoreImpl<State, Action>

§

impl<State, Action> RefUnwindSafe for StoreImpl<State, Action>

§

impl<State, Action> Send for StoreImpl<State, Action>

§

impl<State, Action> Sync for StoreImpl<State, Action>

§

impl<State, Action> Unpin for StoreImpl<State, Action>
where State: Unpin,

§

impl<State, Action> UnwindSafe for StoreImpl<State, Action>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.