use crate::kernel::discovery::CapabilityRegistry;
use crate::kernel::logging::init_and_store_logging;
use crate::kernel::KernelConfig;
use crate::messages::{
AgentMessage, AgentSpawned, AnnounceCapabilities, CapableAgentFound, DelegateTask,
FindCapableAgent, GetAgentStatus, IncomingAgentMessage, IncomingTask, RouteMessage, SpawnAgent,
StopAgent, SystemEvent,
};
use acton_reactive::prelude::*;
use std::collections::HashMap;
/// Running counters maintained by the kernel's message handlers.
///
/// All counters start at zero (`Default`). The type derives `PartialEq`/`Eq`
/// so snapshots can be compared directly, e.g. in tests or when detecting
/// whether any activity happened between two observations.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct KernelMetrics {
    /// Incremented once for every `SpawnAgent` request the kernel accepts.
    pub agents_spawned: usize,
    /// Incremented once for every `StopAgent` request that matched a registered agent.
    pub agents_stopped: usize,
    /// Incremented once for every message or task the kernel accepted for routing.
    pub messages_routed: usize,
}
/// Message that hands the kernel the configuration it should run with.
///
/// `Kernel::spawn_with_config` sends it to the kernel right after the actor
/// has been started; the handler stores `config` on the kernel state and
/// clears the `shutting_down` flag.
#[acton_message]
pub struct InitKernel {
/// Configuration to install on the running kernel.
pub config: KernelConfig,
}
/// State owned by the kernel actor.
///
/// The handlers registered in `configure_handlers` read and update these
/// fields while processing agent-management, routing and discovery messages.
#[acton_actor]
pub struct Kernel {
/// Active configuration; overwritten when an `InitKernel` message is handled.
pub config: KernelConfig,
/// Handles of known agents, keyed by the string form (`to_string()`) of the agent id.
pub agents: HashMap<String, ActorHandle>,
/// Counters updated by the spawn, stop and routing handlers.
pub metrics: KernelMetrics,
/// While `true`, incoming `SpawnAgent` requests are rejected.
pub shutting_down: bool,
/// Registry updated by `AnnounceCapabilities` and queried by `FindCapableAgent`.
pub capability_registry: CapabilityRegistry,
}
impl Kernel {
    /// Spawns the kernel actor using `KernelConfig::default()`.
    ///
    /// Thin convenience wrapper around [`Kernel::spawn_with_config`].
    pub async fn spawn(runtime: &mut ActorRuntime) -> ActorHandle {
        Self::spawn_with_config(runtime, KernelConfig::default()).await
    }

    /// Spawns the kernel actor with the supplied configuration and returns its handle.
    ///
    /// Steps, in order:
    /// 1. If `config.logging` is set, file logging is initialised. A failure is
    ///    reported on stderr and does not prevent the kernel from starting.
    /// 2. An actor named `"kernel"` is created, its state is seeded with
    ///    `config`, lifecycle hooks and message handlers are registered, and
    ///    the actor is started.
    /// 3. An `InitKernel` message carrying `config` is sent to the started actor.
    pub async fn spawn_with_config(
        runtime: &mut ActorRuntime,
        config: KernelConfig,
    ) -> ActorHandle {
        if let Some(logging_config) = config.logging.as_ref() {
            match init_and_store_logging(logging_config) {
                Ok(true) => {
                    // Only announce the log location when it can be resolved.
                    if let Ok(log_dir) = crate::kernel::logging::get_log_dir(logging_config) {
                        tracing::info!(
                            log_dir = %log_dir.display(),
                            app_name = %logging_config.app_name,
                            "File logging initialized"
                        );
                    }
                }
                // NOTE(review): presumably `false` means logging was already set up
                // by an earlier call - confirm against `init_and_store_logging`.
                Ok(false) => {}
                // stderr is used because the tracing subscriber may not exist yet.
                Err(e) => {
                    eprintln!("Warning: file logging initialization failed: {e}");
                }
            }
        }

        let mut builder = runtime.new_actor_with_name::<Kernel>("kernel".to_string());

        // Seed the state before the actor starts. Previously the configuration
        // only arrived via `InitKernel` after `start()`, so the `after_start`
        // hook below logged `max_agents` from the default state instead of the
        // caller's value.
        builder.model.config = config.clone();

        builder
            .before_start(|_actor| {
                tracing::debug!("Kernel initializing");
                Reply::ready()
            })
            .after_start(|actor| {
                tracing::info!(
                    max_agents = actor.model.config.max_agents,
                    "Kernel ready to accept agent spawn requests"
                );
                Reply::ready()
            })
            .before_stop(|actor| {
                tracing::info!(
                    active_agents = actor.model.agents.len(),
                    total_spawned = actor.model.metrics.agents_spawned,
                    "Kernel shutting down"
                );
                Reply::ready()
            });
        configure_handlers(&mut builder);

        let handle = builder.start().await;
        // Still sent so the `InitKernel` handler runs exactly as before
        // (it re-applies the config, clears `shutting_down` and logs).
        handle.send(InitKernel { config }).await;
        handle
    }
}
/// Registers all kernel message handlers on an actor that has not been started yet.
///
/// Handlers are grouped as follows:
/// - configuration: `InitKernel`
/// - agent management: `SpawnAgent`, `StopAgent`, `GetAgentStatus`, `ChildTerminated`
/// - routing: `RouteMessage`, `AgentMessage`, `DelegateTask`
/// - capability discovery: `AnnounceCapabilities`, `FindCapableAgent`
fn configure_handlers(builder: &mut ManagedActor<Idle, Kernel>) {
    // ---- configuration -------------------------------------------------

    // Installs the configuration carried by the message and re-enables spawning.
    builder.mutate_on::<InitKernel>(|actor, envelope| {
        let state = &mut actor.model;
        state.config.clone_from(&envelope.message().config);
        state.shutting_down = false;
        tracing::info!(
            max_agents = state.config.max_agents,
            metrics_enabled = state.config.enable_metrics,
            "Kernel configured"
        );
        Reply::ready()
    });

    // ---- agent management ----------------------------------------------

    // Accepts or rejects a spawn request; on acceptance the event is broadcast
    // and `AgentSpawned` is sent back through the reply envelope.
    //
    // NOTE(review): nothing is inserted into `actor.model.agents` here and no
    // actor is created - confirm the actual spawn/registration happens elsewhere.
    // NOTE(review): rejected requests get no reply; a requester waiting for
    // `AgentSpawned` will not hear back.
    builder.mutate_on::<SpawnAgent>(|actor, envelope| {
        let requester = envelope.reply_envelope();

        if actor.model.shutting_down {
            tracing::warn!("Rejecting spawn request - kernel is shutting down");
            return Reply::ready();
        }

        let active = actor.model.agents.len();
        let limit = actor.model.config.max_agents;
        if active >= limit {
            tracing::warn!(
                current = active,
                max = limit,
                "Rejecting spawn request - agent limit reached"
            );
            return Reply::ready();
        }

        let agent_config = envelope.message().config.clone();
        let declared_id = agent_config.agent_id();
        tracing::info!(
            agent_id = %declared_id,
            name = ?agent_config.name,
            "Spawning new agent"
        );
        actor.model.metrics.agents_spawned += 1;

        let announced_id = declared_id.clone();
        let broker = actor.broker().clone();
        Reply::pending(async move {
            let event = SystemEvent::AgentSpawned {
                id: announced_id.clone(),
            };
            broker.broadcast(event).await;

            let confirmation = AgentSpawned {
                agent_id: announced_id,
            };
            requester.send(confirmation).await;
        })
    });

    // Unregisters the agent, broadcasts `AgentStopped`, then stops the actor.
    builder.mutate_on::<StopAgent>(|actor, envelope| {
        let target_id = &envelope.message().agent_id;

        let Some(agent_handle) = actor.model.agents.remove(&target_id.to_string()) else {
            tracing::warn!(agent_id = %target_id, "Agent not found for stop request");
            return Reply::ready();
        };

        tracing::info!(agent_id = %target_id, "Stopping agent");
        actor.model.metrics.agents_stopped += 1;

        let stopped_id = target_id.clone();
        let broker = actor.broker().clone();
        Reply::pending(async move {
            let event = SystemEvent::AgentStopped {
                id: stopped_id,
                reason: "requested".to_string(),
            };
            broker.broadcast(event).await;
            // The outcome of stopping is deliberately ignored.
            let _ = agent_handle.stop().await;
        })
    });

    // ---- routing ---------------------------------------------------------

    // Counts and logs a routing request for a known recipient.
    //
    // NOTE(review): the payload is only logged; nothing is sent to the
    // recipient's handle - confirm that delivery is intentionally stubbed.
    builder.mutate_on::<RouteMessage>(|actor, envelope| {
        let request = envelope.message();

        let Some(recipient) = actor.model.agents.get(&request.to.to_string()).cloned() else {
            tracing::warn!(
                from = %request.from,
                to = %request.to,
                "Cannot route message - target agent not found"
            );
            return Reply::ready();
        };

        tracing::debug!(
            from = %request.from,
            to = %request.to,
            payload_length = request.payload.len(),
            "Routing message between agents"
        );
        actor.model.metrics.messages_routed += 1;

        let body = request.payload.clone();
        let sender = request.from.clone();
        Reply::pending(async move {
            tracing::debug!(
                from = %sender,
                payload = %body,
                "Message routed to agent"
            );
            drop(recipient);
        })
    });

    // Forwards a status query to the agent it names.
    builder.act_on::<GetAgentStatus>(|actor, envelope| {
        let queried_id = &envelope.message().agent_id;

        match actor.model.agents.get(&queried_id.to_string()) {
            Some(found) => {
                let agent = found.clone();
                let forwarded = GetAgentStatus {
                    agent_id: queried_id.clone(),
                };
                Reply::pending(async move {
                    agent.send(forwarded).await;
                })
            }
            None => {
                tracing::warn!(agent_id = %queried_id, "Agent not found for status request");
                Reply::ready()
            }
        }
    });

    // Logs child termination.
    //
    // NOTE(review): the terminated child is not removed from
    // `actor.model.agents` here - confirm whether cleanup is expected.
    builder.mutate_on::<ChildTerminated>(|_actor, envelope| {
        let notice = envelope.message();
        tracing::info!(
            child_id = ?notice.child_id,
            reason = ?notice.reason,
            "Child actor terminated"
        );
        Reply::ready()
    });

    // Delivers an agent-to-agent message as `IncomingAgentMessage`; fails with
    // `agent_not_found` when the recipient is not registered.
    builder.try_mutate_on::<AgentMessage, (), crate::error::MultiAgentError>(|actor, envelope| {
        let outbound = envelope.message();
        let lookup = actor.model.agents.get(&outbound.to.to_string()).cloned();

        match lookup {
            Some(recipient) => {
                let delivery = IncomingAgentMessage::from(outbound.clone());
                tracing::debug!(
                    from = %outbound.from,
                    to = %outbound.to,
                    "Routing agent message"
                );
                actor.model.metrics.messages_routed += 1;
                Reply::try_pending(async move {
                    recipient.send(delivery).await;
                    Ok(())
                })
            }
            None => {
                tracing::warn!(to = %outbound.to, "Target agent not found for message");
                Reply::try_err(crate::error::MultiAgentError::agent_not_found(
                    outbound.to.clone(),
                ))
            }
        }
    });

    // Delivers a delegated task as `IncomingTask`; fails with
    // `agent_not_found` when the recipient is not registered.
    builder.try_mutate_on::<DelegateTask, (), crate::error::MultiAgentError>(|actor, envelope| {
        let delegation = envelope.message();
        let lookup = actor.model.agents.get(&delegation.to.to_string()).cloned();

        match lookup {
            Some(assignee) => {
                let task = IncomingTask::from_delegate(delegation);
                tracing::info!(
                    from = %delegation.from,
                    to = %delegation.to,
                    task_id = %delegation.task_id,
                    task_type = %delegation.task_type,
                    "Routing task delegation"
                );
                actor.model.metrics.messages_routed += 1;
                Reply::try_pending(async move {
                    assignee.send(task).await;
                    Ok(())
                })
            }
            None => {
                tracing::warn!(to = %delegation.to, "Target agent not found for task delegation");
                Reply::try_err(crate::error::MultiAgentError::agent_not_found(
                    delegation.to.clone(),
                ))
            }
        }
    });

    // ---- capability discovery --------------------------------------------

    // Records the capabilities an agent announces about itself.
    builder.mutate_on::<AnnounceCapabilities>(|actor, envelope| {
        let announcement = envelope.message();
        let registry = &mut actor.model.capability_registry;
        registry.register(
            announcement.agent_id.clone(),
            announcement.capabilities.clone(),
        );
        tracing::info!(
            agent_id = %announcement.agent_id,
            capabilities = ?announcement.capabilities,
            "Agent capabilities registered"
        );
        Reply::ready()
    });

    // Answers a capability query; a reply is always sent, with `agent_id`
    // carrying whatever the registry lookup returned.
    builder.act_on::<FindCapableAgent>(|actor, envelope| {
        let query = envelope.message();
        let requester = envelope.reply_envelope();

        let matched = actor
            .model
            .capability_registry
            .find_capable_agent(&query.capability);
        tracing::debug!(
            capability = %query.capability,
            found = matched.is_some(),
            "Capability search"
        );

        let answer = CapableAgentFound {
            correlation_id: query.correlation_id.clone(),
            agent_id: matched,
            capability: query.capability.clone(),
        };
        Reply::pending(async move {
            requester.send(answer).await;
        })
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly defaulted `KernelMetrics` has every counter at zero.
    #[test]
    fn kernel_metrics_default() {
        let KernelMetrics {
            agents_spawned,
            agents_stopped,
            messages_routed,
        } = KernelMetrics::default();

        assert_eq!(agents_spawned, 0);
        assert_eq!(agents_stopped, 0);
        assert_eq!(messages_routed, 0);
    }
}