client/
scheduler.rs

1// In Cargo.toml, ensure you have: client = { path = "../client" }
2use models::BiliMessage;
3use std::sync::Arc;
4
5use crate::models;
6
/// Context information passed to every event handler alongside the message.
///
/// The type is plain data: it can be cloned cheaply enough to hand to each
/// handler and compared for equality (useful in tests and for change detection).
///
/// Note that the derived `Debug` output includes the raw cookie string, so
/// avoid logging a whole `EventContext` where credentials must stay private.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventContext {
    /// Bilibili cookies for authentication; `None` when no cookies are supplied.
    pub cookies: Option<String>,
    /// Room ID where the event occurred.
    pub room_id: u64,
}
15
16impl EventContext {
17    /// Create a new EventContext with automatic cookie detection
18    pub fn new_with_auto_cookies(room_id: u64) -> Self {
19        let cookies = crate::auth::get_cookies_or_browser(None);
20        Self { cookies, room_id }
21    }
22
23    /// Create a new EventContext with provided cookies
24    pub fn new(cookies: Option<String>, room_id: u64) -> Self {
25        Self { cookies, room_id }
26    }
27}
28
29/// Trait for event handlers (plugins) that process BiliMessage.
30pub trait EventHandler: Send + Sync {
31    fn handle(&self, msg: &BiliMessage, context: &EventContext);
32}
33
/// Scheduling mode: whether handlers run in parallel or one after another.
///
/// NOTE(review): nothing in this file reads this enum — `Scheduler` expresses
/// the same idea through its stages — so confirm external users before
/// changing or removing it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleMode {
    /// Handlers run concurrently.
    Parallel,
    /// Handlers run one at a time, in order.
    Sequential,
}
39
40/// Scheduler struct: manages event handlers and dispatches messages.
41pub struct Scheduler {
42    /// Each stage is a Vec of handlers to run in parallel; stages run sequentially.
43    stages: Vec<Vec<Arc<dyn EventHandler>>>,
44    /// Context information for event handlers
45    context: EventContext,
46}
47
48impl Scheduler {
49    pub fn new(context: EventContext) -> Self {
50        Scheduler { 
51            stages: Vec::new(),
52            context,
53        }
54    }
55
56    /// Add a new stage (group of handlers to run in parallel)
57    pub fn add_stage(&mut self, handlers: Vec<Arc<dyn EventHandler>>) {
58        self.stages.push(handlers);
59    }
60
61    /// Add a single handler as a new sequential stage
62    pub fn add_sequential_handler(&mut self, handler: Arc<dyn EventHandler>) {
63        self.stages.push(vec![handler]);
64    }
65
66    /// Trigger all stages with the given BiliMessage.
67    pub fn trigger(&self, msg: BiliMessage) {
68        for stage in &self.stages {
69            let mut handles = vec![];
70            for handler in stage {
71                let msg = msg.clone();
72                let context = self.context.clone();
73                let handler = Arc::clone(handler);
74                handles.push(std::thread::spawn(move || {
75                    handler.handle(&msg, &context);
76                }));
77            }
78            // Wait for all handlers in this stage to finish before next stage
79            for handle in handles {
80                let _ = handle.join();
81            }
82        }
83    }
84}
85
/// Returns the sum of `left` and `right`.
///
/// Uses the plain `+` operator, so overflow behaviour follows the build's
/// overflow-check setting (by default: panic in debug builds, wrap in release).
///
/// NOTE(review): nothing else in this file calls this function; it looks like
/// a leftover from the `cargo new --lib` template — confirm before relying on it.
pub fn add(left: u64, right: u64) -> u64 {
    left + right
}
89
#[cfg(test)]
mod tests {
    use crate::models::BiliMessage;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{mpsc, Arc, Mutex};

    /// Handler that records that it ran and keeps a copy of the last message.
    struct AssertHandler {
        called: Arc<AtomicBool>,
        last_msg: Arc<Mutex<Option<BiliMessage>>>,
    }

    impl super::EventHandler for AssertHandler {
        fn handle(&self, msg: &BiliMessage, _context: &super::EventContext) {
            self.called.store(true, Ordering::SeqCst);
            let mut lock = self.last_msg.lock().unwrap();
            *lock = Some(msg.clone());
        }
    }

    /// Handler that counts how many times it was invoked.
    struct CounterHandler {
        counter: Arc<AtomicUsize>,
    }

    impl super::EventHandler for CounterHandler {
        fn handle(&self, _msg: &BiliMessage, _context: &super::EventContext) {
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Handler that counts its own invocations and records how many upstream
    /// invocations had already completed when it ran.
    struct StageProbeHandler {
        counter: Arc<AtomicUsize>,
        upstream: Vec<Arc<AtomicUsize>>,
        upstream_seen: Arc<AtomicUsize>,
    }

    impl super::EventHandler for StageProbeHandler {
        fn handle(&self, _msg: &BiliMessage, _context: &super::EventContext) {
            let finished: usize = self
                .upstream
                .iter()
                .map(|c| c.load(Ordering::SeqCst))
                .sum();
            self.upstream_seen.store(finished, Ordering::SeqCst);
            self.counter.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Context shared by the tests below.
    fn test_context() -> super::EventContext {
        super::EventContext {
            cookies: Some("test_cookies".to_string()),
            room_id: 12345,
        }
    }

    #[test]
    fn test_scheduler_with_mpsc_channel() {
        let (tx, rx) = mpsc::channel();
        let called = Arc::new(AtomicBool::new(false));
        let last_msg = Arc::new(Mutex::new(None));
        let handler = AssertHandler {
            called: Arc::clone(&called),
            last_msg: Arc::clone(&last_msg),
        };
        let mut scheduler = super::Scheduler::new(test_context());
        scheduler.add_sequential_handler(Arc::new(handler));

        // Send a test message
        let test_msg = BiliMessage::Danmu {
            user: "user1".to_string(),
            text: "hello".to_string(),
        };
        tx.send(test_msg.clone()).unwrap();

        // Simulate receiving and triggering. Fail loudly if the channel is
        // broken instead of silently skipping the dispatch.
        let received = rx
            .recv()
            .expect("channel closed before the test message arrived");
        scheduler.trigger(received);

        // Assert handler was called and message matches
        assert!(called.load(Ordering::SeqCst), "Handler was not called");
        let lock = last_msg.lock().unwrap();
        assert!(lock.is_some(), "No message stored in handler");
        assert_eq!(lock.as_ref().unwrap(), &test_msg, "Message does not match");
    }

    #[test]
    fn test_scheduler_add_stage_and_sequential_handler() {
        let counter1 = Arc::new(AtomicUsize::new(0));
        let counter2 = Arc::new(AtomicUsize::new(0));
        let counter3 = Arc::new(AtomicUsize::new(0));
        let upstream_seen = Arc::new(AtomicUsize::new(0));

        let handler1 = Arc::new(CounterHandler {
            counter: Arc::clone(&counter1),
        });
        let handler2 = Arc::new(CounterHandler {
            counter: Arc::clone(&counter2),
        });
        let handler3 = Arc::new(StageProbeHandler {
            counter: Arc::clone(&counter3),
            upstream: vec![Arc::clone(&counter1), Arc::clone(&counter2)],
            upstream_seen: Arc::clone(&upstream_seen),
        });

        let mut scheduler = super::Scheduler::new(test_context());
        // Add a parallel stage (handler1 and handler2)
        scheduler.add_stage(vec![handler1, handler2]);
        // Add a sequential stage (handler3)
        scheduler.add_sequential_handler(handler3);

        let test_msg = BiliMessage::Danmu {
            user: "user2".to_string(),
            text: "test".to_string(),
        };
        scheduler.trigger(test_msg);

        // Both handler1 and handler2 should be called once (parallel stage)
        assert_eq!(counter1.load(Ordering::SeqCst), 1, "Handler1 not called");
        assert_eq!(counter2.load(Ordering::SeqCst), 1, "Handler2 not called");
        // handler3 should be called once (sequential stage)
        assert_eq!(counter3.load(Ordering::SeqCst), 1, "Handler3 not called");
        // ...and only after the whole first stage had finished.
        assert_eq!(
            upstream_seen.load(Ordering::SeqCst),
            2,
            "Second stage started before the first stage finished"
        );
    }
}