1pub mod local;
2
3use anyhow::{Context, Error, Result};
4use ssh2::Session;
5use std::collections::HashMap;
6use std::fs;
7use std::io::{BufRead, BufReader};
8use std::net::TcpStream;
9use std::path::Path;
10use std::sync::Arc;
11use std::sync::mpsc;
12use std::time::{Duration, Instant};
13use tokio::sync::mpsc as tokio_mpsc;
14use tracing::info;
15
16use crate::models::{ExecutionResult, SshConfig, OutputEvent, OutputType, OutputCallback};
17use crate::Step;
18use crate::vars::VariableManager;
19use crate::ExtractRule;
20
21pub struct SshExecutor;
23
24impl SshExecutor {
25 pub fn execute_script_with_realtime_output(
27 global_scripts:Arc<Vec<String>>,
28 server_name: &str,
29 ssh_config: &SshConfig,
30 step: &Step,
31 pipeline_name: &str,
32 step_name: &str,
33 output_callback: Option<OutputCallback>,
34 mut variable_manager: VariableManager,
35 extract_rules: Option<Vec<ExtractRule>>
36 ) -> Result<ExecutionResult> {
37 info!("Connecting to {}:{} as {}", ssh_config.host, ssh_config.port, ssh_config.username);
38
39 let script_path = step.script.as_str();
41
42 let script_content = std::fs::read_to_string(script_path)
44 .context(format!("Failed to read script file: {}", script_path))?;
45
46 let mut gloabl_script_content = global_scripts.iter()
47 .map(|v|std::fs::read_to_string(v).context(format!("read file:[{}]", v)))
48 .fold(Ok("".to_string()), |p:Result<String>,v|{
49 if p.is_err(){
50 return p;
51 }
52
53 if v.is_err(){
54 return Err(Error::msg(format!("{:?}", v.err())));
55 }
56 let content = v.unwrap();
57
58 let mut s = p.unwrap_or_default();
59
60 s.push_str("\n");
61 s.push_str(&content);
62
63 return Ok(s.clone());
64 })?;
65
66 gloabl_script_content.push_str("\n");
67 gloabl_script_content.push_str(&script_content);
68
69 let script_content = gloabl_script_content.clone();
70
71 variable_manager.set_variable("ssh_server_name".to_string(), server_name.to_string());
72 variable_manager.set_variable("ssh_server_ip".to_string(), ssh_config.host.to_string());
73
74 let script_content = variable_manager.replace_variables(&script_content);
75
76 let ssh_timeout_seconds = ssh_config.timeout_seconds.unwrap_or(3);
78 let ssh_timeout_duration = Duration::from_secs(ssh_timeout_seconds);
79
80 let tcp = connect_with_timeout(&format!("{}:{}", ssh_config.host, ssh_config.port), ssh_timeout_duration)
82 .context("Failed to connect to SSH server")?;
83
84 let timeout_duration = Duration::from_secs(step.timeout_seconds.unwrap_or(30));
85
86 tcp.set_read_timeout(Some(timeout_duration))
88 .context("Failed to set read timeout")?;
89 tcp.set_write_timeout(Some(timeout_duration))
90 .context("Failed to set write timeout")?;
91 tcp.set_nodelay(true)
92 .context("Failed to set TCP nodelay")?;
93
94 let mut sess = Session::new()
96 .context("Failed to create SSH session")?;
97
98 sess.set_tcp_stream(tcp);
99
100 let session_timeout_seconds = step.timeout_seconds.unwrap_or(30);
102 let session_timeout_duration = Duration::from_secs(session_timeout_seconds);
103 sess.set_timeout(session_timeout_duration.as_millis() as u32);
104
105 sess.handshake()
107 .context(format!("SSH handshake failed: timeout {} s", ssh_timeout_seconds))?;
108
109 info!("SSH handshake completed, starting authentication");
110
111 let auth_result = if let Some(ref password) = ssh_config.password {
113 sess.userauth_password(&ssh_config.username, password)
114 .context("SSH password authentication failed")
115 } else if let Some(ref key_path) = ssh_config.private_key_path {
116 sess.userauth_pubkey_file(&ssh_config.username, None, Path::new(key_path), None)
117 .context("SSH key authentication failed")
118 } else {
119 Err(anyhow::anyhow!("No authentication method provided"))
120 };
121
122 auth_result?;
123 info!("SSH authentication successful");
124
125 let mut channel = sess.channel_session()
127 .context("Failed to create SSH channel")?;
128 channel.exec("bash")
129 .context("Failed to exec remote shell")?;
130
131 use std::io::Write;
133 channel.write_all(script_content.as_bytes())
134 .context("Failed to write script to remote shell")?;
135 channel.send_eof()
136 .context("Failed to send EOF to remote shell")?;
137
138 let (tx, mut rx) = tokio_mpsc::channel::<OutputEvent>(100);
140 let output_callback = output_callback.map(|cb| Arc::new(cb));
141
142 let server_name = server_name.to_string();
144 let _step_name = step_name.to_string();
145 let pipeline_name = pipeline_name.to_string();
146 let output_callback_clone = output_callback.clone();
147
148 let output_handle = std::thread::spawn(move || {
149 while let Some(event) = rx.blocking_recv() {
150 if let Some(callback) = &output_callback_clone {
151 callback(event);
152 }
153 }
154 });
155
156 let mut stdout = String::new();
158 let mut stderr = String::new();
159 let start_time = std::time::Instant::now();
160
161 let stdout_stream = channel.stream(0);
163 let mut stdout_reader = BufReader::new(stdout_stream);
164 let mut line = String::new();
165
166 while stdout_reader.read_line(&mut line)? > 0 {
167 let content = line.clone();
168 stdout.push_str(&content);
169
170 let event = OutputEvent {
172 pipeline_name: pipeline_name.clone(),
173 server_name: server_name.clone(),
174 step: step.clone(), script_path:step.script.to_string(),
176 output_type: OutputType::Stdout,
177 content: content.trim().to_string(),
178 timestamp: std::time::Instant::now(),
179 variables: variable_manager.get_variables().clone(),
180 };
181
182 if tx.blocking_send(event).is_err() {
183 break;
184 }
185
186 line.clear();
187 }
188
189 let stderr_stream = channel.stderr();
191 let mut stderr_reader = BufReader::new(stderr_stream);
192 line.clear();
193
194 while stderr_reader.read_line(&mut line)? > 0 {
195 let content = line.clone();
196 stderr.push_str(&content);
197
198 let event = OutputEvent {
200 pipeline_name: pipeline_name.clone(),
201 server_name: server_name.clone(),
202 step: step.clone(), script_path:step.script.to_string(),
204 output_type: OutputType::Stderr,
205 content: content.trim().to_string(),
206 timestamp: std::time::Instant::now(),
207 variables: variable_manager.get_variables().clone(),
208 };
209
210 if tx.blocking_send(event).is_err() {
211 break;
212 }
213
214 line.clear();
215 }
216
217 drop(tx);
219 if let Err(e) = output_handle.join() {
220 eprintln!("Output handler thread error: {:?}", e);
221 }
222
223 channel.wait_close()
224 .context("Failed to wait for channel close")?;
225
226 let exit_code = channel.exit_status()
227 .context("Failed to get exit status")?;
228
229 let execution_time = start_time.elapsed().as_millis() as u64;
230 info!("SSH command executed with exit code: {}", exit_code);
231
232 let execution_result = ExecutionResult {
234 success: exit_code == 0,
235 stdout,
236 stderr,
237 script: step.script.to_string(),
238 exit_code,
239 execution_time_ms: execution_time,
240 error_message: None,
241 };
242
243 if let Some(rules) = extract_rules {
245 if let Err(e) = variable_manager.extract_variables(&rules, &execution_result) {
246 info!("Failed to extract variables: {}", e);
247 }
248 }
249
250 Ok(execution_result)
251 }
252
253}
254
/// Opens a TCP connection to `addr` (`host:port`), giving up after `timeout`.
///
/// `addr` is resolved first and every resulting socket address is tried in
/// turn with [`TcpStream::connect_timeout`]; all attempts share one overall
/// deadline. No helper thread is spawned, so nothing keeps running in the
/// background after a timeout.
///
/// Name resolution itself is performed by the standard library and is not
/// bounded by `timeout`.
///
/// # Errors
///
/// * An error of kind `TimedOut` with the message
///   `connect to <addr> timeout <secs> s` when the deadline expires
///   (a zero `timeout` expires immediately).
/// * The resolver's error when `addr` cannot be parsed or resolved.
/// * Otherwise the error of the last failed connection attempt.
fn connect_with_timeout(addr: &str, timeout: Duration) -> std::io::Result<TcpStream> {
    use std::io::{Error, ErrorKind};
    use std::net::ToSocketAddrs;

    // `None` means the deadline is not representable (absurdly large
    // timeout); every attempt then simply gets the full timeout.
    let deadline = Instant::now().checked_add(timeout);

    let mut last_err: Option<Error> = None;
    let mut deadline_hit = false;

    for candidate in addr.to_socket_addrs()? {
        let remaining = match deadline {
            Some(limit) => limit.saturating_duration_since(Instant::now()),
            None => timeout,
        };
        // `connect_timeout` rejects a zero duration, and a zero budget means
        // the time is already used up.
        if remaining.is_zero() {
            deadline_hit = true;
            break;
        }

        match TcpStream::connect_timeout(&candidate, remaining) {
            Ok(stream) => return Ok(stream),
            Err(err) => {
                if err.kind() == ErrorKind::TimedOut {
                    deadline_hit = true;
                }
                last_err = Some(err);
            }
        }
    }

    if deadline_hit {
        return Err(Error::new(
            ErrorKind::TimedOut,
            format!("connect to {} timeout {} s", addr, timeout.as_secs()),
        ));
    }

    Err(last_err.unwrap_or_else(|| {
        Error::new(ErrorKind::InvalidInput, "could not resolve to any addresses")
    }))
}