sn_testnet_deploy/
logs.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::{
8    error::{Error, Result},
9    get_progress_bar,
10    inventory::VirtualMachine,
11    run_external_command,
12    s3::S3Repository,
13    TestnetDeployer,
14};
15use fs_extra::dir::{copy, remove, CopyOptions};
16use log::debug;
17use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
18use std::{
19    fs::File,
20    io::{Cursor, Read, Write},
21    path::{Path, PathBuf},
22};
23
/// Base arguments shared by every rsync invocation built in this module.
///
/// `--verbose` is passed twice, which rsync treats as a higher verbosity level (like `-vv`).
/// The `--filter` rules are evaluated in order and the first matching rule wins:
/// descend into every directory, keep any file matching `*.log*` (so rotated files such as
/// `foo.log.1` are included too), and exclude everything else. `--prune-empty-dirs` then
/// drops directories that end up with no transferred files.
/// The order of the filter rules is significant; do not reorder them.
const DEFAULT_RSYNC_ARGS: [&str; 8] = [
    "--compress",
    "--archive",
    "--prune-empty-dirs",
    "--verbose",
    "--verbose",
    "--filter=+ */",     // Include all directories for traversal
    "--filter=+ *.log*", // Include all *.log* files
    "--filter=- *",      // Exclude all other files
];

/// Remote directory on the node VMs whose log files are pulled by rsync.
const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";
36
37impl TestnetDeployer {
38    pub fn rsync_logs(
39        &self,
40        name: &str,
41        vm_filter: Option<String>,
42        disable_client_logs: bool,
43    ) -> Result<()> {
44        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
45        let root_dir = std::env::current_dir()?;
46
47        let mut all_inventory = vec![];
48        if !disable_client_logs {
49            all_inventory.extend(self.get_client_inventory(name)?);
50        }
51
52        all_inventory.extend(self.get_all_node_inventory(name)?);
53
54        let all_inventory = if let Some(filter) = vm_filter {
55            all_inventory
56                .into_iter()
57                .filter(|vm| vm.name.contains(&filter))
58                .collect()
59        } else {
60            all_inventory
61        };
62
63        let log_base_dir = create_initial_log_dir_setup(&root_dir, name, &all_inventory)?;
64
65        // We might use the script, so goto the resource dir.
66        std::env::set_current_dir(self.working_directory_path.clone())?;
67        println!("Starting to rsync the log files");
68        let progress_bar = get_progress_bar(all_inventory.len() as u64)?;
69
70        let rsync_args = all_inventory
71            .iter()
72            .map(|vm| {
73                let args = if vm.name.contains("symmetric") {
74                    let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
75                    debug!("Using symmetric rsync args for {:?}", vm.name);
76                    debug!("Args for {}: {:?}", vm.name, args);
77                    args
78                } else if vm.name.contains("full-cone") {
79                    let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
80                    debug!("Using symmetric rsync args for {:?}", vm.name);
81                    debug!("Args for {}: {:?}", vm.name, args);
82                    args
83                } else if vm.name.contains("ant-client") {
84                    let args = self.construct_client_args(vm, &log_base_dir);
85                    debug!("Using Client rsync args for {:?} ", vm.name);
86                    debug!("Args for {}: {:?}", vm.name, args);
87                    args
88                } else {
89                    let args = self.construct_public_node_args(vm, &log_base_dir);
90                    debug!("Using public rsync args for {:?}", vm.name);
91                    debug!("Args for {}: {:?}", vm.name, args);
92                    args
93                };
94
95                Ok((vm.clone(), args))
96            })
97            .collect::<Result<Vec<_>>>()?;
98
99        let failed_inventory = rsync_args
100            .par_iter()
101            .filter_map(|(vm, args)| {
102                if let Err(err) = Self::run_rsync(vm, args) {
103                    println!(
104                        "Failed to rsync. Retrying it after ssh-keygen {:?} : {} with err: {err:?}",
105                        vm.name, vm.public_ip_addr
106                    );
107                    return Some((vm.clone(), args.clone()));
108                }
109                progress_bar.inc(1);
110                None
111            })
112            .collect::<Vec<_>>();
113
114        // try ssh-keygen for the failed inventory and try to rsync again
115        failed_inventory
116            .into_par_iter()
117            .for_each(|(vm, args)| {
118                debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
119                if let Err(err) = run_external_command(
120                    PathBuf::from("ssh-keygen"),
121                    PathBuf::from("."),
122                    vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
123                    false,
124                    false,
125                ) {
126                    println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
127                } else if let Err(err) =
128                    Self::run_rsync(&vm, &args)
129                {
130                    println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
131                }
132                progress_bar.inc(1);
133            });
134        progress_bar.finish_and_clear();
135        println!("Rsync completed!");
136        Ok(())
137    }
138
139    fn construct_client_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
140        let vm_path = log_base_dir.join(&vm.name);
141        let mut rsync_args = DEFAULT_RSYNC_ARGS
142            .iter()
143            .map(|str| str.to_string())
144            .collect::<Vec<String>>();
145
146        // TODO: SSH limits the connections/instances to 10 at a time. Changing /etc/ssh/sshd_config, doesn't work?
147        // How to bypass this?
148        rsync_args.extend(vec![
149            "-e".to_string(),
150            format!(
151                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
152                self.ssh_client
153                    .get_private_key_path()
154                    .to_string_lossy()
155                    .as_ref()
156            ),
157            format!("root@{}:/mnt/client/log/", vm.public_ip_addr),
158            vm_path.to_string_lossy().to_string(),
159        ]);
160
161        rsync_args
162    }
163
164    fn construct_public_node_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
165        let vm_path = log_base_dir.join(&vm.name);
166        let mut rsync_args = DEFAULT_RSYNC_ARGS
167            .iter()
168            .map(|str| str.to_string())
169            .collect::<Vec<String>>();
170
171        // TODO: SSH limits the connections/instances to 10 at a time. Changing /etc/ssh/sshd_config, doesn't work?
172        // How to bypass this?
173        rsync_args.extend(vec![
174            "-e".to_string(),
175            format!(
176                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
177                self.ssh_client
178                    .get_private_key_path()
179                    .to_string_lossy()
180                    .as_ref()
181            ),
182            format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
183            vm_path.to_string_lossy().to_string(),
184        ]);
185
186        rsync_args
187    }
188
189    fn construct_full_cone_private_node_args(
190        &self,
191        private_vm: &VirtualMachine,
192        log_base_dir: &Path,
193    ) -> Result<Vec<String>> {
194        let vm_path = log_base_dir.join(&private_vm.name);
195
196        let mut rsync_args = DEFAULT_RSYNC_ARGS
197            .iter()
198            .map(|str| str.to_string())
199            .collect::<Vec<String>>();
200
201        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
202            log::error!("Failed to set routed VMs: {err}");
203            Error::SshSettingsRwLockError
204        })?;
205        let (_, gateway_ip) = read_lock
206            .as_ref()
207            .and_then(|routed_vms| {
208                routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
209            })
210            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
211
212        rsync_args.extend(vec![
213            "-e".to_string(),
214            format!(
215                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
216                self.ssh_client
217                    .get_private_key_path()
218                    .to_string_lossy()
219                    .as_ref()
220            ),
221            format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
222            vm_path.to_string_lossy().to_string(),
223        ]);
224
225        Ok(rsync_args)
226    }
227
228    fn construct_symmetric_private_node_args(
229        &self,
230        private_vm: &VirtualMachine,
231        log_base_dir: &Path,
232    ) -> Result<Vec<String>> {
233        let vm_path = log_base_dir.join(&private_vm.name);
234
235        let mut rsync_args = DEFAULT_RSYNC_ARGS
236            .iter()
237            .map(|str| str.to_string())
238            .collect::<Vec<String>>();
239
240        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
241            log::error!("Failed to set routed VMs: {err}");
242            Error::SshSettingsRwLockError
243        })?;
244        let (_, gateway_ip) = read_lock
245            .as_ref()
246            .and_then(|routed_vms| {
247                routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
248            })
249            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
250
251        rsync_args.extend(vec![
252            "-e".to_string(),
253            format!(
254                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh -o StrictHostKeyChecking=no -o BatchMode=yes root@{gateway_ip} -W %h:%p -i {}'",
255                self.ssh_client
256                    .get_private_key_path()
257                    .to_string_lossy()
258                    .as_ref(),
259                self.ssh_client
260                    .get_private_key_path()
261                    .to_string_lossy()
262                    .as_ref(),
263            ),
264            format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
265            vm_path.to_string_lossy().to_string(),
266        ]);
267
268        Ok(rsync_args)
269    }
270    fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
271        debug!(
272            "Rsync logs to our machine for {:?} : {}",
273            vm.name, vm.public_ip_addr
274        );
275        run_external_command(
276            PathBuf::from("rsync"),
277            PathBuf::from("."),
278            rsync_args.to_vec(),
279            true,
280            false,
281        )?;
282
283        debug!(
284            "Finished rsync for for {:?} : {}",
285            vm.name, vm.public_ip_addr
286        );
287        Ok(())
288    }
289
290    pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
291        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
292        let root_dir = std::env::current_dir()?;
293        let all_node_inventory = self.get_all_node_inventory(name)?;
294        let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
295
296        let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
297        println!("Running ripgrep with command: {rg_cmd}");
298
299        // Get current date and time
300        let now = chrono::Utc::now();
301        let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
302        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
303        let _failed_inventory = all_node_inventory
304            .par_iter()
305            .filter_map(|vm| {
306                let op =
307                    match self
308                        .ssh_client
309                        .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
310                    {
311                        Ok(output) => {
312                            match Self::store_rg_output(
313                                &timestamp,
314                                &rg_cmd,
315                                &output,
316                                &log_abs_dest,
317                                &vm.name,
318                            ) {
319                                Ok(_) => None,
320                                Err(err) => {
321                                    println!(
322                                        "Failed store output for {:?} with: {err:?}",
323                                        vm.public_ip_addr
324                                    );
325                                    Some(vm)
326                                }
327                            }
328                        }
329                        Err(Error::ExternalCommandRunFailed {
330                            binary,
331                            exit_status,
332                        }) => {
333                            if let Some(1) = exit_status.code() {
334                                debug!("No matches found for {:?}", vm.public_ip_addr);
335                                match Self::store_rg_output(
336                                    &timestamp,
337                                    &rg_cmd,
338                                    &["No matches found".to_string()],
339                                    &log_abs_dest,
340                                    &vm.name,
341                                ) {
342                                    Ok(_) => None,
343                                    Err(err) => {
344                                        println!(
345                                            "Failed store output for {:?} with: {err:?}",
346                                            vm.public_ip_addr
347                                        );
348                                        Some(vm)
349                                    }
350                                }
351                            } else {
352                                println!(
353                                    "Failed to run rg query for {:?} with: {binary}",
354                                    vm.public_ip_addr
355                                );
356                                Some(vm)
357                            }
358                        }
359                        Err(err) => {
360                            println!(
361                                "Failed to run rg query for {:?} with: {err:?}",
362                                vm.public_ip_addr
363                            );
364                            Some(vm)
365                        }
366                    };
367                progress_bar.inc(1);
368                op
369            })
370            .collect::<Vec<_>>();
371
372        progress_bar.finish_and_clear();
373        println!("Ripgrep completed!");
374
375        Ok(())
376    }
377
378    fn store_rg_output(
379        timestamp: &str,
380        cmd: &str,
381        output: &[String],
382        log_abs_dest: &Path,
383        vm_name: &str,
384    ) -> Result<()> {
385        std::fs::create_dir_all(log_abs_dest.join(vm_name))?;
386
387        let mut file = File::create(
388            log_abs_dest
389                .join(vm_name)
390                .join(format!("rg-{timestamp}.log")),
391        )?;
392
393        writeln!(file, "Command: {cmd}")?;
394
395        for line in output {
396            writeln!(file, "{line}")?;
397        }
398
399        Ok(())
400    }
401
402    /// Run an Ansible playbook to copy the logs from all the machines in the inventory.
403    ///
404    /// It needs to be part of `TestnetDeploy` because the Ansible runner is already setup in that
405    /// context.
406    pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
407        let dest = PathBuf::from(".").join("logs").join(name);
408        if dest.exists() {
409            println!("Removing existing {} directory", dest.to_string_lossy());
410            remove(dest.clone())?;
411        }
412        std::fs::create_dir_all(&dest)?;
413        self.ansible_provisioner.copy_logs(name, resources_only)?;
414        Ok(())
415    }
416
417    // Return the list of all the node machines.
418    fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
419        let environments = self.terraform_runner.workspace_list()?;
420        if !environments.contains(&name.to_string()) {
421            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
422        }
423        self.ansible_provisioner.get_all_node_inventory()
424    }
425
426    fn get_client_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
427        let environments = self.terraform_runner.workspace_list()?;
428        if !environments.contains(&name.to_string()) {
429            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
430        }
431        self.ansible_provisioner.get_client_inventory()
432    }
433}
434
435pub async fn get_logs(name: &str) -> Result<()> {
436    let dest_path = std::env::current_dir()?.join("logs").join(name);
437    std::fs::create_dir_all(dest_path.clone())?;
438    let s3_repository = S3Repository {};
439    s3_repository
440        .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
441        .await?;
442    Ok(())
443}
444
445pub fn reassemble_logs(name: &str) -> Result<()> {
446    let src = PathBuf::from(".").join("logs").join(name);
447    if !src.exists() {
448        return Err(Error::LogsNotRetrievedError(name.to_string()));
449    }
450    let dest = PathBuf::from(".")
451        .join("logs")
452        .join(format!("{name}-reassembled"));
453    if dest.exists() {
454        println!("Removing previous {name}-reassembled directory");
455        remove(dest.clone())?;
456    }
457
458    std::fs::create_dir_all(&dest)?;
459    let mut options = CopyOptions::new();
460    options.overwrite = true;
461    copy(src.clone(), dest.clone(), &options)?;
462
463    visit_dirs(&dest, &process_part_files, &src, &dest)?;
464    Ok(())
465}
466
467pub async fn rm_logs(name: &str) -> Result<()> {
468    let s3_repository = S3Repository {};
469    s3_repository
470        .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
471        .await?;
472    Ok(())
473}
474
475fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
476    let reassembled_dir_path = if dir_path == dest_root {
477        dest_root.clone()
478    } else {
479        dest_root.join(dir_path.strip_prefix(source_root)?)
480    };
481    std::fs::create_dir_all(&reassembled_dir_path)?;
482
483    let entries: Vec<_> = std::fs::read_dir(dir_path)?
484        .map(|res| res.map(|e| e.path()))
485        .collect::<Result<Vec<_>, _>>()?;
486
487    let mut part_files: Vec<_> = entries
488        .iter()
489        .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
490        .collect();
491
492    part_files.sort_by_key(|a| {
493        a.file_stem()
494            .unwrap()
495            .to_string_lossy()
496            .split(".part")
497            .nth(1)
498            .unwrap()
499            .parse::<u32>()
500            .unwrap()
501    });
502
503    if part_files.is_empty() {
504        return Ok(());
505    }
506
507    let output_file_path = reassembled_dir_path.join("reassembled.log");
508    println!("Creating reassembled file at {output_file_path:#?}");
509    let mut output_file = File::create(&output_file_path)?;
510    for part_file in part_files.iter() {
511        let mut part_content = String::new();
512        File::open(part_file)?.read_to_string(&mut part_content)?;
513
514        // For some reason logstash writes "\n" as a literal string rather than a newline
515        // character.
516        part_content = part_content.replace("\\n", "\n");
517
518        let mut cursor = Cursor::new(part_content);
519        std::io::copy(&mut cursor, &mut output_file)?;
520        std::fs::remove_file(part_file)?;
521    }
522
523    Ok(())
524}
525
526fn visit_dirs(
527    dir: &Path,
528    cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
529    source_root: &PathBuf,
530    dest_root: &PathBuf,
531) -> Result<()> {
532    if dir.is_dir() {
533        cb(dir, source_root, dest_root)?;
534        for entry in std::fs::read_dir(dir)? {
535            let entry = entry?;
536            let path = entry.path();
537            if path.is_dir() {
538                visit_dirs(&path, cb, dest_root, dest_root)?;
539            }
540        }
541    }
542    Ok(())
543}
544
545// Create the log dirs for all the machines. Returns the absolute path to the `logs/name`
546fn create_initial_log_dir_setup(
547    root_dir: &Path,
548    name: &str,
549    all_node_inventory: &[VirtualMachine],
550) -> Result<PathBuf> {
551    let log_dest = root_dir.join("logs").join(name);
552    if !log_dest.exists() {
553        std::fs::create_dir_all(&log_dest)?;
554    }
555    // Get the absolute path here. We might be changing the current_dir and we don't want to run into problems.
556    let log_abs_dest = std::fs::canonicalize(log_dest)?;
557    // Create a log dir per VM
558    all_node_inventory.par_iter().for_each(|vm| {
559        let vm_path = log_abs_dest.join(&vm.name);
560        let _ = std::fs::create_dir_all(vm_path);
561    });
562    Ok(log_abs_dest)
563}