Skip to main content

obd2_core/session/
poller.rs

1//! PID polling loop with PollEvent channel and threshold alerting.
2
3use std::time::Duration;
4use tokio::sync::mpsc;
5use crate::protocol::pid::Pid;
6use crate::vehicle::{ModuleId, ThresholdResult};
7
8/// Events emitted by the polling loop.
9#[derive(Debug, Clone)]
10#[non_exhaustive]
11pub enum PollEvent {
12    /// A standard PID was read successfully.
13    Reading {
14        pid: Pid,
15        reading: crate::protocol::enhanced::Reading,
16    },
17
18    /// An enhanced PID was read.
19    EnhancedReading {
20        did: u16,
21        module: ModuleId,
22        reading: crate::protocol::enhanced::Reading,
23    },
24
25    /// A threshold was breached.
26    Alert(ThresholdResult),
27
28    /// A diagnostic rule fired.
29    RuleFired {
30        rule_name: String,
31        description: String,
32    },
33
34    /// Polling encountered a non-fatal error (polling continues).
35    Error {
36        pid: Option<Pid>,
37        error: String,
38    },
39
40    /// Battery voltage update.
41    Voltage(f64),
42}
43
44/// Configuration for a polling session.
45#[derive(Debug, Clone)]
46pub struct PollConfig {
47    /// PIDs to poll each cycle.
48    pub pids: Vec<Pid>,
49    /// Interval between poll cycles.
50    pub interval: Duration,
51    /// Whether to read battery voltage each cycle.
52    pub read_voltage: bool,
53}
54
55impl PollConfig {
56    /// Create a basic poll config with default interval.
57    pub fn new(pids: Vec<Pid>) -> Self {
58        Self {
59            pids,
60            interval: Duration::from_millis(250),
61            read_voltage: true,
62        }
63    }
64
65    /// Set the polling interval.
66    pub fn with_interval(mut self, interval: Duration) -> Self {
67        self.interval = interval;
68        self
69    }
70
71    /// Set whether to read battery voltage.
72    pub fn with_voltage(mut self, read_voltage: bool) -> Self {
73        self.read_voltage = read_voltage;
74        self
75    }
76}
77
78/// Handle for controlling an active polling loop.
79#[derive(Debug)]
80pub struct PollHandle {
81    cancel_tx: tokio::sync::watch::Sender<bool>,
82    interval_tx: tokio::sync::watch::Sender<Duration>,
83    // Keep receivers alive so send() succeeds and updates the value.
84    _cancel_rx: tokio::sync::watch::Receiver<bool>,
85    _interval_rx: tokio::sync::watch::Receiver<Duration>,
86}
87
88impl PollHandle {
89    /// Stop the polling loop.
90    pub fn stop(&self) {
91        let _ = self.cancel_tx.send(true);
92    }
93
94    /// Adjust the polling interval dynamically (battery conservation).
95    pub fn set_interval(&self, interval: Duration) {
96        let _ = self.interval_tx.send(interval);
97    }
98
99    /// Check if the polling loop is still running.
100    pub fn is_running(&self) -> bool {
101        !*self.cancel_tx.borrow()
102    }
103}
104
105/// Start a polling loop that reads PIDs and sends events to a channel.
106///
107/// Returns a (PollHandle, Receiver, PollConfig) triple. Use PollHandle to stop or adjust.
108/// The polling task runs on the current tokio runtime.
109///
110/// BR-6.1: Cancellable via PollHandle::stop()
111/// BR-6.4: Single PID failure emits PollEvent::Error, doesn't stop the loop
112/// BR-6.5: Task is tracked via PollHandle
113pub fn start_poll_loop(
114    config: PollConfig,
115) -> (PollHandle, mpsc::Receiver<PollEvent>, PollConfig) {
116    let (event_tx, event_rx) = mpsc::channel(256);
117    let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
118    let (interval_tx, interval_rx) = tokio::sync::watch::channel(config.interval);
119
120    // Keep the sender alive so the channel stays open
121    let _ = event_tx;
122
123    let handle = PollHandle {
124        cancel_tx,
125        interval_tx,
126        _cancel_rx: cancel_rx,
127        _interval_rx: interval_rx,
128    };
129
130    // Return the config so Session can drive the loop itself
131    // (Session needs &mut self for adapter access, so we can't spawn internally)
132    (handle, event_rx, config)
133}
134
135/// Execute one poll cycle: read all PIDs and emit events.
136///
137/// Called by Session in its poll loop. This is NOT async over the adapter
138/// directly -- Session calls this with the adapter.
139pub async fn execute_poll_cycle<A: crate::adapter::Adapter>(
140    adapter: &mut A,
141    config: &PollConfig,
142    event_tx: &mpsc::Sender<PollEvent>,
143    spec: Option<&crate::vehicle::VehicleSpec>,
144) {
145    use crate::protocol::service::ServiceRequest;
146    use crate::protocol::enhanced::ReadingSource;
147    use std::time::Instant;
148
149    for &pid in &config.pids {
150        let req = ServiceRequest::read_pid(pid);
151        match adapter.request(&req).await {
152            Ok(data) => {
153                match pid.parse(&data) {
154                    Ok(value) => {
155                        let reading = crate::protocol::enhanced::Reading {
156                            value: value.clone(),
157                            unit: pid.unit(),
158                            timestamp: Instant::now(),
159                            raw_bytes: data,
160                            source: ReadingSource::Live,
161                        };
162
163                        // Emit reading
164                        let _ = event_tx.send(PollEvent::Reading {
165                            pid,
166                            reading: reading.clone(),
167                        }).await;
168
169                        // Check threshold (BR-5.2)
170                        if let crate::protocol::enhanced::Value::Scalar(v) = &value {
171                            if let Some(result) = super::threshold::evaluate_pid_threshold(
172                                spec, pid, *v,
173                            ) {
174                                let _ = event_tx.send(PollEvent::Alert(result)).await;
175                            }
176                        }
177                    }
178                    Err(e) => {
179                        let _ = event_tx.send(PollEvent::Error {
180                            pid: Some(pid),
181                            error: e.to_string(),
182                        }).await;
183                    }
184                }
185            }
186            Err(crate::error::Obd2Error::NoData) => {
187                // Skip -- PID not supported (BR-6.4)
188            }
189            Err(e) => {
190                let _ = event_tx.send(PollEvent::Error {
191                    pid: Some(pid),
192                    error: e.to_string(),
193                }).await;
194            }
195        }
196    }
197
198    // Battery voltage
199    if config.read_voltage {
200        if let Ok(Some(v)) = adapter.battery_voltage().await {
201            let _ = event_tx.send(PollEvent::Voltage(v)).await;
202        }
203    }
204}
205
206#[cfg(test)]
207mod tests {
208    use super::*;
209    use crate::adapter::Adapter;
210    use crate::adapter::mock::MockAdapter;
211
212    #[test]
213    fn test_poll_config_defaults() {
214        let config = PollConfig::new(vec![Pid::ENGINE_RPM, Pid::VEHICLE_SPEED]);
215        assert_eq!(config.pids.len(), 2);
216        assert_eq!(config.interval, Duration::from_millis(250));
217        assert!(config.read_voltage);
218    }
219
220    #[test]
221    fn test_poll_config_builder() {
222        let config = PollConfig::new(vec![Pid::ENGINE_RPM])
223            .with_interval(Duration::from_millis(500))
224            .with_voltage(false);
225        assert_eq!(config.interval, Duration::from_millis(500));
226        assert!(!config.read_voltage);
227    }
228
229    #[test]
230    fn test_poll_handle_stop() {
231        let (handle, _rx, _config) = start_poll_loop(
232            PollConfig::new(vec![Pid::ENGINE_RPM]),
233        );
234        assert!(handle.is_running());
235        handle.stop();
236        assert!(!handle.is_running());
237    }
238
239    #[tokio::test]
240    async fn test_execute_poll_cycle() {
241        let mut adapter = MockAdapter::new();
242        adapter.initialize().await.unwrap();
243
244        let config = PollConfig::new(vec![Pid::ENGINE_RPM, Pid::COOLANT_TEMP]);
245        let (tx, mut rx) = mpsc::channel(64);
246
247        execute_poll_cycle(&mut adapter, &config, &tx, None).await;
248
249        // Should receive at least 2 readings + 1 voltage
250        let mut count = 0;
251        while let Ok(event) = rx.try_recv() {
252            match event {
253                PollEvent::Reading { .. } => count += 1,
254                PollEvent::Voltage(_) => count += 1,
255                _ => {}
256            }
257        }
258        assert!(count >= 2, "expected at least 2 events, got {}", count);
259    }
260
261    #[tokio::test]
262    async fn test_poll_cycle_with_threshold() {
263        use crate::vehicle::{
264            VehicleSpec, SpecIdentity, EngineSpec, CommunicationSpec,
265            ThresholdSet, NamedThreshold, Threshold,
266        };
267
268        let spec = VehicleSpec {
269            spec_version: Some("1.0".into()),
270            identity: SpecIdentity {
271                name: "Test".into(),
272                model_years: (2020, 2020),
273                makes: vec![],
274                models: vec![],
275                engine: EngineSpec {
276                    code: "T".into(),
277                    displacement_l: 2.0,
278                    cylinders: 4,
279                    layout: "I4".into(),
280                    aspiration: "NA".into(),
281                    fuel_type: "Gas".into(),
282                    fuel_system: None,
283                    compression_ratio: None,
284                    max_power_kw: None,
285                    max_torque_nm: None,
286                    redline_rpm: 6500,
287                    idle_rpm_warm: 700,
288                    idle_rpm_cold: 900,
289                    firing_order: None,
290                    ecm_hardware: None,
291                },
292                transmission: None,
293                vin_match: None,
294            },
295            communication: CommunicationSpec {
296                buses: vec![],
297                elm327_protocol_code: None,
298            },
299            thresholds: Some(ThresholdSet {
300                engine: vec![NamedThreshold {
301                    name: "coolant_temp_c".into(),
302                    threshold: Threshold {
303                        min: Some(0.0),
304                        max: Some(130.0),
305                        warning_low: None,
306                        warning_high: Some(60.0),
307                        critical_low: None,
308                        critical_high: Some(100.0),
309                        unit: "\u{00B0}C".into(),
310                    },
311                }],
312                transmission: vec![],
313            }),
314            dtc_library: None,
315            polling_groups: vec![],
316            diagnostic_rules: vec![],
317            known_issues: vec![],
318            enhanced_pids: vec![],
319        };
320
321        let mut adapter = MockAdapter::new();
322        adapter.initialize().await.unwrap();
323
324        let config = PollConfig::new(vec![Pid::COOLANT_TEMP]).with_voltage(false);
325        let (tx, mut rx) = mpsc::channel(64);
326
327        execute_poll_cycle(&mut adapter, &config, &tx, Some(&spec)).await;
328
329        // MockAdapter returns 50 deg C for coolant which is below warning threshold of 60
330        // So we should NOT get an alert
331        let mut got_alert = false;
332        while let Ok(event) = rx.try_recv() {
333            if matches!(event, PollEvent::Alert(_)) {
334                got_alert = true;
335            }
336        }
337        assert!(!got_alert, "50 deg C should not trigger alert (warning at 60)");
338    }
339}