use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use asyn_rs::runtime::config::RuntimeConfig;
use asyn_rs::runtime::port::{PortRuntimeHandle, create_port_runtime};
use asyn_rs::trace::TraceManager;
use epics_base_rs::server::iocsh::registry::*;
use crate::address::TopicAddress;
use crate::config::{MqttConfig, QoS};
use crate::driver::MqttDriver;
use crate::event_loop::mqtt_event_loop;
/// Topics registered per port name that have not yet been handed to a driver.
///
/// Filled by `register_pending_topic` and drained (per port) by
/// `take_pending_topics`. The inner `Option` starts as `None` and is lazily
/// replaced by an empty map on first insertion.
static PENDING_TOPICS: Mutex<Option<HashMap<String, Vec<TopicAddress>>>> = Mutex::new(None);
/// Port runtime handles stored by `keep_runtime`.
///
/// Nothing in this file ever removes an entry.
/// NOTE(review): presumably this exists so the port runtimes are not dropped
/// when the configure command returns — confirm against `PortRuntimeHandle`.
static PORT_RUNTIMES: Mutex<Option<Vec<PortRuntimeHandle>>> = Mutex::new(None);
/// Appends `handle` to the module-level `PORT_RUNTIMES` list, creating the
/// list on first use.
///
/// # Panics
///
/// Panics if the `PORT_RUNTIMES` mutex is poisoned.
fn keep_runtime(handle: PortRuntimeHandle) {
    PORT_RUNTIMES
        .lock()
        .unwrap()
        .get_or_insert_with(Vec::new)
        .push(handle);
}
/// Queues `addr` under `port_name` in `PENDING_TOPICS`.
///
/// Both the map and the per-port list are created on demand. Topics for the
/// same port accumulate in registration order.
///
/// # Panics
///
/// Panics if the `PENDING_TOPICS` mutex is poisoned.
pub(crate) fn register_pending_topic(port_name: &str, addr: TopicAddress) {
    let mut pending = PENDING_TOPICS.lock().unwrap();
    pending
        .get_or_insert_with(HashMap::new)
        .entry(port_name.to_string())
        .or_default()
        .push(addr);
}
/// Removes and returns every topic queued for `port_name`.
///
/// Returns an empty vector when nothing has been registered at all, or when
/// this particular port has no entry.
///
/// # Panics
///
/// Panics if the `PENDING_TOPICS` mutex is poisoned.
fn take_pending_topics(port_name: &str) -> Vec<TopicAddress> {
    let mut pending = PENDING_TOPICS.lock().unwrap();
    match pending.as_mut() {
        Some(by_port) => by_port.remove(port_name).unwrap_or_default(),
        None => Vec::new(),
    }
}
/// Builds the `mqttAddTopic` command definition.
///
/// The command takes two required string arguments:
///
/// * `portName` — the port the topic is queued for.
/// * `drvInfo`  — the topic description, parsed with [`TopicAddress::parse`].
///
/// On success the parsed address is queued with `register_pending_topic` and
/// the command returns `CommandOutcome::Continue`.
///
/// # Errors
///
/// The handler returns an error when either argument is missing or is not a
/// string, or when `drvInfo` fails to parse.
pub fn mqtt_add_topic_command() -> CommandDef {
    CommandDef::new(
        "mqttAddTopic",
        vec![
            ArgDesc {
                name: "portName",
                arg_type: ArgType::String,
                optional: false,
            },
            ArgDesc {
                name: "drvInfo",
                arg_type: ArgType::String,
                optional: false,
            },
        ],
        "mqttAddTopic portName drvInfo - Register an MQTT topic before driver creation",
        |args: &[ArgValue], _ctx: &CommandContext| -> CommandResult {
            // Checked access: a short argument slice produces the regular
            // "required" error instead of an index-out-of-bounds panic.
            // The strings are only borrowed, so no clone is needed.
            let port_name = match args.first() {
                Some(ArgValue::String(s)) => s,
                _ => return Err("portName required".into()),
            };
            let drv_info = match args.get(1) {
                Some(ArgValue::String(s)) => s,
                _ => return Err("drvInfo required".into()),
            };
            let addr = TopicAddress::parse(drv_info).map_err(|e| e.to_string())?;
            println!(
                "mqttAddTopic: port={port_name} topic={}",
                addr.to_drv_info()
            );
            register_pending_topic(port_name, addr);
            Ok(CommandOutcome::Continue)
        },
    )
}
/// Builds the `mqttDriverConfigure` command definition.
///
/// The command accepts three required strings (`portName`, `brokerUrl`,
/// `clientId`) followed by an optional integer `qos` and an optional string
/// `connPvName`. Execution is delegated to a `MqttConfigHandler` that owns
/// the supplied runtime `handle` and `trace` manager.
pub fn mqtt_driver_configure_command(
    handle: epics_base_rs::runtime::task::RuntimeHandle,
    trace: Arc<TraceManager>,
) -> CommandDef {
    // Required positional arguments.
    let port_name = ArgDesc {
        name: "portName",
        arg_type: ArgType::String,
        optional: false,
    };
    let broker_url = ArgDesc {
        name: "brokerUrl",
        arg_type: ArgType::String,
        optional: false,
    };
    let client_id = ArgDesc {
        name: "clientId",
        arg_type: ArgType::String,
        optional: false,
    };
    // Optional trailing arguments.
    let qos = ArgDesc {
        name: "qos",
        arg_type: ArgType::Int,
        optional: true,
    };
    let conn_pv_name = ArgDesc {
        name: "connPvName",
        arg_type: ArgType::String,
        optional: true,
    };

    let usage =
        "mqttDriverConfigure portName brokerUrl clientId [qos] [connPvName] - Create MQTT driver";
    let handler = MqttConfigHandler { handle, trace };

    CommandDef::new(
        "mqttDriverConfigure",
        vec![port_name, broker_url, client_id, qos, conn_pv_name],
        usage,
        handler,
    )
}
/// Handler state for the `mqttDriverConfigure` command.
struct MqttConfigHandler {
/// Runtime handle on which the MQTT event loop task is spawned.
handle: epics_base_rs::runtime::task::RuntimeHandle,
/// Trace manager passed to `register_port` when the port is registered.
trace: Arc<TraceManager>,
}
impl CommandHandler for MqttConfigHandler {
    /// Executes `mqttDriverConfigure portName brokerUrl clientId [qos] [connPvName]`.
    ///
    /// Steps, in order:
    ///
    /// 1. Read the arguments and build an `MqttConfig` from the broker URL,
    ///    client id and QoS (remaining fields come from `MqttConfig::default()`).
    /// 2. Take the topics previously queued for this port via `mqttAddTopic`;
    ///    a warning is printed when there are none.
    /// 3. Create the `MqttDriver` and its port runtime, register the port and
    ///    store the runtime handle in the module-level list.
    /// 4. If a non-empty `connPvName` was given, create a `bi` record bound to
    ///    the driver's connection parameter.
    /// 5. Spawn `mqtt_event_loop` on the handler's runtime handle.
    ///
    /// Returns `CommandOutcome::Continue` on success.
    ///
    /// # Errors
    ///
    /// Returns an error when `portName`, `brokerUrl` or `clientId` is missing
    /// or is not a string.
    fn call(&self, args: &[ArgValue], ctx: &CommandContext) -> CommandResult {
        // Checked access throughout: a short argument slice yields the
        // regular error (or the default, for optional arguments) instead of
        // an index-out-of-bounds panic.
        let port_name = match args.first() {
            Some(ArgValue::String(s)) => s,
            _ => return Err("portName required".into()),
        };
        let broker_url = match args.get(1) {
            Some(ArgValue::String(s)) => s,
            _ => return Err("brokerUrl required".into()),
        };
        let client_id = match args.get(2) {
            Some(ArgValue::String(s)) => s.clone(),
            _ => return Err("clientId required".into()),
        };
        let qos = match args.get(3) {
            // NOTE(review): `as` truncates silently if the shell integer is
            // wider than i32 — confirm `QoS::from_int` copes with any value.
            Some(ArgValue::Int(v)) => QoS::from_int(*v as i32),
            _ => QoS::default(),
        };
        // An empty string is treated the same as an omitted argument.
        let conn_pv_name = match args.get(4) {
            Some(ArgValue::String(s)) if !s.is_empty() => Some(s),
            _ => None,
        };

        let (host, port) = MqttConfig::parse_broker_url(broker_url);
        let config = MqttConfig {
            broker_host: host,
            broker_port: port,
            client_id,
            qos,
            ..MqttConfig::default()
        };

        let topics = take_pending_topics(port_name);
        if topics.is_empty() {
            println!("mqttDriverConfigure: WARNING — no topics registered for port '{port_name}'");
            println!("  Use mqttAddTopic before mqttDriverConfigure");
        } else {
            println!(
                "mqttDriverConfigure: port={port_name} broker={}:{} topics={}",
                config.broker_host,
                config.broker_port,
                topics.len()
            );
        }

        let (publish_tx, publish_rx) = tokio::sync::mpsc::unbounded_channel();
        let driver = MqttDriver::new(port_name, &config, topics, publish_tx);
        // Everything the event loop needs must be copied out of the driver
        // before it is moved into the port runtime.
        let subscribed_topics = driver.subscribed_topics();
        let topic_map = driver.topic_map().clone();
        let connected_param = driver.connected_param;

        let (runtime_handle, _actor_jh) = create_port_runtime(driver, RuntimeConfig::default());
        let port_handle = runtime_handle.port_handle().clone();
        asyn_rs::asyn_record::register_port(port_name, port_handle.clone(), self.trace.clone());
        keep_runtime(runtime_handle);

        if let Some(pv_name) = conn_pv_name {
            create_connection_record(ctx, port_name, pv_name);
            println!("mqttDriverConfigure: connected PV = {pv_name}");
        }

        // `config` is not needed past this point, so it is moved into the
        // task rather than cloned.
        self.handle.spawn(async move {
            mqtt_event_loop(
                config,
                subscribed_topics,
                topic_map,
                port_handle,
                publish_rx,
                connected_param,
            )
            .await;
        });
        Ok(CommandOutcome::Continue)
    }
}

/// Creates the `bi` record `pv_name` that reads the connection parameter of
/// port `port_name` and adds it to the database reachable through `ctx`.
///
/// This is best-effort: a failure at any step is reported on stderr and the
/// record is skipped; it never aborts the configure command.
fn create_connection_record(ctx: &CommandContext, port_name: &str, pv_name: &str) {
    let db_str = format!(
        concat!(
            "record(bi, \"{pv}\") {{\n",
            "  field(DTYP, \"asynInt32\")\n",
            "  field(INP,  \"@asyn({port}) {param}\")\n",
            "  field(SCAN, \"I/O Intr\")\n",
            "  field(ZNAM, \"Disconnected\")\n",
            "  field(ONAM, \"Connected\")\n",
            "  field(ZSV,  \"MAJOR\")\n",
            "  field(OSV,  \"NO_ALARM\")\n",
            "}}\n",
        ),
        pv = pv_name,
        port = port_name,
        param = crate::driver::PARAM_CONNECTED,
    );

    // No macro substitutions are needed: the template is fully expanded above.
    let macros = std::collections::HashMap::new();
    let defs = match epics_base_rs::server::db_loader::parse_db(&db_str, &macros) {
        Ok(defs) => defs,
        Err(_) => {
            eprintln!("mqttDriverConfigure: could not parse record definition for '{pv_name}'");
            return;
        }
    };

    for def in defs {
        let mut record = match epics_base_rs::server::db_loader::create_record(&def.record_type) {
            Ok(record) => record,
            Err(_) => {
                eprintln!(
                    "mqttDriverConfigure: could not create record '{}'",
                    def.name
                );
                continue;
            }
        };

        // Common fields are collected here and applied only after the record
        // has been added to the database.
        let mut common_fields = Vec::new();
        let _ = epics_base_rs::server::db_loader::apply_fields(
            &mut record,
            &def.fields,
            &mut common_fields,
        );

        ctx.block_on(async {
            if let Err(e) = ctx.db().add_record(&def.name, record).await {
                eprintln!(
                    "mqttDriverConfigure: register '{}' skipped: {e}",
                    def.name
                );
                return;
            }
            if let Some(rec_arc) = ctx.db().get_record(&def.name).await {
                let mut instance = rec_arc.write().await;
                for (name, value) in common_fields {
                    let _ = instance.put_common_field(&name, value);
                }
            }
        });
    }
}
/// Registers both MQTT startup commands on `app` and returns the updated
/// application.
///
/// `mqttAddTopic` is registered first, then `mqttDriverConfigure`, which
/// takes ownership of `handle` and `trace`.
pub fn register_mqtt_commands(
    app: epics_ca_rs::server::ioc_app::IocApplication,
    handle: epics_base_rs::runtime::task::RuntimeHandle,
    trace: Arc<TraceManager>,
) -> epics_ca_rs::server::ioc_app::IocApplication {
    let with_add_topic = app.register_startup_command(mqtt_add_topic_command());
    with_add_topic.register_startup_command(mqtt_driver_configure_command(handle, trace))
}