fresh/services/remote/
spawner.rs1use crate::services::remote::channel::{AgentChannel, ChannelError};
8use crate::services::remote::protocol::{decode_base64, exec_params};
9use std::sync::Arc;
10
11#[derive(Debug, Clone)]
13pub struct SpawnResult {
14 pub stdout: String,
15 pub stderr: String,
16 pub exit_code: i32,
17}
18
19#[derive(Debug, thiserror::Error)]
21pub enum SpawnError {
22 #[error("Channel error: {0}")]
23 Channel(#[from] ChannelError),
24
25 #[error("Process error: {0}")]
26 Process(String),
27
28 #[error("Decode error: {0}")]
29 Decode(String),
30}
31
32#[async_trait::async_trait]
37pub trait ProcessSpawner: Send + Sync {
38 async fn spawn(
40 &self,
41 command: String,
42 args: Vec<String>,
43 cwd: Option<String>,
44 ) -> Result<SpawnResult, SpawnError>;
45}
46
47pub struct LocalProcessSpawner;
51
52#[async_trait::async_trait]
53impl ProcessSpawner for LocalProcessSpawner {
54 async fn spawn(
55 &self,
56 command: String,
57 args: Vec<String>,
58 cwd: Option<String>,
59 ) -> Result<SpawnResult, SpawnError> {
60 let mut cmd = tokio::process::Command::new(&command);
61 cmd.args(&args);
62
63 if let Some(ref dir) = cwd {
64 cmd.current_dir(dir);
65 }
66
67 let output = cmd
68 .output()
69 .await
70 .map_err(|e| SpawnError::Process(e.to_string()))?;
71
72 Ok(SpawnResult {
73 stdout: String::from_utf8_lossy(&output.stdout).to_string(),
74 stderr: String::from_utf8_lossy(&output.stderr).to_string(),
75 exit_code: output.status.code().unwrap_or(-1),
76 })
77 }
78}
79
80pub struct RemoteProcessSpawner {
82 channel: Arc<AgentChannel>,
83}
84
85impl RemoteProcessSpawner {
86 pub fn new(channel: Arc<AgentChannel>) -> Self {
88 Self { channel }
89 }
90}
91
92#[async_trait::async_trait]
93impl ProcessSpawner for RemoteProcessSpawner {
94 async fn spawn(
95 &self,
96 command: String,
97 args: Vec<String>,
98 cwd: Option<String>,
99 ) -> Result<SpawnResult, SpawnError> {
100 let params = exec_params(&command, &args, cwd.as_deref());
101
102 let (mut data_rx, result_rx) = self.channel.request_streaming("exec", params).await?;
104
105 let mut stdout = Vec::new();
106 let mut stderr = Vec::new();
107
108 while let Some(data) = data_rx.recv().await {
110 if let Some(out) = data.get("out").and_then(|v| v.as_str()) {
111 if let Ok(decoded) = decode_base64(out) {
112 stdout.extend_from_slice(&decoded);
113 }
114 }
115 if let Some(err) = data.get("err").and_then(|v| v.as_str()) {
116 if let Ok(decoded) = decode_base64(err) {
117 stderr.extend_from_slice(&decoded);
118 }
119 }
120 }
121
122 let result = result_rx
124 .await
125 .map_err(|_| SpawnError::Channel(ChannelError::ChannelClosed))?
126 .map_err(SpawnError::Process)?;
127
128 let exit_code = result
129 .get("code")
130 .and_then(|v| v.as_i64())
131 .map(|c| c as i32)
132 .unwrap_or(-1);
133
134 Ok(SpawnResult {
135 stdout: String::from_utf8_lossy(&stdout).to_string(),
136 stderr: String::from_utf8_lossy(&stderr).to_string(),
137 exit_code,
138 })
139 }
140}
141
142#[cfg(test)]
143mod tests {
144 use super::*;
145
146 #[tokio::test]
147 async fn test_local_spawner() {
148 let spawner = LocalProcessSpawner;
149 let result = spawner
150 .spawn("echo".to_string(), vec!["hello".to_string()], None)
151 .await
152 .unwrap();
153
154 assert_eq!(result.exit_code, 0);
155 assert!(result.stdout.trim() == "hello");
156 }
157}