#[cfg(feature = "file")]
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use serde_json::Value;
use tokio::sync::{Mutex as AsyncMutex, broadcast, broadcast::error::RecvError, mpsc};
use crate::FnvIndexMap;
use crate::agent::{Agent, AgentMessage, AgentStatus, agent_new};
use crate::config::{AgentConfigs, AgentConfigsMap};
use crate::context::AgentContext;
use crate::definition::{AgentConfigSpecs, AgentDefinition, AgentDefinitions};
use crate::error::AgentError;
use crate::id::{new_id, update_ids};
use crate::message::{self, AgentEventMessage};
use crate::preset::{Preset, PresetInfo};
use crate::registry;
use crate::spec::{AgentSpec, ConnectionSpec, PresetSpec};
use crate::value::AgentValue;
/// Capacity of the bounded `mpsc` channel created for each agent in `MAK::start_agent`.
const MESSAGE_LIMIT: usize = 1024;
/// Capacity of the `broadcast` channel created in `MAK::new` to deliver `MAKEvent`s to observers.
const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Shared runtime state of the agent kit.
///
/// Every field is behind an `Arc` (or is a cloneable channel sender), so cloning a `MAK`
/// yields another handle to the same state rather than a copy of it.
#[derive(Clone)]
pub struct MAK {
// Agent instances keyed by agent id.
pub(crate) agents: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Box<dyn Agent>>>>>>,
// Per-agent message senders keyed by agent id; inserted by `start_agent`, removed by `stop_agent`.
pub(crate) agent_txs: Arc<Mutex<FnvIndexMap<String, mpsc::Sender<AgentMessage>>>>,
// Not touched in this file. NOTE(review): presumably board name -> ids of agents fed by that
// board; confirm against `crate::message`.
pub(crate) board_out_agents: Arc<Mutex<FnvIndexMap<String, Vec<String>>>>,
// Not touched in this file. NOTE(review): presumably the latest value per board name; confirm.
pub(crate) board_value: Arc<Mutex<FnvIndexMap<String, AgentValue>>>,
// Source agent id -> list of (target agent id, source handle, target handle).
pub(crate) connections: Arc<Mutex<FnvIndexMap<String, Vec<(String, String, String)>>>>,
// Registered agent definitions, keyed by definition name.
pub(crate) defs: Arc<Mutex<AgentDefinitions>>,
// Presets keyed by preset id.
pub(crate) presets: Arc<Mutex<FnvIndexMap<String, Arc<AsyncMutex<Preset>>>>>,
// Global configs keyed by agent definition name.
pub(crate) global_configs_map: Arc<Mutex<FnvIndexMap<String, AgentConfigs>>>,
// Sender feeding the message loop; `None` until `ready()` is called and again after `quit()`.
pub(crate) tx: Arc<Mutex<Option<mpsc::Sender<AgentEventMessage>>>>,
// Broadcast sender used to publish `MAKEvent`s to subscribers.
pub(crate) observers: broadcast::Sender<MAKEvent>,
}
impl MAK {
/// Creates an empty kit: no agents, presets, definitions or connections, and no
/// message-loop sender yet (see `ready`).
///
/// The observer broadcast channel is created with `EVENT_CHANNEL_CAPACITY` slots; the
/// initial receiver is discarded, so events emitted before anyone subscribes are lost.
pub fn new() -> Self {
    let (observers, _initial_rx) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
    Self {
        observers,
        tx: Arc::new(Mutex::new(None)),
        agents: Default::default(),
        agent_txs: Default::default(),
        board_out_agents: Default::default(),
        board_value: Default::default(),
        connections: Default::default(),
        defs: Default::default(),
        presets: Default::default(),
        global_configs_map: Default::default(),
    }
}
/// Returns a clone of the message-loop sender.
///
/// # Errors
/// `AgentError::TxNotInitialized` when no sender is installed, i.e. before `ready()`
/// or after `quit()`.
pub(crate) fn tx(&self) -> Result<mpsc::Sender<AgentEventMessage>, AgentError> {
    let slot = self.tx.lock().unwrap();
    match slot.as_ref() {
        Some(sender) => Ok(sender.clone()),
        None => Err(AgentError::TxNotInitialized),
    }
}
/// Builds a new kit and registers the agents known to the inventory registry.
///
/// Currently always returns `Ok`; the `Result` is part of the public signature.
pub fn init() -> Result<Self, AgentError> {
    let kit = Self::new();
    kit.register_agents();
    Ok(kit)
}
/// Registers agents with this kit by delegating to `registry::register_inventory_agents`.
fn register_agents(&self) {
registry::register_inventory_agents(self);
}
/// Starts the background message loop and installs its sender, after which `tx()` succeeds.
///
/// # Errors
/// Propagates any error from `spawn_message_loop`.
pub async fn ready(&self) -> Result<(), AgentError> {
    self.spawn_message_loop().await
}
/// Drops the kit's own message-loop sender so that `tx()` reports
/// `AgentError::TxNotInitialized` from now on.
///
/// The loop spawned by `ready()` runs until its `recv()` yields `None`.
pub fn quit(&self) {
    let mut slot = self.tx.lock().unwrap();
    let _ = slot.take();
}
/// Registers a preset built from `PresetSpec::default()` and returns its id.
///
/// # Errors
/// Propagates errors from `add_preset`.
pub fn new_preset(&self) -> Result<String, AgentError> {
    self.add_preset(PresetSpec::default())
}
/// Looks up a preset by id and returns a shared handle to it, or `None` if it is unknown.
pub fn get_preset(&self, id: &str) -> Option<Arc<AsyncMutex<Preset>>> {
    self.presets.lock().unwrap().get(id).cloned()
}
/// Registers a preset together with the agents and connections listed in its spec,
/// and returns the preset id.
///
/// Failures to add an individual agent or connection are logged and skipped; they do
/// not abort the registration.
///
/// # Errors
/// `AgentError::DuplicateId` when a preset with the same id is already registered.
/// The check is done *before* any agent or connection is created, so a rejected
/// preset leaves no agents or connections behind in the kit.
pub fn add_preset(&self, spec: PresetSpec) -> Result<String, AgentError> {
    let preset = Preset::new(spec);
    let id = preset.id().to_string();

    // Reject duplicates up front. The lock is released again before agents are built
    // so it is never held while agent constructors run.
    let already_registered = self.presets.lock().unwrap().contains_key(&id);
    if already_registered {
        return Err(AgentError::DuplicateId(id.into()));
    }

    for agent in &preset.spec().agents {
        if let Err(e) = self.add_agent_internal(id.clone(), agent.clone()) {
            log::error!("Failed to add_agent {}: {}", agent.id, e);
        }
    }
    for connection in &preset.spec().connections {
        self.add_connection_internal(connection.clone())
            .unwrap_or_else(|e| {
                log::error!("Failed to add_connection {}: {}", connection.source, e);
            });
    }

    let mut presets = self.presets.lock().unwrap();
    // Defensive re-check: another thread may have registered the same id in between.
    if presets.contains_key(&id) {
        return Err(AgentError::DuplicateId(id.into()));
    }
    presets.insert(id.clone(), Arc::new(AsyncMutex::new(preset)));
    Ok(id)
}
/// Stops a preset, removes its agents and connections, and unregisters the preset.
///
/// Failures while stopping the preset or removing an individual agent are logged and
/// do not abort the removal.
///
/// # Errors
/// `AgentError::PresetNotFound` when no preset with `id` is registered.
pub async fn remove_preset(&self, id: &str) -> Result<(), AgentError> {
    let preset = self
        .get_preset(id)
        .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
    {
        let mut preset = preset.lock().await;
        preset.stop(self).await.unwrap_or_else(|e| {
            log::error!("Failed to stop preset {}: {}", id, e);
        });
        for agent in &preset.spec().agents {
            self.remove_agent_internal(&agent.id)
                .await
                .unwrap_or_else(|e| {
                    log::error!("Failed to remove_agent {}: {}", agent.id, e);
                });
        }
        for connection in &preset.spec().connections {
            self.remove_connection_internal(connection);
        }
    }
    // Drop the registry entry as well; otherwise the preset would remain reachable
    // through `get_preset` and its id could never be registered again.
    self.presets.lock().unwrap().swap_remove(id);
    Ok(())
}
/// Starts the preset registered under `id`.
///
/// # Errors
/// `AgentError::PresetNotFound` for an unknown id; otherwise whatever `Preset::start` reports.
pub async fn start_preset(&self, id: &str) -> Result<(), AgentError> {
    let Some(handle) = self.get_preset(id) else {
        return Err(AgentError::PresetNotFound(id.to_string()));
    };
    let mut guard = handle.lock().await;
    guard.start(self).await?;
    Ok(())
}
/// Stops the preset registered under `id`.
///
/// # Errors
/// `AgentError::PresetNotFound` for an unknown id; otherwise whatever `Preset::stop` reports.
pub async fn stop_preset(&self, id: &str) -> Result<(), AgentError> {
    let Some(handle) = self.get_preset(id) else {
        return Err(AgentError::PresetNotFound(id.to_string()));
    };
    let mut guard = handle.lock().await;
    guard.stop(self).await?;
    Ok(())
}
/// Reads a preset spec from the JSON file at `path`, registers it, and records the
/// file location on the preset. Returns the new preset id.
///
/// # Errors
/// `AgentError::IoError` when the file cannot be read; otherwise errors from
/// `PresetSpec::from_json`, `add_preset` or `set_preset_file_name`.
#[cfg(feature = "file")]
pub async fn open_preset_from_file(&self, path: &str) -> Result<String, AgentError> {
    let contents = match std::fs::read_to_string(path) {
        Ok(text) => text,
        Err(io_err) => return Err(AgentError::IoError(io_err.to_string())),
    };
    let parsed = PresetSpec::from_json(&contents)?;
    let preset_id = self.add_preset(parsed)?;
    self.set_preset_file_name(&preset_id, path).await?;
    Ok(preset_id)
}
/// Serializes the current spec of preset `id` to JSON and writes it to the path
/// derived from the preset's stored directory and name (see `get_preset_path`).
///
/// # Errors
/// * `AgentError::PresetNotFound` — unknown preset id.
/// * `AgentError::EmptyFileName` — the preset has no usable path.
/// * `AgentError::IoError` — the file could not be written.
/// * Any error from `PresetSpec::to_json`.
#[cfg(feature = "file")]
pub async fn save_preset(&self, id: &str) -> Result<(), AgentError> {
    let spec = self
        .get_preset_spec(id)
        .await
        .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
    let json = spec.to_json()?;
    let Some(target) = self.get_preset_path(id).await else {
        return Err(AgentError::EmptyFileName);
    };
    match std::fs::write(&target, json) {
        Ok(()) => Ok(()),
        Err(io_err) => Err(AgentError::IoError(io_err.to_string())),
    }
}
/// Records `path` as the preset's file location, then saves the preset there.
///
/// # Errors
/// Propagates errors from `set_preset_file_name` and `save_preset`.
#[cfg(feature = "file")]
pub async fn save_preset_as(&self, id: &str, path: &str) -> Result<(), AgentError> {
    self.set_preset_file_name(id, path).await?;
    self.save_preset(id).await
}
/// Returns `<dir>/<name>` for preset `id`, or `None` when the preset is unknown or
/// has no name or no directory stored.
///
/// NOTE(review): `set_preset_file_name` stores the file *stem* as the name, so the
/// path built here carries no extension unless `Preset::name` adds one — confirm.
#[cfg(feature = "file")]
pub async fn get_preset_path(&self, id: &str) -> Option<PathBuf> {
    let handle = self.get_preset(id)?;
    let guard = handle.lock().await;
    let file_name = guard.name()?;
    let dir = guard.dir()?;
    Some(std::path::Path::new(&dir).join(file_name))
}
/// Splits `path` into its parent directory and file stem and stores both on preset `id`.
///
/// A missing or non-UTF-8 parent directory is stored as the empty string.
///
/// # Errors
/// * `AgentError::InvalidFileExtension` — `path` has no file stem, or the stem is not valid UTF-8.
/// * `AgentError::PresetNotFound` — unknown preset id.
#[cfg(feature = "file")]
pub async fn set_preset_file_name(&self, id: &str, path: &str) -> Result<(), AgentError> {
    let path = std::path::Path::new(path);
    let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
        return Err(AgentError::InvalidFileExtension);
    };
    let dir = match path.parent().and_then(|p| p.to_str()) {
        Some(parent) => parent.to_string(),
        None => String::new(),
    };
    let handle = self
        .get_preset(id)
        .ok_or_else(|| AgentError::PresetNotFound(id.to_string()))?;
    let mut guard = handle.lock().await;
    guard.set_name(stem.to_string());
    guard.set_dir(dir);
    Ok(())
}
/// Returns a copy of the preset's spec in which each agent entry is replaced by that
/// agent's current spec.
///
/// Agents listed in the preset but no longer registered in the kit are left out.
/// Returns `None` when the preset is unknown.
pub async fn get_preset_spec(&self, id: &str) -> Option<PresetSpec> {
    let handle = self.get_preset(id)?;
    // Copy the spec and release the preset lock before locking individual agents.
    let mut snapshot = handle.lock().await.spec().clone();
    let mut live_specs = Vec::with_capacity(snapshot.agents.len());
    for stored in &snapshot.agents {
        if let Some(current) = self.get_agent_spec(&stored.id).await {
            live_specs.push(current);
        }
    }
    snapshot.agents = live_specs;
    Some(snapshot)
}
/// Applies the JSON `value` to the spec of preset `id` via `Preset::update_spec`.
///
/// # Errors
/// `AgentError::PresetNotFound` for an unknown id; otherwise errors from `Preset::update_spec`.
pub async fn update_preset_spec(&self, id: &str, value: &Value) -> Result<(), AgentError> {
    let Some(handle) = self.get_preset(id) else {
        return Err(AgentError::PresetNotFound(id.to_string()));
    };
    let mut guard = handle.lock().await;
    guard.update_spec(value)?;
    Ok(())
}
/// Returns a `PresetInfo` built from preset `id`, or `None` if the preset is unknown.
pub async fn get_preset_info(&self, id: &str) -> Option<PresetInfo> {
    let handle = self.get_preset(id)?;
    let guard = handle.lock().await;
    Some(PresetInfo::from(&*guard))
}
/// Returns a `PresetInfo` for every registered preset, in the registry's iteration order.
pub async fn get_preset_infos(&self) -> Vec<PresetInfo> {
    // Copy the handles out first so the registry lock is not held while awaiting presets.
    let handles: Vec<_> = self.presets.lock().unwrap().values().cloned().collect();
    let mut infos = Vec::with_capacity(handles.len());
    for handle in &handles {
        let guard = handle.lock().await;
        infos.push(PresetInfo::from(&*guard));
    }
    infos
}
/// Registers (or replaces) an agent definition under its name.
///
/// When the definition declares global configs, the value of each entry is merged
/// into the kit's global configs for that definition name via `set_global_configs`.
///
/// The method name keeps its historical spelling ("definiton") for API compatibility.
pub fn register_agent_definiton(&self, def: AgentDefinition) {
    let name = def.name.clone();
    let declared_globals = def.global_configs.clone();
    let mut defs = self.defs.lock().unwrap();
    defs.insert(name.clone(), def);
    let Some(declared_globals) = declared_globals else {
        return;
    };
    let mut seeded = AgentConfigs::default();
    for (key, entry) in declared_globals.iter() {
        seeded.set(key.clone(), entry.value.clone());
    }
    self.set_global_configs(name, seeded);
}
/// Returns a copy of all registered agent definitions.
pub fn get_agent_definitions(&self) -> AgentDefinitions {
    self.defs.lock().unwrap().clone()
}
/// Returns a copy of the definition registered as `def_name`, if any.
pub fn get_agent_definition(&self, def_name: &str) -> Option<AgentDefinition> {
    self.defs.lock().unwrap().get(def_name).cloned()
}
/// Returns a copy of the config specs of definition `def_name`.
///
/// `None` means either the definition is unknown or it declares no config specs.
pub fn get_agent_config_specs(&self, def_name: &str) -> Option<AgentConfigSpecs> {
    let defs = self.defs.lock().unwrap();
    defs.get(def_name).and_then(|def| def.configs.clone())
}
/// Returns a copy of the current spec of agent `agent_id`, or `None` if the agent is unknown.
pub async fn get_agent_spec(&self, agent_id: &str) -> Option<AgentSpec> {
    let handle = self.get_agent(agent_id)?;
    let guard = handle.lock().await;
    Some(guard.spec().clone())
}
/// Applies the JSON `value` to the spec of agent `agent_id` via `Agent::update_spec`.
///
/// # Errors
/// `AgentError::AgentNotFound` for an unknown id; otherwise errors from `update_spec`.
pub async fn update_agent_spec(&self, agent_id: &str, value: &Value) -> Result<(), AgentError> {
    let handle = self
        .get_agent(agent_id)
        .ok_or_else(|| AgentError::AgentNotFound(agent_id.to_string()))?;
    let mut guard = handle.lock().await;
    guard.update_spec(value)?;
    Ok(())
}
/// Builds a fresh agent spec from the definition registered as `def_name`.
///
/// # Errors
/// `AgentError::AgentDefinitionNotFound` when no such definition exists.
pub fn new_agent_spec(&self, def_name: &str) -> Result<AgentSpec, AgentError> {
    match self.get_agent_definition(def_name) {
        Some(def) => Ok(def.to_spec()),
        None => Err(AgentError::AgentDefinitionNotFound(def_name.to_string())),
    }
}
/// Creates an agent from `spec` under a freshly generated id, registers it in the kit,
/// and records it in preset `preset_id`. Returns the new agent id.
///
/// Any id already present in `spec` is overwritten.
///
/// # Errors
/// `AgentError::PresetNotFound` for an unknown preset; otherwise errors from
/// `add_agent_internal`.
pub async fn add_agent(
    &self,
    preset_id: String,
    mut spec: AgentSpec,
) -> Result<String, AgentError> {
    let Some(handle) = self.get_preset(&preset_id) else {
        return Err(AgentError::PresetNotFound(preset_id.to_string()));
    };
    let agent_id = new_id();
    spec.id = agent_id.clone();
    self.add_agent_internal(preset_id, spec.clone())?;
    handle.lock().await.add_agent(spec);
    Ok(agent_id)
}
/// Instantiates the agent described by `spec`, tags it with `preset_id`, and stores it
/// in the agent registry under `spec.id`. The preset itself is not modified.
///
/// # Errors
/// `AgentError::AgentAlreadyExists` when the id is taken; otherwise errors from `agent_new`.
fn add_agent_internal(&self, preset_id: String, spec: AgentSpec) -> Result<(), AgentError> {
    let mut registry = self.agents.lock().unwrap();
    if registry.contains_key(&spec.id) {
        return Err(AgentError::AgentAlreadyExists(spec.id.to_string()));
    }
    let agent_id = spec.id.clone();
    let mut instance = agent_new(self.clone(), agent_id.clone(), spec)?;
    instance.set_preset_id(preset_id);
    let shared = Arc::new(AsyncMutex::new(instance));
    registry.insert(agent_id, shared);
    Ok(())
}
/// Returns a shared handle to agent `agent_id`, or `None` if it is not registered.
pub fn get_agent(&self, agent_id: &str) -> Option<Arc<AsyncMutex<Box<dyn Agent>>>> {
    self.agents.lock().unwrap().get(agent_id).cloned()
}
/// Validates `connection`, registers it in the routing table, and records it in
/// preset `preset_id`.
///
/// The routing table is updated *before* the preset, matching
/// `add_agents_and_connections`: if the connection is rejected (for example as a
/// duplicate) the preset is left untouched, so preset and routing table cannot drift apart.
///
/// # Errors
/// * `AgentError::AgentNotFound` — source or target agent is not registered.
/// * `AgentError::EmptySourceHandle` / `AgentError::EmptyTargetHandle` — a handle is empty.
/// * `AgentError::PresetNotFound` — unknown preset id.
/// * `AgentError::ConnectionAlreadyExists` — an identical connection is already registered.
pub async fn add_connection(
    &self,
    preset_id: &str,
    connection: ConnectionSpec,
) -> Result<(), AgentError> {
    {
        let agents = self.agents.lock().unwrap();
        if !agents.contains_key(&connection.source) {
            return Err(AgentError::AgentNotFound(connection.source.to_string()));
        }
        if !agents.contains_key(&connection.target) {
            return Err(AgentError::AgentNotFound(connection.target.to_string()));
        }
    }
    if connection.source_handle.is_empty() {
        return Err(AgentError::EmptySourceHandle);
    }
    if connection.target_handle.is_empty() {
        return Err(AgentError::EmptyTargetHandle);
    }
    let preset = self
        .get_preset(preset_id)
        .ok_or_else(|| AgentError::PresetNotFound(preset_id.to_string()))?;
    let mut preset = preset.lock().await;
    // Register first: only a connection the routing table accepted is stored in the preset.
    self.add_connection_internal(connection.clone())?;
    preset.add_connection(connection);
    Ok(())
}
/// Adds `connection` to the routing table, which maps a source agent id to its
/// `(target, source_handle, target_handle)` edges.
///
/// # Errors
/// `AgentError::ConnectionAlreadyExists` when the source already has an identical edge.
fn add_connection_internal(&self, connection: ConnectionSpec) -> Result<(), AgentError> {
    let edge = (
        connection.target,
        connection.source_handle,
        connection.target_handle,
    );
    let mut graph = self.connections.lock().unwrap();
    if let Some(edges) = graph.get_mut(&connection.source) {
        if edges.contains(&edge) {
            return Err(AgentError::ConnectionAlreadyExists);
        }
        edges.push(edge);
        return Ok(());
    }
    // First edge for this source.
    graph.insert(connection.source, vec![edge]);
    Ok(())
}
/// Adds copies of `agents` and `connections` to preset `preset_id` after rewriting
/// their ids with `update_ids`, and returns the rewritten specs.
///
/// Processing stops at the first failure; agents and connections added before that
/// point stay registered.
///
/// # Errors
/// `AgentError::PresetNotFound` for an unknown preset; otherwise the first error from
/// `add_agent_internal` or `add_connection_internal`.
pub async fn add_agents_and_connections(
    &self,
    preset_id: &str,
    agents: &Vec<AgentSpec>,
    connections: &Vec<ConnectionSpec>,
) -> Result<(Vec<AgentSpec>, Vec<ConnectionSpec>), AgentError> {
    let (new_agents, new_connections) = update_ids(agents, connections);
    let Some(handle) = self.get_preset(preset_id) else {
        return Err(AgentError::PresetNotFound(preset_id.to_string()));
    };
    let mut guard = handle.lock().await;
    for spec in new_agents.iter() {
        self.add_agent_internal(preset_id.to_string(), spec.clone())?;
        guard.add_agent(spec.clone());
    }
    for link in new_connections.iter() {
        self.add_connection_internal(link.clone())?;
        guard.add_connection(link.clone());
    }
    Ok((new_agents, new_connections))
}
/// Removes agent `agent_id` from preset `preset_id`, then stops it and deletes it
/// (and its connections) from the kit.
///
/// # Errors
/// `AgentError::PresetNotFound` for an unknown preset; otherwise errors from
/// `remove_agent_internal`.
pub async fn remove_agent(&self, preset_id: &str, agent_id: &str) -> Result<(), AgentError> {
    let Some(handle) = self.get_preset(preset_id) else {
        return Err(AgentError::PresetNotFound(preset_id.to_string()));
    };
    // The preset lock is released at the end of this statement, before the agent is stopped.
    handle.lock().await.remove_agent(agent_id);
    self.remove_agent_internal(agent_id).await
}
/// Stops agent `agent_id`, drops every connection that starts or ends at it, and
/// removes it from the agent registry.
///
/// # Errors
/// Propagates errors from `stop_agent` (including `AgentNotFound`); nothing is removed
/// in that case.
async fn remove_agent_internal(&self, agent_id: &str) -> Result<(), AgentError> {
    self.stop_agent(agent_id).await?;
    {
        let mut graph = self.connections.lock().unwrap();
        // Drop incoming edges and remember the sources that end up with none.
        let emptied: Vec<String> = graph
            .iter_mut()
            .filter_map(|(source, edges)| {
                edges.retain(|(target, _, _)| target != agent_id);
                edges.is_empty().then(|| source.clone())
            })
            .collect();
        for source in &emptied {
            graph.swap_remove(source);
        }
        // Drop the agent's outgoing edges.
        graph.swap_remove(agent_id);
    }
    self.agents.lock().unwrap().swap_remove(agent_id);
    Ok(())
}
/// Removes `connection` from preset `preset_id` and from the routing table.
///
/// # Errors
/// * `AgentError::PresetNotFound` — unknown preset id.
/// * `AgentError::ConnectionNotFound` — the preset holds no such connection; the
///   routing table is left untouched.
pub async fn remove_connection(
    &self,
    preset_id: &str,
    connection: &ConnectionSpec,
) -> Result<(), AgentError> {
    let Some(handle) = self.get_preset(preset_id) else {
        return Err(AgentError::PresetNotFound(preset_id.to_string()));
    };
    let mut guard = handle.lock().await;
    match guard.remove_connection(connection) {
        Some(removed) => {
            self.remove_connection_internal(&removed);
            Ok(())
        }
        None => Err(AgentError::ConnectionNotFound(format!(
            "{}:{}->{}:{}",
            connection.source,
            connection.source_handle,
            connection.target,
            connection.target_handle
        ))),
    }
}
/// Removes the edge matching `connection` from the routing table; a missing edge is
/// not an error. A source left without edges is dropped from the table.
fn remove_connection_internal(&self, connection: &ConnectionSpec) {
    let mut graph = self.connections.lock().unwrap();
    let Some(edges) = graph.get_mut(&connection.source) else {
        return;
    };
    edges.retain(|(target, source_handle, target_handle)| {
        let is_match = *target == connection.target
            && *source_handle == connection.source_handle
            && *target_handle == connection.target_handle;
        !is_match
    });
    if edges.is_empty() {
        graph.swap_remove(&connection.source);
    }
}
pub async fn start_agent(&self, agent_id: &str) -> Result<(), AgentError> {
let agent = {
let agents = self.agents.lock().unwrap();
let Some(a) = agents.get(agent_id) else {
return Err(AgentError::AgentNotFound(agent_id.to_string()));
};
a.clone()
};
let def_name = {
let agent = agent.lock().await;
agent.def_name().to_string()
};
let uses_native_thread = {
let defs = self.defs.lock().unwrap();
let Some(def) = defs.get(&def_name) else {
return Err(AgentError::AgentDefinitionNotFound(agent_id.to_string()));
};
def.native_thread
};
let agent_status = {
let agent = agent.lock().await;
agent.status().clone()
};
if agent_status == AgentStatus::Init {
log::info!("Starting agent {}", agent_id);
let (tx, mut rx) = mpsc::channel(MESSAGE_LIMIT);
{
let mut agent_txs = self.agent_txs.lock().unwrap();
agent_txs.insert(agent_id.to_string(), tx.clone());
};
let agent_clone = agent.clone();
let agent_id_clone = agent_id.to_string();
let agent_loop = async move {
{
let mut agent_guard = agent_clone.lock().await;
if let Err(e) = agent_guard.start().await {
log::error!("Failed to start agent {}: {}", agent_id_clone, e);
return;
}
}
while let Some(message) = rx.recv().await {
match message {
AgentMessage::Input { ctx, port, value } => {
agent_clone
.lock()
.await
.process(ctx, port, value)
.await
.unwrap_or_else(|e| {
log::error!("Process Error {}: {}", agent_id_clone, e);
});
}
AgentMessage::Config { key, value } => {
agent_clone
.lock()
.await
.set_config(key, value)
.unwrap_or_else(|e| {
log::error!("Config Error {}: {}", agent_id_clone, e);
});
}
AgentMessage::Configs { configs } => {
agent_clone
.lock()
.await
.set_configs(configs)
.unwrap_or_else(|e| {
log::error!("Configs Error {}: {}", agent_id_clone, e);
});
}
AgentMessage::Stop => {
rx.close();
break;
}
}
}
};
if uses_native_thread {
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(agent_loop);
});
} else {
tokio::spawn(agent_loop);
}
}
Ok(())
}
/// Stops agent `agent_id`.
///
/// The agent's message sender is unregistered and sent a best-effort `Stop` (a failed
/// `try_send` is only logged). `Agent::stop` is then called if the agent's status is
/// `AgentStatus::Start`.
///
/// # Errors
/// `AgentError::AgentNotFound` for an unknown id; otherwise errors from `Agent::stop`.
pub async fn stop_agent(&self, agent_id: &str) -> Result<(), AgentError> {
    {
        let mut senders = self.agent_txs.lock().unwrap();
        let outcome = senders
            .swap_remove(agent_id)
            .map(|tx| tx.try_send(AgentMessage::Stop));
        if let Some(Err(e)) = outcome {
            log::warn!("Failed to send stop message to agent {}: {}", agent_id, e);
        }
    }
    let handle = self
        .get_agent(agent_id)
        .ok_or_else(|| AgentError::AgentNotFound(agent_id.to_string()))?;
    let mut guard = handle.lock().await;
    if *guard.status() == AgentStatus::Start {
        log::info!("Stopping agent {}", agent_id);
        guard.stop().await?;
    }
    Ok(())
}
/// Applies `configs` to agent `agent_id`.
///
/// If the agent has a registered message sender (it was started), the configs are
/// queued as an `AgentMessage::Configs`; otherwise they are applied directly to the agent.
///
/// # Errors
/// * `AgentError::AgentNotFound` — no sender and no agent with that id.
/// * `AgentError::SendMessageFailed` — the agent's channel rejected the message.
/// * Errors from `Agent::set_configs` on the direct path.
pub async fn set_agent_configs(
    &self,
    agent_id: String,
    configs: AgentConfigs,
) -> Result<(), AgentError> {
    let sender = self.agent_txs.lock().unwrap().get(&agent_id).cloned();
    match sender {
        Some(tx) => tx
            .send(AgentMessage::Configs { configs })
            .await
            .map_err(|_| {
                AgentError::SendMessageFailed("Failed to send config message".to_string())
            }),
        None => {
            let handle = self
                .get_agent(&agent_id)
                .ok_or_else(|| AgentError::AgentNotFound(agent_id.to_string()))?;
            handle.lock().await.set_configs(configs)?;
            Ok(())
        }
    }
}
/// Returns a copy of the global configs stored for definition `def_name`, if any.
pub fn get_global_configs(&self, def_name: &str) -> Option<AgentConfigs> {
    self.global_configs_map.lock().unwrap().get(def_name).cloned()
}
/// Merges `configs` into the global configs of definition `def_name`.
///
/// With no existing entry, `configs` is stored as-is. Otherwise each key in `configs`
/// is set on the existing entry and keys absent from `configs` are left unchanged.
pub fn set_global_configs(&self, def_name: String, configs: AgentConfigs) {
    let mut by_definition = self.global_configs_map.lock().unwrap();
    match by_definition.get_mut(&def_name) {
        Some(existing) => {
            for (key, value) in configs {
                existing.set(key, value);
            }
        }
        None => {
            by_definition.insert(def_name, configs);
        }
    }
}
/// Returns a copy of the global configs of all definitions.
pub fn get_global_configs_map(&self) -> AgentConfigsMap {
    self.global_configs_map.lock().unwrap().clone()
}
/// Merges every entry of `new_configs_map` into the global configs, one definition at
/// a time, via `set_global_configs`. Definitions absent from the map are untouched.
pub fn set_global_configs_map(&self, new_configs_map: AgentConfigsMap) {
    new_configs_map
        .into_iter()
        .for_each(|(def_name, configs)| self.set_global_configs(def_name, configs));
}
/// Delivers `value` to `port` of agent `agent_id`.
///
/// A port named `config:<key>` becomes an `AgentMessage::Config` for `<key>`; any other
/// port becomes an `AgentMessage::Input`.
///
/// If the agent has a registered sender, the message is queued and an `AgentIn` event
/// is emitted. Without a sender (agent not started), a config message is applied
/// directly to the agent, an input message is silently dropped, and no event is emitted.
///
/// # Errors
/// * `AgentError::AgentNotFound` — no sender and no agent with that id.
/// * `AgentError::SendMessageFailed` — the agent's channel rejected the message.
/// * Errors from `Agent::set_config` on the direct path.
pub(crate) async fn agent_input(
    &self,
    agent_id: String,
    ctx: AgentContext,
    port: String,
    value: AgentValue,
) -> Result<(), AgentError> {
    let message = match port.strip_prefix("config:") {
        Some(config_key) => AgentMessage::Config {
            key: config_key.to_string(),
            value,
        },
        None => AgentMessage::Input {
            ctx,
            port: port.clone(),
            value,
        },
    };

    let sender = self.agent_txs.lock().unwrap().get(&agent_id).cloned();
    let Some(tx) = sender else {
        let handle: Arc<AsyncMutex<Box<dyn Agent>>> = self
            .get_agent(&agent_id)
            .ok_or_else(|| AgentError::AgentNotFound(agent_id.to_string()))?;
        if let AgentMessage::Config { key, value } = message {
            handle.lock().await.set_config(key, value)?;
        }
        return Ok(());
    };

    if tx.send(message).await.is_err() {
        return Err(AgentError::SendMessageFailed(
            "Failed to send input message".to_string(),
        ));
    }
    self.emit_agent_input(agent_id, port);
    Ok(())
}
/// Publishes `value` as output of agent `agent_id` on `port` with context `ctx`.
///
/// Thin async wrapper around `message::send_agent_out`; its result is returned unchanged.
pub async fn send_agent_out(
&self,
agent_id: String,
ctx: AgentContext,
port: String,
value: AgentValue,
) -> Result<(), AgentError> {
message::send_agent_out(self, agent_id, ctx, port, value).await
}
/// Synchronous counterpart of `send_agent_out`.
///
/// Thin wrapper around `message::try_send_agent_out`; its result is returned unchanged.
pub fn try_send_agent_out(
&self,
agent_id: String,
ctx: AgentContext,
port: String,
value: AgentValue,
) -> Result<(), AgentError> {
message::try_send_agent_out(self, agent_id, ctx, port, value)
}
/// Sends `value` to the board called `name`, using a freshly created `AgentContext`.
///
/// Delegates to `send_board_out` and returns its result.
pub async fn write_board_value(
&self,
name: String,
value: AgentValue,
) -> Result<(), AgentError> {
self.send_board_out(name, AgentContext::new(), value).await
}
/// Sends `value` to the preset-scoped variable `name` of preset `preset_id`.
///
/// The variable is addressed as the board named `%<preset_id>/<name>`; the value is sent
/// through `send_board_out` with a freshly created `AgentContext`.
pub async fn write_var_value(
&self,
preset_id: &str,
name: &str,
value: AgentValue,
) -> Result<(), AgentError> {
// Board name format for preset variables: "%<preset_id>/<name>".
let var_name = format!("%{}/{}", preset_id, name);
self.send_board_out(var_name, AgentContext::new(), value)
.await
}
/// Sends `value` to the board called `name` with context `ctx`.
///
/// Thin async wrapper around `message::send_board_out`; its result is returned unchanged.
pub(crate) async fn send_board_out(
&self,
name: String,
ctx: AgentContext,
value: AgentValue,
) -> Result<(), AgentError> {
message::send_board_out(self, name, ctx, value).await
}
async fn spawn_message_loop(&self) -> Result<(), AgentError> {
let (tx, mut rx) = mpsc::channel(4096);
{
let mut tx_lock = self.tx.lock().unwrap();
*tx_lock = Some(tx);
}
let mak = self.clone();
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
use AgentEventMessage::*;
match message {
AgentOut {
agent,
ctx,
port,
value,
} => {
message::agent_out(&mak, agent, ctx, port, value).await;
}
BoardOut { name, ctx, value } => {
message::board_out(&mak, name, ctx, value).await;
}
}
}
});
tokio::task::yield_now().await;
Ok(())
}
/// Returns a new receiver on the observer broadcast channel.
///
/// The receiver is created by `broadcast::Sender::subscribe`, so it only sees events
/// sent after this call.
pub fn subscribe(&self) -> broadcast::Receiver<MAKEvent> {
self.observers.subscribe()
}
/// Subscribes to kit events and forwards those selected by `filter_map` to an
/// unbounded channel, whose receiver is returned.
///
/// A Tokio task applies `filter_map` to each event: `Some(v)` is forwarded, `None` is
/// skipped. Lagging behind the broadcast channel is logged and the task keeps going.
/// The task ends when the broadcast channel closes or a forward fails because the
/// returned receiver was dropped; the drop is only noticed on the next forwarded event.
pub fn subscribe_to_event<F, T>(&self, mut filter_map: F) -> mpsc::UnboundedReceiver<T>
where
    F: FnMut(MAKEvent) -> Option<T> + Send + 'static,
    T: Send + 'static,
{
    let (forward_tx, forward_rx) = mpsc::unbounded_channel();
    let mut events = self.subscribe();
    tokio::spawn(async move {
        loop {
            let event = match events.recv().await {
                Ok(event) => event,
                Err(RecvError::Lagged(n)) => {
                    log::warn!("Event subscriber lagged by {} events", n);
                    continue;
                }
                Err(RecvError::Closed) => break,
            };
            let Some(mapped) = filter_map(event) else {
                continue;
            };
            if forward_tx.send(mapped).is_err() {
                break;
            }
        }
    });
    forward_rx
}
/// Publishes `MAKEvent::AgentConfigUpdated(agent_id, key, value)` to observers.
pub(crate) fn emit_agent_config_updated(
&self,
agent_id: String,
key: String,
value: AgentValue,
) {
self.notify_observers(MAKEvent::AgentConfigUpdated(agent_id, key, value));
}
/// Publishes `MAKEvent::AgentError(agent_id, message)` to observers.
pub(crate) fn emit_agent_error(&self, agent_id: String, message: String) {
self.notify_observers(MAKEvent::AgentError(agent_id, message));
}
/// Publishes `MAKEvent::AgentIn(agent_id, port)` to observers.
pub(crate) fn emit_agent_input(&self, agent_id: String, port: String) {
self.notify_observers(MAKEvent::AgentIn(agent_id, port));
}
/// Publishes `MAKEvent::AgentSpecUpdated(agent_id)` to observers.
pub(crate) fn emit_agent_spec_updated(&self, agent_id: String) {
self.notify_observers(MAKEvent::AgentSpecUpdated(agent_id));
}
/// Publishes `MAKEvent::Board(name, value)` to observers.
pub(crate) fn emit_board(&self, name: String, value: AgentValue) {
self.notify_observers(MAKEvent::Board(name, value));
}
/// Sends `event` on the observer broadcast channel.
fn notify_observers(&self, event: MAKEvent) {
// The send result is deliberately ignored, so a failed send is not reported.
let _ = self.observers.send(event);
}
}
/// Events published to observers of a `MAK` (see `MAK::subscribe`).
#[derive(Clone, Debug)]
pub enum MAKEvent {
    /// A config value of an agent changed: `(agent_id, key, value)`.
    AgentConfigUpdated(String, String, AgentValue),
    /// An agent reported an error: `(agent_id, message)`.
    AgentError(String, String),
    /// A message was queued for an agent's port: `(agent_id, port)`.
    AgentIn(String, String),
    /// An agent's spec changed: `(agent_id)`.
    AgentSpecUpdated(String),
    /// A board event: `(name, value)`.
    Board(String, AgentValue),
}