1pub mod local;
2
3use anyhow::{Context, Error, Result};
4use ssh2::Session;
5use std::fs;
6use std::io::{BufRead, BufReader};
7use std::net::TcpStream;
8use std::path::Path;
9use std::sync::Arc;
10use std::sync::mpsc;
11use std::time::{Duration};
12use tokio::sync::mpsc as tokio_mpsc;
13use tracing::info;
14
15use crate::models::{ExecutionResult, SshConfig, OutputEvent, OutputType, OutputCallback};
16use crate::Step;
17use crate::vars::VariableManager;
18use crate::ExtractRule;
19
20pub struct SshExecutor;
22
23impl SshExecutor {
24 pub fn execute_script_with_realtime_output(
26 script: Option<String>,
27 global_scripts:Arc<Vec<String>>,
28 server_name: &str,
29 ssh_config: &SshConfig,
30 step: &Step,
31 pipeline_name: &str,
32 step_name: &str,
33 output_callback: Option<OutputCallback>,
34 mut variable_manager: VariableManager,
35 extract_rules: Option<Vec<ExtractRule>>
36 ) -> Result<ExecutionResult> {
37 info!("Connecting to {}:{} as {}", ssh_config.host, ssh_config.port, ssh_config.username);
38
39 let script_path = step.script.as_str();
41
42 let script_content = std::fs::read_to_string(script_path)
44 .context(format!("Failed to read script file: {}", script_path))?;
45
46 let mut gloabl_script_content = global_scripts.iter()
47 .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
48 .fold(Ok("".to_string()), |p:Result<String>,v|{
49 if p.is_err(){
50 return p;
51 }
52
53 if v.is_err(){
54 return Err(Error::msg(format!("{:?}", v.err())));
55 }
56 let content = v.unwrap();
57
58 let mut s = p.unwrap_or_default();
59
60 s.push_str("\n");
61 s.push_str(&content);
62
63 return Ok(s.clone());
64 })?;
65
66 if let Some(script_header) = script {
67 let cont = fs::read_to_string(&script_header)
68 .map_err(|e| anyhow::anyhow!("Failed to read script header file '{}': {}", script_header, e))?;
69 gloabl_script_content.push_str("\n");
70 gloabl_script_content.push_str(&cont);
71 }
72
73 gloabl_script_content.push_str("\n");
74 gloabl_script_content.push_str(&script_content);
75
76 let script_content = gloabl_script_content.clone();
77
78 variable_manager.set_variable("ssh_server_name".to_string(), server_name.to_string());
79 variable_manager.set_variable("ssh_server_ip".to_string(), ssh_config.host.to_string());
80
81 let script_content = variable_manager.replace_variables(&script_content);
82
83 let ssh_timeout_seconds = ssh_config.timeout_seconds.unwrap_or(3);
85 let ssh_timeout_duration = Duration::from_secs(ssh_timeout_seconds);
86
87 let tcp = connect_with_timeout(&format!("{}:{}", ssh_config.host, ssh_config.port), ssh_timeout_duration)
89 .context("Failed to connect to SSH server")?;
90
91 let timeout_duration = Duration::from_secs(step.timeout_seconds.unwrap_or(30));
92
93 tcp.set_read_timeout(Some(timeout_duration))
95 .context("Failed to set read timeout")?;
96 tcp.set_write_timeout(Some(timeout_duration))
97 .context("Failed to set write timeout")?;
98 tcp.set_nodelay(true)
99 .context("Failed to set TCP nodelay")?;
100
101 let mut sess = Session::new()
103 .context("Failed to create SSH session")?;
104
105 sess.set_tcp_stream(tcp);
106
107 let session_timeout_seconds = step.timeout_seconds.unwrap_or(30);
109 let session_timeout_duration = Duration::from_secs(session_timeout_seconds);
110 sess.set_timeout(session_timeout_duration.as_millis() as u32);
111
112 sess.handshake()
114 .context(format!("SSH handshake failed: timeout {} s", ssh_timeout_seconds))?;
115
116 info!("SSH handshake completed, starting authentication");
117
118 let auth_result = if let Some(ref password) = ssh_config.password {
120 sess.userauth_password(&ssh_config.username, password)
121 .context("SSH password authentication failed")
122 } else if let Some(ref key_path) = ssh_config.private_key_path {
123 sess.userauth_pubkey_file(&ssh_config.username, None, Path::new(key_path), None)
124 .context("SSH key authentication failed")
125 } else {
126 Err(anyhow::anyhow!("No authentication method provided"))
127 };
128
129 auth_result?;
130 info!("SSH authentication successful");
131
132 let mut channel = sess.channel_session()
134 .context("Failed to create SSH channel")?;
135 channel.exec("bash")
136 .context("Failed to exec remote shell")?;
137
138 use std::io::Write;
140 channel.write_all(script_content.as_bytes())
141 .context("Failed to write script to remote shell")?;
142 channel.send_eof()
143 .context("Failed to send EOF to remote shell")?;
144
145 let (tx, mut rx) = tokio_mpsc::channel::<OutputEvent>(100);
147 let output_callback = output_callback.map(|cb| Arc::new(cb));
148
149 let server_name = server_name.to_string();
151 let _step_name = step_name.to_string();
152 let pipeline_name = pipeline_name.to_string();
153 let output_callback_clone = output_callback.clone();
154
155 let output_handle = std::thread::spawn(move || {
156 while let Some(event) = rx.blocking_recv() {
157 if let Some(callback) = &output_callback_clone {
158 callback(event);
159 }
160 }
161 });
162
163 let mut stdout = String::new();
165 let mut stderr = String::new();
166 let start_time = std::time::Instant::now();
167
168 let stdout_stream = channel.stream(0);
170 let mut stdout_reader = BufReader::new(stdout_stream);
171 let mut line = String::new();
172
173 while stdout_reader.read_line(&mut line)? > 0 {
174 let content = line.clone();
175 stdout.push_str(&content);
176
177 let event = OutputEvent {
179 pipeline_name: pipeline_name.clone(),
180 server_name: server_name.clone(),
181 step: step.clone(), script_path:step.script.to_string(),
183 output_type: OutputType::Stdout,
184 content: content.trim().to_string(),
185 timestamp: std::time::Instant::now(),
186 variables: variable_manager.get_variables().clone(),
187 };
188
189 if tx.blocking_send(event).is_err() {
190 break;
191 }
192
193 line.clear();
194 }
195
196 let stderr_stream = channel.stderr();
198 let mut stderr_reader = BufReader::new(stderr_stream);
199 line.clear();
200
201 while stderr_reader.read_line(&mut line)? > 0 {
202 let content = line.clone();
203 stderr.push_str(&content);
204
205 let event = OutputEvent {
207 pipeline_name: pipeline_name.clone(),
208 server_name: server_name.clone(),
209 step: step.clone(), script_path:step.script.to_string(),
211 output_type: OutputType::Stderr,
212 content: content.trim().to_string(),
213 timestamp: std::time::Instant::now(),
214 variables: variable_manager.get_variables().clone(),
215 };
216
217 if tx.blocking_send(event).is_err() {
218 break;
219 }
220
221 line.clear();
222 }
223
224 drop(tx);
226 if let Err(e) = output_handle.join() {
227 eprintln!("Output handler thread error: {:?}", e);
228 }
229
230 channel.wait_close()
231 .context("Failed to wait for channel close")?;
232
233 let exit_code = channel.exit_status()
234 .context("Failed to get exit status")?;
235
236 let execution_time = start_time.elapsed().as_millis() as u64;
237 info!("SSH command executed with exit code: {}", exit_code);
238
239 let execution_result = ExecutionResult {
241 success: exit_code == 0,
242 stdout,
243 stderr,
244 script: step.script.to_string(),
245 exit_code,
246 execution_time_ms: execution_time,
247 error_message: None,
248 };
249
250 if let Some(rules) = extract_rules {
252 if let Err(e) = variable_manager.extract_variables(&rules, &execution_result) {
253 info!("Failed to extract variables: {}", e);
254 }
255 }
256
257 Ok(execution_result)
258 }
259
260}
261
262fn connect_with_timeout(addr: &str, timeout: Duration) -> std::io::Result<TcpStream> {
264 let (tx, rx) = mpsc::channel();
265 let addr = addr.to_string();
266 let error_message = format!("connect to {} timeout {} s", addr, timeout.as_secs());
267 std::thread::spawn(move || {
268 let res = TcpStream::connect(addr);
269 let _ = tx.send(res);
270 });
271 rx.recv_timeout(timeout).unwrap_or_else(|_| Err(std::io::Error::new(std::io::ErrorKind::TimedOut, error_message)))
272}