ckb_dev/
execute.rs

1use std::{
2    fs::OpenOptions,
3    io::{prelude::*, BufReader, SeekFrom},
4    process::{Command, Stdio},
5};
6
7use chrono::{DateTime, Utc};
8use fs_extra::dir;
9use regex::Regex;
10use serde_json::{json, to_string_pretty};
11use tempfile::TempDir;
12use walkdir::WalkDir;
13
14use crate::{
15    argument::{Args, BackupArgs, L1Args, L2Args, RpcArgs},
16    config::Config,
17    error::{Error, Result},
18    qiniu,
19    rpc_client::RpcClient,
20};
21
/// Matches the timestamp prefix of a log line, including the single space
/// that separates it from the rest of the line, e.g.
/// `2020-01-02 03:04:05.678 +00:00 `.
const LOG_TIMESTAMP_REGEX: &str =
    r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}([.]\d{1,3}) [+-]\d{2}:\d{2} ";
/// chrono format used to parse the text matched by `LOG_TIMESTAMP_REGEX`
/// (without its trailing space).
///
/// `%.f` reads the digits after the dot as a decimal fraction of a second, so
/// `.678` means 678 ms. A literal dot followed by `%f` would instead read the
/// digits as an integer count of nanoseconds (678 ns).
const LOG_TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.f %:z";
/// Number of lines at the end of a log file that are inspected when deciding
/// whether the whole file lies before the requested time scope.
const LOG_MIN_TAIL_CHECK: usize = 200;
26
/// Implemented by every parsed command-line argument type in this file so
/// that it can be run against the loaded configuration.
pub trait CanExecute {
    /// Runs the command described by `self`, reading its settings from `cfg`.
    fn execute(&self, cfg: &Config) -> Result<()>;
}
30
31impl CanExecute for Args {
32    fn execute(&self, cfg: &Config) -> Result<()> {
33        match self {
34            Self::L1(inner) => inner.execute(cfg),
35            Self::L2(inner) => inner.execute(cfg),
36            Self::Backup(inner) => inner.execute(cfg),
37            Self::Rpc(inner) => inner.execute(cfg),
38        }
39    }
40}
41
42impl CanExecute for L1Args {
43    fn execute(&self, cfg: &Config) -> Result<()> {
44        let mut command = match self {
45            Self::Start => {
46                let mut command = Command::new("systemctl");
47                command.args(&["start", &cfg.normal.ckb.service_name]);
48                command
49            }
50            Self::Stop => {
51                let mut command = Command::new("systemctl");
52                command.args(&["stop", &cfg.normal.ckb.service_name]);
53                command
54            }
55            Self::Restart => {
56                let mut command = Command::new("systemctl");
57                command.args(&["restart", &cfg.normal.ckb.service_name]);
58                command
59            }
60            Self::Status => {
61                let mut command = Command::new("systemctl");
62                command.args(&["status", &cfg.normal.ckb.service_name]);
63                command
64            }
65            Self::ResetData { peer_store } => {
66                let ckb_bin_path = cfg.normal.ckb.bin_path.to_str().expect("ckb.bin_path");
67                let ckb_root_dir = cfg.normal.ckb.root_dir.to_str().expect("ckb.root_dir");
68                let mut command = Command::new(ckb_bin_path);
69                command.args(&["reset-data", "--force", "-C", ckb_root_dir]);
70                if *peer_store {
71                    command.arg("--network-peer-store");
72                } else {
73                    command.arg("--all");
74                }
75                command
76            }
77        };
78        command.stdout(Stdio::inherit()).output().map_err(|err| {
79            let msg = format!("failed to execute `{:?}` since {}", command, err);
80            Error::Exec(msg)
81        })?;
82        Ok(())
83    }
84}
85
impl CanExecute for L2Args {
    /// Currently a no-op: the configuration is ignored and `Ok(())` is
    /// always returned.
    fn execute(&self, _cfg: &Config) -> Result<()> {
        Ok(())
    }
}
91
92impl CanExecute for BackupArgs {
93    fn execute(&self, cfg: &Config) -> Result<()> {
94        let ckb_data_dir = &cfg.normal.ckb.data_dir;
95        let tmp_dir = TempDir::new().map_err(|err| {
96            let msg = format!("failed to create tempdir since {}", err);
97            Error::Exec(msg)
98        })?;
99        let tgz_path = {
100            let timestamp = Utc::now().format("%Y%m%d-%H%M%S");
101            let target_name = format!("{}-{}.tar.gz", cfg.normal.host.name, timestamp);
102            tmp_dir.path().join(target_name)
103        };
104        let mut command = Command::new("tar");
105        command.current_dir(tmp_dir.path()).args(&[
106            "-czvf",
107            tgz_path.as_path().to_str().expect("tgz_path.to_str()"),
108        ]);
109        if self.peer_store {
110            let src_path = ckb_data_dir.join("network").join("peer_store");
111            let mut options = dir::CopyOptions::new();
112            options.copy_inside = true;
113            dir::copy(&src_path, &tmp_dir, &options).map_err(|err| {
114                let msg = format!(
115                    "failed to copy '{}' into '{}' since {}",
116                    src_path.display(),
117                    tmp_dir.path().display(),
118                    err
119                );
120                Error::Exec(msg)
121            })?;
122            command.arg("peer_store");
123        } else {
124            let logs_dir = ckb_data_dir.join("logs");
125            let dst_path = tmp_dir.path().join("ckb.log");
126            let mut write_file = OpenOptions::new()
127                .write(true)
128                .create_new(true)
129                .open(&dst_path)
130                .expect("open log file to write");
131            let re = Regex::new(LOG_TIMESTAMP_REGEX).expect("compile regex");
132            'read_logfile: for entry in WalkDir::new(logs_dir).into_iter().filter_map(|e| e.ok()) {
133                if entry.file_type().is_dir() {
134                    continue;
135                }
136                let is_log = entry
137                    .file_name()
138                    .to_str()
139                    .map(|s| s.ends_with(".log"))
140                    .unwrap_or(false);
141                if !is_log {
142                    continue;
143                }
144                log::trace!("read log file '{}'", entry.path().display());
145                let mut read_file = OpenOptions::new()
146                    .read(true)
147                    .open(entry.path())
148                    .expect("open log file to read");
149                let mut check_tail = false;
150                let mut skip_at_head = 0;
151                for line in BufReader::new(&read_file).lines() {
152                    let line_str = line.expect("read line");
153                    if let Some(mat) = re.find(&line_str) {
154                        let line_ts_str = &line_str.split_at(mat.end() - 1).0;
155                        let line_ts = DateTime::parse_from_str(line_ts_str, LOG_TIMESTAMP_FORMAT)
156                            .unwrap_or_else(|err| panic!("regex should be right but {}", err));
157                        if line_ts > self.logs_around.1 {
158                            log::trace!(
159                                "skip log file '{}' since after time scope",
160                                entry.path().display(),
161                            );
162                            continue 'read_logfile;
163                        } else if line_ts < self.logs_around.0 {
164                            check_tail = true;
165                        }
166                        break;
167                    }
168                    skip_at_head += 1
169                }
170                if check_tail {
171                    read_file.seek(SeekFrom::Start(0)).expect("seek to start");
172                    let lines_count = BufReader::new(&read_file).lines().count();
173                    read_file.seek(SeekFrom::Start(0)).expect("seek to start");
174                    let last_lines: Vec<String> = if lines_count > LOG_MIN_TAIL_CHECK {
175                        BufReader::new(&read_file)
176                            .lines()
177                            .skip(lines_count - LOG_MIN_TAIL_CHECK)
178                            .map(|line| line.expect("read line"))
179                            .collect()
180                    } else {
181                        BufReader::new(&read_file)
182                            .lines()
183                            .map(|line| line.expect("read line"))
184                            .collect()
185                    };
186                    for line_str in last_lines.into_iter().rev() {
187                        if let Some(mat) = re.find(&line_str) {
188                            let line_ts_str = &line_str.split_at(mat.end() - 1).0;
189                            let line_ts =
190                                DateTime::parse_from_str(line_ts_str, LOG_TIMESTAMP_FORMAT)
191                                    .unwrap_or_else(|err| {
192                                        panic!("regex should be right but {}", err)
193                                    });
194                            if line_ts < self.logs_around.0 {
195                                log::trace!(
196                                    "skip log file '{}' since before time scope",
197                                    entry.path().display()
198                                );
199                                continue 'read_logfile;
200                            }
201                            break;
202                        }
203                    }
204                }
205                read_file.seek(SeekFrom::Start(0)).expect("seek to start");
206                for line in BufReader::new(&read_file).lines().skip(skip_at_head) {
207                    let line_str = line.expect("read line");
208                    if let Some(mat) = re.find(&line_str) {
209                        let line_ts_str = &line_str.split_at(mat.end() - 1).0;
210                        let line_ts = DateTime::parse_from_str(line_ts_str, LOG_TIMESTAMP_FORMAT)
211                            .unwrap_or_else(|err| panic!("regex should be right but {}", err));
212                        if line_ts < self.logs_around.0 {
213                            continue;
214                        } else if line_ts > self.logs_around.1 {
215                            break;
216                        }
217                    }
218                    write_file
219                        .write_all(line_str.as_bytes())
220                        .expect("write into log file");
221                    write_file
222                        .write_all("\n".as_bytes())
223                        .expect("write into log file");
224                }
225            }
226            command.arg("ckb.log");
227        }
228        command.stdout(Stdio::inherit()).output().map_err(|err| {
229            let msg = format!("failed to execute `{:?}` since {}", command, err);
230            Error::Exec(msg)
231        })?;
232        let url = qiniu::upload(&cfg.secret.qiniu, tgz_path.as_path())?;
233        println!("Upload {} to {}", tgz_path.as_path().display(), url);
234        drop(tmp_dir);
235        Ok(())
236    }
237}
238
239impl CanExecute for RpcArgs {
240    fn execute(&self, cfg: &Config) -> Result<()> {
241        let cli = RpcClient::new(&cfg.normal.ckb.rpc_url)?;
242        match self {
243            Self::GetPeers { stats } => {
244                let peers = cli.get_peers()?;
245                let output = if *stats {
246                    let (inbound_peers_count, outbound_peers_count) =
247                        peers.iter().fold((0, 0), |acc, peer| {
248                            if peer.is_outbound {
249                                (acc.0, acc.1 + 1)
250                            } else {
251                                (acc.0 + 1, acc.1)
252                            }
253                        });
254                    let output = json!({
255                      "inbound_peers_count": inbound_peers_count,
256                      "outbound_peers_count": outbound_peers_count
257                    });
258                    to_string_pretty(&output)
259                } else {
260                    to_string_pretty(&peers)
261                }
262                .expect("serde_json::to_string(..)");
263                println!("{}", output)
264            }
265        }
266        Ok(())
267    }
268}