Skip to main content

hap_ble/
bluest_gatt.rs

1//! A [`GattConnection`] backed by the `bluest` crate, with a **reconnect-and-
2//! resume supervisor**: sleepy HAP accessories drop the link every few
3//! operations during the long attribute-database sweep, so each operation
4//! reconnects (re-discovering its characteristic handles by UUID) and retries
5//! on a clean disconnect, resuming where it left off.
6
7use crate::error::{BleError, Result};
8use crate::gatt::{
9    u16_le, AdvertSource, GattCharacteristic, GattConnection, GattService, RawAdvert,
10    HAP_INSTANCE_ID_DESC, HAP_SERVICE_ID_CHAR,
11};
12use async_trait::async_trait;
13use bluest::error::ErrorKind;
14use bluest::{Adapter, Characteristic, Device};
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18use tokio::sync::{mpsc, Mutex};
19
/// Per-attempt timeout for re-establishing the link (connect + service
/// discovery). On macOS a `connect_device` attempted while a scan is running can
/// hang indefinitely; bounding it (as aiohomekit does via `bleak_retry_connector`)
/// turns a wedged connect into a failed attempt the backstop can retry.
///
/// Applied in `reconnect`, where an elapsed timeout is reported as
/// [`BleError::Disconnected`].
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

/// The HAP Service-Signature characteristic — appears in *every* service. Only
/// the one in the Protocol Information service is addressable/used; the rest are
/// dropped during discovery so the (UUID-keyed) handle map doesn't collide and
/// the generate-broadcast-key write reaches the correct characteristic.
///
/// Spelled in lowercase: discovery compares it with `==` against UUID strings
/// that have already been lowercased.
const SERVICE_SIGNATURE_CHAR: &str = "000000a5-0000-1000-8000-0026bb765291";
/// The HAP Protocol Information service — the one whose Service-Signature char is
/// the generate-broadcast-key target (matches aiohomekit's service-scoped lookup).
///
/// Spelled in lowercase for the same reason as [`SERVICE_SIGNATURE_CHAR`].
const PROTOCOL_INFO_SERVICE: &str = "000000a2-0000-1000-8000-0026bb765291";

/// Consecutive reconnects allowed *within a single operation* before it gives up
/// (a runaway backstop). This bounds one stuck read/write, not the connection's
/// lifetime — a sleepy accessory may legitimately drop the link on most
/// operations, so a healthy long-lived subscription can far exceed this in
/// aggregate; only a link that will not stay up for one op long enough to make
/// progress trips it.
///
/// Enforced by `reconnect_bounded`: the reconnect that would exceed this count
/// is not attempted and the operation fails with [`BleError::Disconnected`].
const MAX_OP_RECONNECTS: u32 = 8;
42
43/// Map a bluest error to a [`BleError`], classifying link-loss conditions as
44/// [`BleError::Disconnected`] (so the supervisor reconnects) from bluest's typed
45/// [`ErrorKind`] rather than by string-matching. A [`Timeout`](ErrorKind::Timeout)
46/// is treated as a disconnect: on some platforms a dropped link surfaces as a
47/// read/write timeout, and a reconnect+retry against a merely-slow accessory is
48/// cheap and self-correcting.
49// By value for ergonomic `.map_err(be)`.
50#[allow(clippy::needless_pass_by_value)]
51pub(crate) fn be(e: bluest::Error) -> BleError {
52    match e.kind() {
53        ErrorKind::NotConnected
54        | ErrorKind::AdapterUnavailable
55        | ErrorKind::ConnectionFailed
56        | ErrorKind::ServiceChanged
57        | ErrorKind::NotReady
58        | ErrorKind::Timeout => BleError::Disconnected,
59        _ => BleError::Backend(e.to_string()),
60    }
61}
62
63/// Whether an error means the link dropped (so reconnecting may recover).
64fn is_disconnect(e: &BleError) -> bool {
65    matches!(e, BleError::Disconnected)
66}
67
/// The discovered structure of one service: its UUID and its characteristics'
/// UUIDs (stable across reconnects, unlike the bluest handles).
///
/// Derives `Debug` so a discovered layout can be logged or shown in assertion
/// failures; both fields are plain strings, so the output is human-readable.
#[derive(Clone, Debug)]
struct ServiceShape {
    /// The service UUID, as rendered by the backend's UUID `to_string()`.
    uuid: String,
    /// Lowercased UUIDs of every characteristic discovered in the service, in
    /// discovery order.
    char_uuids: Vec<String>,
}
75
/// A `GattConnection` over a connected `bluest` [`Device`] that reconnects and
/// retries on a dropped link.
pub struct BluestConnection {
    /// Adapter used to connect/disconnect `device` and to scan for adverts.
    adapter: Adapter,
    /// The peripheral this connection wraps.
    device: Device,
    /// Lowercased characteristic UUID -> the (current) bluest handle. Replaced
    /// wholesale after each successful reconnect.
    chars: Mutex<HashMap<String, Characteristic>>,
    /// The service/characteristic UUID structure (stable across reconnects).
    /// Captured once at construction; a reconnect does not refresh it.
    shape: Vec<ServiceShape>,
    /// Link generation: bumped at the start of every reconnect attempt, whether
    /// or not that attempt succeeds. A change since a secure session was
    /// established means the accessory dropped that session. This is NOT the
    /// retry backstop — that is the per-operation `attempts` counter checked
    /// against `MAX_OP_RECONNECTS`.
    generation: AtomicU64,
}
89
90impl BluestConnection {
91    /// Wrap an already-connected device, discovering its services and
92    /// characteristics.
93    ///
94    /// # Errors
95    /// Returns [`BleError::Backend`] on a bluest discovery failure.
96    pub async fn new(adapter: Adapter, device: Device) -> Result<Self> {
97        let (chars, shape) = Self::discover(&device).await?;
98        Ok(Self {
99            adapter,
100            device,
101            chars: Mutex::new(chars),
102            shape,
103            generation: AtomicU64::new(0),
104        })
105    }
106
107    /// Release the active GATT link. A sleepy HAP accessory only advertises and
108    /// emits encrypted broadcasts while disconnected, and on macOS CoreBluetooth
109    /// filters a connected peripheral out of scan results — so a caller watching
110    /// for sleepy events must disconnect after setup. A subsequent encrypted
111    /// operation (e.g. a disconnected-event poll read) transparently reconnects
112    /// via the supervisor.
113    pub async fn disconnect(&self) {
114        let _ = self.adapter.disconnect_device(&self.device).await;
115    }
116
117    async fn discover(
118        device: &Device,
119    ) -> Result<(HashMap<String, Characteristic>, Vec<ServiceShape>)> {
120        let mut chars = HashMap::new();
121        let mut shape = Vec::new();
122        for svc in device.discover_services().await.map_err(be)? {
123            let svc_uuid = svc.uuid().to_string().to_ascii_lowercase();
124            let is_protocol_info = svc_uuid == PROTOCOL_INFO_SERVICE;
125            let mut char_uuids = Vec::new();
126            for ch in svc.discover_characteristics().await.map_err(be)? {
127                let uuid = ch.uuid().to_string().to_ascii_lowercase();
128                char_uuids.push(uuid.clone());
129                // The Service-Signature char exists in every service and they all
130                // share one UUID; keep only the Protocol Information service's so
131                // the UUID-keyed handle map resolves the generate-broadcast-key
132                // target deterministically (and survives reconnects, unlike an
133                // iid-keyed map that would need a re-sweep).
134                if uuid == SERVICE_SIGNATURE_CHAR && !is_protocol_info {
135                    continue;
136                }
137                chars.insert(uuid, ch);
138            }
139            shape.push(ServiceShape {
140                uuid: svc.uuid().to_string(),
141                char_uuids,
142            });
143        }
144        Ok((chars, shape))
145    }
146
147    /// Re-establish the link and rebuild the characteristic handle map, advancing
148    /// the link [`generation`](Self::generation). The UUID structure
149    /// ([`shape`](Self::shape)) is unchanged.
150    async fn reconnect(&self) -> Result<()> {
151        self.generation.fetch_add(1, Ordering::SeqCst);
152        let _ = self.adapter.disconnect_device(&self.device).await;
153        let _ = self.adapter.wait_available().await;
154        // Bound the connect + service discovery: a connect attempted while a scan
155        // is running can wedge on macOS, so a timeout surfaces as a recoverable
156        // disconnect that the per-operation backstop retries rather than hanging.
157        let establish = async {
158            self.adapter
159                .connect_device(&self.device)
160                .await
161                .map_err(be)?;
162            Self::discover(&self.device).await
163        };
164        let (fresh, _shape) = tokio::time::timeout(CONNECT_TIMEOUT, establish)
165            .await
166            .map_err(|_| BleError::Disconnected)??;
167        *self.chars.lock().await = fresh;
168        Ok(())
169    }
170
171    /// Reconnect for one in-flight operation, giving up once a single operation
172    /// has forced [`MAX_OP_RECONNECTS`] reconnects without making progress (the
173    /// link will not stay up long enough to complete it). `attempts` is the
174    /// per-operation reconnect count, owned by the caller's retry loop — it does
175    /// not bound the connection's lifetime.
176    async fn reconnect_bounded(&self, attempts: &mut u32) -> Result<()> {
177        *attempts += 1;
178        if *attempts > MAX_OP_RECONNECTS {
179            return Err(BleError::Disconnected);
180        }
181        self.reconnect().await
182    }
183
184    /// Look up the current handle for a characteristic UUID.
185    async fn handle(&self, char_uuid: &str) -> Result<Characteristic> {
186        self.chars
187            .lock()
188            .await
189            .get(&char_uuid.to_ascii_lowercase())
190            .cloned()
191            .ok_or(BleError::MalformedPdu("gatt characteristic not found"))
192    }
193
194    /// Read a characteristic's HAP instance-id descriptor, reconnecting on drop.
195    async fn read_iid(&self, char_uuid: &str) -> Result<Option<u16>> {
196        let mut attempts = 0;
197        loop {
198            let ch = self.handle(char_uuid).await?;
199            let attempt = async {
200                let descriptors = ch.discover_descriptors().await.map_err(be)?;
201                let Some(desc) = descriptors.iter().find(|d| {
202                    d.uuid()
203                        .to_string()
204                        .eq_ignore_ascii_case(HAP_INSTANCE_ID_DESC)
205                }) else {
206                    return Ok(None);
207                };
208                Ok(u16_le(&desc.read().await.map_err(be)?))
209            }
210            .await;
211            match attempt {
212                Ok(v) => return Ok(v),
213                Err(ref e) if is_disconnect(e) => self.reconnect_bounded(&mut attempts).await?,
214                Err(e) => return Err(e),
215            }
216        }
217    }
218}
219
220#[async_trait]
221impl GattConnection for BluestConnection {
222    async fn instance_id(&self, char_uuid: &str) -> Result<u16> {
223        self.read_iid(char_uuid)
224            .await?
225            .ok_or(BleError::MalformedPdu("no instance id descriptor"))
226    }
227
228    async fn max_write(&self) -> usize {
229        // The MTU is connection-wide, so any characteristic's max write works.
230        let ch = self.chars.lock().await.values().next().cloned();
231        ch.and_then(|c| c.max_write_len().ok())
232            .map_or(crate::gatt::DEFAULT_FRAGMENT_SIZE, |n| n.clamp(20, 512))
233    }
234
235    async fn generation(&self) -> u64 {
236        self.generation.load(Ordering::SeqCst)
237    }
238
239    async fn write(&self, char_uuid: &str, value: &[u8]) -> Result<()> {
240        let mut attempts = 0;
241        loop {
242            let ch = self.handle(char_uuid).await?;
243            match ch.write(value).await.map_err(be) {
244                Ok(()) => return Ok(()),
245                Err(ref e) if is_disconnect(e) => self.reconnect_bounded(&mut attempts).await?,
246                Err(e) => return Err(e),
247            }
248        }
249    }
250
251    async fn read(&self, char_uuid: &str) -> Result<Vec<u8>> {
252        let mut attempts = 0;
253        loop {
254            let ch = self.handle(char_uuid).await?;
255            match ch.read().await.map_err(be) {
256                Ok(v) => return Ok(v),
257                Err(ref e) if is_disconnect(e) => self.reconnect_bounded(&mut attempts).await?,
258                Err(e) => return Err(e),
259            }
260        }
261    }
262
263    // Connected GATT notify is BEST-EFFORT: the spawned task ends when the
264    // notification stream ends (a link drop). It is deliberately NOT re-armed and
265    // does NOT reconnect — a sleepy accessory intentionally drops idle links, so
266    // auto-reconnecting here causes a reconnect storm (validated on hardware).
267    // Durable events come from the advertisement channels (broadcast +
268    // disconnected-event poll); the session re-verifies lazily on the next read.
269    async fn subscribe(&self, char_uuid: &str) -> Result<mpsc::Receiver<Vec<u8>>> {
270        let ch = self.handle(char_uuid).await?;
271        let (tx, rx) = mpsc::channel(16);
272        tokio::spawn(async move {
273            use tokio_stream::StreamExt as _;
274            if let Ok(mut stream) = ch.notify().await {
275                while let Some(item) = stream.next().await {
276                    let Ok(v) = item else { break };
277                    if tx.send(v).await.is_err() {
278                        break;
279                    }
280                }
281            }
282        });
283        Ok(rx)
284    }
285
286    async fn enumerate(&self) -> Result<Vec<GattService>> {
287        let mut services = Vec::new();
288        for svc in &self.shape {
289            let mut characteristics = Vec::new();
290            for char_uuid in &svc.char_uuids {
291                // The Service-Instance-ID characteristic is not a HAP
292                // characteristic; its value would need a paired read.
293                if char_uuid.eq_ignore_ascii_case(HAP_SERVICE_ID_CHAR) {
294                    continue;
295                }
296                // The Service-Signature char is a service-level signature, not a
297                // model characteristic — skip it (it also shares a UUID across
298                // services, so reading it here would yield duplicate iids).
299                if char_uuid.eq_ignore_ascii_case(SERVICE_SIGNATURE_CHAR) {
300                    continue;
301                }
302                // Per-characteristic resilient instance-id read: resumes the
303                // sweep across the device's periodic disconnects.
304                if let Some(iid) = self.read_iid(char_uuid).await? {
305                    characteristics.push(GattCharacteristic {
306                        uuid: char_uuid.clone(),
307                        iid,
308                    });
309                }
310            }
311            services.push(GattService {
312                uuid: svc.uuid.clone(),
313                iid: 0,
314                characteristics,
315            });
316        }
317        Ok(services)
318    }
319}
320
/// Apple's Bluetooth company identifier; HAP advertisements live under it.
/// `watch_adverts` forwards only manufacturer data carrying this company id.
const APPLE_COMPANY_ID: u16 = 0x004C;
323
324#[async_trait]
325impl AdvertSource for BluestConnection {
326    /// Stream Apple HAP advertisements by running a continuous adapter scan.
327    ///
328    /// Spawns a background task that feeds every Apple (company id `0x004C`)
329    /// manufacturer-data frame into the returned channel. The task stops when
330    /// the receiver is dropped or the adapter's scan stream ends.
331    ///
332    /// # Errors
333    /// Returns [`crate::error::BleError`] on adapter/scan failures.
334    async fn watch_adverts(&self) -> Result<mpsc::Receiver<RawAdvert>> {
335        let adapter = self.adapter.clone();
336        let (tx, rx) = mpsc::channel(32);
337        tokio::spawn(async move {
338            use tokio_stream::StreamExt as _;
339            let Ok(mut scan) = adapter.scan(&[]).await else {
340                return;
341            };
342            while let Some(adv) = scan.next().await {
343                let Some(md) = adv.adv_data.manufacturer_data else {
344                    continue;
345                };
346                if md.company_id == APPLE_COMPANY_ID
347                    && tx
348                        .send(RawAdvert {
349                            manufacturer_data: md.data,
350                        })
351                        .await
352                        .is_err()
353                {
354                    return; // receiver dropped — stop scanning
355                }
356            }
357        });
358        Ok(rx)
359    }
360}