bleasy/
scanner.rs

1pub(crate) mod config;
2mod worker;
3
4use btleplug::{api::Manager as _, platform::Manager, Error};
5use std::{
6    pin::Pin,
7    sync::{
8        atomic::{AtomicBool, Ordering},
9        Arc, RwLock, Weak,
10    },
11};
12use stream_cancel::{Trigger, Valved};
13use tokio::sync::broadcast::{self, Sender};
14use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
15
16use self::{
17    config::ScanConfig,
18    worker::{ScanEvent, ScannerWorker, Session},
19};
20use crate::{Device, DeviceEvent};
21
22#[derive(Debug, Clone)]
23pub struct Scanner {
24    session: Weak<Session>,
25    event_sender: Sender<ScanEvent>,
26    stoppers: Arc<RwLock<Vec<Trigger>>>,
27    scan_stopper: Arc<AtomicBool>,
28}
29
30impl Default for Scanner {
31    fn default() -> Self {
32        Scanner::new()
33    }
34}
35
36impl Scanner {
37    pub fn new() -> Self {
38        let (event_sender, _) = broadcast::channel(32);
39        Self {
40            scan_stopper: Arc::new(AtomicBool::new(false)),
41            session: Weak::new(),
42            event_sender,
43            stoppers: Arc::new(RwLock::new(Vec::new())),
44        }
45    }
46
47    /// Start scanning for ble devices.
48    pub async fn start(&mut self, config: ScanConfig) -> Result<(), Error> {
49        if self.session.upgrade().is_some() {
50            log::info!("Scanner is already started.");
51            return Ok(());
52        }
53
54        let manager = Manager::new().await?;
55        let mut adapters = manager.adapters().await?;
56
57        if config.adapter_index >= adapters.len() {
58            return Err(Error::DeviceNotFound);
59        }
60
61        let adapter = adapters.swap_remove(config.adapter_index);
62        log::trace!("Using adapter: {:?}", adapter);
63
64        let session = Arc::new(Session {
65            _manager: manager,
66            adapter,
67        });
68        self.session = Arc::downgrade(&session);
69
70        let event_sender = self.event_sender.clone();
71
72        let mut worker = ScannerWorker::new(
73            config,
74            session.clone(),
75            event_sender,
76            self.scan_stopper.clone(),
77        );
78        tokio::spawn(async move {
79            let _ = worker.scan().await;
80        });
81
82        Ok(())
83    }
84
85    /// Stop scanning for ble devices.
86    pub async fn stop(&self) -> Result<(), Error> {
87        self.scan_stopper.store(true, Ordering::Relaxed);
88        self.stoppers.write()?.clear();
89        log::info!("Scanner is stopped.");
90
91        Ok(())
92    }
93
94    /// Returns true if the scanner is active.
95    pub fn is_active(&self) -> bool {
96        self.session.upgrade().is_some()
97    }
98
99    /// Create a new stream that receives ble device events.
100    pub fn device_event_stream(
101        &self,
102    ) -> Result<Valved<Pin<Box<dyn Stream<Item = DeviceEvent> + Send>>>, Error> {
103        let receiver = self.event_sender.subscribe();
104
105        let stream: Pin<Box<dyn Stream<Item = DeviceEvent> + Send>> = Box::pin(
106            BroadcastStream::new(receiver)
107                .take_while(|x| match x {
108                    Ok(ScanEvent::ScanStopped) => {
109                        log::info!("Received ScanStopped event, ending device event stream");
110                        false
111                    }
112                    _ => true,
113                })
114                .filter_map(|x| match x {
115                    Ok(ScanEvent::DeviceEvent(event)) => {
116                        log::trace!("Broadcasting device: {:?}", event);
117                        Some(event)
118                    }
119                    Err(e) => {
120                        log::warn!("Error: {:?} when broadcasting device event!", e);
121                        None
122                    }
123                    _ => None,
124                }),
125        );
126
127        let (trigger, stream) = Valved::new(stream);
128        self.stoppers.write()?.push(trigger);
129
130        Ok(stream)
131    }
132
133    /// Create a new stream that receives discovered ble devices.
134    pub fn device_stream(
135        &self,
136    ) -> Result<Valved<Pin<Box<dyn Stream<Item = Device> + Send>>>, Error> {
137        let receiver = self.event_sender.subscribe();
138
139        let stream: Pin<Box<dyn Stream<Item = Device> + Send>> = Box::pin(
140            BroadcastStream::new(receiver)
141                .take_while(|x| match x {
142                    Ok(ScanEvent::ScanStopped) => {
143                        log::info!("Received ScanStopped event, ending device stream");
144                        false
145                    }
146                    _ => true,
147                })
148                .filter_map(|x| match x {
149                    Ok(ScanEvent::DeviceEvent(DeviceEvent::Discovered(device))) => {
150                        log::trace!("Broadcasting device: {:?}", device.address());
151                        Some(device)
152                    }
153                    Err(e) => {
154                        log::warn!("Error: {:?} when broadcasting device!", e);
155                        None
156                    }
157                    _ => None,
158                }),
159        );
160
161        let (trigger, stream) = Valved::new(stream);
162        self.stoppers.write()?.push(trigger);
163
164        Ok(stream)
165    }
166}