1use crate::{
8 error::{Error, Result},
9 get_progress_bar,
10 inventory::VirtualMachine,
11 run_external_command,
12 s3::S3Repository,
13 TestnetDeployer,
14};
15use fs_extra::dir::{copy, remove, CopyOptions};
16use log::debug;
17use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
18use std::{
19 fs::File,
20 io::{Cursor, Read, Write},
21 path::{Path, PathBuf},
22};
23
24const DEFAULT_RSYNC_ARGS: [&str; 8] = [
25 "--compress",
26 "--archive",
27 "--prune-empty-dirs",
28 "--verbose",
29 "--verbose",
30 "--filter=+ */", "--filter=+ *.log*", "--filter=- *", ];
34
35const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";
36
37impl TestnetDeployer {
38 pub fn rsync_logs(
39 &self,
40 name: &str,
41 vm_filter: Option<String>,
42 disable_client_logs: bool,
43 ) -> Result<()> {
44 let root_dir = std::env::current_dir()?;
46
47 let mut all_inventory = vec![];
48 if !disable_client_logs {
49 all_inventory.extend(self.get_client_inventory(name)?);
50 }
51
52 all_inventory.extend(self.get_all_node_inventory(name)?);
53
54 let all_inventory = if let Some(filter) = vm_filter {
55 all_inventory
56 .into_iter()
57 .filter(|vm| vm.name.contains(&filter))
58 .collect()
59 } else {
60 all_inventory
61 };
62
63 let log_base_dir = create_initial_log_dir_setup(&root_dir, name, &all_inventory)?;
64
65 std::env::set_current_dir(self.working_directory_path.clone())?;
67 println!("Starting to rsync the log files");
68 let progress_bar = get_progress_bar(all_inventory.len() as u64)?;
69
70 let rsync_args = all_inventory
71 .iter()
72 .map(|vm| {
73 let args = if vm.name.contains("symmetric") {
74 let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
75 debug!("Using symmetric rsync args for {:?}", vm.name);
76 debug!("Args for {}: {:?}", vm.name, args);
77 args
78 } else if vm.name.contains("full-cone") {
79 let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
80 debug!("Using symmetric rsync args for {:?}", vm.name);
81 debug!("Args for {}: {:?}", vm.name, args);
82 args
83 } else if vm.name.contains("ant-client") {
84 let args = self.construct_client_args(vm, &log_base_dir);
85 debug!("Using Client rsync args for {:?} ", vm.name);
86 debug!("Args for {}: {:?}", vm.name, args);
87 args
88 } else {
89 let args = self.construct_public_node_args(vm, &log_base_dir);
90 debug!("Using public rsync args for {:?}", vm.name);
91 debug!("Args for {}: {:?}", vm.name, args);
92 args
93 };
94
95 Ok((vm.clone(), args))
96 })
97 .collect::<Result<Vec<_>>>()?;
98
99 let failed_inventory = rsync_args
100 .par_iter()
101 .filter_map(|(vm, args)| {
102 if let Err(err) = Self::run_rsync(vm, args) {
103 println!(
104 "Failed to rsync. Retrying it after ssh-keygen {:?} : {} with err: {err:?}",
105 vm.name, vm.public_ip_addr
106 );
107 return Some((vm.clone(), args.clone()));
108 }
109 progress_bar.inc(1);
110 None
111 })
112 .collect::<Vec<_>>();
113
114 failed_inventory
116 .into_par_iter()
117 .for_each(|(vm, args)| {
118 debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
119 if let Err(err) = run_external_command(
120 PathBuf::from("ssh-keygen"),
121 PathBuf::from("."),
122 vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
123 false,
124 false,
125 ) {
126 println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
127 } else if let Err(err) =
128 Self::run_rsync(&vm, &args)
129 {
130 println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
131 }
132 progress_bar.inc(1);
133 });
134 progress_bar.finish_and_clear();
135 println!("Rsync completed!");
136 Ok(())
137 }
138
139 fn construct_client_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
140 let vm_path = log_base_dir.join(&vm.name);
141 let mut rsync_args = DEFAULT_RSYNC_ARGS
142 .iter()
143 .map(|str| str.to_string())
144 .collect::<Vec<String>>();
145
146 rsync_args.extend(vec![
149 "-e".to_string(),
150 format!(
151 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
152 self.ssh_client
153 .get_private_key_path()
154 .to_string_lossy()
155 .as_ref()
156 ),
157 format!("root@{}:/mnt/client/log/", vm.public_ip_addr),
158 vm_path.to_string_lossy().to_string(),
159 ]);
160
161 rsync_args
162 }
163
164 fn construct_public_node_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
165 let vm_path = log_base_dir.join(&vm.name);
166 let mut rsync_args = DEFAULT_RSYNC_ARGS
167 .iter()
168 .map(|str| str.to_string())
169 .collect::<Vec<String>>();
170
171 rsync_args.extend(vec![
174 "-e".to_string(),
175 format!(
176 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
177 self.ssh_client
178 .get_private_key_path()
179 .to_string_lossy()
180 .as_ref()
181 ),
182 format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
183 vm_path.to_string_lossy().to_string(),
184 ]);
185
186 rsync_args
187 }
188
189 fn construct_full_cone_private_node_args(
190 &self,
191 private_vm: &VirtualMachine,
192 log_base_dir: &Path,
193 ) -> Result<Vec<String>> {
194 let vm_path = log_base_dir.join(&private_vm.name);
195
196 let mut rsync_args = DEFAULT_RSYNC_ARGS
197 .iter()
198 .map(|str| str.to_string())
199 .collect::<Vec<String>>();
200
201 let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
202 log::error!("Failed to set routed VMs: {err}");
203 Error::SshSettingsRwLockError
204 })?;
205 let (_, gateway_ip) = read_lock
206 .as_ref()
207 .and_then(|routed_vms| {
208 routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
209 })
210 .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
211
212 rsync_args.extend(vec![
213 "-e".to_string(),
214 format!(
215 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
216 self.ssh_client
217 .get_private_key_path()
218 .to_string_lossy()
219 .as_ref()
220 ),
221 format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
222 vm_path.to_string_lossy().to_string(),
223 ]);
224
225 Ok(rsync_args)
226 }
227
228 fn construct_symmetric_private_node_args(
229 &self,
230 private_vm: &VirtualMachine,
231 log_base_dir: &Path,
232 ) -> Result<Vec<String>> {
233 let vm_path = log_base_dir.join(&private_vm.name);
234
235 let mut rsync_args = DEFAULT_RSYNC_ARGS
236 .iter()
237 .map(|str| str.to_string())
238 .collect::<Vec<String>>();
239
240 let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
241 log::error!("Failed to set routed VMs: {err}");
242 Error::SshSettingsRwLockError
243 })?;
244 let (_, gateway_ip) = read_lock
245 .as_ref()
246 .and_then(|routed_vms| {
247 routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
248 })
249 .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
250
251 rsync_args.extend(vec![
252 "-e".to_string(),
253 format!(
254 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh -o StrictHostKeyChecking=no -o BatchMode=yes root@{gateway_ip} -W %h:%p -i {}'",
255 self.ssh_client
256 .get_private_key_path()
257 .to_string_lossy()
258 .as_ref(),
259 self.ssh_client
260 .get_private_key_path()
261 .to_string_lossy()
262 .as_ref(),
263 ),
264 format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
265 vm_path.to_string_lossy().to_string(),
266 ]);
267
268 Ok(rsync_args)
269 }
270 fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
271 debug!(
272 "Rsync logs to our machine for {:?} : {}",
273 vm.name, vm.public_ip_addr
274 );
275 run_external_command(
276 PathBuf::from("rsync"),
277 PathBuf::from("."),
278 rsync_args.to_vec(),
279 true,
280 false,
281 )?;
282
283 debug!(
284 "Finished rsync for for {:?} : {}",
285 vm.name, vm.public_ip_addr
286 );
287 Ok(())
288 }
289
290 pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
291 let root_dir = std::env::current_dir()?;
293 let all_node_inventory = self.get_all_node_inventory(name)?;
294 let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
295
296 let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
297 println!("Running ripgrep with command: {rg_cmd}");
298
299 let now = chrono::Utc::now();
301 let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
302 let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
303 let _failed_inventory = all_node_inventory
304 .par_iter()
305 .filter_map(|vm| {
306 let op =
307 match self
308 .ssh_client
309 .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
310 {
311 Ok(output) => {
312 match Self::store_rg_output(
313 ×tamp,
314 &rg_cmd,
315 &output,
316 &log_abs_dest,
317 &vm.name,
318 ) {
319 Ok(_) => None,
320 Err(err) => {
321 println!(
322 "Failed store output for {:?} with: {err:?}",
323 vm.public_ip_addr
324 );
325 Some(vm)
326 }
327 }
328 }
329 Err(Error::ExternalCommandRunFailed {
330 binary,
331 exit_status,
332 }) => {
333 if let Some(1) = exit_status.code() {
334 debug!("No matches found for {:?}", vm.public_ip_addr);
335 match Self::store_rg_output(
336 ×tamp,
337 &rg_cmd,
338 &["No matches found".to_string()],
339 &log_abs_dest,
340 &vm.name,
341 ) {
342 Ok(_) => None,
343 Err(err) => {
344 println!(
345 "Failed store output for {:?} with: {err:?}",
346 vm.public_ip_addr
347 );
348 Some(vm)
349 }
350 }
351 } else {
352 println!(
353 "Failed to run rg query for {:?} with: {binary}",
354 vm.public_ip_addr
355 );
356 Some(vm)
357 }
358 }
359 Err(err) => {
360 println!(
361 "Failed to run rg query for {:?} with: {err:?}",
362 vm.public_ip_addr
363 );
364 Some(vm)
365 }
366 };
367 progress_bar.inc(1);
368 op
369 })
370 .collect::<Vec<_>>();
371
372 progress_bar.finish_and_clear();
373 println!("Ripgrep completed!");
374
375 Ok(())
376 }
377
378 fn store_rg_output(
379 timestamp: &str,
380 cmd: &str,
381 output: &[String],
382 log_abs_dest: &Path,
383 vm_name: &str,
384 ) -> Result<()> {
385 std::fs::create_dir_all(log_abs_dest.join(vm_name))?;
386
387 let mut file = File::create(
388 log_abs_dest
389 .join(vm_name)
390 .join(format!("rg-{timestamp}.log")),
391 )?;
392
393 writeln!(file, "Command: {cmd}")?;
394
395 for line in output {
396 writeln!(file, "{line}")?;
397 }
398
399 Ok(())
400 }
401
402 pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
407 let dest = PathBuf::from(".").join("logs").join(name);
408 if dest.exists() {
409 println!("Removing existing {} directory", dest.to_string_lossy());
410 remove(dest.clone())?;
411 }
412 std::fs::create_dir_all(&dest)?;
413 self.ansible_provisioner.copy_logs(name, resources_only)?;
414 Ok(())
415 }
416
417 fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
419 let environments = self.terraform_runner.workspace_list()?;
420 if !environments.contains(&name.to_string()) {
421 return Err(Error::EnvironmentDoesNotExist(name.to_string()));
422 }
423 self.ansible_provisioner.get_all_node_inventory()
424 }
425
426 fn get_client_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
427 let environments = self.terraform_runner.workspace_list()?;
428 if !environments.contains(&name.to_string()) {
429 return Err(Error::EnvironmentDoesNotExist(name.to_string()));
430 }
431 self.ansible_provisioner.get_client_inventory()
432 }
433}
434
435pub async fn get_logs(name: &str) -> Result<()> {
436 let dest_path = std::env::current_dir()?.join("logs").join(name);
437 std::fs::create_dir_all(dest_path.clone())?;
438 let s3_repository = S3Repository {};
439 s3_repository
440 .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
441 .await?;
442 Ok(())
443}
444
445pub fn reassemble_logs(name: &str) -> Result<()> {
446 let src = PathBuf::from(".").join("logs").join(name);
447 if !src.exists() {
448 return Err(Error::LogsNotRetrievedError(name.to_string()));
449 }
450 let dest = PathBuf::from(".")
451 .join("logs")
452 .join(format!("{name}-reassembled"));
453 if dest.exists() {
454 println!("Removing previous {name}-reassembled directory");
455 remove(dest.clone())?;
456 }
457
458 std::fs::create_dir_all(&dest)?;
459 let mut options = CopyOptions::new();
460 options.overwrite = true;
461 copy(src.clone(), dest.clone(), &options)?;
462
463 visit_dirs(&dest, &process_part_files, &src, &dest)?;
464 Ok(())
465}
466
467pub async fn rm_logs(name: &str) -> Result<()> {
468 let s3_repository = S3Repository {};
469 s3_repository
470 .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
471 .await?;
472 Ok(())
473}
474
475fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
476 let reassembled_dir_path = if dir_path == dest_root {
477 dest_root.clone()
478 } else {
479 dest_root.join(dir_path.strip_prefix(source_root)?)
480 };
481 std::fs::create_dir_all(&reassembled_dir_path)?;
482
483 let entries: Vec<_> = std::fs::read_dir(dir_path)?
484 .map(|res| res.map(|e| e.path()))
485 .collect::<Result<Vec<_>, _>>()?;
486
487 let mut part_files: Vec<_> = entries
488 .iter()
489 .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
490 .collect();
491
492 part_files.sort_by_key(|a| {
493 a.file_stem()
494 .unwrap()
495 .to_string_lossy()
496 .split(".part")
497 .nth(1)
498 .unwrap()
499 .parse::<u32>()
500 .unwrap()
501 });
502
503 if part_files.is_empty() {
504 return Ok(());
505 }
506
507 let output_file_path = reassembled_dir_path.join("reassembled.log");
508 println!("Creating reassembled file at {output_file_path:#?}");
509 let mut output_file = File::create(&output_file_path)?;
510 for part_file in part_files.iter() {
511 let mut part_content = String::new();
512 File::open(part_file)?.read_to_string(&mut part_content)?;
513
514 part_content = part_content.replace("\\n", "\n");
517
518 let mut cursor = Cursor::new(part_content);
519 std::io::copy(&mut cursor, &mut output_file)?;
520 std::fs::remove_file(part_file)?;
521 }
522
523 Ok(())
524}
525
526fn visit_dirs(
527 dir: &Path,
528 cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
529 source_root: &PathBuf,
530 dest_root: &PathBuf,
531) -> Result<()> {
532 if dir.is_dir() {
533 cb(dir, source_root, dest_root)?;
534 for entry in std::fs::read_dir(dir)? {
535 let entry = entry?;
536 let path = entry.path();
537 if path.is_dir() {
538 visit_dirs(&path, cb, dest_root, dest_root)?;
539 }
540 }
541 }
542 Ok(())
543}
544
545fn create_initial_log_dir_setup(
547 root_dir: &Path,
548 name: &str,
549 all_node_inventory: &[VirtualMachine],
550) -> Result<PathBuf> {
551 let log_dest = root_dir.join("logs").join(name);
552 if !log_dest.exists() {
553 std::fs::create_dir_all(&log_dest)?;
554 }
555 let log_abs_dest = std::fs::canonicalize(log_dest)?;
557 all_node_inventory.par_iter().for_each(|vm| {
559 let vm_path = log_abs_dest.join(&vm.name);
560 let _ = std::fs::create_dir_all(vm_path);
561 });
562 Ok(log_abs_dest)
563}