use meerkat_client::{LlmClient, LlmClientAdapter};
#[cfg(feature = "comms")]
use meerkat_comms::agent::wrap_with_comms;
#[cfg(feature = "comms")]
use meerkat_comms::runtime::{CommsBootstrap, CommsRuntime, CoreCommsConfig, ParentCommsContext};
#[cfg(feature = "comms")]
use meerkat_comms::{PubKey, TrustedPeer, TrustedPeers};
use meerkat_core::ops::{OperationId, OperationResult};
use meerkat_core::session::Session;
use meerkat_core::sub_agent::SubAgentManager;
use meerkat_core::types::{Message, SystemMessage, UserMessage};
use meerkat_core::{AgentBuilder, AgentSessionStore, AgentToolDispatcher, BudgetLimits};
#[cfg(feature = "comms")]
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
#[cfg(feature = "comms")]
use tokio::sync::RwLock;
/// Settings used to give a sub-agent its own comms endpoint.
#[cfg(feature = "comms")]
#[derive(Clone, Debug)]
pub struct SubAgentCommsConfig {
    /// Name of the child agent; also used to derive its socket file and per-child directory.
    pub name: String,
    /// Directory under which the child's socket, identity directory and trusted-peers file live.
    pub base_dir: PathBuf,
    /// Comms details of the parent, which becomes the child's trusted peer.
    pub parent_context: ParentCommsContext,
}
/// Identifying details of a sub-agent: its operation id, name and comms endpoint.
#[derive(Debug)]
pub struct SubAgentHandle {
    /// Operation id of the sub-agent.
    pub id: OperationId,
    /// Name of the sub-agent.
    pub name: String,
    /// Raw 32-byte public key of the child.
    pub child_pubkey: [u8; 32],
    /// Address of the child as a string.
    // NOTE(review): presumably the same `uds://...` form that `setup_child_comms` returns —
    // this struct is not constructed in the visible part of the file; confirm with callers.
    pub child_addr: String,
}
/// Builds the trusted-peers list for a child agent.
///
/// The list contains exactly one entry: the parent, described by the name,
/// public key and address found in `parent_context`.
#[cfg(feature = "comms")]
pub fn create_child_trusted_peers(parent_context: &ParentCommsContext) -> TrustedPeers {
    let parent = TrustedPeer {
        name: parent_context.parent_name.clone(),
        pubkey: PubKey::new(parent_context.parent_pubkey),
        addr: parent_context.parent_addr.clone(),
    };

    TrustedPeers {
        peers: vec![parent],
    }
}
/// Builds the comms configuration for a child agent rooted at `base_dir`.
///
/// Paths produced:
/// - `<base_dir>/<child_name>.sock` — Unix domain socket to listen on
/// - `<base_dir>/<child_name>/identity` — identity directory
/// - `<base_dir>/<child_name>/trusted_peers.json` — trusted-peers file
///
/// Comms are enabled, TCP listening is left off, the ack timeout is 30 seconds
/// and the maximum message size is 1_048_576 bytes.
#[cfg(feature = "comms")]
pub fn create_child_comms_config(child_name: &str, base_dir: &std::path::Path) -> CoreCommsConfig {
    // Join component by component instead of formatting "<name>/identity" into a
    // single string: with an empty `child_name` the formatted segment would be
    // "/identity", which is absolute and would replace `base_dir` entirely.
    let child_dir = base_dir.join(child_name);

    CoreCommsConfig {
        enabled: true,
        name: child_name.to_string(),
        listen_uds: Some(base_dir.join(format!("{}.sock", child_name))),
        listen_tcp: None,
        identity_dir: child_dir.join("identity"),
        trusted_peers_path: child_dir.join("trusted_peers.json"),
        ack_timeout_secs: 30,
        max_message_bytes: 1_048_576,
    }
}
/// Creates a fresh session for a spawned sub-agent.
///
/// When `system_prompt` is given it is pushed first as a system message;
/// `prompt` always follows as a user message.
pub fn create_spawn_session(prompt: &str, system_prompt: Option<&str>) -> Session {
    let mut session = Session::new();

    let system_message = system_prompt.map(|text| {
        Message::System(SystemMessage {
            content: text.to_owned(),
        })
    });
    let user_message = Message::User(UserMessage {
        content: prompt.to_owned(),
    });

    // Optional system message first, then the user prompt.
    for message in system_message
        .into_iter()
        .chain(std::iter::once(user_message))
    {
        session.push(message);
    }

    session
}
/// Creates a session for a forked sub-agent.
///
/// The result is a clone of `parent_session` with `fork_prompt` appended as a
/// user message; the parent session itself is not modified.
pub fn create_fork_session(parent_session: &Session, fork_prompt: &str) -> Session {
    let follow_up = Message::User(UserMessage {
        content: fork_prompt.to_owned(),
    });

    let mut forked = parent_session.clone();
    forked.push(follow_up);
    forked
}
/// Prepares on-disk state for a child's comms and starts its listeners.
///
/// Steps, in order:
/// 1. build the child's trusted peers (the parent only) and comms config,
///    then resolve the config's paths against `config.base_dir`;
/// 2. create the identity directory and the trusted-peers file's parent directory;
/// 3. save the trusted peers to disk;
/// 4. create the comms runtime and start its listeners.
///
/// Returns the runtime, the child's public key bytes, and the child's address
/// in the form `uds://<base_dir>/<name>.sock`.
///
/// # Errors
///
/// Every failure is reported as [`SubAgentRunnerError::CommsSetup`] with a
/// message naming the step that failed.
#[cfg(feature = "comms")]
pub async fn setup_child_comms(
    config: &SubAgentCommsConfig,
) -> Result<(CommsRuntime, [u8; 32], String), SubAgentRunnerError> {
    /// Wraps `cause` in `CommsSetup`, prefixed with the action that failed.
    fn setup_failure(action: &str, cause: impl std::fmt::Display) -> SubAgentRunnerError {
        SubAgentRunnerError::CommsSetup(format!("{}: {}", action, cause))
    }

    let peers = create_child_trusted_peers(&config.parent_context);
    let child_config = create_child_comms_config(&config.name, &config.base_dir);
    let resolved = child_config.resolve_paths(&config.base_dir);

    tokio::fs::create_dir_all(&resolved.identity_dir)
        .await
        .map_err(|e| setup_failure("Failed to create identity dir", e))?;

    if let Some(peers_dir) = resolved.trusted_peers_path.parent() {
        tokio::fs::create_dir_all(peers_dir)
            .await
            .map_err(|e| setup_failure("Failed to create dir", e))?;
    }

    peers
        .save(&resolved.trusted_peers_path)
        .await
        .map_err(|e| setup_failure("Failed to save trusted peers", e))?;

    let mut runtime = CommsRuntime::new(resolved.clone())
        .await
        .map_err(|e| setup_failure("Failed to create comms runtime", e))?;

    let child_pubkey = *runtime.public_key().as_bytes();

    // NOTE(review): the address is rebuilt from `config.base_dir` rather than read back
    // from the resolved config; confirm `resolve_paths` never relocates the socket.
    let socket_path = config.base_dir.join(format!("{}.sock", config.name));
    let child_addr = format!("uds://{}", socket_path.display());

    runtime
        .start_listeners()
        .await
        .map_err(|e| setup_failure("Failed to start listeners", e))?;

    Ok((runtime, child_pubkey, child_addr))
}
/// Builds the trusted-peer entry that describes a child agent.
///
/// The entry carries the child's name, its public key (wrapped from the raw
/// 32 bytes) and its address.
#[cfg(feature = "comms")]
pub fn create_child_peer_entry(
    child_name: &str,
    child_pubkey: [u8; 32],
    child_addr: &str,
) -> TrustedPeer {
    let name = String::from(child_name);
    let pubkey = PubKey::new(child_pubkey);
    let addr = String::from(child_addr);

    TrustedPeer { name, pubkey, addr }
}
/// Statically typed description of a sub-agent to spawn with [`spawn_sub_agent`].
pub struct SubAgentSpec<
    C: LlmClient + 'static,
    T: AgentToolDispatcher + 'static,
    S: AgentSessionStore + 'static,
> {
    /// LLM client the sub-agent talks to.
    pub client: Arc<C>,
    /// Model name handed to both the client adapter and the agent builder.
    pub model: String,
    /// Tool dispatcher given to the built agent.
    pub tools: Arc<T>,
    /// Session store given to the built agent.
    pub store: Arc<S>,
    /// Session the agent resumes from.
    pub session: Session,
    /// Optional budget limits; applied to the builder only when present.
    pub budget: Option<BudgetLimits>,
    /// Depth value passed to the agent builder.
    pub depth: u32,
    /// Optional system prompt; applied to the builder only when present.
    pub system_prompt: Option<String>,
}
/// Type-erased description of a sub-agent to spawn with [`spawn_sub_agent_dyn`].
pub struct DynSubAgentSpec {
    /// LLM client the sub-agent talks to.
    pub client: Arc<dyn LlmClient>,
    /// Model name handed to both the client adapter and the agent builder.
    pub model: String,
    /// Tool dispatcher given to the built agent (wrapped with comms tools when comms is set up).
    pub tools: Arc<dyn AgentToolDispatcher>,
    /// Session store given to the built agent.
    pub store: Arc<dyn AgentSessionStore>,
    /// Session the agent resumes from.
    pub session: Session,
    /// Optional budget limits; applied to the builder only when present.
    pub budget: Option<BudgetLimits>,
    /// Depth value passed to the agent builder.
    pub depth: u32,
    /// Optional system prompt; applied to the builder only when present.
    pub system_prompt: Option<String>,
    /// Comms settings for the child; `None` spawns the sub-agent without comms.
    #[cfg(feature = "comms")]
    pub comms_config: Option<SubAgentCommsConfig>,
    /// Parent's trusted peers; when present, the child is added to it once its comms are ready.
    #[cfg(feature = "comms")]
    pub parent_trusted_peers: Option<Arc<RwLock<TrustedPeers>>>,
    /// When `true` the agent runs in host mode instead of running its pending session.
    pub host_mode: bool,
}
/// Spawns a sub-agent described by a type-erased [`DynSubAgentSpec`].
///
/// The agent is built from the spec, registered with `manager` under `id` and
/// `name`, and then run on a detached tokio task. This function returns as soon
/// as the task has been spawned; it does not wait for the agent to finish.
///
/// With the `comms` feature enabled and `spec.comms_config` set, an in-process
/// comms runtime is prepared for the child, the tools are wrapped with comms,
/// and the child is added to `spec.parent_trusted_peers` when its advertised
/// endpoint is known. Comms setup is best-effort: on failure a warning is
/// logged and the sub-agent starts without comms.
///
/// The spawned task runs the agent in host mode when `spec.host_mode` is set,
/// otherwise it runs the pending session. A successful run is reported through
/// `manager.complete` (text, duration in milliseconds, total tokens), a failed
/// one through `manager.fail`.
///
/// # Errors
///
/// Returns [`SubAgentRunnerError::ExecutionError`] when registering the
/// sub-agent with `manager` fails; in that case no task is spawned.
pub async fn spawn_sub_agent_dyn(
    id: OperationId,
    name: String,
    spec: DynSubAgentSpec,
    manager: Arc<SubAgentManager>,
) -> Result<(), SubAgentRunnerError> {
    // Only the comms-enabled path names this type; gating the import keeps
    // builds without the feature free of an unused-import warning.
    #[cfg(feature = "comms")]
    use meerkat_core::sub_agent::SubAgentCommsInfo;

    // Taken before building, so the reported duration covers setup as well as the run.
    let started_at = Instant::now();

    let client: Arc<dyn LlmClient> = spec.client;
    let llm_adapter = Arc::new(LlmClientAdapter::new(client, spec.model.clone()));

    let mut builder = AgentBuilder::new()
        .model(&spec.model)
        .depth(spec.depth)
        .resume_session(spec.session);
    if let Some(sys_prompt) = &spec.system_prompt {
        builder = builder.system_prompt(sys_prompt);
    }
    if let Some(budget) = spec.budget {
        builder = builder.budget(budget);
    }

    #[cfg(feature = "comms")]
    let (tools, comms_info) = if let Some(comms_config) = spec.comms_config {
        let bootstrap = CommsBootstrap::for_child_inproc(
            comms_config.name.clone(),
            comms_config.parent_context.clone(),
        );
        match bootstrap.prepare().await {
            Ok(Some(prepared)) => {
                let tools_with_comms = wrap_with_comms(spec.tools, &prepared.runtime);
                let comms_info = prepared.advertise.map(|adv| SubAgentCommsInfo {
                    pubkey: adv.pubkey,
                    addr: adv.addr,
                });

                // Let the parent accept messages from the child.
                // NOTE(review): if registration below fails, the peer added here is
                // not removed again — confirm whether that is intended.
                if let (Some(info), Some(parent_peers)) = (&comms_info, &spec.parent_trusted_peers)
                {
                    let child_peer = TrustedPeer {
                        name: name.clone(),
                        pubkey: PubKey::new(info.pubkey),
                        addr: info.addr.clone(),
                    };
                    let mut peers = parent_peers.write().await;
                    peers.upsert(child_peer);
                    tracing::debug!("Added sub-agent '{}' to parent's trusted peers", name);
                }

                builder = builder.with_comms_runtime(Arc::new(prepared.runtime));
                (tools_with_comms, comms_info)
            }
            // Bootstrap produced nothing to set up: run without comms.
            Ok(None) => (spec.tools, None),
            // Best-effort: a comms failure must not prevent the sub-agent from running.
            Err(e) => {
                tracing::warn!("Failed to set up comms for sub-agent '{}': {}", name, e);
                (spec.tools, None)
            }
        }
    } else {
        (spec.tools, None)
    };
    #[cfg(not(feature = "comms"))]
    let (tools, comms_info) = (spec.tools, None);

    let mut agent = builder.build(llm_adapter, tools, spec.store).await;

    // `name` is not needed after registration, so it is moved rather than cloned.
    manager
        .register_with_comms(id.clone(), name, comms_info)
        .await
        .map_err(|e| {
            SubAgentRunnerError::ExecutionError(format!("Failed to register sub-agent: {}", e))
        })?;

    let host_mode = spec.host_mode;

    // Detached task: the outcome is reported through `manager`, not a join handle.
    tokio::spawn(async move {
        let result = if host_mode {
            agent.run_host_mode(String::new()).await
        } else {
            agent.run_pending().await
        };
        let duration_ms = started_at.elapsed().as_millis() as u64;
        match result {
            Ok(run_result) => {
                manager
                    .complete(
                        &id,
                        OperationResult {
                            id: id.clone(),
                            content: run_result.text,
                            is_error: false,
                            duration_ms,
                            tokens_used: run_result.usage.total_tokens(),
                        },
                    )
                    .await;
            }
            Err(e) => {
                manager.fail(&id, e.to_string()).await;
            }
        }
    });

    Ok(())
}
/// Spawns a sub-agent described by a statically typed [`SubAgentSpec`].
///
/// The agent is built from the spec, registered with `manager` under `id` and
/// `name`, and then run on a detached tokio task. This function returns as soon
/// as the task has been spawned; it does not wait for the agent to finish.
///
/// A successful run is reported through `manager.complete` (text, duration in
/// milliseconds, total tokens), a failed one through `manager.fail`.
///
/// # Errors
///
/// Returns [`SubAgentRunnerError::ExecutionError`] when registering the
/// sub-agent with `manager` fails; in that case no task is spawned.
pub async fn spawn_sub_agent<C, T, S>(
    id: OperationId,
    name: String,
    spec: SubAgentSpec<C, T, S>,
    manager: Arc<SubAgentManager>,
) -> Result<(), SubAgentRunnerError>
where
    C: LlmClient + 'static,
    T: AgentToolDispatcher + 'static,
    S: AgentSessionStore + 'static,
{
    // Taken before building, so the reported duration covers setup as well as the run.
    let started_at = Instant::now();

    let llm_adapter = Arc::new(LlmClientAdapter::new(spec.client, spec.model.clone()));

    let mut builder = AgentBuilder::new()
        .model(&spec.model)
        .depth(spec.depth)
        .resume_session(spec.session);
    if let Some(sys_prompt) = &spec.system_prompt {
        builder = builder.system_prompt(sys_prompt);
    }
    if let Some(budget) = spec.budget {
        builder = builder.budget(budget);
    }

    let mut agent = builder.build(llm_adapter, spec.tools, spec.store).await;

    // `name` is not needed after registration, so it is moved rather than cloned.
    manager.register(id.clone(), name).await.map_err(|e| {
        SubAgentRunnerError::ExecutionError(format!("Failed to register sub-agent: {}", e))
    })?;

    // Detached task: the outcome is reported through `manager`, not a join handle.
    tokio::spawn(async move {
        let result = agent.run_pending().await;
        let duration_ms = started_at.elapsed().as_millis() as u64;
        match result {
            Ok(run_result) => {
                manager
                    .complete(
                        &id,
                        OperationResult {
                            id: id.clone(),
                            content: run_result.text,
                            is_error: false,
                            duration_ms,
                            tokens_used: run_result.usage.total_tokens(),
                        },
                    )
                    .await;
            }
            Err(e) => {
                manager.fail(&id, e.to_string()).await;
            }
        }
    });

    Ok(())
}
/// Errors reported by the sub-agent runner; each variant carries a human-readable detail string.
#[derive(Debug, thiserror::Error)]
pub enum SubAgentRunnerError {
    /// Preparing the child's comms (directories, trusted peers, runtime, listeners) failed.
    #[error("Comms setup failed: {0}")]
    CommsSetup(String),

    /// The sub-agent could not be executed, e.g. registering it with the manager failed.
    #[error("Agent execution error: {0}")]
    ExecutionError(String),

    /// A session-related failure.
    #[error("Session error: {0}")]
    SessionError(String),

    /// An LLM-client-related failure.
    #[error("LLM client error: {0}")]
    LlmError(String),
}
#[cfg(test)]
// Test code may unwrap/expect freely even if the crate lints against it elsewhere.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Returns the text of a user message; any other variant fails the test.
    fn user_text(message: &Message) -> &str {
        match message {
            Message::User(user) => user.content.as_str(),
            _ => unreachable!("Expected User message"),
        }
    }

    /// Returns the text of a system message; any other variant fails the test.
    fn system_text(message: &Message) -> &str {
        match message {
            Message::System(system) => system.content.as_str(),
            _ => unreachable!("Expected System message"),
        }
    }

    /// Without a system prompt the session holds only the user prompt.
    #[test]
    fn test_create_spawn_session() {
        let session = create_spawn_session("Do this task", None);
        assert_eq!(session.messages().len(), 1);
        assert_eq!(user_text(&session.messages()[0]), "Do this task");
    }

    /// With a system prompt the system message precedes the user prompt.
    #[test]
    fn test_create_spawn_session_with_system() {
        let session = create_spawn_session("Do this task", Some("You are helpful"));
        assert_eq!(session.messages().len(), 2);
        assert_eq!(system_text(&session.messages()[0]), "You are helpful");
        assert_eq!(user_text(&session.messages()[1]), "Do this task");
    }

    /// Forking keeps the parent's history and appends the fork prompt.
    #[test]
    fn test_create_fork_session() {
        let mut parent = Session::new();
        parent.push(Message::User(UserMessage {
            content: "Original prompt".to_string(),
        }));

        let forked = create_fork_session(&parent, "Continue with this");
        assert_eq!(forked.messages().len(), 2);
        assert_eq!(user_text(&forked.messages()[1]), "Continue with this");
    }

    /// The child trusts exactly one peer: its parent.
    #[cfg(feature = "comms")]
    #[test]
    fn test_create_child_trusted_peers() {
        let parent_context = ParentCommsContext {
            parent_name: "parent-agent".to_string(),
            parent_pubkey: [42u8; 32],
            parent_addr: "uds:///tmp/parent.sock".to_string(),
            comms_base_dir: PathBuf::from("/tmp/comms"),
        };

        let trusted = create_child_trusted_peers(&parent_context);
        assert_eq!(trusted.peers.len(), 1);

        let parent_peer = &trusted.peers[0];
        assert_eq!(parent_peer.name, "parent-agent");
        assert_eq!(*parent_peer.pubkey.as_bytes(), [42u8; 32]);
    }

    /// The child config is enabled, named after the child, and listens on UDS only.
    #[cfg(feature = "comms")]
    #[test]
    fn test_create_child_comms_config() {
        let base_dir = PathBuf::from("/tmp/agents");
        let config = create_child_comms_config("child-1", &base_dir);

        assert!(config.enabled);
        assert_eq!(config.name, "child-1");
        assert!(config.listen_uds.is_some());
        assert!(config.listen_tcp.is_none());
    }

    /// The peer entry mirrors the name, key bytes and address it was built from.
    #[cfg(feature = "comms")]
    #[test]
    fn test_create_child_peer_entry() {
        let peer = create_child_peer_entry("child-1", [1u8; 32], "uds:///tmp/child.sock");

        assert_eq!(peer.name, "child-1");
        assert_eq!(*peer.pubkey.as_bytes(), [1u8; 32]);
        assert_eq!(peer.addr, "uds:///tmp/child.sock");
    }

    /// Display output includes both the variant's prefix and its detail string.
    #[test]
    fn test_sub_agent_runner_error_display() {
        let comms_failure = SubAgentRunnerError::CommsSetup("test error".to_string());
        let rendered = comms_failure.to_string();
        assert!(rendered.contains("Comms setup failed"));
        assert!(rendered.contains("test error"));

        let execution_failure = SubAgentRunnerError::ExecutionError("runtime error".to_string());
        assert!(execution_failure.to_string().contains("execution error"));
    }
}