use std::future::Future;
use std::sync::OnceLock;
use jni::objects::{JObject, JObjectArray};
use jni::{jni_sig, jni_str};
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::debug;
use uuid::Uuid;
use crate::error::{BlewError, BlewResult};
use crate::gatt::props::CharacteristicProperties;
use crate::gatt::service::GattService;
use crate::l2cap::{L2capChannel, types::Psm};
use crate::peripheral::backend::{self, PeripheralBackend};
use crate::peripheral::types::{AdvertisingConfig, PeripheralEvent};
use super::jni_globals::{jvm, peripheral_class};
struct PeripheralState {
event_tx: Mutex<mpsc::UnboundedSender<PeripheralEvent>>,
}
static STATE: OnceLock<PeripheralState> = OnceLock::new();
fn state() -> &'static PeripheralState {
STATE.get().expect("AndroidPeripheral not initialized")
}
pub(crate) fn send_event(event: PeripheralEvent) {
if let Some(s) = STATE.get() {
let _ = s.event_tx.lock().send(event);
}
}
pub struct AndroidPeripheral;
impl backend::private::Sealed for AndroidPeripheral {}
impl PeripheralBackend for AndroidPeripheral {
type EventStream = UnboundedReceiverStream<PeripheralEvent>;
fn new() -> impl Future<Output = BlewResult<Self>> + Send
where
Self: Sized,
{
async {
let (tx, _rx) = mpsc::unbounded_channel();
let _ = STATE.set(PeripheralState {
event_tx: Mutex::new(tx),
});
debug!("AndroidPeripheral initialized");
Ok(AndroidPeripheral)
}
}
fn is_powered(&self) -> impl Future<Output = BlewResult<bool>> + Send {
async {
jvm()
.attach_current_thread(|env| {
let result = env.call_static_method(
peripheral_class(),
jni_str!("isPowered"),
jni_sig!("()Z"),
&[],
)?;
Ok(result.z()?)
})
.map_err(jni_err)
}
}
fn add_service(&self, service: &GattService) -> impl Future<Output = BlewResult<()>> + Send {
let service = service.clone();
async move {
jvm()
.attach_current_thread(|env| {
let service_uuid = env.new_string(service.uuid.to_string())?;
let n = service.characteristics.len();
let string_class = env.find_class(jni_str!("java/lang/String"))?;
let byte_array_class = env.find_class(jni_str!("[B"))?;
let char_uuids: JObjectArray =
env.new_object_array(n as i32, &string_class, &JObject::null())?;
let char_values: JObjectArray =
env.new_object_array(n as i32, &byte_array_class, &JObject::null())?;
let mut props_arr = vec![0i32; n];
let mut perms_arr = vec![0i32; n];
for (i, ch) in service.characteristics.iter().enumerate() {
let uuid_str = env.new_string(ch.uuid.to_string())?;
char_uuids.set_element(env, i, &uuid_str)?;
props_arr[i] = blew_props_to_android(ch.properties);
perms_arr[i] = blew_perms_to_android(ch.permissions);
let value = env.byte_array_from_slice(&ch.value)?;
char_values.set_element(env, i, &value)?;
}
let j_props = env.new_int_array(n)?;
j_props.set_region(env, 0, &props_arr)?;
let j_perms = env.new_int_array(n)?;
j_perms.set_region(env, 0, &perms_arr)?;
env.call_static_method(
peripheral_class(),
jni_str!("addService"),
jni_sig!("(Ljava/lang/String;[Ljava/lang/String;[I[I[[B)V"),
&[
(&service_uuid).into(),
(&char_uuids).into(),
(&j_props).into(),
(&j_perms).into(),
(&char_values).into(),
],
)?;
Ok(())
})
.map_err(jni_err)?;
debug!(uuid = %service.uuid, "added GATT service");
Ok(())
}
}
fn start_advertising(
&self,
config: &AdvertisingConfig,
) -> impl Future<Output = BlewResult<()>> + Send {
let config = config.clone();
async move {
jvm()
.attach_current_thread(|env| {
let name = env.new_string(&config.local_name)?;
let string_class = env.find_class(jni_str!("java/lang/String"))?;
let uuids: JObjectArray = env.new_object_array(
config.service_uuids.len() as i32,
&string_class,
&JObject::null(),
)?;
for (i, uuid) in config.service_uuids.iter().enumerate() {
let s = env.new_string(uuid.to_string())?;
uuids.set_element(env, i, &s)?;
}
env.call_static_method(
peripheral_class(),
jni_str!("startAdvertising"),
jni_sig!("(Ljava/lang/String;[Ljava/lang/String;)V"),
&[(&name).into(), (&uuids).into()],
)?;
Ok(())
})
.map_err(jni_err)?;
debug!("advertising started");
Ok(())
}
}
fn stop_advertising(&self) -> impl Future<Output = BlewResult<()>> + Send {
async {
jvm()
.attach_current_thread(|env| {
env.call_static_method(
peripheral_class(),
jni_str!("stopAdvertising"),
jni_sig!("()V"),
&[],
)?;
Ok(())
})
.map_err(jni_err)?;
Ok(())
}
}
fn notify_characteristic(
&self,
char_uuid: Uuid,
value: Vec<u8>,
) -> impl Future<Output = BlewResult<()>> + Send {
async move {
for attempt in 0..50u32 {
let status: i32 = jvm()
.attach_current_thread(|env| {
let uuid_str = env.new_string(char_uuid.to_string())?;
let j_value = env.byte_array_from_slice(&value)?;
let ret = env.call_static_method(
peripheral_class(),
jni_str!("notifyCharacteristic"),
jni_sig!("(Ljava/lang/String;[B)I"),
&[(&uuid_str).into(), (&j_value).into()],
)?;
ret.i()
})
.map_err(jni_err)?;
match status {
0 => return Ok(()), 1 => {
if attempt < 49 {
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
}
2 => return Ok(()), 3 => {
return Err(BlewError::LocalCharacteristicNotFound { char_uuid });
}
other => {
return Err(BlewError::Peripheral {
source: format!("notify returned unknown status {other}").into(),
});
}
}
}
Err(BlewError::Peripheral {
source: "notification busy after retries".into(),
})
}
}
fn l2cap_listener(
&self,
) -> impl Future<
Output = BlewResult<(
Psm,
impl futures_core::Stream<Item = BlewResult<L2capChannel>> + Send + 'static,
)>,
> + Send {
async {
let (psm_tx, psm_rx) = oneshot::channel();
super::l2cap_state::set_pending_server(psm_tx);
let (accept_tx, accept_rx) = mpsc::channel(16);
super::l2cap_state::set_accept_tx(accept_tx);
jvm()
.attach_current_thread(|env| {
env.call_static_method(
peripheral_class(),
jni_str!("openL2capServer"),
jni_sig!("()V"),
&[],
)?;
Ok(())
})
.map_err(jni_err)?;
let psm = psm_rx
.await
.map_err(|_| BlewError::Internal("L2CAP server open cancelled".into()))??;
Ok((psm, ReceiverStream::new(accept_rx)))
}
}
fn events(&self) -> Self::EventStream {
let (tx, rx) = mpsc::unbounded_channel();
*state().event_tx.lock() = tx;
UnboundedReceiverStream::new(rx)
}
}
fn jni_err(e: jni::errors::Error) -> BlewError {
BlewError::Internal(format!("JNI error: {e}"))
}
fn blew_props_to_android(props: CharacteristicProperties) -> i32 {
let mut out = 0i32;
if props.contains(CharacteristicProperties::BROADCAST) {
out |= 0x01; }
if props.contains(CharacteristicProperties::READ) {
out |= 0x02; }
if props.contains(CharacteristicProperties::WRITE_WITHOUT_RESPONSE) {
out |= 0x04; }
if props.contains(CharacteristicProperties::WRITE) {
out |= 0x08; }
if props.contains(CharacteristicProperties::NOTIFY) {
out |= 0x10; }
if props.contains(CharacteristicProperties::INDICATE) {
out |= 0x20; }
out
}
fn blew_perms_to_android(perms: crate::gatt::props::AttributePermissions) -> i32 {
use crate::gatt::props::AttributePermissions;
let mut out = 0i32;
if perms.contains(AttributePermissions::READ) {
out |= 0x01; }
if perms.contains(AttributePermissions::WRITE) {
out |= 0x10; }
if perms.contains(AttributePermissions::READ_ENCRYPTED) {
out |= 0x02; }
if perms.contains(AttributePermissions::WRITE_ENCRYPTED) {
out |= 0x20; }
out
}