1use models::BiliMessage;
3use std::sync::Arc;
4
5use crate::models;
6
7#[derive(Debug, Clone)]
9pub struct EventContext {
10 pub cookies: Option<String>,
12 pub room_id: u64,
14}
15
16impl EventContext {
17 pub fn new_with_auto_cookies(room_id: u64) -> Self {
19 let cookies = crate::auth::get_cookies_or_browser(None);
20 Self { cookies, room_id }
21 }
22
23 pub fn new(cookies: Option<String>, room_id: u64) -> Self {
25 Self { cookies, room_id }
26 }
27}
28
29pub trait EventHandler: Send + Sync {
31 fn handle(&self, msg: &BiliMessage, context: &EventContext);
32}
33
34pub enum ScheduleMode {
36 Parallel,
37 Sequential,
38}
39
40pub struct Scheduler {
42 stages: Vec<Vec<Arc<dyn EventHandler>>>,
44 context: EventContext,
46}
47
48impl Scheduler {
49 pub fn new(context: EventContext) -> Self {
50 Scheduler {
51 stages: Vec::new(),
52 context,
53 }
54 }
55
56 pub fn add_stage(&mut self, handlers: Vec<Arc<dyn EventHandler>>) {
58 self.stages.push(handlers);
59 }
60
61 pub fn add_sequential_handler(&mut self, handler: Arc<dyn EventHandler>) {
63 self.stages.push(vec![handler]);
64 }
65
66 pub fn trigger(&self, msg: BiliMessage) {
68 for stage in &self.stages {
69 let mut handles = vec![];
70 for handler in stage {
71 let msg = msg.clone();
72 let context = self.context.clone();
73 let handler = Arc::clone(handler);
74 handles.push(std::thread::spawn(move || {
75 handler.handle(&msg, &context);
76 }));
77 }
78 for handle in handles {
80 let _ = handle.join();
81 }
82 }
83 }
84}
85
86pub fn add(left: u64, right: u64) -> u64 {
87 left + right
88}
89
90#[cfg(test)]
91mod tests {
92 use crate::models::BiliMessage;
93 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
94 use std::sync::{mpsc, Arc, Mutex};
95
96 struct AssertHandler {
97 called: Arc<AtomicBool>,
98 last_msg: Arc<Mutex<Option<BiliMessage>>>,
99 }
100 impl super::EventHandler for AssertHandler {
101 fn handle(&self, msg: &BiliMessage, _context: &super::EventContext) {
102 self.called.store(true, Ordering::SeqCst);
103 let mut lock = self.last_msg.lock().unwrap();
104 *lock = Some(msg.clone());
105 }
106 }
107
108 #[test]
109 fn test_scheduler_with_mpsc_channel() {
110 let (tx, rx) = mpsc::channel();
111 let called = Arc::new(AtomicBool::new(false));
112 let last_msg = Arc::new(Mutex::new(None));
113 let handler = AssertHandler {
114 called: Arc::clone(&called),
115 last_msg: Arc::clone(&last_msg),
116 };
117 let context = super::EventContext {
118 cookies: Some("test_cookies".to_string()),
119 room_id: 12345,
120 };
121 let mut scheduler = super::Scheduler::new(context);
122 scheduler.add_sequential_handler(Arc::new(handler));
123
124 let test_msg = BiliMessage::Danmu {
126 user: "user1".to_string(),
127 text: "hello".to_string(),
128 };
129 tx.send(test_msg.clone()).unwrap();
130
131 if let Ok(msg) = rx.recv() {
133 scheduler.trigger(msg);
134 }
135
136 assert!(called.load(Ordering::SeqCst), "Handler was not called");
138 let lock = last_msg.lock().unwrap();
139 assert!(lock.is_some(), "No message stored in handler");
140 assert_eq!(lock.as_ref().unwrap(), &test_msg, "Message does not match");
141 }
142
143 #[test]
144 fn test_scheduler_add_stage_and_sequential_handler() {
145 use crate::models::BiliMessage;
146
147 struct CounterHandler {
148 counter: Arc<AtomicUsize>,
149 }
150 impl super::EventHandler for CounterHandler {
151 fn handle(&self, _msg: &BiliMessage, _context: &super::EventContext) {
152 self.counter.fetch_add(1, Ordering::SeqCst);
153 }
154 }
155
156 let counter1 = Arc::new(AtomicUsize::new(0));
157 let counter2 = Arc::new(AtomicUsize::new(0));
158 let counter3 = Arc::new(AtomicUsize::new(0));
159
160 let handler1 = Arc::new(CounterHandler {
161 counter: Arc::clone(&counter1),
162 });
163 let handler2 = Arc::new(CounterHandler {
164 counter: Arc::clone(&counter2),
165 });
166 let handler3 = Arc::new(CounterHandler {
167 counter: Arc::clone(&counter3),
168 });
169
170 let context = super::EventContext {
171 cookies: Some("test_cookies".to_string()),
172 room_id: 12345,
173 };
174 let mut scheduler = super::Scheduler::new(context);
175 scheduler.add_stage(vec![handler1, handler2]);
177 scheduler.add_sequential_handler(handler3);
179
180 let test_msg = BiliMessage::Danmu {
181 user: "user2".to_string(),
182 text: "test".to_string(),
183 };
184 scheduler.trigger(test_msg);
185
186 assert_eq!(counter1.load(Ordering::SeqCst), 1, "Handler1 not called");
188 assert_eq!(counter2.load(Ordering::SeqCst), 1, "Handler2 not called");
189 assert_eq!(counter3.load(Ordering::SeqCst), 1, "Handler3 not called");
191 }
192}