// dropoldest_if/dropoldest_if.rs
use rs_store::{BackpressurePolicy, DispatchOp, FnReducer, FnSubscriber, StoreBuilder};
use std::sync::Arc;

4fn main() {
5 let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8 match action_op {
9 rs_store::ActionOp::Action(value) => *value < 5, rs_store::ActionOp::Exit(_) => false, }
12 });
13
14 let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16 let store = StoreBuilder::new(0)
18 .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19 std::thread::sleep(std::time::Duration::from_millis(200));
21 println!("reducer: {} + {}", state, action);
22 DispatchOp::Dispatch(state + action, None)
23 })))
24 .with_capacity(2) .with_policy(policy)
26 .build()
27 .unwrap();
28
29 store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31 println!("subscriber: state: {}, action: {}", state, action);
32 })));
33
34 println!("=== Predicate 기반 Backpressure 테스트 ===");
35 println!("채널 capacity: 2");
36 println!("predicate: 5보다 작은 값들은 drop");
37 println!("reducer 지연: 200ms");
38 println!();
39
40 let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43 for action in actions {
44 println!("dispatch: {}", action);
45 match store.dispatch(action) {
46 Ok(_) => println!(" -> 성공"),
47 Err(e) => println!(" -> 실패: {:?}", e),
48 }
49
50 std::thread::sleep(std::time::Duration::from_millis(50));
52 }
53
54 std::thread::sleep(std::time::Duration::from_millis(3000));
56
57 let final_state = store.get_state();
59 println!();
60 println!("최종 상태: {}", final_state);
61
62 let metrics = store.get_metrics();
64 println!("메트릭:");
65 println!(" - 받은 액션: {}", metrics.action_received);
66 println!(" - 처리된 액션: {}", metrics.action_reduced);
67 println!(" - drop된 액션: {}", metrics.action_dropped);
68
69 store.stop();
70}