// blew 0.3.0
//
// Cross-platform async BLE library for Rust (Apple, Linux, Android)
// Documentation
use jni::objects::{JObject, JObjectArray};
use jni::{jni_sig, jni_str};
use parking_lot::Mutex;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
use uuid::Uuid;

use crate::error::{BlewError, BlewResult};
use crate::gatt::props::CharacteristicProperties;
use crate::gatt::service::GattService;
use crate::l2cap::{L2capChannel, types::Psm};
use crate::peripheral::backend::{self, PeripheralBackend};
use crate::peripheral::types::{
    AdvertisingConfig, PeripheralConfig, PeripheralRequest, PeripheralStateEvent,
};
use crate::types::DeviceId;
use crate::util::BroadcastEventStream;

use super::jni_globals::{jvm, peripheral_class};

/// Channel endpoints shared between the JNI callback entry points and the
/// `AndroidPeripheral` backend.
struct PeripheralState {
    /// Sending half of the request channel; `send_request` pushes onto it.
    request_tx: mpsc::UnboundedSender<PeripheralRequest>,
    /// Receiving half of the request channel. Stored as an `Option` so that
    /// `take_requests` can move it out; after that it stays `None`.
    request_rx: Mutex<Option<mpsc::UnboundedReceiver<PeripheralRequest>>>,
    /// Broadcast sender for state events; `state_events` subscribes to it and
    /// `send_state_event` publishes on it.
    state_tx: broadcast::Sender<PeripheralStateEvent>,
}

/// Module-level peripheral state. Starts as `None` and is populated (or replaced) by
/// `<AndroidPeripheral as PeripheralBackend>::new`.
static STATE: Mutex<Option<PeripheralState>> = Mutex::new(None);

/// Forward a peripheral request into the request channel held in `STATE`.
///
/// Best-effort: the request is silently dropped when the state has not been
/// initialized or when the send fails.
pub(crate) fn send_request(request: PeripheralRequest) {
    let guard = STATE.lock();
    let Some(state) = guard.as_ref() else {
        return;
    };
    // A failed send is deliberately ignored.
    let _ = state.request_tx.send(request);
}

/// Publish a state event on the broadcast channel held in `STATE`.
///
/// Best-effort: the event is silently dropped when the state has not been
/// initialized or when the send fails.
pub(crate) fn send_state_event(event: PeripheralStateEvent) {
    let guard = STATE.lock();
    let Some(state) = guard.as_ref() else {
        return;
    };
    // A failed send is deliberately ignored.
    let _ = state.state_tx.send(event);
}

/// Android peripheral backend handle.
///
/// The type carries no fields; the channels it works with live in the module-level `STATE`.
pub struct AndroidPeripheral;

impl AndroidPeripheral {
    /// Create the Android peripheral backend.
    ///
    /// The supplied configuration is currently not consulted; construction is
    /// delegated to `PeripheralBackend::new`.
    ///
    /// # Errors
    /// Propagates whatever error `PeripheralBackend::new` returns.
    pub async fn with_config(_config: PeripheralConfig) -> BlewResult<Self> {
        let initialized = <AndroidPeripheral as PeripheralBackend>::new().await?;
        Ok(initialized)
    }
}

// Implements the `backend::private::Sealed` marker trait for this backend.
// NOTE(review): presumably `Sealed` is a supertrait of `PeripheralBackend` (sealed-trait
// pattern) -- confirm against `peripheral::backend`.
impl backend::private::Sealed for AndroidPeripheral {}

impl PeripheralBackend for AndroidPeripheral {
    type StateEvents = BroadcastEventStream<PeripheralStateEvent>;
    type Requests = UnboundedReceiverStream<PeripheralRequest>;

    async fn new() -> BlewResult<Self>
    where
        Self: Sized,
    {
        if !super::are_ble_permissions_granted() {
            return Err(BlewError::PermissionDenied);
        }
        let (request_tx, request_rx) = mpsc::unbounded_channel();
        let (state_tx, _) = broadcast::channel(256);
        *STATE.lock() = Some(PeripheralState {
            request_tx,
            request_rx: Mutex::new(Some(request_rx)),
            state_tx,
        });
        debug!("AndroidPeripheral initialized");
        Ok(AndroidPeripheral)
    }

    async fn is_powered(&self) -> BlewResult<bool> {
        jvm()
            .attach_current_thread(|env| {
                let result = env.call_static_method(
                    peripheral_class(),
                    jni_str!("isPowered"),
                    jni_sig!("()Z"),
                    &[],
                )?;
                result.z()
            })
            .map_err(|e| jni_err(&e))
    }

    async fn add_service(&self, service: &GattService) -> BlewResult<()> {
        let service = service.clone();
        let n = service.characteristics.len();
        let n_i32 = i32::try_from(n)
            .map_err(|_| BlewError::Internal("too many characteristics for JNI".into()))?;
        jvm()
            .attach_current_thread(|env| {
                let service_uuid = env.new_string(service.uuid.to_string())?;

                let string_class = env.find_class(jni_str!("java/lang/String"))?;
                let byte_array_class = env.find_class(jni_str!("[B"))?;

                let char_uuids: JObjectArray =
                    env.new_object_array(n_i32, &string_class, JObject::null())?;
                let char_values: JObjectArray =
                    env.new_object_array(n_i32, &byte_array_class, JObject::null())?;
                let mut props_arr = vec![0_i32; n];
                let mut perms_arr = vec![0_i32; n];

                for (i, ch) in service.characteristics.iter().enumerate() {
                    let uuid_str = env.new_string(ch.uuid.to_string())?;
                    char_uuids.set_element(env, i, &uuid_str)?;

                    props_arr[i] = blew_props_to_android(ch.properties);
                    perms_arr[i] = blew_perms_to_android(ch.permissions);

                    let value = env.byte_array_from_slice(&ch.value)?;
                    char_values.set_element(env, i, &value)?;
                }

                let j_props = env.new_int_array(n)?;
                j_props.set_region(env, 0, &props_arr)?;

                let j_perms = env.new_int_array(n)?;
                j_perms.set_region(env, 0, &perms_arr)?;

                env.call_static_method(
                    peripheral_class(),
                    jni_str!("addService"),
                    jni_sig!("(Ljava/lang/String;[Ljava/lang/String;[I[I[[B)V"),
                    &[
                        (&service_uuid).into(),
                        (&char_uuids).into(),
                        (&j_props).into(),
                        (&j_perms).into(),
                        (&char_values).into(),
                    ],
                )?;

                Ok(())
            })
            .map_err(|e| jni_err(&e))?;

        debug!(uuid = %service.uuid, "added GATT service");
        Ok(())
    }

    async fn start_advertising(&self, config: &AdvertisingConfig) -> BlewResult<()> {
        let uuid_count = i32::try_from(config.service_uuids.len()).map_err(|_| {
            BlewError::Internal("too many service UUIDs in AdvertisingConfig".into())
        })?;
        jvm()
            .attach_current_thread(|env| {
                let name = env.new_string(&config.local_name)?;

                let string_class = env.find_class(jni_str!("java/lang/String"))?;
                let uuids: JObjectArray =
                    env.new_object_array(uuid_count, &string_class, JObject::null())?;
                for (i, uuid) in config.service_uuids.iter().enumerate() {
                    let s = env.new_string(uuid.to_string())?;
                    uuids.set_element(env, i, &s)?;
                }

                env.call_static_method(
                    peripheral_class(),
                    jni_str!("startAdvertising"),
                    jni_sig!("(Ljava/lang/String;[Ljava/lang/String;)V"),
                    &[(&name).into(), (&uuids).into()],
                )?;

                Ok(())
            })
            .map_err(|e| jni_err(&e))?;

        debug!("advertising started");
        Ok(())
    }

    async fn stop_advertising(&self) -> BlewResult<()> {
        jvm()
            .attach_current_thread(|env| {
                env.call_static_method(
                    peripheral_class(),
                    jni_str!("stopAdvertising"),
                    jni_sig!("()V"),
                    &[],
                )?;
                Ok(())
            })
            .map_err(|e| jni_err(&e))?;
        Ok(())
    }

    async fn notify_characteristic(
        &self,
        device_id: &DeviceId,
        char_uuid: Uuid,
        value: Vec<u8>,
    ) -> BlewResult<()> {
        let device_addr = device_id.as_str().to_owned();
        // Retry loop: Kotlin returns 1 ("busy") when the previous
        // notification hasn't completed yet (onNotificationSent pending).
        // We retry with async sleep so we don't block the tokio thread.
        for attempt in 0..50_u32 {
            let status: i32 = jvm()
                .attach_current_thread(|env| {
                    let addr_str = env.new_string(&device_addr)?;
                    let uuid_str = env.new_string(char_uuid.to_string())?;
                    let j_value = env.byte_array_from_slice(&value)?;

                    let ret = env.call_static_method(
                        peripheral_class(),
                        jni_str!("notifyCharacteristic"),
                        jni_sig!("(Ljava/lang/String;Ljava/lang/String;[B)I"),
                        &[(&addr_str).into(), (&uuid_str).into(), (&j_value).into()],
                    )?;
                    ret.i()
                })
                .map_err(|e| jni_err(&e))?;

            match status {
                // 0 = success; 2 = no subscribers (not an error).
                0 | 2 => return Ok(()),
                1 => {
                    // Busy -- previous notification still in flight.
                    // Yield to tokio and retry after a short delay.
                    if attempt < 49 {
                        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
                    }
                }
                3 => {
                    return Err(BlewError::LocalCharacteristicNotFound { char_uuid });
                }
                other => {
                    return Err(BlewError::Peripheral {
                        source: format!("notify returned unknown status {other}").into(),
                    });
                }
            }
        }
        // Exhausted retries -- treat as transient error.
        Err(BlewError::Peripheral {
            source: "notification busy after retries".into(),
        })
    }

    async fn l2cap_listener(
        &self,
    ) -> BlewResult<(
        Psm,
        impl futures_core::Stream<Item = BlewResult<(DeviceId, L2capChannel)>> + Send + 'static,
    )> {
        let (psm_tx, psm_rx) = oneshot::channel();
        super::l2cap_state::set_pending_server(psm_tx);

        let (accept_tx, accept_rx) = mpsc::unbounded_channel();
        super::l2cap_state::set_accept_tx(accept_tx);

        jvm()
            .attach_current_thread(|env| {
                env.call_static_method(
                    peripheral_class(),
                    jni_str!("openL2capServer"),
                    jni_sig!("()V"),
                    &[],
                )?;
                Ok(())
            })
            .map_err(|e| jni_err(&e))?;

        let psm = psm_rx
            .await
            .map_err(|_| BlewError::Internal("L2CAP server open cancelled".into()))??;

        Ok((psm, UnboundedReceiverStream::new(accept_rx)))
    }

    fn state_events(&self) -> Self::StateEvents {
        let receiver = STATE
            .lock()
            .as_ref()
            .expect("AndroidPeripheral not initialized")
            .state_tx
            .subscribe();
        BroadcastEventStream::new(receiver)
    }

    fn take_requests(&self) -> Option<Self::Requests> {
        let rx = {
            let guard = STATE.lock();
            guard.as_ref()?.request_rx.lock().take()
        };
        rx.map(UnboundedReceiverStream::new)
    }
}

/// Wrap a JNI error in `BlewError::Internal`, prefixing its display text with `JNI error: `.
fn jni_err(e: &jni::errors::Error) -> BlewError {
    let message = format!("JNI error: {e}");
    BlewError::Internal(message)
}

/// Translate blew `CharacteristicProperties` flags into the Android
/// `BluetoothGattCharacteristic` property bitmask.
fn blew_props_to_android(props: CharacteristicProperties) -> i32 {
    // (blew flag, Android property bit) pairs.
    let table: [(CharacteristicProperties, i32); 6] = [
        (CharacteristicProperties::BROADCAST, 0x01), // PROPERTY_BROADCAST
        (CharacteristicProperties::READ, 0x02),      // PROPERTY_READ
        (CharacteristicProperties::WRITE_WITHOUT_RESPONSE, 0x04), // PROPERTY_WRITE_NO_RESPONSE
        (CharacteristicProperties::WRITE, 0x08),     // PROPERTY_WRITE
        (CharacteristicProperties::NOTIFY, 0x10),    // PROPERTY_NOTIFY
        (CharacteristicProperties::INDICATE, 0x20),  // PROPERTY_INDICATE
    ];

    let mut bits = 0_i32;
    for (flag, android_bit) in table {
        if props.contains(flag) {
            bits |= android_bit;
        }
    }
    bits
}

/// Translate blew `AttributePermissions` flags into the Android permission bitmask.
fn blew_perms_to_android(perms: crate::gatt::props::AttributePermissions) -> i32 {
    use crate::gatt::props::AttributePermissions;

    // (blew flag, Android permission bit) pairs.
    let table: [(AttributePermissions, i32); 4] = [
        (AttributePermissions::READ, 0x01),            // PERMISSION_READ
        (AttributePermissions::READ_ENCRYPTED, 0x02),  // PERMISSION_READ_ENCRYPTED
        (AttributePermissions::WRITE, 0x10),           // PERMISSION_WRITE
        (AttributePermissions::WRITE_ENCRYPTED, 0x20), // PERMISSION_WRITE_ENCRYPTED
    ];

    let mut bits = 0_i32;
    for (flag, android_bit) in table {
        if perms.contains(flag) {
            bits |= android_bit;
        }
    }
    bits
}