lipl_gatt_bluer/
lib.rs

1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3use std::{collections::BTreeMap, sync::Arc, thread::JoinHandle, time::Duration};
4
5use bluer::{
6    adv::{Advertisement, AdvertisementHandle},
7    gatt::local::{Application, ApplicationHandle, Characteristic, Service},
8    Uuid,
9};
10use lipl_display_common::{BackgroundThread, Message};
11
12use futures_channel::mpsc;
13use futures_util::{Stream, StreamExt};
14use log::{error, trace};
15use pin_project::{pin_project, pinned_drop};
16use std::pin::Pin;
17use tokio::sync::Mutex;
18
19mod characteristic;
20mod error;
21
22pub use error::Error;
23pub type Result<T> = std::result::Result<T, Error>;
24
25#[pin_project(PinnedDrop)]
26pub struct MessageStream {
27    values_tx: mpsc::Sender<Message>,
28    #[pin]
29    values_rx: mpsc::Receiver<Message>,
30    adv_handle: Option<AdvertisementHandle>,
31    app_handle: Option<ApplicationHandle>,
32}
33
34#[pinned_drop]
35impl PinnedDrop for MessageStream {
36    fn drop(self: Pin<&mut Self>) {
37        let this = self.project();
38        if let Some(handle) = this.adv_handle.take() {
39            drop(handle);
40            trace!("Handle dropped for Advertisement");
41        };
42        if let Some(handle) = this.app_handle.take() {
43            drop(handle);
44            trace!("Handle dropped for Application");
45        };
46    }
47}
48
49impl Stream for MessageStream {
50    type Item = Message;
51    fn poll_next(
52        self: std::pin::Pin<&mut Self>,
53        cx: &mut std::task::Context<'_>,
54    ) -> std::task::Poll<Option<Self::Item>> {
55        self.project().values_rx.poll_next(cx)
56    }
57}
58
59/// Utility function so that dependent crates do not need tokio dependency
60pub fn create_runtime() -> Result<tokio::runtime::Runtime> {
61    tokio::runtime::Builder::new_current_thread()
62        .enable_all()
63        .build()
64        .map_err(|_| lipl_display_common::Error::Runtime)
65        .map_err(Error::Common)
66}
67
68async fn advertise(adapter: &bluer::Adapter) -> Result<AdvertisementHandle> {
69    let mut manufacturer_data = BTreeMap::new();
70    manufacturer_data.insert(
71        lipl_display_common::MANUFACTURER_ID,
72        vec![0x21, 0x22, 0x23, 0x24],
73    );
74    let le_advertisement = Advertisement {
75        service_uuids: vec![lipl_display_common::SERVICE_UUID]
76            .into_iter()
77            .collect(),
78        manufacturer_data,
79        discoverable: Some(true),
80        local_name: Some(lipl_display_common::LOCAL_NAME.to_owned()),
81        tx_power: Some(8),
82        ..Default::default()
83    };
84    let handle = adapter.advertise(le_advertisement).await?;
85    Ok(handle)
86}
87
88pub struct ListenBluer {
89    sender: Option<tokio::sync::oneshot::Sender<()>>,
90    thread: Option<JoinHandle<()>>,
91}
92
93fn wait() -> impl Stream<Item = Message> {
94    futures_util::stream::once(async { Message::Command(lipl_display_common::Command::Wait) })
95}
96
97impl ListenBluer {
98    pub fn new(callback: impl Fn(Message) + Send + 'static) -> Self {
99        let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
100        let thread = std::thread::spawn(move || {
101            let runtime = tokio::runtime::Builder::new_current_thread()
102                .enable_all()
103                .build()
104                .expect("Unable to create tokio runtime");
105
106            runtime.block_on(async move {
107                let mut s = wait().chain(
108                        listen_stream()
109                        .await
110                        .expect("Failed to start Gatt peripheral")    
111                    )
112                    .boxed();
113                loop {
114                    tokio::select! {
115                        option_message = s.next() => {
116                            match option_message {
117                                Some(message) => {
118                                    callback(message.clone());
119                                }
120                                None => break,
121                            }
122                        }
123                        received = &mut rx => {
124                            match received {
125                                Ok(_) => {
126                                    break;
127                                },
128                                Err(error) => {
129                                    log::error!("Error receiving signal to quit background thread: {}", error);
130                                    break;
131                                },
132                            }
133                        }
134                    }
135                }
136            });
137            log::info!("Background thread almost finished");
138        });
139        ListenBluer {
140            sender: Some(tx),
141            thread: Some(thread),
142        }
143    }
144}
145
146impl BackgroundThread for ListenBluer {
147    fn stop(&mut self) {
148        if let Some(tx) = self.sender.take() {
149            match tx.send(()) {
150                Ok(_) => {
151                    if let Some(thread) = self.thread.take() {
152                        match thread.join() {
153                            Ok(_) => {
154                                std::thread::sleep(Duration::from_secs(1));
155                                trace!("Finished sleeping for 1 second");
156                            }
157                            Err(_) => {
158                                error!("Error joining background thread");
159                            }
160                        }
161                    }
162                }
163                Err(_) => {
164                    error!("Error sending signal to background thread");
165                }
166            }
167        }
168    }
169}
170
171/// Used in flutter version
172pub async fn listen_stream() -> Result<MessageStream> {
173    let (values_tx, values_rx) = mpsc::channel::<Message>(100);
174
175    let session = bluer::Session::new().await?;
176    let adapter = session.default_adapter().await?;
177    trace!("Bluetooth adapter {} found", adapter.name());
178    let capabilities = adapter.supported_advertising_capabilities().await?;
179    if let Some(caps) = capabilities {
180        trace!(
181            "max advertisement length: {}",
182            caps.max_advertisement_length
183        );
184        trace!(
185            "max scan reponse length : {}",
186            caps.max_scan_response_length
187        );
188        trace!("max tx power: {}", caps.max_tx_power);
189        trace!("min tx power: {}", caps.min_tx_power);
190    }
191
192    let adv_handle = advertise(&adapter).await?;
193    trace!("Advertising started");
194    let uuid: Uuid = lipl_display_common::SERVICE_UUID;
195    let primary: bool = true;
196    let characteristics: Vec<Characteristic> = [
197        lipl_display_common::CHARACTERISTIC_TEXT_UUID,
198        lipl_display_common::CHARACTERISTIC_STATUS_UUID,
199        lipl_display_common::CHARACTERISTIC_COMMAND_UUID,
200    ]
201    .into_iter()
202    .map(|c| (c, Arc::new(Mutex::new(vec![]))))
203    .map(|v| characteristic::write_no_response_characteristic(v.0, v.1, values_tx.clone()))
204    .collect();
205
206    let app = Application {
207        services: vec![Service {
208            uuid,
209            primary,
210            characteristics,
211            ..Default::default()
212        }],
213        ..Default::default()
214    };
215
216    let app_handle = adapter.serve_gatt_application(app).await?;
217
218    Ok(MessageStream {
219        values_tx,
220        values_rx,
221        adv_handle: Some(adv_handle),
222        app_handle: Some(app_handle),
223    })
224}