use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use clap::Parser;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
use tracing_subscriber::EnvFilter;
use viva_service::config::Cli;
use viva_service::device::DeviceHandle;
use viva_service::{acquisition, nodes, status, xml};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cli = Cli::parse();
init_tracing(cli.verbose);
info!("Starting genicam-service");
let zenoh_config = load_zenoh_config(cli.zenoh_config.as_deref())?;
let session = Arc::new(zenoh::open(zenoh_config).await?);
info!("Zenoh session opened");
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let discovery_timeout = cli.discovery_timeout();
let discovery_interval = cli.discovery_interval();
let iface = cli.iface.clone();
let active_devices: Arc<tokio::sync::Mutex<HashMap<String, Vec<tokio::task::JoinHandle<()>>>>> =
Arc::new(tokio::sync::Mutex::new(HashMap::new()));
let session_ref = session.clone();
let shutdown_rx_ref = shutdown_rx.clone();
let active_ref = active_devices.clone();
let discovery_handle = tokio::spawn(async move {
run_discovery_loop(
session_ref,
discovery_timeout,
discovery_interval,
iface,
shutdown_rx_ref,
active_ref,
)
.await;
});
tokio::signal::ctrl_c().await?;
info!("Shutdown requested (CTRL+C)");
let _ = shutdown_tx.send(true);
let _ = discovery_handle.await;
let mut active = active_devices.lock().await;
for (device_id, tasks) in active.drain() {
info!(device_id, "waiting for device tasks to finish");
for task in tasks {
let _ = task.await;
}
}
session.close().await?;
info!("genicam-service shut down");
Ok(())
}
async fn run_discovery_loop(
session: Arc<zenoh::Session>,
discovery_timeout: std::time::Duration,
discovery_interval: std::time::Duration,
iface: Option<String>,
mut shutdown: watch::Receiver<bool>,
active_devices: Arc<tokio::sync::Mutex<HashMap<String, Vec<tokio::task::JoinHandle<()>>>>>,
) {
use viva_genicam::gige;
loop {
let devices = match &iface {
Some(name) => gige::discover_on_interface(discovery_timeout, name).await,
None => gige::discover(discovery_timeout).await,
};
let mut discovered_ids = std::collections::HashSet::new();
match devices {
Ok(found) => {
for dev_info in &found {
discovered_ids.insert(derive_device_id(dev_info));
}
for dev_info in found {
let device_id = derive_device_id(&dev_info);
let mut active = active_devices.lock().await;
if active.contains_key(&device_id) {
drop(active);
publish_announce(
&session,
&device_id,
dev_info.model.as_deref().unwrap_or("Unknown"),
)
.await;
continue;
}
info!(device_id, ip = %dev_info.ip, "new camera, connecting...");
match DeviceHandle::connect(&dev_info, iface.clone()).await {
Ok(handle) => {
let handle = Arc::new(handle);
info!(device_id, "connected, spawning service tasks");
let shutdown_rx = shutdown.clone();
let tasks =
spawn_device_tasks(session.clone(), handle, shutdown_rx).await;
active.insert(device_id.clone(), tasks);
publish_announce(
&session,
&device_id,
dev_info.model.as_deref().unwrap_or("Unknown"),
)
.await;
}
Err(e) => {
error!(device_id, error = %e, "failed to connect");
}
}
}
}
Err(e) => {
error!(error = %e, "discovery failed");
}
}
if !discovered_ids.is_empty() {
let mut active = active_devices.lock().await;
let lost: Vec<String> = active
.keys()
.filter(|id| !discovered_ids.contains(id.as_str()))
.cloned()
.collect();
for device_id in lost {
warn!(device_id, "device lost, cleaning up");
if let Some(tasks) = active.remove(&device_id) {
for task in tasks {
task.abort();
}
}
status::publish_disconnected(&session, &device_id, "device lost").await;
}
}
tokio::select! {
_ = tokio::time::sleep(discovery_interval) => {}
_ = shutdown.changed() => {
if *shutdown.borrow() {
info!("discovery loop shutting down");
return;
}
}
}
}
}
async fn spawn_device_tasks(
session: Arc<zenoh::Session>,
device: Arc<DeviceHandle>,
shutdown: watch::Receiver<bool>,
) -> Vec<tokio::task::JoinHandle<()>> {
let device_id = device.device_id().to_string();
status::publish_connected(&session, &device_id).await;
nodes::publish_initial_values(&session, device.as_ref()).await;
vec![
tokio::spawn(xml::run(
session.clone(),
device_id.clone(),
device.raw_xml().to_string(),
shutdown.clone(),
)),
tokio::spawn(nodes::run_set_queryable(
session.clone(),
device.clone(),
shutdown.clone(),
)),
tokio::spawn(nodes::run_execute_queryable(
session.clone(),
device.clone(),
shutdown.clone(),
)),
tokio::spawn(nodes::run_bulk_read_queryable(
session.clone(),
device.clone(),
shutdown.clone(),
)),
tokio::spawn(nodes::run_introspect_queryable(
session.clone(),
device.clone(),
shutdown.clone(),
)),
tokio::spawn(nodes::run_bulk_state_queryable(
session.clone(),
device.clone(),
shutdown.clone(),
)),
tokio::spawn(acquisition::run(
session.clone(),
device.clone(),
shutdown.clone(),
)),
tokio::spawn(heartbeat_loop(device.clone(), shutdown.clone())),
]
}
async fn publish_announce(session: &zenoh::Session, device_id: &str, model: &str) {
use viva_zenoh_api::{API_VERSION, DeviceAnnounce, keys};
let announce = DeviceAnnounce {
id: device_id.to_string(),
name: model.to_string(),
model: model.to_string(),
serial: device_id.to_string(),
api_version: Some(API_VERSION),
};
let key = keys::announce(device_id);
if let Ok(payload) = serde_json::to_vec(&announce) {
let _ = session.put(&key, payload).await;
}
}
fn derive_device_id(info: &viva_genicam::gige::DeviceInfo) -> String {
let mac = info
.mac
.iter()
.map(|b| format!("{b:02x}"))
.collect::<Vec<_>>()
.join("");
format!("cam-{mac}")
}
fn init_tracing(verbose: u8) {
let default_level = match verbose {
0 => "info",
1 => "debug",
_ => "trace",
};
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(default_level));
tracing_subscriber::fmt().with_env_filter(filter).init();
}
async fn heartbeat_loop(device: Arc<DeviceHandle>, mut shutdown: watch::Receiver<bool>) {
use tokio::time::MissedTickBehavior;
let mut interval = tokio::time::interval(Duration::from_millis(500));
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut consecutive_failures: u32 = 0;
loop {
tokio::select! {
_ = interval.tick() => {
if device.is_heartbeat_paused() {
debug!("heartbeat skipped (paused for connection refresh)");
continue;
}
let start = tokio::time::Instant::now();
match device.heartbeat_ping().await {
Ok(()) => {
if consecutive_failures > 0 {
info!(
consecutive_failures,
"heartbeat recovered"
);
}
consecutive_failures = 0;
}
Err(e) => {
consecutive_failures += 1;
warn!(
error = %e,
consecutive_failures,
"heartbeat failed"
);
}
}
let elapsed = start.elapsed();
if elapsed > Duration::from_millis(400) {
warn!(
elapsed_ms = elapsed.as_millis() as u64,
"heartbeat ping slow (possible mutex contention)"
);
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() { break; }
}
}
}
}
fn load_zenoh_config(
path: Option<&str>,
) -> Result<zenoh::Config, Box<dyn std::error::Error + Send + Sync>> {
match path {
Some(p) => Ok(zenoh::Config::from_file(p)?),
None => Ok(zenoh::Config::default()),
}
}