use crate::error::{BlewError, BlewResult};
use crate::gatt::props::CharacteristicProperties;
use crate::gatt::service::GattService;
use crate::l2cap::{L2capChannel, types::Psm};
use crate::peripheral::backend::{self, PeripheralBackend};
use crate::peripheral::types::{AdvertisingConfig, PeripheralEvent, ReadResponder, WriteResponder};
use crate::platform::linux::l2cap::bridge_l2cap;
use crate::types::DeviceId;
use bluer::adv::{Advertisement, SecondaryChannel, Type as AdvType};
use bluer::gatt::local::{
Application, ApplicationHandle, Characteristic, CharacteristicControlHandle,
CharacteristicNotifier, CharacteristicNotify, CharacteristicNotifyMethod, CharacteristicRead,
CharacteristicReadRequest, CharacteristicWrite, CharacteristicWriteMethod,
CharacteristicWriteRequest, ReqError, Service, ServiceControlHandle,
};
use bluer::{Adapter, Session};
use std::collections::HashMap;
use std::future::Future;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::{debug, trace, warn};
use uuid::Uuid;
type SharedNotifier = Arc<tokio::sync::Mutex<CharacteristicNotifier>>;
struct PeripheralInner {
_session: Session,
adapter: Adapter,
pending_services: Mutex<Vec<GattService>>,
adv_handle: Mutex<Option<bluer::adv::AdvertisementHandle>>,
app_handle: Mutex<Option<ApplicationHandle>>,
notifiers: Mutex<HashMap<Uuid, Vec<SharedNotifier>>>,
event_tx: Arc<Mutex<Option<mpsc::UnboundedSender<PeripheralEvent>>>>,
_adapter_task: tokio::task::JoinHandle<()>,
}
pub struct LinuxPeripheral(Arc<PeripheralInner>);
impl backend::private::Sealed for LinuxPeripheral {}
fn emit(inner: &Arc<PeripheralInner>, event: PeripheralEvent) {
let guard = inner.event_tx.lock().unwrap();
if let Some(tx) = guard.as_ref() {
let _ = tx.send(event);
}
}
#[allow(clippy::too_many_lines)]
fn build_characteristic(
ch: &crate::gatt::service::GattCharacteristic,
svc_uuid: Uuid,
inner: &Arc<PeripheralInner>,
) -> Characteristic {
let uuid = ch.uuid;
let props = ch.properties;
let read = if props.contains(CharacteristicProperties::READ) {
let static_value = if ch.value.is_empty() {
None
} else {
Some(ch.value.clone())
};
let inner_r = Arc::clone(inner);
Some(CharacteristicRead {
read: true,
fun: Box::new(move |req: CharacteristicReadRequest| {
let inner_r = Arc::clone(&inner_r);
let static_value = static_value.clone();
Box::pin(async move {
if let Some(val) = static_value {
let offset = req.offset as usize;
return Ok(if offset > 0 && offset < val.len() {
val[offset..].to_vec()
} else {
val
});
}
let client_id = DeviceId(req.device_address.to_string());
let (tx, rx) = tokio::sync::oneshot::channel();
emit(
&inner_r,
PeripheralEvent::ReadRequest {
client_id,
service_uuid: svc_uuid,
char_uuid: uuid,
offset: req.offset,
responder: ReadResponder::new(tx),
},
);
match rx.await {
Ok(Ok(value)) => Ok(value),
_ => Err(ReqError::Failed),
}
})
}),
..Default::default()
})
} else {
None
};
let write = if props.intersects(
CharacteristicProperties::WRITE | CharacteristicProperties::WRITE_WITHOUT_RESPONSE,
) {
let inner_w = Arc::clone(inner);
let write_req = props.contains(CharacteristicProperties::WRITE);
let write_cmd = props.contains(CharacteristicProperties::WRITE_WITHOUT_RESPONSE);
Some(CharacteristicWrite {
write: write_req,
write_without_response: write_cmd,
method: CharacteristicWriteMethod::Fun(Box::new(
move |value: Vec<u8>, req: CharacteristicWriteRequest| {
let inner_w = Arc::clone(&inner_w);
Box::pin(async move {
let client_id = DeviceId(req.device_address.to_string());
let (responder, rx) = if req.op_type == bluer::gatt::WriteOp::Request {
let (tx, rx) = tokio::sync::oneshot::channel::<bool>();
(Some(WriteResponder::new(tx)), Some(rx))
} else {
(None, None)
};
emit(
&inner_w,
PeripheralEvent::WriteRequest {
client_id,
service_uuid: svc_uuid,
char_uuid: uuid,
value,
responder,
},
);
if let Some(rx) = rx {
match rx.await {
Ok(true) => Ok(()),
_ => Err(ReqError::Failed),
}
} else {
Ok(())
}
})
},
)),
..Default::default()
})
} else {
None
};
let notify = if props.contains(CharacteristicProperties::NOTIFY) {
let inner_n = Arc::clone(inner);
Some(CharacteristicNotify {
notify: true,
method: CharacteristicNotifyMethod::Fun(Box::new(
move |notifier: CharacteristicNotifier| {
let inner_n = Arc::clone(&inner_n);
Box::pin(async move {
inner_n
.notifiers
.lock()
.unwrap()
.entry(uuid)
.or_default()
.push(Arc::new(tokio::sync::Mutex::new(notifier)));
emit(
&inner_n,
PeripheralEvent::SubscriptionChanged {
client_id: DeviceId(String::new()),
char_uuid: uuid,
subscribed: true,
},
);
})
},
)),
..Default::default()
})
} else {
None
};
Characteristic {
uuid,
handle: None,
broadcast: false,
writable_auxiliaries: false,
authorize: false,
descriptors: vec![],
read,
write,
notify,
control_handle: CharacteristicControlHandle::default(),
_non_exhaustive: (),
}
}
impl PeripheralBackend for LinuxPeripheral {
type EventStream = UnboundedReceiverStream<PeripheralEvent>;
async fn new() -> BlewResult<Self>
where
Self: Sized,
{
let session = Session::new().await.map_err(|e| BlewError::Peripheral {
source: Box::new(e),
})?;
let adapter = session
.default_adapter()
.await
.map_err(|_| BlewError::AdapterNotFound)?;
debug!(adapter = %adapter.name(), "BLE adapter initialized");
let event_tx: Arc<Mutex<Option<mpsc::UnboundedSender<PeripheralEvent>>>> =
Arc::new(Mutex::new(None));
let event_tx_clone = Arc::clone(&event_tx);
let adapter_clone = adapter.clone();
let adapter_task = tokio::spawn(async move {
use tokio_stream::StreamExt as _;
let Ok(events) = adapter_clone.events().await else {
warn!("failed to subscribe to adapter events");
return;
};
let mut events = Box::pin(events);
while let Some(event) = events.next().await {
if let bluer::AdapterEvent::PropertyChanged(bluer::AdapterProperty::Powered(
powered,
)) = event
{
debug!(powered, "peripheral adapter state changed");
let guard = event_tx_clone.lock().unwrap();
if let Some(tx) = guard.as_ref() {
let _ = tx.send(PeripheralEvent::AdapterStateChanged { powered });
}
}
}
});
Ok(LinuxPeripheral(Arc::new(PeripheralInner {
_session: session,
adapter,
pending_services: Mutex::new(Vec::new()),
adv_handle: Mutex::new(None),
app_handle: Mutex::new(None),
notifiers: Mutex::new(HashMap::new()),
event_tx,
_adapter_task: adapter_task,
})))
}
fn is_powered(&self) -> impl Future<Output = BlewResult<bool>> + Send {
let handle = Arc::clone(&self.0);
async move {
handle
.adapter
.is_powered()
.await
.map_err(|e| BlewError::Peripheral {
source: Box::new(e),
})
}
}
fn add_service(&self, service: &GattService) -> impl Future<Output = BlewResult<()>> + Send {
let handle = Arc::clone(&self.0);
let service = service.clone();
async move {
debug!(service_uuid = %service.uuid, characteristics = service.characteristics.len(), "queuing GATT service");
handle.pending_services.lock().unwrap().push(service);
Ok(())
}
}
fn start_advertising(
&self,
config: &AdvertisingConfig,
) -> impl Future<Output = BlewResult<()>> + Send {
let handle = Arc::clone(&self.0);
let config = config.clone();
async move {
if handle.adv_handle.lock().unwrap().is_some() {
return Err(BlewError::AlreadyAdvertising);
}
debug!(local_name = %config.local_name, "starting advertising");
let pending: Vec<GattService> = handle.pending_services.lock().unwrap().clone();
let bluer_services: Vec<Service> = pending
.iter()
.map(|svc| {
let chars = svc
.characteristics
.iter()
.map(|ch| build_characteristic(ch, svc.uuid, &handle))
.collect();
Service {
uuid: svc.uuid,
handle: None,
primary: svc.primary,
characteristics: chars,
control_handle: ServiceControlHandle::default(),
_non_exhaustive: (),
}
})
.collect();
let app = Application {
services: bluer_services,
_non_exhaustive: (),
};
let app_handle = handle
.adapter
.serve_gatt_application(app)
.await
.map_err(|e| BlewError::Peripheral {
source: Box::new(e),
})?;
*handle.app_handle.lock().unwrap() = Some(app_handle);
let make_adv = |secondary_channel| Advertisement {
advertisement_type: AdvType::Peripheral,
local_name: Some(config.local_name.clone()),
service_uuids: config.service_uuids.clone().into_iter().collect(),
secondary_channel,
..Default::default()
};
let adv_handle = match handle
.adapter
.advertise(make_adv(Some(SecondaryChannel::TwoM)))
.await
{
Ok(h) => {
debug!("advertising started (BLE 5 extended)");
h
}
Err(e) => {
warn!(error = %e, "BLE 5 extended advertising unavailable, falling back to legacy");
let h = handle
.adapter
.advertise(make_adv(None))
.await
.map_err(|e| BlewError::Peripheral {
source: Box::new(e),
})?;
debug!("advertising started (legacy)");
h
}
};
*handle.adv_handle.lock().unwrap() = Some(adv_handle);
Ok(())
}
}
fn stop_advertising(&self) -> impl Future<Output = BlewResult<()>> + Send {
let handle = Arc::clone(&self.0);
async move {
debug!("stopping advertising");
handle.adv_handle.lock().unwrap().take();
handle.app_handle.lock().unwrap().take();
handle.notifiers.lock().unwrap().clear();
Ok(())
}
}
fn notify_characteristic(
&self,
_device_id: &crate::types::DeviceId,
char_uuid: Uuid,
value: Vec<u8>,
) -> impl Future<Output = BlewResult<()>> + Send {
let handle = Arc::clone(&self.0);
async move {
trace!(%char_uuid, len = value.len(), "notifying characteristic");
let arcs: Vec<SharedNotifier> = handle
.notifiers
.lock()
.unwrap()
.get(&char_uuid)
.cloned()
.unwrap_or_default();
let mut any_stopped = false;
for arc in arcs {
let mut notifier = arc.lock().await;
if notifier.is_stopped() {
any_stopped = true;
continue;
}
let _ = notifier.notify(value.clone()).await;
}
if any_stopped {
handle
.notifiers
.lock()
.unwrap()
.entry(char_uuid)
.and_modify(|v| {
v.retain(|arc| !arc.try_lock().map_or(true, |n| n.is_stopped()));
});
}
Ok(())
}
}
async fn l2cap_listener(
&self,
) -> BlewResult<(
Psm,
impl futures_core::Stream<Item = BlewResult<(DeviceId, L2capChannel)>> + Send + 'static,
)> {
debug!("starting L2CAP CoC listener");
let socket = bluer::l2cap::Socket::new_stream().map_err(|e| BlewError::L2cap {
source: Box::new(e),
})?;
socket
.set_security(bluer::l2cap::Security {
level: bluer::l2cap::SecurityLevel::Low,
key_size: 0,
})
.map_err(|e| BlewError::L2cap {
source: Box::new(e),
})?;
socket.set_recv_mtu(65535).map_err(|e| BlewError::L2cap {
source: Box::new(e),
})?;
socket
.bind(bluer::l2cap::SocketAddr::any_le())
.map_err(|e| BlewError::L2cap {
source: Box::new(e),
})?;
let listener = socket.listen(1).map_err(|e| BlewError::L2cap {
source: Box::new(e),
})?;
let local_addr = listener
.as_ref()
.local_addr()
.map_err(|e| BlewError::L2cap {
source: Box::new(e),
})?;
let psm = Psm(local_addr.psm);
debug!(psm = psm.0, "L2CAP listener ready");
let (tx, rx) = mpsc::channel::<BlewResult<(DeviceId, L2capChannel)>>(16);
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, addr)) => {
debug!(peer = ?addr, "incoming L2CAP connection accepted");
let device_id = DeviceId(addr.addr.to_string());
if tx
.send(Ok((device_id, bridge_l2cap(stream))))
.await
.is_err()
{
break;
}
}
Err(e) => {
warn!(error = %e, "L2CAP accept error");
let _ = tx
.send(Err(BlewError::L2cap {
source: Box::new(e),
}))
.await;
break;
}
}
}
});
Ok((psm, ReceiverStream::new(rx)))
}
fn events(&self) -> Self::EventStream {
let (tx, rx) = mpsc::unbounded_channel();
*self.0.event_tx.lock().unwrap() = Some(tx);
UnboundedReceiverStream::new(rx)
}
}