plexus_substrate/activations/chaos/
activation.rs1use super::types::*;
2use crate::activations::lattice::{LatticeStorage, NodeStatus};
3use async_stream::stream;
4use futures::Stream;
5use std::sync::Arc;
6
7#[derive(Clone)]
16pub struct Chaos {
17 lattice: Arc<LatticeStorage>,
18}
19
20impl Chaos {
21 pub fn new(lattice: Arc<LatticeStorage>) -> Self {
22 Self { lattice }
23 }
24}
25
26fn spec_type(spec_json: &str) -> String {
28 serde_json::from_str::<serde_json::Value>(spec_json)
29 .ok()
30 .and_then(|v| v.get("type").and_then(|t| t.as_str()).map(|s| s.to_string()))
31 .unwrap_or_else(|| "unknown".to_string())
32}
33
/// Scans `/proc` for processes whose command line contains `pattern`.
///
/// Each hit is returned as `(pid, cmdline)`. The NUL separators found in
/// `/proc/<pid>/cmdline` are replaced with spaces before matching, and the
/// returned command line has surrounding whitespace trimmed (matching itself
/// is done on the untrimmed text).
///
/// Directory entries whose name is not a `u32`, and processes whose `cmdline`
/// cannot be read, are skipped. If `/proc` itself cannot be listed the result
/// is empty.
fn find_pids_by_cmdline(pattern: &str) -> Vec<(u32, String)> {
    let Ok(proc_dir) = std::fs::read_dir("/proc") else {
        return Vec::new();
    };

    proc_dir
        .flatten()
        .filter_map(|entry| {
            // Only numeric directory names denote processes.
            let pid = entry.file_name().to_string_lossy().parse::<u32>().ok()?;

            // A failed read (e.g. the process is gone) just skips this entry.
            let mut bytes = std::fs::read(format!("/proc/{}/cmdline", pid)).ok()?;
            for byte in bytes.iter_mut() {
                if *byte == 0 {
                    *byte = b' ';
                }
            }

            let joined = String::from_utf8_lossy(&bytes);
            if joined.contains(pattern) {
                Some((pid, joined.trim().to_string()))
            } else {
                None
            }
        })
        .collect()
}
53
54#[plexus_macros::activation(namespace = "chaos",
55version = "1.0.0",
56description = "Fault injection and observability for anti-fragility testing", crate_path = "plexus_core")]
57impl Chaos {
58 #[plexus_macros::method(description = "List all Running nodes across all lattice graphs")]
60 async fn list_running_nodes(
61 &self,
62 ) -> impl Stream<Item = ListRunningResult> + Send + 'static {
63 let lattice = self.lattice.clone();
64 stream! {
65 let graphs = match lattice.list_graphs().await {
66 Ok(g) => g,
67 Err(e) => { yield ListRunningResult::Err { message: e }; return; }
68 };
69
70 let mut count = 0;
71 for graph in &graphs {
72 let nodes = match lattice.get_nodes(&graph.id).await {
73 Ok(n) => n,
74 Err(_) => continue,
75 };
76 for node in nodes {
77 if node.status == NodeStatus::Running {
78 count += 1;
79 let st = spec_type(&serde_json::to_string(&node.spec).unwrap_or_default());
80 yield ListRunningResult::Node(RunningNode {
81 graph_id: graph.id.clone(),
82 node_id: node.id.clone(),
83 spec_type: st,
84 });
85 }
86 }
87 }
88 yield ListRunningResult::Done { count };
89 }
90 }
91
92 #[plexus_macros::method(description = "Inject a failure into a running node",
95 params(
96 graph_id = "Lattice graph ID",
97 node_id = "Node to fail",
98 error = "Error message to inject (default: 'chaos: injected failure')"
99 ))]
100 async fn inject_failure(
101 &self,
102 graph_id: String,
103 node_id: String,
104 error: Option<String>,
105 ) -> impl Stream<Item = InjectResult> + Send + 'static {
106 let lattice = self.lattice.clone();
107 stream! {
108 let error_msg = error.unwrap_or_else(|| "chaos: injected failure".to_string());
109
110 let node = match lattice.get_node(&node_id).await {
112 Ok(n) => n,
113 Err(e) => { yield InjectResult::Err { message: e }; return; }
114 };
115 if node.status != NodeStatus::Running {
116 yield InjectResult::Skipped {
117 reason: format!("node is {:?}, not Running", node.status),
118 };
119 return;
120 }
121
122 match lattice.advance_graph(&graph_id, &node_id, None, Some(error_msg.clone())).await {
123 Ok(()) => yield InjectResult::Ok {
124 graph_id,
125 node_id,
126 action: format!("failed: {}", error_msg),
127 },
128 Err(e) => yield InjectResult::Err { message: e },
129 }
130 }
131 }
132
133 #[plexus_macros::method(description = "Inject a success into a running node",
136 params(
137 graph_id = "Lattice graph ID",
138 node_id = "Node to complete",
139 value = "JSON value to use as the output token (default: null)"
140 ))]
141 async fn inject_success(
142 &self,
143 graph_id: String,
144 node_id: String,
145 value: Option<String>,
146 ) -> impl Stream<Item = InjectResult> + Send + 'static {
147 use crate::activations::lattice::{NodeOutput, Token, TokenColor, TokenPayload};
148 let lattice = self.lattice.clone();
149 stream! {
150 let node = match lattice.get_node(&node_id).await {
151 Ok(n) => n,
152 Err(e) => { yield InjectResult::Err { message: e }; return; }
153 };
154 if node.status != NodeStatus::Running {
155 yield InjectResult::Skipped {
156 reason: format!("node is {:?}, not Running", node.status),
157 };
158 return;
159 }
160
161 let payload_value = value
162 .as_deref()
163 .and_then(|s| serde_json::from_str(s).ok())
164 .unwrap_or(serde_json::Value::Null);
165
166 let output = NodeOutput::Single(Token {
167 color: TokenColor::Ok,
168 payload: Some(TokenPayload::Data { value: payload_value }),
169 });
170
171 match lattice.advance_graph(&graph_id, &node_id, Some(output), None).await {
172 Ok(()) => yield InjectResult::Ok {
173 graph_id,
174 node_id,
175 action: "succeeded".to_string(),
176 },
177 Err(e) => yield InjectResult::Err { message: e },
178 }
179 }
180 }
181
182 #[plexus_macros::method(description = "List processes matching a cmdline pattern",
184 params(pattern = "Substring to search for in /proc/*/cmdline"))]
185 async fn list_processes(
186 &self,
187 pattern: String,
188 ) -> impl Stream<Item = ListProcessesResult> + Send + 'static {
189 stream! {
190 let procs = find_pids_by_cmdline(&pattern);
191 let count = procs.len();
192 for (pid, cmdline) in procs {
193 yield ListProcessesResult::Process(ProcessInfo { pid, cmdline });
194 }
195 yield ListProcessesResult::Done { count };
196 }
197 }
198
199 #[plexus_macros::method(description = "Kill a process by PID (SIGKILL)",
201 params(pid = "Process ID to kill"))]
202 async fn kill_process(
203 &self,
204 pid: u32,
205 ) -> impl Stream<Item = KillProcessResult> + Send + 'static {
206 stream! {
207 let cmdline_path = format!("/proc/{}/cmdline", pid);
209 if !std::path::Path::new(&cmdline_path).exists() {
210 yield KillProcessResult::NotFound;
211 return;
212 }
213
214 let result = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
215 if result == 0 {
216 yield KillProcessResult::Killed { pid };
217 } else {
218 let errno = std::io::Error::last_os_error();
219 if errno.raw_os_error() == Some(libc::ESRCH) {
220 yield KillProcessResult::NotFound;
221 } else {
222 yield KillProcessResult::Err { message: format!("kill failed: {}", errno) };
223 }
224 }
225 }
226 }
227
228 #[plexus_macros::method(description = "Get a full status snapshot of a lattice graph",
230 params(graph_id = "Lattice graph ID"))]
231 async fn graph_snapshot(
232 &self,
233 graph_id: String,
234 ) -> impl Stream<Item = GraphSnapshotResult> + Send + 'static {
235 let lattice = self.lattice.clone();
236 stream! {
237 let graph = match lattice.get_graph(&graph_id).await {
238 Ok(g) => g,
239 Err(e) => { yield GraphSnapshotResult::Err { message: e }; return; }
240 };
241 let nodes = match lattice.get_nodes(&graph_id).await {
242 Ok(n) => n,
243 Err(e) => { yield GraphSnapshotResult::Err { message: e }; return; }
244 };
245
246 let mut pending = 0usize;
247 let mut ready = 0usize;
248 let mut running = 0usize;
249 let mut complete = 0usize;
250 let mut failed = 0usize;
251
252 for node in &nodes {
253 match node.status {
254 NodeStatus::Pending => pending += 1,
255 NodeStatus::Ready => ready += 1,
256 NodeStatus::Running => running += 1,
257 NodeStatus::Complete => complete += 1,
258 NodeStatus::Failed => failed += 1,
259 }
260 let st = spec_type(&serde_json::to_string(&node.spec).unwrap_or_default());
261 yield GraphSnapshotResult::Node(NodeSnapshot {
262 node_id: node.id.clone(),
263 status: format!("{:?}", node.status).to_lowercase(),
264 spec_type: st,
265 error: node.error.clone(),
266 });
267 }
268
269 yield GraphSnapshotResult::Summary {
270 graph_id,
271 graph_status: format!("{}", graph.status),
272 total: nodes.len(),
273 pending, ready, running, complete, failed,
274 };
275 }
276 }
277
278 #[plexus_macros::method(description = "Hard-crash the substrate (SIGKILL self) — use to test crash recovery")]
282 async fn crash(&self) -> impl Stream<Item = InjectResult> + Send + 'static {
283 stream! {
284 tracing::warn!("chaos: crash() called — killing substrate process");
285 yield InjectResult::Ok {
286 graph_id: "self".to_string(),
287 node_id: "self".to_string(),
288 action: "crashing".to_string(),
289 };
290 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
292 unsafe { libc::kill(std::process::id() as i32, libc::SIGKILL) };
293 }
294 }
295}