1use std::time::Duration;
4use tokio::sync::mpsc;
5use crate::protocol::pid::Pid;
6use crate::vehicle::{ModuleId, ThresholdResult};
7
8#[derive(Debug, Clone)]
10#[non_exhaustive]
11pub enum PollEvent {
12 Reading {
14 pid: Pid,
15 reading: crate::protocol::enhanced::Reading,
16 },
17
18 EnhancedReading {
20 did: u16,
21 module: ModuleId,
22 reading: crate::protocol::enhanced::Reading,
23 },
24
25 Alert(ThresholdResult),
27
28 RuleFired {
30 rule_name: String,
31 description: String,
32 },
33
34 Error {
36 pid: Option<Pid>,
37 error: String,
38 },
39
40 Voltage(f64),
42}
43
44#[derive(Debug, Clone)]
46pub struct PollConfig {
47 pub pids: Vec<Pid>,
49 pub interval: Duration,
51 pub read_voltage: bool,
53}
54
55impl PollConfig {
56 pub fn new(pids: Vec<Pid>) -> Self {
58 Self {
59 pids,
60 interval: Duration::from_millis(250),
61 read_voltage: true,
62 }
63 }
64
65 pub fn with_interval(mut self, interval: Duration) -> Self {
67 self.interval = interval;
68 self
69 }
70
71 pub fn with_voltage(mut self, read_voltage: bool) -> Self {
73 self.read_voltage = read_voltage;
74 self
75 }
76}
77
78#[derive(Debug)]
80pub struct PollHandle {
81 cancel_tx: tokio::sync::watch::Sender<bool>,
82 interval_tx: tokio::sync::watch::Sender<Duration>,
83 _cancel_rx: tokio::sync::watch::Receiver<bool>,
85 _interval_rx: tokio::sync::watch::Receiver<Duration>,
86}
87
88impl PollHandle {
89 pub fn stop(&self) {
91 let _ = self.cancel_tx.send(true);
92 }
93
94 pub fn set_interval(&self, interval: Duration) {
96 let _ = self.interval_tx.send(interval);
97 }
98
99 pub fn is_running(&self) -> bool {
101 !*self.cancel_tx.borrow()
102 }
103}
104
105pub fn start_poll_loop(
114 config: PollConfig,
115) -> (PollHandle, mpsc::Receiver<PollEvent>, PollConfig) {
116 let (event_tx, event_rx) = mpsc::channel(256);
117 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
118 let (interval_tx, interval_rx) = tokio::sync::watch::channel(config.interval);
119
120 let _ = event_tx;
122
123 let handle = PollHandle {
124 cancel_tx,
125 interval_tx,
126 _cancel_rx: cancel_rx,
127 _interval_rx: interval_rx,
128 };
129
130 (handle, event_rx, config)
133}
134
135pub async fn execute_poll_cycle<A: crate::adapter::Adapter>(
140 adapter: &mut A,
141 config: &PollConfig,
142 event_tx: &mpsc::Sender<PollEvent>,
143 spec: Option<&crate::vehicle::VehicleSpec>,
144) {
145 use crate::protocol::service::ServiceRequest;
146 use crate::protocol::enhanced::ReadingSource;
147 use std::time::Instant;
148
149 for &pid in &config.pids {
150 let req = ServiceRequest::read_pid(pid);
151 match adapter.request(&req).await {
152 Ok(data) => {
153 match pid.parse(&data) {
154 Ok(value) => {
155 let reading = crate::protocol::enhanced::Reading {
156 value: value.clone(),
157 unit: pid.unit(),
158 timestamp: Instant::now(),
159 raw_bytes: data,
160 source: ReadingSource::Live,
161 };
162
163 let _ = event_tx.send(PollEvent::Reading {
165 pid,
166 reading: reading.clone(),
167 }).await;
168
169 if let crate::protocol::enhanced::Value::Scalar(v) = &value {
171 if let Some(result) = super::threshold::evaluate_pid_threshold(
172 spec, pid, *v,
173 ) {
174 let _ = event_tx.send(PollEvent::Alert(result)).await;
175 }
176 }
177 }
178 Err(e) => {
179 let _ = event_tx.send(PollEvent::Error {
180 pid: Some(pid),
181 error: e.to_string(),
182 }).await;
183 }
184 }
185 }
186 Err(crate::error::Obd2Error::NoData) => {
187 }
189 Err(e) => {
190 let _ = event_tx.send(PollEvent::Error {
191 pid: Some(pid),
192 error: e.to_string(),
193 }).await;
194 }
195 }
196 }
197
198 if config.read_voltage {
200 if let Ok(Some(v)) = adapter.battery_voltage().await {
201 let _ = event_tx.send(PollEvent::Voltage(v)).await;
202 }
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::mock::MockAdapter;
    use crate::adapter::Adapter;

    /// Builds a minimal vehicle spec whose only threshold is
    /// `coolant_temp_c` (warning above 60, critical above 100).
    fn coolant_threshold_spec() -> crate::vehicle::VehicleSpec {
        use crate::vehicle::{
            CommunicationSpec, EngineSpec, NamedThreshold, SpecIdentity, Threshold, ThresholdSet,
            VehicleSpec,
        };

        let engine = EngineSpec {
            code: "T".into(),
            displacement_l: 2.0,
            cylinders: 4,
            layout: "I4".into(),
            aspiration: "NA".into(),
            fuel_type: "Gas".into(),
            fuel_system: None,
            compression_ratio: None,
            max_power_kw: None,
            max_torque_nm: None,
            redline_rpm: 6500,
            idle_rpm_warm: 700,
            idle_rpm_cold: 900,
            firing_order: None,
            ecm_hardware: None,
        };

        let coolant = NamedThreshold {
            name: "coolant_temp_c".into(),
            threshold: Threshold {
                min: Some(0.0),
                max: Some(130.0),
                warning_low: None,
                warning_high: Some(60.0),
                critical_low: None,
                critical_high: Some(100.0),
                unit: "\u{00B0}C".into(),
            },
        };

        VehicleSpec {
            spec_version: Some("1.0".into()),
            identity: SpecIdentity {
                name: "Test".into(),
                model_years: (2020, 2020),
                makes: vec![],
                models: vec![],
                engine,
                transmission: None,
                vin_match: None,
            },
            communication: CommunicationSpec {
                buses: vec![],
                elm327_protocol_code: None,
            },
            thresholds: Some(ThresholdSet {
                engine: vec![coolant],
                transmission: vec![],
            }),
            dtc_library: None,
            polling_groups: vec![],
            diagnostic_rules: vec![],
            known_issues: vec![],
            enhanced_pids: vec![],
        }
    }

    /// `PollConfig::new` keeps the PID list and applies the defaults.
    #[test]
    fn test_poll_config_defaults() {
        let pids = vec![Pid::ENGINE_RPM, Pid::VEHICLE_SPEED];
        let config = PollConfig::new(pids);

        assert_eq!(config.pids.len(), 2);
        assert_eq!(config.interval, Duration::from_millis(250));
        assert!(config.read_voltage);
    }

    /// The builder methods override the defaults.
    #[test]
    fn test_poll_config_builder() {
        let half_second = Duration::from_millis(500);
        let config = PollConfig::new(vec![Pid::ENGINE_RPM])
            .with_interval(half_second)
            .with_voltage(false);

        assert_eq!(config.interval, Duration::from_millis(500));
        assert!(!config.read_voltage);
    }

    /// `stop` flips the handle from running to not running.
    #[test]
    fn test_poll_handle_stop() {
        let config = PollConfig::new(vec![Pid::ENGINE_RPM]);
        let (handle, _rx, _config) = start_poll_loop(config);

        assert!(handle.is_running());
        handle.stop();
        assert!(!handle.is_running());
    }

    /// One cycle over two PIDs against the mock adapter yields at least two
    /// reading/voltage events.
    #[tokio::test]
    async fn test_execute_poll_cycle() {
        let mut adapter = MockAdapter::new();
        adapter.initialize().await.unwrap();

        let config = PollConfig::new(vec![Pid::ENGINE_RPM, Pid::COOLANT_TEMP]);
        let (tx, mut rx) = mpsc::channel(64);

        execute_poll_cycle(&mut adapter, &config, &tx, None).await;

        // Drain whatever is queued and count readings and voltage events.
        let count = std::iter::from_fn(|| rx.try_recv().ok())
            .filter(|event| {
                matches!(event, PollEvent::Reading { .. } | PollEvent::Voltage(_))
            })
            .count();
        assert!(count >= 2, "expected at least 2 events, got {}", count);
    }

    /// With a coolant threshold configured, the mock adapter's coolant
    /// reading must not raise an alert.
    #[tokio::test]
    async fn test_poll_cycle_with_threshold() {
        let spec = coolant_threshold_spec();

        let mut adapter = MockAdapter::new();
        adapter.initialize().await.unwrap();

        let config = PollConfig::new(vec![Pid::COOLANT_TEMP]).with_voltage(false);
        let (tx, mut rx) = mpsc::channel(64);

        execute_poll_cycle(&mut adapter, &config, &tx, Some(&spec)).await;

        let got_alert = std::iter::from_fn(|| rx.try_recv().ok())
            .any(|event| matches!(event, PollEvent::Alert(_)));
        assert!(!got_alert, "50 deg C should not trigger alert (warning at 60)");
    }
}