use crate::runtime::TaskIntrinsicTool;
use super::*;
impl RuntimeHandle {
/// Authorizes a shell command for `agent_id` and assembles its [`CommandRequest`].
///
/// The agent's configuration is fetched with `agent_config`, then the execution policy
/// is asked whether a command may run in `cwd` given the agent's base directory;
/// `background` is passed through to that check. On success the request is filled in
/// with the policy's effective timeout, allowed environment and per-stream output cap.
///
/// # Errors
///
/// Propagates any error from `agent_config`, or returns the policy's denial detail.
/// Before a denial is returned, a `RuntimeHookEvent::AuthorizationDenied` hook is
/// emitted with action `"background_command"` or `"shell_command"`.
fn build_command_request(
    &self,
    agent_id: &str,
    command: String,
    requested_timeout: Option<Duration>,
    cwd: PathBuf,
    background: bool,
) -> Result<(AgentExecutionConfig, CommandRequest), String> {
    let config = self.agent_config(agent_id)?;
    let policy = &self.execution.policy;

    policy
        .authorize_command_execution(&config.base_dir, &cwd, background)
        .map_err(|detail| {
            let action = if background {
                "background_command"
            } else {
                "shell_command"
            };
            // The hook result is ignored on purpose: the caller must receive the
            // denial even when the hook itself cannot be delivered.
            let _ = self.emit_hook(RuntimeHookEvent::AuthorizationDenied {
                agent_id: agent_id.to_string(),
                action: action.to_string(),
                detail: detail.clone(),
            });
            detail
        })?;

    let timeout = policy.effective_timeout(requested_timeout);
    let env = policy.allowed_environment();
    let request = CommandRequest {
        spec: CommandSpec::Shell { command },
        cwd,
        timeout,
        env,
        max_output_bytes_per_stream: policy.max_output_bytes_per_stream,
    };
    Ok((config, request))
}
/// Starts `command` as a background task for `agent_id`.
///
/// The command is first authorized and built through `build_command_request` in
/// background mode. If the policy defines a `background_task_limit`, the agent's
/// running task count is then compared against it before the task is started.
/// `_justification` is accepted but not used here.
///
/// # Errors
///
/// Returns the error from `build_command_request`, a
/// `"Background task limit of {limit} reached"` message when the running count is at
/// or above the limit (after emitting an `AuthorizationDenied` hook with action
/// `"background_limit"`), or whatever `start_task` reports.
pub fn start_background_task(
    &self,
    agent_id: &str,
    command: String,
    _justification: Option<String>,
    requested_timeout: Option<Duration>,
    cwd: PathBuf,
) -> Result<BackgroundTaskSummary, String> {
    let (_config, request) =
        self.build_command_request(agent_id, command, requested_timeout, cwd, true)?;
    let tasks = &self.collaboration.background_tasks;

    // The running count is only queried when a limit is configured.
    let exceeded_limit = self
        .execution
        .policy
        .background_task_limit
        .filter(|&limit| tasks.running_task_count(agent_id) >= limit);

    if let Some(limit) = exceeded_limit {
        let detail = format!("Background task limit of {limit} reached");
        // Best-effort hook: a delivery failure must not mask the denial.
        let _ = self.emit_hook(RuntimeHookEvent::AuthorizationDenied {
            agent_id: agent_id.to_string(),
            action: "background_limit".to_string(),
            detail: detail.clone(),
        });
        return Err(detail);
    }

    tasks.start_task(agent_id, request)
}
/// Checks on background work belonging to `agent_id`.
///
/// Passes `agent_id` and the optional `task_id` unchanged to the background task
/// component's `check_task` and returns its result as-is. What a `None` task id
/// selects is decided by `check_task`.
pub fn check_background_task(
    &self,
    agent_id: &str,
    task_id: Option<&str>,
) -> Result<String, String> {
    let tasks = &self.collaboration.background_tasks;
    tasks.check_task(agent_id, task_id)
}
/// Returns the notifications that the background task component's
/// `drain_notifications` yields for `agent_id`.
pub fn drain_background_notifications(&self, agent_id: &str) -> Vec<BackgroundNotification> {
    let tasks = &self.collaboration.background_tasks;
    tasks.drain_notifications(agent_id)
}
/// Reports the answer of the background task component's
/// `has_deliverable_notifications` for `agent_id`.
pub fn has_deliverable_background_notifications(&self, agent_id: &str) -> bool {
    let tasks = &self.collaboration.background_tasks;
    tasks.has_deliverable_notifications(agent_id)
}
/// Hands `notifications` back to the background task component for `agent_id` via
/// `requeue_notifications`. Nothing is returned to the caller.
pub fn requeue_background_notifications(
    &self,
    agent_id: &str,
    notifications: Vec<BackgroundNotification>,
) {
    let tasks = &self.collaboration.background_tasks;
    tasks.requeue_notifications(agent_id, notifications);
}
/// Calls the background task component's `acknowledge_notifications` for `agent_id`.
/// Nothing is returned to the caller.
pub fn acknowledge_background_notifications(&self, agent_id: &str) {
    let tasks = &self.collaboration.background_tasks;
    tasks.acknowledge_notifications(agent_id);
}
/// Spawns an actor for `teammate_name` through the teammate host.
///
/// The host receives a clone of the team component, owned copies of `team_dir` and
/// `teammate_name`, and the shared `agent`. The handle it returns is always wrapped
/// in `Ok`; this method never produces an `Err` of its own.
pub fn spawn_teammate_actor(
    &self,
    team_dir: &Path,
    teammate_name: &str,
    agent: std::sync::Arc<tokio::sync::Mutex<crate::Agent>>,
) -> Result<crate::team::TeammateActorHandle, RuntimeError> {
    let host = &self.collaboration.teammate_host;
    let team = self.collaboration.team.clone();
    let handle = host.spawn_teammate(
        team,
        team_dir.to_path_buf(),
        teammate_name.to_string(),
        agent,
    );
    Ok(handle)
}
/// Registers a teammate with the team component.
///
/// Forwards `team_dir`, `summary` and the already-spawned `actor` handle to the team
/// component's `spawn_teammate` and returns its result unchanged.
pub fn register_teammate(
    &self,
    team_dir: &Path,
    summary: TeamMemberSummary,
    actor: crate::team::TeammateActorHandle,
) -> Result<TeamMemberSummary, RuntimeError> {
    let team = &self.collaboration.team;
    team.spawn_teammate(team_dir, summary, actor)
}
/// Asks the team component to wake `teammate_name` in `team_dir`, returning the
/// result of its `wake_teammate` unchanged.
pub fn wake_teammate(&self, team_dir: &Path, teammate_name: &str) -> Result<(), RuntimeError> {
    let team = &self.collaboration.team;
    team.wake_teammate(team_dir, teammate_name)
}
/// Sends `content` from `sender` to `to` within the team rooted at `team_dir`.
///
/// Thin pass-through to the team component's `send_message`; its result is returned
/// unchanged.
pub fn send_team_message(
    &self,
    team_dir: &Path,
    sender: &str,
    to: &str,
    content: String,
) -> Result<TeamDispatch, RuntimeError> {
    let team = &self.collaboration.team;
    team.send_message(team_dir, sender, to, content)
}
/// Broadcasts `content` from `sender` within the team rooted at `team_dir`.
///
/// Thin pass-through to the team component's `broadcast_message`; the dispatches it
/// reports are returned unchanged.
pub fn broadcast_team_message(
    &self,
    team_dir: &Path,
    sender: &str,
    content: String,
) -> Result<Vec<TeamDispatch>, RuntimeError> {
    let team = &self.collaboration.team;
    team.broadcast_message(team_dir, sender, content)
}
/// Returns the messages that the team component's `read_inbox` yields for
/// `agent_name` in `team_dir`.
pub fn read_team_inbox(
    &self,
    team_dir: &Path,
    agent_name: &str,
) -> Result<Vec<TeamMessage>, RuntimeError> {
    let team = &self.collaboration.team;
    team.read_inbox(team_dir, agent_name)
}
/// Hands `messages` back to the team component for `agent_name` via
/// `requeue_messages`, returning its result unchanged.
pub fn requeue_team_messages(
    &self,
    team_dir: &Path,
    agent_name: &str,
    messages: Vec<TeamMessage>,
) -> Result<(), RuntimeError> {
    let team = &self.collaboration.team;
    team.requeue_messages(team_dir, agent_name, messages)
}
/// Calls the team component's `acknowledge_messages` for `agent_name` in `team_dir`,
/// returning its result unchanged.
pub fn acknowledge_team_messages(
    &self,
    team_dir: &Path,
    agent_name: &str,
) -> Result<(), RuntimeError> {
    let team = &self.collaboration.team;
    team.acknowledge_messages(team_dir, agent_name)
}
/// Creates a protocol request from `sender` to `to` in the team at `team_dir`.
///
/// `protocol` and `content` are forwarded untouched to the team component's
/// `create_request`, whose summary (or error) is returned as-is.
pub fn create_team_request(
    &self,
    team_dir: &Path,
    sender: &str,
    to: &str,
    protocol: String,
    content: String,
) -> Result<TeamProtocolRequestSummary, RuntimeError> {
    let team = &self.collaboration.team;
    team.create_request(team_dir, sender, to, protocol, content)
}
/// Resolves the protocol request `request_id` on behalf of `responder`.
///
/// `approve` and the optional `reason` are forwarded untouched to the team
/// component's `resolve_request`, whose summary (or error) is returned as-is.
pub fn resolve_team_request(
    &self,
    team_dir: &Path,
    responder: &str,
    request_id: &str,
    approve: bool,
    reason: Option<String>,
) -> Result<TeamProtocolRequestSummary, RuntimeError> {
    let team = &self.collaboration.team;
    team.resolve_request(team_dir, responder, request_id, approve, reason)
}
/// Lists protocol requests for `agent_name` in `team_dir`, narrowed by `filter`.
///
/// Thin pass-through to the team component's `list_requests`; how `filter` is
/// interpreted is decided there.
pub fn list_team_requests(
    &self,
    team_dir: &Path,
    agent_name: &str,
    filter: TeamRequestFilter,
) -> Result<Vec<TeamProtocolRequestSummary>, RuntimeError> {
    let team = &self.collaboration.team;
    team.list_requests(team_dir, agent_name, filter)
}
/// Runs the task tool `tool` with `input` against the runtime's persistence store.
///
/// Delegates to `task::execute_with_store`, passing the store by reference together
/// with `dir` and `access`, and returns its result unchanged.
pub fn execute_task_mutation(
    &self,
    tool: &TaskIntrinsicTool,
    input: serde_json::Value,
    dir: &Path,
    access: TaskAccess<'_>,
) -> Result<String, String> {
    let store = self.persistence.store.as_ref();
    task::execute_with_store(store, tool, input, dir, access)
}
/// Runs `command` in the foreground for `agent_id` and awaits its output.
///
/// The command is authorized and built through `build_command_request` (foreground
/// mode) and then handed to the configured executor. `_justification` is accepted
/// but not used here.
///
/// # Errors
///
/// Returns the error from `build_command_request`, or whatever the executor's `run`
/// reports.
pub async fn execute_shell_command(
    &self,
    agent_id: &str,
    command: String,
    _justification: Option<String>,
    requested_timeout: Option<Duration>,
    cwd: PathBuf,
) -> Result<CommandOutput, String> {
    let (_config, request) =
        self.build_command_request(agent_id, command, requested_timeout, cwd, false)?;
    let pending = self.execution.executor.run(request);
    pending.await
}
/// Reads `path` on behalf of `agent_id`, passing `max_lines` to `read_limited_file`.
///
/// The execution policy must first authorize the read relative to the agent's base
/// directory; the path it returns is the one that is actually read.
///
/// # Errors
///
/// Propagates any error from `agent_config`, returns the policy's denial detail
/// (after emitting an `AuthorizationDenied` hook with action `"read_file"`), or
/// returns whatever `read_limited_file` reports.
pub async fn read_file(
    &self,
    agent_id: &str,
    path: &str,
    max_lines: Option<usize>,
) -> Result<String, String> {
    let config = self.agent_config(agent_id)?;
    let requested = Path::new(path);

    let resolved = self
        .execution
        .policy
        .authorize_file_read(&config.base_dir, requested)
        .map_err(|detail| {
            // Best-effort hook: a delivery failure must not mask the denial.
            let _ = self.emit_hook(RuntimeHookEvent::AuthorizationDenied {
                agent_id: agent_id.to_string(),
                action: "read_file".to_string(),
                detail: detail.clone(),
            });
            detail
        })?;

    read_limited_file(&resolved, max_lines).await
}
/// Picks the directory in which a command for `agent_id` should run.
///
/// Resolution order:
/// 1. An `explicit_directory` wins; it is resolved against the agent's base directory
///    with `resolve_path`.
/// 2. If shell auto-routing is disabled, or the agent is not a teammate, the base
///    directory is used. Only teammates have owned tasks considered for routing, so
///    the task store is not consulted for anyone else.
/// 3. Otherwise the working directories of the agent's owned, not-yet-completed tasks
///    are collected (deduplicated after resolution). None yields the base directory,
///    exactly one yields that directory.
///
/// # Errors
///
/// Propagates any error from `agent_config`, returns the stringified store error if
/// the tasks cannot be loaded, or returns an error when more than one distinct task
/// directory is active and the caller must disambiguate.
pub fn resolve_working_directory(
    &self,
    agent_id: &str,
    explicit_directory: Option<&str>,
) -> Result<PathBuf, String> {
    let config = self.agent_config(agent_id)?;
    if let Some(directory) = explicit_directory {
        return Ok(resolve_path(&config.base_dir, directory));
    }
    // The teammate check does not depend on any task, so it is decided once up front
    // instead of per task. This also spares non-teammates a store read whose outcome
    // could never change the answer (and whose failure would otherwise be reported).
    if !config.auto_route_shell || !config.is_teammate {
        return Ok(config.base_dir);
    }

    let tasks = self
        .persistence
        .store
        .load_tasks(&config.tasks_dir)
        .map_err(|error| error.to_string())?;

    // A `BTreeSet` collapses tasks that share a directory into a single entry.
    let mut directories = tasks
        .into_iter()
        .filter(|task| {
            task.owner == config.name
                && !matches!(task.status, crate::runtime::TaskStatus::Completed)
        })
        .filter_map(|task| {
            task.working_directory
                .as_deref()
                .map(|path| resolve_path(&config.base_dir, path))
        })
        .collect::<BTreeSet<_>>()
        .into_iter();

    // Looking at the first two entries distinguishes "none", "exactly one" and
    // "several" without a panic path.
    match (directories.next(), directories.next()) {
        (None, _) => Ok(config.base_dir),
        (Some(directory), None) => Ok(directory),
        (Some(_), Some(_)) => Err(
            "Multiple owned task directories are active. Pass workingDirectory explicitly."
                .to_string(),
        ),
    }
}
/// Returns the registered base directory for `agent_id`, or `"."` when the agent has
/// no entry in the context registry.
///
/// # Panics
///
/// Panics with `"agent context registry poisoned"` if the registry lock is poisoned.
pub fn default_working_directory(&self, agent_id: &str) -> PathBuf {
    let registry = self
        .agent_contexts
        .read()
        .expect("agent context registry poisoned");
    match registry.get(agent_id) {
        Some(config) => config.base_dir.clone(),
        None => PathBuf::from("."),
    }
}
/// Emits `event` through the configured hooks, giving them access to the persistence
/// store. The result of the hooks' `emit` is returned unchanged.
pub fn emit_hook(&self, event: RuntimeHookEvent) -> Result<(), RuntimeError> {
    let store = self.persistence.store.as_ref();
    self.execution.hooks.emit(store, &event)
}
}
/// Resolves `path` against `base_dir`.
///
/// A relative `path` is appended to `base_dir`; an absolute `path` is returned as
/// given. No normalization takes place, so `..` segments are kept as written.
fn resolve_path(base_dir: &Path, path: &str) -> PathBuf {
    // `Path::join` already replaces the base when its argument is absolute, so a
    // separate `is_absolute` branch and an intermediate `PathBuf` are unnecessary.
    base_dir.join(path)
}