sn_testnet_deploy/
logs.rs

// Copyright (c) 2023, MaidSafe.
// All rights reserved.
//
// This SAFE Network Software is licensed under the BSD-3-Clause license.
// Please see the LICENSE file for more details.

use crate::{
    error::{Error, Result},
    get_progress_bar,
    inventory::VirtualMachine,
    run_external_command,
    s3::S3Repository,
    TestnetDeployer,
};
use fs_extra::dir::{copy, remove, CopyOptions};
use log::debug;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use std::{
    fs::File,
    io::{Cursor, Read, Write},
    path::{Path, PathBuf},
};

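/// rsync arguments shared by every log-collection invocation: compress and archive the
/// transfer, prune empty directories, raise verbosity (`--verbose` is passed twice), and apply
/// filters so that only `*.log*` files are pulled down.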
const DEFAULT_RSYNC_ARGS: [&str; 8] = [
    "--compress",
    "--archive",
    "--prune-empty-dirs",
    "--verbose",
    "--verbose",
    "--filter=+ */",     // Include all directories for traversal
    "--filter=+ *.log*", // Include all *.log* files
    "--filter=- *",      // Exclude all other files
];

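/// Remote directory on the node VMs from which the log files are collected.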
const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";

impl TestnetDeployer {
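    /// Rsync the node log files from every VM in the environment `name` into
    /// `logs/<name>/<vm-name>` under the directory the command was started from.
    ///
    /// An optional `vm_filter` restricts the run to VMs whose names contain the given
    /// substring. Private nodes behind symmetric or full-cone NAT are reached via their
    /// gateway VM, and any VM that fails is retried once after clearing its stale host key
    /// with `ssh-keygen -R`.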
    pub fn rsync_logs(&self, name: &str, vm_filter: Option<String>) -> Result<()> {
        // Take root_dir at the top, as `get_all_node_inventory` changes the working dir.
        let root_dir = std::env::current_dir()?;
        let all_node_inventory = self.get_all_node_inventory(name)?;
        let all_node_inventory = if let Some(filter) = vm_filter {
            all_node_inventory
                .into_iter()
                .filter(|vm| vm.name.contains(&filter))
                .collect()
        } else {
            all_node_inventory
        };

        let log_base_dir = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;

        // We might use the script, so go to the resources dir.
        std::env::set_current_dir(self.working_directory_path.clone())?;
        println!("Starting to rsync the log files");
        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;

        let rsync_args = all_node_inventory
            .iter()
            .map(|vm| {
                let args = if vm.name.contains("symmetric") {
                    let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
                    debug!("Using symmetric rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                } else if vm.name.contains("full-cone") {
                    let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
                    debug!("Using full-cone rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                } else {
                    let args = self.construct_default_args(vm, &log_base_dir);
                    debug!("Using public rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                };

                Ok((vm.clone(), args))
            })
            .collect::<Result<Vec<_>>>()?;

        let failed_inventory = rsync_args
            .par_iter()
            .filter_map(|(vm, args)| {
                if let Err(err) = Self::run_rsync(vm, args) {
                    println!(
                        "Failed to rsync {:?} : {}; will retry after ssh-keygen. Error: {err:?}",
                        vm.name, vm.public_ip_addr
                    );
                    return Some((vm.clone(), args.clone()));
                }
                progress_bar.inc(1);
                None
            })
            .collect::<Vec<_>>();
        // Run `ssh-keygen -R` for the failed VMs to clear any stale host keys, then retry the rsync.
        failed_inventory
            .into_par_iter()
            .for_each(|(vm, args)| {
                debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
                if let Err(err) = run_external_command(
                    PathBuf::from("ssh-keygen"),
                    PathBuf::from("."),
                    vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
                    false,
                    false,
                ) {
                    println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
                } else if let Err(err) =
                    Self::run_rsync(&vm, &args)
                {
                    println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
                }
                progress_bar.inc(1);
            });
        progress_bar.finish_and_clear();
        println!("Rsync completed!");
        Ok(())
    }

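    /// Build the rsync arguments for a publicly reachable node VM: SSH straight to the VM and
    /// pull `NODE_LOG_DIR` into `<log_base_dir>/<vm-name>`.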
    fn construct_default_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
        let vm_path = log_base_dir.join(&vm.name);
        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        // TODO: SSH limits concurrent connections to 10 at a time. Changing /etc/ssh/sshd_config
        // doesn't seem to help. How can this be bypassed?
        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
            vm_path.to_string_lossy().to_string(),
        ]);

        rsync_args
    }

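    /// Build the rsync arguments for a private node behind full-cone NAT. The gateway IP for
    /// the node is looked up in the routed VM table held by the SSH client, and the logs are
    /// rsynced from `NODE_LOG_DIR` on that gateway address.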
    fn construct_full_cone_private_node_args(
        &self,
        private_vm: &VirtualMachine,
        log_base_dir: &Path,
    ) -> Result<Vec<String>> {
        let vm_path = log_base_dir.join(&private_vm.name);

        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
            log::error!("Failed to read routed VMs: {err}");
            Error::SshSettingsRwLockError
        })?;
        let (_, gateway_ip) = read_lock
            .as_ref()
            .and_then(|routed_vms| {
                routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
            })
            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;

        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
            vm_path.to_string_lossy().to_string(),
        ]);

        Ok(rsync_args)
    }

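    /// Build the rsync arguments for a private node behind symmetric NAT. The node is only
    /// reachable through its gateway, so the SSH command uses a `ProxyCommand` jump via the
    /// gateway IP and rsyncs `NODE_LOG_DIR` from the node's private address.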
    fn construct_symmetric_private_node_args(
        &self,
        private_vm: &VirtualMachine,
        log_base_dir: &Path,
    ) -> Result<Vec<String>> {
        let vm_path = log_base_dir.join(&private_vm.name);

        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
            log::error!("Failed to read routed VMs: {err}");
            Error::SshSettingsRwLockError
        })?;
        let (_, gateway_ip) = read_lock
            .as_ref()
            .and_then(|routed_vms| {
                routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
            })
            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;

        rsync_args.extend(vec![
                "-e".to_string(),
                format!(
                    "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh root@{gateway_ip} -W %h:%p -i {}'",
                    self.ssh_client
                        .get_private_key_path()
                        .to_string_lossy()
                        .as_ref(),
                    self.ssh_client
                        .get_private_key_path()
                        .to_string_lossy()
                        .as_ref(),
                ),
                format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
                vm_path.to_string_lossy().to_string(),
            ]);

        Ok(rsync_args)
    }

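    /// Invoke the external `rsync` binary with the prepared arguments for a single VM.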
    fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
        debug!(
            "Rsync logs to our machine for {:?} : {}",
            vm.name, vm.public_ip_addr
        );
        run_external_command(
            PathBuf::from("rsync"),
            PathBuf::from("."),
            rsync_args.to_vec(),
            true,
            false,
        )?;

        debug!(
            "Finished rsync for {:?} : {}",
            vm.name, vm.public_ip_addr
        );
        Ok(())
    }

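    /// Run a ripgrep query against the node logs on every VM in the environment `name`,
    /// storing each VM's output locally under `logs/<name>/<vm-name>/rg-<timestamp>.log`.
    ///
    /// `rg_args` is passed straight through to `rg`. An exit status of 1 (no matches) is
    /// recorded as "No matches found" rather than treated as a failure.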
    pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
        // Take root_dir at the top, as `get_all_node_inventory` changes the working dir.
        let root_dir = std::env::current_dir()?;
        let all_node_inventory = self.get_all_node_inventory(name)?;
        let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;

        let rg_cmd = format!("rg {rg_args} {NODE_LOG_DIR}");
        println!("Running ripgrep with command: {rg_cmd}");

        // Get the current date and time to timestamp the output files.
        let now = chrono::Utc::now();
        let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
        let _failed_inventory = all_node_inventory
            .par_iter()
            .filter_map(|vm| {
                let op =
                    match self
                        .ssh_client
                        .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
                    {
                        Ok(output) => {
                            match Self::store_rg_output(
                                &timestamp,
                                &rg_cmd,
                                &output,
                                &log_abs_dest,
                                &vm.name,
                            ) {
                                Ok(_) => None,
                                Err(err) => {
                                    println!(
                                        "Failed to store output for {:?} with: {err:?}",
                                        vm.public_ip_addr
                                    );
                                    Some(vm)
                                }
                            }
                        }
                        Err(Error::ExternalCommandRunFailed {
                            binary,
                            exit_status,
                        }) => {
                            if let Some(1) = exit_status.code() {
                                debug!("No matches found for {:?}", vm.public_ip_addr);
                                match Self::store_rg_output(
                                    &timestamp,
                                    &rg_cmd,
                                    &["No matches found".to_string()],
                                    &log_abs_dest,
                                    &vm.name,
                                ) {
                                    Ok(_) => None,
                                    Err(err) => {
                                        println!(
                                            "Failed to store output for {:?} with: {err:?}",
                                            vm.public_ip_addr
                                        );
                                        Some(vm)
                                    }
                                }
                            } else {
                                println!(
                                    "Failed to run rg query for {:?}: {binary} exited with an error",
                                    vm.public_ip_addr
                                );
                                Some(vm)
                            }
                        }
                        Err(err) => {
                            println!(
                                "Failed to run rg query for {:?} with: {err:?}",
                                vm.public_ip_addr
                            );
                            Some(vm)
                        }
                    };
                progress_bar.inc(1);
                op
            })
            .collect::<Vec<_>>();

        progress_bar.finish_and_clear();
        println!("Ripgrep completed!");

        Ok(())
    }

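    /// Write the ripgrep output for a single VM to `<log_abs_dest>/<vm_name>/rg-<timestamp>.log`,
    /// prefixed with the command that was run.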
    fn store_rg_output(
        timestamp: &str,
        cmd: &str,
        output: &[String],
        log_abs_dest: &Path,
        vm_name: &str,
    ) -> Result<()> {
        std::fs::create_dir_all(log_abs_dest.join(vm_name))?;

        let mut file = File::create(
            log_abs_dest
                .join(vm_name)
                .join(format!("rg-{timestamp}.log")),
        )?;

        writeln!(file, "Command: {cmd}")?;

        for line in output {
            writeln!(file, "{}", line)?;
        }

        Ok(())
    }

    /// Run an Ansible playbook to copy the logs from all the machines in the inventory.
    ///
    /// It needs to be part of `TestnetDeployer` because the Ansible runner is already set up
    /// in that context.
    pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
        let dest = PathBuf::from(".").join("logs").join(name);
        if dest.exists() {
            println!("Removing existing {} directory", dest.to_string_lossy());
            remove(dest.clone())?;
        }
        std::fs::create_dir_all(&dest)?;
        self.ansible_provisioner.copy_logs(name, resources_only)?;
        Ok(())
    }

    // Return the list of all the node machines in the environment, erroring out if the
    // environment does not exist as a Terraform workspace.
    fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
        let environments = self.terraform_runner.workspace_list()?;
        if !environments.contains(&name.to_string()) {
            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
        }
        self.ansible_provisioner.get_all_node_inventory()
    }
}

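/// Download the logs for the environment `name` from the `sn-testnet` S3 bucket into
/// `logs/<name>` under the current directory.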
pub async fn get_logs(name: &str) -> Result<()> {
    let dest_path = std::env::current_dir()?.join("logs").join(name);
    std::fs::create_dir_all(dest_path.clone())?;
    let s3_repository = S3Repository {};
    s3_repository
        .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
        .await?;
    Ok(())
}

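/// Reassemble previously retrieved logs for the environment `name`. The `logs/<name>` tree is
/// copied to `logs/<name>-reassembled`, and within each directory the `*.part*` files are
/// concatenated, in part-number order, into a single `reassembled.log`.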
pub fn reassemble_logs(name: &str) -> Result<()> {
    let src = PathBuf::from(".").join("logs").join(name);
    if !src.exists() {
        return Err(Error::LogsNotRetrievedError(name.to_string()));
    }
    let dest = PathBuf::from(".")
        .join("logs")
        .join(format!("{name}-reassembled"));
    if dest.exists() {
        println!("Removing previous {name}-reassembled directory");
        remove(dest.clone())?;
    }

    std::fs::create_dir_all(&dest)?;
    let mut options = CopyOptions::new();
    options.overwrite = true;
    copy(src.clone(), dest.clone(), &options)?;

    visit_dirs(&dest, &process_part_files, &src, &dest)?;
    Ok(())
}

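/// Delete the logs for the environment `name` from the `sn-testnet` S3 bucket.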
pub async fn rm_logs(name: &str) -> Result<()> {
    let s3_repository = S3Repository {};
    s3_repository
        .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
        .await?;
    Ok(())
}

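// Concatenate the `*.part*` files in `dir_path` into a single `reassembled.log`, in ascending
// part-number order, removing each part file once it has been appended.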
fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
    let reassembled_dir_path = if dir_path == dest_root {
        dest_root.clone()
    } else {
        dest_root.join(dir_path.strip_prefix(source_root)?)
    };
    std::fs::create_dir_all(&reassembled_dir_path)?;

    let entries: Vec<_> = std::fs::read_dir(dir_path)?
        .map(|res| res.map(|e| e.path()))
        .collect::<Result<Vec<_>, _>>()?;

    let mut part_files: Vec<_> = entries
        .iter()
        .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
        .collect();

    part_files.sort_by_key(|a| {
        a.file_stem()
            .unwrap()
            .to_string_lossy()
            .split(".part")
            .nth(1)
            .unwrap()
            .parse::<u32>()
            .unwrap()
    });

    if part_files.is_empty() {
        return Ok(());
    }

    let output_file_path = reassembled_dir_path.join("reassembled.log");
    println!("Creating reassembled file at {output_file_path:#?}");
    let mut output_file = File::create(&output_file_path)?;
    for part_file in part_files.iter() {
        let mut part_content = String::new();
        File::open(part_file)?.read_to_string(&mut part_content)?;

        // For some reason logstash writes "\n" as a literal string rather than a newline
        // character.
        part_content = part_content.replace("\\n", "\n");

        let mut cursor = Cursor::new(part_content);
        std::io::copy(&mut cursor, &mut output_file)?;
        std::fs::remove_file(part_file)?;
    }

    Ok(())
}

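// Recursively walk `dir`, invoking `cb` on every directory encountered.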
fn visit_dirs(
    dir: &Path,
    cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
    source_root: &PathBuf,
    dest_root: &PathBuf,
) -> Result<()> {
    if dir.is_dir() {
        cb(dir, source_root, dest_root)?;
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_dir() {
                visit_dirs(&path, cb, dest_root, dest_root)?;
            }
        }
    }
    Ok(())
}

// Create the log dirs for all the machines. Returns the absolute path to the `logs/<name>`
// directory.
fn create_initial_log_dir_setup(
    root_dir: &Path,
    name: &str,
    all_node_inventory: &[VirtualMachine],
) -> Result<PathBuf> {
    let log_dest = root_dir.join("logs").join(name);
    if !log_dest.exists() {
        std::fs::create_dir_all(&log_dest)?;
    }
    // Get the absolute path here. We might be changing the current_dir later and we don't want
    // to run into problems with relative paths.
    let log_abs_dest = std::fs::canonicalize(log_dest)?;
    // Create a log dir per VM
    all_node_inventory.par_iter().for_each(|vm| {
        let vm_path = log_abs_dest.join(&vm.name);
        let _ = std::fs::create_dir_all(vm_path);
    });
    Ok(log_abs_dest)
}