pokeys_thread/
observer.rs

1//! State observer for monitoring state changes
2
3use crate::state::{SharedDeviceState, StateChangeType};
4use crossbeam_channel::{Receiver, RecvTimeoutError, TryRecvError};
5use log::warn;
6use std::sync::Arc;
7use std::time::Duration;
8
9/// State observer for monitoring state changes
10pub struct StateObserver {
11    /// Shared device state
12    shared_state: Arc<SharedDeviceState>,
13    /// State change notification receiver
14    notification_rx: Receiver<StateChangeType>,
15    /// Thread ID
16    thread_id: u32,
17}
18
19impl StateObserver {
20    /// Create a new state observer
21    pub fn new(thread_id: u32, shared_state: Arc<SharedDeviceState>) -> Self {
22        let notification_rx = shared_state.setup_notifications();
23        Self {
24            shared_state,
25            notification_rx,
26            thread_id,
27        }
28    }
29
30    /// Wait for a state change with timeout
31    pub fn wait_for_change(&self, timeout: Duration) -> Option<StateChangeType> {
32        match self.notification_rx.recv_timeout(timeout) {
33            Ok(change_type) => Some(change_type),
34            Err(RecvTimeoutError::Timeout) => None,
35            Err(RecvTimeoutError::Disconnected) => {
36                warn!("State observer for thread {} disconnected", self.thread_id);
37                None
38            }
39        }
40    }
41
42    /// Check for a state change without blocking
43    pub fn check_for_change(&self) -> Option<StateChangeType> {
44        match self.notification_rx.try_recv() {
45            Ok(change_type) => Some(change_type),
46            Err(TryRecvError::Empty) => None,
47            Err(TryRecvError::Disconnected) => {
48                warn!("State observer for thread {} disconnected", self.thread_id);
49                None
50            }
51        }
52    }
53
54    /// Process all pending state changes
55    pub fn process_all_changes<F>(&self, mut handler: F)
56    where
57        F: FnMut(StateChangeType),
58    {
59        while let Some(change_type) = self.check_for_change() {
60            handler(change_type);
61        }
62    }
63
64    /// Get the shared state
65    pub fn shared_state(&self) -> Arc<SharedDeviceState> {
66        self.shared_state.clone()
67    }
68
69    /// Get the thread ID
70    pub fn thread_id(&self) -> u32 {
71        self.thread_id
72    }
73}