bleasy/
scanner.rs

1use btleplug::{
2    api::{Central, CentralEvent, Manager as _, Peripheral as _},
3    platform::{Adapter, Manager, Peripheral, PeripheralId},
4    Error,
5};
6use std::{
7    collections::HashSet,
8    pin::Pin,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc, Mutex, RwLock, Weak,
12    },
13    time::{Duration, Instant},
14};
15use stream_cancel::{Trigger, Valved};
16use tokio::sync::broadcast::{self, Sender};
17use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
18use uuid::Uuid;
19
20use crate::{Device, DeviceEvent};
21
22#[derive(Debug, Clone, Hash, Eq, PartialEq)]
23pub enum Filter {
24    Address(String),
25    Characteristic(Uuid),
26    Name(String),
27    Rssi(i16),
28    Service(Uuid),
29}
30
31#[derive(Default)]
32pub struct ScanConfig {
33    /// Index of the Bluetooth adapter to use. The first found adapter is used by default.
34    adapter_index: usize,
35    /// Filters objects
36    filters: Vec<Filter>,
37    /// Filters the found devices based on device address.
38    address_filter: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
39    /// Filters the found devices based on local name.
40    name_filter: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
41    /// Filters the found devices based on rssi.
42    rssi_filter: Option<Box<dyn Fn(i16) -> bool + Send + Sync>>,
43    /// Filters the found devices based on service's uuid.
44    service_filter: Option<Box<dyn Fn(&Vec<Uuid>, &Uuid) -> bool + Send + Sync>>,
45    /// Filters the found devices based on characteristics. Requires a connection to the device.
46    characteristics_filter: Option<Box<dyn Fn(&Vec<Uuid>) -> bool + Send + Sync>>,
47    /// Maximum results before the scan is stopped.
48    max_results: Option<usize>,
49    /// The scan is stopped when timeout duration is reached.
50    timeout: Option<Duration>,
51    /// Force disconnect when listen the device is connected.
52    force_disconnect: bool,
53}
54
55impl ScanConfig {
56    /// Index of bluetooth adapter to use
57    #[inline]
58    pub fn adapter_index(mut self, index: usize) -> Self {
59        self.adapter_index = index;
60        self
61    }
62
63    #[inline]
64    pub fn with_filters(mut self, filters: &[Filter]) -> Self {
65        self.filters.extend_from_slice(filters);
66        self
67    }
68
69    /// Filter scanned devices based on the device address
70    #[inline]
71    pub fn filter_by_address(
72        mut self,
73        func: impl Fn(&str) -> bool + Send + Sync + 'static,
74    ) -> Self {
75        self.address_filter = Some(Box::new(func));
76        self
77    }
78
79    /// Filter scanned devices based on the device name
80    #[inline]
81    pub fn filter_by_name(mut self, func: impl Fn(&str) -> bool + Send + Sync + 'static) -> Self {
82        self.name_filter = Some(Box::new(func));
83        self
84    }
85
86    #[inline]
87    pub fn filter_by_rssi(mut self, func: impl Fn(i16) -> bool + Send + Sync + 'static) -> Self {
88        self.rssi_filter = Some(Box::new(func));
89        self
90    }
91
92    #[inline]
93    pub fn filter_by_service(
94        mut self,
95        func: impl Fn(&Vec<Uuid>, &Uuid) -> bool + Send + Sync + 'static,
96    ) -> Self {
97        self.service_filter = Some(Box::new(func));
98        self
99    }
100
101    /// Filter scanned devices based on available characteristics
102    #[inline]
103    pub fn filter_by_characteristics(
104        mut self,
105        func: impl Fn(&Vec<Uuid>) -> bool + Send + Sync + 'static,
106    ) -> Self {
107        self.characteristics_filter = Some(Box::new(func));
108        self
109    }
110
111    /// Stop the scan after given number of matches
112    #[inline]
113    pub fn stop_after_matches(mut self, max_results: usize) -> Self {
114        self.max_results = Some(max_results);
115        self
116    }
117
118    /// Stop the scan after the first match
119    #[inline]
120    pub fn stop_after_first_match(self) -> Self {
121        self.stop_after_matches(1)
122    }
123
124    /// Stop the scan after given duration
125    #[inline]
126    pub fn stop_after_timeout(mut self, timeout: Duration) -> Self {
127        self.timeout = Some(timeout);
128        self
129    }
130
131    #[inline]
132    pub fn force_disconnect(mut self, force_disconnect: bool) -> Self {
133        self.force_disconnect = force_disconnect;
134        self
135    }
136
137    /// Require that the scanned devices have a name
138    #[inline]
139    pub fn require_name(self) -> Self {
140        if self.name_filter.is_none() {
141            self.filter_by_name(|src| !src.is_empty())
142        } else {
143            self
144        }
145    }
146}
147
148#[derive(Debug, Clone)]
149pub(crate) struct Session {
150    pub(crate) _manager: Manager,
151    pub(crate) adapter: Adapter,
152}
153
154#[derive(Debug, Clone)]
155pub struct Scanner {
156    session: Weak<Session>,
157    event_sender: Sender<DeviceEvent>,
158    stoppers: Arc<RwLock<Vec<Trigger>>>,
159    scan_stopper: Arc<AtomicBool>,
160}
161
162impl Default for Scanner {
163    fn default() -> Self {
164        Scanner::new()
165    }
166}
167
168impl Scanner {
169    pub fn new() -> Self {
170        let (event_sender, _) = broadcast::channel(32);
171        Self {
172            scan_stopper: Arc::new(AtomicBool::new(false)),
173            session: Weak::new(),
174            event_sender,
175            stoppers: Arc::new(RwLock::new(Vec::new())),
176        }
177    }
178
179    /// Start scanning for ble devices.
180    pub async fn start(&mut self, config: ScanConfig) -> Result<(), Error> {
181        if self.session.upgrade().is_some() {
182            log::info!("Scanner is already started.");
183            return Ok(());
184        }
185
186        let manager = Manager::new().await?;
187        let mut adapters = manager.adapters().await?;
188
189        if config.adapter_index >= adapters.len() {
190            return Err(Error::DeviceNotFound);
191        }
192
193        let adapter = adapters.swap_remove(config.adapter_index);
194        log::trace!("Using adapter: {:?}", adapter);
195
196        let session = Arc::new(Session {
197            _manager: manager,
198            adapter,
199        });
200        self.session = Arc::downgrade(&session);
201
202        let event_sender = self.event_sender.clone();
203
204        let mut worker = ScannerWorker::new(
205            config,
206            session.clone(),
207            event_sender,
208            self.scan_stopper.clone(),
209        );
210        tokio::spawn(async move {
211            let _ = worker.scan().await;
212        });
213
214        Ok(())
215    }
216
217    /// Stop scanning for ble devices.
218    pub async fn stop(&self) -> Result<(), Error> {
219        self.scan_stopper.store(true, Ordering::Relaxed);
220        self.stoppers.write()?.clear();
221        log::info!("Scanner is stopped.");
222
223        Ok(())
224    }
225
226    /// Returns true if the scanner is active.
227    pub fn is_active(&self) -> bool {
228        self.session.upgrade().is_some()
229    }
230
231    /// Create a new stream that receives ble device events.
232    pub fn device_event_stream(
233        &self,
234    ) -> Result<Valved<Pin<Box<dyn Stream<Item = DeviceEvent> + Send>>>, Error> {
235        let receiver = self.event_sender.subscribe();
236
237        let stream: Pin<Box<dyn Stream<Item = DeviceEvent> + Send>> =
238            Box::pin(BroadcastStream::new(receiver).filter_map(|x| match x {
239                Ok(event) => {
240                    log::debug!("Broadcasting device: {:?}", event);
241                    Some(event)
242                }
243                Err(e) => {
244                    log::warn!("Error: {:?} when broadcasting device event!", e);
245                    None
246                }
247            }));
248
249        let (trigger, stream) = Valved::new(stream);
250        self.stoppers.write()?.push(trigger);
251
252        Ok(stream)
253    }
254
255    /// Create a new stream that receives discovered ble devices.
256    pub fn device_stream(
257        &self,
258    ) -> Result<Valved<Pin<Box<dyn Stream<Item = Device> + Send>>>, Error> {
259        let receiver = self.event_sender.subscribe();
260
261        let stream: Pin<Box<dyn Stream<Item = Device> + Send>> =
262            Box::pin(BroadcastStream::new(receiver).filter_map(|x| match x {
263                Ok(DeviceEvent::Discovered(device)) => {
264                    log::debug!("Broadcasting device: {:?}", device.address());
265                    Some(device)
266                }
267                Err(e) => {
268                    log::warn!("Error: {:?} when broadcasting device!", e);
269                    None
270                }
271                _ => None,
272            }));
273
274        let (trigger, stream) = Valved::new(stream);
275        self.stoppers.write()?.push(trigger);
276
277        Ok(stream)
278    }
279}
280
281pub struct ScannerWorker {
282    /// Configurations for the scan, such as filters and stop conditions
283    config: ScanConfig,
284    /// Reference to the bluetooth session instance
285    session: Arc<Session>,
286    /// Number of matching devices found so far
287    result_count: usize,
288    /// Set of devices that have been filtered and will be ignored
289    filtered: HashSet<PeripheralId>,
290    /// Set of devices that we are currently connecting to
291    connecting: Arc<Mutex<HashSet<PeripheralId>>>,
292    /// Set of devices that matched the filters
293    matched: HashSet<PeripheralId>,
294    /// Channel for sending events to the client
295    event_sender: Sender<DeviceEvent>,
296    /// Stop the scan event.
297    stopper: Arc<AtomicBool>,
298}
299
300impl ScannerWorker {
301    fn new(
302        config: ScanConfig,
303        session: Arc<Session>,
304        event_sender: Sender<DeviceEvent>,
305        stopper: Arc<AtomicBool>,
306    ) -> Self {
307        Self {
308            config,
309            session,
310            result_count: 0,
311            filtered: HashSet::new(),
312            connecting: Arc::new(Mutex::new(HashSet::new())),
313            matched: HashSet::new(),
314            event_sender,
315            stopper,
316        }
317    }
318
319    async fn scan(&mut self) -> Result<(), Error> {
320        log::info!("Starting the scan");
321
322        self.session.adapter.start_scan(Default::default()).await?;
323
324        while let Ok(mut stream) = self.session.adapter.events().await {
325            let start_time = Instant::now();
326
327            while let Some(event) = stream.next().await {
328                match event {
329                    CentralEvent::DeviceDiscovered(v) => self.on_device_discovered(v).await,
330                    CentralEvent::DeviceUpdated(v) => self.on_device_updated(v).await,
331                    CentralEvent::DeviceConnected(v) => self.on_device_connected(v).await?,
332                    CentralEvent::DeviceDisconnected(v) => self.on_device_disconnected(v).await?,
333                    _ => {}
334                }
335
336                let timeout_reached = self
337                    .config
338                    .timeout
339                    .filter(|timeout| Instant::now().duration_since(start_time).ge(timeout))
340                    .is_some();
341
342                let max_result_reached = self
343                    .config
344                    .max_results
345                    .filter(|max_results| self.result_count >= *max_results)
346                    .is_some();
347
348                if timeout_reached || max_result_reached || self.stopper.load(Ordering::Relaxed) {
349                    log::info!("Scanner stop condition reached.");
350                    return Ok(());
351                }
352            }
353        }
354
355        Ok(())
356    }
357
358    async fn on_device_discovered(&mut self, peripheral_id: PeripheralId) {
359        if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
360            log::trace!("Device discovered: {:?}", peripheral);
361
362            self.apply_filter(peripheral_id).await;
363        }
364    }
365
366    async fn on_device_updated(&mut self, peripheral_id: PeripheralId) {
367        if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
368            log::trace!("Device updated: {:?}", peripheral);
369
370            if self.matched.contains(&peripheral_id) {
371                let address = peripheral.address();
372                match self.event_sender.send(DeviceEvent::Updated(Device::new(
373                    self.session.adapter.clone(),
374                    peripheral,
375                ))) {
376                    Ok(value) => log::debug!("Sent device: {}, size: {}...", address, value),
377                    Err(e) => log::debug!("Error: {:?} when Sending device: {}...", e, address),
378                }
379            } else {
380                self.apply_filter(peripheral_id).await;
381            }
382        }
383    }
384
385    async fn on_device_connected(&mut self, peripheral_id: PeripheralId) -> Result<(), Error> {
386        self.connecting.lock()?.remove(&peripheral_id);
387
388        if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
389            log::trace!("Device connected: {:?}", peripheral);
390
391            if self.matched.contains(&peripheral_id) {
392                let address = peripheral.address();
393                match self.event_sender.send(DeviceEvent::Connected(Device::new(
394                    self.session.adapter.clone(),
395                    peripheral,
396                ))) {
397                    Ok(value) => log::trace!("Sent device: {}, size: {}...", address, value),
398                    Err(e) => log::warn!("Error: {:?} when Sending device: {}...", e, address),
399                }
400            } else {
401                self.apply_filter(peripheral_id).await;
402            }
403        }
404
405        Ok(())
406    }
407
408    async fn on_device_disconnected(&self, peripheral_id: PeripheralId) -> Result<(), Error> {
409        if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
410            log::trace!("Device disconnected: {:?}", peripheral);
411
412            if self.matched.contains(&peripheral_id) {
413                let address = peripheral.address();
414                match self
415                    .event_sender
416                    .send(DeviceEvent::Disconnected(Device::new(
417                        self.session.adapter.clone(),
418                        peripheral,
419                    ))) {
420                    Ok(value) => log::trace!("Sent device: {}, size: {}...", address, value),
421                    Err(e) => log::warn!("Error: {:?} when Sending device: {}...", e, address),
422                }
423            }
424        }
425
426        self.connecting.lock()?.remove(&peripheral_id);
427
428        Ok(())
429    }
430
431    async fn apply_filter(&mut self, peripheral_id: PeripheralId) {
432        if self.filtered.contains(&peripheral_id) {
433            return;
434        }
435
436        if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
437            if let Ok(Some(property)) = peripheral.properties().await {
438                let mut passed = true;
439                log::trace!("filtering: {:?}", property);
440
441                for filter in self.config.filters.iter() {
442                    if !passed {
443                        break;
444                    }
445                    match filter {
446                        Filter::Name(v) => {
447                            passed &= property.local_name.as_ref().is_some_and(|name| {
448                                if let Some(name_filter) = &self.config.name_filter {
449                                    name_filter(name)
450                                } else {
451                                    name == v
452                                }
453                            })
454                        }
455                        Filter::Rssi(v) => {
456                            passed &= property.rssi.is_some_and(|rssi| {
457                                if let Some(rssi_filter) = &self.config.rssi_filter {
458                                    rssi_filter(rssi)
459                                } else {
460                                    rssi >= *v
461                                }
462                            });
463                        }
464                        Filter::Service(v) => {
465                            let services = &property.services;
466                            if let Some(service_filter) = &self.config.service_filter {
467                                passed &= service_filter(&services, v);
468                            } else {
469                                passed &= property.services.contains(v);
470                            }
471                        }
472                        Filter::Address(v) => {
473                            let addr = property.address.to_string();
474                            if let Some(address_filter) = &self.config.address_filter {
475                                passed &= address_filter(&addr);
476                            } else {
477                                passed &= addr == *v;
478                            }
479                        }
480                        Filter::Characteristic(v) => {
481                            let _ = self
482                                .apply_character_filter(&peripheral, v, &mut passed)
483                                .await;
484                        }
485                    }
486                }
487
488                if passed {
489                    self.matched.insert(peripheral_id.clone());
490                    self.result_count += 1;
491
492                    if let Err(e) = self.event_sender.send(DeviceEvent::Discovered(Device::new(
493                        self.session.adapter.clone(),
494                        peripheral,
495                    ))) {
496                        log::warn!("error: {} when sending device", e);
497                    }
498                }
499
500                log::debug!(
501                    "current matched: {}, current filtered: {}",
502                    self.matched.len(),
503                    self.filtered.len()
504                );
505            }
506
507            self.filtered.insert(peripheral_id);
508        }
509    }
510
511    async fn apply_character_filter(
512        &self,
513        peripheral: &Peripheral,
514        uuid: &Uuid,
515        passed: &mut bool,
516    ) -> Result<(), Error> {
517        if !peripheral.is_connected().await.unwrap_or(false) {
518            if self.connecting.lock()?.insert(peripheral.id()) {
519                log::debug!("Connecting to device {}", peripheral.address());
520
521                // Connect in another thread, so we can keep filtering other devices meanwhile.
522                // let peripheral_clone = peripheral.clone();
523                let connecting_map = self.connecting.clone();
524                if let Err(e) = peripheral.connect().await {
525                    log::warn!("Could not connect to {}: {:?}", peripheral.address(), e);
526
527                    connecting_map.lock()?.remove(&peripheral.id());
528
529                    return Ok(());
530                };
531            }
532        }
533
534        let mut characteristics = Vec::new();
535        characteristics.extend(peripheral.characteristics());
536
537        if self.config.force_disconnect {
538            if let Err(e) = peripheral.disconnect().await {
539                log::warn!("Error: {} when disconnect device", e);
540            }
541        }
542
543        *passed &= if characteristics.is_empty() {
544            let address = peripheral.address();
545            log::debug!("Discovering characteristics for {}", address);
546
547            match peripheral.discover_services().await {
548                Ok(()) => {
549                    characteristics.extend(peripheral.characteristics());
550                    let characteristics = characteristics
551                        .into_iter()
552                        .map(|c| c.uuid)
553                        .collect::<Vec<_>>();
554
555                    if let Some(characteristics_filter) = &self.config.characteristics_filter {
556                        characteristics_filter(&characteristics)
557                    } else {
558                        characteristics.contains(uuid)
559                    }
560                }
561                Err(e) => {
562                    log::warn!(
563                        "Error: `{:?}` when discovering characteristics for {}",
564                        e,
565                        address
566                    );
567                    false
568                }
569            }
570        } else {
571            true
572        };
573
574        Ok(())
575    }
576}
577
578#[cfg(test)]
579mod tests {
580    use super::{Filter, ScanConfig, Scanner};
581    use crate::Device;
582    use btleplug::{api::BDAddr, Error};
583    use std::{future::Future, time::Duration};
584    use tokio_stream::StreamExt;
585    use uuid::Uuid;
586
587    async fn device_stream<T: Future<Output = ()>>(
588        scanner: Scanner,
589        callback: impl Fn(Device) -> T,
590    ) {
591        let duration = Duration::from_millis(15_000);
592        if let Err(_) = tokio::time::timeout(duration, async move {
593            if let Ok(mut stream) = scanner.device_stream() {
594                while let Some(device) = stream.next().await {
595                    callback(device).await;
596                    break;
597                }
598            }
599        })
600        .await
601        {
602            eprintln!("timeout....");
603        }
604    }
605
606    #[tokio::test]
607    async fn test_filter_by_address() -> Result<(), Error> {
608        pretty_env_logger::init();
609
610        let mac_addr = [0xE3, 0x9E, 0x2A, 0x4D, 0xAA, 0x97];
611        let filers = vec![Filter::Address("E3:9E:2A:4D:AA:97".into())];
612        let cfg = ScanConfig::default()
613            .with_filters(&filers)
614            .stop_after_first_match();
615        let mut scanner = Scanner::default();
616
617        scanner.start(cfg).await?;
618        device_stream(scanner, |device| async move {
619            assert_eq!(device.address(), BDAddr::from(mac_addr));
620        })
621        .await;
622
623        Ok(())
624    }
625
626    #[tokio::test]
627    async fn test_filter_by_character() -> Result<(), Error> {
628        pretty_env_logger::init();
629
630        let filers = vec![Filter::Characteristic(Uuid::from_u128(
631            0x6e400001_b5a3_f393_e0a9_e50e24dcca9e,
632        ))];
633        let cfg = ScanConfig::default()
634            .with_filters(&filers)
635            .stop_after_first_match();
636        let mut scanner = Scanner::default();
637
638        scanner.start(cfg).await?;
639        device_stream(scanner, |device| async move {
640            println!("device: {:?} found", device);
641        })
642        .await;
643
644        Ok(())
645    }
646
647    #[tokio::test]
648    async fn test_filter_by_name() -> Result<(), Error> {
649        pretty_env_logger::init();
650
651        let name = "73429485";
652        let filers = vec![Filter::Name(name.into())];
653        let cfg = ScanConfig::default()
654            .with_filters(&filers)
655            .stop_after_first_match();
656        let mut scanner = Scanner::default();
657
658        scanner.start(cfg).await?;
659        device_stream(scanner, |device| async move {
660            assert_eq!(device.local_name().await, Some(name.into()));
661        })
662        .await;
663
664        Ok(())
665    }
666
667    #[tokio::test]
668    async fn test_filter_by_rssi() -> Result<(), Error> {
669        pretty_env_logger::init();
670
671        let filers = vec![Filter::Rssi(-70)];
672        let cfg = ScanConfig::default()
673            .with_filters(&filers)
674            .stop_after_first_match();
675        let mut scanner = Scanner::default();
676
677        scanner.start(cfg).await?;
678        device_stream(scanner, |device| async move {
679            println!("device: {:?} found", device);
680        })
681        .await;
682
683        Ok(())
684    }
685
686    #[tokio::test]
687    async fn test_filter_by_service() -> Result<(), Error> {
688        pretty_env_logger::init();
689
690        let service = Uuid::from_u128(0x6e400001_b5a3_f393_e0a9_e50e24dcca9e);
691        let filers = vec![Filter::Service(service)];
692        let cfg = ScanConfig::default()
693            .with_filters(&filers)
694            .stop_after_first_match();
695        let mut scanner = Scanner::default();
696
697        scanner.start(cfg).await?;
698        device_stream(scanner, |device| async move {
699            println!("device: {:?} found", device);
700        })
701        .await;
702
703        Ok(())
704    }
705}