1use std::sync::atomic::{AtomicU64, Ordering};
8
9use dashmap::DashMap;
10use serde::{Deserialize, Serialize};
11use tokio_util::sync::CancellationToken;
12
13use crate::capability::AgentCapabilities;
14use crate::error::KernelError;
15
/// Process identifier; the key under which [`ProcessTable`] stores its entries.
pub type Pid = u64;
18
/// Lifecycle state of a process tracked in the process table.
///
/// Which moves between states are permitted is decided by
/// [`ProcessState::can_transition_to`]; the per-variant notes below restate
/// that table.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ProcessState {
    /// May move to `Running` or `Exited`.
    Starting,
    /// May move to `Suspended`, `Stopping` or `Exited`.
    Running,
    /// May move back to `Running`, or on to `Stopping` or `Exited`.
    Suspended,
    /// May only move to `Exited`.
    Stopping,
    /// Carries an `i32` exit code; no transition leaves this state.
    Exited(i32),
}
34
35impl ProcessState {
36 pub fn can_transition_to(&self, next: &ProcessState) -> bool {
45 matches!(
46 (self, next),
47 (ProcessState::Starting, ProcessState::Running)
48 | (ProcessState::Starting, ProcessState::Exited(_))
49 | (ProcessState::Running, ProcessState::Suspended)
50 | (ProcessState::Running, ProcessState::Stopping)
51 | (ProcessState::Running, ProcessState::Exited(_))
52 | (ProcessState::Suspended, ProcessState::Running)
53 | (ProcessState::Suspended, ProcessState::Stopping)
54 | (ProcessState::Suspended, ProcessState::Exited(_))
55 | (ProcessState::Stopping, ProcessState::Exited(_))
56 )
57 }
58}
59
60impl std::fmt::Display for ProcessState {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 match self {
63 ProcessState::Starting => write!(f, "starting"),
64 ProcessState::Running => write!(f, "running"),
65 ProcessState::Suspended => write!(f, "suspended"),
66 ProcessState::Stopping => write!(f, "stopping"),
67 ProcessState::Exited(code) => write!(f, "exited({code})"),
68 }
69 }
70}
71
/// Resource counters recorded for a single process.
///
/// `Default` yields all-zero counters. Units follow the field-name suffixes.
/// Whether a value is cumulative or a point-in-time reading is up to whoever
/// reports it: `ProcessTable::update_resources` stores the struct it is given
/// as-is, replacing the previous one.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// Memory figure, in bytes.
    pub memory_bytes: u64,
    /// CPU time, in milliseconds.
    pub cpu_time_ms: u64,
    /// Number of tool calls.
    pub tool_calls: u64,
    /// Number of messages sent.
    pub messages_sent: u64,
}
84
/// One row of the [`ProcessTable`]: everything the table records about a process.
///
/// `ProcessTable::get` and `ProcessTable::list` return clones of this struct,
/// so changing a returned entry does not change the table.
#[derive(Debug, Clone)]
pub struct ProcessEntry {
    /// Key of this entry in the table. `ProcessTable::insert` overwrites it
    /// with a freshly allocated PID; `ProcessTable::insert_with_pid` uses it
    /// as given.
    pub pid: Pid,
    /// Identifier of the agent associated with this process.
    pub agent_id: String,
    /// Current lifecycle state; changed through `ProcessTable::update_state`.
    pub state: ProcessState,
    /// Capabilities attached to the process; replaced wholesale by
    /// `ProcessTable::set_capabilities`.
    pub capabilities: AgentCapabilities,
    /// Most recently stored resource counters.
    pub resource_usage: ResourceUsage,
    /// Cancellation token for the process.
    // NOTE(review): cloning an entry clones this token; per tokio-util a cloned
    // `CancellationToken` refers to the same underlying token — confirm that
    // sharing cancellation with every snapshot is intended.
    pub cancel_token: CancellationToken,
    /// PID of the parent process, if any (presumably `None` for a process
    /// without a parent — confirm against the spawning code).
    pub parent_pid: Option<Pid>,
}
103
/// Concurrent map from [`Pid`] to [`ProcessEntry`] with an upper bound on the
/// number of entries.
///
/// All methods take `&self`; mutation goes through the `DashMap` and the
/// atomic PID counter.
pub struct ProcessTable {
    /// Next PID handed out by `allocate_pid`; initialised to 1 in `new`.
    next_pid: AtomicU64,
    /// The stored entries, keyed by PID.
    entries: DashMap<Pid, ProcessEntry>,
    /// Entry-count limit checked before every insertion.
    max_processes: u32,
}
114
115impl ProcessTable {
116 pub fn new(max_processes: u32) -> Self {
118 Self {
119 next_pid: AtomicU64::new(1), entries: DashMap::new(),
121 max_processes,
122 }
123 }
124
125 pub fn allocate_pid(&self) -> Pid {
127 self.next_pid.fetch_add(1, Ordering::Relaxed)
128 }
129
130 pub fn insert(&self, mut entry: ProcessEntry) -> Result<Pid, KernelError> {
136 if self.entries.len() >= self.max_processes as usize {
137 return Err(KernelError::ProcessTableFull {
138 max: self.max_processes,
139 });
140 }
141 let pid = self.allocate_pid();
142 entry.pid = pid;
143 self.entries.insert(pid, entry);
144 Ok(pid)
145 }
146
147 pub fn insert_with_pid(&self, entry: ProcessEntry) -> Result<(), KernelError> {
149 if self.entries.len() >= self.max_processes as usize {
150 return Err(KernelError::ProcessTableFull {
151 max: self.max_processes,
152 });
153 }
154 self.entries.insert(entry.pid, entry);
155 Ok(())
156 }
157
158 pub fn get(&self, pid: Pid) -> Option<ProcessEntry> {
160 self.entries.get(&pid).map(|e| e.value().clone())
161 }
162
163 pub fn remove(&self, pid: Pid) -> Option<ProcessEntry> {
165 self.entries.remove(&pid).map(|(_, e)| e)
166 }
167
168 pub fn list(&self) -> Vec<ProcessEntry> {
170 self.entries.iter().map(|e| e.value().clone()).collect()
171 }
172
173 pub fn update_state(&self, pid: Pid, new_state: ProcessState) -> Result<(), KernelError> {
177 let mut entry = self
178 .entries
179 .get_mut(&pid)
180 .ok_or(KernelError::ProcessNotFound { pid })?;
181
182 if !entry.state.can_transition_to(&new_state) {
183 return Err(KernelError::InvalidStateTransition {
184 pid,
185 from: entry.state.clone(),
186 to: new_state,
187 });
188 }
189
190 entry.state = new_state;
191 Ok(())
192 }
193
194 pub fn update_resources(&self, pid: Pid, usage: ResourceUsage) -> Result<(), KernelError> {
196 let mut entry = self
197 .entries
198 .get_mut(&pid)
199 .ok_or(KernelError::ProcessNotFound { pid })?;
200 entry.resource_usage = usage;
201 Ok(())
202 }
203
204 pub fn len(&self) -> usize {
206 self.entries.len()
207 }
208
209 pub fn is_empty(&self) -> bool {
211 self.entries.is_empty()
212 }
213
214 pub fn max_processes(&self) -> u32 {
216 self.max_processes
217 }
218
219 pub fn set_capabilities(
225 &self,
226 pid: Pid,
227 capabilities: AgentCapabilities,
228 ) -> Result<(), KernelError> {
229 let mut entry = self
230 .entries
231 .get_mut(&pid)
232 .ok_or(KernelError::ProcessNotFound { pid })?;
233 entry.capabilities = capabilities;
234 Ok(())
235 }
236
237 pub fn count_by_state(&self, state: &ProcessState) -> usize {
239 self.entries.iter().filter(|e| &e.state == state).count()
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// Capacity for tests that never get near the limit.
    const ROOMY: u32 = 64;

    /// Builds a `Starting` entry for `agent_id` with default capabilities and
    /// counters, a fresh cancellation token, no parent, and a placeholder PID
    /// of 0 (replaced by `ProcessTable::insert`).
    fn make_entry(agent_id: &str) -> ProcessEntry {
        ProcessEntry {
            pid: 0,
            agent_id: String::from(agent_id),
            state: ProcessState::Starting,
            capabilities: AgentCapabilities::default(),
            resource_usage: ResourceUsage::default(),
            cancel_token: CancellationToken::new(),
            parent_pid: None,
        }
    }

    /// The first inserted entry gets PID 1 and can be read back.
    #[test]
    fn insert_and_get() {
        let table = ProcessTable::new(ROOMY);
        let pid = table.insert(make_entry("agent-1")).unwrap();
        assert_eq!(pid, 1);

        let fetched = table.get(pid).unwrap();
        assert_eq!(fetched.agent_id, "agent-1");
        assert_eq!(fetched.pid, 1);
    }

    /// Consecutive inserts receive consecutive PIDs starting at 1.
    #[test]
    fn insert_multiple() {
        let table = ProcessTable::new(ROOMY);
        let pids: Vec<Pid> = ["a1", "a2", "a3"]
            .iter()
            .map(|agent| table.insert(make_entry(agent)).unwrap())
            .collect();

        assert_eq!(pids, vec![1, 2, 3]);
        assert_eq!(table.len(), 3);
    }

    /// Removing an entry returns it and leaves the table empty.
    #[test]
    fn remove() {
        let table = ProcessTable::new(ROOMY);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        assert!(table.remove(pid).is_some());
        assert!(table.get(pid).is_none());
        assert!(table.is_empty());
    }

    /// Removing an unknown PID yields `None`.
    #[test]
    fn remove_nonexistent() {
        let table = ProcessTable::new(ROOMY);
        assert!(table.remove(999).is_none());
    }

    /// `list` returns one element per stored entry.
    #[test]
    fn list() {
        let table = ProcessTable::new(ROOMY);
        for agent in ["a1", "a2"] {
            table.insert(make_entry(agent)).unwrap();
        }

        assert_eq!(table.list().len(), 2);
    }

    /// Starting -> Running -> Stopping -> Exited(0) is accepted step by step.
    #[test]
    fn update_state_valid() {
        let table = ProcessTable::new(ROOMY);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        let path = [
            ProcessState::Running,
            ProcessState::Stopping,
            ProcessState::Exited(0),
        ];
        for step in path {
            table.update_state(pid, step.clone()).unwrap();
            assert_eq!(table.get(pid).unwrap().state, step);
        }
    }

    /// Starting -> Suspended is rejected.
    #[test]
    fn update_state_invalid_transition() {
        let table = ProcessTable::new(ROOMY);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        assert!(table.update_state(pid, ProcessState::Suspended).is_err());
    }

    /// Updating the state of an unknown PID is an error.
    #[test]
    fn update_state_nonexistent_pid() {
        let table = ProcessTable::new(ROOMY);
        assert!(table.update_state(999, ProcessState::Running).is_err());
    }

    /// A table with capacity 2 rejects the third insert.
    #[test]
    fn process_table_full() {
        let table = ProcessTable::new(2);
        table.insert(make_entry("a1")).unwrap();
        table.insert(make_entry("a2")).unwrap();

        assert!(table.insert(make_entry("a3")).is_err());
    }

    /// Stored resource counters are replaced by `update_resources`.
    #[test]
    fn update_resources() {
        let table = ProcessTable::new(ROOMY);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        table
            .update_resources(
                pid,
                ResourceUsage {
                    memory_bytes: 1024,
                    cpu_time_ms: 500,
                    tool_calls: 10,
                    messages_sent: 5,
                },
            )
            .unwrap();

        let stored = table.get(pid).unwrap().resource_usage;
        assert_eq!(stored.memory_bytes, 1024);
        assert_eq!(stored.cpu_time_ms, 500);
    }

    /// `Display` renders lowercase names, with the exit code in parentheses.
    #[test]
    fn state_display() {
        let cases = [
            (ProcessState::Starting, "starting"),
            (ProcessState::Running, "running"),
            (ProcessState::Suspended, "suspended"),
            (ProcessState::Stopping, "stopping"),
            (ProcessState::Exited(0), "exited(0)"),
            (ProcessState::Exited(-1), "exited(-1)"),
        ];
        for (state, rendered) in cases {
            assert_eq!(state.to_string(), rendered);
        }
    }

    /// Spot-checks both permitted and forbidden transitions.
    #[test]
    fn state_transitions() {
        let permitted = [
            (ProcessState::Starting, ProcessState::Running),
            (ProcessState::Starting, ProcessState::Exited(1)),
            (ProcessState::Running, ProcessState::Suspended),
            (ProcessState::Running, ProcessState::Stopping),
            (ProcessState::Running, ProcessState::Exited(0)),
            (ProcessState::Suspended, ProcessState::Running),
            (ProcessState::Suspended, ProcessState::Stopping),
            (ProcessState::Stopping, ProcessState::Exited(0)),
        ];
        for (from, to) in &permitted {
            assert!(
                from.can_transition_to(to),
                "{} -> {} should be permitted",
                from,
                to
            );
        }

        let forbidden = [
            (ProcessState::Starting, ProcessState::Suspended),
            (ProcessState::Starting, ProcessState::Stopping),
            (ProcessState::Stopping, ProcessState::Running),
            (ProcessState::Exited(0), ProcessState::Running),
            (ProcessState::Exited(0), ProcessState::Starting),
        ];
        for (from, to) in &forbidden {
            assert!(
                !from.can_transition_to(to),
                "{} -> {} should be forbidden",
                from,
                to
            );
        }
    }

    /// An entry can be placed under an explicit PID (0, outside the range
    /// handed out by the allocator, which starts at 1).
    #[test]
    fn insert_with_pid_kernel() {
        let table = ProcessTable::new(ROOMY);
        let kernel = ProcessEntry {
            state: ProcessState::Running,
            ..make_entry("kernel")
        };

        table.insert_with_pid(kernel).unwrap();
        assert_eq!(table.get(0).unwrap().agent_id, "kernel");
    }

    /// `set_capabilities` replaces the stored capabilities.
    #[test]
    fn set_capabilities() {
        let table = ProcessTable::new(ROOMY);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        table
            .set_capabilities(
                pid,
                AgentCapabilities {
                    can_spawn: false,
                    can_network: true,
                    ..Default::default()
                },
            )
            .unwrap();

        let caps = table.get(pid).unwrap().capabilities;
        assert!(!caps.can_spawn);
        assert!(caps.can_network);
    }

    /// Setting capabilities on an unknown PID is an error.
    #[test]
    fn set_capabilities_nonexistent_pid() {
        let table = ProcessTable::new(ROOMY);
        assert!(table
            .set_capabilities(999, AgentCapabilities::default())
            .is_err());
    }

    /// `count_by_state` tracks entries as they change state.
    #[test]
    fn count_by_state() {
        let table = ProcessTable::new(ROOMY);
        let first = table.insert(make_entry("a1")).unwrap();
        let second = table.insert(make_entry("a2")).unwrap();
        table.insert(make_entry("a3")).unwrap();

        assert_eq!(table.count_by_state(&ProcessState::Starting), 3);
        assert_eq!(table.count_by_state(&ProcessState::Running), 0);

        for pid in [first, second] {
            table.update_state(pid, ProcessState::Running).unwrap();
        }

        assert_eq!(table.count_by_state(&ProcessState::Starting), 1);
        assert_eq!(table.count_by_state(&ProcessState::Running), 2);
    }
}