1use crate::{
8 error::{Error, Result},
9 get_progress_bar,
10 inventory::VirtualMachine,
11 run_external_command,
12 s3::S3Repository,
13 TestnetDeployer,
14};
15use fs_extra::dir::{copy, remove, CopyOptions};
16use log::debug;
17use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
18use std::{
19 fs::File,
20 io::{Cursor, Read, Write},
21 net::IpAddr,
22 path::{Path, PathBuf},
23};
24
25impl TestnetDeployer {
26 pub fn rsync_logs(&self, name: &str, vm_filter: Option<String>) -> Result<()> {
27 let root_dir = std::env::current_dir()?;
29 let all_node_inventory = self.get_all_node_inventory(name)?;
30 let all_node_inventory = if let Some(filter) = vm_filter {
31 all_node_inventory
32 .into_iter()
33 .filter(|vm| vm.name.contains(&filter))
34 .collect()
35 } else {
36 all_node_inventory
37 };
38
39 let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
40
41 let rsync_args = vec![
43 "--compress".to_string(),
44 "--archive".to_string(),
45 "--prune-empty-dirs".to_string(),
46 "--verbose".to_string(),
47 "--verbose".to_string(),
48 "--filter=+ */".to_string(), "--filter=+ *.log*".to_string(), "--filter=- *".to_string(), ];
52
53 let mut public_rsync_args = rsync_args.clone();
54 public_rsync_args.extend(vec![
58 "-e".to_string(),
59 format!(
60 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
61 self.ssh_client
62 .get_private_key_path()
63 .to_string_lossy()
64 .as_ref()
65 ),
66 ]);
67
68 std::env::set_current_dir(self.working_directory_path.clone())?;
94 println!("Starting to rsync the log files");
95 let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
96
97 let failed_inventory = all_node_inventory.par_iter().filter_map(|vm| {
98 let args = if vm.name.contains("private") {
99 debug!(
104 "Fallback to public rsync args for private node {:?}",
105 vm.name
106 );
107 &public_rsync_args
108 } else {
110 debug!("Using public rsync args for {:?}", vm.name);
111 &public_rsync_args
112 };
113
114 if let Err(err) = Self::run_rsync(&vm.name, &vm.public_ip_addr, &log_abs_dest, args) {
115 println!(
116 "Failed to rsync. Retrying it after ssh-keygen {:?} : {} with err: {err:?}",
117 vm.name, vm.public_ip_addr
118 );
119 return Some(vm.clone());
120 }
121 progress_bar.inc(1);
122 None
123 });
124
125 failed_inventory
127 .into_par_iter()
128 .for_each(|vm| {
129 debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
130 if let Err(err) = run_external_command(
131 PathBuf::from("ssh-keygen"),
132 PathBuf::from("."),
133 vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
134 false,
135 false,
136 ) {
137 println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
138 } else if let Err(err) =
139 Self::run_rsync(&vm.name, &vm.public_ip_addr, &log_abs_dest, &rsync_args)
140 {
141 println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
142 }
143 progress_bar.inc(1);
144 });
145 progress_bar.finish_and_clear();
146 println!("Rsync completed!");
147 Ok(())
148 }
149
150 fn run_rsync(
151 vm_name: &String,
152 ip_address: &IpAddr,
153 log_abs_dest: &Path,
154 rsync_args: &[String],
155 ) -> Result<()> {
156 let vm_path = log_abs_dest.join(vm_name);
157 let mut rsync_args_clone = rsync_args.to_vec();
158
159 rsync_args_clone.push(format!("root@{ip_address}:/mnt/antnode-storage/log/"));
160 rsync_args_clone.push(vm_path.to_string_lossy().to_string());
161
162 debug!("Rsync logs to our machine for {vm_name:?} : {ip_address}");
163 run_external_command(
164 PathBuf::from("rsync"),
165 PathBuf::from("."),
166 rsync_args_clone.clone(),
167 true,
168 false,
169 )?;
170
171 debug!("Finished rsync for for {vm_name:?} : {ip_address}");
172 Ok(())
173 }
174
175 pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
176 let root_dir = std::env::current_dir()?;
178 let all_node_inventory = self.get_all_node_inventory(name)?;
179 let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
180
181 let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
182 println!("Running ripgrep with command: {rg_cmd}");
183
184 let now = chrono::Utc::now();
186 let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
187 let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
188 let _failed_inventory = all_node_inventory
189 .par_iter()
190 .filter_map(|vm| {
191 let op =
192 match self
193 .ssh_client
194 .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
195 {
196 Ok(output) => {
197 match Self::store_rg_output(
198 ×tamp,
199 &rg_cmd,
200 &output,
201 &log_abs_dest,
202 &vm.name,
203 ) {
204 Ok(_) => None,
205 Err(err) => {
206 println!(
207 "Failed store output for {:?} with: {err:?}",
208 vm.public_ip_addr
209 );
210 Some(vm)
211 }
212 }
213 }
214 Err(Error::ExternalCommandRunFailed {
215 binary,
216 exit_status,
217 }) => {
218 if let Some(1) = exit_status.code() {
219 debug!("No matches found for {:?}", vm.public_ip_addr);
220 match Self::store_rg_output(
221 ×tamp,
222 &rg_cmd,
223 &["No matches found".to_string()],
224 &log_abs_dest,
225 &vm.name,
226 ) {
227 Ok(_) => None,
228 Err(err) => {
229 println!(
230 "Failed store output for {:?} with: {err:?}",
231 vm.public_ip_addr
232 );
233 Some(vm)
234 }
235 }
236 } else {
237 println!(
238 "Failed to run rg query for {:?} with: {binary}",
239 vm.public_ip_addr
240 );
241 Some(vm)
242 }
243 }
244 Err(err) => {
245 println!(
246 "Failed to run rg query for {:?} with: {err:?}",
247 vm.public_ip_addr
248 );
249 Some(vm)
250 }
251 };
252 progress_bar.inc(1);
253 op
254 })
255 .collect::<Vec<_>>();
256
257 progress_bar.finish_and_clear();
258 println!("Ripgrep completed!");
259
260 Ok(())
261 }
262
263 fn store_rg_output(
264 timestamp: &str,
265 cmd: &str,
266 output: &[String],
267 log_abs_dest: &Path,
268 vm_name: &str,
269 ) -> Result<()> {
270 std::fs::create_dir_all(log_abs_dest.join(vm_name))?;
271
272 let mut file = File::create(
273 log_abs_dest
274 .join(vm_name)
275 .join(format!("rg-{timestamp}.log")),
276 )?;
277
278 writeln!(file, "Command: {cmd}")?;
279
280 for line in output {
281 writeln!(file, "{}", line)?;
282 }
283
284 Ok(())
285 }
286
287 pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
292 let dest = PathBuf::from(".").join("logs").join(name);
293 if dest.exists() {
294 println!("Removing existing {} directory", dest.to_string_lossy());
295 remove(dest.clone())?;
296 }
297 std::fs::create_dir_all(&dest)?;
298 self.ansible_provisioner.copy_logs(name, resources_only)?;
299 Ok(())
300 }
301
302 fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
304 let environments = self.terraform_runner.workspace_list()?;
305 if !environments.contains(&name.to_string()) {
306 return Err(Error::EnvironmentDoesNotExist(name.to_string()));
307 }
308 self.ansible_provisioner.get_all_node_inventory()
309 }
310
311 }
329
330pub async fn get_logs(name: &str) -> Result<()> {
331 let dest_path = std::env::current_dir()?.join("logs").join(name);
332 std::fs::create_dir_all(dest_path.clone())?;
333 let s3_repository = S3Repository {};
334 s3_repository
335 .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
336 .await?;
337 Ok(())
338}
339
340pub fn reassemble_logs(name: &str) -> Result<()> {
341 let src = PathBuf::from(".").join("logs").join(name);
342 if !src.exists() {
343 return Err(Error::LogsNotRetrievedError(name.to_string()));
344 }
345 let dest = PathBuf::from(".")
346 .join("logs")
347 .join(format!("{name}-reassembled"));
348 if dest.exists() {
349 println!("Removing previous {name}-reassembled directory");
350 remove(dest.clone())?;
351 }
352
353 std::fs::create_dir_all(&dest)?;
354 let mut options = CopyOptions::new();
355 options.overwrite = true;
356 copy(src.clone(), dest.clone(), &options)?;
357
358 visit_dirs(&dest, &process_part_files, &src, &dest)?;
359 Ok(())
360}
361
362pub async fn rm_logs(name: &str) -> Result<()> {
363 let s3_repository = S3Repository {};
364 s3_repository
365 .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
366 .await?;
367 Ok(())
368}
369
370fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
371 let reassembled_dir_path = if dir_path == dest_root {
372 dest_root.clone()
373 } else {
374 dest_root.join(dir_path.strip_prefix(source_root)?)
375 };
376 std::fs::create_dir_all(&reassembled_dir_path)?;
377
378 let entries: Vec<_> = std::fs::read_dir(dir_path)?
379 .map(|res| res.map(|e| e.path()))
380 .collect::<Result<Vec<_>, _>>()?;
381
382 let mut part_files: Vec<_> = entries
383 .iter()
384 .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
385 .collect();
386
387 part_files.sort_by_key(|a| {
388 a.file_stem()
389 .unwrap()
390 .to_string_lossy()
391 .split(".part")
392 .nth(1)
393 .unwrap()
394 .parse::<u32>()
395 .unwrap()
396 });
397
398 if part_files.is_empty() {
399 return Ok(());
400 }
401
402 let output_file_path = reassembled_dir_path.join("reassembled.log");
403 println!("Creating reassembled file at {output_file_path:#?}");
404 let mut output_file = File::create(&output_file_path)?;
405 for part_file in part_files.iter() {
406 let mut part_content = String::new();
407 File::open(part_file)?.read_to_string(&mut part_content)?;
408
409 part_content = part_content.replace("\\n", "\n");
412
413 let mut cursor = Cursor::new(part_content);
414 std::io::copy(&mut cursor, &mut output_file)?;
415 std::fs::remove_file(part_file)?;
416 }
417
418 Ok(())
419}
420
421fn visit_dirs(
422 dir: &Path,
423 cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
424 source_root: &PathBuf,
425 dest_root: &PathBuf,
426) -> Result<()> {
427 if dir.is_dir() {
428 cb(dir, source_root, dest_root)?;
429 for entry in std::fs::read_dir(dir)? {
430 let entry = entry?;
431 let path = entry.path();
432 if path.is_dir() {
433 visit_dirs(&path, cb, dest_root, dest_root)?;
434 }
435 }
436 }
437 Ok(())
438}
439
440fn create_initial_log_dir_setup(
442 root_dir: &Path,
443 name: &str,
444 all_node_inventory: &[VirtualMachine],
445) -> Result<PathBuf> {
446 let log_dest = root_dir.join("logs").join(name);
447 if !log_dest.exists() {
448 std::fs::create_dir_all(&log_dest)?;
449 }
450 let log_abs_dest = std::fs::canonicalize(log_dest)?;
452 all_node_inventory.par_iter().for_each(|vm| {
454 let vm_path = log_abs_dest.join(&vm.name);
455 let _ = std::fs::create_dir_all(vm_path);
456 });
457 Ok(log_abs_dest)
458}