fresh/services/remote/
spawner.rs1use crate::services::remote::channel::{AgentChannel, ChannelError};
11use crate::services::remote::protocol::{decode_base64, exec_params};
12use std::sync::Arc;
13
/// Captured outcome of a process that has run to completion.
///
/// Derives `PartialEq`/`Eq` so that results can be compared directly, for
/// example with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpawnResult {
    /// Everything the process wrote to standard output, as text.
    pub stdout: String,
    /// Everything the process wrote to standard error, as text.
    pub stderr: String,
    /// Exit code reported for the process.
    pub exit_code: i32,
}
24
25#[derive(Debug, thiserror::Error)]
29pub enum SpawnError {
30 #[error("Channel error: {0}")]
31 Channel(#[from] ChannelError),
32
33 #[error("Process error: {0}")]
34 Process(String),
35
36 #[error("Decode error: {0}")]
37 Decode(String),
38}
39
40#[async_trait::async_trait]
46pub trait ProcessSpawner: Send + Sync {
47 async fn spawn(
49 &self,
50 command: String,
51 args: Vec<String>,
52 cwd: Option<String>,
53 ) -> Result<SpawnResult, SpawnError>;
54}
55
56pub struct LocalProcessSpawner;
61
62#[async_trait::async_trait]
63impl ProcessSpawner for LocalProcessSpawner {
64 async fn spawn(
65 &self,
66 command: String,
67 args: Vec<String>,
68 cwd: Option<String>,
69 ) -> Result<SpawnResult, SpawnError> {
70 let mut cmd = tokio::process::Command::new(&command);
71 cmd.args(&args);
72
73 if let Some(ref dir) = cwd {
74 cmd.current_dir(dir);
75 }
76
77 let output = cmd
78 .output()
79 .await
80 .map_err(|e| SpawnError::Process(e.to_string()))?;
81
82 Ok(SpawnResult {
83 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
84 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
85 exit_code: output.status.code().unwrap_or(-1),
86 })
87 }
88}
89
90pub struct RemoteProcessSpawner {
92 channel: Arc<AgentChannel>,
93}
94
95impl RemoteProcessSpawner {
96 pub fn new(channel: Arc<AgentChannel>) -> Self {
98 Self { channel }
99 }
100}
101
102#[async_trait::async_trait]
103impl ProcessSpawner for RemoteProcessSpawner {
104 async fn spawn(
105 &self,
106 command: String,
107 args: Vec<String>,
108 cwd: Option<String>,
109 ) -> Result<SpawnResult, SpawnError> {
110 let params = exec_params(&command, &args, cwd.as_deref());
111
112 let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
114
115 let mut stdout = Vec::new();
116 let mut stderr = Vec::new();
117
118 while let Some(data) = data_rx.recv().await {
120 if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
121 if let Ok(decoded) = decode_base64(out) {
122 stdout.extend_from_slice(&decoded);
123 }
124 }
125 if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
126 if let Ok(decoded) = decode_base64(err) {
127 stderr.extend_from_slice(&decoded);
128 }
129 }
130 }
131
132 let result = result_rx
134 .await
135 .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
136 .map_err(SpawnError::Process)?;
137
138 let exit_code = result
139 .get("code")
140 .and_then(|v| v.as_i64())
141 .map(|c| c as i32)
142 .unwrap_or(-1);
143
144 Ok(SpawnResult {
145 stdout: String::from_utf8_lossy(&stdout).to_string(),
146 stderr: String::from_utf8_lossy(&stderr).to_string(),
147 exit_code,
148 })
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `echo hello` through [`LocalProcessSpawner`] and checks the exit
    /// code and captured standard output.
    #[tokio::test]
    async fn test_local_spawner() {
        let spawner = LocalProcessSpawner;
        let result = spawner
            .spawn("echo".to_string(), vec!["hello".to_string()], None)
            .await
            .expect("running `echo hello` locally should succeed");

        assert_eq!(result.exit_code, 0);
        // `assert_eq!` prints both sides on failure, unlike `assert!(a == b)`.
        assert_eq!(result.stdout.trim(), "hello");
    }
}