1#![warn(missing_docs, clippy::pedantic)]
7
8mod call;
9mod lifecycle;
10mod mxp_handlers;
11mod registry;
12mod scheduler;
13
14use std::sync::Arc;
15
16use agent_primitives::{AgentId, AgentManifest};
17use mxp::Message;
18use mxp_handlers::dispatch_message;
19use thiserror::Error;
20use tokio::task::JoinHandle;
21use tracing::warn;
22
23pub use call::{
24 AuditEmitter, CallExecutor, CallOutcome, CallOutcomeSink, CollectingSink,
25 CompositeAuditEmitter, CompositePolicyObserver, GovernanceAuditEmitter, KernelMessageHandler,
26 KernelMessageHandlerBuilder, MxpAuditObserver, PolicyObserver, ToolInvocationResult,
27 TracingAuditEmitter, TracingCallSink, TracingPolicyObserver,
28};
29pub use lifecycle::{AgentState, Lifecycle, LifecycleError, LifecycleEvent, LifecycleResult};
30pub use mxp_handlers::{AgentMessageHandler, HandlerContext, HandlerError, HandlerResult};
31pub use registry::{AgentRegistry, RegistrationConfig, RegistryError, RegistryResult};
32pub use scheduler::{SchedulerConfig, SchedulerError, SchedulerResult, TaskScheduler};
33
34use registry::RegistrationController;
35
36#[derive(Debug)]
38pub struct AgentKernel<H>
39where
40 H: AgentMessageHandler + 'static,
41{
42 agent_id: AgentId,
43 lifecycle: Lifecycle,
44 handler: Arc<H>,
45 scheduler: TaskScheduler,
46 registry: Option<RegistrationController>,
47}
48
49impl<H> AgentKernel<H>
50where
51 H: AgentMessageHandler + 'static,
52{
53 #[must_use]
55 pub fn new(agent_id: AgentId, handler: Arc<H>, scheduler: TaskScheduler) -> Self {
56 Self {
57 agent_id,
58 lifecycle: Lifecycle::new(agent_id),
59 handler,
60 scheduler,
61 registry: None,
62 }
63 }
64
65 pub fn set_registry<R>(
67 &mut self,
68 registry: Arc<R>,
69 manifest: AgentManifest,
70 config: RegistrationConfig,
71 ) where
72 R: AgentRegistry + 'static,
73 {
74 let registry: Arc<dyn AgentRegistry> = registry;
75 self.registry = Some(RegistrationController::new(registry, manifest, config));
76 }
77
78 #[must_use]
80 pub const fn agent_id(&self) -> AgentId {
81 self.agent_id
82 }
83
84 #[must_use]
86 pub fn state(&self) -> AgentState {
87 self.lifecycle.state()
88 }
89
90 pub fn transition(&mut self, event: LifecycleEvent) -> KernelResult<AgentState> {
97 let state = self.lifecycle.transition(event)?;
98 if let Some(controller) = &mut self.registry
99 && let Err(err) = controller.on_state_change(state, &self.scheduler)
100 {
101 warn!(?err, "registry hook failed during state transition");
102 return Err(err.into());
103 }
104
105 Ok(state)
106 }
107
108 pub async fn handle_message(&self, message: Message) -> HandlerResult {
114 let ctx = HandlerContext::from_message(self.agent_id, message);
115 dispatch_message(self.handler.as_ref(), ctx).await
116 }
117
118 pub fn schedule_message(&self, message: Message) -> SchedulerResult<JoinHandle<HandlerResult>> {
124 let handler = Arc::clone(&self.handler);
125 let agent_id = self.agent_id;
126 self.scheduler.spawn(async move {
127 let ctx = HandlerContext::from_message(agent_id, message);
128 dispatch_message(handler.as_ref(), ctx).await
129 })
130 }
131
132 #[must_use]
134 pub fn scheduler(&self) -> &TaskScheduler {
135 &self.scheduler
136 }
137}
138
139#[derive(Debug, Error)]
141pub enum KernelError {
142 #[error(transparent)]
144 Lifecycle(#[from] LifecycleError),
145 #[error(transparent)]
147 Registry(#[from] RegistryError),
148}
149
150pub type KernelResult<T> = Result<T, KernelError>;
152
#[cfg(test)]
mod tests {
    use super::*;
    use std::num::NonZeroUsize;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use agent_primitives::{Capability, CapabilityId};

    /// Upper bound on how long the test waits for a background registry call.
    ///
    /// Deliberately generous: the wait ends as soon as the call is observed,
    /// so a large bound only matters when the test is about to fail.
    const WAIT_LIMIT: Duration = Duration::from_secs(2);

    /// Handler that relies entirely on the trait's default behaviour.
    struct NullHandler;

    impl AgentMessageHandler for NullHandler {}

    /// Registry double that counts how often each operation is invoked.
    #[derive(Default)]
    struct CountingRegistry {
        registers: Arc<AtomicUsize>,
        heartbeats: Arc<AtomicUsize>,
        deregisters: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl AgentRegistry for CountingRegistry {
        async fn register(&self, _manifest: &AgentManifest) -> RegistryResult<()> {
            self.registers.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn heartbeat(&self, _manifest: &AgentManifest) -> RegistryResult<()> {
            self.heartbeats.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn deregister(&self, _manifest: &AgentManifest) -> RegistryResult<()> {
            self.deregisters.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Builds the single capability advertised by the test manifest.
    fn capability() -> Capability {
        Capability::builder(CapabilityId::new("kernel.test").unwrap())
            .name("Test")
            .unwrap()
            .version("1.0.0")
            .unwrap()
            .add_scope("read:test")
            .unwrap()
            .build()
            .unwrap()
    }

    /// Builds a manifest for a randomly identified agent.
    fn manifest() -> AgentManifest {
        AgentManifest::builder(AgentId::random())
            .name("kernel-agent")
            .unwrap()
            .version("0.0.1")
            .unwrap()
            .capabilities(vec![capability()])
            .build()
            .unwrap()
    }

    /// Waits until `counter` is non-zero, polling every millisecond.
    ///
    /// Returns `true` once a call has been observed and `false` if
    /// [`WAIT_LIMIT`] elapses first. Polling instead of sleeping for a fixed
    /// duration keeps the test independent of how quickly the background
    /// tasks happen to get scheduled.
    async fn wait_for_call(counter: &AtomicUsize) -> bool {
        let poll = async {
            while counter.load(Ordering::SeqCst) == 0 {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        };
        tokio::time::timeout(WAIT_LIMIT, poll).await.is_ok()
    }

    #[tokio::test]
    async fn registry_hooks_trigger_lifecycle_actions() {
        let scheduler = TaskScheduler::default();
        let handler = Arc::new(NullHandler);
        let mut kernel = AgentKernel::new(AgentId::random(), handler, scheduler.clone());

        let registry = Arc::new(CountingRegistry::default());
        let config = RegistrationConfig::new(
            Duration::from_millis(10),
            Duration::from_millis(5),
            Duration::from_millis(20),
            NonZeroUsize::new(3).unwrap(),
        );
        kernel.set_registry(Arc::clone(&registry), manifest(), config);

        kernel.transition(LifecycleEvent::Boot).unwrap();
        kernel.transition(LifecycleEvent::Activate).unwrap();

        assert!(
            wait_for_call(&registry.registers).await,
            "registry was never asked to register the agent"
        );
        assert!(
            wait_for_call(&registry.heartbeats).await,
            "registry never received a heartbeat"
        );

        kernel.transition(LifecycleEvent::Retire).unwrap();
        kernel.transition(LifecycleEvent::Terminate).unwrap();

        assert!(
            wait_for_call(&registry.deregisters).await,
            "registry was never asked to deregister the agent"
        );
    }
}