sn_testnet_deploy/
logs.rs

1// Copyright (c) 2023, MaidSafe.
2// All rights reserved.
3//
4// This SAFE Network Software is licensed under the BSD-3-Clause license.
5// Please see the LICENSE file for more details.
6
7use crate::{
8    error::{Error, Result},
9    get_progress_bar,
10    inventory::VirtualMachine,
11    run_external_command,
12    s3::S3Repository,
13    TestnetDeployer,
14};
15use fs_extra::dir::{copy, remove, CopyOptions};
16use log::debug;
17use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
18use std::{
19    fs::File,
20    io::{Cursor, Read, Write},
21    net::IpAddr,
22    path::{Path, PathBuf},
23};
24
25impl TestnetDeployer {
26    pub fn rsync_logs(&self, name: &str, vm_filter: Option<String>) -> Result<()> {
27        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
28        let root_dir = std::env::current_dir()?;
29        let all_node_inventory = self.get_all_node_inventory(name)?;
30        let all_node_inventory = if let Some(filter) = vm_filter {
31            all_node_inventory
32                .into_iter()
33                .filter(|vm| vm.name.contains(&filter))
34                .collect()
35        } else {
36            all_node_inventory
37        };
38
39        let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
40
41        // Rsync args
42        let rsync_args = vec![
43            "--compress".to_string(),
44            "--archive".to_string(),
45            "--prune-empty-dirs".to_string(),
46            "--verbose".to_string(),
47            "--verbose".to_string(),
48            "--filter=+ */".to_string(), // Include all directories for traversal
49            "--filter=+ *.log*".to_string(), // Include all *.log* files
50            "--filter=- *".to_string(),  // Exclude all other files
51        ];
52
53        let mut public_rsync_args = rsync_args.clone();
54        // Add the ssh details
55        // TODO: SSH limits the connections/instances to 10 at a time. Changing /etc/ssh/sshd_config, doesn't work?
56        // How to bypass this?
57        public_rsync_args.extend(vec![
58            "-e".to_string(),
59            format!(
60                "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30",
61                self.ssh_client
62                    .get_private_key_path()
63                    .to_string_lossy()
64                    .as_ref()
65            ),
66        ]);
67
68        // let symmetric_nat_gateway_inventory = self.get_symmetric_nat_gateway_inventory(name)?;
69        // let private_rsync_args = if !nat_gateway_inventory.is_empty() {
70        //     let nat_gateway_inventory = nat_gateway_inventory.first().unwrap();
71        //     let mut private_rsync_args = rsync_args.clone();
72        //     private_rsync_args.extend(vec![
73        //         "-e".to_string(),
74        //         format!(
75        //             "ssh -i {} -q -o StrictHostKeyChecking=no -o BatchMode=yes -o ConnectTimeout=30 -o ProxyCommand='ssh root@{} -W %h:%p -i {}'",
76        //             self.ssh_client
77        //                 .get_private_key_path()
78        //                 .to_string_lossy()
79        //                 .as_ref(),
80        //                 nat_gateway_inventory.public_ip_addr,
81        //             self.ssh_client
82        //                 .get_private_key_path()
83        //                 .to_string_lossy()
84        //                 .as_ref(),
85        //         ),
86        //     ]);
87        //     Some(private_rsync_args)
88        // } else {
89        //     None
90        // };
91
92        // We might use the script, so goto the resource dir.
93        std::env::set_current_dir(self.working_directory_path.clone())?;
94        println!("Starting to rsync the log files");
95        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
96
97        let failed_inventory = all_node_inventory.par_iter().filter_map(|vm| {
98            let args = if vm.name.contains("private") {
99                // if let Some(private_rsync_args) = &private_rsync_args {
100                //     debug!("Using private rsync args for {:?}", vm.name);
101                //     private_rsync_args
102                // } else {
103                debug!(
104                    "Fallback to public rsync args for private node {:?}",
105                    vm.name
106                );
107                &public_rsync_args
108                // }
109            } else {
110                debug!("Using public rsync args for {:?}", vm.name);
111                &public_rsync_args
112            };
113
114            if let Err(err) = Self::run_rsync(&vm.name, &vm.public_ip_addr, &log_abs_dest, args) {
115                println!(
116                    "Failed to rsync. Retrying it after ssh-keygen {:?} : {} with err: {err:?}",
117                    vm.name, vm.public_ip_addr
118                );
119                return Some(vm.clone());
120            }
121            progress_bar.inc(1);
122            None
123        });
124
125        // try ssh-keygen for the failed inventory and try to rsync again
126        failed_inventory
127            .into_par_iter()
128            .for_each(|vm| {
129                debug!("Trying to ssh-keygen for {:?} : {}", vm.name, vm.public_ip_addr);
130                if let Err(err) = run_external_command(
131                    PathBuf::from("ssh-keygen"),
132                    PathBuf::from("."),
133                    vec!["-R".to_string(), format!("{}", vm.public_ip_addr)],
134                    false,
135                    false,
136                ) {
137                    println!("Failed to ssh-keygen {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
138                } else if let Err(err) =
139                    Self::run_rsync(&vm.name, &vm.public_ip_addr, &log_abs_dest, &rsync_args)
140                {
141                    println!("Failed to rsync even after ssh-keygen. Could not obtain logs for {:?} : {} with err: {err:?}", vm.name, vm.public_ip_addr);
142                }
143                progress_bar.inc(1);
144            });
145        progress_bar.finish_and_clear();
146        println!("Rsync completed!");
147        Ok(())
148    }
149
150    fn run_rsync(
151        vm_name: &String,
152        ip_address: &IpAddr,
153        log_abs_dest: &Path,
154        rsync_args: &[String],
155    ) -> Result<()> {
156        let vm_path = log_abs_dest.join(vm_name);
157        let mut rsync_args_clone = rsync_args.to_vec();
158
159        rsync_args_clone.push(format!("root@{ip_address}:/mnt/antnode-storage/log/"));
160        rsync_args_clone.push(vm_path.to_string_lossy().to_string());
161
162        debug!("Rsync logs to our machine for {vm_name:?} : {ip_address}");
163        run_external_command(
164            PathBuf::from("rsync"),
165            PathBuf::from("."),
166            rsync_args_clone.clone(),
167            true,
168            false,
169        )?;
170
171        debug!("Finished rsync for for {vm_name:?} : {ip_address}");
172        Ok(())
173    }
174
175    pub fn ripgrep_logs(&self, name: &str, rg_args: &str) -> Result<()> {
176        // take root_dir at the top as `get_all_node_inventory` changes the working dir.
177        let root_dir = std::env::current_dir()?;
178        let all_node_inventory = self.get_all_node_inventory(name)?;
179        let log_abs_dest = create_initial_log_dir_setup(&root_dir, name, &all_node_inventory)?;
180
181        let rg_cmd = format!("rg {rg_args} /mnt/antnode-storage/log//");
182        println!("Running ripgrep with command: {rg_cmd}");
183
184        // Get current date and time
185        let now = chrono::Utc::now();
186        let timestamp = now.format("%Y%m%dT%H%M%S").to_string();
187        let progress_bar = get_progress_bar(all_node_inventory.len() as u64)?;
188        let _failed_inventory = all_node_inventory
189            .par_iter()
190            .filter_map(|vm| {
191                let op =
192                    match self
193                        .ssh_client
194                        .run_command(&vm.public_ip_addr, "root", &rg_cmd, true)
195                    {
196                        Ok(output) => {
197                            match Self::store_rg_output(
198                                &timestamp,
199                                &rg_cmd,
200                                &output,
201                                &log_abs_dest,
202                                &vm.name,
203                            ) {
204                                Ok(_) => None,
205                                Err(err) => {
206                                    println!(
207                                        "Failed store output for {:?} with: {err:?}",
208                                        vm.public_ip_addr
209                                    );
210                                    Some(vm)
211                                }
212                            }
213                        }
214                        Err(Error::ExternalCommandRunFailed {
215                            binary,
216                            exit_status,
217                        }) => {
218                            if let Some(1) = exit_status.code() {
219                                debug!("No matches found for {:?}", vm.public_ip_addr);
220                                match Self::store_rg_output(
221                                    &timestamp,
222                                    &rg_cmd,
223                                    &["No matches found".to_string()],
224                                    &log_abs_dest,
225                                    &vm.name,
226                                ) {
227                                    Ok(_) => None,
228                                    Err(err) => {
229                                        println!(
230                                            "Failed store output for {:?} with: {err:?}",
231                                            vm.public_ip_addr
232                                        );
233                                        Some(vm)
234                                    }
235                                }
236                            } else {
237                                println!(
238                                    "Failed to run rg query for {:?} with: {binary}",
239                                    vm.public_ip_addr
240                                );
241                                Some(vm)
242                            }
243                        }
244                        Err(err) => {
245                            println!(
246                                "Failed to run rg query for {:?} with: {err:?}",
247                                vm.public_ip_addr
248                            );
249                            Some(vm)
250                        }
251                    };
252                progress_bar.inc(1);
253                op
254            })
255            .collect::<Vec<_>>();
256
257        progress_bar.finish_and_clear();
258        println!("Ripgrep completed!");
259
260        Ok(())
261    }
262
263    fn store_rg_output(
264        timestamp: &str,
265        cmd: &str,
266        output: &[String],
267        log_abs_dest: &Path,
268        vm_name: &str,
269    ) -> Result<()> {
270        std::fs::create_dir_all(log_abs_dest.join(vm_name))?;
271
272        let mut file = File::create(
273            log_abs_dest
274                .join(vm_name)
275                .join(format!("rg-{timestamp}.log")),
276        )?;
277
278        writeln!(file, "Command: {cmd}")?;
279
280        for line in output {
281            writeln!(file, "{}", line)?;
282        }
283
284        Ok(())
285    }
286
287    /// Run an Ansible playbook to copy the logs from all the machines in the inventory.
288    ///
289    /// It needs to be part of `TestnetDeploy` because the Ansible runner is already setup in that
290    /// context.
291    pub fn copy_logs(&self, name: &str, resources_only: bool) -> Result<()> {
292        let dest = PathBuf::from(".").join("logs").join(name);
293        if dest.exists() {
294            println!("Removing existing {} directory", dest.to_string_lossy());
295            remove(dest.clone())?;
296        }
297        std::fs::create_dir_all(&dest)?;
298        self.ansible_provisioner.copy_logs(name, resources_only)?;
299        Ok(())
300    }
301
302    // Return the list of all the node machines.
303    fn get_all_node_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
304        let environments = self.terraform_runner.workspace_list()?;
305        if !environments.contains(&name.to_string()) {
306            return Err(Error::EnvironmentDoesNotExist(name.to_string()));
307        }
308        self.ansible_provisioner.get_all_node_inventory()
309    }
310
311    // fn get_symmetric_nat_gateway_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
312    //     let environments = self.terraform_runner.workspace_list()?;
313    //     if !environments.contains(&name.to_string()) {
314    //         return Err(Error::EnvironmentDoesNotExist(name.to_string()));
315    //     }
316    //     self.ansible_provisioner
317    //         .get_symmetric_nat_gateway_inventory()
318    // }
319
320    // fn get_full_cone_nat_gateway_inventory(&self, name: &str) -> Result<Vec<VirtualMachine>> {
321    //     let environments = self.terraform_runner.workspace_list()?;
322    //     if !environments.contains(&name.to_string()) {
323    //         return Err(Error::EnvironmentDoesNotExist(name.to_string()));
324    //     }
325    //     self.ansible_provisioner
326    //         .get_full_cone_nat_gateway_inventory()
327    // }
328}
329
330pub async fn get_logs(name: &str) -> Result<()> {
331    let dest_path = std::env::current_dir()?.join("logs").join(name);
332    std::fs::create_dir_all(dest_path.clone())?;
333    let s3_repository = S3Repository {};
334    s3_repository
335        .download_folder("sn-testnet", &format!("testnet-logs/{name}"), &dest_path)
336        .await?;
337    Ok(())
338}
339
340pub fn reassemble_logs(name: &str) -> Result<()> {
341    let src = PathBuf::from(".").join("logs").join(name);
342    if !src.exists() {
343        return Err(Error::LogsNotRetrievedError(name.to_string()));
344    }
345    let dest = PathBuf::from(".")
346        .join("logs")
347        .join(format!("{name}-reassembled"));
348    if dest.exists() {
349        println!("Removing previous {name}-reassembled directory");
350        remove(dest.clone())?;
351    }
352
353    std::fs::create_dir_all(&dest)?;
354    let mut options = CopyOptions::new();
355    options.overwrite = true;
356    copy(src.clone(), dest.clone(), &options)?;
357
358    visit_dirs(&dest, &process_part_files, &src, &dest)?;
359    Ok(())
360}
361
362pub async fn rm_logs(name: &str) -> Result<()> {
363    let s3_repository = S3Repository {};
364    s3_repository
365        .delete_folder("sn-testnet", &format!("testnet-logs/{name}"))
366        .await?;
367    Ok(())
368}
369
370fn process_part_files(dir_path: &Path, source_root: &PathBuf, dest_root: &PathBuf) -> Result<()> {
371    let reassembled_dir_path = if dir_path == dest_root {
372        dest_root.clone()
373    } else {
374        dest_root.join(dir_path.strip_prefix(source_root)?)
375    };
376    std::fs::create_dir_all(&reassembled_dir_path)?;
377
378    let entries: Vec<_> = std::fs::read_dir(dir_path)?
379        .map(|res| res.map(|e| e.path()))
380        .collect::<Result<Vec<_>, _>>()?;
381
382    let mut part_files: Vec<_> = entries
383        .iter()
384        .filter(|path| path.is_file() && path.to_string_lossy().contains("part"))
385        .collect();
386
387    part_files.sort_by_key(|a| {
388        a.file_stem()
389            .unwrap()
390            .to_string_lossy()
391            .split(".part")
392            .nth(1)
393            .unwrap()
394            .parse::<u32>()
395            .unwrap()
396    });
397
398    if part_files.is_empty() {
399        return Ok(());
400    }
401
402    let output_file_path = reassembled_dir_path.join("reassembled.log");
403    println!("Creating reassembled file at {output_file_path:#?}");
404    let mut output_file = File::create(&output_file_path)?;
405    for part_file in part_files.iter() {
406        let mut part_content = String::new();
407        File::open(part_file)?.read_to_string(&mut part_content)?;
408
409        // For some reason logstash writes "\n" as a literal string rather than a newline
410        // character.
411        part_content = part_content.replace("\\n", "\n");
412
413        let mut cursor = Cursor::new(part_content);
414        std::io::copy(&mut cursor, &mut output_file)?;
415        std::fs::remove_file(part_file)?;
416    }
417
418    Ok(())
419}
420
421fn visit_dirs(
422    dir: &Path,
423    cb: &dyn Fn(&Path, &PathBuf, &PathBuf) -> Result<()>,
424    source_root: &PathBuf,
425    dest_root: &PathBuf,
426) -> Result<()> {
427    if dir.is_dir() {
428        cb(dir, source_root, dest_root)?;
429        for entry in std::fs::read_dir(dir)? {
430            let entry = entry?;
431            let path = entry.path();
432            if path.is_dir() {
433                visit_dirs(&path, cb, dest_root, dest_root)?;
434            }
435        }
436    }
437    Ok(())
438}
439
440// Create the log dirs for all the machines. Returns the absolute path to the `logs/name`
441fn create_initial_log_dir_setup(
442    root_dir: &Path,
443    name: &str,
444    all_node_inventory: &[VirtualMachine],
445) -> Result<PathBuf> {
446    let log_dest = root_dir.join("logs").join(name);
447    if !log_dest.exists() {
448        std::fs::create_dir_all(&log_dest)?;
449    }
450    // Get the absolute path here. We might be changing the current_dir and we don't want to run into problems.
451    let log_abs_dest = std::fs::canonicalize(log_dest)?;
452    // Create a log dir per VM
453    all_node_inventory.par_iter().for_each(|vm| {
454        let vm_path = log_abs_dest.join(&vm.name);
455        let _ = std::fs::create_dir_all(vm_path);
456    });
457    Ok(log_abs_dest)
458}