1use btleplug::{
2 api::{Central, CentralEvent, Manager as _, Peripheral as _},
3 platform::{Adapter, Manager, Peripheral, PeripheralId},
4 Error,
5};
6use std::{
7 collections::HashSet,
8 pin::Pin,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc, Mutex, RwLock, Weak,
12 },
13 time::{Duration, Instant},
14};
15use stream_cancel::{Trigger, Valved};
16use tokio::sync::broadcast::{self, Sender};
17use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
18use uuid::Uuid;
19
20use crate::{Device, DeviceEvent};
21
22#[derive(Debug, Clone, Hash, Eq, PartialEq)]
23pub enum Filter {
24 Address(String),
25 Characteristic(Uuid),
26 Name(String),
27 Rssi(i16),
28 Service(Uuid),
29}
30
31#[derive(Default)]
32pub struct ScanConfig {
33 adapter_index: usize,
35 filters: Vec<Filter>,
37 address_filter: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
39 name_filter: Option<Box<dyn Fn(&str) -> bool + Send + Sync>>,
41 rssi_filter: Option<Box<dyn Fn(i16) -> bool + Send + Sync>>,
43 service_filter: Option<Box<dyn Fn(&Vec<Uuid>, &Uuid) -> bool + Send + Sync>>,
45 characteristics_filter: Option<Box<dyn Fn(&Vec<Uuid>) -> bool + Send + Sync>>,
47 max_results: Option<usize>,
49 timeout: Option<Duration>,
51 force_disconnect: bool,
53}
54
55impl ScanConfig {
56 #[inline]
58 pub fn adapter_index(mut self, index: usize) -> Self {
59 self.adapter_index = index;
60 self
61 }
62
63 #[inline]
64 pub fn with_filters(mut self, filters: &[Filter]) -> Self {
65 self.filters.extend_from_slice(filters);
66 self
67 }
68
69 #[inline]
71 pub fn filter_by_address(
72 mut self,
73 func: impl Fn(&str) -> bool + Send + Sync + 'static,
74 ) -> Self {
75 self.address_filter = Some(Box::new(func));
76 self
77 }
78
79 #[inline]
81 pub fn filter_by_name(mut self, func: impl Fn(&str) -> bool + Send + Sync + 'static) -> Self {
82 self.name_filter = Some(Box::new(func));
83 self
84 }
85
86 #[inline]
87 pub fn filter_by_rssi(mut self, func: impl Fn(i16) -> bool + Send + Sync + 'static) -> Self {
88 self.rssi_filter = Some(Box::new(func));
89 self
90 }
91
92 #[inline]
93 pub fn filter_by_service(
94 mut self,
95 func: impl Fn(&Vec<Uuid>, &Uuid) -> bool + Send + Sync + 'static,
96 ) -> Self {
97 self.service_filter = Some(Box::new(func));
98 self
99 }
100
101 #[inline]
103 pub fn filter_by_characteristics(
104 mut self,
105 func: impl Fn(&Vec<Uuid>) -> bool + Send + Sync + 'static,
106 ) -> Self {
107 self.characteristics_filter = Some(Box::new(func));
108 self
109 }
110
111 #[inline]
113 pub fn stop_after_matches(mut self, max_results: usize) -> Self {
114 self.max_results = Some(max_results);
115 self
116 }
117
118 #[inline]
120 pub fn stop_after_first_match(self) -> Self {
121 self.stop_after_matches(1)
122 }
123
124 #[inline]
126 pub fn stop_after_timeout(mut self, timeout: Duration) -> Self {
127 self.timeout = Some(timeout);
128 self
129 }
130
131 #[inline]
132 pub fn force_disconnect(mut self, force_disconnect: bool) -> Self {
133 self.force_disconnect = force_disconnect;
134 self
135 }
136
137 #[inline]
139 pub fn require_name(self) -> Self {
140 if self.name_filter.is_none() {
141 self.filter_by_name(|src| !src.is_empty())
142 } else {
143 self
144 }
145 }
146}
147
148#[derive(Debug, Clone)]
149pub(crate) struct Session {
150 pub(crate) _manager: Manager,
151 pub(crate) adapter: Adapter,
152}
153
154#[derive(Debug, Clone)]
155pub struct Scanner {
156 session: Weak<Session>,
157 event_sender: Sender<DeviceEvent>,
158 stoppers: Arc<RwLock<Vec<Trigger>>>,
159 scan_stopper: Arc<AtomicBool>,
160}
161
162impl Default for Scanner {
163 fn default() -> Self {
164 Scanner::new()
165 }
166}
167
168impl Scanner {
169 pub fn new() -> Self {
170 let (event_sender, _) = broadcast::channel(32);
171 Self {
172 scan_stopper: Arc::new(AtomicBool::new(false)),
173 session: Weak::new(),
174 event_sender,
175 stoppers: Arc::new(RwLock::new(Vec::new())),
176 }
177 }
178
179 pub async fn start(&mut self, config: ScanConfig) -> Result<(), Error> {
181 if self.session.upgrade().is_some() {
182 log::info!("Scanner is already started.");
183 return Ok(());
184 }
185
186 let manager = Manager::new().await?;
187 let mut adapters = manager.adapters().await?;
188
189 if config.adapter_index >= adapters.len() {
190 return Err(Error::DeviceNotFound);
191 }
192
193 let adapter = adapters.swap_remove(config.adapter_index);
194 log::trace!("Using adapter: {:?}", adapter);
195
196 let session = Arc::new(Session {
197 _manager: manager,
198 adapter,
199 });
200 self.session = Arc::downgrade(&session);
201
202 let event_sender = self.event_sender.clone();
203
204 let mut worker = ScannerWorker::new(
205 config,
206 session.clone(),
207 event_sender,
208 self.scan_stopper.clone(),
209 );
210 tokio::spawn(async move {
211 let _ = worker.scan().await;
212 });
213
214 Ok(())
215 }
216
217 pub async fn stop(&self) -> Result<(), Error> {
219 self.scan_stopper.store(true, Ordering::Relaxed);
220 self.stoppers.write()?.clear();
221 log::info!("Scanner is stopped.");
222
223 Ok(())
224 }
225
226 pub fn is_active(&self) -> bool {
228 self.session.upgrade().is_some()
229 }
230
231 pub fn device_event_stream(
233 &self,
234 ) -> Result<Valved<Pin<Box<dyn Stream<Item = DeviceEvent> + Send>>>, Error> {
235 let receiver = self.event_sender.subscribe();
236
237 let stream: Pin<Box<dyn Stream<Item = DeviceEvent> + Send>> =
238 Box::pin(BroadcastStream::new(receiver).filter_map(|x| match x {
239 Ok(event) => {
240 log::debug!("Broadcasting device: {:?}", event);
241 Some(event)
242 }
243 Err(e) => {
244 log::warn!("Error: {:?} when broadcasting device event!", e);
245 None
246 }
247 }));
248
249 let (trigger, stream) = Valved::new(stream);
250 self.stoppers.write()?.push(trigger);
251
252 Ok(stream)
253 }
254
255 pub fn device_stream(
257 &self,
258 ) -> Result<Valved<Pin<Box<dyn Stream<Item = Device> + Send>>>, Error> {
259 let receiver = self.event_sender.subscribe();
260
261 let stream: Pin<Box<dyn Stream<Item = Device> + Send>> =
262 Box::pin(BroadcastStream::new(receiver).filter_map(|x| match x {
263 Ok(DeviceEvent::Discovered(device)) => {
264 log::debug!("Broadcasting device: {:?}", device.address());
265 Some(device)
266 }
267 Err(e) => {
268 log::warn!("Error: {:?} when broadcasting device!", e);
269 None
270 }
271 _ => None,
272 }));
273
274 let (trigger, stream) = Valved::new(stream);
275 self.stoppers.write()?.push(trigger);
276
277 Ok(stream)
278 }
279}
280
281pub struct ScannerWorker {
282 config: ScanConfig,
284 session: Arc<Session>,
286 result_count: usize,
288 filtered: HashSet<PeripheralId>,
290 connecting: Arc<Mutex<HashSet<PeripheralId>>>,
292 matched: HashSet<PeripheralId>,
294 event_sender: Sender<DeviceEvent>,
296 stopper: Arc<AtomicBool>,
298}
299
300impl ScannerWorker {
301 fn new(
302 config: ScanConfig,
303 session: Arc<Session>,
304 event_sender: Sender<DeviceEvent>,
305 stopper: Arc<AtomicBool>,
306 ) -> Self {
307 Self {
308 config,
309 session,
310 result_count: 0,
311 filtered: HashSet::new(),
312 connecting: Arc::new(Mutex::new(HashSet::new())),
313 matched: HashSet::new(),
314 event_sender,
315 stopper,
316 }
317 }
318
319 async fn scan(&mut self) -> Result<(), Error> {
320 log::info!("Starting the scan");
321
322 self.session.adapter.start_scan(Default::default()).await?;
323
324 while let Ok(mut stream) = self.session.adapter.events().await {
325 let start_time = Instant::now();
326
327 while let Some(event) = stream.next().await {
328 match event {
329 CentralEvent::DeviceDiscovered(v) => self.on_device_discovered(v).await,
330 CentralEvent::DeviceUpdated(v) => self.on_device_updated(v).await,
331 CentralEvent::DeviceConnected(v) => self.on_device_connected(v).await?,
332 CentralEvent::DeviceDisconnected(v) => self.on_device_disconnected(v).await?,
333 _ => {}
334 }
335
336 let timeout_reached = self
337 .config
338 .timeout
339 .filter(|timeout| Instant::now().duration_since(start_time).ge(timeout))
340 .is_some();
341
342 let max_result_reached = self
343 .config
344 .max_results
345 .filter(|max_results| self.result_count >= *max_results)
346 .is_some();
347
348 if timeout_reached || max_result_reached || self.stopper.load(Ordering::Relaxed) {
349 log::info!("Scanner stop condition reached.");
350 return Ok(());
351 }
352 }
353 }
354
355 Ok(())
356 }
357
358 async fn on_device_discovered(&mut self, peripheral_id: PeripheralId) {
359 if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
360 log::trace!("Device discovered: {:?}", peripheral);
361
362 self.apply_filter(peripheral_id).await;
363 }
364 }
365
366 async fn on_device_updated(&mut self, peripheral_id: PeripheralId) {
367 if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
368 log::trace!("Device updated: {:?}", peripheral);
369
370 if self.matched.contains(&peripheral_id) {
371 let address = peripheral.address();
372 match self.event_sender.send(DeviceEvent::Updated(Device::new(
373 self.session.adapter.clone(),
374 peripheral,
375 ))) {
376 Ok(value) => log::debug!("Sent device: {}, size: {}...", address, value),
377 Err(e) => log::debug!("Error: {:?} when Sending device: {}...", e, address),
378 }
379 } else {
380 self.apply_filter(peripheral_id).await;
381 }
382 }
383 }
384
385 async fn on_device_connected(&mut self, peripheral_id: PeripheralId) -> Result<(), Error> {
386 self.connecting.lock()?.remove(&peripheral_id);
387
388 if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
389 log::trace!("Device connected: {:?}", peripheral);
390
391 if self.matched.contains(&peripheral_id) {
392 let address = peripheral.address();
393 match self.event_sender.send(DeviceEvent::Connected(Device::new(
394 self.session.adapter.clone(),
395 peripheral,
396 ))) {
397 Ok(value) => log::trace!("Sent device: {}, size: {}...", address, value),
398 Err(e) => log::warn!("Error: {:?} when Sending device: {}...", e, address),
399 }
400 } else {
401 self.apply_filter(peripheral_id).await;
402 }
403 }
404
405 Ok(())
406 }
407
408 async fn on_device_disconnected(&self, peripheral_id: PeripheralId) -> Result<(), Error> {
409 if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
410 log::trace!("Device disconnected: {:?}", peripheral);
411
412 if self.matched.contains(&peripheral_id) {
413 let address = peripheral.address();
414 match self
415 .event_sender
416 .send(DeviceEvent::Disconnected(Device::new(
417 self.session.adapter.clone(),
418 peripheral,
419 ))) {
420 Ok(value) => log::trace!("Sent device: {}, size: {}...", address, value),
421 Err(e) => log::warn!("Error: {:?} when Sending device: {}...", e, address),
422 }
423 }
424 }
425
426 self.connecting.lock()?.remove(&peripheral_id);
427
428 Ok(())
429 }
430
431 async fn apply_filter(&mut self, peripheral_id: PeripheralId) {
432 if self.filtered.contains(&peripheral_id) {
433 return;
434 }
435
436 if let Ok(peripheral) = self.session.adapter.peripheral(&peripheral_id).await {
437 if let Ok(Some(property)) = peripheral.properties().await {
438 let mut passed = true;
439 log::trace!("filtering: {:?}", property);
440
441 for filter in self.config.filters.iter() {
442 if !passed {
443 break;
444 }
445 match filter {
446 Filter::Name(v) => {
447 passed &= property.local_name.as_ref().is_some_and(|name| {
448 if let Some(name_filter) = &self.config.name_filter {
449 name_filter(name)
450 } else {
451 name == v
452 }
453 })
454 }
455 Filter::Rssi(v) => {
456 passed &= property.rssi.is_some_and(|rssi| {
457 if let Some(rssi_filter) = &self.config.rssi_filter {
458 rssi_filter(rssi)
459 } else {
460 rssi >= *v
461 }
462 });
463 }
464 Filter::Service(v) => {
465 let services = &property.services;
466 if let Some(service_filter) = &self.config.service_filter {
467 passed &= service_filter(&services, v);
468 } else {
469 passed &= property.services.contains(v);
470 }
471 }
472 Filter::Address(v) => {
473 let addr = property.address.to_string();
474 if let Some(address_filter) = &self.config.address_filter {
475 passed &= address_filter(&addr);
476 } else {
477 passed &= addr == *v;
478 }
479 }
480 Filter::Characteristic(v) => {
481 let _ = self
482 .apply_character_filter(&peripheral, v, &mut passed)
483 .await;
484 }
485 }
486 }
487
488 if passed {
489 self.matched.insert(peripheral_id.clone());
490 self.result_count += 1;
491
492 if let Err(e) = self.event_sender.send(DeviceEvent::Discovered(Device::new(
493 self.session.adapter.clone(),
494 peripheral,
495 ))) {
496 log::warn!("error: {} when sending device", e);
497 }
498 }
499
500 log::debug!(
501 "current matched: {}, current filtered: {}",
502 self.matched.len(),
503 self.filtered.len()
504 );
505 }
506
507 self.filtered.insert(peripheral_id);
508 }
509 }
510
511 async fn apply_character_filter(
512 &self,
513 peripheral: &Peripheral,
514 uuid: &Uuid,
515 passed: &mut bool,
516 ) -> Result<(), Error> {
517 if !peripheral.is_connected().await.unwrap_or(false) {
518 if self.connecting.lock()?.insert(peripheral.id()) {
519 log::debug!("Connecting to device {}", peripheral.address());
520
521 let connecting_map = self.connecting.clone();
524 if let Err(e) = peripheral.connect().await {
525 log::warn!("Could not connect to {}: {:?}", peripheral.address(), e);
526
527 connecting_map.lock()?.remove(&peripheral.id());
528
529 return Ok(());
530 };
531 }
532 }
533
534 let mut characteristics = Vec::new();
535 characteristics.extend(peripheral.characteristics());
536
537 if self.config.force_disconnect {
538 if let Err(e) = peripheral.disconnect().await {
539 log::warn!("Error: {} when disconnect device", e);
540 }
541 }
542
543 *passed &= if characteristics.is_empty() {
544 let address = peripheral.address();
545 log::debug!("Discovering characteristics for {}", address);
546
547 match peripheral.discover_services().await {
548 Ok(()) => {
549 characteristics.extend(peripheral.characteristics());
550 let characteristics = characteristics
551 .into_iter()
552 .map(|c| c.uuid)
553 .collect::<Vec<_>>();
554
555 if let Some(characteristics_filter) = &self.config.characteristics_filter {
556 characteristics_filter(&characteristics)
557 } else {
558 characteristics.contains(uuid)
559 }
560 }
561 Err(e) => {
562 log::warn!(
563 "Error: `{:?}` when discovering characteristics for {}",
564 e,
565 address
566 );
567 false
568 }
569 }
570 } else {
571 true
572 };
573
574 Ok(())
575 }
576}
577
578#[cfg(test)]
579mod tests {
580 use super::{Filter, ScanConfig, Scanner};
581 use crate::Device;
582 use btleplug::{api::BDAddr, Error};
583 use std::{future::Future, time::Duration};
584 use tokio_stream::StreamExt;
585 use uuid::Uuid;
586
587 async fn device_stream<T: Future<Output = ()>>(
588 scanner: Scanner,
589 callback: impl Fn(Device) -> T,
590 ) {
591 let duration = Duration::from_millis(15_000);
592 if let Err(_) = tokio::time::timeout(duration, async move {
593 if let Ok(mut stream) = scanner.device_stream() {
594 while let Some(device) = stream.next().await {
595 callback(device).await;
596 break;
597 }
598 }
599 })
600 .await
601 {
602 eprintln!("timeout....");
603 }
604 }
605
606 #[tokio::test]
607 async fn test_filter_by_address() -> Result<(), Error> {
608 pretty_env_logger::init();
609
610 let mac_addr = [0xE3, 0x9E, 0x2A, 0x4D, 0xAA, 0x97];
611 let filers = vec![Filter::Address("E3:9E:2A:4D:AA:97".into())];
612 let cfg = ScanConfig::default()
613 .with_filters(&filers)
614 .stop_after_first_match();
615 let mut scanner = Scanner::default();
616
617 scanner.start(cfg).await?;
618 device_stream(scanner, |device| async move {
619 assert_eq!(device.address(), BDAddr::from(mac_addr));
620 })
621 .await;
622
623 Ok(())
624 }
625
626 #[tokio::test]
627 async fn test_filter_by_character() -> Result<(), Error> {
628 pretty_env_logger::init();
629
630 let filers = vec![Filter::Characteristic(Uuid::from_u128(
631 0x6e400001_b5a3_f393_e0a9_e50e24dcca9e,
632 ))];
633 let cfg = ScanConfig::default()
634 .with_filters(&filers)
635 .stop_after_first_match();
636 let mut scanner = Scanner::default();
637
638 scanner.start(cfg).await?;
639 device_stream(scanner, |device| async move {
640 println!("device: {:?} found", device);
641 })
642 .await;
643
644 Ok(())
645 }
646
647 #[tokio::test]
648 async fn test_filter_by_name() -> Result<(), Error> {
649 pretty_env_logger::init();
650
651 let name = "73429485";
652 let filers = vec![Filter::Name(name.into())];
653 let cfg = ScanConfig::default()
654 .with_filters(&filers)
655 .stop_after_first_match();
656 let mut scanner = Scanner::default();
657
658 scanner.start(cfg).await?;
659 device_stream(scanner, |device| async move {
660 assert_eq!(device.local_name().await, Some(name.into()));
661 })
662 .await;
663
664 Ok(())
665 }
666
667 #[tokio::test]
668 async fn test_filter_by_rssi() -> Result<(), Error> {
669 pretty_env_logger::init();
670
671 let filers = vec![Filter::Rssi(-70)];
672 let cfg = ScanConfig::default()
673 .with_filters(&filers)
674 .stop_after_first_match();
675 let mut scanner = Scanner::default();
676
677 scanner.start(cfg).await?;
678 device_stream(scanner, |device| async move {
679 println!("device: {:?} found", device);
680 })
681 .await;
682
683 Ok(())
684 }
685
686 #[tokio::test]
687 async fn test_filter_by_service() -> Result<(), Error> {
688 pretty_env_logger::init();
689
690 let service = Uuid::from_u128(0x6e400001_b5a3_f393_e0a9_e50e24dcca9e);
691 let filers = vec![Filter::Service(service)];
692 let cfg = ScanConfig::default()
693 .with_filters(&filers)
694 .stop_after_first_match();
695 let mut scanner = Scanner::default();
696
697 scanner.start(cfg).await?;
698 device_stream(scanner, |device| async move {
699 println!("device: {:?} found", device);
700 })
701 .await;
702
703 Ok(())
704 }
705}