1#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
2
3use std::{collections::BTreeMap, sync::Arc, thread::JoinHandle, time::Duration};
4
5use bluer::{
6 adv::{Advertisement, AdvertisementHandle},
7 gatt::local::{Application, ApplicationHandle, Characteristic, Service},
8 Uuid,
9};
10use lipl_display_common::{BackgroundThread, Message};
11
12use futures_channel::mpsc;
13use futures_util::{Stream, StreamExt};
14use log::{error, trace};
15use pin_project::{pin_project, pinned_drop};
16use std::pin::Pin;
17use tokio::sync::Mutex;
18
19mod characteristic;
20mod error;
21
22pub use error::Error;
23pub type Result<T> = std::result::Result<T, Error>;
24
25#[pin_project(PinnedDrop)]
26pub struct MessageStream {
27 values_tx: mpsc::Sender<Message>,
28 #[pin]
29 values_rx: mpsc::Receiver<Message>,
30 adv_handle: Option<AdvertisementHandle>,
31 app_handle: Option<ApplicationHandle>,
32}
33
34#[pinned_drop]
35impl PinnedDrop for MessageStream {
36 fn drop(self: Pin<&mut Self>) {
37 let this = self.project();
38 if let Some(handle) = this.adv_handle.take() {
39 drop(handle);
40 trace!("Handle dropped for Advertisement");
41 };
42 if let Some(handle) = this.app_handle.take() {
43 drop(handle);
44 trace!("Handle dropped for Application");
45 };
46 }
47}
48
49impl Stream for MessageStream {
50 type Item = Message;
51 fn poll_next(
52 self: std::pin::Pin<&mut Self>,
53 cx: &mut std::task::Context<'_>,
54 ) -> std::task::Poll<Option<Self::Item>> {
55 self.project().values_rx.poll_next(cx)
56 }
57}
58
59pub fn create_runtime() -> Result<tokio::runtime::Runtime> {
61 tokio::runtime::Builder::new_current_thread()
62 .enable_all()
63 .build()
64 .map_err(|_| lipl_display_common::Error::Runtime)
65 .map_err(Error::Common)
66}
67
68async fn advertise(adapter: &bluer::Adapter) -> Result<AdvertisementHandle> {
69 let mut manufacturer_data = BTreeMap::new();
70 manufacturer_data.insert(
71 lipl_display_common::MANUFACTURER_ID,
72 vec![0x21, 0x22, 0x23, 0x24],
73 );
74 let le_advertisement = Advertisement {
75 service_uuids: vec![lipl_display_common::SERVICE_UUID]
76 .into_iter()
77 .collect(),
78 manufacturer_data,
79 discoverable: Some(true),
80 local_name: Some(lipl_display_common::LOCAL_NAME.to_owned()),
81 tx_power: Some(8),
82 ..Default::default()
83 };
84 let handle = adapter.advertise(le_advertisement).await?;
85 Ok(handle)
86}
87
88pub struct ListenBluer {
89 sender: Option<tokio::sync::oneshot::Sender<()>>,
90 thread: Option<JoinHandle<()>>,
91}
92
93fn wait() -> impl Stream<Item = Message> {
94 futures_util::stream::once(async { Message::Command(lipl_display_common::Command::Wait) })
95}
96
97impl ListenBluer {
98 pub fn new(callback: impl Fn(Message) + Send + 'static) -> Self {
99 let (tx, mut rx) = tokio::sync::oneshot::channel::<()>();
100 let thread = std::thread::spawn(move || {
101 let runtime = tokio::runtime::Builder::new_current_thread()
102 .enable_all()
103 .build()
104 .expect("Unable to create tokio runtime");
105
106 runtime.block_on(async move {
107 let mut s = wait().chain(
108 listen_stream()
109 .await
110 .expect("Failed to start Gatt peripheral")
111 )
112 .boxed();
113 loop {
114 tokio::select! {
115 option_message = s.next() => {
116 match option_message {
117 Some(message) => {
118 callback(message.clone());
119 }
120 None => break,
121 }
122 }
123 received = &mut rx => {
124 match received {
125 Ok(_) => {
126 break;
127 },
128 Err(error) => {
129 log::error!("Error receiving signal to quit background thread: {}", error);
130 break;
131 },
132 }
133 }
134 }
135 }
136 });
137 log::info!("Background thread almost finished");
138 });
139 ListenBluer {
140 sender: Some(tx),
141 thread: Some(thread),
142 }
143 }
144}
145
146impl BackgroundThread for ListenBluer {
147 fn stop(&mut self) {
148 if let Some(tx) = self.sender.take() {
149 match tx.send(()) {
150 Ok(_) => {
151 if let Some(thread) = self.thread.take() {
152 match thread.join() {
153 Ok(_) => {
154 std::thread::sleep(Duration::from_secs(1));
155 trace!("Finished sleeping for 1 second");
156 }
157 Err(_) => {
158 error!("Error joining background thread");
159 }
160 }
161 }
162 }
163 Err(_) => {
164 error!("Error sending signal to background thread");
165 }
166 }
167 }
168 }
169}
170
171pub async fn listen_stream() -> Result<MessageStream> {
173 let (values_tx, values_rx) = mpsc::channel::<Message>(100);
174
175 let session = bluer::Session::new().await?;
176 let adapter = session.default_adapter().await?;
177 trace!("Bluetooth adapter {} found", adapter.name());
178 let capabilities = adapter.supported_advertising_capabilities().await?;
179 if let Some(caps) = capabilities {
180 trace!(
181 "max advertisement length: {}",
182 caps.max_advertisement_length
183 );
184 trace!(
185 "max scan reponse length : {}",
186 caps.max_scan_response_length
187 );
188 trace!("max tx power: {}", caps.max_tx_power);
189 trace!("min tx power: {}", caps.min_tx_power);
190 }
191
192 let adv_handle = advertise(&adapter).await?;
193 trace!("Advertising started");
194 let uuid: Uuid = lipl_display_common::SERVICE_UUID;
195 let primary: bool = true;
196 let characteristics: Vec<Characteristic> = [
197 lipl_display_common::CHARACTERISTIC_TEXT_UUID,
198 lipl_display_common::CHARACTERISTIC_STATUS_UUID,
199 lipl_display_common::CHARACTERISTIC_COMMAND_UUID,
200 ]
201 .into_iter()
202 .map(|c| (c, Arc::new(Mutex::new(vec![]))))
203 .map(|v| characteristic::write_no_response_characteristic(v.0, v.1, values_tx.clone()))
204 .collect();
205
206 let app = Application {
207 services: vec![Service {
208 uuid,
209 primary,
210 characteristics,
211 ..Default::default()
212 }],
213 ..Default::default()
214 };
215
216 let app_handle = adapter.serve_gatt_application(app).await?;
217
218 Ok(MessageStream {
219 values_tx,
220 values_rx,
221 adv_handle: Some(adv_handle),
222 app_handle: Some(app_handle),
223 })
224}