use super::AgentLoop;
use crate::runtime_control::RuntimeControlCommand;
use agent_diva_core::bus::{AgentEvent, InboundMessage};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;
use tracing::info;
impl AgentLoop {
pub(super) async fn handle_runtime_control_command(&mut self, cmd: RuntimeControlCommand) {
match cmd {
RuntimeControlCommand::UpdateNetwork(network) => {
self.apply_network_config(network).await;
}
RuntimeControlCommand::UpdateMcp { servers } => {
self.apply_mcp_config(servers).await;
}
RuntimeControlCommand::StopSession { session_key } => {
self.cancelled_sessions.insert(session_key);
}
RuntimeControlCommand::ResetSession { session_key } => {
if let Err(e) = self.sessions.archive_and_reset(&session_key) {
tracing::error!("Failed to archive and reset session: {}", e);
} else {
info!("Archived and reset session: {}", session_key);
}
}
RuntimeControlCommand::GetSessions { reply_tx } => {
let sessions = self.sessions.list_sessions();
let _ = reply_tx.send(sessions);
}
RuntimeControlCommand::GetSession {
session_key,
reply_tx,
} => {
let session = self.sessions.get_or_load(&session_key).cloned();
let _ = reply_tx.send(session);
}
RuntimeControlCommand::DeleteSession {
session_key,
reply_tx,
} => {
let result = self
.sessions
.delete(&session_key)
.map_err(|e| e.to_string());
match &result {
Ok(deleted) => {
info!(
session_key = %session_key,
deleted = *deleted,
"Runtime delete session completed"
);
}
Err(err) => {
tracing::error!(
session_key = %session_key,
error = %err,
"Runtime delete session failed"
);
}
}
let _ = reply_tx.send(result);
}
}
}
pub(super) async fn drain_runtime_control_commands(&mut self) {
while let Some(rx) = self.runtime_control_rx.as_mut() {
let cmd = match rx.try_recv() {
Ok(cmd) => cmd,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
info!("Runtime control channel closed");
self.runtime_control_rx = None;
break;
}
};
self.handle_runtime_control_command(cmd).await;
}
}
pub(super) fn is_session_cancelled(&self, session_key: &str) -> bool {
self.cancelled_sessions.contains(session_key)
}
pub(super) fn clear_session_cancellation(&mut self, session_key: &str) {
self.cancelled_sessions.remove(session_key);
}
pub(super) fn emit_error_event(
&self,
msg: &InboundMessage,
event_tx: Option<&mpsc::UnboundedSender<AgentEvent>>,
message: impl Into<String>,
) {
let event = AgentEvent::Error {
message: message.into(),
};
if let Some(tx) = event_tx {
let _ = tx.send(event.clone());
}
let _ = self
.bus
.publish_event(msg.channel.clone(), msg.chat_id.clone(), event);
}
}