blew 0.1.0

Cross-platform async BLE library for Rust (Apple, Linux, Android)
Documentation
use std::future::Future;
use std::sync::OnceLock;

use jni::objects::{JObject, JObjectArray};
use jni::{jni_sig, jni_str};
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::debug;
use uuid::Uuid;

use crate::error::{BlewError, BlewResult};
use crate::gatt::props::CharacteristicProperties;
use crate::gatt::service::GattService;
use crate::l2cap::{L2capChannel, types::Psm};
use crate::peripheral::backend::{self, PeripheralBackend};
use crate::peripheral::types::{AdvertisingConfig, PeripheralEvent};
use crate::types::DeviceId;

use super::jni_globals::{jvm, peripheral_class};

struct PeripheralState {
    event_tx: Mutex<mpsc::UnboundedSender<PeripheralEvent>>,
}

static STATE: OnceLock<PeripheralState> = OnceLock::new();

fn state() -> &'static PeripheralState {
    STATE.get().expect("AndroidPeripheral not initialized")
}

pub(crate) fn send_event(event: PeripheralEvent) {
    if let Some(s) = STATE.get() {
        let _ = s.event_tx.lock().send(event);
    }
}

pub struct AndroidPeripheral;

impl backend::private::Sealed for AndroidPeripheral {}

impl PeripheralBackend for AndroidPeripheral {
    type EventStream = UnboundedReceiverStream<PeripheralEvent>;

    fn new() -> impl Future<Output = BlewResult<Self>> + Send
    where
        Self: Sized,
    {
        async {
            let (tx, _rx) = mpsc::unbounded_channel();
            let _ = STATE.set(PeripheralState {
                event_tx: Mutex::new(tx),
            });
            debug!("AndroidPeripheral initialized");
            Ok(AndroidPeripheral)
        }
    }

    fn is_powered(&self) -> impl Future<Output = BlewResult<bool>> + Send {
        async {
            jvm()
                .attach_current_thread(|env| {
                    let result = env.call_static_method(
                        peripheral_class(),
                        jni_str!("isPowered"),
                        jni_sig!("()Z"),
                        &[],
                    )?;
                    Ok(result.z()?)
                })
                .map_err(jni_err)
        }
    }

    fn add_service(&self, service: &GattService) -> impl Future<Output = BlewResult<()>> + Send {
        let service = service.clone();
        async move {
            jvm()
                .attach_current_thread(|env| {
                    let service_uuid = env.new_string(service.uuid.to_string())?;

                    let n = service.characteristics.len();

                    let string_class = env.find_class(jni_str!("java/lang/String"))?;
                    let byte_array_class = env.find_class(jni_str!("[B"))?;

                    let char_uuids: JObjectArray =
                        env.new_object_array(n as i32, &string_class, &JObject::null())?;
                    let char_values: JObjectArray =
                        env.new_object_array(n as i32, &byte_array_class, &JObject::null())?;
                    let mut props_arr = vec![0i32; n];
                    let mut perms_arr = vec![0i32; n];

                    for (i, ch) in service.characteristics.iter().enumerate() {
                        let uuid_str = env.new_string(ch.uuid.to_string())?;
                        char_uuids.set_element(env, i, &uuid_str)?;

                        props_arr[i] = blew_props_to_android(ch.properties);
                        perms_arr[i] = blew_perms_to_android(ch.permissions);

                        let value = env.byte_array_from_slice(&ch.value)?;
                        char_values.set_element(env, i, &value)?;
                    }

                    let j_props = env.new_int_array(n)?;
                    j_props.set_region(env, 0, &props_arr)?;

                    let j_perms = env.new_int_array(n)?;
                    j_perms.set_region(env, 0, &perms_arr)?;

                    env.call_static_method(
                        peripheral_class(),
                        jni_str!("addService"),
                        jni_sig!("(Ljava/lang/String;[Ljava/lang/String;[I[I[[B)V"),
                        &[
                            (&service_uuid).into(),
                            (&char_uuids).into(),
                            (&j_props).into(),
                            (&j_perms).into(),
                            (&char_values).into(),
                        ],
                    )?;

                    Ok(())
                })
                .map_err(jni_err)?;

            debug!(uuid = %service.uuid, "added GATT service");
            Ok(())
        }
    }

    fn start_advertising(
        &self,
        config: &AdvertisingConfig,
    ) -> impl Future<Output = BlewResult<()>> + Send {
        let config = config.clone();
        async move {
            jvm()
                .attach_current_thread(|env| {
                    let name = env.new_string(&config.local_name)?;

                    let string_class = env.find_class(jni_str!("java/lang/String"))?;
                    let uuids: JObjectArray = env.new_object_array(
                        config.service_uuids.len() as i32,
                        &string_class,
                        &JObject::null(),
                    )?;
                    for (i, uuid) in config.service_uuids.iter().enumerate() {
                        let s = env.new_string(uuid.to_string())?;
                        uuids.set_element(env, i, &s)?;
                    }

                    env.call_static_method(
                        peripheral_class(),
                        jni_str!("startAdvertising"),
                        jni_sig!("(Ljava/lang/String;[Ljava/lang/String;)V"),
                        &[(&name).into(), (&uuids).into()],
                    )?;

                    Ok(())
                })
                .map_err(jni_err)?;

            debug!("advertising started");
            Ok(())
        }
    }

    fn stop_advertising(&self) -> impl Future<Output = BlewResult<()>> + Send {
        async {
            jvm()
                .attach_current_thread(|env| {
                    env.call_static_method(
                        peripheral_class(),
                        jni_str!("stopAdvertising"),
                        jni_sig!("()V"),
                        &[],
                    )?;
                    Ok(())
                })
                .map_err(jni_err)?;
            Ok(())
        }
    }

    fn notify_characteristic(
        &self,
        device_id: &DeviceId,
        char_uuid: Uuid,
        value: Vec<u8>,
    ) -> impl Future<Output = BlewResult<()>> + Send {
        let device_addr = device_id.as_str().to_owned();
        async move {
            // Retry loop: Kotlin returns 1 ("busy") when the previous
            // notification hasn't completed yet (onNotificationSent pending).
            // We retry with async sleep so we don't block the tokio thread.
            for attempt in 0..50u32 {
                let status: i32 = jvm()
                    .attach_current_thread(|env| {
                        let addr_str = env.new_string(&device_addr)?;
                        let uuid_str = env.new_string(char_uuid.to_string())?;
                        let j_value = env.byte_array_from_slice(&value)?;

                        let ret = env.call_static_method(
                            peripheral_class(),
                            jni_str!("notifyCharacteristic"),
                            jni_sig!("(Ljava/lang/String;Ljava/lang/String;[B)I"),
                            &[(&addr_str).into(), (&uuid_str).into(), (&j_value).into()],
                        )?;
                        ret.i()
                    })
                    .map_err(jni_err)?;

                match status {
                    0 => return Ok(()), // success
                    1 => {
                        // Busy -- previous notification still in flight.
                        // Yield to tokio and retry after a short delay.
                        if attempt < 49 {
                            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
                        }
                    }
                    2 => return Ok(()), // no subscribers -- not an error
                    3 => {
                        return Err(BlewError::LocalCharacteristicNotFound { char_uuid });
                    }
                    other => {
                        return Err(BlewError::Peripheral {
                            source: format!("notify returned unknown status {other}").into(),
                        });
                    }
                }
            }
            // Exhausted retries -- treat as transient error.
            Err(BlewError::Peripheral {
                source: "notification busy after retries".into(),
            })
        }
    }

    fn l2cap_listener(
        &self,
    ) -> impl Future<
        Output = BlewResult<(
            Psm,
            impl futures_core::Stream<Item = BlewResult<(DeviceId, L2capChannel)>> + Send + 'static,
        )>,
    > + Send {
        async {
            let (psm_tx, psm_rx) = oneshot::channel();
            super::l2cap_state::set_pending_server(psm_tx);

            let (accept_tx, accept_rx) = mpsc::channel(16);
            super::l2cap_state::set_accept_tx(accept_tx);

            jvm()
                .attach_current_thread(|env| {
                    env.call_static_method(
                        peripheral_class(),
                        jni_str!("openL2capServer"),
                        jni_sig!("()V"),
                        &[],
                    )?;
                    Ok(())
                })
                .map_err(jni_err)?;

            let psm = psm_rx
                .await
                .map_err(|_| BlewError::Internal("L2CAP server open cancelled".into()))??;

            Ok((psm, ReceiverStream::new(accept_rx)))
        }
    }

    fn events(&self) -> Self::EventStream {
        // Only the most recent subscriber receives events.
        let (tx, rx) = mpsc::unbounded_channel();
        *state().event_tx.lock() = tx;
        UnboundedReceiverStream::new(rx)
    }
}

fn jni_err(e: jni::errors::Error) -> BlewError {
    BlewError::Internal(format!("JNI error: {e}"))
}

/// Convert blew CharacteristicProperties to Android BluetoothGattCharacteristic property bits.
fn blew_props_to_android(props: CharacteristicProperties) -> i32 {
    let mut out = 0i32;
    if props.contains(CharacteristicProperties::BROADCAST) {
        out |= 0x01; // PROPERTY_BROADCAST
    }
    if props.contains(CharacteristicProperties::READ) {
        out |= 0x02; // PROPERTY_READ
    }
    if props.contains(CharacteristicProperties::WRITE_WITHOUT_RESPONSE) {
        out |= 0x04; // PROPERTY_WRITE_NO_RESPONSE
    }
    if props.contains(CharacteristicProperties::WRITE) {
        out |= 0x08; // PROPERTY_WRITE
    }
    if props.contains(CharacteristicProperties::NOTIFY) {
        out |= 0x10; // PROPERTY_NOTIFY
    }
    if props.contains(CharacteristicProperties::INDICATE) {
        out |= 0x20; // PROPERTY_INDICATE
    }
    out
}

/// Convert blew AttributePermissions to Android permission bits.
fn blew_perms_to_android(perms: crate::gatt::props::AttributePermissions) -> i32 {
    use crate::gatt::props::AttributePermissions;
    let mut out = 0i32;
    if perms.contains(AttributePermissions::READ) {
        out |= 0x01; // PERMISSION_READ
    }
    if perms.contains(AttributePermissions::WRITE) {
        out |= 0x10; // PERMISSION_WRITE
    }
    if perms.contains(AttributePermissions::READ_ENCRYPTED) {
        out |= 0x02; // PERMISSION_READ_ENCRYPTED
    }
    if perms.contains(AttributePermissions::WRITE_ENCRYPTED) {
        out |= 0x20; // PERMISSION_WRITE_ENCRYPTED
    }
    out
}