Skip to main content

plexus_substrate/activations/chaos/
activation.rs

1use super::types::*;
2use crate::activations::lattice::{LatticeStorage, NodeStatus};
3use async_stream::stream;
4use futures::Stream;
5use std::sync::Arc;
6
7/// Chaos activation — fault injection and observability for anti-fragility testing.
8///
9/// Exposes controlled chaos primitives:
10/// - Observe running nodes across all graphs
11/// - Inject failures or successes directly into lattice nodes
12/// - List and kill system processes (Claude sessions, etc.)
13/// - Snapshot graph execution state
14/// - Hard-crash the substrate to test recovery
15#[derive(Clone)]
16pub struct Chaos {
17    lattice: Arc<LatticeStorage>,
18}
19
20impl Chaos {
21    pub fn new(lattice: Arc<LatticeStorage>) -> Self {
22        Self { lattice }
23    }
24}
25
26/// Parse the outermost serde tag from a JSON spec blob
27fn spec_type(spec_json: &str) -> String {
28    serde_json::from_str::<serde_json::Value>(spec_json)
29        .ok()
30        .and_then(|v| v.get("type").and_then(|t| t.as_str()).map(|s| s.to_string()))
31        .unwrap_or_else(|| "unknown".to_string())
32}
33
/// Scan `/proc` and return every process whose command line contains `pattern`.
///
/// Each hit is a `(pid, cmdline)` pair where the NUL separators of
/// `/proc/<pid>/cmdline` have been replaced by spaces and surrounding
/// whitespace has been trimmed. Entries that are not numeric, or whose
/// cmdline cannot be read, are skipped. If `/proc` itself cannot be listed
/// the result is empty.
fn find_pids_by_cmdline(pattern: &str) -> Vec<(u32, String)> {
    let proc_dir = match std::fs::read_dir("/proc") {
        Ok(dir) => dir,
        Err(_) => return Vec::new(),
    };

    proc_dir
        .flatten()
        .filter_map(|dirent| {
            // Only purely numeric directory names are process entries.
            let pid = dirent.file_name().to_string_lossy().parse::<u32>().ok()?;
            let mut bytes = std::fs::read(format!("/proc/{}/cmdline", pid)).ok()?;

            // Arguments are NUL-separated; turn the separators into spaces.
            for byte in bytes.iter_mut() {
                if *byte == 0 {
                    *byte = b' ';
                }
            }

            let text = String::from_utf8_lossy(&bytes);
            if text.contains(pattern) {
                Some((pid, text.trim().to_string()))
            } else {
                None
            }
        })
        .collect()
}
53
54#[plexus_macros::activation(namespace = "chaos",
55version = "1.0.0",
56description = "Fault injection and observability for anti-fragility testing", crate_path = "plexus_core")]
57impl Chaos {
58    /// List all nodes currently in Running state across every lattice graph.
59    #[plexus_macros::method(description = "List all Running nodes across all lattice graphs")]
60    async fn list_running_nodes(
61        &self,
62    ) -> impl Stream<Item = ListRunningResult> + Send + 'static {
63        let lattice = self.lattice.clone();
64        stream! {
65            let graphs = match lattice.list_graphs().await {
66                Ok(g) => g,
67                Err(e) => { yield ListRunningResult::Err { message: e }; return; }
68            };
69
70            let mut count = 0;
71            for graph in &graphs {
72                let nodes = match lattice.get_nodes(&graph.id).await {
73                    Ok(n) => n,
74                    Err(_) => continue,
75                };
76                for node in nodes {
77                    if node.status == NodeStatus::Running {
78                        count += 1;
79                        let st = spec_type(&serde_json::to_string(&node.spec).unwrap_or_default());
80                        yield ListRunningResult::Node(RunningNode {
81                            graph_id: graph.id.clone(),
82                            node_id: node.id.clone(),
83                            spec_type: st,
84                        });
85                    }
86                }
87            }
88            yield ListRunningResult::Done { count };
89        }
90    }
91
92    /// Force-fail a specific node. Calls advance_graph with an error token,
93    /// triggering downstream failure propagation and retry logic.
94    #[plexus_macros::method(description = "Inject a failure into a running node",
95    params(
96        graph_id = "Lattice graph ID",
97        node_id = "Node to fail",
98        error = "Error message to inject (default: 'chaos: injected failure')"
99    ))]
100    async fn inject_failure(
101        &self,
102        graph_id: String,
103        node_id: String,
104        error: Option<String>,
105    ) -> impl Stream<Item = InjectResult> + Send + 'static {
106        let lattice = self.lattice.clone();
107        stream! {
108            let error_msg = error.unwrap_or_else(|| "chaos: injected failure".to_string());
109
110            // Verify node is Running before injecting
111            let node = match lattice.get_node(&node_id).await {
112                Ok(n) => n,
113                Err(e) => { yield InjectResult::Err { message: e }; return; }
114            };
115            if node.status != NodeStatus::Running {
116                yield InjectResult::Skipped {
117                    reason: format!("node is {:?}, not Running", node.status),
118                };
119                return;
120            }
121
122            match lattice.advance_graph(&graph_id, &node_id, None, Some(error_msg.clone())).await {
123                Ok(()) => yield InjectResult::Ok {
124                    graph_id,
125                    node_id,
126                    action: format!("failed: {}", error_msg),
127                },
128                Err(e) => yield InjectResult::Err { message: e },
129            }
130        }
131    }
132
133    /// Force-complete a specific node with an ok token.
134    /// Useful for unblocking stuck nodes or skipping tasks in a test graph.
135    #[plexus_macros::method(description = "Inject a success into a running node",
136    params(
137        graph_id = "Lattice graph ID",
138        node_id = "Node to complete",
139        value = "JSON value to use as the output token (default: null)"
140    ))]
141    async fn inject_success(
142        &self,
143        graph_id: String,
144        node_id: String,
145        value: Option<String>,
146    ) -> impl Stream<Item = InjectResult> + Send + 'static {
147        use crate::activations::lattice::{NodeOutput, Token, TokenColor, TokenPayload};
148        let lattice = self.lattice.clone();
149        stream! {
150            let node = match lattice.get_node(&node_id).await {
151                Ok(n) => n,
152                Err(e) => { yield InjectResult::Err { message: e }; return; }
153            };
154            if node.status != NodeStatus::Running {
155                yield InjectResult::Skipped {
156                    reason: format!("node is {:?}, not Running", node.status),
157                };
158                return;
159            }
160
161            let payload_value = value
162                .as_deref()
163                .and_then(|s| serde_json::from_str(s).ok())
164                .unwrap_or(serde_json::Value::Null);
165
166            let output = NodeOutput::Single(Token {
167                color: TokenColor::Ok,
168                payload: Some(TokenPayload::Data { value: payload_value }),
169            });
170
171            match lattice.advance_graph(&graph_id, &node_id, Some(output), None).await {
172                Ok(()) => yield InjectResult::Ok {
173                    graph_id,
174                    node_id,
175                    action: "succeeded".to_string(),
176                },
177                Err(e) => yield InjectResult::Err { message: e },
178            }
179        }
180    }
181
182    /// List system processes whose cmdline contains the given pattern.
183    #[plexus_macros::method(description = "List processes matching a cmdline pattern",
184    params(pattern = "Substring to search for in /proc/*/cmdline"))]
185    async fn list_processes(
186        &self,
187        pattern: String,
188    ) -> impl Stream<Item = ListProcessesResult> + Send + 'static {
189        stream! {
190            let procs = find_pids_by_cmdline(&pattern);
191            let count = procs.len();
192            for (pid, cmdline) in procs {
193                yield ListProcessesResult::Process(ProcessInfo { pid, cmdline });
194            }
195            yield ListProcessesResult::Done { count };
196        }
197    }
198
199    /// Send SIGKILL to a process by PID.
200    #[plexus_macros::method(description = "Kill a process by PID (SIGKILL)",
201    params(pid = "Process ID to kill"))]
202    async fn kill_process(
203        &self,
204        pid: u32,
205    ) -> impl Stream<Item = KillProcessResult> + Send + 'static {
206        stream! {
207            // Verify process exists first
208            let cmdline_path = format!("/proc/{}/cmdline", pid);
209            if !std::path::Path::new(&cmdline_path).exists() {
210                yield KillProcessResult::NotFound;
211                return;
212            }
213
214            let result = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
215            if result == 0 {
216                yield KillProcessResult::Killed { pid };
217            } else {
218                let errno = std::io::Error::last_os_error();
219                if errno.raw_os_error() == Some(libc::ESRCH) {
220                    yield KillProcessResult::NotFound;
221                } else {
222                    yield KillProcessResult::Err { message: format!("kill failed: {}", errno) };
223                }
224            }
225        }
226    }
227
228    /// Snapshot all nodes in a graph with their current statuses.
229    #[plexus_macros::method(description = "Get a full status snapshot of a lattice graph",
230    params(graph_id = "Lattice graph ID"))]
231    async fn graph_snapshot(
232        &self,
233        graph_id: String,
234    ) -> impl Stream<Item = GraphSnapshotResult> + Send + 'static {
235        let lattice = self.lattice.clone();
236        stream! {
237            let graph = match lattice.get_graph(&graph_id).await {
238                Ok(g) => g,
239                Err(e) => { yield GraphSnapshotResult::Err { message: e }; return; }
240            };
241            let nodes = match lattice.get_nodes(&graph_id).await {
242                Ok(n) => n,
243                Err(e) => { yield GraphSnapshotResult::Err { message: e }; return; }
244            };
245
246            let mut pending = 0usize;
247            let mut ready = 0usize;
248            let mut running = 0usize;
249            let mut complete = 0usize;
250            let mut failed = 0usize;
251
252            for node in &nodes {
253                match node.status {
254                    NodeStatus::Pending  => pending  += 1,
255                    NodeStatus::Ready    => ready    += 1,
256                    NodeStatus::Running  => running  += 1,
257                    NodeStatus::Complete => complete += 1,
258                    NodeStatus::Failed   => failed   += 1,
259                }
260                let st = spec_type(&serde_json::to_string(&node.spec).unwrap_or_default());
261                yield GraphSnapshotResult::Node(NodeSnapshot {
262                    node_id: node.id.clone(),
263                    status: format!("{:?}", node.status).to_lowercase(),
264                    spec_type: st,
265                    error: node.error.clone(),
266                });
267            }
268
269            yield GraphSnapshotResult::Summary {
270                graph_id,
271                graph_status: format!("{}", graph.status),
272                total: nodes.len(),
273                pending, ready, running, complete, failed,
274            };
275        }
276    }
277
278    /// Hard-crash the substrate process (SIGKILL self).
279    /// Used to test crash recovery — the substrate will not respond after this call.
280    /// Restart with `make restart` and observe `recovery: re-dispatching` in the logs.
281    #[plexus_macros::method(description = "Hard-crash the substrate (SIGKILL self) — use to test crash recovery")]
282    async fn crash(&self) -> impl Stream<Item = InjectResult> + Send + 'static {
283        stream! {
284            tracing::warn!("chaos: crash() called — killing substrate process");
285            yield InjectResult::Ok {
286                graph_id: "self".to_string(),
287                node_id: "self".to_string(),
288                action: "crashing".to_string(),
289            };
290            // Small delay so the response can be flushed
291            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
292            unsafe { libc::kill(std::process::id() as i32, libc::SIGKILL) };
293        }
294    }
295}