mod config;
mod device;
pub use config::ManagerConfig;
pub use device::Device;
use std::sync::Arc;
use anyhow::{Context, Result};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
use crate::{certmanager, controller, discover::{self, MatterDeviceInfo}, fabric::Fabric, mdns2, onboarding, transport};
/// Owns the transport, controller, certificate manager, device registry and
/// mDNS service that the methods of this type operate on.
pub struct DeviceManager {
/// Base directory given to `create`/`load`; exposed via `base_path()`.
base_path: String,
/// Manager configuration (read for `fabric_id`, `controller_id`, `local_address`).
config: ManagerConfig,
/// Transport used to open connections to device addresses.
transport: Arc<transport::Transport>,
/// Controller used for commissioning and session authentication.
controller: Arc<controller::Controller>,
/// Certificate manager loaded from the PEM path under the base directory.
certmanager: Arc<dyn certmanager::CertManager>,
/// Device registry; a std mutex, only locked for short, non-async sections.
registry: std::sync::Mutex<device::DeviceRegistry>,
/// mDNS service handle used for active lookups.
mdns: Arc<mdns2::MdnsService>,
/// Receiving end of the mDNS event channel; a tokio mutex because the guard
/// is held across `.await` points while discovery is in progress.
mdns_receiver: tokio::sync::Mutex<UnboundedReceiver<mdns2::MdnsEvent>>,
}
impl DeviceManager {
/// Sets up a brand-new manager rooted at `base_path`.
///
/// Creates the directory, saves `config` into it, bootstraps the file-backed
/// certificate store and creates the controller user, then builds the
/// transport, controller, device registry and mDNS service.
///
/// # Errors
/// Returns the first error produced by any of those steps.
pub async fn create(base_path: &str, config: ManagerConfig) -> Result<Self> {
    std::fs::create_dir_all(base_path)
        .with_context(|| format!("creating base directory {}", base_path))?;
    config::save_config(base_path, &config)?;

    // Bootstrap the certificate store on disk, then load it back as the
    // trait object the rest of the manager works with.
    let pem_file = config::pem_path(base_path);
    let bootstrapper = certmanager::FileCertManager::new(config.fabric_id, &pem_file);
    bootstrapper.bootstrap()?;
    bootstrapper.create_user(config.controller_id)?;
    let cert_store: Arc<dyn certmanager::CertManager> =
        certmanager::FileCertManager::load(&pem_file)?;

    let transport = transport::Transport::new(&config.local_address).await?;
    let controller = controller::Controller::new(&cert_store, &transport, config.fabric_id)?;
    let devices = device::DeviceRegistry::load(&config::devices_path(base_path))?;
    let (mdns, mdns_events) = mdns2::MdnsService::new().await?;

    Ok(Self {
        base_path: base_path.to_owned(),
        config,
        transport,
        controller,
        certmanager: cert_store,
        registry: std::sync::Mutex::new(devices),
        mdns,
        mdns_receiver: tokio::sync::Mutex::new(mdns_events),
    })
}
/// Opens an existing manager from `base_path`.
///
/// Reads the saved configuration, loads the certificate store and device
/// registry from their files under `base_path`, and starts a fresh
/// transport, controller and mDNS service.
///
/// # Errors
/// Returns the first error produced by any of those steps.
pub async fn load(base_path: &str) -> Result<Self> {
    let config = config::load_config(base_path)?;

    let pem_file = config::pem_path(base_path);
    let cert_store: Arc<dyn certmanager::CertManager> =
        certmanager::FileCertManager::load(&pem_file)?;

    let transport = transport::Transport::new(&config.local_address).await?;
    let controller = controller::Controller::new(&cert_store, &transport, config.fabric_id)?;
    let devices = device::DeviceRegistry::load(&config::devices_path(base_path))?;
    let (mdns, mdns_events) = mdns2::MdnsService::new().await?;

    Ok(Self {
        base_path: base_path.to_owned(),
        config,
        transport,
        controller,
        certmanager: cert_store,
        registry: std::sync::Mutex::new(devices),
        mdns,
        mdns_receiver: tokio::sync::Mutex::new(mdns_events),
    })
}
/// Commissions the device reachable at `address` and records it in the registry.
///
/// Opens a transport connection to `address`, runs the controller's
/// commissioning with `pin`, `node_id` and the configured controller id, and
/// on success adds a `Device { node_id, address, name }` registry entry.
///
/// # Errors
/// Fails if commissioning fails, the registry lock is poisoned, or the
/// registry rejects the new entry.
pub async fn commission(
    &self,
    address: &str,
    pin: u32,
    node_id: u64,
    name: &str,
) -> Result<controller::Connection> {
    let transport_conn = self.transport.create_connection(address).await;
    let controller_id = self.config.controller_id;
    let connection = self
        .controller
        .commission(&transport_conn, pin, node_id, controller_id)
        .await?;

    // The std mutex is taken only after the last await point.
    let mut registry = self
        .registry
        .lock()
        .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
    registry.add(Device {
        node_id,
        address: address.to_owned(),
        name: name.to_owned(),
    })?;
    Ok(connection)
}
/// Connects to the registered device with the given `node_id`.
///
/// Looks up the device's stored address and delegates to
/// `connect_with_rediscovery`.
///
/// # Errors
/// Fails if the registry lock is poisoned, the node is not registered, or
/// the connection attempt fails.
pub async fn connect(&self, node_id: u64) -> Result<controller::Connection> {
    // Keep the std mutex guard inside this block so it is released before awaiting.
    let stored_address = {
        let registry = self
            .registry
            .lock()
            .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
        let entry = registry
            .get(node_id)
            .with_context(|| format!("device {} not found in registry", node_id))?;
        entry.address.clone()
    };
    self.connect_with_rediscovery(node_id, &stored_address).await
}
/// Connects to the registered device called `name`.
///
/// Resolves the name to its node id and stored address, then delegates to
/// `connect_with_rediscovery`.
///
/// # Errors
/// Fails if the registry lock is poisoned, no device has that name, or the
/// connection attempt fails.
pub async fn connect_by_name(&self, name: &str) -> Result<controller::Connection> {
    // Keep the std mutex guard inside this block so it is released before awaiting.
    let (node_id, stored_address) = {
        let registry = self
            .registry
            .lock()
            .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
        let entry = registry
            .get_by_name(name)
            .with_context(|| format!("device '{}' not found in registry", name))?;
        (entry.node_id, entry.address.clone())
    };
    self.connect_with_rediscovery(node_id, &stored_address).await
}
/// Authenticates a session with `node_id`, retrying once after rediscovery.
///
/// First tries `address`. If authentication fails, runs operational
/// discovery (`discover_device`, 10 second timeout) to obtain a fresh
/// address and tries exactly once more at that address.
///
/// # Errors
/// Fails if rediscovery fails, or if authentication still fails at the
/// rediscovered address.
async fn connect_with_rediscovery(&self, node_id: u64, address: &str) -> Result<controller::Connection> {
    let controller_id = self.config.controller_id;

    let mut conn = self.transport.create_connection(address).await;
    let first_error = match self
        .controller
        .auth_sigma_with_busy_retry(&conn, node_id, controller_id)
        .await
    {
        Ok(ses) => return Ok(controller::Connection::from_parts(conn, ses)),
        Err(e) => e,
    };

    log::info!(
        "Connection to {} failed ({}), attempting operational rediscovery...",
        address, first_error
    );
    let new_address = self
        .discover_device(node_id, Duration::from_secs(10))
        .await
        .with_context(|| format!("rediscovery for node {} after connect failure", node_id))?;

    // Reassigning replaces (and drops) the connection to the stale address.
    conn = self.transport.create_connection(&new_address).await;
    let ses = self
        .controller
        .auth_sigma_with_busy_retry(&conn, node_id, controller_id)
        .await
        .with_context(|| {
            format!("connection still failed after rediscovery at {}", new_address)
        })?;
    Ok(controller::Connection::from_parts(conn, ses))
}
/// Re-authenticates `conn` for `node_id` using this manager's controller and
/// configured controller id.
///
/// # Errors
/// Returns whatever error `Connection::reauth` reports.
pub async fn reauth(&self, conn: &controller::Connection, node_id: u64) -> Result<()> {
    let controller_id = self.config.controller_id;
    conn.reauth(&self.controller, node_id, controller_id).await
}
/// Commissions a device identified by a manual pairing code.
///
/// Decodes `pairing_code`, browses `_matterc._udp.local` for up to 10 seconds
/// for an advertisement whose discriminator matches, then calls
/// [`Self::commission`] on each advertised IP in turn and returns the first
/// connection that succeeds. Port 5540 is used when the advertisement
/// carries no port.
///
/// Advertisements that cannot be used (Matter info cannot be extracted, or
/// the discriminator is not a valid `u16`) are logged at debug level and
/// skipped, so one malformed device on the network cannot abort the search.
///
/// # Errors
/// Fails if the code cannot be decoded, no matching device appears before
/// the timeout, the event channel closes, the matching device advertises no
/// IPs, or commissioning fails on every IP.
pub async fn commission_with_code(
    &self,
    pairing_code: &str,
    node_id: u64,
    name: &str,
) -> Result<controller::Connection> {
    let info = onboarding::decode_manual_pairing_code(pairing_code)
        .context("decoding manual pairing code")?;
    let discriminator = info.discriminator;
    let passcode = info.passcode;
    log::info!("Discovering device with discriminator {}...", discriminator);

    let mut receiver = self.mdns_receiver.lock().await;
    self.mdns.active_lookup("_matterc._udp.local", 0xff).await;
    let discovery_timeout = Duration::from_secs(10);
    let start_time = std::time::Instant::now();
    let (ips, port) = loop {
        if start_time.elapsed() > discovery_timeout {
            anyhow::bail!("timed out waiting for device with discriminator {}", discriminator);
        }
        let remaining = discovery_timeout
            .checked_sub(start_time.elapsed())
            .unwrap_or_default();
        let event = tokio::time::timeout(remaining, receiver.recv())
            .await
            .map_err(|_| {
                anyhow::anyhow!("timed out waiting for device with discriminator {}", discriminator)
            })?;
        match event {
            Some(mdns2::MdnsEvent::ServiceDiscovered { name: svc_name, records: _, target }) => {
                if svc_name != "_matterc._udp.local." {
                    continue;
                }
                let matter_info = match discover::extract_matter_info(&target, &self.mdns).await {
                    Ok(i) => i,
                    Err(e) => {
                        log::debug!("Failed to extract Matter info from {}: {}", target, e);
                        continue;
                    }
                };
                if let Some(ref d) = matter_info.discriminator {
                    // A bad value from some other device must not abort the search.
                    let mut mdns_discriminator = match d.parse::<u16>() {
                        Ok(value) => value,
                        Err(e) => {
                            log::debug!(
                                "Ignoring {}: invalid discriminator {:?}: {}",
                                target, d, e
                            );
                            continue;
                        }
                    };
                    // With a short discriminator only the bits under 0xf00 are compared.
                    if info.is_short_discriminator {
                        mdns_discriminator &= 0xf00;
                    }
                    if mdns_discriminator == discriminator {
                        break (matter_info.ips, matter_info.port.unwrap_or(5540));
                    }
                }
            }
            None => {
                anyhow::bail!("no commissionable device found with discriminator {}", discriminator);
            }
            _ => {}
        }
    };
    // Release the event receiver before the (potentially long) commissioning attempts.
    drop(receiver);

    if ips.is_empty() {
        anyhow::bail!("discovered device with discriminator {} but no IPs returned", discriminator);
    }
    let mut last_err = anyhow::anyhow!("no IPs to try");
    for ip in &ips {
        let address = if ip.is_ipv6() {
            format!("[{}]:{}", ip, port)
        } else {
            format!("{}:{}", ip, port)
        };
        match self.commission(&address, passcode, node_id, name).await {
            Ok(conn) => return Ok(conn),
            Err(e) => {
                log::debug!("Commission attempt at {} failed: {}", address, e);
                last_err = e;
            }
        }
    }
    Err(last_err).with_context(|| {
        format!("commissioning failed on all IPs for discriminator {}", discriminator)
    })
}
/// Commissions a device over BLE using a QR payload or a manual pairing code.
///
/// A code starting with `MT:` or `mt:` is decoded as a QR payload; anything
/// else is decoded as a manual pairing code. After the controller's BLE
/// commissioning succeeds, the device is added to the registry with an empty
/// address.
///
/// # Errors
/// Fails if the code cannot be decoded, BLE commissioning fails, the registry
/// lock is poisoned, or the registry rejects the new entry.
#[cfg(feature = "ble")]
pub async fn commission_ble_with_code(
    &self,
    pairing_code: &str,
    node_id: u64,
    name: &str,
    network_creds: crate::commission::NetworkCreds,
) -> Result<controller::Connection> {
    let looks_like_qr = pairing_code.starts_with("MT:") || pairing_code.starts_with("mt:");
    let decoded = if looks_like_qr {
        crate::onboarding::decode_qr_payload(pairing_code)
    } else {
        crate::onboarding::decode_manual_pairing_code(pairing_code)
    };
    let info = decoded.context("decoding pairing code")?;

    let connection = self
        .controller
        .commission_ble(
            info.discriminator,
            info.is_short_discriminator,
            info.passcode,
            node_id,
            self.config.controller_id,
            network_creds,
            &self.mdns,
            &self.mdns_receiver,
        )
        .await?;

    // The std mutex is taken only after the last await point.
    let mut registry = self
        .registry
        .lock()
        .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
    registry.add(crate::devman::Device {
        node_id,
        address: String::new(),
        name: name.to_owned(),
    })?;
    Ok(connection)
}
/// Finds the current operational address of `node_id` via mDNS.
///
/// Builds the instance name as the upper-case hex of the compressed fabric
/// id, a dash, and `node_id` as 16 upper-case hex digits, then browses
/// `_matter._tcp.local` for that exact instance. The first advertised IP is
/// used (port 5540 if the advertisement carries none), the registry entry
/// for `node_id` is updated, and the new address is returned.
///
/// `timeout` bounds the whole discovery, measured from the moment the lookup
/// is issued; unrelated mDNS events do not extend it.
///
/// # Errors
/// Fails if the CA public key or compressed fabric id cannot be obtained, the
/// device is not found within `timeout`, the event channel closes, the
/// advertisement has no IPs, or the registry update fails.
pub async fn discover_device(&self, node_id: u64, timeout: Duration) -> Result<String> {
    let ca_public_key = self.certmanager.get_ca_public_key()?;
    let fabric = Fabric::new(self.config.fabric_id, 0, &ca_public_key);
    let compressed = fabric.compressed().context("computing compressed fabric ID")?;
    let instance_name = format!("{}-{:016X}", hex::encode_upper(&compressed), node_id);
    let expected_target = format!("{}._matter._tcp.local.", instance_name);
    log::info!("Operational discovery for instance {}...", instance_name);

    let mut receiver = self.mdns_receiver.lock().await;
    self.mdns.active_lookup("_matter._tcp.local", 0xff).await;
    let start_time = std::time::Instant::now();
    let (ips, port) = loop {
        // Overall deadline: events for other services must not reset the clock.
        if start_time.elapsed() > timeout {
            anyhow::bail!("operational discovery timeout for node {}", node_id);
        }
        let remaining = timeout.checked_sub(start_time.elapsed()).unwrap_or_default();
        let event = tokio::time::timeout(remaining, receiver.recv())
            .await
            .map_err(|_| anyhow::anyhow!("operational discovery timeout for node {}", node_id))?;
        match event {
            Some(mdns2::MdnsEvent::ServiceDiscovered { name: svc_name, records: _, target }) => {
                if svc_name != "_matter._tcp.local." {
                    continue;
                }
                if target != expected_target {
                    continue;
                }
                let matter_info = match discover::extract_matter_info(&target, &self.mdns).await {
                    Ok(i) => i,
                    Err(e) => {
                        log::debug!("Failed to extract Matter info from {}: {}", target, e);
                        continue;
                    }
                };
                break (matter_info.ips, matter_info.port.unwrap_or(5540));
            }
            None => {
                anyhow::bail!("no operational mDNS result for instance {}", instance_name);
            }
            _ => {}
        }
    };
    drop(receiver);

    let ip = ips
        .first()
        .with_context(|| format!("discovered {} but no IPs in response", instance_name))?;
    let address = if ip.is_ipv6() {
        format!("[{}]:{}", ip, port)
    } else {
        format!("{}:{}", ip, port)
    };
    self.update_device_address(node_id, &address)?;
    Ok(address)
}
/// Collects commissionable devices advertised on `_matterc._udp.local`.
///
/// Issues an active lookup and gathers `(target, info)` pairs until `timeout`
/// has elapsed or the event channel closes. Advertisements whose Matter info
/// cannot be extracted are logged at debug level and skipped.
///
/// # Errors
/// The visible code never returns `Err`; the `Result` wrapper is part of the
/// signature.
pub async fn discover_commissionable_devices(&self, timeout: Duration) -> Result<Vec<(String, MatterDeviceInfo)>> {
    let mut events = self.mdns_receiver.lock().await;
    self.mdns.active_lookup("_matterc._udp.local", 0xff).await;

    let mut found = Vec::new();
    let started = std::time::Instant::now();
    loop {
        if started.elapsed() >= timeout {
            break;
        }
        let budget = timeout.checked_sub(started.elapsed()).unwrap_or_default();
        let target = match tokio::time::timeout(budget, events.recv()).await {
            // Deadline reached or channel closed: stop collecting.
            Err(_) | Ok(None) => break,
            Ok(Some(mdns2::MdnsEvent::ServiceDiscovered { name: svc_name, records: _, target }))
                if svc_name == "_matterc._udp.local." =>
            {
                target
            }
            // Other services and other event kinds are ignored.
            Ok(Some(_)) => continue,
        };
        match discover::extract_matter_info(&target, &self.mdns).await {
            Ok(matter_info) => found.push((target, matter_info)),
            Err(e) => log::debug!("Failed to extract Matter info from {}: {}", target, e),
        }
    }
    Ok(found)
}
/// Returns a copy of every device currently in the registry.
///
/// # Errors
/// Fails if the registry lock is poisoned.
pub fn list_devices(&self) -> Result<Vec<Device>> {
    self.registry
        .lock()
        .map(|registry| registry.list().to_vec())
        .map_err(|e| anyhow::anyhow!("registry lock: {}", e))
}
/// Returns a copy of the registry entry for `node_id`, or `None` if absent.
///
/// # Errors
/// Fails if the registry lock is poisoned.
pub fn get_device(&self, node_id: u64) -> Result<Option<Device>> {
    match self.registry.lock() {
        Ok(registry) => Ok(registry.get(node_id).cloned()),
        Err(e) => Err(anyhow::anyhow!("registry lock: {}", e)),
    }
}
/// Returns a copy of the registry entry called `name`, or `None` if absent.
///
/// # Errors
/// Fails if the registry lock is poisoned.
pub fn get_device_by_name(&self, name: &str) -> Result<Option<Device>> {
    match self.registry.lock() {
        Ok(registry) => Ok(registry.get_by_name(name).cloned()),
        Err(e) => Err(anyhow::anyhow!("registry lock: {}", e)),
    }
}
/// Removes the registry entry for `node_id`.
///
/// # Errors
/// Fails if the registry lock is poisoned or the registry's `remove` fails.
pub fn remove_device(&self, node_id: u64) -> Result<()> {
    let mut registry = self
        .registry
        .lock()
        .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
    registry.remove(node_id)
}
/// Changes the name stored in the registry for `node_id` to `name`.
///
/// # Errors
/// Fails if the registry lock is poisoned or the registry's `rename` fails.
pub fn rename_device(&self, node_id: u64, name: &str) -> Result<()> {
    let mut registry = self
        .registry
        .lock()
        .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
    registry.rename(node_id, name)
}
/// Replaces the address stored in the registry for `node_id` with `address`.
///
/// # Errors
/// Fails if the registry lock is poisoned or the registry's `update_address`
/// fails.
pub fn update_device_address(&self, node_id: u64, address: &str) -> Result<()> {
    let mut registry = self
        .registry
        .lock()
        .map_err(|e| anyhow::anyhow!("registry lock: {}", e))?;
    registry.update_address(node_id, address)
}
/// Returns the shared mDNS service handle held by this manager.
pub fn mdns(&self) -> &Arc<mdns2::MdnsService> {
&self.mdns
}
/// Returns the shared controller handle held by this manager.
pub fn controller(&self) -> &Arc<controller::Controller> {
&self.controller
}
/// Returns the shared transport handle held by this manager.
pub fn transport(&self) -> &Arc<transport::Transport> {
&self.transport
}
/// Returns the shared certificate manager held by this manager.
pub fn certmanager(&self) -> &Arc<dyn certmanager::CertManager> {
&self.certmanager
}
/// Returns the configuration this manager was created or loaded with.
pub fn config(&self) -> &ManagerConfig {
&self.config
}
/// Returns the base directory path given to `create` or `load`.
pub fn base_path(&self) -> &str {
&self.base_path
}
}