dropoldest_if/
dropoldest_if.rs

1use rs_store::{BackpressurePolicy, DispatchOp, FnReducer, FnSubscriber, StoreBuilder};
2use std::sync::Arc;
3
4fn main() {
5    // predicate 기반 drop 정책 사용 예시
6    // 작은 값들을 우선적으로 drop하는 predicate
7    let predicate = Arc::new(|action_op: &rs_store::ActionOp<i32>| {
8        match action_op {
9            rs_store::ActionOp::Action(value) => *value < 5, // 5보다 작은 값들은 drop
10            rs_store::ActionOp::Exit(_) => false,            // Exit는 drop하지 않음
11        }
12    });
13
14    let policy = BackpressurePolicy::DropOldestIf { predicate };
15
16    // 매우 작은 capacity로 store 생성하여 backpressure 상황 시뮬레이션
17    let store = StoreBuilder::new(0)
18        .with_reducer(Box::new(FnReducer::from(|state: &i32, action: &i32| {
19            // reducer에서 지연을 추가하여 backpressure 상황 생성
20            std::thread::sleep(std::time::Duration::from_millis(200));
21            println!("reducer: {} + {}", state, action);
22            DispatchOp::Dispatch(state + action, None)
23        })))
24        .with_capacity(2) // 매우 작은 capacity로 설정
25        .with_policy(policy)
26        .build()
27        .unwrap();
28
29    // subscriber 추가
30    store.add_subscriber(Arc::new(FnSubscriber::from(|state: &i32, action: &i32| {
31        println!("subscriber: state: {}, action: {}", state, action);
32    })));
33
34    println!("=== Predicate 기반 Backpressure 테스트 ===");
35    println!("채널 capacity: 2");
36    println!("predicate: 5보다 작은 값들은 drop");
37    println!("reducer 지연: 200ms");
38    println!();
39
40    // 다양한 값들을 빠르게 dispatch
41    let actions = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
42
43    for action in actions {
44        println!("dispatch: {}", action);
45        match store.dispatch(action) {
46            Ok(_) => println!("  -> 성공"),
47            Err(e) => println!("  -> 실패: {:?}", e),
48        }
49
50        // 빠른 dispatch로 backpressure 상황 생성
51        std::thread::sleep(std::time::Duration::from_millis(50));
52    }
53
54    // 처리 완료 대기
55    std::thread::sleep(std::time::Duration::from_millis(3000));
56
57    // 최종 상태 확인
58    let final_state = store.get_state();
59    println!();
60    println!("최종 상태: {}", final_state);
61
62    // 메트릭 확인
63    let metrics = store.get_metrics();
64    println!("메트릭:");
65    println!("  - 받은 액션: {}", metrics.action_received);
66    println!("  - 처리된 액션: {}", metrics.action_reduced);
67    println!("  - drop된 액션: {}", metrics.action_dropped);
68
69    store.stop();
70}