sn_testnet_deploy/
logs.rs

// Copyright (c) 2023, MaidSafe.
// All rights reserved.
//
// This SAFE Network Software is licensed under the BSD-3-Clause license.
// Please see the LICENSE file for more details.

use crate::{
    error::{Error, Result},
    get_progress_bar,
    inventory::VirtualMachine,
    run_external_command,
    s3::S3Repository,
    TestnetDeployer,
};
use fs_extra::dir::{copy, remove, CopyOptions};
use log::debug;
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use std::{
    fs::File,
    io::{Cursor, Read, Write},
    path::{Path, PathBuf},
};

const DEFAULT_RSYNC_ARGS: [&str; 8] = [
    "--compress",
    "--archive",
    "--prune-empty-dirs",
    "--verbose",
    "--verbose",
    "--filter=+ */",     // Include all directories for traversal
    "--filter=+ *.log*", // Include all *.log* files
    "--filter=- *",      // Exclude all other files
];

const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";

impl TestnetDeployer {
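    /// Rsync the log files from the node VMs (and, unless disabled, the uploader VMs) of the
    /// given environment into the local `logs/{name}` directory.
    ///
    /// An optional `vm_filter` restricts the transfer to VMs whose names contain the filter
    /// string. Hosts that fail on the first attempt have their stale SSH host key removed with
    /// `ssh-keygen -R` and are then retried once.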
    pub fn rsync_logs(
        &self,
        name: &str,
        vm_filter: Option<String>,
        disable_client_logs: bool,
    ) -> Result<()> {
        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
        let root_dir = std::env::current_dir()?;

        let mut uploader_inventory = vec![];
        if !disable_client_logs {
            let unfiltered_uploader_vms = self.ansible_provisioner.get_current_uploader_count()?;
            let uploader_vm = if let Some(filter) = &vm_filter {
                unfiltered_uploader_vms
                    .into_iter()
                    .filter(|(vm, _)| vm.name.contains(filter))
                    .collect()
            } else {
                unfiltered_uploader_vms
            };
            uploader_inventory.extend(uploader_vm);
        }

        let unfiltered_node_vms = self.get_all_node_inventory(name)?;
        let all_node_inventory = if let Some(filter) = vm_filter {
            unfiltered_node_vms
                .into_iter()
                .filter(|vm| vm.name.contains(&filter))
                .collect()
        } else {
            unfiltered_node_vms
        };

        create_initial_log_dir_setup_client(&root_dir, name, &uploader_inventory)?;
        let log_base_dir = create_initial_log_dir_setup_node(&root_dir, name, &all_node_inventory)?;

        // We might use the script, so go to the resources dir.
        std::env::set_current_dir(self.working_directory_path.clone())?;
        println!("Starting to rsync the log files");
        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;

        let mut rsync_args = all_node_inventory
            .iter()
            .map(|vm| {
                let args = if vm.name.contains("symmetric") {
                    let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
                    debug!("Using symmetric rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                } else if vm.name.contains("full-cone") {
                    let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
                    debug!("Using full cone rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                } else {
                    let args = self.construct_public_node_args(vm, &log_base_dir);
                    debug!("Using public rsync args for {:?}", vm.name);
                    debug!("Args for {}: {:?}", vm.name, args);
                    args
                };

                Ok((vm.clone(), args))
            })
            .collect::<Result<Vec<_>>>()?;

        rsync_args.extend(uploader_inventory.iter().flat_map(|(vm, uploader_count)| {
            let mut all_user_args = vec![];
            for ant_user in 1..=*uploader_count {
                let vm = vm.clone();
                let args = self.construct_uploader_args(&vm, ant_user, &log_base_dir);
                debug!(
                    "Using uploader rsync args for {:?} and user {ant_user}",
                    vm.name
                );
                debug!("Args for {} and user {ant_user}: {:?}", vm.name, args);
                all_user_args.push((vm, args));
            }
            all_user_args
        }));

        let failed_inventory = rsync_args
            .par_iter()
            .filter_map(|(vm, args)| {
                if let Err(err) = Self::run_rsync(vm, args) {
                    println!(
                        "Failed to rsync for {:?} : {} with err: {err:?}. Will retry after ssh-keygen.",
                        vm.name, vm.public_ip_addr
                    );
                    return Some((vm.clone(), args.clone()));
                }
                progress_bar.inc(1);
                None
            })
            .collect::<Vec<_>>();

        // try ssh-keygen for the failed inventory and try to rsync again
        failed_inventory
            .into_par_iter()
            .for_each(|(vm, args)| {
                debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
                if let Err(err) = run_external_command(
                    PathBuf::from("ssh-keygen"),
                    PathBuf::from("."),
                    vec!["-R".to_string(), vm.public_ip_addr.to_string()],
                    false,
                    false,
                ) {
                    println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
                } else if let Err(err) = Self::run_rsync(&vm, &args) {
                    println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
                }
                progress_bar.inc(1);
            });
        progress_bar.finish_and_clear();
        println!("Rsync completed!");
        Ok(())
    }

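    // Build the rsync arguments for pulling client logs for a single `ant{n}` user account on
    // an uploader VM. Logs are copied from `/home/ant{n}/.local/share/autonomi/client/logs/`
    // into `{log_base_dir}/{vm_name}_ant{n}`.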
    fn construct_uploader_args(
        &self,
        vm: &VirtualMachine,
        ant_user: usize,
        log_base_dir: &Path,
    ) -> Vec<String> {
        let vm_path = log_base_dir.join(format!("{}_ant{ant_user}", vm.name));
        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        // TODO: SSH limits the connections/instances to 10 at a time. Changing /etc/ssh/sshd_config doesn't seem to work.
        // How can we bypass this?
        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!(
                "root@{}:/home/ant{ant_user}/.local/share/autonomi/client/logs/",
                vm.public_ip_addr
            ),
            vm_path.to_string_lossy().to_string(),
        ]);

        rsync_args
    }

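    // Build the rsync arguments for pulling node logs from a publicly reachable node VM into
    // `{log_base_dir}/{vm_name}`.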
    fn construct_public_node_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
        let vm_path = log_base_dir.join(&vm.name);
        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        // TODO: SSH limits the connections/instances to 10 at a time. Changing /etc/ssh/sshd_config doesn't seem to work.
        // How can we bypass this?
        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
            vm_path.to_string_lossy().to_string(),
        ]);

        rsync_args
    }

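    // Build the rsync arguments for a node behind a full cone NAT gateway. The SSH connection is
    // made to the gateway's public IP rather than to the private node directly.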
    fn construct_full_cone_private_node_args(
        &self,
        private_vm: &VirtualMachine,
        log_base_dir: &Path,
    ) -> Result<Vec<String>> {
        let vm_path = log_base_dir.join(&private_vm.name);

        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
            log::error!("Failed to read routed VMs: {err}");
            Error::SshSettingsRwLockError
        })?;
        let (_, gateway_ip) = read_lock
            .as_ref()
            .and_then(|routed_vms| {
                routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
            })
            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;

        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref()
            ),
            format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
            vm_path.to_string_lossy().to_string(),
        ]);

        Ok(rsync_args)
    }

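    // Build the rsync arguments for a node behind a symmetric NAT. The connection is tunnelled
    // through the gateway VM with an SSH ProxyCommand, targeting the node's private IP.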
    fn construct_symmetric_private_node_args(
        &self,
        private_vm: &VirtualMachine,
        log_base_dir: &Path,
    ) -> Result<Vec<String>> {
        let vm_path = log_base_dir.join(&private_vm.name);

        let mut rsync_args = DEFAULT_RSYNC_ARGS
            .iter()
            .map(|str| str.to_string())
            .collect::<Vec<String>>();

        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
            log::error!("Failed to read routed VMs: {err}");
            Error::SshSettingsRwLockError
        })?;
        let (_, gateway_ip) = read_lock
            .as_ref()
            .and_then(|routed_vms| {
                routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
            })
            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;

        rsync_args.extend(vec![
            "-e".to_string(),
            format!(
                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh root@{gateway_ip} -W %h:%p -i {}'",
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref(),
                self.ssh_client
                    .get_private_key_path()
                    .to_string_lossy()
                    .as_ref(),
            ),
            format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
            vm_path.to_string_lossy().to_string(),
        ]);

        Ok(rsync_args)
    }

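    // Run rsync with the given arguments for a single VM, logging when the transfer starts and
    // finishes.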
    fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
        debug!(
            "Rsync logs to our machine for {:?} : {}",
            vm.name, vm.public_ip_addr
        );
        run_external_command(
            PathBuf::from("rsync"),
            PathBuf::from("."),
            rsync_args.to_vec(),
            true,
            false,
        )?;

        debug!(
            "Finished rsync for {:?} : {}",
            vm.name, vm.public_ip_addr
        );
        Ok(())
    }

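    /// Run a ripgrep query against the node log directory on every node VM in the environment
    /// and store each VM's output under `logs/{name}/{vm_name}/rg-{timestamp}.log`.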
    pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
        let root_dir = std::env::current_dir()?;
        let all_node_inventory = self.get_all_node_inventory(name)?;
        let log_abs_dest = create_initial_log_dir_setup_node(&root_dir, name, &all_node_inventory)?;

        let rg_cmd = format!("rg {rg_args} {NODE_LOG_DIR}");
        println!("Running ripgrep with command: {rg_cmd}");

        // Get current date and time
        let now = chrono::Utc::now();
        let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
        let _failed_inventory = all_node_inventory
            .par_iter()
            .filter_map(|vm| {
                let op =
                    match self
                        .ssh_client
                        .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
                    {
                        Ok(output) => {
                            match Self::store_rg_output(
                                &timestamp,
                                &rg_cmd,
                                &output,
                                &log_abs_dest,
                                &vm.name,
                            ) {
                                Ok(_) => None,
                                Err(err) => {
                                    println!(
                                        "Failed to store output for {:?} with: {err:?}",
                                        vm.public_ip_addr
                                    );
                                    Some(vm)
                                }
                            }
                        }
                        Err(Error::ExternalCommandRunFailed {
                            binary,
                            exit_status,
                        }) => {
                            if let Some(1) = exit_status.code() {
                                debug!("No matches found for {:?}", vm.public_ip_addr);
                                match Self::store_rg_output(
                                    &timestamp,
                                    &rg_cmd,
                                    &["No matches found".to_string()],
                                    &log_abs_dest,
                                    &vm.name,
                                ) {
                                    Ok(_) => None,
                                    Err(err) => {
                                        println!(
                                            "Failed to store output for {:?} with: {err:?}",
                                            vm.public_ip_addr
                                        );
                                        Some(vm)
                                    }
                                }
                            } else {
                                println!(
                                    "Failed to run rg query for {:?}: {binary} exited with {exit_status}",
                                    vm.public_ip_addr
                                );
                                Some(vm)
                            }
                        }
                        Err(err) => {
                            println!(
                                "Failed to run rg query for {:?} with: {err:?}",
                                vm.public_ip_addr
                            );
                            Some(vm)
                        }
                    };
                progress_bar.inc(1);
                op
            })
            .collect::<Vec<_>>();

        progress_bar.finish_and_clear();
        println!("Ripgrep completed!");

        Ok(())
    }

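    // Write the output of a ripgrep run for a single VM to
    // `{log_abs_dest}/{vm_name}/rg-{timestamp}.log`, prefixed with the command that was executed.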
    fn store_rg_output(
        timestamp: &str,
        cmd: &str,
        output: &[String],
        log_abs_dest: &Path,
        vm_name: &str,
    ) -> Result<()> {
        std::fs::create_dir_all(log_abs_dest.join(vm_name))?;

        let mut file = File::create(
            log_abs_dest
                .join(vm_name)
                .join(format!("rg-{timestamp}.log")),
        )?;

        writeln!(file, "Command: {cmd}")?;

        for line in output {
            writeln!(file, "{}", line)?;
        }

        Ok(())
    }

    /// Run an Ansible playbook to copy the logs from all the machines in the inventory.
    ///
    /// It needs to be part of `TestnetDeployer` because the Ansible runner is already set up in
    /// that context.
    pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
        let dest = PathBuf::from(".").join("logs").join(name);
        if dest.exists() {
            println!("Removing existing {} directory", dest.to_string_lossy());
            remove(dest.clone())?;
        }
        std::fs::create_dir_all(&dest)?;
        self.ansible_provisioner.copy_logs(name, resources_only)?;
        Ok(())
    }

    // Return the list of all the node machines.
    fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
        let environments = self.terraform_runner.workspace_list()?;
        if !environments.contains(&name.to_string()) {
            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
        }
        self.ansible_provisioner.get_all_node_inventory()
    }
}

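/// Download the logs for an environment from the `sn-testnet` S3 bucket into `logs/{name}`.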
pub async fn get_logs(name: &str) -> Result<()> {
    let dest_path = std::env::current_dir()?.join("logs").join(name);
    std::fs::create_dir_all(dest_path.clone())?;
    let s3_repository = S3Repository {};
    s3_repository
        .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
        .await?;
    Ok(())
}

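/// Reassemble the chunked `*.part*` log files previously retrieved for an environment into
/// complete log files under `logs/{name}-reassembled`.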
pub fn reassemble_logs(name: &str) -> Result<()> {
    let src = PathBuf::from(".").join("logs").join(name);
    if !src.exists() {
        return Err(Error::LogsNotRetrievedError(name.to_string()));
    }
    let dest = PathBuf::from(".")
        .join("logs")
        .join(format!("{name}-reassembled"));
    if dest.exists() {
        println!("Removing previous {name}-reassembled directory");
        remove(dest.clone())?;
    }

    std::fs::create_dir_all(&dest)?;
    let mut options = CopyOptions::new();
    options.overwrite = true;
    copy(src.clone(), dest.clone(), &options)?;

    visit_dirs(&dest, &process_part_files, &src, &dest)?;
    Ok(())
}

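/// Delete the logs for an environment from the `sn-testnet` S3 bucket.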
pub async fn rm_logs(name: &str) -> Result<()> {
    let s3_repository = S3Repository {};
    s3_repository
        .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
        .await?;
    Ok(())
}

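// Concatenate any `*.part*` files in `dir_path`, in numeric part order, into a single
// `reassembled.log` file, removing the part files afterwards.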
fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
    let reassembled_dir_path = if dir_path == dest_root {
        dest_root.clone()
    } else {
        dest_root.join(dir_path.strip_prefix(source_root)?)
    };
    std::fs::create_dir_all(&reassembled_dir_path)?;

    let entries: Vec<_> = std::fs::read_dir(dir_path)?
        .map(|res| res.map(|e| e.path()))
        .collect::<Result<Vec<_>, _>>()?;

    let mut part_files: Vec<_> = entries
        .iter()
        .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
        .collect();

    part_files.sort_by_key(|a| {
        a.file_stem()
            .unwrap()
            .to_string_lossy()
            .split(".part")
            .nth(1)
            .unwrap()
            .parse::<u32>()
            .unwrap()
    });

    if part_files.is_empty() {
        return Ok(());
    }

    let output_file_path = reassembled_dir_path.join("reassembled.log");
    println!("Creating reassembled file at {output_file_path:#?}");
    let mut output_file = File::create(&output_file_path)?;
    for part_file in part_files.iter() {
        let mut part_content = String::new();
        File::open(part_file)?.read_to_string(&mut part_content)?;

        // For some reason logstash writes "\n" as a literal string rather than a newline
        // character.
        part_content = part_content.replace("\\n", "\n");

        let mut cursor = Cursor::new(part_content);
        std::io::copy(&mut cursor, &mut output_file)?;
        std::fs::remove_file(part_file)?;
    }

    Ok(())
}

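// Recursively walk `dir`, invoking `cb` on each directory. Note that the recursive calls pass
// `dest_root` as the source root, so below the top level the callback resolves paths in place
// within the destination tree.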
fn visit_dirs(
    dir: &Path,
    cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
    source_root: &PathBuf,
    dest_root: &PathBuf,
) -> Result<()> {
    if dir.is_dir() {
        cb(dir, source_root, dest_root)?;
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            if path.is_dir() {
                visit_dirs(&path, cb, dest_root, dest_root)?;
            }
        }
    }
    Ok(())
}

// Create the log dirs for all the node machines. Returns the absolute path to the `logs/{name}`
// directory.
fn create_initial_log_dir_setup_node(
    root_dir: &Path,
    name: &str,
    all_node_inventory: &[VirtualMachine],
) -> Result<PathBuf> {
    let log_dest = root_dir.join("logs").join(name);
    if !log_dest.exists() {
        std::fs::create_dir_all(&log_dest)?;
    }
    // Get the absolute path here. We might be changing the current_dir and we don't want to run into problems.
    let log_abs_dest = std::fs::canonicalize(log_dest)?;
    // Create a log dir per VM
    all_node_inventory.par_iter().for_each(|vm| {
        let vm_path = log_abs_dest.join(&vm.name);
        let _ = std::fs::create_dir_all(vm_path);
    });
    Ok(log_abs_dest)
}

// Create the log dirs for all the uploader machines, one per `ant{n}` user on each VM.
fn create_initial_log_dir_setup_client(
    root_dir: &Path,
    name: &str,
    uploader_inventory: &[(VirtualMachine, usize)],
) -> Result<()> {
    let log_dest = root_dir.join("logs").join(name);
    if !log_dest.exists() {
        std::fs::create_dir_all(&log_dest)?;
    }
    // Get the absolute path here. We might be changing the current_dir and we don't want to run into problems.
    let log_abs_dest = std::fs::canonicalize(log_dest)?;
    // Create a log dir per VM and user
    uploader_inventory.par_iter().for_each(|(vm, users)| {
        for ant_user in 1..=*users {
            let vm_path = log_abs_dest.join(format!("{}_ant{ant_user}", vm.name));
            let _ = std::fs::create_dir_all(vm_path);
        }
    });
    Ok(())
}