pub struct StoreImpl<State, Action>{ /* private fields */ }Expand description
Implementations§
Source§impl<State, Action> StoreImpl<State, Action>
impl<State, Action> StoreImpl<State, Action>
Sourcepub fn new_with_reducer(
state: State,
reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
pub fn new_with_reducer( state: State, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, ) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
create a new store with a reducer and an initial state
Examples found in repository?
94pub fn main() {
95 println!("Hello, Basic!");
96
97 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
98 CalcState::default(),
99 Box::new(CalcReducer::default()),
100 )
101 .unwrap();
102
103 println!("add subscriber");
104 store.add_subscriber(Arc::new(CalcSubscriber::default())).unwrap();
105 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
106 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
107
108 // stop the store
109 match store.stop() {
110 Ok(_) => println!("store stopped"),
111 Err(e) => {
112 panic!("store stop failed : {:?}", e);
113 }
114 }
115
116 assert_eq!(store.get_state().count, 0);
117}More examples
96fn main() {
97 println!("=== Query State Example ===");
98
99 // Create a store with initial state
100 let store_impl =
101 StoreImpl::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
102 .unwrap();
103
104 // Dispatch some actions
105 println!("Dispatching actions...");
106 store_impl.dispatch(CalcAction::Add(10)).unwrap();
107 store_impl.dispatch(CalcAction::Multiply(2)).unwrap();
108 store_impl.dispatch(CalcAction::Subtract(5)).unwrap();
109
110 // Wait for actions to be processed
111 std::thread::sleep(std::time::Duration::from_millis(100));
112
113 // Query the current state using query_state
114 println!("\n=== Querying Current State ===");
115
116 // Query 1: Get current count
117 let current_count = Arc::new(Mutex::new(0));
118 let count_clone = current_count.clone();
119 store_impl
120 .query_state(move |state| {
121 *count_clone.lock().unwrap() = state.count;
122 })
123 .unwrap();
124
125 // Wait for actions to be processed
126 std::thread::sleep(std::time::Duration::from_millis(100));
127 println!("Current count: {}", *current_count.lock().unwrap());
128
129 // Query 2: Get history length
130 let history_length = Arc::new(Mutex::new(0));
131 let history_clone = history_length.clone();
132 store_impl
133 .query_state(move |state| {
134 *history_clone.lock().unwrap() = state.history.len();
135 })
136 .unwrap();
137
138 // Wait for actions to be processed
139 std::thread::sleep(std::time::Duration::from_millis(100));
140 println!("History length: {}", *history_length.lock().unwrap());
141
142 // Query 3: Print all history
143 println!("\n=== History ===");
144 store_impl
145 .query_state(|state| {
146 for (i, entry) in state.history.iter().enumerate() {
147 println!(" {}: {}", i + 1, entry);
148 }
149 })
150 .unwrap();
151
152 // Query 4: Check if count is even
153 let is_even = Arc::new(Mutex::new(false));
154 let even_clone = is_even.clone();
155 store_impl
156 .query_state(move |state| {
157 *even_clone.lock().unwrap() = state.count % 2 == 0;
158 })
159 .unwrap();
160
161 // Wait for actions to be processed
162 std::thread::sleep(std::time::Duration::from_millis(100));
163 println!("\nIs count even? {}", *is_even.lock().unwrap());
164
165 // Query 5: Get the last operation
166 let last_operation = Arc::new(Mutex::new(String::new()));
167 let last_op_clone = last_operation.clone();
168 store_impl
169 .query_state(move |state| {
170 if let Some(last) = state.history.last() {
171 *last_op_clone.lock().unwrap() = last.clone();
172 }
173 })
174 .unwrap();
175
176 // Wait for actions to be processed
177 std::thread::sleep(std::time::Duration::from_millis(100));
178 println!("Last operation: {}", *last_operation.lock().unwrap());
179
180 // Dispatch more actions and query again
181 println!("\n=== After More Actions ===");
182 store_impl.dispatch(CalcAction::Add(100)).unwrap();
183 store_impl.dispatch(CalcAction::Reset).unwrap();
184
185 // Wait for actions to be processed
186 std::thread::sleep(std::time::Duration::from_millis(100));
187
188 // Query final state
189 let final_count = Arc::new(Mutex::new(0));
190 let final_clone = final_count.clone();
191 store_impl
192 .query_state(move |state| {
193 *final_clone.lock().unwrap() = state.count;
194 })
195 .unwrap();
196
197 // Wait for actions to be processed
198 std::thread::sleep(std::time::Duration::from_millis(100));
199 println!("Final count: {}", *final_count.lock().unwrap());
200
201 // Query final history
202 println!("\n=== Final History ===");
203 store_impl
204 .query_state(|state| {
205 for (i, entry) in state.history.iter().enumerate() {
206 println!(" {}: {}", i + 1, entry);
207 }
208 })
209 .unwrap();
210
211 // Stop the store
212 store_impl.stop().unwrap();
213 println!("\nStore stopped.");
214}Sourcepub fn new_with_name(
state: State,
reducer: Box<dyn Reducer<State, Action> + Send + Sync>,
name: String,
) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
pub fn new_with_name( state: State, reducer: Box<dyn Reducer<State, Action> + Send + Sync>, name: String, ) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
create a new store with name
Sourcepub fn new_with(
state: State,
reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>,
name: String,
capacity: usize,
policy: BackpressurePolicy<Action>,
middlewares: Vec<Arc<dyn MiddlewareFnFactory<State, Action> + Send + Sync>>,
) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
pub fn new_with( state: State, reducers: Vec<Box<dyn Reducer<State, Action> + Send + Sync>>, name: String, capacity: usize, policy: BackpressurePolicy<Action>, middlewares: Vec<Arc<dyn MiddlewareFnFactory<State, Action> + Send + Sync>>, ) -> Result<Arc<StoreImpl<State, Action>>, StoreError>
create a new store
Examples found in repository?
123pub fn main() {
124 println!("Hello, Thunk!");
125
126 // create a thunk somewhere else
127 let lock_done = Arc::new(Mutex::new(false));
128 let cond_done: Arc<Condvar> = Arc::new(Condvar::new());
129 let subtract_thunk = get_subtract_thunk(cond_done.clone(), 1);
130
131 let store = StoreImpl::new_with(
132 CalcState::default(),
133 vec![Box::new(CalcReducer::default())],
134 "store-thunk".into(),
135 rs_store::DEFAULT_CAPACITY,
136 rs_store::BackpressurePolicy::default(),
137 vec![],
138 )
139 .unwrap();
140
141 store.add_subscriber(Arc::new(CalcSubscriber::default())).unwrap();
142 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
143
144 // send thunk to store
145 store.dispatch_thunk(subtract_thunk);
146
147 // wait for thunk to finish
148 drop(cond_done.wait(lock_done.lock().unwrap()).unwrap());
149
150 match store.stop() {
151 Ok(_) => println!("store stopped"),
152 Err(e) => {
153 panic!("store stop failed : {:?}", e);
154 }
155 }
156}Sourcepub fn get_state(&self) -> State
pub fn get_state(&self) -> State
get the latest state(for debugging)
prefer to use subscribe to get the state
Examples found in repository?
94pub fn main() {
95 println!("Hello, Basic!");
96
97 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
98 CalcState::default(),
99 Box::new(CalcReducer::default()),
100 )
101 .unwrap();
102
103 println!("add subscriber");
104 store.add_subscriber(Arc::new(CalcSubscriber::default())).unwrap();
105 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
106 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
107
108 // stop the store
109 match store.stop() {
110 Ok(_) => println!("store stopped"),
111 Err(e) => {
112 panic!("store stop failed : {:?}", e);
113 }
114 }
115
116 assert_eq!(store.get_state().count, 0);
117}Sourcepub fn get_metrics(&self) -> MetricsSnapshot
pub fn get_metrics(&self) -> MetricsSnapshot
get the metrics
Sourcepub fn add_subscriber(
&self,
subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>,
) -> Result<Box<dyn Subscription>, StoreError>
pub fn add_subscriber( &self, subscriber: Arc<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>
add a subscriber to the store
Examples found in repository?
94pub fn main() {
95 println!("Hello, Basic!");
96
97 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
98 CalcState::default(),
99 Box::new(CalcReducer::default()),
100 )
101 .unwrap();
102
103 println!("add subscriber");
104 store.add_subscriber(Arc::new(CalcSubscriber::default())).unwrap();
105 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
106 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
107
108 // stop the store
109 match store.stop() {
110 Ok(_) => println!("store stopped"),
111 Err(e) => {
112 panic!("store stop failed : {:?}", e);
113 }
114 }
115
116 assert_eq!(store.get_state().count, 0);
117}More examples
123pub fn main() {
124 println!("Hello, Thunk!");
125
126 // create a thunk somewhere else
127 let lock_done = Arc::new(Mutex::new(false));
128 let cond_done: Arc<Condvar> = Arc::new(Condvar::new());
129 let subtract_thunk = get_subtract_thunk(cond_done.clone(), 1);
130
131 let store = StoreImpl::new_with(
132 CalcState::default(),
133 vec![Box::new(CalcReducer::default())],
134 "store-thunk".into(),
135 rs_store::DEFAULT_CAPACITY,
136 rs_store::BackpressurePolicy::default(),
137 vec![],
138 )
139 .unwrap();
140
141 store.add_subscriber(Arc::new(CalcSubscriber::default())).unwrap();
142 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
143
144 // send thunk to store
145 store.dispatch_thunk(subtract_thunk);
146
147 // wait for thunk to finish
148 drop(cond_done.wait(lock_done.lock().unwrap()).unwrap());
149
150 match store.stop() {
151 Ok(_) => println!("store stopped"),
152 Err(e) => {
153 panic!("store stop failed : {:?}", e);
154 }
155 }
156}Sourcepub fn close(&self) -> Result<(), StoreError>
pub fn close(&self) -> Result<(), StoreError>
close the store
send an exit action to the store and drop the dispatch channel
§Return
- Ok(()) : if the store is closed
- Err(StoreError) : if the store is not closed, this can be happened when the queue is full
Sourcepub fn stop(&self) -> Result<(), StoreError>
pub fn stop(&self) -> Result<(), StoreError>
close the store and wait for the dispatcher to finish
§Return
- Ok(()) : if the store is closed
- Err(StoreError) : if the store is not closed, this can be happened when the queue is full
Examples found in repository?
94pub fn main() {
95 println!("Hello, Basic!");
96
97 let store = StoreImpl::<CalcState, CalcAction>::new_with_reducer(
98 CalcState::default(),
99 Box::new(CalcReducer::default()),
100 )
101 .unwrap();
102
103 println!("add subscriber");
104 store.add_subscriber(Arc::new(CalcSubscriber::default())).unwrap();
105 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
106 store.dispatch(CalcAction::Subtract(1)).expect("no dispatch failed");
107
108 // stop the store
109 match store.stop() {
110 Ok(_) => println!("store stopped"),
111 Err(e) => {
112 panic!("store stop failed : {:?}", e);
113 }
114 }
115
116 assert_eq!(store.get_state().count, 0);
117}More examples
123pub fn main() {
124 println!("Hello, Thunk!");
125
126 // create a thunk somewhere else
127 let lock_done = Arc::new(Mutex::new(false));
128 let cond_done: Arc<Condvar> = Arc::new(Condvar::new());
129 let subtract_thunk = get_subtract_thunk(cond_done.clone(), 1);
130
131 let store = StoreImpl::new_with(
132 CalcState::default(),
133 vec![Box::new(CalcReducer::default())],
134 "store-thunk".into(),
135 rs_store::DEFAULT_CAPACITY,
136 rs_store::BackpressurePolicy::default(),
137 vec![],
138 )
139 .unwrap();
140
141 store.add_subscriber(Arc::new(CalcSubscriber::default())).unwrap();
142 store.dispatch(CalcAction::Add(1)).expect("no dispatch failed");
143
144 // send thunk to store
145 store.dispatch_thunk(subtract_thunk);
146
147 // wait for thunk to finish
148 drop(cond_done.wait(lock_done.lock().unwrap()).unwrap());
149
150 match store.stop() {
151 Ok(_) => println!("store stopped"),
152 Err(e) => {
153 panic!("store stop failed : {:?}", e);
154 }
155 }
156}96fn main() {
97 println!("=== Query State Example ===");
98
99 // Create a store with initial state
100 let store_impl =
101 StoreImpl::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
102 .unwrap();
103
104 // Dispatch some actions
105 println!("Dispatching actions...");
106 store_impl.dispatch(CalcAction::Add(10)).unwrap();
107 store_impl.dispatch(CalcAction::Multiply(2)).unwrap();
108 store_impl.dispatch(CalcAction::Subtract(5)).unwrap();
109
110 // Wait for actions to be processed
111 std::thread::sleep(std::time::Duration::from_millis(100));
112
113 // Query the current state using query_state
114 println!("\n=== Querying Current State ===");
115
116 // Query 1: Get current count
117 let current_count = Arc::new(Mutex::new(0));
118 let count_clone = current_count.clone();
119 store_impl
120 .query_state(move |state| {
121 *count_clone.lock().unwrap() = state.count;
122 })
123 .unwrap();
124
125 // Wait for actions to be processed
126 std::thread::sleep(std::time::Duration::from_millis(100));
127 println!("Current count: {}", *current_count.lock().unwrap());
128
129 // Query 2: Get history length
130 let history_length = Arc::new(Mutex::new(0));
131 let history_clone = history_length.clone();
132 store_impl
133 .query_state(move |state| {
134 *history_clone.lock().unwrap() = state.history.len();
135 })
136 .unwrap();
137
138 // Wait for actions to be processed
139 std::thread::sleep(std::time::Duration::from_millis(100));
140 println!("History length: {}", *history_length.lock().unwrap());
141
142 // Query 3: Print all history
143 println!("\n=== History ===");
144 store_impl
145 .query_state(|state| {
146 for (i, entry) in state.history.iter().enumerate() {
147 println!(" {}: {}", i + 1, entry);
148 }
149 })
150 .unwrap();
151
152 // Query 4: Check if count is even
153 let is_even = Arc::new(Mutex::new(false));
154 let even_clone = is_even.clone();
155 store_impl
156 .query_state(move |state| {
157 *even_clone.lock().unwrap() = state.count % 2 == 0;
158 })
159 .unwrap();
160
161 // Wait for actions to be processed
162 std::thread::sleep(std::time::Duration::from_millis(100));
163 println!("\nIs count even? {}", *is_even.lock().unwrap());
164
165 // Query 5: Get the last operation
166 let last_operation = Arc::new(Mutex::new(String::new()));
167 let last_op_clone = last_operation.clone();
168 store_impl
169 .query_state(move |state| {
170 if let Some(last) = state.history.last() {
171 *last_op_clone.lock().unwrap() = last.clone();
172 }
173 })
174 .unwrap();
175
176 // Wait for actions to be processed
177 std::thread::sleep(std::time::Duration::from_millis(100));
178 println!("Last operation: {}", *last_operation.lock().unwrap());
179
180 // Dispatch more actions and query again
181 println!("\n=== After More Actions ===");
182 store_impl.dispatch(CalcAction::Add(100)).unwrap();
183 store_impl.dispatch(CalcAction::Reset).unwrap();
184
185 // Wait for actions to be processed
186 std::thread::sleep(std::time::Duration::from_millis(100));
187
188 // Query final state
189 let final_count = Arc::new(Mutex::new(0));
190 let final_clone = final_count.clone();
191 store_impl
192 .query_state(move |state| {
193 *final_clone.lock().unwrap() = state.count;
194 })
195 .unwrap();
196
197 // Wait for actions to be processed
198 std::thread::sleep(std::time::Duration::from_millis(100));
199 println!("Final count: {}", *final_count.lock().unwrap());
200
201 // Query final history
202 println!("\n=== Final History ===");
203 store_impl
204 .query_state(|state| {
205 for (i, entry) in state.history.iter().enumerate() {
206 println!(" {}: {}", i + 1, entry);
207 }
208 })
209 .unwrap();
210
211 // Stop the store
212 store_impl.stop().unwrap();
213 println!("\nStore stopped.");
214}Sourcepub fn stop_with_timeout(&self, timeout: Duration) -> Result<(), StoreError>
pub fn stop_with_timeout(&self, timeout: Duration) -> Result<(), StoreError>
close the store and wait for the dispatcher to finish
Sourcepub fn query_state<F>(&self, query_fn: F) -> Result<(), StoreError>
pub fn query_state<F>(&self, query_fn: F) -> Result<(), StoreError>
Query the current state with a function.
The function will be executed in the store thread with the current state moved into it. This is useful for read‑only inspections or aggregations that should observe a consistent snapshot.
§Parameters
query_fn: A function that receives the current state by value (State)
§Returns
Ok(()): if the query is scheduled successfullyErr(StoreError): if the store is not available
Examples found in repository?
96fn main() {
97 println!("=== Query State Example ===");
98
99 // Create a store with initial state
100 let store_impl =
101 StoreImpl::new_with_reducer(CalcState::default(), Box::new(CalcReducer::default()))
102 .unwrap();
103
104 // Dispatch some actions
105 println!("Dispatching actions...");
106 store_impl.dispatch(CalcAction::Add(10)).unwrap();
107 store_impl.dispatch(CalcAction::Multiply(2)).unwrap();
108 store_impl.dispatch(CalcAction::Subtract(5)).unwrap();
109
110 // Wait for actions to be processed
111 std::thread::sleep(std::time::Duration::from_millis(100));
112
113 // Query the current state using query_state
114 println!("\n=== Querying Current State ===");
115
116 // Query 1: Get current count
117 let current_count = Arc::new(Mutex::new(0));
118 let count_clone = current_count.clone();
119 store_impl
120 .query_state(move |state| {
121 *count_clone.lock().unwrap() = state.count;
122 })
123 .unwrap();
124
125 // Wait for actions to be processed
126 std::thread::sleep(std::time::Duration::from_millis(100));
127 println!("Current count: {}", *current_count.lock().unwrap());
128
129 // Query 2: Get history length
130 let history_length = Arc::new(Mutex::new(0));
131 let history_clone = history_length.clone();
132 store_impl
133 .query_state(move |state| {
134 *history_clone.lock().unwrap() = state.history.len();
135 })
136 .unwrap();
137
138 // Wait for actions to be processed
139 std::thread::sleep(std::time::Duration::from_millis(100));
140 println!("History length: {}", *history_length.lock().unwrap());
141
142 // Query 3: Print all history
143 println!("\n=== History ===");
144 store_impl
145 .query_state(|state| {
146 for (i, entry) in state.history.iter().enumerate() {
147 println!(" {}: {}", i + 1, entry);
148 }
149 })
150 .unwrap();
151
152 // Query 4: Check if count is even
153 let is_even = Arc::new(Mutex::new(false));
154 let even_clone = is_even.clone();
155 store_impl
156 .query_state(move |state| {
157 *even_clone.lock().unwrap() = state.count % 2 == 0;
158 })
159 .unwrap();
160
161 // Wait for actions to be processed
162 std::thread::sleep(std::time::Duration::from_millis(100));
163 println!("\nIs count even? {}", *is_even.lock().unwrap());
164
165 // Query 5: Get the last operation
166 let last_operation = Arc::new(Mutex::new(String::new()));
167 let last_op_clone = last_operation.clone();
168 store_impl
169 .query_state(move |state| {
170 if let Some(last) = state.history.last() {
171 *last_op_clone.lock().unwrap() = last.clone();
172 }
173 })
174 .unwrap();
175
176 // Wait for actions to be processed
177 std::thread::sleep(std::time::Duration::from_millis(100));
178 println!("Last operation: {}", *last_operation.lock().unwrap());
179
180 // Dispatch more actions and query again
181 println!("\n=== After More Actions ===");
182 store_impl.dispatch(CalcAction::Add(100)).unwrap();
183 store_impl.dispatch(CalcAction::Reset).unwrap();
184
185 // Wait for actions to be processed
186 std::thread::sleep(std::time::Duration::from_millis(100));
187
188 // Query final state
189 let final_count = Arc::new(Mutex::new(0));
190 let final_clone = final_count.clone();
191 store_impl
192 .query_state(move |state| {
193 *final_clone.lock().unwrap() = state.count;
194 })
195 .unwrap();
196
197 // Wait for actions to be processed
198 std::thread::sleep(std::time::Duration::from_millis(100));
199 println!("Final count: {}", *final_count.lock().unwrap());
200
201 // Query final history
202 println!("\n=== Final History ===");
203 store_impl
204 .query_state(|state| {
205 for (i, entry) in state.history.iter().enumerate() {
206 println!(" {}: {}", i + 1, entry);
207 }
208 })
209 .unwrap();
210
211 // Stop the store
212 store_impl.stop().unwrap();
213 println!("\nStore stopped.");
214}Sourcepub fn subscribed(
&self,
subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
) -> Result<Box<dyn Subscription>, StoreError>
pub fn subscribed( &self, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>
Sourcepub fn subscribed_with(
&self,
capacity: usize,
policy: BackpressurePolicy<(Instant, State, Action)>,
subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>,
) -> Result<Box<dyn Subscription>, StoreError>
pub fn subscribed_with( &self, capacity: usize, policy: BackpressurePolicy<(Instant, State, Action)>, subscriber: Box<dyn Subscriber<State, Action> + Send + Sync>, ) -> Result<Box<dyn Subscription>, StoreError>
Trait Implementations§
Source§impl<State, Action> Drop for StoreImpl<State, Action>
close tx channel when the store is dropped, but not the dispatcher
if you want to stop the dispatcher, call the stop method
impl<State, Action> Drop for StoreImpl<State, Action>
close tx channel when the store is dropped, but not the dispatcher if you want to stop the dispatcher, call the stop method