use crate::{
    error::{Error, Result},
    get_progress_bar,
    inventory::VirtualMachine,
    run_external_command,
    s3::S3Repository,
    TestnetDeployer,
};
use fs_extra::dir::{copy, remove, CopyOptions};
use log::debug;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use std::{
    fs::File,
    io::{Cursor, Read, Write},
    path::{Path, PathBuf},
};

const DEFAULT_RSYNC_ARGS: [&str; 8] = [
    "--compress",
    "--archive",
    "--prune-empty-dirs",
    "--verbose",
    "--verbose",
    "--filter=+ */",
    "--filter=+ *.log*",
    "--filter=- *",
];

const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";

impl TestnetDeployer {
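    /// Rsync the node log files (and, unless `disable_client_logs` is set, the uploader client
    /// log files) for the `name` environment into `logs/{name}` beneath the caller's current
    /// working directory.
    ///
    /// If `vm_filter` is provided, only VMs whose names contain the filter string are included.
    /// Hosts that fail on the first attempt are retried once after clearing their entry from the
    /// local known hosts with `ssh-keygen -R`.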
    pub fn rsync_logs(
        &self,
        name: &str,
        vm_filter: Option<String>,
        disable_client_logs: bool,
    ) -> Result<()> {
        let root_dir = std::env::current_dir()?;

        let mut uploader_inventory = vec![];
        if !disable_client_logs {
            let unfiltered_uploader_vms = self.ansible_provisioner.get_current_uploader_count()?;
            let uploader_vm = if let Some(filter) = &vm_filter {
                unfiltered_uploader_vms
                    .into_iter()
                    .filter(|(vm, _)| vm.name.contains(filter))
                    .collect()
            } else {
                unfiltered_uploader_vms
            };
            uploader_inventory.extend(uploader_vm);
        }

        let unfiltered_node_vms = self.get_all_node_inventory(name)?;
        let all_node_inventory = if let Some(filter) = vm_filter {
            unfiltered_node_vms
                .into_iter()
                .filter(|vm| vm.name.contains(&filter))
                .collect()
        } else {
            unfiltered_node_vms
        };

        create_initial_log_dir_setup_client(&root_dir, name, &uploader_inventory)?;
        let log_base_dir = create_initial_log_dir_setup_node(&root_dir, name, &all_node_inventory)?;

        std::env::set_current_dir(self.working_directory_path.clone())?;

        println!("Starting to rsync the log files");
        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;

        let mut rsync_args = all_node_inventory
            .iter()
            .map(|vm| {
                let args = if vm.name.contains("symmetric") {
                    let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
                    debug!("Using symmetric rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                } else if vm.name.contains("full-cone") {
                    let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
                    debug!("Using full-cone rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                } else {
                    let args = self.construct_public_node_args(vm, &log_base_dir);
                    debug!("Using public rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                };

                Ok((vm.clone(), args))
            })
            .collect::<Result<Vec<_>>>()?;

        rsync_args.extend(uploader_inventory.iter().flat_map(|(vm, uploader_count)| {
            let mut all_user_args = vec![];
            for ant_user in 1..=*uploader_count {
                let vm = vm.clone();
                let args = self.construct_uploader_args(&vm, ant_user, &log_base_dir);
                debug!(
                    "Using uploader rsync args for {:?} and user {ant_user}",
                    vm.name
                );
                debug!("Args for {} and user {ant_user}: {:?}", vm.name, args);
                all_user_args.push((vm, args));
            }
            all_user_args
        }));

        let failed_inventory = rsync_args
            .par_iter()
            .filter_map(|(vm, args)| {
                if let Err(err) = Self::run_rsync(vm, args) {
                    println!(
                        "Failed to rsync for {:?} : {}. Will retry after ssh-keygen. Error: {err:?}",
                        vm.name, vm.public_ip_addr
                    );
                    return Some((vm.clone(), args.clone()));
                }
                progress_bar.inc(1);
                None
            })
            .collect::<Vec<_>>();

        failed_inventory.into_par_iter().for_each(|(vm, args)| {
            debug!(
                "Trying to ssh-keygen for {:?} : {}",
                vm.name, vm.public_ip_addr
            );
            if let Err(err) = run_external_command(
                PathBuf::from("ssh-keygen"),
                PathBuf::from("."),
                vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
                false,
                false,
            ) {
                println!(
                    "Failed to ssh-keygen {:?} : {} with err: {err:?}",
                    vm.name, vm.public_ip_addr
                );
            } else if let Err(err) = Self::run_rsync(&vm, &args) {
                println!(
                    "Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}",
                    vm.name, vm.public_ip_addr
                );
            }
            progress_bar.inc(1);
        });
        progress_bar.finish_and_clear();
        println!("Rsync completed!");
        Ok(())
    }

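    /// Build the rsync arguments for pulling the client logs of a single `ant{n}` user from an
    /// uploader VM into `{log_base_dir}/{vm_name}_ant{n}`.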
    fn construct_uploader_args(
        &self,
        vm: &VirtualMachine,
        ant_user: usize,
        log_base_dir: &Path,
    ) -> Vec<String> {
        let vm_path = log_base_dir.join(format!("{}_ant{ant_user}", vm.name));
        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!(
                "root@{}:/home/ant{ant_user}/.local/share/autonomi/client/logs/",
                vm.public_ip_addr
            ),
            vm_path.to_string_lossy().to_string(),
        ]);

        rsync_args
    }

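    /// Build the rsync arguments for pulling the node logs from a publicly reachable node VM
    /// into `{log_base_dir}/{vm_name}`.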
    fn construct_public_node_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
        let vm_path = log_base_dir.join(&vm.name);
        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
            vm_path.to_string_lossy().to_string(),
        ]);

        rsync_args
    }

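    /// Build the rsync arguments for a full-cone NAT private node. The connection is made to the
    /// node's gateway address, which is looked up in the routed VM list.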
    fn construct_full_cone_private_node_args(
        &self,
        private_vm: &VirtualMachine,
        log_base_dir: &Path,
    ) -> Result<Vec<String>> {
        let vm_path = log_base_dir.join(&private_vm.name);

        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
            log::error!("Failed to read the routed VMs: {err}");
            Error::SshSettingsRwLockError
        })?;
        let (_, gateway_ip) = read_lock
            .as_ref()
            .and_then(|routed_vms| {
                routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
            })
            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;

        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
            vm_path.to_string_lossy().to_string(),
        ]);

        Ok(rsync_args)
    }

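    /// Build the rsync arguments for a symmetric NAT private node. SSH is tunnelled through the
    /// node's gateway with `ProxyCommand`, and the node is addressed by its private IP.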
    fn construct_symmetric_private_node_args(
        &self,
        private_vm: &VirtualMachine,
        log_base_dir: &Path,
    ) -> Result<Vec<String>> {
        let vm_path = log_base_dir.join(&private_vm.name);

        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
            log::error!("Failed to read the routed VMs: {err}");
            Error::SshSettingsRwLockError
        })?;
        let (_, gateway_ip) = read_lock
            .as_ref()
            .and_then(|routed_vms| {
                routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
            })
            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;

        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh root@{gateway_ip} -W %h:%p -i {}'",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref(),
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref(),
            ),
            format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
            vm_path.to_string_lossy().to_string(),
        ]);

        Ok(rsync_args)
    }

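    /// Invoke the external `rsync` binary with the supplied arguments to pull logs for one VM.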
    fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
        debug!(
            "Rsync logs to our machine for {:?} : {}",
            vm.name, vm.public_ip_addr
        );
        run_external_command(
            PathBuf::from("rsync"),
            PathBuf::from("."),
            rsync_args.to_vec(),
            true,
            false,
        )?;

        debug!(
            "Finished rsync for {:?} : {}",
            vm.name, vm.public_ip_addr
        );
        Ok(())
    }

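    /// Run a ripgrep query against the node log directory on every node VM in the `name`
    /// environment, via SSH as root, and store each VM's output under `logs/{name}/{vm_name}`.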
    pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
        let root_dir = std::env::current_dir()?;
        let all_node_inventory = self.get_all_node_inventory(name)?;
        let log_abs_dest = create_initial_log_dir_setup_node(&root_dir, name, &all_node_inventory)?;

        let rg_cmd = format!("rg {rg_args} {NODE_LOG_DIR}");
        println!("Running ripgrep with command: {rg_cmd}");

        let now = chrono::Utc::now();
        let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
        let _failed_inventory = all_node_inventory
            .par_iter()
            .filter_map(|vm| {
                let op = match self
                    .ssh_client
                    .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
                {
                    Ok(output) => {
                        match Self::store_rg_output(
                            &timestamp,
                            &rg_cmd,
                            &output,
                            &log_abs_dest,
                            &vm.name,
                        ) {
                            Ok(_) => None,
                            Err(err) => {
                                println!(
                                    "Failed to store output for {:?} with: {err:?}",
                                    vm.public_ip_addr
                                );
                                Some(vm)
                            }
                        }
                    }
                    Err(Error::ExternalCommandRunFailed {
                        binary,
                        exit_status,
                    }) => {
                        // rg exits with status 1 when the pattern produced no matches.
                        if let Some(1) = exit_status.code() {
                            debug!("No matches found for {:?}", vm.public_ip_addr);
                            match Self::store_rg_output(
                                &timestamp,
                                &rg_cmd,
                                &["No matches found".to_string()],
                                &log_abs_dest,
                                &vm.name,
                            ) {
                                Ok(_) => None,
                                Err(err) => {
                                    println!(
                                        "Failed to store output for {:?} with: {err:?}",
                                        vm.public_ip_addr
                                    );
                                    Some(vm)
                                }
                            }
                        } else {
                            println!(
                                "Failed to run rg query for {:?} with: {binary}",
                                vm.public_ip_addr
                            );
                            Some(vm)
                        }
                    }
                    Err(err) => {
                        println!(
                            "Failed to run rg query for {:?} with: {err:?}",
                            vm.public_ip_addr
                        );
                        Some(vm)
                    }
                };
                progress_bar.inc(1);
                op
            })
            .collect::<Vec<_>>();

        progress_bar.finish_and_clear();
        println!("Ripgrep completed!");

        Ok(())
    }

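    /// Write the output of a ripgrep query to `{log_abs_dest}/{vm_name}/rg-{timestamp}.log`,
    /// recording the command that produced it on the first line.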
    fn store_rg_output(
        timestamp: &str,
        cmd: &str,
        output: &[String],
        log_abs_dest: &Path,
        vm_name: &str,
    ) -> Result<()> {
        std::fs::create_dir_all(log_abs_dest.join(vm_name))?;

        let mut file = File::create(
            log_abs_dest
                .join(vm_name)
                .join(format!("rg-{timestamp}.log")),
        )?;

        writeln!(file, "Command: {cmd}")?;

        for line in output {
            writeln!(file, "{}", line)?;
        }

        Ok(())
    }

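    /// Copy the logs for the `name` environment via the Ansible provisioner, replacing any
    /// existing `logs/{name}` directory. The `resources_only` flag is passed through to the
    /// provisioner.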
    pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
        let dest = PathBuf::from(".").join("logs").join(name);
        if dest.exists() {
            println!("Removing existing {} directory", dest.to_string_lossy());
            remove(dest.clone())?;
        }
        std::fs::create_dir_all(&dest)?;
        self.ansible_provisioner.copy_logs(name, resources_only)?;
        Ok(())
    }

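    /// Return the node VM inventory for the `name` environment, or an error if no Terraform
    /// workspace with that name exists.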
    fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
        let environments = self.terraform_runner.workspace_list()?;
        if !environments.contains(&name.to_string()) {
            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
        }
        self.ansible_provisioner.get_all_node_inventory()
    }
}

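/// Download the logs for the `name` environment from the `sn-testnet` S3 bucket into
/// `logs/{name}` under the current directory.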
pub async fn get_logs(name: &str) -> Result<()> {
    let dest_path = std::env::current_dir()?.join("logs").join(name);
    std::fs::create_dir_all(dest_path.clone())?;
    let s3_repository = S3Repository {};
    s3_repository
        .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
        .await?;
    Ok(())
}

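/// Reassemble previously retrieved logs for the `name` environment: copy `logs/{name}` into
/// `logs/{name}-reassembled`, then merge any split `*.part*` files into `reassembled.log` files.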
pub fn reassemble_logs(name: &str) -> Result<()> {
    let src = PathBuf::from(".").join("logs").join(name);
    if !src.exists() {
        return Err(Error::LogsNotRetrievedError(name.to_string()));
    }
    let dest = PathBuf::from(".")
        .join("logs")
        .join(format!("{name}-reassembled"));
    if dest.exists() {
        println!("Removing previous {name}-reassembled directory");
        remove(dest.clone())?;
    }

    std::fs::create_dir_all(&dest)?;
    let mut options = CopyOptions::new();
    options.overwrite = true;
    copy(src.clone(), dest.clone(), &options)?;

    visit_dirs(&dest, &process_part_files, &src, &dest)?;
    Ok(())
}

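/// Delete the logs for the `name` environment from the `sn-testnet` S3 bucket.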
pub async fn rm_logs(name: &str) -> Result<()> {
    let s3_repository = S3Repository {};
    s3_repository
        .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
        .await?;
    Ok(())
}

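/// Concatenate the `*.part*` files in `dir_path` (sorted by their numeric part index) into a
/// single `reassembled.log`, restoring escaped newlines and deleting the part files afterwards.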
fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
    let reassembled_dir_path = if dir_path == dest_root {
        dest_root.clone()
    } else {
        dest_root.join(dir_path.strip_prefix(source_root)?)
    };
    std::fs::create_dir_all(&reassembled_dir_path)?;

    let entries: Vec<_> = std::fs::read_dir(dir_path)?
        .map(|res| res.map(|e| e.path()))
        .collect::<Result<Vec<_>, _>>()?;

    let mut part_files: Vec<_> = entries
        .iter()
        .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
        .collect();

    // Sort numerically by the index that follows ".part" in the file stem.
    part_files.sort_by_key(|a| {
        a.file_stem()
            .unwrap()
            .to_string_lossy()
            .split(".part")
            .nth(1)
            .unwrap()
            .parse::<u32>()
            .unwrap()
    });

    if part_files.is_empty() {
        return Ok(());
    }

    let output_file_path = reassembled_dir_path.join("reassembled.log");
    println!("Creating reassembled file at {output_file_path:#?}");
    let mut output_file = File::create(&output_file_path)?;
    for part_file in part_files.iter() {
        let mut part_content = String::new();
        File::open(part_file)?.read_to_string(&mut part_content)?;

        // Restore real newlines from escaped "\n" sequences before appending.
        part_content = part_content.replace("\\n", "\n");

        let mut cursor = Cursor::new(part_content);
        std::io::copy(&mut cursor, &mut output_file)?;
        std::fs::remove_file(part_file)?;
    }

    Ok(())
}

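/// Recursively walk `dir`, invoking `cb` on `dir` and every directory beneath it.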
fn visit_dirs(
    dir: &Path,
    cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
    source_root: &PathBuf,
    dest_root: &PathBuf,
) -> Result<()> {
    if dir.is_dir() {
        cb(dir, source_root, dest_root)?;
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_dir() {
                // Below the first level everything already lives under `dest_root`, so it is
                // passed as both roots and the callback's `strip_prefix` becomes a no-op.
                visit_dirs(&path, cb, dest_root, dest_root)?;
            }
        }
    }
    Ok(())
}

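/// Create `logs/{name}` under `root_dir` with one subdirectory per node VM, returning the
/// canonicalised base path.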
fn create_initial_log_dir_setup_node(
    root_dir: &Path,
    name: &str,
    all_node_inventory: &[VirtualMachine],
) -> Result<PathBuf> {
    let log_dest = root_dir.join("logs").join(name);
    if !log_dest.exists() {
        std::fs::create_dir_all(&log_dest)?;
    }
    let log_abs_dest = std::fs::canonicalize(log_dest)?;
    all_node_inventory.par_iter().for_each(|vm| {
        let vm_path = log_abs_dest.join(&vm.name);
        let _ = std::fs::create_dir_all(vm_path);
    });
    Ok(log_abs_dest)
}

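/// Create `logs/{name}` under `root_dir` with one `{vm_name}_ant{n}` subdirectory per uploader
/// user on each client VM.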
fn create_initial_log_dir_setup_client(
    root_dir: &Path,
    name: &str,
    uploader_inventory: &[(VirtualMachine, usize)],
) -> Result<()> {
    let log_dest = root_dir.join("logs").join(name);
    if !log_dest.exists() {
        std::fs::create_dir_all(&log_dest)?;
    }
    let log_abs_dest = std::fs::canonicalize(log_dest)?;
    uploader_inventory.par_iter().for_each(|(vm, users)| {
        for ant_user in 1..=*users {
            let vm_path = log_abs_dest.join(format!("{}_ant{ant_user}", vm.name));
            let _ = std::fs::create_dir_all(vm_path);
        }
    });
    Ok(())
}