pub use btleplug;
use btleplug::api::{
Central, CentralEvent, Manager as _, Peripheral as _, PeripheralProperties,
ScanFilter, ValueNotification,
};
use btleplug::platform::{Adapter, Manager, PeripheralId};
use tokio_util::sync::CancellationToken;
use core::time::Duration;
pub use futures;
use futures::{stream::StreamExt, Stream};
use hubs::HubNotification;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
#[macro_use]
extern crate log;
use core::fmt::Debug;
use core::pin::Pin;
use num_traits::FromPrimitive;
pub mod consts;
pub mod error;
pub mod hubs;
pub mod iodevice;
pub mod notifications;
pub mod setup;
pub use crate::consts::IoTypeId;
pub use crate::iodevice::IoDevice;
pub use hubs::Hub;
use consts::{BLEManufacturerData, HubType};
pub use error::{Error, OptionContext, Result};
use notifications::{
NetworkCommand, PortOutputCommandFeedbackFormat, PortValueCombinedFormat,
PortValueSingleFormat,
};
/// Shared handle to a boxed [`Hub`] trait object, guarded by a tokio async
/// [`Mutex`] and reference-counted with [`Arc`].
pub type HubMutex = Arc<Mutex<Box<dyn Hub>>>;
/// Pinned, boxed, `Send` stream of btleplug [`ValueNotification`]s.
type NotificationStream = Pin<Box<dyn Stream<Item = ValueNotification> + Send>>;
/// Entry point for discovering and connecting to hubs through a single
/// btleplug [`Adapter`].
pub struct PoweredUp {
// The Bluetooth adapter all scans and connections go through.
adapter: Adapter,
}
impl PoweredUp {
/// Lists every Bluetooth adapter reported by a freshly created btleplug
/// [`Manager`].
///
/// # Errors
///
/// Fails if the manager cannot be created or the adapter query fails.
pub async fn adapters() -> Result<Vec<Adapter>> {
    let found = Manager::new().await?.adapters().await?;
    Ok(found)
}
/// Creates a [`PoweredUp`] bound to the first adapter the manager reports.
///
/// # Errors
///
/// Fails if the manager or the adapter list cannot be obtained, or with
/// "No adapter found" when the list is empty.
pub async fn init() -> Result<Self> {
    let mut available = Manager::new().await?.adapters().await?.into_iter();
    let first = available.next().context("No adapter found")?;
    Self::with_adapter(first).await
}
/// Creates a [`PoweredUp`] bound to the adapter at position `index` in the
/// list the manager reports.
///
/// # Errors
///
/// Fails if the manager or the adapter list cannot be obtained, or with
/// "No adapter found" when `index` is past the end of the list.
pub async fn with_device_index(index: usize) -> Result<Self> {
    let available = Manager::new().await?.adapters().await?;
    let chosen = available
        .into_iter()
        .nth(index)
        .context("No adapter found")?;
    Self::with_adapter(chosen).await
}
/// Wraps an already selected `adapter` in a [`PoweredUp`].
///
/// Currently always succeeds; the `Result` matches the other constructors.
pub async fn with_adapter(adapter: Adapter) -> Result<Self> {
    let instance = Self { adapter };
    Ok(instance)
}
/// Starts a scan on the adapter using the default [`ScanFilter`].
///
/// # Errors
///
/// Propagates the adapter's error if the scan cannot be started.
pub async fn run(&mut self) -> Result<()> {
    let filter = ScanFilter::default();
    self.adapter.start_scan(filter).await?;
    Ok(())
}
/// Returns the first hub from [`Self::list_discovered_hubs`], or `None` when
/// that list is empty.
///
/// # Errors
///
/// Propagates any error from [`Self::list_discovered_hubs`].
pub async fn find_hub(&mut self) -> Result<Option<DiscoveredHub>> {
    let first = self.list_discovered_hubs().await?.into_iter().next();
    Ok(first)
}
/// Returns every peripheral currently known to the adapter that
/// `identify_hub` recognises as a hub.
///
/// Peripherals without properties, or whose properties do not identify a
/// hub, are skipped. A hub without an advertised local name is reported
/// with the name `"unknown"`.
///
/// # Errors
///
/// Propagates adapter, peripheral and identification errors.
pub async fn list_discovered_hubs(&mut self) -> Result<Vec<DiscoveredHub>> {
    let mut found = Vec::new();
    for peripheral in self.adapter.peripherals().await? {
        let props = match peripheral.properties().await? {
            Some(props) => props,
            None => continue,
        };
        let Some(hub_type) = identify_hub(&props).await? else {
            continue;
        };
        let name = props
            .local_name
            .unwrap_or_else(|| "unknown".to_string());
        found.push(DiscoveredHub {
            hub_type,
            addr: peripheral.id(),
            name,
        });
    }
    Ok(found)
}
/// Waits for the next discovered hub of any kind.
///
/// Shorthand for [`Self::wait_for_hub_filter`] with [`HubFilter::Null`],
/// which accepts every hub.
pub async fn wait_for_hub(&mut self) -> Result<DiscoveredHub> {
    let accept_any = HubFilter::Null;
    self.wait_for_hub_filter(accept_any).await
}
pub async fn wait_for_hub_filter(
&mut self,
filter: HubFilter,
) -> Result<DiscoveredHub> {
let mut events = self.adapter.events().await?;
self.adapter.start_scan(scanfilter()).await?;
while let Some(event) = events.next().await {
let CentralEvent::DeviceDiscovered(id) = event else {
continue;
};
let peripheral = self.adapter.peripheral(&id).await?;
let Some(props) = peripheral.properties().await? else {
continue;
};
if let Some(hub_type) = identify_hub(&props).await? {
let hub = DiscoveredHub {
hub_type,
addr: id,
name: props
.local_name
.unwrap_or_else(|| "unknown".to_string()),
};
if filter.matches(&hub) {
self.adapter.stop_scan().await?;
return Ok(hub);
}
}
}
panic!()
}
pub async fn wait_for_hubs_filter(
&mut self,
filter: HubFilter,
count: &u8,
) -> Result<Vec<DiscoveredHub>> {
let mut events = self.adapter.events().await?;
let mut hubs = Vec::new();
self.adapter.start_scan(scanfilter()).await?;
while let Some(event) = events.next().await {
let CentralEvent::DeviceDiscovered(id) = event else {
continue;
};
let peripheral = self.adapter.peripheral(&id).await?;
let Some(props) = peripheral.properties().await? else {
continue;
};
if let Some(hub_type) = identify_hub(&props).await? {
let hub = DiscoveredHub {
hub_type,
addr: id,
name: props
.local_name
.unwrap_or_else(|| "unknown".to_string()),
};
if filter.matches(&hub) {
hubs.push(hub);
}
if hubs.len() == *count as usize {
self.adapter.stop_scan().await?;
return Ok(hubs);
}
}
}
panic!()
}
/// Connects to a previously discovered hub and wraps it in a boxed [`Hub`].
///
/// Looks the peripheral up by `hub.addr`, connects, discovers its services
/// and locates the `LPF2_ALL` characteristic. A `GenericHub` is then
/// initialised with a fresh [`CancellationToken`]. The hub types listed in
/// the match keep their own type; every other type is initialised as
/// `HubType::Unknown`.
///
/// # Errors
///
/// Propagates lookup, connection, service-discovery and initialisation
/// errors, and fails with "Device does not advertise LPF2_ALL
/// characteristic" when that characteristic is missing.
pub async fn create_hub(
    &mut self,
    hub: &DiscoveredHub,
) -> Result<Box<dyn Hub>> {
    info!("Connecting to hub {}...", hub.addr,);
    let peripheral = self.adapter.peripheral(&hub.addr).await?;
    peripheral.connect().await?;
    peripheral.discover_services().await?;

    let lpf_char = peripheral
        .characteristics()
        .iter()
        .find(|c| c.uuid == *consts::blecharacteristic::LPF2_ALL)
        .context("Device does not advertise LPF2_ALL characteristic")?
        .clone();
    let cancel = CancellationToken::new();

    // Every arm builds the same `GenericHub`; only the reported kind differs.
    let kind = match hub.hub_type {
        HubType::TechnicMediumHub
        | HubType::MoveHub
        | HubType::RemoteControl
        | HubType::Wedo2SmartHub
        | HubType::Hub
        | HubType::DuploTrainBase
        | HubType::Mario => hub.hub_type,
        _ => HubType::Unknown,
    };
    let generic =
        hubs::generic_hub::GenericHub::init(peripheral, lpf_char, kind, cancel)
            .await?;
    Ok(Box::new(generic))
}
/// Starts a scan and returns a stream of hubs as they are discovered.
///
/// Subscribes to the adapter's event stream, starts a scan restricted to
/// the services from `scanfilter()`, and maps every
/// `CentralEvent::DeviceDiscovered` event whose peripheral `identify_hub`
/// recognises to a [`DiscoveredHub`]. Other events are dropped, as are
/// peripherals whose lookup, properties or identification fail: one bad
/// peripheral must not end the stream or panic the consumer.
///
/// # Errors
///
/// Fails if the event stream cannot be obtained or the scan cannot be
/// started.
pub async fn scan(
    &mut self,
) -> Result<impl Stream<Item = DiscoveredHub> + '_> {
    let events = self.adapter.events().await?;
    self.adapter.start_scan(scanfilter()).await?;
    Ok(events.filter_map(|event| async {
        let CentralEvent::DeviceDiscovered(id) = event else {
            return None;
        };
        let peripheral = self.adapter.peripheral(&id).await.ok()?;
        // Fetch the properties once. Both a failed query (`Err`) and a
        // peripheral without properties (`None`) skip this event.
        let props = peripheral.properties().await.ok()??;
        let hub_type = identify_hub(&props).await.ok()??;
        Some(DiscoveredHub {
            hub_type,
            addr: id,
            name: props
                .local_name
                .unwrap_or_else(|| "unknown".to_string()),
        })
    }))
}
/// Starts a scan and returns a pinned, boxed, `Send` stream of hubs as they
/// are discovered.
///
/// Subscribes to the adapter's event stream, starts a scan restricted to
/// the services from `scanfilter()`, and maps every
/// `CentralEvent::DeviceDiscovered` event whose peripheral `identify_hub`
/// recognises to a [`DiscoveredHub`]. Other events are dropped, as are
/// peripherals whose lookup, properties or identification fail: one bad
/// peripheral must not end the stream or panic the consumer.
///
/// # Errors
///
/// Fails if the event stream cannot be obtained or the scan cannot be
/// started.
pub async fn scan2(
    &mut self,
) -> Result<Pin<Box<dyn Stream<Item = DiscoveredHub> + Send + '_>>> {
    let events = self.adapter.events().await?;
    self.adapter.start_scan(scanfilter()).await?;
    Ok(Box::pin(events.filter_map(|event| async {
        let CentralEvent::DeviceDiscovered(id) = event else {
            return None;
        };
        let peripheral = self.adapter.peripheral(&id).await.ok()?;
        // Fetch the properties once. Both a failed query (`Err`) and a
        // peripheral without properties (`None`) skip this event.
        let props = peripheral.properties().await.ok()??;
        let hub_type = identify_hub(&props).await.ok()??;
        Some(DiscoveredHub {
            hub_type,
            addr: id,
            name: props
                .local_name
                .unwrap_or_else(|| "unknown".to_string()),
        })
    })))
}
}
/// Builds the [`ScanFilter`] used when scanning for hubs: it lists the
/// `LPF2_HUB` and `WEDO2_SMART_HUB` service UUIDs.
fn scanfilter() -> ScanFilter {
    let services = vec![
        *consts::bleservice::LPF2_HUB,
        *consts::bleservice::WEDO2_SMART_HUB,
    ];
    ScanFilter { services }
}
/// Criterion used to accept or reject a [`DiscoveredHub`].
#[derive(Debug, PartialEq, Eq)]
pub enum HubFilter {
    /// Accept hubs whose name equals the given string exactly.
    Name(String),
    /// Accept hubs whose address, rendered with `{:?}` (`Debug`), equals the
    /// given string exactly.
    Addr(String),
    /// Accept hubs of the given type.
    Kind(HubType),
    /// Accept every hub.
    Null,
}

impl HubFilter {
    /// Returns `true` when `hub` satisfies this filter.
    pub fn matches(&self, hub: &DiscoveredHub) -> bool {
        match self {
            Self::Null => true,
            Self::Name(wanted) => *wanted == hub.name,
            Self::Kind(wanted) => *wanted == hub.hub_type,
            Self::Addr(wanted) => *wanted == format!("{:?}", hub.addr),
        }
    }
}
/// A hub that has been seen by the adapter but not necessarily connected to.
#[derive(Clone, Debug)]
pub struct DiscoveredHub {
/// Hub type as determined by `identify_hub`.
pub hub_type: HubType,
/// btleplug identifier of the peripheral; used to look it up again when
/// connecting.
pub addr: PeripheralId,
/// Advertised local name, or `"unknown"` when the peripheral has none.
pub name: String,
}
async fn identify_hub(props: &PeripheralProperties) -> Result<Option<HubType>> {
use HubType::*;
if props
.services
.contains(&consts::bleservice::WEDO2_SMART_HUB)
{
return Ok(Some(Wedo2SmartHub));
} else if props.services.contains(&consts::bleservice::LPF2_HUB) {
if let Some(manufacturer_id) = props.manufacturer_data.get(&919) {
if let Some(m) = BLEManufacturerData::from_u8(manufacturer_id[1]) {
use BLEManufacturerData::*;
return Ok(Some(match m {
DuploTrainBaseId => DuploTrainBase,
HubId => Hub,
MarioId => Mario,
MoveHubId => MoveHub,
RemoteControlId => RemoteControl,
TechnicMediumHubId => TechnicMediumHub,
}));
}
}
}
Ok(None)
}
/// A hub that has been set up through [`ConnectedHub::setup_hub`].
pub struct ConnectedHub {
/// Name reported by the hub's `name()` at setup time.
pub name: String,
/// Shared, mutex-guarded handle to the underlying [`Hub`].
pub mutex: HubMutex,
/// Hub type reported by the hub's `kind()` at setup time.
pub kind: HubType,
/// Cancellation token obtained from the hub's `cancel_token()`; a clone is
/// handed to the notification handler task.
pub cancel: CancellationToken,
}
impl ConnectedHub {
/// Turns a freshly created hub into a [`ConnectedHub`] with its notification
/// plumbing in place.
///
/// Steps, in order:
/// 1. Read the hub's kind, name and cancellation token, then move the hub
///    behind a shared mutex.
/// 2. Install a broadcast sender for each notification category on the
///    hub's channels.
/// 3. Obtain the peripheral's notification stream and spawn a task running
///    `io_event_handler` on it.
/// 4. Subscribe to the hub's characteristic. A subscription failure is only
///    printed to stderr; it does not fail the setup.
/// 5. Sleep for 3000 ms before returning. NOTE(review): presumably to let
///    the hub finish announcing its attached devices; confirm.
///
/// # Errors
///
/// Fails if the hub's name cannot be read or the notification stream cannot
/// be obtained.
///
/// # Panics
///
/// The spawned handler task panics if `io_event_handler` returns an error.
pub async fn setup_hub(created_hub: Box<dyn Hub>) -> Result<ConnectedHub> {
    // Field values are read in the same order as before, because moving the
    // hub into the mutex must come last.
    let kind = created_hub.kind();
    let name = created_hub.name().await?;
    let cancel = created_hub.cancel_token();
    let mutex = Arc::new(Mutex::new(created_hub));
    let connected_hub = ConnectedHub {
        name,
        mutex,
        kind,
        cancel,
    };

    // Install the broadcast senders; the paired receivers are dropped here.
    {
        let mut hub = connected_hub.mutex.lock().await;
        hub.channels().singlevalue_sender =
            Some(broadcast::channel::<PortValueSingleFormat>(32).0);
        hub.channels().combinedvalue_sender =
            Some(broadcast::channel::<PortValueCombinedFormat>(32).0);
        hub.channels().networkcmd_sender =
            Some(broadcast::channel::<NetworkCommand>(16).0);
        hub.channels().hubnotification_sender =
            Some(broadcast::channel::<HubNotification>(16).0);
        hub.channels().commandfeedback_sender =
            Some(broadcast::channel::<PortOutputCommandFeedbackFormat>(16).0);
    }

    // Start the notification handler before subscribing.
    let handler_hub = connected_hub.mutex.clone();
    {
        let mut hub = connected_hub.mutex.lock().await;
        let stream: NotificationStream =
            hub.peripheral().notifications().await?;
        let senders = hub.channels().clone();
        let handler_cancel = connected_hub.cancel.clone();
        let _io_handler_task = tokio::spawn(async move {
            crate::hubs::io_event::io_event_handler(
                stream,
                handler_hub,
                senders,
                handler_cancel,
            )
            .await
            .expect("Error setting up main notification handler");
        });
    }

    // Subscribe to the hub's characteristic; failure is reported, not fatal.
    {
        let hub = connected_hub.mutex.lock().await;
        let subscribed =
            hub.peripheral().subscribe(&hub.characteristic()).await;
        if let Err(e) = subscribed {
            eprintln!(
                "Error subscribing to peripheral notifications: {:#?}",
                e
            )
        }
    }

    tokio::time::sleep(Duration::from_millis(3000)).await;
    Ok(connected_hub)
}
}