1use crate::{
8 error::{Error, Result},
9 get_progress_bar,
10 inventory::VirtualMachine,
11 run_external_command,
12 s3::S3Repository,
13 TestnetDeployer,
14};
15use fs_extra::dir::{copy, remove, CopyOptions};
16use log::debug;
17use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
18use std::{
19 fs::File,
20 io::{Cursor, Read, Write},
21 path::{Path, PathBuf},
22};
23
/// Arguments placed at the front of every rsync invocation built in this module.
///
/// `--verbose` is deliberately listed twice. The three trailing filter rules are
/// order-sensitive: include every directory, include files matching `*.log*`, then
/// exclude everything else.
const DEFAULT_RSYNC_ARGS: [&str; 8] = [
    "--compress",
    "--archive",
    "--prune-empty-dirs",
    "--verbose",
    "--verbose",
    "--filter=+ */",
    "--filter=+ *.log*",
    "--filter=- *",
];

/// Remote directory on a node VM that the node logs are pulled from.
const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";
36
37impl TestnetDeployer {
38 pub fn rsync_logs(
39 &self,
40 name: &str,
41 vm_filter: Option<String>,
42 disable_client_logs: bool,
43 ) -> Result<()> {
44 let root_dir = std::env::current_dir()?;
46
47 let mut all_inventory = vec![];
48 if !disable_client_logs {
49 all_inventory.extend(self.get_client_inventory(name)?);
50 }
51
52 all_inventory.extend(self.get_all_node_inventory(name)?);
53
54 let all_inventory = if let Some(filter) = vm_filter {
55 all_inventory
56 .into_iter()
57 .filter(|vm| vm.name.contains(&filter))
58 .collect()
59 } else {
60 all_inventory
61 };
62
63 let log_base_dir = create_initial_log_dir_setup(&root_dir, name, &all_inventory)?;
64
65 std::env::set_current_dir(self.working_directory_path.clone())?;
67 println!("Starting to rsync the log files");
68 let progress_bar = get_progress_bar(all_inventory.len() as u64)?;
69
70 let rsync_args = all_inventory
71 .iter()
72 .map(|vm| {
73 let args = if vm.name.contains("symmetric") {
74 let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
75 debug!("Using symmetric rsync args for {:?}", vm.name);
76 debug!("Args for {}: {:?}", vm.name, args);
77 args
78 } else if vm.name.contains("full-cone") {
79 let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
80 debug!("Using symmetric rsync args for {:?}", vm.name);
81 debug!("Args for {}: {:?}", vm.name, args);
82 args
83 } else if vm.name.contains("ant-client") {
84 let args = self.construct_client_args(vm, &log_base_dir);
85 debug!("Using Client rsync args for {:?} ", vm.name);
86 debug!("Args for {}: {:?}", vm.name, args);
87 args
88 } else {
89 let args = self.construct_public_node_args(vm, &log_base_dir);
90 debug!("Using public rsync args for {:?}", vm.name);
91 debug!("Args for {}: {:?}", vm.name, args);
92 args
93 };
94
95 Ok((vm.clone(), args))
96 })
97 .collect::<Result<Vec<_>>>()?;
98
99 let failed_inventory = rsync_args
100 .par_iter()
101 .filter_map(|(vm, args)| {
102 if let Err(err) = Self::run_rsync(vm, args) {
103 println!(
104 "Failed to rsync. Retrying it after ssh-keygen {:?} : {} with err: {err:?}",
105 vm.name, vm.public_ip_addr
106 );
107 return Some((vm.clone(), args.clone()));
108 }
109 progress_bar.inc(1);
110 None
111 })
112 .collect::<Vec<_>>();
113
114 failed_inventory
116 .into_par_iter()
117 .for_each(|(vm, args)| {
118 debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
119 if let Err(err) = run_external_command(
120 PathBuf::from("ssh-keygen"),
121 PathBuf::from("."),
122 vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
123 false,
124 false,
125 ) {
126 println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
127 } else if let Err(err) =
128 Self::run_rsync(&vm, &args)
129 {
130 println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
131 }
132 progress_bar.inc(1);
133 });
134 progress_bar.finish_and_clear();
135 println!("Rsync completed!");
136 Ok(())
137 }
138
139 fn construct_client_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
140 let vm_path = log_base_dir.join(&vm.name);
141 let mut rsync_args = DEFAULT_RSYNC_ARGS
142 .iter()
143 .map(|str| str.to_string())
144 .collect::<Vec<String>>();
145
146 rsync_args.extend(vec![
149 "-e".to_string(),
150 format!(
151 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
152 self.ssh_client
153 .get_private_key_path()
154 .to_string_lossy()
155 .as_ref()
156 ),
157 format!("root@{}:/mnt/client-logs/log/", vm.public_ip_addr),
158 vm_path.to_string_lossy().to_string(),
159 ]);
160
161 rsync_args
162 }
163
164 fn construct_public_node_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
165 let vm_path = log_base_dir.join(&vm.name);
166 let mut rsync_args = DEFAULT_RSYNC_ARGS
167 .iter()
168 .map(|str| str.to_string())
169 .collect::<Vec<String>>();
170
171 rsync_args.extend(vec![
174 "-e".to_string(),
175 format!(
176 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
177 self.ssh_client
178 .get_private_key_path()
179 .to_string_lossy()
180 .as_ref()
181 ),
182 format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
183 vm_path.to_string_lossy().to_string(),
184 ]);
185
186 rsync_args
187 }
188
189 fn construct_full_cone_private_node_args(
190 &self,
191 private_vm: &VirtualMachine,
192 log_base_dir: &Path,
193 ) -> Result<Vec<String>> {
194 let vm_path = log_base_dir.join(&private_vm.name);
195
196 let mut rsync_args = DEFAULT_RSYNC_ARGS
197 .iter()
198 .map(|str| str.to_string())
199 .collect::<Vec<String>>();
200
201 let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
202 log::error!("Failed to set routed VMs: {err}");
203 Error::SshSettingsRwLockError
204 })?;
205 let (_, gateway_ip) = read_lock
206 .as_ref()
207 .and_then(|routed_vms| {
208 routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
209 })
210 .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
211
212 rsync_args.extend(vec![
213 "-e".to_string(),
214 format!(
215 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
216 self.ssh_client
217 .get_private_key_path()
218 .to_string_lossy()
219 .as_ref()
220 ),
221 format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
222 vm_path.to_string_lossy().to_string(),
223 ]);
224
225 Ok(rsync_args)
226 }
227
228 fn construct_symmetric_private_node_args(
229 &self,
230 private_vm: &VirtualMachine,
231 log_base_dir: &Path,
232 ) -> Result<Vec<String>> {
233 let vm_path = log_base_dir.join(&private_vm.name);
234
235 let mut rsync_args = DEFAULT_RSYNC_ARGS
236 .iter()
237 .map(|str| str.to_string())
238 .collect::<Vec<String>>();
239
240 let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
241 log::error!("Failed to set routed VMs: {err}");
242 Error::SshSettingsRwLockError
243 })?;
244 let (_, gateway_ip) = read_lock
245 .as_ref()
246 .and_then(|routed_vms| {
247 routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
248 })
249 .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
250
251 rsync_args.extend(vec![
252 "-e".to_string(),
253 format!(
254 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh root@{gateway_ip} -W %h:%p -i {}'",
255 self.ssh_client
256 .get_private_key_path()
257 .to_string_lossy()
258 .as_ref(),
259 self.ssh_client
260 .get_private_key_path()
261 .to_string_lossy()
262 .as_ref(),
263 ),
264 format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
265 vm_path.to_string_lossy().to_string(),
266 ]);
267
268 Ok(rsync_args)
269 }
270
271 fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
272 debug!(
273 "Rsync logs to our machine for {:?} : {}",
274 vm.name, vm.public_ip_addr
275 );
276 run_external_command(
277 PathBuf::from("rsync"),
278 PathBuf::from("."),
279 rsync_args.to_vec(),
280 true,
281 false,
282 )?;
283
284 debug!(
285 "Finished rsync for for {:?} : {}",
286 vm.name, vm.public_ip_addr
287 );
288 Ok(())
289 }
290
291 pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
292 let root_dir = std::env::current_dir()?;
294 let all_node_inventory = self.get_all_node_inventory(name)?;
295 let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
296
297 let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
298 println!("Running ripgrep with command: {rg_cmd}");
299
300 let now = chrono::Utc::now();
302 let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
303 let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
304 let _failed_inventory = all_node_inventory
305 .par_iter()
306 .filter_map(|vm| {
307 let op =
308 match self
309 .ssh_client
310 .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
311 {
312 Ok(output) => {
313 match Self::store_rg_output(
314 ×tamp,
315 &rg_cmd,
316 &output,
317 &log_abs_dest,
318 &vm.name,
319 ) {
320 Ok(_) => None,
321 Err(err) => {
322 println!(
323 "Failed store output for {:?} with: {err:?}",
324 vm.public_ip_addr
325 );
326 Some(vm)
327 }
328 }
329 }
330 Err(Error::ExternalCommandRunFailed {
331 binary,
332 exit_status,
333 }) => {
334 if let Some(1) = exit_status.code() {
335 debug!("No matches found for {:?}", vm.public_ip_addr);
336 match Self::store_rg_output(
337 ×tamp,
338 &rg_cmd,
339 &["No matches found".to_string()],
340 &log_abs_dest,
341 &vm.name,
342 ) {
343 Ok(_) => None,
344 Err(err) => {
345 println!(
346 "Failed store output for {:?} with: {err:?}",
347 vm.public_ip_addr
348 );
349 Some(vm)
350 }
351 }
352 } else {
353 println!(
354 "Failed to run rg query for {:?} with: {binary}",
355 vm.public_ip_addr
356 );
357 Some(vm)
358 }
359 }
360 Err(err) => {
361 println!(
362 "Failed to run rg query for {:?} with: {err:?}",
363 vm.public_ip_addr
364 );
365 Some(vm)
366 }
367 };
368 progress_bar.inc(1);
369 op
370 })
371 .collect::<Vec<_>>();
372
373 progress_bar.finish_and_clear();
374 println!("Ripgrep completed!");
375
376 Ok(())
377 }
378
379 fn store_rg_output(
380 timestamp: &str,
381 cmd: &str,
382 output: &[String],
383 log_abs_dest: &Path,
384 vm_name: &str,
385 ) -> Result<()> {
386 std::fs::create_dir_all(log_abs_dest.join(vm_name))?;
387
388 let mut file = File::create(
389 log_abs_dest
390 .join(vm_name)
391 .join(format!("rg-{timestamp}.log")),
392 )?;
393
394 writeln!(file, "Command: {cmd}")?;
395
396 for line in output {
397 writeln!(file, "{}", line)?;
398 }
399
400 Ok(())
401 }
402
403 pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
408 let dest = PathBuf::from(".").join("logs").join(name);
409 if dest.exists() {
410 println!("Removing existing {} directory", dest.to_string_lossy());
411 remove(dest.clone())?;
412 }
413 std::fs::create_dir_all(&dest)?;
414 self.ansible_provisioner.copy_logs(name, resources_only)?;
415 Ok(())
416 }
417
418 fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
420 let environments = self.terraform_runner.workspace_list()?;
421 if !environments.contains(&name.to_string()) {
422 return Err(Error::EnvironmentDoesNotExist(name.to_string()));
423 }
424 self.ansible_provisioner.get_all_node_inventory()
425 }
426
427 fn get_client_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
428 let environments = self.terraform_runner.workspace_list()?;
429 if !environments.contains(&name.to_string()) {
430 return Err(Error::EnvironmentDoesNotExist(name.to_string()));
431 }
432 self.ansible_provisioner.get_client_inventory()
433 }
434}
435
436pub async fn get_logs(name: &str) -> Result<()> {
437 let dest_path = std::env::current_dir()?.join("logs").join(name);
438 std::fs::create_dir_all(dest_path.clone())?;
439 let s3_repository = S3Repository {};
440 s3_repository
441 .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
442 .await?;
443 Ok(())
444}
445
446pub fn reassemble_logs(name: &str) -> Result<()> {
447 let src = PathBuf::from(".").join("logs").join(name);
448 if !src.exists() {
449 return Err(Error::LogsNotRetrievedError(name.to_string()));
450 }
451 let dest = PathBuf::from(".")
452 .join("logs")
453 .join(format!("{name}-reassembled"));
454 if dest.exists() {
455 println!("Removing previous {name}-reassembled directory");
456 remove(dest.clone())?;
457 }
458
459 std::fs::create_dir_all(&dest)?;
460 let mut options = CopyOptions::new();
461 options.overwrite = true;
462 copy(src.clone(), dest.clone(), &options)?;
463
464 visit_dirs(&dest, &process_part_files, &src, &dest)?;
465 Ok(())
466}
467
468pub async fn rm_logs(name: &str) -> Result<()> {
469 let s3_repository = S3Repository {};
470 s3_repository
471 .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
472 .await?;
473 Ok(())
474}
475
476fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
477 let reassembled_dir_path = if dir_path == dest_root {
478 dest_root.clone()
479 } else {
480 dest_root.join(dir_path.strip_prefix(source_root)?)
481 };
482 std::fs::create_dir_all(&reassembled_dir_path)?;
483
484 let entries: Vec<_> = std::fs::read_dir(dir_path)?
485 .map(|res| res.map(|e| e.path()))
486 .collect::<Result<Vec<_>, _>>()?;
487
488 let mut part_files: Vec<_> = entries
489 .iter()
490 .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
491 .collect();
492
493 part_files.sort_by_key(|a| {
494 a.file_stem()
495 .unwrap()
496 .to_string_lossy()
497 .split(".part")
498 .nth(1)
499 .unwrap()
500 .parse::<u32>()
501 .unwrap()
502 });
503
504 if part_files.is_empty() {
505 return Ok(());
506 }
507
508 let output_file_path = reassembled_dir_path.join("reassembled.log");
509 println!("Creating reassembled file at {output_file_path:#?}");
510 let mut output_file = File::create(&output_file_path)?;
511 for part_file in part_files.iter() {
512 let mut part_content = String::new();
513 File::open(part_file)?.read_to_string(&mut part_content)?;
514
515 part_content = part_content.replace("\\n", "\n");
518
519 let mut cursor = Cursor::new(part_content);
520 std::io::copy(&mut cursor, &mut output_file)?;
521 std::fs::remove_file(part_file)?;
522 }
523
524 Ok(())
525}
526
527fn visit_dirs(
528 dir: &Path,
529 cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
530 source_root: &PathBuf,
531 dest_root: &PathBuf,
532) -> Result<()> {
533 if dir.is_dir() {
534 cb(dir, source_root, dest_root)?;
535 for entry in std::fs::read_dir(dir)? {
536 let entry = entry?;
537 let path = entry.path();
538 if path.is_dir() {
539 visit_dirs(&path, cb, dest_root, dest_root)?;
540 }
541 }
542 }
543 Ok(())
544}
545
546fn create_initial_log_dir_setup(
548 root_dir: &Path,
549 name: &str,
550 all_node_inventory: &[VirtualMachine],
551) -> Result<PathBuf> {
552 let log_dest = root_dir.join("logs").join(name);
553 if !log_dest.exists() {
554 std::fs::create_dir_all(&log_dest)?;
555 }
556 let log_abs_dest = std::fs::canonicalize(log_dest)?;
558 all_node_inventory.par_iter().for_each(|vm| {
560 let vm_path = log_abs_dest.join(&vm.name);
561 let _ = std::fs::create_dir_all(vm_path);
562 });
563 Ok(log_abs_dest)
564}