1pub(crate) mod config;
2mod worker;
3
4use btleplug::{api::Manager as _, platform::Manager, Error};
5use std::{
6 pin::Pin,
7 sync::{
8 atomic::{AtomicBool, Ordering},
9 Arc, RwLock, Weak,
10 },
11};
12use stream_cancel::{Trigger, Valved};
13use tokio::sync::broadcast::{self, Sender};
14use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
15
16use self::{
17 config::ScanConfig,
18 worker::{ScanEvent, ScannerWorker, Session},
19};
20use crate::{Device, DeviceEvent};
21
/// Handle for starting and stopping a BLE scan and for subscribing to the
/// events it produces.
///
/// Cloning copies the `Weak` session handle as it is at that moment and shares
/// the broadcast sender, the trigger list and the stop flag with the original.
#[derive(Debug, Clone)]
pub struct Scanner {
    /// Weak reference to the session created by `start`; `is_active` reports
    /// whether it can still be upgraded.
    session: Weak<Session>,
    /// Broadcast sender handed to the worker; every stream returned by this
    /// type subscribes to it.
    event_sender: Sender<ScanEvent>,
    /// Triggers of all valved streams handed out so far; emptied by `stop`.
    stoppers: Arc<RwLock<Vec<Trigger>>>,
    /// Flag shared with the worker; `stop` sets it to `true`.
    scan_stopper: Arc<AtomicBool>,
}
29
30impl Default for Scanner {
31 fn default() -> Self {
32 Scanner::new()
33 }
34}
35
36impl Scanner {
37 pub fn new() -> Self {
38 let (event_sender, _) = broadcast::channel(32);
39 Self {
40 scan_stopper: Arc::new(AtomicBool::new(false)),
41 session: Weak::new(),
42 event_sender,
43 stoppers: Arc::new(RwLock::new(Vec::new())),
44 }
45 }
46
47 pub async fn start(&mut self, config: ScanConfig) -> Result<(), Error> {
49 if self.session.upgrade().is_some() {
50 log::info!("Scanner is already started.");
51 return Ok(());
52 }
53
54 let manager = Manager::new().await?;
55 let mut adapters = manager.adapters().await?;
56
57 if config.adapter_index >= adapters.len() {
58 return Err(Error::DeviceNotFound);
59 }
60
61 let adapter = adapters.swap_remove(config.adapter_index);
62 log::trace!("Using adapter: {:?}", adapter);
63
64 let session = Arc::new(Session {
65 _manager: manager,
66 adapter,
67 });
68 self.session = Arc::downgrade(&session);
69
70 let event_sender = self.event_sender.clone();
71
72 let mut worker = ScannerWorker::new(
73 config,
74 session.clone(),
75 event_sender,
76 self.scan_stopper.clone(),
77 );
78 tokio::spawn(async move {
79 let _ = worker.scan().await;
80 });
81
82 Ok(())
83 }
84
85 pub async fn stop(&self) -> Result<(), Error> {
87 self.scan_stopper.store(true, Ordering::Relaxed);
88 self.stoppers.write()?.clear();
89 log::info!("Scanner is stopped.");
90
91 Ok(())
92 }
93
94 pub fn is_active(&self) -> bool {
96 self.session.upgrade().is_some()
97 }
98
99 pub fn device_event_stream(
101 &self,
102 ) -> Result<Valved<Pin<Box<dyn Stream<Item = DeviceEvent> + Send>>>, Error> {
103 let receiver = self.event_sender.subscribe();
104
105 let stream: Pin<Box<dyn Stream<Item = DeviceEvent> + Send>> = Box::pin(
106 BroadcastStream::new(receiver)
107 .take_while(|x| match x {
108 Ok(ScanEvent::ScanStopped) => {
109 log::info!("Received ScanStopped event, ending device event stream");
110 false
111 }
112 _ => true,
113 })
114 .filter_map(|x| match x {
115 Ok(ScanEvent::DeviceEvent(event)) => {
116 log::trace!("Broadcasting device: {:?}", event);
117 Some(event)
118 }
119 Err(e) => {
120 log::warn!("Error: {:?} when broadcasting device event!", e);
121 None
122 }
123 _ => None,
124 }),
125 );
126
127 let (trigger, stream) = Valved::new(stream);
128 self.stoppers.write()?.push(trigger);
129
130 Ok(stream)
131 }
132
133 pub fn device_stream(
135 &self,
136 ) -> Result<Valved<Pin<Box<dyn Stream<Item = Device> + Send>>>, Error> {
137 let receiver = self.event_sender.subscribe();
138
139 let stream: Pin<Box<dyn Stream<Item = Device> + Send>> = Box::pin(
140 BroadcastStream::new(receiver)
141 .take_while(|x| match x {
142 Ok(ScanEvent::ScanStopped) => {
143 log::info!("Received ScanStopped event, ending device stream");
144 false
145 }
146 _ => true,
147 })
148 .filter_map(|x| match x {
149 Ok(ScanEvent::DeviceEvent(DeviceEvent::Discovered(device))) => {
150 log::trace!("Broadcasting device: {:?}", device.address());
151 Some(device)
152 }
153 Err(e) => {
154 log::warn!("Error: {:?} when broadcasting device!", e);
155 None
156 }
157 _ => None,
158 }),
159 );
160
161 let (trigger, stream) = Valved::new(stream);
162 self.stoppers.write()?.push(trigger);
163
164 Ok(stream)
165 }
166}