Skip to main content

clawft_kernel/
process.rs

//! Process table for PID-based agent tracking.
//!
//! The [`ProcessTable`] stores its entries in a [`DashMap`], a sharded
//! concurrent map, so multiple kernel subsystems can query and update
//! process state through a shared reference without wrapping the whole
//! table in a single lock.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use dashmap::DashMap;
10use serde::{Deserialize, Serialize};
11use tokio_util::sync::CancellationToken;
12
13use crate::capability::AgentCapabilities;
14use crate::error::KernelError;
15
/// Numeric identifier of a kernel-managed process.
///
/// PIDs are handed out in increasing order and are never reused.
pub type Pid = u64;
18
19/// State of a kernel-managed process.
20#[non_exhaustive]
21#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
22pub enum ProcessState {
23    /// Process is initializing.
24    Starting,
25    /// Process is actively running.
26    Running,
27    /// Process has been suspended (paused).
28    Suspended,
29    /// Process is in the process of stopping.
30    Stopping,
31    /// Process has exited with a status code.
32    Exited(i32),
33}
34
35impl ProcessState {
36    /// Check whether a state transition is valid.
37    ///
38    /// Valid transitions:
39    /// - Starting -> Running | Exited
40    /// - Running -> Suspended | Stopping | Exited
41    /// - Suspended -> Running | Stopping | Exited
42    /// - Stopping -> Exited
43    /// - Exited -> (none)
44    pub fn can_transition_to(&self, next: &ProcessState) -> bool {
45        matches!(
46            (self, next),
47            (ProcessState::Starting, ProcessState::Running)
48                | (ProcessState::Starting, ProcessState::Exited(_))
49                | (ProcessState::Running, ProcessState::Suspended)
50                | (ProcessState::Running, ProcessState::Stopping)
51                | (ProcessState::Running, ProcessState::Exited(_))
52                | (ProcessState::Suspended, ProcessState::Running)
53                | (ProcessState::Suspended, ProcessState::Stopping)
54                | (ProcessState::Suspended, ProcessState::Exited(_))
55                | (ProcessState::Stopping, ProcessState::Exited(_))
56        )
57    }
58}
59
60impl std::fmt::Display for ProcessState {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        match self {
63            ProcessState::Starting => write!(f, "starting"),
64            ProcessState::Running => write!(f, "running"),
65            ProcessState::Suspended => write!(f, "suspended"),
66            ProcessState::Stopping => write!(f, "stopping"),
67            ProcessState::Exited(code) => write!(f, "exited({code})"),
68        }
69    }
70}
71
72/// Resource usage counters for a process.
73#[derive(Debug, Clone, Default, Serialize, Deserialize)]
74pub struct ResourceUsage {
75    /// Approximate memory usage in bytes.
76    pub memory_bytes: u64,
77    /// Accumulated CPU time in milliseconds.
78    pub cpu_time_ms: u64,
79    /// Total number of tool calls made.
80    pub tool_calls: u64,
81    /// Total number of IPC messages sent.
82    pub messages_sent: u64,
83}
84
85/// A single entry in the process table.
86#[derive(Debug, Clone)]
87pub struct ProcessEntry {
88    /// Unique process identifier.
89    pub pid: Pid,
90    /// Agent identifier string.
91    pub agent_id: String,
92    /// Current state.
93    pub state: ProcessState,
94    /// Capabilities granted to this process.
95    pub capabilities: AgentCapabilities,
96    /// Resource usage counters.
97    pub resource_usage: ResourceUsage,
98    /// Cancellation token for cooperative shutdown.
99    pub cancel_token: CancellationToken,
100    /// PID of the parent process (None for the root process).
101    pub parent_pid: Option<Pid>,
102}
103
104/// Concurrent process table with PID allocation.
105///
106/// Uses [`DashMap`] for lock-free concurrent reads and writes.
107/// PIDs are allocated monotonically from an [`AtomicU64`] and are
108/// never reused within a kernel session.
109pub struct ProcessTable {
110    next_pid: AtomicU64,
111    entries: DashMap<Pid, ProcessEntry>,
112    max_processes: u32,
113}
114
115impl ProcessTable {
116    /// Create a new process table with the given maximum capacity.
117    pub fn new(max_processes: u32) -> Self {
118        Self {
119            next_pid: AtomicU64::new(1), // PID 0 reserved for kernel
120            entries: DashMap::new(),
121            max_processes,
122        }
123    }
124
125    /// Allocate the next PID without inserting an entry.
126    pub fn allocate_pid(&self) -> Pid {
127        self.next_pid.fetch_add(1, Ordering::Relaxed)
128    }
129
130    /// Insert a process entry into the table.
131    ///
132    /// The entry's `pid` field is set to the next available PID.
133    /// Returns the assigned PID, or an error if the process table
134    /// is at capacity.
135    pub fn insert(&self, mut entry: ProcessEntry) -> Result<Pid, KernelError> {
136        if self.entries.len() >= self.max_processes as usize {
137            return Err(KernelError::ProcessTableFull {
138                max: self.max_processes,
139            });
140        }
141        let pid = self.allocate_pid();
142        entry.pid = pid;
143        self.entries.insert(pid, entry);
144        Ok(pid)
145    }
146
147    /// Insert a process entry with a specific PID (for kernel PID 0).
148    pub fn insert_with_pid(&self, entry: ProcessEntry) -> Result<(), KernelError> {
149        if self.entries.len() >= self.max_processes as usize {
150            return Err(KernelError::ProcessTableFull {
151                max: self.max_processes,
152            });
153        }
154        self.entries.insert(entry.pid, entry);
155        Ok(())
156    }
157
158    /// Get a clone of a process entry by PID.
159    pub fn get(&self, pid: Pid) -> Option<ProcessEntry> {
160        self.entries.get(&pid).map(|e| e.value().clone())
161    }
162
163    /// Remove a process entry by PID.
164    pub fn remove(&self, pid: Pid) -> Option<ProcessEntry> {
165        self.entries.remove(&pid).map(|(_, e)| e)
166    }
167
168    /// List all process entries (cloned).
169    pub fn list(&self) -> Vec<ProcessEntry> {
170        self.entries.iter().map(|e| e.value().clone()).collect()
171    }
172
173    /// Update the state of a process.
174    ///
175    /// Validates the state transition before applying.
176    pub fn update_state(&self, pid: Pid, new_state: ProcessState) -> Result<(), KernelError> {
177        let mut entry = self
178            .entries
179            .get_mut(&pid)
180            .ok_or(KernelError::ProcessNotFound { pid })?;
181
182        if !entry.state.can_transition_to(&new_state) {
183            return Err(KernelError::InvalidStateTransition {
184                pid,
185                from: entry.state.clone(),
186                to: new_state,
187            });
188        }
189
190        entry.state = new_state;
191        Ok(())
192    }
193
194    /// Update resource usage for a process.
195    pub fn update_resources(&self, pid: Pid, usage: ResourceUsage) -> Result<(), KernelError> {
196        let mut entry = self
197            .entries
198            .get_mut(&pid)
199            .ok_or(KernelError::ProcessNotFound { pid })?;
200        entry.resource_usage = usage;
201        Ok(())
202    }
203
204    /// Get the number of processes in the table.
205    pub fn len(&self) -> usize {
206        self.entries.len()
207    }
208
209    /// Check whether the process table is empty.
210    pub fn is_empty(&self) -> bool {
211        self.entries.is_empty()
212    }
213
214    /// Get the maximum process capacity.
215    pub fn max_processes(&self) -> u32 {
216        self.max_processes
217    }
218
219    /// Update the capabilities of a process.
220    ///
221    /// Replaces the existing capabilities with the given ones.
222    /// This is used by the supervisor when hot-updating an agent's
223    /// permissions (future work) or during restart.
224    pub fn set_capabilities(
225        &self,
226        pid: Pid,
227        capabilities: AgentCapabilities,
228    ) -> Result<(), KernelError> {
229        let mut entry = self
230            .entries
231            .get_mut(&pid)
232            .ok_or(KernelError::ProcessNotFound { pid })?;
233        entry.capabilities = capabilities;
234        Ok(())
235    }
236
237    /// Count processes in the given state.
238    pub fn count_by_state(&self, state: &ProcessState) -> usize {
239        self.entries.iter().filter(|e| &e.state == state).count()
240    }
241}
242
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Starting` entry with default capabilities and counters.
    fn make_entry(agent_id: &str) -> ProcessEntry {
        ProcessEntry {
            pid: 0, // Will be set by insert()
            agent_id: agent_id.to_owned(),
            state: ProcessState::Starting,
            capabilities: AgentCapabilities::default(),
            resource_usage: ResourceUsage::default(),
            cancel_token: CancellationToken::new(),
            parent_pid: None,
        }
    }

    #[test]
    fn insert_and_get() {
        let table = ProcessTable::new(64);
        let pid = table.insert(make_entry("agent-1")).unwrap();
        assert_eq!(pid, 1);

        let entry = table.get(pid).unwrap();
        assert_eq!(entry.agent_id, "agent-1");
        assert_eq!(entry.pid, 1);
    }

    #[test]
    fn insert_multiple() {
        let table = ProcessTable::new(64);
        let p1 = table.insert(make_entry("a1")).unwrap();
        let p2 = table.insert(make_entry("a2")).unwrap();
        let p3 = table.insert(make_entry("a3")).unwrap();

        assert_eq!(p1, 1);
        assert_eq!(p2, 2);
        assert_eq!(p3, 3);
        assert_eq!(table.len(), 3);
    }

    #[test]
    fn remove() {
        let table = ProcessTable::new(64);
        let pid = table.insert(make_entry("agent-1")).unwrap();
        let removed = table.remove(pid).expect("entry was just inserted");
        assert_eq!(removed.pid, pid);
        assert_eq!(removed.agent_id, "agent-1");
        assert!(table.get(pid).is_none());
        assert!(table.is_empty());
    }

    #[test]
    fn remove_nonexistent() {
        let table = ProcessTable::new(64);
        assert!(table.remove(999).is_none());
    }

    #[test]
    fn list() {
        let table = ProcessTable::new(64);
        table.insert(make_entry("a1")).unwrap();
        table.insert(make_entry("a2")).unwrap();

        // The map's iteration order is unspecified, so compare sorted ids.
        let mut ids: Vec<String> = table.list().into_iter().map(|e| e.agent_id).collect();
        ids.sort();
        assert_eq!(ids, ["a1", "a2"]);
    }

    #[test]
    fn update_state_valid() {
        let table = ProcessTable::new(64);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        // Starting -> Running
        table.update_state(pid, ProcessState::Running).unwrap();
        assert_eq!(table.get(pid).unwrap().state, ProcessState::Running);

        // Running -> Stopping
        table.update_state(pid, ProcessState::Stopping).unwrap();
        assert_eq!(table.get(pid).unwrap().state, ProcessState::Stopping);

        // Stopping -> Exited
        table.update_state(pid, ProcessState::Exited(0)).unwrap();
        assert_eq!(table.get(pid).unwrap().state, ProcessState::Exited(0));
    }

    #[test]
    fn update_state_invalid_transition() {
        let table = ProcessTable::new(64);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        // Starting -> Suspended is not valid
        let result = table.update_state(pid, ProcessState::Suspended);
        assert!(matches!(
            result,
            Err(KernelError::InvalidStateTransition {
                from: ProcessState::Starting,
                to: ProcessState::Suspended,
                ..
            })
        ));

        // A rejected transition must leave the stored state untouched.
        assert_eq!(table.get(pid).unwrap().state, ProcessState::Starting);
    }

    #[test]
    fn update_state_nonexistent_pid() {
        let table = ProcessTable::new(64);
        let result = table.update_state(999, ProcessState::Running);
        assert!(matches!(
            result,
            Err(KernelError::ProcessNotFound { pid: 999 })
        ));
    }

    #[test]
    fn process_table_full() {
        let table = ProcessTable::new(2);
        table.insert(make_entry("a1")).unwrap();
        table.insert(make_entry("a2")).unwrap();
        let result = table.insert(make_entry("a3"));
        assert!(matches!(
            result,
            Err(KernelError::ProcessTableFull { max: 2 })
        ));
        assert_eq!(table.len(), 2);
    }

    #[test]
    fn insert_with_pid_full() {
        let table = ProcessTable::new(0);
        let result = table.insert_with_pid(make_entry("kernel"));
        assert!(matches!(
            result,
            Err(KernelError::ProcessTableFull { max: 0 })
        ));
        assert!(table.is_empty());
    }

    #[test]
    fn update_resources() {
        let table = ProcessTable::new(64);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        let usage = ResourceUsage {
            memory_bytes: 1024,
            cpu_time_ms: 500,
            tool_calls: 10,
            messages_sent: 5,
        };
        table.update_resources(pid, usage).unwrap();

        let entry = table.get(pid).unwrap();
        assert_eq!(entry.resource_usage.memory_bytes, 1024);
        assert_eq!(entry.resource_usage.cpu_time_ms, 500);
        assert_eq!(entry.resource_usage.tool_calls, 10);
        assert_eq!(entry.resource_usage.messages_sent, 5);
    }

    #[test]
    fn update_resources_nonexistent_pid() {
        let table = ProcessTable::new(64);
        let result = table.update_resources(999, ResourceUsage::default());
        assert!(matches!(
            result,
            Err(KernelError::ProcessNotFound { pid: 999 })
        ));
    }

    #[test]
    fn state_display() {
        assert_eq!(ProcessState::Starting.to_string(), "starting");
        assert_eq!(ProcessState::Running.to_string(), "running");
        assert_eq!(ProcessState::Suspended.to_string(), "suspended");
        assert_eq!(ProcessState::Stopping.to_string(), "stopping");
        assert_eq!(ProcessState::Exited(0).to_string(), "exited(0)");
        assert_eq!(ProcessState::Exited(-1).to_string(), "exited(-1)");
    }

    #[test]
    fn state_transitions() {
        // Valid transitions
        assert!(ProcessState::Starting.can_transition_to(&ProcessState::Running));
        assert!(ProcessState::Starting.can_transition_to(&ProcessState::Exited(1)));
        assert!(ProcessState::Running.can_transition_to(&ProcessState::Suspended));
        assert!(ProcessState::Running.can_transition_to(&ProcessState::Stopping));
        assert!(ProcessState::Running.can_transition_to(&ProcessState::Exited(0)));
        assert!(ProcessState::Suspended.can_transition_to(&ProcessState::Running));
        assert!(ProcessState::Suspended.can_transition_to(&ProcessState::Stopping));
        assert!(ProcessState::Suspended.can_transition_to(&ProcessState::Exited(0)));
        assert!(ProcessState::Stopping.can_transition_to(&ProcessState::Exited(0)));

        // Invalid transitions
        assert!(!ProcessState::Starting.can_transition_to(&ProcessState::Suspended));
        assert!(!ProcessState::Starting.can_transition_to(&ProcessState::Stopping));
        assert!(!ProcessState::Running.can_transition_to(&ProcessState::Starting));
        assert!(!ProcessState::Stopping.can_transition_to(&ProcessState::Running));
        assert!(!ProcessState::Stopping.can_transition_to(&ProcessState::Suspended));
        assert!(!ProcessState::Exited(0).can_transition_to(&ProcessState::Running));
        assert!(!ProcessState::Exited(0).can_transition_to(&ProcessState::Starting));
        assert!(!ProcessState::Exited(0).can_transition_to(&ProcessState::Exited(0)));
    }

    #[test]
    fn insert_with_pid_kernel() {
        let table = ProcessTable::new(64);
        let entry = ProcessEntry {
            pid: 0,
            agent_id: "kernel".to_owned(),
            state: ProcessState::Running,
            capabilities: AgentCapabilities::default(),
            resource_usage: ResourceUsage::default(),
            cancel_token: CancellationToken::new(),
            parent_pid: None,
        };
        table.insert_with_pid(entry).unwrap();
        assert_eq!(table.get(0).unwrap().agent_id, "kernel");
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn set_capabilities() {
        let table = ProcessTable::new(64);
        let pid = table.insert(make_entry("agent-1")).unwrap();

        let new_caps = AgentCapabilities {
            can_spawn: false,
            can_network: true,
            ..Default::default()
        };
        table.set_capabilities(pid, new_caps).unwrap();

        let entry = table.get(pid).unwrap();
        assert!(!entry.capabilities.can_spawn);
        assert!(entry.capabilities.can_network);
    }

    #[test]
    fn set_capabilities_nonexistent_pid() {
        let table = ProcessTable::new(64);
        let result = table.set_capabilities(999, AgentCapabilities::default());
        assert!(matches!(
            result,
            Err(KernelError::ProcessNotFound { pid: 999 })
        ));
    }

    #[test]
    fn count_by_state() {
        let table = ProcessTable::new(64);
        let p1 = table.insert(make_entry("a1")).unwrap();
        let p2 = table.insert(make_entry("a2")).unwrap();
        table.insert(make_entry("a3")).unwrap();

        // All start as Starting
        assert_eq!(table.count_by_state(&ProcessState::Starting), 3);
        assert_eq!(table.count_by_state(&ProcessState::Running), 0);

        table.update_state(p1, ProcessState::Running).unwrap();
        table.update_state(p2, ProcessState::Running).unwrap();

        assert_eq!(table.count_by_state(&ProcessState::Starting), 1);
        assert_eq!(table.count_by_state(&ProcessState::Running), 2);
        assert_eq!(table.count_by_state(&ProcessState::Exited(0)), 0);
    }
}
458}