1pub mod local;
2
3use anyhow::{Context, Error, Result};
4use ssh2::Session;
5use std::fs;
6use std::io::{BufRead, BufReader};
7use std::net::TcpStream;
8use std::path::Path;
9use std::sync::Arc;
10use std::sync::mpsc;
11use std::time::Duration;
12use tokio::sync::mpsc as tokio_mpsc;
13use tracing::info;
14
15use crate::models::{ExecutionResult, SshConfig, OutputEvent, OutputType, OutputCallback};
16use crate::Step;
17use crate::vars::VariableManager;
18use crate::ExtractRule;
19
20pub struct SshExecutor;
22
23impl SshExecutor {
24 pub fn execute_script_with_realtime_output(
26 global_scripts:Arc<Vec<String>>,
27 server_name: &str,
28 ssh_config: &SshConfig,
29 step: &Step,
30 pipeline_name: &str,
31 step_name: &str,
32 output_callback: Option<OutputCallback>,
33 mut variable_manager: VariableManager,
34 extract_rules: Option<Vec<ExtractRule>>
35 ) -> Result<ExecutionResult> {
36 info!("Connecting to {}:{} as {}", ssh_config.host, ssh_config.port, ssh_config.username);
37
38 let script_path = step.script.as_str();
40
41 let script_content = std::fs::read_to_string(script_path)
43 .context(format!("Failed to read script file: {}", script_path))?;
44
45 let mut gloabl_script_content = global_scripts.iter()
46 .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
47 .fold(Ok("".to_string()), |p:Result<String>,v|{
48 if p.is_err(){
49 return p;
50 }
51
52 if v.is_err(){
53 return Err(Error::msg(format!("{:?}", v.err())));
54 }
55 let content = v.unwrap();
56
57 let mut s = p.unwrap_or_default();
58
59 s.push_str("\n");
60 s.push_str(&content);
61
62 return Ok(s.clone());
63 })?;
64
65 gloabl_script_content.push_str("\n");
66 gloabl_script_content.push_str(&script_content);
67
68 let script_content = gloabl_script_content.clone();
69
70 variable_manager.set_variable("ssh_server_name".to_string(), server_name.to_string());
71 variable_manager.set_variable("ssh_server_ip".to_string(), ssh_config.host.to_string());
72
73 let script_content = variable_manager.replace_variables(&script_content);
74
75 let ssh_timeout_seconds = ssh_config.timeout_seconds.unwrap_or(3);
77 let ssh_timeout_duration = Duration::from_secs(ssh_timeout_seconds);
78
79 let tcp = connect_with_timeout(&format!("{}:{}", ssh_config.host, ssh_config.port), ssh_timeout_duration)
81 .context("Failed to connect to SSH server")?;
82
83 let timeout_duration = Duration::from_secs(step.timeout_seconds.unwrap_or(30));
84
85 tcp.set_read_timeout(Some(timeout_duration))
87 .context("Failed to set read timeout")?;
88 tcp.set_write_timeout(Some(timeout_duration))
89 .context("Failed to set write timeout")?;
90 tcp.set_nodelay(true)
91 .context("Failed to set TCP nodelay")?;
92
93 let mut sess = Session::new()
95 .context("Failed to create SSH session")?;
96
97 sess.set_tcp_stream(tcp);
98
99 let session_timeout_seconds = step.timeout_seconds.unwrap_or(30);
101 let session_timeout_duration = Duration::from_secs(session_timeout_seconds);
102 sess.set_timeout(session_timeout_duration.as_millis() as u32);
103
104 sess.handshake()
106 .context(format!("SSH handshake failed: timeout {} s", ssh_timeout_seconds))?;
107
108 info!("SSH handshake completed, starting authentication");
109
110 let auth_result = if let Some(ref password) = ssh_config.password {
112 sess.userauth_password(&ssh_config.username, password)
113 .context("SSH password authentication failed")
114 } else if let Some(ref key_path) = ssh_config.private_key_path {
115 sess.userauth_pubkey_file(&ssh_config.username, None, Path::new(key_path), None)
116 .context("SSH key authentication failed")
117 } else {
118 Err(anyhow::anyhow!("No authentication method provided"))
119 };
120
121 auth_result?;
122 info!("SSH authentication successful");
123
124 let mut channel = sess.channel_session()
126 .context("Failed to create SSH channel")?;
127 channel.exec("bash")
128 .context("Failed to exec remote shell")?;
129
130 use std::io::Write;
132 channel.write_all(script_content.as_bytes())
133 .context("Failed to write script to remote shell")?;
134 channel.send_eof()
135 .context("Failed to send EOF to remote shell")?;
136
137 let (tx, mut rx) = tokio_mpsc::channel::<OutputEvent>(100);
139 let output_callback = output_callback.map(|cb| Arc::new(cb));
140
141 let server_name = server_name.to_string();
143 let _step_name = step_name.to_string();
144 let pipeline_name = pipeline_name.to_string();
145 let output_callback_clone = output_callback.clone();
146
147 let output_handle = std::thread::spawn(move || {
148 while let Some(event) = rx.blocking_recv() {
149 if let Some(callback) = &output_callback_clone {
150 callback(event);
151 }
152 }
153 });
154
155 let mut stdout = String::new();
157 let mut stderr = String::new();
158 let start_time = std::time::Instant::now();
159
160 let stdout_stream = channel.stream(0);
162 let mut stdout_reader = BufReader::new(stdout_stream);
163 let mut line = String::new();
164
165 while stdout_reader.read_line(&mut line)? > 0 {
166 let content = line.clone();
167 stdout.push_str(&content);
168
169 let event = OutputEvent {
171 pipeline_name: pipeline_name.clone(),
172 server_name: server_name.clone(),
173 step: step.clone(), output_type: OutputType::Stdout,
175 content: content.trim().to_string(),
176 timestamp: std::time::Instant::now(),
177 variables: variable_manager.get_variables().clone(),
178 };
179
180 if tx.blocking_send(event).is_err() {
181 break;
182 }
183
184 line.clear();
185 }
186
187 let stderr_stream = channel.stderr();
189 let mut stderr_reader = BufReader::new(stderr_stream);
190 line.clear();
191
192 while stderr_reader.read_line(&mut line)? > 0 {
193 let content = line.clone();
194 stderr.push_str(&content);
195
196 let event = OutputEvent {
198 pipeline_name: pipeline_name.clone(),
199 server_name: server_name.clone(),
200 step: step.clone(), output_type: OutputType::Stderr,
202 content: content.trim().to_string(),
203 timestamp: std::time::Instant::now(),
204 variables: variable_manager.get_variables().clone(),
205 };
206
207 if tx.blocking_send(event).is_err() {
208 break;
209 }
210
211 line.clear();
212 }
213
214 drop(tx);
216 if let Err(e) = output_handle.join() {
217 eprintln!("Output handler thread error: {:?}", e);
218 }
219
220 channel.wait_close()
221 .context("Failed to wait for channel close")?;
222
223 let exit_code = channel.exit_status()
224 .context("Failed to get exit status")?;
225
226 let execution_time = start_time.elapsed().as_millis() as u64;
227 info!("SSH command executed with exit code: {}", exit_code);
228
229 let execution_result = ExecutionResult {
231 success: exit_code == 0,
232 stdout,
233 stderr,
234 script: step.script.to_string(),
235 exit_code,
236 execution_time_ms: execution_time,
237 error_message: None,
238 };
239
240 if let Some(rules) = extract_rules {
242 if let Err(e) = variable_manager.extract_variables(&rules, &execution_result) {
243 info!("Failed to extract variables: {}", e);
244 }
245 }
246
247 Ok(execution_result)
248 }
249
250}
251
252fn connect_with_timeout(addr: &str, timeout: Duration) -> std::io::Result<TcpStream> {
254 let (tx, rx) = mpsc::channel();
255 let addr = addr.to_string();
256 let error_message = format!("connect to {} timeout {} s", addr, timeout.as_secs());
257 std::thread::spawn(move || {
258 let res = TcpStream::connect(addr);
259 let _ = tx.send(res);
260 });
261 rx.recv_timeout(timeout).unwrap_or_else(|_| Err(std::io::Error::new(std::io::ErrorKind::TimedOut, error_message)))
262}