1use serde::{Deserialize, Serialize};
2use time::OffsetDateTime;
3
4use crate::events::{ThreadId, TurnId};
5use crate::remote_runner::{RemoteRunnerSessionId, RunnerDestinationId};
6use crate::tasks::{TaskId, TaskOutputStream};
7
8pub type ProcessId = String;
9
10#[async_trait::async_trait]
11pub trait ProcessStopper: Send + Sync + 'static {
12 async fn stop(&self, reason: Option<String>) -> anyhow::Result<()>;
13}
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "snake_case")]
17pub enum ProcessOrigin {
18 CommandExec,
19 BackgroundTask,
20 ShellTool,
21 RemoteRunner,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25#[serde(rename_all = "snake_case", rename_all_fields = "camelCase")]
26pub enum ProcessState {
27 Starting,
28 Running,
29 Stopping,
30 Exited { exit_code: Option<i32> },
31 Failed { error: String },
32 Stopped,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36#[serde(rename_all = "camelCase")]
37pub struct ProcessDescriptor {
38 pub process_id: ProcessId,
39 pub origin: ProcessOrigin,
40 pub state: ProcessState,
41 pub command: Vec<String>,
42 pub command_summary: String,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub cwd: Option<String>,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub pid: Option<u32>,
47 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub task_id: Option<TaskId>,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub thread_id: Option<ThreadId>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub turn_id: Option<TurnId>,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub runner_destination_id: Option<RunnerDestinationId>,
55 #[serde(default, skip_serializing_if = "Option::is_none")]
56 pub runner_session_id: Option<RemoteRunnerSessionId>,
57 pub stoppable: bool,
58 #[serde(with = "time::serde::rfc3339")]
59 pub started_at: OffsetDateTime,
60 #[serde(with = "time::serde::rfc3339")]
61 pub updated_at: OffsetDateTime,
62 #[serde(default, skip_serializing_if = "Option::is_none")]
63 pub stdout_tail: Option<String>,
64 #[serde(default, skip_serializing_if = "Option::is_none")]
65 pub stderr_tail: Option<String>,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
69#[serde(rename_all = "camelCase")]
70pub struct ProcessOutput {
71 pub process_id: ProcessId,
72 pub stream: TaskOutputStream,
73 pub chunk: String,
74 #[serde(default)]
75 pub dropped_bytes: u64,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
77 pub thread_id: Option<ThreadId>,
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub turn_id: Option<TurnId>,
80 #[serde(with = "time::serde::rfc3339")]
81 pub timestamp: OffsetDateTime,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
85#[serde(rename_all = "camelCase")]
86pub struct ProcessStopResult {
87 pub process_id: ProcessId,
88 pub stopped: bool,
89 #[serde(default, skip_serializing_if = "Option::is_none")]
90 pub process: Option<ProcessDescriptor>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
94#[serde(rename_all = "camelCase")]
95pub struct ProcessStarted {
96 pub process: ProcessDescriptor,
97 #[serde(with = "time::serde::rfc3339")]
98 pub timestamp: OffsetDateTime,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
102#[serde(rename_all = "camelCase")]
103pub struct ProcessStopping {
104 pub process_id: ProcessId,
105 #[serde(default, skip_serializing_if = "Option::is_none")]
106 pub reason: Option<String>,
107 #[serde(with = "time::serde::rfc3339")]
108 pub timestamp: OffsetDateTime,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
112#[serde(rename_all = "camelCase")]
113pub struct ProcessExited {
114 pub process: ProcessDescriptor,
115 #[serde(default, skip_serializing_if = "Option::is_none")]
116 pub exit_code: Option<i32>,
117 #[serde(with = "time::serde::rfc3339")]
118 pub timestamp: OffsetDateTime,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
122#[serde(rename_all = "camelCase")]
123pub struct ProcessStopped {
124 pub process: ProcessDescriptor,
125 #[serde(default, skip_serializing_if = "Option::is_none")]
126 pub reason: Option<String>,
127 #[serde(with = "time::serde::rfc3339")]
128 pub timestamp: OffsetDateTime,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
132#[serde(rename_all = "camelCase")]
133pub struct ProcessFailed {
134 pub process: ProcessDescriptor,
135 pub error: String,
136 #[serde(with = "time::serde::rfc3339")]
137 pub timestamp: OffsetDateTime,
138}
139
140#[async_trait::async_trait]
141pub trait ProcessRegistrySink: Send + Sync + 'static {
142 async fn register_process(
143 &self,
144 process: ProcessDescriptor,
145 stopper: Option<std::sync::Arc<dyn ProcessStopper>>,
146 ) -> anyhow::Result<ProcessDescriptor>;
147
148 async fn append_process_output(&self, output: ProcessOutput) -> anyhow::Result<()>;
149
150 async fn mark_process_exited(
151 &self,
152 process_id: &str,
153 exit_code: Option<i32>,
154 ) -> anyhow::Result<()>;
155
156 async fn mark_process_failed(&self, process_id: &str, error: String) -> anyhow::Result<()>;
157
158 async fn mark_process_stopped(
159 &self,
160 process_id: &str,
161 reason: Option<String>,
162 ) -> anyhow::Result<()>;
163}
164
165pub fn command_summary(command: &[String]) -> String {
166 command
167 .iter()
168 .map(|part| redact_command_part(part))
169 .collect::<Vec<_>>()
170 .join(" ")
171}
172
173fn redact_command_part(part: &str) -> String {
174 let lower = part.to_ascii_lowercase();
175 let secret_like = [
176 "token",
177 "secret",
178 "password",
179 "passwd",
180 "apikey",
181 "api_key",
182 "authorization",
183 "bearer",
184 ];
185 if secret_like.iter().any(|needle| lower.contains(needle)) {
186 if let Some((key, _)) = part.split_once('=') {
187 format!("{key}=<redacted>")
188 } else {
189 "<redacted>".to_string()
190 }
191 } else if part.contains(char::is_whitespace) {
192 format!("{part:?}")
193 } else {
194 part.to_string()
195 }
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201
202 fn descriptor() -> ProcessDescriptor {
203 ProcessDescriptor {
204 process_id: "process-1".to_string(),
205 origin: ProcessOrigin::CommandExec,
206 state: ProcessState::Running,
207 command: vec![
208 "curl".to_string(),
209 "Authorization=Bearer abc123".to_string(),
210 "https://example.test".to_string(),
211 ],
212 command_summary: "curl Authorization=<redacted> https://example.test".to_string(),
213 cwd: Some("/repo".to_string()),
214 pid: Some(1234),
215 task_id: Some("task-1".to_string()),
216 thread_id: Some("thread-1".to_string()),
217 turn_id: Some("turn-1".to_string()),
218 runner_destination_id: None,
219 runner_session_id: None,
220 stoppable: true,
221 started_at: OffsetDateTime::UNIX_EPOCH,
222 updated_at: OffsetDateTime::UNIX_EPOCH,
223 stdout_tail: Some("ready\n".to_string()),
224 stderr_tail: None,
225 }
226 }
227
228 #[test]
229 fn process_descriptor_uses_public_process_id_and_camel_case_fields() {
230 let descriptor = descriptor();
231 let value = serde_json::to_value(&descriptor).unwrap();
232
233 assert_eq!(value["processId"], "process-1");
234 assert_eq!(value["pid"], 1234);
235 assert!(value.get("process_id").is_none());
236 assert_eq!(value["state"], "running");
237 assert_eq!(value["commandSummary"], descriptor.command_summary);
238
239 let decoded: ProcessDescriptor = serde_json::from_value(value).unwrap();
240 assert_eq!(decoded, descriptor);
241 }
242
243 #[test]
244 fn process_state_variants_round_trip_with_payloads() {
245 let exited = ProcessState::Exited { exit_code: Some(0) };
246 let value = serde_json::to_value(&exited).unwrap();
247 assert_eq!(value["exited"]["exitCode"], 0);
248 assert_eq!(
249 serde_json::from_value::<ProcessState>(value).unwrap(),
250 exited
251 );
252
253 let failed = ProcessState::Failed {
254 error: "spawn failed".to_string(),
255 };
256 let value = serde_json::to_value(&failed).unwrap();
257 assert_eq!(value["failed"]["error"], "spawn failed");
258 assert_eq!(
259 serde_json::from_value::<ProcessState>(value).unwrap(),
260 failed
261 );
262 }
263
264 #[test]
265 fn command_summary_redacts_secret_like_arguments() {
266 let command = vec![
267 "curl".to_string(),
268 "API_KEY=abc123".to_string(),
269 "--header".to_string(),
270 "Authorization: Bearer abc123".to_string(),
271 "hello world".to_string(),
272 ];
273
274 assert_eq!(
275 command_summary(&command),
276 "curl API_KEY=<redacted> --header <redacted> \"hello world\""
277 );
278 }
279
280 #[test]
281 fn process_stop_result_round_trips_descriptor() {
282 let result = ProcessStopResult {
283 process_id: "process-1".to_string(),
284 stopped: true,
285 process: Some(descriptor()),
286 };
287
288 let value = serde_json::to_value(&result).unwrap();
289 assert_eq!(value["processId"], "process-1");
290 assert!(value["process"]["pid"].is_number());
291 assert_eq!(
292 serde_json::from_value::<ProcessStopResult>(value).unwrap(),
293 result
294 );
295 }
296}