agent_kernel/
lib.rs

1//! Agent lifecycle state machine and execution loop.
2//!
3//! This crate provides the building blocks required by MXP-native agents: lifecycle
4//! management, message routing, and a lightweight scheduler backed by `tokio`.
5
6#![warn(missing_docs, clippy::pedantic)]
7
8mod call;
9mod lifecycle;
10mod mxp_handlers;
11mod registry;
12mod scheduler;
13
14use std::sync::Arc;
15
16use agent_primitives::{AgentId, AgentManifest};
17use mxp::Message;
18use mxp_handlers::dispatch_message;
19use thiserror::Error;
20use tokio::task::JoinHandle;
21use tracing::warn;
22
23pub use call::{
24    AuditEmitter, CallExecutor, CallOutcome, CallOutcomeSink, CollectingSink,
25    CompositeAuditEmitter, CompositePolicyObserver, GovernanceAuditEmitter, KernelMessageHandler,
26    KernelMessageHandlerBuilder, MxpAuditObserver, PolicyObserver, ToolInvocationResult,
27    TracingAuditEmitter, TracingCallSink, TracingPolicyObserver,
28};
29pub use lifecycle::{AgentState, Lifecycle, LifecycleError, LifecycleEvent, LifecycleResult};
30pub use mxp_handlers::{AgentMessageHandler, HandlerContext, HandlerError, HandlerResult};
31pub use registry::{AgentRegistry, RegistrationConfig, RegistryError, RegistryResult};
32pub use scheduler::{SchedulerConfig, SchedulerError, SchedulerResult, TaskScheduler};
33
34use registry::RegistrationController;
35
36/// Core runtime that wires lifecycle, scheduler, and MXP handlers.
37#[derive(Debug)]
38pub struct AgentKernel<H>
39where
40    H: AgentMessageHandler + 'static,
41{
42    agent_id: AgentId,
43    lifecycle: Lifecycle,
44    handler: Arc<H>,
45    scheduler: TaskScheduler,
46    registry: Option<RegistrationController>,
47}
48
49impl<H> AgentKernel<H>
50where
51    H: AgentMessageHandler + 'static,
52{
53    /// Creates a new agent kernel with the provided handler and scheduler.
54    #[must_use]
55    pub fn new(agent_id: AgentId, handler: Arc<H>, scheduler: TaskScheduler) -> Self {
56        Self {
57            agent_id,
58            lifecycle: Lifecycle::new(agent_id),
59            handler,
60            scheduler,
61            registry: None,
62        }
63    }
64
65    /// Provides registry integration for mesh discovery and heartbeat management.
66    pub fn set_registry<R>(
67        &mut self,
68        registry: Arc<R>,
69        manifest: AgentManifest,
70        config: RegistrationConfig,
71    ) where
72        R: AgentRegistry + 'static,
73    {
74        let registry: Arc<dyn AgentRegistry> = registry;
75        self.registry = Some(RegistrationController::new(registry, manifest, config));
76    }
77
78    /// Returns the identifier associated with this agent.
79    #[must_use]
80    pub const fn agent_id(&self) -> AgentId {
81        self.agent_id
82    }
83
84    /// Returns the current lifecycle state.
85    #[must_use]
86    pub fn state(&self) -> AgentState {
87        self.lifecycle.state()
88    }
89
90    /// Applies a lifecycle event, returning the new state on success.
91    ///
92    /// # Errors
93    ///
94    /// Returns [`LifecycleError`](LifecycleError) when the transition is
95    /// not permitted from the current state.
96    pub fn transition(&mut self, event: LifecycleEvent) -> KernelResult<AgentState> {
97        let state = self.lifecycle.transition(event)?;
98        if let Some(controller) = &mut self.registry
99            && let Err(err) = controller.on_state_change(state, &self.scheduler)
100        {
101            warn!(?err, "registry hook failed during state transition");
102            return Err(err.into());
103        }
104
105        Ok(state)
106    }
107
108    /// Handles an MXP message immediately on the current task.
109    ///
110    /// # Errors
111    ///
112    /// Propagates any error returned by the message handler implementation.
113    pub async fn handle_message(&self, message: Message) -> HandlerResult {
114        let ctx = HandlerContext::from_message(self.agent_id, message);
115        dispatch_message(self.handler.as_ref(), ctx).await
116    }
117
118    /// Enqueues an MXP message for asynchronous processing via the scheduler.
119    ///
120    /// # Errors
121    ///
122    /// Returns [`SchedulerError`] when the scheduler has been closed.
123    pub fn schedule_message(&self, message: Message) -> SchedulerResult<JoinHandle<HandlerResult>> {
124        let handler = Arc::clone(&self.handler);
125        let agent_id = self.agent_id;
126        self.scheduler.spawn(async move {
127            let ctx = HandlerContext::from_message(agent_id, message);
128            dispatch_message(handler.as_ref(), ctx).await
129        })
130    }
131
132    /// Returns a reference to the underlying scheduler.
133    #[must_use]
134    pub fn scheduler(&self) -> &TaskScheduler {
135        &self.scheduler
136    }
137}
138
139/// Errors emitted by [`AgentKernel`] operations.
140#[derive(Debug, Error)]
141pub enum KernelError {
142    /// Lifecycle transition failure.
143    #[error(transparent)]
144    Lifecycle(#[from] LifecycleError),
145    /// Registry hook failure.
146    #[error(transparent)]
147    Registry(#[from] RegistryError),
148}
149
150/// Result alias for kernel operations.
151pub type KernelResult<T> = Result<T, KernelError>;
152
#[cfg(test)]
mod tests {
    use super::*;
    use std::num::NonZeroUsize;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use agent_primitives::{Capability, CapabilityId};

    /// How often `wait_until_nonzero` re-checks a counter.
    const POLL_INTERVAL: Duration = Duration::from_millis(5);

    /// Upper bound on how long a single registry hook is given to fire.
    ///
    /// Deliberately generous: the test only cares that the hook runs at all,
    /// not how quickly, so a loaded CI machine must not turn into a failure.
    const HOOK_DEADLINE: Duration = Duration::from_secs(2);

    /// Handler that relies entirely on the trait's default behaviour.
    struct NullHandler;

    impl AgentMessageHandler for NullHandler {}

    /// Registry double that only counts how often each hook is invoked.
    #[derive(Default)]
    struct CountingRegistry {
        registers: Arc<AtomicUsize>,
        heartbeats: Arc<AtomicUsize>,
        deregisters: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl AgentRegistry for CountingRegistry {
        async fn register(&self, _manifest: &AgentManifest) -> RegistryResult<()> {
            self.registers.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn heartbeat(&self, _manifest: &AgentManifest) -> RegistryResult<()> {
            self.heartbeats.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        async fn deregister(&self, _manifest: &AgentManifest) -> RegistryResult<()> {
            self.deregisters.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Builds a minimal valid capability for the test manifest.
    fn capability() -> Capability {
        Capability::builder(CapabilityId::new("kernel.test").unwrap())
            .name("Test")
            .unwrap()
            .version("1.0.0")
            .unwrap()
            .add_scope("read:test")
            .unwrap()
            .build()
            .unwrap()
    }

    /// Builds a manifest describing `agent_id`, so the registered manifest and
    /// the kernel under test refer to the same agent.
    fn manifest(agent_id: AgentId) -> AgentManifest {
        AgentManifest::builder(agent_id)
            .name("kernel-agent")
            .unwrap()
            .version("0.0.1")
            .unwrap()
            .capabilities(vec![capability()])
            .build()
            .unwrap()
    }

    /// Polls `counter` until it becomes non-zero, giving up after `deadline`.
    ///
    /// Returns `true` when the counter was observed non-zero in time.
    async fn wait_until_nonzero(counter: &AtomicUsize, deadline: Duration) -> bool {
        tokio::time::timeout(deadline, async {
            while counter.load(Ordering::SeqCst) == 0 {
                // Sleeping yields to the runtime so the scheduled hooks can run.
                tokio::time::sleep(POLL_INTERVAL).await;
            }
        })
        .await
        .is_ok()
    }

    #[tokio::test]
    async fn registry_hooks_trigger_lifecycle_actions() {
        let scheduler = TaskScheduler::default();
        let handler = Arc::new(NullHandler);
        let agent_id = AgentId::random();
        let mut kernel = AgentKernel::new(agent_id, handler, scheduler.clone());

        let registry = Arc::new(CountingRegistry::default());
        let config = RegistrationConfig::new(
            Duration::from_millis(10),
            Duration::from_millis(5),
            Duration::from_millis(20),
            NonZeroUsize::new(3).unwrap(),
        );
        kernel.set_registry(Arc::clone(&registry), manifest(agent_id), config);

        kernel.transition(LifecycleEvent::Boot).unwrap();
        kernel.transition(LifecycleEvent::Activate).unwrap();

        assert!(
            wait_until_nonzero(&registry.registers, HOOK_DEADLINE).await,
            "agent was never registered after activation"
        );
        assert!(
            wait_until_nonzero(&registry.heartbeats, HOOK_DEADLINE).await,
            "no heartbeat was sent after activation"
        );

        kernel.transition(LifecycleEvent::Retire).unwrap();
        kernel.transition(LifecycleEvent::Terminate).unwrap();

        assert!(
            wait_until_nonzero(&registry.deregisters, HOOK_DEADLINE).await,
            "agent was never deregistered after termination"
        );
    }
}