use base64::Engine;
use peat_btle::{
config::BleConfig,
gatt::PeatCharacteristicUuids,
platform::{linux::BluerAdapter, BleAdapter, DiscoveredDevice},
security::MeshGenesis,
NodeId, PeatMesh, PeatMeshConfig, PEAT_SERVICE_UUID,
};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock that reads earlier than the epoch yields `0` instead of
/// panicking, and a millisecond count that does not fit in `u64` saturates at
/// `u64::MAX` instead of being silently truncated.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `duration_since` fails only when "now" is before the epoch; treat that as time zero.
        .unwrap_or_default();
    // `as_millis` returns `u128`; clamp first so the narrowing cast cannot wrap.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
/// Entry point of the Peat BLE test client.
///
/// Scans for a Peat node on the selected Bluetooth adapter, connects to it,
/// exchanges sync documents over GATT, and exits the process with status `0`
/// once a sync from the peer has been processed, or with status `1` if the
/// timeout elapses first.
///
/// Recognised command-line arguments:
///
/// * `--adapter <name>`  – Bluetooth adapter to use (default `hci1`).
/// * `--mesh-id <id>`    – mesh ID used when no genesis is given (default `TEST`).
/// * `--timeout <secs>`  – overall test timeout in seconds (default `30`).
/// * `--encrypt`         – enable encryption with the built-in test secret.
/// * `--genesis <b64>`   – base64-encoded mesh genesis; supplies the mesh ID and
///   the encryption secret and always enables encryption.
///
/// # Errors
///
/// Returns an error if the `--genesis` value is not valid base64 or if any of
/// the adapter operations propagated with `?` fails.
///
/// # Panics
///
/// Panics if the decoded genesis bytes are rejected by `MeshGenesis::decode`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// Returns the argument that follows the first occurrence of `flag`, if any.
    fn arg_value<'a>(args: &'a [String], flag: &str) -> Option<&'a str> {
        let idx = args.iter().position(|a| a == flag)?;
        args.get(idx + 1).map(String::as_str)
    }

    /// Timeout used when `--timeout` is absent or cannot be parsed.
    const DEFAULT_TIMEOUT_SECS: u64 = 30;

    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    // ---- Command-line parsing -------------------------------------------------
    let args: Vec<String> = std::env::args().collect();
    let adapter_name = arg_value(&args, "--adapter").unwrap_or("hci1");
    let mesh_id_arg = arg_value(&args, "--mesh-id");
    let timeout_secs: u64 = match arg_value(&args, "--timeout") {
        Some(raw) => raw.parse().unwrap_or_else(|_| {
            // Keep the fallback behaviour, but make the ignored value visible.
            log::warn!(
                "Ignoring invalid --timeout value {:?}; using {}s",
                raw,
                DEFAULT_TIMEOUT_SECS
            );
            DEFAULT_TIMEOUT_SECS
        }),
        None => DEFAULT_TIMEOUT_SECS,
    };
    let use_encryption_flag = args.iter().any(|a| a == "--encrypt");
    let genesis_b64 = arg_value(&args, "--genesis");

    // ---- Local identity and mesh configuration --------------------------------
    let callsign = "TEST-CLI";
    let node_id = NodeId::new(0xC11E_0001);
    // Fixed secret used only when `--encrypt` is given without `--genesis`.
    const TEST_SECRET: [u8; 32] = [
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
        0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e,
        0x1f, 0x20,
    ];
    let (mesh_id, use_encryption, mesh_config) = if let Some(b64) = genesis_b64 {
        // Try padded base64 first, then unpadded; report the padded-decoder error
        // if both fail. This is user input, so fail with an error, not a panic.
        let genesis_bytes = base64::engine::general_purpose::STANDARD
            .decode(b64)
            .or_else(|padded_err| {
                base64::engine::general_purpose::STANDARD_NO_PAD
                    .decode(b64)
                    .map_err(|_| padded_err)
            })
            .map_err(|e| format!("Invalid base64 genesis: {}", e))?;
        // NOTE(review): this still panics on a malformed genesis; switch to `?`
        // once the failure type of `MeshGenesis::decode` is confirmed.
        let genesis =
            MeshGenesis::decode(&genesis_bytes).expect("Failed to decode genesis (invalid format)");
        let mesh_id = genesis.mesh_id();
        let secret = genesis.encryption_secret();
        let config = PeatMeshConfig::new(node_id, callsign, &mesh_id).with_encryption(secret);
        (mesh_id, true, config)
    } else {
        let mesh_id = mesh_id_arg.unwrap_or("TEST").to_string();
        let mut config = PeatMeshConfig::new(node_id, callsign, &mesh_id);
        if use_encryption_flag {
            config = config.with_encryption(TEST_SECRET);
        }
        (mesh_id, use_encryption_flag, config)
    };

    // ---- Startup banner -------------------------------------------------------
    log::info!("===========================================");
    log::info!("Peat BLE Test Client");
    log::info!("===========================================");
    log::info!("Adapter: {}", adapter_name);
    log::info!("Node ID: 0x{:08X}", node_id.as_u32());
    log::info!("Callsign: {}", callsign);
    log::info!("Mesh ID: {}", mesh_id);
    log::info!("Timeout: {}s", timeout_secs);
    log::info!("Encrypt: {}", use_encryption);
    if genesis_b64.is_some() {
        log::info!("Genesis: YES (shared genesis provided)");
    }
    log::info!("===========================================");

    // ---- Mesh and adapter setup -----------------------------------------------
    let mesh = Arc::new(RwLock::new(PeatMesh::new(mesh_config)));
    let mut adapter = BluerAdapter::with_adapter_name(adapter_name).await?;
    log::info!(
        "Bluetooth adapter: {} ({})",
        adapter.adapter_name(),
        adapter.address().unwrap_or_else(|| "unknown".to_string())
    );
    let ble_config = BleConfig::new(node_id);
    adapter.init(&ble_config).await?;

    // State shared between the callbacks and the main loop below.
    let found_peer = Arc::new(AtomicBool::new(false));
    let sync_received = Arc::new(AtomicBool::new(false));
    let peer_node_id = Arc::new(AtomicU32::new(0));
    let peer_callsign = Arc::new(RwLock::new(String::new()));

    // Discovery: remember the node ID of the most recently seen matching peer.
    let found_peer_cb = found_peer.clone();
    let peer_node_id_cb = peer_node_id.clone();
    let mesh_id_prefix = format!("PEAT_{}-", mesh_id);
    adapter.set_discovery_callback(Some(Arc::new(move |device: DiscoveredDevice| {
        if device.is_peat_node {
            let name = device.name.as_deref().unwrap_or("unknown");
            log::info!(
                "Found Peat node: {} ({}) RSSI={}",
                name,
                device.address,
                device.rssi
            );
            // Accept names carrying this mesh's prefix, or the plain `PEAT-` prefix.
            let matches_mesh = name.starts_with(&mesh_id_prefix) || name.starts_with("PEAT-");
            if !matches_mesh {
                log::debug!("Skipping non-Peat peer: {}", name);
                return;
            }
            if let Some(pid) = device.node_id {
                // Publish the ID before raising the flag that the main loop polls.
                peer_node_id_cb.store(pid.as_u32(), Ordering::SeqCst);
                found_peer_cb.store(true, Ordering::SeqCst);
            }
        }
    })));
    adapter.register_gatt_service().await?;
    let adapter = Arc::new(adapter);

    // Incoming sync data: hand each payload to the mesh on a spawned task.
    let mesh_for_callback = mesh.clone();
    let sync_received_cb = sync_received.clone();
    let peer_callsign_cb = peer_callsign.clone();
    adapter
        .set_sync_data_callback(move |data| {
            let mesh = mesh_for_callback.clone();
            let sync_flag = sync_received_cb.clone();
            let callsign_store = peer_callsign_cb.clone();
            tokio::spawn(async move {
                let now = now_ms();
                let mesh_guard = mesh.read().await;
                if let Some(result) =
                    mesh_guard.on_ble_data_received_anonymous("gatt-peer", &data, now)
                {
                    log::info!(
                        "SYNC RECEIVED from 0x{:08X}: counter_changed={}, total={}",
                        result.source_node.as_u32(),
                        result.counter_changed,
                        result.total_count
                    );
                    if let Some(cs) = &result.callsign {
                        log::info!(" Peer callsign: {}", cs);
                        *callsign_store.write().await = cs.clone();
                    }
                    sync_flag.store(true, Ordering::SeqCst);
                }
            });
        })
        .await;
    adapter.start().await?;
    log::info!("Scanning for Peat nodes...");

    // ---- Main loop: wait for a peer, connect, exchange sync documents ----------
    let start = Instant::now();
    let timeout = Duration::from_secs(timeout_secs);
    let mut tick_count = 0u64;
    let mut connected = false;
    loop {
        // Overall deadline: report what was achieved and exit with failure.
        if start.elapsed() > timeout {
            log::error!("TEST FAILED: Timeout after {}s", timeout_secs);
            log::error!(" Found peer: {}", found_peer.load(Ordering::SeqCst));
            log::error!(" Connected: {}", connected);
            log::error!(" Sync received: {}", sync_received.load(Ordering::SeqCst));
            adapter.stop().await?;
            std::process::exit(1);
        }

        // Nothing discovered yet: poll again shortly.
        if !found_peer.load(Ordering::SeqCst) {
            tokio::time::sleep(Duration::from_millis(100)).await;
            continue;
        }

        let pid = NodeId::new(peer_node_id.load(Ordering::SeqCst));
        if !connected {
            log::info!("Connecting to peer 0x{:08X}...", pid.as_u32());
            match adapter.connect(&pid).await {
                Ok(_conn) => {
                    log::info!("Connected!");
                    connected = true;
                    let mesh_guard = mesh.read().await;
                    mesh_guard.on_ble_connected(&format!("{:08X}", pid.as_u32()), now_ms());
                }
                Err(e) => {
                    log::warn!("Connection failed: {}, retrying...", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }
            }
        }

        tick_count += 1;
        if let Some(conn) = adapter.get_connection(&pid).await {
            // Push our document to the peer and publish it as local sync state.
            let mesh_guard = mesh.read().await;
            let doc = if use_encryption {
                mesh_guard.build_full_delta_document(now_ms())
            } else {
                mesh_guard.build_document()
            };
            adapter.update_sync_state(&doc).await;
            match conn
                .write_characteristic(
                    PEAT_SERVICE_UUID,
                    PeatCharacteristicUuids::sync_data(),
                    &doc,
                )
                .await
            {
                Ok(()) => {
                    log::debug!("Tick {} - wrote {} bytes to peer", tick_count, doc.len())
                }
                Err(e) => log::warn!("Tick {} - failed to write sync_data: {}", tick_count, e),
            }
            drop(mesh_guard);

            // Pull the peer's sync state and feed any payload to the mesh.
            match conn
                .read_characteristic(PEAT_SERVICE_UUID, PeatCharacteristicUuids::sync_state())
                .await
            {
                Ok(data) if !data.is_empty() => {
                    log::debug!("Read {} bytes from peer sync_state", data.len());
                    let mesh_guard = mesh.read().await;
                    if let Some(result) =
                        mesh_guard.on_ble_data_received_anonymous("gatt-peer", &data, now_ms())
                    {
                        log::info!(
                            "SYNC from peer 0x{:08X}: counter_changed={}, total={}",
                            result.source_node.as_u32(),
                            result.counter_changed,
                            result.total_count
                        );
                        if let Some(cs) = &result.callsign {
                            log::info!(" Peer callsign: {}", cs);
                            *peer_callsign.write().await = cs.clone();
                        }
                        sync_received.store(true, Ordering::SeqCst);
                    }
                }
                Ok(_) => {}
                Err(e) => log::debug!("Failed to read peer sync_state: {}", e),
            }
        }

        // Success: a sync was processed, either by the callback or by the read above.
        if sync_received.load(Ordering::SeqCst) {
            let mesh_guard = mesh.read().await;
            let total = mesh_guard.total_count();
            let app_docs = mesh_guard.app_document_count();
            let peer_cs = peer_callsign.read().await;
            log::info!("===========================================");
            log::info!("TEST PASSED!");
            log::info!("===========================================");
            log::info!(" Total mesh count: {}", total);
            log::info!(" App documents: {}", app_docs);
            log::info!(" Peer callsign: {}", *peer_cs);
            log::info!(" Time elapsed: {:?}", start.elapsed());
            log::info!("===========================================");
            adapter.stop().await?;
            std::process::exit(0);
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}