sn_testnet_deploy/
logs.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::{
8    error::{Error, Result},
9    get_progress_bar,
10    inventory::VirtualMachine,
11    run_external_command,
12    s3::S3Repository,
13    TestnetDeployer,
14};
15use fs_extra::dir::{copy, remove, CopyOptions};
16use log::debug;
17use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
18use std::{
19    fs::File,
20    io::{Cursor, Read, Write},
21    path::{Path, PathBuf},
22};
23
/// Flags passed to every rsync invocation that pulls logs from a VM.
///
/// The per-VM `-e <remote shell>`, remote source and local destination are appended after these.
const DEFAULT_RSYNC_ARGS: [&str; 8] = [
    "--compress",
    "--archive",
    // Drop directories that end up with no matching files (the filters below keep all dirs).
    "--prune-empty-dirs",
    // Given twice on purpose: each `--verbose` raises rsync's verbosity level (like `-vv`).
    "--verbose",
    "--verbose",
    // Filter rules are evaluated in order and the first matching rule wins.
    "--filter=+ */",     // Include all directories for traversal
    "--filter=+ *.log*", // Include all *.log* files
    "--filter=- *",      // Exclude all other files
];

/// Remote directory holding the node logs; used as the rsync source for node VMs.
/// The trailing `/` makes rsync copy the directory's contents rather than the directory itself.
const NODE_LOG_DIR: &str = "/mnt/antnode-storage/log/";
36
37impl TestnetDeployer {
38    pub fn rsync_logs(
39        &self,
40        name: &str,
41        vm_filter: Option<String>,
42        disable_client_logs: bool,
43    ) -> Result<()> {
44        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
45        let root_dir = std::env::current_dir()?;
46
47        let mut all_inventory = vec![];
48        if !disable_client_logs {
49            all_inventory.extend(self.get_client_inventory(name)?);
50        }
51
52        all_inventory.extend(self.get_all_node_inventory(name)?);
53
54        let all_inventory = if let Some(filter) = vm_filter {
55            all_inventory
56                .into_iter()
57                .filter(|vm| vm.name.contains(&filter))
58                .collect()
59        } else {
60            all_inventory
61        };
62
63        let log_base_dir = create_initial_log_dir_setup(&root_dir, name, &all_inventory)?;
64
65        // We might use the script, so goto the resource dir.
66        std::env::set_current_dir(self.working_directory_path.clone())?;
67        println!("Starting to rsync the log files");
68        let progress_bar = get_progress_bar(all_inventory.len() as u64)?;
69
70        let rsync_args = all_inventory
71            .iter()
72            .map(|vm| {
73                let args = if vm.name.contains("symmetric") {
74                    let args = self.construct_symmetric_private_node_args(vm, &log_base_dir)?;
75                    debug!("Using symmetric rsync args for {:?}", vm.name);
76                    debug!("Args for {}: {:?}", vm.name, args);
77                    args
78                } else if vm.name.contains("full-cone") {
79                    let args = self.construct_full_cone_private_node_args(vm, &log_base_dir)?;
80                    debug!("Using symmetric rsync args for {:?}", vm.name);
81                    debug!("Args for {}: {:?}", vm.name, args);
82                    args
83                } else if vm.name.contains("ant-client") {
84                    let args = self.construct_client_args(vm, &log_base_dir);
85                    debug!("Using Client rsync args for {:?} ", vm.name);
86                    debug!("Args for {}: {:?}", vm.name, args);
87                    args
88                } else {
89                    let args = self.construct_public_node_args(vm, &log_base_dir);
90                    debug!("Using public rsync args for {:?}", vm.name);
91                    debug!("Args for {}: {:?}", vm.name, args);
92                    args
93                };
94
95                Ok((vm.clone(), args))
96            })
97            .collect::<Result<Vec<_>>>()?;
98
99        let failed_inventory = rsync_args
100            .par_iter()
101            .filter_map(|(vm, args)| {
102                if let Err(err) = Self::run_rsync(vm, args) {
103                    println!(
104                        "Failed to rsync. Retrying it after ssh-keygen {:?} : {} with err: {err:?}",
105                        vm.name, vm.public_ip_addr
106                    );
107                    return Some((vm.clone(), args.clone()));
108                }
109                progress_bar.inc(1);
110                None
111            })
112            .collect::<Vec<_>>();
113
114        // try ssh-keygen for the failed inventory and try to rsync again
115        failed_inventory
116            .into_par_iter()
117            .for_each(|(vm, args)| {
118                debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
119                if let Err(err) = run_external_command(
120                    PathBuf::from("ssh-keygen"),
121                    PathBuf::from("."),
122                    vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
123                    false,
124                    false,
125                ) {
126                    println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
127                } else if let Err(err) =
128                    Self::run_rsync(&vm, &args)
129                {
130                    println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
131                }
132                progress_bar.inc(1);
133            });
134        progress_bar.finish_and_clear();
135        println!("Rsync completed!");
136        Ok(())
137    }
138
139    fn construct_client_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
140        let vm_path = log_base_dir.join(&vm.name);
141        let mut rsync_args = DEFAULT_RSYNC_ARGS
142            .iter()
143            .map(|str| str.to_string())
144            .collect::<Vec<String>>();
145
146        // TODO: SSH limits the connections/instances to 10 at a time. Changing /etc/ssh/sshd_config, doesn't work?
147        // How to bypass this?
148        rsync_args.extend(vec![
149            "-e".to_string(),
150            format!(
151                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
152                self.ssh_client
153                    .get_private_key_path()
154                    .to_string_lossy()
155                    .as_ref()
156            ),
157            format!("root@{}:/mnt/client-logs/log/", vm.public_ip_addr),
158            vm_path.to_string_lossy().to_string(),
159        ]);
160
161        rsync_args
162    }
163
164    fn construct_public_node_args(&self, vm: &VirtualMachine, log_base_dir: &Path) -> Vec<String> {
165        let vm_path = log_base_dir.join(&vm.name);
166        let mut rsync_args = DEFAULT_RSYNC_ARGS
167            .iter()
168            .map(|str| str.to_string())
169            .collect::<Vec<String>>();
170
171        // TODO: SSH limits the connections/instances to 10 at a time. Changing /etc/ssh/sshd_config, doesn't work?
172        // How to bypass this?
173        rsync_args.extend(vec![
174            "-e".to_string(),
175            format!(
176                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
177                self.ssh_client
178                    .get_private_key_path()
179                    .to_string_lossy()
180                    .as_ref()
181            ),
182            format!("root@{}:{NODE_LOG_DIR}", vm.public_ip_addr),
183            vm_path.to_string_lossy().to_string(),
184        ]);
185
186        rsync_args
187    }
188
189    fn construct_full_cone_private_node_args(
190        &self,
191        private_vm: &VirtualMachine,
192        log_base_dir: &Path,
193    ) -> Result<Vec<String>> {
194        let vm_path = log_base_dir.join(&private_vm.name);
195
196        let mut rsync_args = DEFAULT_RSYNC_ARGS
197            .iter()
198            .map(|str| str.to_string())
199            .collect::<Vec<String>>();
200
201        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
202            log::error!("Failed to set routed VMs: {err}");
203            Error::SshSettingsRwLockError
204        })?;
205        let (_, gateway_ip) = read_lock
206            .as_ref()
207            .and_then(|routed_vms| {
208                routed_vms.find_full_cone_nat_routed_node(&private_vm.public_ip_addr)
209            })
210            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
211
212        rsync_args.extend(vec![
213            "-e".to_string(),
214            format!(
215                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
216                self.ssh_client
217                    .get_private_key_path()
218                    .to_string_lossy()
219                    .as_ref()
220            ),
221            format!("root@{}:{NODE_LOG_DIR}", gateway_ip),
222            vm_path.to_string_lossy().to_string(),
223        ]);
224
225        Ok(rsync_args)
226    }
227
228    fn construct_symmetric_private_node_args(
229        &self,
230        private_vm: &VirtualMachine,
231        log_base_dir: &Path,
232    ) -> Result<Vec<String>> {
233        let vm_path = log_base_dir.join(&private_vm.name);
234
235        let mut rsync_args = DEFAULT_RSYNC_ARGS
236            .iter()
237            .map(|str| str.to_string())
238            .collect::<Vec<String>>();
239
240        let read_lock = self.ssh_client.routed_vms.read().map_err(|err| {
241            log::error!("Failed to set routed VMs: {err}");
242            Error::SshSettingsRwLockError
243        })?;
244        let (_, gateway_ip) = read_lock
245            .as_ref()
246            .and_then(|routed_vms| {
247                routed_vms.find_symmetric_nat_routed_node(&private_vm.public_ip_addr)
248            })
249            .ok_or(Error::RoutedVmNotFound(private_vm.public_ip_addr))?;
250
251        rsync_args.extend(vec![
252                "-e".to_string(),
253                format!(
254                    "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh root@{gateway_ip} -W %h:%p -i {}'",
255                    self.ssh_client
256                        .get_private_key_path()
257                        .to_string_lossy()
258                        .as_ref(),
259                    self.ssh_client
260                        .get_private_key_path()
261                        .to_string_lossy()
262                        .as_ref(),
263                ),
264                format!("root@{}:{NODE_LOG_DIR}", private_vm.private_ip_addr),
265                vm_path.to_string_lossy().to_string(),
266            ]);
267
268        Ok(rsync_args)
269    }
270
271    fn run_rsync(vm: &VirtualMachine, rsync_args: &[String]) -> Result<()> {
272        debug!(
273            "Rsync logs to our machine for {:?} : {}",
274            vm.name, vm.public_ip_addr
275        );
276        run_external_command(
277            PathBuf::from("rsync"),
278            PathBuf::from("."),
279            rsync_args.to_vec(),
280            true,
281            false,
282        )?;
283
284        debug!(
285            "Finished rsync for for {:?} : {}",
286            vm.name, vm.public_ip_addr
287        );
288        Ok(())
289    }
290
291    pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
292        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
293        let root_dir = std::env::current_dir()?;
294        let all_node_inventory = self.get_all_node_inventory(name)?;
295        let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
296
297        let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
298        println!("Running ripgrep with command: {rg_cmd}");
299
300        // Get current date and time
301        let now = chrono::Utc::now();
302        let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
303        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
304        let _failed_inventory = all_node_inventory
305            .par_iter()
306            .filter_map(|vm| {
307                let op =
308                    match self
309                        .ssh_client
310                        .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
311                    {
312                        Ok(output) => {
313                            match Self::store_rg_output(
314                                &timestamp,
315                                &rg_cmd,
316                                &output,
317                                &log_abs_dest,
318                                &vm.name,
319                            ) {
320                                Ok(_) => None,
321                                Err(err) => {
322                                    println!(
323                                        "Failed store output for {:?} with: {err:?}",
324                                        vm.public_ip_addr
325                                    );
326                                    Some(vm)
327                                }
328                            }
329                        }
330                        Err(Error::ExternalCommandRunFailed {
331                            binary,
332                            exit_status,
333                        }) => {
334                            if let Some(1) = exit_status.code() {
335                                debug!("No matches found for {:?}", vm.public_ip_addr);
336                                match Self::store_rg_output(
337                                    &timestamp,
338                                    &rg_cmd,
339                                    &["No matches found".to_string()],
340                                    &log_abs_dest,
341                                    &vm.name,
342                                ) {
343                                    Ok(_) => None,
344                                    Err(err) => {
345                                        println!(
346                                            "Failed store output for {:?} with: {err:?}",
347                                            vm.public_ip_addr
348                                        );
349                                        Some(vm)
350                                    }
351                                }
352                            } else {
353                                println!(
354                                    "Failed to run rg query for {:?} with: {binary}",
355                                    vm.public_ip_addr
356                                );
357                                Some(vm)
358                            }
359                        }
360                        Err(err) => {
361                            println!(
362                                "Failed to run rg query for {:?} with: {err:?}",
363                                vm.public_ip_addr
364                            );
365                            Some(vm)
366                        }
367                    };
368                progress_bar.inc(1);
369                op
370            })
371            .collect::<Vec<_>>();
372
373        progress_bar.finish_and_clear();
374        println!("Ripgrep completed!");
375
376        Ok(())
377    }
378
379    fn store_rg_output(
380        timestamp: &str,
381        cmd: &str,
382        output: &[String],
383        log_abs_dest: &Path,
384        vm_name: &str,
385    ) -> Result<()> {
386        std::fs::create_dir_all(log_abs_dest.join(vm_name))?;
387
388        let mut file = File::create(
389            log_abs_dest
390                .join(vm_name)
391                .join(format!("rg-{timestamp}.log")),
392        )?;
393
394        writeln!(file, "Command: {cmd}")?;
395
396        for line in output {
397            writeln!(file, "{}", line)?;
398        }
399
400        Ok(())
401    }
402
403    /// Run an Ansible playbook to copy the logs from all the machines in the inventory.
404    ///
405    /// It needs to be part of `TestnetDeploy` because the Ansible runner is already setup in that
406    /// context.
407    pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
408        let dest = PathBuf::from(".").join("logs").join(name);
409        if dest.exists() {
410            println!("Removing existing {} directory", dest.to_string_lossy());
411            remove(dest.clone())?;
412        }
413        std::fs::create_dir_all(&dest)?;
414        self.ansible_provisioner.copy_logs(name, resources_only)?;
415        Ok(())
416    }
417
418    // Return the list of all the node machines.
419    fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
420        let environments = self.terraform_runner.workspace_list()?;
421        if !environments.contains(&name.to_string()) {
422            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
423        }
424        self.ansible_provisioner.get_all_node_inventory()
425    }
426
427    fn get_client_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
428        let environments = self.terraform_runner.workspace_list()?;
429        if !environments.contains(&name.to_string()) {
430            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
431        }
432        self.ansible_provisioner.get_client_inventory()
433    }
434}
435
436pub async fn get_logs(name: &str) -> Result<()> {
437    let dest_path = std::env::current_dir()?.join("logs").join(name);
438    std::fs::create_dir_all(dest_path.clone())?;
439    let s3_repository = S3Repository {};
440    s3_repository
441        .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
442        .await?;
443    Ok(())
444}
445
446pub fn reassemble_logs(name: &str) -> Result<()> {
447    let src = PathBuf::from(".").join("logs").join(name);
448    if !src.exists() {
449        return Err(Error::LogsNotRetrievedError(name.to_string()));
450    }
451    let dest = PathBuf::from(".")
452        .join("logs")
453        .join(format!("{name}-reassembled"));
454    if dest.exists() {
455        println!("Removing previous {name}-reassembled directory");
456        remove(dest.clone())?;
457    }
458
459    std::fs::create_dir_all(&dest)?;
460    let mut options = CopyOptions::new();
461    options.overwrite = true;
462    copy(src.clone(), dest.clone(), &options)?;
463
464    visit_dirs(&dest, &process_part_files, &src, &dest)?;
465    Ok(())
466}
467
468pub async fn rm_logs(name: &str) -> Result<()> {
469    let s3_repository = S3Repository {};
470    s3_repository
471        .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
472        .await?;
473    Ok(())
474}
475
476fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
477    let reassembled_dir_path = if dir_path == dest_root {
478        dest_root.clone()
479    } else {
480        dest_root.join(dir_path.strip_prefix(source_root)?)
481    };
482    std::fs::create_dir_all(&reassembled_dir_path)?;
483
484    let entries: Vec<_> = std::fs::read_dir(dir_path)?
485        .map(|res| res.map(|e| e.path()))
486        .collect::<Result<Vec<_>, _>>()?;
487
488    let mut part_files: Vec<_> = entries
489        .iter()
490        .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
491        .collect();
492
493    part_files.sort_by_key(|a| {
494        a.file_stem()
495            .unwrap()
496            .to_string_lossy()
497            .split(".part")
498            .nth(1)
499            .unwrap()
500            .parse::<u32>()
501            .unwrap()
502    });
503
504    if part_files.is_empty() {
505        return Ok(());
506    }
507
508    let output_file_path = reassembled_dir_path.join("reassembled.log");
509    println!("Creating reassembled file at {output_file_path:#?}");
510    let mut output_file = File::create(&output_file_path)?;
511    for part_file in part_files.iter() {
512        let mut part_content = String::new();
513        File::open(part_file)?.read_to_string(&mut part_content)?;
514
515        // For some reason logstash writes "\n" as a literal string rather than a newline
516        // character.
517        part_content = part_content.replace("\\n", "\n");
518
519        let mut cursor = Cursor::new(part_content);
520        std::io::copy(&mut cursor, &mut output_file)?;
521        std::fs::remove_file(part_file)?;
522    }
523
524    Ok(())
525}
526
527fn visit_dirs(
528    dir: &Path,
529    cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
530    source_root: &PathBuf,
531    dest_root: &PathBuf,
532) -> Result<()> {
533    if dir.is_dir() {
534        cb(dir, source_root, dest_root)?;
535        for entry in std::fs::read_dir(dir)? {
536            let entry = entry?;
537            let path = entry.path();
538            if path.is_dir() {
539                visit_dirs(&path, cb, dest_root, dest_root)?;
540            }
541        }
542    }
543    Ok(())
544}
545
546// Create the log dirs for all the machines. Returns the absolute path to the `logs/name`
547fn create_initial_log_dir_setup(
548    root_dir: &Path,
549    name: &str,
550    all_node_inventory: &[VirtualMachine],
551) -> Result<PathBuf> {
552    let log_dest = root_dir.join("logs").join(name);
553    if !log_dest.exists() {
554        std::fs::create_dir_all(&log_dest)?;
555    }
556    // Get the absolute path here. We might be changing the current_dir and we don't want to run into problems.
557    let log_abs_dest = std::fs::canonicalize(log_dest)?;
558    // Create a log dir per VM
559    all_node_inventory.par_iter().for_each(|vm| {
560        let vm_path = log_abs_dest.join(&vm.name);
561        let _ = std::fs::create_dir_all(vm_path);
562    });
563    Ok(log_abs_dest)
564}