hap_ble/bluest_gatt.rs
1//! A [`GattConnection`] backed by the `bluest` crate, with a **reconnect-and-
2//! resume supervisor**: sleepy HAP accessories drop the link every few
3//! operations during the long attribute-database sweep, so each operation
4//! reconnects (re-discovering its characteristic handles by UUID) and retries
5//! on a clean disconnect, resuming where it left off.
6
7use crate::error::{BleError, Result};
8use crate::gatt::{
9 u16_le, AdvertSource, GattCharacteristic, GattConnection, GattService, RawAdvert,
10 HAP_INSTANCE_ID_DESC, HAP_SERVICE_ID_CHAR,
11};
12use async_trait::async_trait;
13use bluest::error::ErrorKind;
14use bluest::{Adapter, Characteristic, Device};
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use tokio::sync::{mpsc, Mutex};
19
/// Upper bound on a single attempt to bring the link back up, covering both
/// the connect itself and the service discovery that follows it.
///
/// On macOS a `connect_device` issued while a scan is running can hang
/// indefinitely. Bounding the attempt (the same approach aiohomekit takes via
/// `bleak_retry_connector`) converts a wedged connect into an ordinary failed
/// attempt instead of a stuck future.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
25
/// UUID (lowercase) of the HAP Service-Signature characteristic, which is
/// present in *every* service. Discovery keeps only the instance that lives in
/// the Protocol Information service and drops the others, so the UUID-keyed
/// handle map has no collisions and the generate-broadcast-key write lands on
/// the intended characteristic.
const SERVICE_SIGNATURE_CHAR: &str = "000000a5-0000-1000-8000-0026bb765291";
/// UUID (lowercase) of the HAP Protocol Information service. Its
/// Service-Signature characteristic is the generate-broadcast-key target,
/// mirroring aiohomekit's service-scoped lookup.
const PROTOCOL_INFO_SERVICE: &str = "000000a2-0000-1000-8000-0026bb765291";
34
/// Runaway backstop: how many consecutive reconnects one operation may trigger
/// before it gives up.
///
/// The limit is scoped to a single stuck read/write, not to the lifetime of
/// the connection. A sleepy accessory may legitimately drop the link on most
/// operations, so a healthy long-lived subscription can exceed this number many
/// times over in aggregate. It only trips when the link will not stay up long
/// enough for one operation to make progress.
const MAX_OP_RECONNECTS: u32 = 8;
42
43/// Map a bluest error to a [`BleError`], classifying link-loss conditions as
44/// [`BleError::Disconnected`] (so the supervisor reconnects) from bluest's typed
45/// [`ErrorKind`] rather than by string-matching. A [`Timeout`](ErrorKind::Timeout)
46/// is treated as a disconnect: on some platforms a dropped link surfaces as a
47/// read/write timeout, and a reconnect+retry against a merely-slow accessory is
48/// cheap and self-correcting.
49// By value for ergonomic `.map_err(be)`.
50#[allow(clippy::needless_pass_by_value)]
51pub(crate) fn be(e: bluest::Error) -> BleError {
52 match e.kind() {
53 ErrorKind::NotConnected
54 | ErrorKind::AdapterUnavailable
55 | ErrorKind::ConnectionFailed
56 | ErrorKind::ServiceChanged
57 | ErrorKind::NotReady
58 | ErrorKind::Timeout => BleError::Disconnected,
59 _ => BleError::Backend(e.to_string()),
60 }
61}
62
63/// Whether an error means the link dropped (so reconnecting may recover).
64fn is_disconnect(e: &BleError) -> bool {
65 matches!(e, BleError::Disconnected)
66}
67
/// UUID-level layout of one discovered service. Only UUIDs are stored here
/// because, unlike bluest handles, they remain valid across reconnects.
#[derive(Clone)]
struct ServiceShape {
    /// The service's UUID, as rendered by bluest at discovery time.
    uuid: String,
    /// Lowercased UUIDs of every characteristic found in the service, in
    /// discovery order.
    char_uuids: Vec<String>,
}
75
76/// A `GattConnection` over a connected `bluest` [`Device`] that reconnects and
77/// retries on a dropped link.
78pub struct BluestConnection {
79 adapter: Adapter,
80 device: Device,
81 /// Lowercased characteristic UUID -> the (current) bluest handle.
82 chars: Mutex<HashMap<String, Characteristic>>,
83 /// The service/characteristic UUID structure (stable across reconnects).
84 shape: Vec<ServiceShape>,
85 /// Increments on every reconnect — also the backstop count. A change since a
86 /// secure session was established means the accessory dropped that session.
87 generation: AtomicU64,
88}
89
90impl BluestConnection {
91 /// Wrap an already-connected device, discovering its services and
92 /// characteristics.
93 ///
94 /// # Errors
95 /// Returns [`BleError::Backend`] on a bluest discovery failure.
96 pub async fn new(adapter: Adapter, device: Device) -> Result<Self> {
97 let (chars, shape) = Self::discover(&device).await?;
98 Ok(Self {
99 adapter,
100 device,
101 chars: Mutex::new(chars),
102 shape,
103 generation: AtomicU64::new(0),
104 })
105 }
106
107 /// Release the active GATT link. A sleepy HAP accessory only advertises and
108 /// emits encrypted broadcasts while disconnected, and on macOS CoreBluetooth
109 /// filters a connected peripheral out of scan results — so a caller watching
110 /// for sleepy events must disconnect after setup. A subsequent encrypted
111 /// operation (e.g. a disconnected-event poll read) transparently reconnects
112 /// via the supervisor.
113 pub async fn disconnect(&self) {
114 let _ = self.adapter.disconnect_device(&self.device).await;
115 }
116
117 async fn discover(
118 device: &Device,
119 ) -> Result<(HashMap<String, Characteristic>, Vec<ServiceShape>)> {
120 let mut chars = HashMap::new();
121 let mut shape = Vec::new();
122 for svc in device.discover_services().await.map_err(be)? {
123 let svc_uuid = svc.uuid().to_string().to_ascii_lowercase();
124 let is_protocol_info = svc_uuid == PROTOCOL_INFO_SERVICE;
125 let mut char_uuids = Vec::new();
126 for ch in svc.discover_characteristics().await.map_err(be)? {
127 let uuid = ch.uuid().to_string().to_ascii_lowercase();
128 char_uuids.push(uuid.clone());
129 // The Service-Signature char exists in every service and they all
130 // share one UUID; keep only the Protocol Information service's so
131 // the UUID-keyed handle map resolves the generate-broadcast-key
132 // target deterministically (and survives reconnects, unlike an
133 // iid-keyed map that would need a re-sweep).
134 if uuid == SERVICE_SIGNATURE_CHAR && !is_protocol_info {
135 continue;
136 }
137 chars.insert(uuid, ch);
138 }
139 shape.push(ServiceShape {
140 uuid: svc.uuid().to_string(),
141 char_uuids,
142 });
143 }
144 Ok((chars, shape))
145 }
146
147 /// Re-establish the link and rebuild the characteristic handle map, advancing
148 /// the link [`generation`](Self::generation). The UUID structure
149 /// ([`shape`](Self::shape)) is unchanged.
150 async fn reconnect(&self) -> Result<()> {
151 self.generation.fetch_add(1, Ordering::SeqCst);
152 let _ = self.adapter.disconnect_device(&self.device).await;
153 let _ = self.adapter.wait_available().await;
154 // Bound the connect + service discovery: a connect attempted while a scan
155 // is running can wedge on macOS, so a timeout surfaces as a recoverable
156 // disconnect that the per-operation backstop retries rather than hanging.
157 let establish = async {
158 self.adapter
159 .connect_device(&self.device)
160 .await
161 .map_err(be)?;
162 Self::discover(&self.device).await
163 };
164 let (fresh, _shape) = tokio::time::timeout(CONNECT_TIMEOUT, establish)
165 .await
166 .map_err(|_| BleError::Disconnected)??;
167 *self.chars.lock().await = fresh;
168 Ok(())
169 }
170
171 /// Reconnect for one in-flight operation, giving up once a single operation
172 /// has forced [`MAX_OP_RECONNECTS`] reconnects without making progress (the
173 /// link will not stay up long enough to complete it). `attempts` is the
174 /// per-operation reconnect count, owned by the caller's retry loop — it does
175 /// not bound the connection's lifetime.
176 async fn reconnect_bounded(&self, attempts: &mut u32) -> Result<()> {
177 *attempts += 1;
178 if *attempts > MAX_OP_RECONNECTS {
179 return Err(BleError::Disconnected);
180 }
181 self.reconnect().await
182 }
183
184 /// Look up the current handle for a characteristic UUID.
185 async fn handle(&self, char_uuid: &str) -> Result<Characteristic> {
186 self.chars
187 .lock()
188 .await
189 .get(&char_uuid.to_ascii_lowercase())
190 .cloned()
191 .ok_or(BleError::MalformedPdu("gatt characteristic not found"))
192 }
193
194 /// Read a characteristic's HAP instance-id descriptor, reconnecting on drop.
195 async fn read_iid(&self, char_uuid: &str) -> Result<Option<u16>> {
196 let mut attempts = 0;
197 loop {
198 let ch = self.handle(char_uuid).await?;
199 let attempt = async {
200 let descriptors = ch.discover_descriptors().await.map_err(be)?;
201 let Some(desc) = descriptors.iter().find(|d| {
202 d.uuid()
203 .to_string()
204 .eq_ignore_ascii_case(HAP_INSTANCE_ID_DESC)
205 }) else {
206 return Ok(None);
207 };
208 Ok(u16_le(&desc.read().await.map_err(be)?))
209 }
210 .await;
211 match attempt {
212 Ok(v) => return Ok(v),
213 Err(ref e) if is_disconnect(e) => self.reconnect_bounded(&mut attempts).await?,
214 Err(e) => return Err(e),
215 }
216 }
217 }
218}
219
220#[async_trait]
221impl GattConnection for BluestConnection {
222 async fn instance_id(&self, char_uuid: &str) -> Result<u16> {
223 self.read_iid(char_uuid)
224 .await?
225 .ok_or(BleError::MalformedPdu("no instance id descriptor"))
226 }
227
228 async fn max_write(&self) -> usize {
229 // The MTU is connection-wide, so any characteristic's max write works.
230 let ch = self.chars.lock().await.values().next().cloned();
231 ch.and_then(|c| c.max_write_len().ok())
232 .map_or(crate::gatt::DEFAULT_FRAGMENT_SIZE, |n| n.clamp(20, 512))
233 }
234
235 async fn generation(&self) -> u64 {
236 self.generation.load(Ordering::SeqCst)
237 }
238
239 async fn write(&self, char_uuid: &str, value: &[u8]) -> Result<()> {
240 let mut attempts = 0;
241 loop {
242 let ch = self.handle(char_uuid).await?;
243 match ch.write(value).await.map_err(be) {
244 Ok(()) => return Ok(()),
245 Err(ref e) if is_disconnect(e) => self.reconnect_bounded(&mut attempts).await?,
246 Err(e) => return Err(e),
247 }
248 }
249 }
250
251 async fn read(&self, char_uuid: &str) -> Result<Vec<u8>> {
252 let mut attempts = 0;
253 loop {
254 let ch = self.handle(char_uuid).await?;
255 match ch.read().await.map_err(be) {
256 Ok(v) => return Ok(v),
257 Err(ref e) if is_disconnect(e) => self.reconnect_bounded(&mut attempts).await?,
258 Err(e) => return Err(e),
259 }
260 }
261 }
262
263 // Connected GATT notify is BEST-EFFORT: the spawned task ends when the
264 // notification stream ends (a link drop). It is deliberately NOT re-armed and
265 // does NOT reconnect — a sleepy accessory intentionally drops idle links, so
266 // auto-reconnecting here causes a reconnect storm (validated on hardware).
267 // Durable events come from the advertisement channels (broadcast +
268 // disconnected-event poll); the session re-verifies lazily on the next read.
269 async fn subscribe(&self, char_uuid: &str) -> Result<mpsc::Receiver<Vec<u8>>> {
270 let ch = self.handle(char_uuid).await?;
271 let (tx, rx) = mpsc::channel(16);
272 tokio::spawn(async move {
273 use tokio_stream::StreamExt as _;
274 if let Ok(mut stream) = ch.notify().await {
275 while let Some(item) = stream.next().await {
276 let Ok(v) = item else { break };
277 if tx.send(v).await.is_err() {
278 break;
279 }
280 }
281 }
282 });
283 Ok(rx)
284 }
285
286 async fn enumerate(&self) -> Result<Vec<GattService>> {
287 let mut services = Vec::new();
288 for svc in &self.shape {
289 let mut characteristics = Vec::new();
290 for char_uuid in &svc.char_uuids {
291 // The Service-Instance-ID characteristic is not a HAP
292 // characteristic; its value would need a paired read.
293 if char_uuid.eq_ignore_ascii_case(HAP_SERVICE_ID_CHAR) {
294 continue;
295 }
296 // The Service-Signature char is a service-level signature, not a
297 // model characteristic — skip it (it also shares a UUID across
298 // services, so reading it here would yield duplicate iids).
299 if char_uuid.eq_ignore_ascii_case(SERVICE_SIGNATURE_CHAR) {
300 continue;
301 }
302 // Per-characteristic resilient instance-id read: resumes the
303 // sweep across the device's periodic disconnects.
304 if let Some(iid) = self.read_iid(char_uuid).await? {
305 characteristics.push(GattCharacteristic {
306 uuid: char_uuid.clone(),
307 iid,
308 });
309 }
310 }
311 services.push(GattService {
312 uuid: svc.uuid.clone(),
313 iid: 0,
314 characteristics,
315 });
316 }
317 Ok(services)
318 }
319}
320
/// Bluetooth SIG company identifier assigned to Apple. HAP advertisements are
/// carried in manufacturer data tagged with this id.
const APPLE_COMPANY_ID: u16 = 0x004C;
323
324#[async_trait]
325impl AdvertSource for BluestConnection {
326 /// Stream Apple HAP advertisements by running a continuous adapter scan.
327 ///
328 /// Spawns a background task that feeds every Apple (company id `0x004C`)
329 /// manufacturer-data frame into the returned channel. The task stops when
330 /// the receiver is dropped or the adapter's scan stream ends.
331 ///
332 /// # Errors
333 /// Returns [`crate::error::BleError`] on adapter/scan failures.
334 async fn watch_adverts(&self) -> Result<mpsc::Receiver<RawAdvert>> {
335 let adapter = self.adapter.clone();
336 let (tx, rx) = mpsc::channel(32);
337 tokio::spawn(async move {
338 use tokio_stream::StreamExt as _;
339 let Ok(mut scan) = adapter.scan(&[]).await else {
340 return;
341 };
342 while let Some(adv) = scan.next().await {
343 let Some(md) = adv.adv_data.manufacturer_data else {
344 continue;
345 };
346 if md.company_id == APPLE_COMPANY_ID
347 && tx
348 .send(RawAdvert {
349 manufacturer_data: md.data,
350 })
351 .await
352 .is_err()
353 {
354 return; // receiver dropped — stop scanning
355 }
356 }
357 });
358 Ok(rx)
359 }
360}