Skip to main content

roder_api/
processes.rs

1use serde::{Deserialize, Serialize};
2use time::OffsetDateTime;
3
4use crate::events::{ThreadId, TurnId};
5use crate::remote_runner::{RemoteRunnerSessionId, RunnerDestinationId};
6use crate::tasks::{TaskId, TaskOutputStream};
7
8pub type ProcessId = String;
9
10#[async_trait::async_trait]
11pub trait ProcessStopper: Send + Sync + 'static {
12    async fn stop(&self, reason: Option<String>) -> anyhow::Result<()>;
13}
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "snake_case")]
17pub enum ProcessOrigin {
18    CommandExec,
19    BackgroundTask,
20    ShellTool,
21    RemoteRunner,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
25#[serde(rename_all = "snake_case", rename_all_fields = "camelCase")]
26pub enum ProcessState {
27    Starting,
28    Running,
29    Stopping,
30    Exited { exit_code: Option<i32> },
31    Failed { error: String },
32    Stopped,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
36#[serde(rename_all = "camelCase")]
37pub struct ProcessDescriptor {
38    pub process_id: ProcessId,
39    pub origin: ProcessOrigin,
40    pub state: ProcessState,
41    pub command: Vec<String>,
42    pub command_summary: String,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub cwd: Option<String>,
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub pid: Option<u32>,
47    #[serde(default, skip_serializing_if = "Option::is_none")]
48    pub task_id: Option<TaskId>,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub thread_id: Option<ThreadId>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub turn_id: Option<TurnId>,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub runner_destination_id: Option<RunnerDestinationId>,
55    #[serde(default, skip_serializing_if = "Option::is_none")]
56    pub runner_session_id: Option<RemoteRunnerSessionId>,
57    pub stoppable: bool,
58    #[serde(with = "time::serde::rfc3339")]
59    pub started_at: OffsetDateTime,
60    #[serde(with = "time::serde::rfc3339")]
61    pub updated_at: OffsetDateTime,
62    #[serde(default, skip_serializing_if = "Option::is_none")]
63    pub stdout_tail: Option<String>,
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub stderr_tail: Option<String>,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
69#[serde(rename_all = "camelCase")]
70pub struct ProcessOutput {
71    pub process_id: ProcessId,
72    pub stream: TaskOutputStream,
73    pub chunk: String,
74    #[serde(default)]
75    pub dropped_bytes: u64,
76    #[serde(default, skip_serializing_if = "Option::is_none")]
77    pub thread_id: Option<ThreadId>,
78    #[serde(default, skip_serializing_if = "Option::is_none")]
79    pub turn_id: Option<TurnId>,
80    #[serde(with = "time::serde::rfc3339")]
81    pub timestamp: OffsetDateTime,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
85#[serde(rename_all = "camelCase")]
86pub struct ProcessStopResult {
87    pub process_id: ProcessId,
88    pub stopped: bool,
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub process: Option<ProcessDescriptor>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
94#[serde(rename_all = "camelCase")]
95pub struct ProcessStarted {
96    pub process: ProcessDescriptor,
97    #[serde(with = "time::serde::rfc3339")]
98    pub timestamp: OffsetDateTime,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
102#[serde(rename_all = "camelCase")]
103pub struct ProcessStopping {
104    pub process_id: ProcessId,
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub reason: Option<String>,
107    #[serde(with = "time::serde::rfc3339")]
108    pub timestamp: OffsetDateTime,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
112#[serde(rename_all = "camelCase")]
113pub struct ProcessExited {
114    pub process: ProcessDescriptor,
115    #[serde(default, skip_serializing_if = "Option::is_none")]
116    pub exit_code: Option<i32>,
117    #[serde(with = "time::serde::rfc3339")]
118    pub timestamp: OffsetDateTime,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
122#[serde(rename_all = "camelCase")]
123pub struct ProcessStopped {
124    pub process: ProcessDescriptor,
125    #[serde(default, skip_serializing_if = "Option::is_none")]
126    pub reason: Option<String>,
127    #[serde(with = "time::serde::rfc3339")]
128    pub timestamp: OffsetDateTime,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
132#[serde(rename_all = "camelCase")]
133pub struct ProcessFailed {
134    pub process: ProcessDescriptor,
135    pub error: String,
136    #[serde(with = "time::serde::rfc3339")]
137    pub timestamp: OffsetDateTime,
138}
139
140#[async_trait::async_trait]
141pub trait ProcessRegistrySink: Send + Sync + 'static {
142    async fn register_process(
143        &self,
144        process: ProcessDescriptor,
145        stopper: Option<std::sync::Arc<dyn ProcessStopper>>,
146    ) -> anyhow::Result<ProcessDescriptor>;
147
148    async fn append_process_output(&self, output: ProcessOutput) -> anyhow::Result<()>;
149
150    async fn mark_process_exited(
151        &self,
152        process_id: &str,
153        exit_code: Option<i32>,
154    ) -> anyhow::Result<()>;
155
156    async fn mark_process_failed(&self, process_id: &str, error: String) -> anyhow::Result<()>;
157
158    async fn mark_process_stopped(
159        &self,
160        process_id: &str,
161        reason: Option<String>,
162    ) -> anyhow::Result<()>;
163}
164
165pub fn command_summary(command: &[String]) -> String {
166    command
167        .iter()
168        .map(|part| redact_command_part(part))
169        .collect::<Vec<_>>()
170        .join(" ")
171}
172
173fn redact_command_part(part: &str) -> String {
174    let lower = part.to_ascii_lowercase();
175    let secret_like = [
176        "token",
177        "secret",
178        "password",
179        "passwd",
180        "apikey",
181        "api_key",
182        "authorization",
183        "bearer",
184    ];
185    if secret_like.iter().any(|needle| lower.contains(needle)) {
186        if let Some((key, _)) = part.split_once('=') {
187            format!("{key}=<redacted>")
188        } else {
189            "<redacted>".to_string()
190        }
191    } else if part.contains(char::is_whitespace) {
192        format!("{part:?}")
193    } else {
194        part.to_string()
195    }
196}
197
198#[cfg(test)]
199mod tests {
200    use super::*;
201
202    fn descriptor() -> ProcessDescriptor {
203        ProcessDescriptor {
204            process_id: "process-1".to_string(),
205            origin: ProcessOrigin::CommandExec,
206            state: ProcessState::Running,
207            command: vec![
208                "curl".to_string(),
209                "Authorization=Bearer abc123".to_string(),
210                "https://example.test".to_string(),
211            ],
212            command_summary: "curl Authorization=<redacted> https://example.test".to_string(),
213            cwd: Some("/repo".to_string()),
214            pid: Some(1234),
215            task_id: Some("task-1".to_string()),
216            thread_id: Some("thread-1".to_string()),
217            turn_id: Some("turn-1".to_string()),
218            runner_destination_id: None,
219            runner_session_id: None,
220            stoppable: true,
221            started_at: OffsetDateTime::UNIX_EPOCH,
222            updated_at: OffsetDateTime::UNIX_EPOCH,
223            stdout_tail: Some("ready\n".to_string()),
224            stderr_tail: None,
225        }
226    }
227
228    #[test]
229    fn process_descriptor_uses_public_process_id_and_camel_case_fields() {
230        let descriptor = descriptor();
231        let value = serde_json::to_value(&descriptor).unwrap();
232
233        assert_eq!(value["processId"], "process-1");
234        assert_eq!(value["pid"], 1234);
235        assert!(value.get("process_id").is_none());
236        assert_eq!(value["state"], "running");
237        assert_eq!(value["commandSummary"], descriptor.command_summary);
238
239        let decoded: ProcessDescriptor = serde_json::from_value(value).unwrap();
240        assert_eq!(decoded, descriptor);
241    }
242
243    #[test]
244    fn process_state_variants_round_trip_with_payloads() {
245        let exited = ProcessState::Exited { exit_code: Some(0) };
246        let value = serde_json::to_value(&exited).unwrap();
247        assert_eq!(value["exited"]["exitCode"], 0);
248        assert_eq!(
249            serde_json::from_value::<ProcessState>(value).unwrap(),
250            exited
251        );
252
253        let failed = ProcessState::Failed {
254            error: "spawn failed".to_string(),
255        };
256        let value = serde_json::to_value(&failed).unwrap();
257        assert_eq!(value["failed"]["error"], "spawn failed");
258        assert_eq!(
259            serde_json::from_value::<ProcessState>(value).unwrap(),
260            failed
261        );
262    }
263
264    #[test]
265    fn command_summary_redacts_secret_like_arguments() {
266        let command = vec![
267            "curl".to_string(),
268            "API_KEY=abc123".to_string(),
269            "--header".to_string(),
270            "Authorization: Bearer abc123".to_string(),
271            "hello world".to_string(),
272        ];
273
274        assert_eq!(
275            command_summary(&command),
276            "curl API_KEY=<redacted> --header <redacted> \"hello world\""
277        );
278    }
279
280    #[test]
281    fn process_stop_result_round_trips_descriptor() {
282        let result = ProcessStopResult {
283            process_id: "process-1".to_string(),
284            stopped: true,
285            process: Some(descriptor()),
286        };
287
288        let value = serde_json::to_value(&result).unwrap();
289        assert_eq!(value["processId"], "process-1");
290        assert!(value["process"]["pid"].is_number());
291        assert_eq!(
292            serde_json::from_value::<ProcessStopResult>(value).unwrap(),
293            result
294        );
295    }
296}