1use crate::{
8 error::{Error, Result},
9 get_progress_bar,
10 inventory::VirtualMachine,
11 run_external_command,
12 s3::S3Repository,
13 TestnetDeployer,
14};
15use fs_extra::dir::{copy, remove, CopyOptions};
16use log::debug;
17use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
18use std::{
19 fs::File,
20 io::{Cursor, Read, Write},
21 path::{Path, PathBuf},
22};
23
/// Base arguments shared by every rsync invocation.
///
/// The `--filter` rules mean: descend into every directory (`+ */`), include
/// any file matching `*.log*`, and exclude everything else (`- *`);
/// `--prune-empty-dirs` then drops directories left with no matching files.
/// `--verbose` appears twice deliberately (equivalent to `-vv`).
const DEFAULT_RSYNC_ARGS: [&str; 8] = [
    "--compress",
    "--archive",
    "--prune-empty-dirs",
    "--verbose",
    "--verbose",
    "--filter=+ */",
    "--filter=+ *.log*",
    "--filter=- *",
];

/// Remote directory where the antnode services write their log files.
const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";
36
37impl TestnetDeployer {
38 pub fn rsync_logs(&self, name: &str, vm_filter: Option<String>) -> Result<()> {
39 let root_dir = std::env::current_dir()?;
41 let all_node_inventory = self.get_all_node_inventory(name)?;
42 let all_node_inventory = if let Some(filter) = vm_filter {
43 all_node_inventory
44 .into_iter()
45 .filter(|vm| vm.name.contains(&filter))
46 .collect()
47 } else {
48 all_node_inventory
49 };
50
51 let log_base_dir = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
52
53 std::env::set_current_dir(self.working_directory_path.clone())?;
55 println!("Starting to rsync the log files");
56 let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
57
58 let rsync_args = all_node_inventory
59 .iter()
60 .map(|vm| {
61 let args = if vm.name.contains("symmetric") {
62 let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
63 debug!("Using symmetric rsync args for {:?}", vm.name);
64 debug!("Args for {}: {:?}", vm.name, args);
65 args
66 } else if vm.name.contains("full-cone") {
67 let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
68 debug!("Using symmetric rsync args for {:?}", vm.name);
69 debug!("Args for {}: {:?}", vm.name, args);
70 args
71 } else {
72 let args = self.construct_default_args(vm, &log_base_dir);
73 debug!("Using public rsync args for {:?}", vm.name);
74 debug!("Args for {}: {:?}", vm.name, args);
75 args
76 };
77
78 Ok((vm.clone(), args))
79 })
80 .collect::<Result<Vec<_>>>()?;
81
82 let failed_inventory = rsync_args
83 .par_iter()
84 .filter_map(|(vm, args)| {
85 if let Err(err) = Self::run_rsync(vm, args) {
86 println!(
87 "Failed to rsync. Retrying it after ssh-keygen {:?} : {} with err: {err:?}",
88 vm.name, vm.public_ip_addr
89 );
90 return Some((vm.clone(), args.clone()));
91 }
92 progress_bar.inc(1);
93 None
94 })
95 .collect::<Vec<_>>();
96
97 failed_inventory
99 .into_par_iter()
100 .for_each(|(vm, args)| {
101 debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
102 if let Err(err) = run_external_command(
103 PathBuf::from("ssh-keygen"),
104 PathBuf::from("."),
105 vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
106 false,
107 false,
108 ) {
109 println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
110 } else if let Err(err) =
111 Self::run_rsync(&vm, &args)
112 {
113 println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
114 }
115 progress_bar.inc(1);
116 });
117 progress_bar.finish_and_clear();
118 println!("Rsync completed!");
119 Ok(())
120 }
121
122 fn construct_default_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
123 let vm_path = log_base_dir.join(&vm.name);
124 let mut rsync_args = DEFAULT_RSYNC_ARGS
125 .iter()
126 .map(|str| str.to_string())
127 .collect::<Vec<String>>();
128
129 rsync_args.extend(vec![
132 "-e".to_string(),
133 format!(
134 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
135 self.ssh_client
136 .get_private_key_path()
137 .to_string_lossy()
138 .as_ref()
139 ),
140 format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
141 vm_path.to_string_lossy().to_string(),
142 ]);
143
144 rsync_args
145 }
146
147 fn construct_full_cone_private_node_args(
148 &self,
149 private_vm: &VirtualMachine,
150 log_base_dir: &Path,
151 ) -> Result<Vec<String>> {
152 let vm_path = log_base_dir.join(&private_vm.name);
153
154 let mut rsync_args = DEFAULT_RSYNC_ARGS
155 .iter()
156 .map(|str| str.to_string())
157 .collect::<Vec<String>>();
158
159 let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
160 log::error!("Failed to set routed VMs: {err}");
161 Error::SshSettingsRwLockError
162 })?;
163 let (_, gateway_ip) = read_lock
164 .as_ref()
165 .and_then(|routed_vms| {
166 routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
167 })
168 .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
169
170 rsync_args.extend(vec![
171 "-e".to_string(),
172 format!(
173 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
174 self.ssh_client
175 .get_private_key_path()
176 .to_string_lossy()
177 .as_ref()
178 ),
179 format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
180 vm_path.to_string_lossy().to_string(),
181 ]);
182
183 Ok(rsync_args)
184 }
185
186 fn construct_symmetric_private_node_args(
187 &self,
188 private_vm: &VirtualMachine,
189 log_base_dir: &Path,
190 ) -> Result<Vec<String>> {
191 let vm_path = log_base_dir.join(&private_vm.name);
192
193 let mut rsync_args = DEFAULT_RSYNC_ARGS
194 .iter()
195 .map(|str| str.to_string())
196 .collect::<Vec<String>>();
197
198 let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
199 log::error!("Failed to set routed VMs: {err}");
200 Error::SshSettingsRwLockError
201 })?;
202 let (_, gateway_ip) = read_lock
203 .as_ref()
204 .and_then(|routed_vms| {
205 routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
206 })
207 .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
208
209 rsync_args.extend(vec![
210 "-e".to_string(),
211 format!(
212 "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh root@{gateway_ip} -W %h:%p -i {}'",
213 self.ssh_client
214 .get_private_key_path()
215 .to_string_lossy()
216 .as_ref(),
217 self.ssh_client
218 .get_private_key_path()
219 .to_string_lossy()
220 .as_ref(),
221 ),
222 format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
223 vm_path.to_string_lossy().to_string(),
224 ]);
225
226 Ok(rsync_args)
227 }
228
229 fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
230 debug!(
231 "Rsync logs to our machine for {:?} : {}",
232 vm.name, vm.public_ip_addr
233 );
234 run_external_command(
235 PathBuf::from("rsync"),
236 PathBuf::from("."),
237 rsync_args.to_vec(),
238 true,
239 false,
240 )?;
241
242 debug!(
243 "Finished rsync for for {:?} : {}",
244 vm.name, vm.public_ip_addr
245 );
246 Ok(())
247 }
248
249 pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
250 let root_dir = std::env::current_dir()?;
252 let all_node_inventory = self.get_all_node_inventory(name)?;
253 let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
254
255 let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
256 println!("Running ripgrep with command: {rg_cmd}");
257
258 let now = chrono::Utc::now();
260 let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
261 let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
262 let _failed_inventory = all_node_inventory
263 .par_iter()
264 .filter_map(|vm| {
265 let op =
266 match self
267 .ssh_client
268 .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
269 {
270 Ok(output) => {
271 match Self::store_rg_output(
272 ×tamp,
273 &rg_cmd,
274 &output,
275 &log_abs_dest,
276 &vm.name,
277 ) {
278 Ok(_) => None,
279 Err(err) => {
280 println!(
281 "Failed store output for {:?} with: {err:?}",
282 vm.public_ip_addr
283 );
284 Some(vm)
285 }
286 }
287 }
288 Err(Error::ExternalCommandRunFailed {
289 binary,
290 exit_status,
291 }) => {
292 if let Some(1) = exit_status.code() {
293 debug!("No matches found for {:?}", vm.public_ip_addr);
294 match Self::store_rg_output(
295 ×tamp,
296 &rg_cmd,
297 &["No matches found".to_string()],
298 &log_abs_dest,
299 &vm.name,
300 ) {
301 Ok(_) => None,
302 Err(err) => {
303 println!(
304 "Failed store output for {:?} with: {err:?}",
305 vm.public_ip_addr
306 );
307 Some(vm)
308 }
309 }
310 } else {
311 println!(
312 "Failed to run rg query for {:?} with: {binary}",
313 vm.public_ip_addr
314 );
315 Some(vm)
316 }
317 }
318 Err(err) => {
319 println!(
320 "Failed to run rg query for {:?} with: {err:?}",
321 vm.public_ip_addr
322 );
323 Some(vm)
324 }
325 };
326 progress_bar.inc(1);
327 op
328 })
329 .collect::<Vec<_>>();
330
331 progress_bar.finish_and_clear();
332 println!("Ripgrep completed!");
333
334 Ok(())
335 }
336
337 fn store_rg_output(
338 timestamp: &str,
339 cmd: &str,
340 output: &[String],
341 log_abs_dest: &Path,
342 vm_name: &str,
343 ) -> Result<()> {
344 std::fs::create_dir_all(log_abs_dest.join(vm_name))?;
345
346 let mut file = File::create(
347 log_abs_dest
348 .join(vm_name)
349 .join(format!("rg-{timestamp}.log")),
350 )?;
351
352 writeln!(file, "Command: {cmd}")?;
353
354 for line in output {
355 writeln!(file, "{}", line)?;
356 }
357
358 Ok(())
359 }
360
361 pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
366 let dest = PathBuf::from(".").join("logs").join(name);
367 if dest.exists() {
368 println!("Removing existing {} directory", dest.to_string_lossy());
369 remove(dest.clone())?;
370 }
371 std::fs::create_dir_all(&dest)?;
372 self.ansible_provisioner.copy_logs(name, resources_only)?;
373 Ok(())
374 }
375
376 fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
378 let environments = self.terraform_runner.workspace_list()?;
379 if !environments.contains(&name.to_string()) {
380 return Err(Error::EnvironmentDoesNotExist(name.to_string()));
381 }
382 self.ansible_provisioner.get_all_node_inventory()
383 }
384}
385
386pub async fn get_logs(name: &str) -> Result<()> {
387 let dest_path = std::env::current_dir()?.join("logs").join(name);
388 std::fs::create_dir_all(dest_path.clone())?;
389 let s3_repository = S3Repository {};
390 s3_repository
391 .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
392 .await?;
393 Ok(())
394}
395
396pub fn reassemble_logs(name: &str) -> Result<()> {
397 let src = PathBuf::from(".").join("logs").join(name);
398 if !src.exists() {
399 return Err(Error::LogsNotRetrievedError(name.to_string()));
400 }
401 let dest = PathBuf::from(".")
402 .join("logs")
403 .join(format!("{name}-reassembled"));
404 if dest.exists() {
405 println!("Removing previous {name}-reassembled directory");
406 remove(dest.clone())?;
407 }
408
409 std::fs::create_dir_all(&dest)?;
410 let mut options = CopyOptions::new();
411 options.overwrite = true;
412 copy(src.clone(), dest.clone(), &options)?;
413
414 visit_dirs(&dest, &process_part_files, &src, &dest)?;
415 Ok(())
416}
417
418pub async fn rm_logs(name: &str) -> Result<()> {
419 let s3_repository = S3Repository {};
420 s3_repository
421 .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
422 .await?;
423 Ok(())
424}
425
426fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
427 let reassembled_dir_path = if dir_path == dest_root {
428 dest_root.clone()
429 } else {
430 dest_root.join(dir_path.strip_prefix(source_root)?)
431 };
432 std::fs::create_dir_all(&reassembled_dir_path)?;
433
434 let entries: Vec<_> = std::fs::read_dir(dir_path)?
435 .map(|res| res.map(|e| e.path()))
436 .collect::<Result<Vec<_>, _>>()?;
437
438 let mut part_files: Vec<_> = entries
439 .iter()
440 .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
441 .collect();
442
443 part_files.sort_by_key(|a| {
444 a.file_stem()
445 .unwrap()
446 .to_string_lossy()
447 .split(".part")
448 .nth(1)
449 .unwrap()
450 .parse::<u32>()
451 .unwrap()
452 });
453
454 if part_files.is_empty() {
455 return Ok(());
456 }
457
458 let output_file_path = reassembled_dir_path.join("reassembled.log");
459 println!("Creating reassembled file at {output_file_path:#?}");
460 let mut output_file = File::create(&output_file_path)?;
461 for part_file in part_files.iter() {
462 let mut part_content = String::new();
463 File::open(part_file)?.read_to_string(&mut part_content)?;
464
465 part_content = part_content.replace("\\n", "\n");
468
469 let mut cursor = Cursor::new(part_content);
470 std::io::copy(&mut cursor, &mut output_file)?;
471 std::fs::remove_file(part_file)?;
472 }
473
474 Ok(())
475}
476
/// Recursively walk `dir`, invoking `cb` on every directory (including `dir`
/// itself) before descending into its subdirectories. Non-directory entries
/// are ignored.
fn visit_dirs(
    dir: &Path,
    cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
    source_root: &PathBuf,
    dest_root: &PathBuf,
) -> Result<()> {
    if dir.is_dir() {
        cb(dir, source_root, dest_root)?;
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_dir() {
                // NOTE(review): `dest_root` is passed as the `source_root`
                // argument here, which looks like a copy-paste slip. However,
                // with the current caller (`reassemble_logs` walks `dest`),
                // this is what lets `process_part_files` strip a prefix that
                // actually matches the subdirectories being visited — confirm
                // the intent before "fixing" it to pass `source_root`.
                visit_dirs(&path, cb, dest_root, dest_root)?;
            }
        }
    }
    Ok(())
}
495
496fn create_initial_log_dir_setup(
498 root_dir: &Path,
499 name: &str,
500 all_node_inventory: &[VirtualMachine],
501) -> Result<PathBuf> {
502 let log_dest = root_dir.join("logs").join(name);
503 if !log_dest.exists() {
504 std::fs::create_dir_all(&log_dest)?;
505 }
506 let log_abs_dest = std::fs::canonicalize(log_dest)?;
508 all_node_inventory.par_iter().for_each(|vm| {
510 let vm_path = log_abs_dest.join(&vm.name);
511 let _ = std::fs::create_dir_all(vm_path);
512 });
513 Ok(log_abs_dest)
514}