ble_peripheral_rust/peripheral/bluez/
mod.rs

1mod bluez_utils;
2mod characteristic_utils;
3
4use crate::{
5    error::{Error, ErrorType},
6    gatt::{
7        peripheral_event::{PeripheralEvent, PeripheralRequest},
8        service,
9    },
10};
11use async_trait::async_trait;
12use bluer::{
13    adv::{Advertisement, AdvertisementHandle},
14    gatt::{
15        local::{Application, ApplicationHandle, CharacteristicControlEvent},
16        CharacteristicWriter,
17    },
18    Adapter, AdapterEvent, AdapterProperty,
19};
20use bluez_utils::CharNotifyHandler;
21use characteristic_utils::parse_services;
22use futures::{channel::oneshot, StreamExt};
23use std::{
24    collections::{BTreeMap, BTreeSet, HashMap},
25    sync::{Arc, Mutex},
26};
27use tokio::sync::mpsc::Sender;
28use uuid::Uuid;
29
30use super::PeripheralImpl;
31
32#[derive(Debug)]
33pub struct Peripheral {
34    adapter: Adapter,
35    services: Vec<service::Service>,
36    adv_handle: Option<AdvertisementHandle>,
37    app_handle: Option<ApplicationHandle>,
38    sender_tx: Sender<PeripheralEvent>,
39    writers: Arc<Mutex<HashMap<Uuid, Arc<CharacteristicWriter>>>>,
40    _drop_tx: oneshot::Sender<()>,
41}
42
43#[async_trait]
44impl PeripheralImpl for Peripheral {
45    type Peripheral = Self;
46
47    async fn new(sender_tx: Sender<PeripheralEvent>) -> Result<Self, Error> {
48        let session = bluer::Session::new().await?;
49        let adapter = session.default_adapter().await?;
50        adapter.set_powered(true).await?;
51        log::debug!(
52            "Initialize Bluetooth adapter {} with address {}",
53            adapter.name(),
54            adapter.address().await?
55        );
56
57        let (drop_tx, drop_rx) = oneshot::channel();
58        if let Ok(mut adapter_stream) = adapter.events().await {
59            let sender = sender_tx.clone();
60            tokio::spawn(async move {
61                let stream_future = async {
62                    while let Some(AdapterEvent::PropertyChanged(event)) =
63                        adapter_stream.next().await
64                    {
65                        match event {
66                            AdapterProperty::ActiveAdvertisingInstances(i) => {
67                                log::debug!("ActiveAdvertisingInstances: {i}")
68                            }
69                            AdapterProperty::Powered(powered) => {
70                                if let Err(err) = sender
71                                    .send(PeripheralEvent::StateUpdate {
72                                        is_powered: powered,
73                                    })
74                                    .await
75                                {
76                                    log::error!("Error sending state update event: {:?}", err);
77                                }
78                            }
79                            _ => {}
80                        }
81                    }
82                };
83                tokio::select! {
84                    _ = stream_future => {},
85                    _ = drop_rx => {}
86                }
87            });
88        }
89
90        Ok(Peripheral {
91            adapter,
92            services: Vec::new(),
93            adv_handle: None,
94            app_handle: None,
95            sender_tx,
96            writers: Arc::new(Mutex::new(HashMap::new())),
97            _drop_tx: drop_tx,
98        })
99    }
100
101    async fn is_powered(&mut self) -> Result<bool, Error> {
102        let result = self.adapter.is_powered().await?;
103        return Ok(result);
104    }
105
106    async fn is_advertising(&mut self) -> Result<bool, Error> {
107        let result = self.adapter.active_advertising_instances().await?;
108        return Ok(result > 0 && self.adv_handle.is_some());
109    }
110
111    async fn start_advertising(&mut self, name: &str, uuids: &[Uuid]) -> Result<(), Error> {
112        let manufacturer_data = BTreeMap::new();
113
114        let mut services: BTreeSet<Uuid> = BTreeSet::new();
115        for uuid in uuids {
116            services.insert(*uuid);
117        }
118
119        let le_advertisement = Advertisement {
120            service_uuids: services,
121            manufacturer_data,
122            discoverable: Some(true),
123            local_name: Some(name.to_string()),
124            ..Default::default()
125        };
126        let adv_handle: AdvertisementHandle = self.adapter.advertise(le_advertisement).await?;
127
128        let (handlers, services) = parse_services(self.services.clone(), self.sender_tx.clone());
129
130        let app_handle = self
131            .adapter
132            .serve_gatt_application(Application {
133                services,
134                ..Default::default()
135            })
136            .await?;
137
138        self.setup_char_handlers(handlers);
139
140        self.adv_handle = Some(adv_handle);
141        self.app_handle = Some(app_handle);
142        Ok(())
143    }
144
145    async fn stop_advertising(&mut self) -> Result<(), Error> {
146        self.adv_handle = None;
147        self.app_handle = None;
148        Ok(())
149    }
150
151    async fn add_service(&mut self, service: &service::Service) -> Result<(), Error> {
152        self.services.push(service.clone());
153        Ok(())
154    }
155
156    async fn update_characteristic(
157        &mut self,
158        characteristic: Uuid,
159        value: Vec<u8>,
160    ) -> Result<(), Error> {
161        let writers = match self.writers.lock() {
162            Ok(w) => w,
163            Err(err) => return Err(Error::from_string(err.to_string(), ErrorType::Bluez)),
164        };
165        let writer = writers.get(&characteristic).cloned();
166        drop(writers);
167        tokio::spawn(async move {
168            if let Some(writer) = writer {
169                if let Err(err) = writer.send(&value).await {
170                    log::error!("Error sending value {err:?}")
171                }
172            }
173        });
174        Ok(())
175    }
176}
177
178impl Peripheral {
179    // Handle Characteristic Subscriptions
180    fn setup_char_handlers(&mut self, handlers: Vec<CharNotifyHandler>) {
181        for mut handler in handlers {
182            let sender_tx = self.sender_tx.clone();
183            let writers = self.writers.clone();
184
185            tokio::spawn(async move {
186                while let Some(CharacteristicControlEvent::Notify(writer)) =
187                    handler.control.next().await
188                {
189                    let writer = Arc::new(writer);
190
191                    let peripheral_request = PeripheralRequest {
192                        client: writer.device_address().to_string(),
193                        service: handler.service_uuid,
194                        characteristic: handler.characteristic_uuid,
195                    };
196
197                    if let Err(err) = sender_tx
198                        .send(PeripheralEvent::CharacteristicSubscriptionUpdate {
199                            request: peripheral_request.clone(),
200                            subscribed: true,
201                        })
202                        .await
203                    {
204                        log::error!("Error sending read request event: {:?}", err);
205                    }
206
207                    if let Ok(mut writers_lock) = writers.lock() {
208                        writers_lock.insert(handler.characteristic_uuid, writer.clone());
209                    } else {
210                        log::error!("Failed to lock writers for adding a writer");
211                    }
212
213                    if let Err(err) = writer.closed().await {
214                        log::error!("NotifyClosedErr {err:?}");
215                    }
216
217                    if let Ok(mut writers_lock) = writers.lock() {
218                        writers_lock.remove(&handler.characteristic_uuid);
219                    } else {
220                        log::error!("Failed to lock writers for removing a writer");
221                    }
222
223                    if let Err(err) = sender_tx
224                        .send(PeripheralEvent::CharacteristicSubscriptionUpdate {
225                            request: peripheral_request,
226                            subscribed: false,
227                        })
228                        .await
229                    {
230                        log::error!("Error sending read request event: {:?}", err);
231                    }
232                }
233            });
234        }
235    }
236}
237
238impl Drop for Peripheral {
239    fn drop(&mut self) {
240        // required for drop order
241    }
242}