1use std::{
2 fs::OpenOptions,
3 io::{prelude::*, BufReader, SeekFrom},
4 process::{Command, Stdio},
5};
6
7use chrono::{DateTime, Utc};
8use fs_extra::dir;
9use regex::Regex;
10use serde_json::{json, to_string_pretty};
11use tempfile::TempDir;
12use walkdir::WalkDir;
13
14use crate::{
15 argument::{Args, BackupArgs, L1Args, L2Args, RpcArgs},
16 config::Config,
17 error::{Error, Result},
18 qiniu,
19 rpc_client::RpcClient,
20};
21
/// Matches the timestamp prefix of a log line, e.g.
/// `2020-01-02 03:04:05.678 +08:00 ` (including the trailing space).
const LOG_TIMESTAMP_REGEX: &str =
    r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}([.]\d{1,3}) [+-]\d{2}:\d{2} ";
/// `chrono` format used to parse the text matched by `LOG_TIMESTAMP_REGEX`
/// (without its trailing space).
///
/// `%.f` is used rather than `.%f`: when parsing, `%f` reads the digits as an
/// integer count of nanoseconds (so `.789` would become 789 ns), whereas `%.f`
/// consumes the dot and reads the digits as a decimal fraction of a second
/// (so `.789` is 789 ms).
const LOG_TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.f %:z";
/// Number of trailing lines of a log file that are inspected when looking for
/// the last timestamp in that file.
const LOG_MIN_TAIL_CHECK: usize = 200;
26
27pub trait CanExecute {
28 fn execute(&self, cfg: &Config) -> Result<()>;
29}
30
31impl CanExecute for Args {
32 fn execute(&self, cfg: &Config) -> Result<()> {
33 match self {
34 Self::L1(inner) => inner.execute(cfg),
35 Self::L2(inner) => inner.execute(cfg),
36 Self::Backup(inner) => inner.execute(cfg),
37 Self::Rpc(inner) => inner.execute(cfg),
38 }
39 }
40}
41
42impl CanExecute for L1Args {
43 fn execute(&self, cfg: &Config) -> Result<()> {
44 let mut command = match self {
45 Self::Start => {
46 let mut command = Command::new("systemctl");
47 command.args(&["start", &cfg.normal.ckb.service_name]);
48 command
49 }
50 Self::Stop => {
51 let mut command = Command::new("systemctl");
52 command.args(&["stop", &cfg.normal.ckb.service_name]);
53 command
54 }
55 Self::Restart => {
56 let mut command = Command::new("systemctl");
57 command.args(&["restart", &cfg.normal.ckb.service_name]);
58 command
59 }
60 Self::Status => {
61 let mut command = Command::new("systemctl");
62 command.args(&["status", &cfg.normal.ckb.service_name]);
63 command
64 }
65 Self::ResetData { peer_store } => {
66 let ckb_bin_path = cfg.normal.ckb.bin_path.to_str().expect("ckb.bin_path");
67 let ckb_root_dir = cfg.normal.ckb.root_dir.to_str().expect("ckb.root_dir");
68 let mut command = Command::new(ckb_bin_path);
69 command.args(&["reset-data", "--force", "-C", ckb_root_dir]);
70 if *peer_store {
71 command.arg("--network-peer-store");
72 } else {
73 command.arg("--all");
74 }
75 command
76 }
77 };
78 command.stdout(Stdio::inherit()).output().map_err(|err| {
79 let msg = format!("failed to execute `{:?}` since {}", command, err);
80 Error::Exec(msg)
81 })?;
82 Ok(())
83 }
84}
85
86impl CanExecute for L2Args {
87 fn execute(&self, _cfg: &Config) -> Result<()> {
88 Ok(())
89 }
90}
91
92impl CanExecute for BackupArgs {
93 fn execute(&self, cfg: &Config) -> Result<()> {
94 let ckb_data_dir = &cfg.normal.ckb.data_dir;
95 let tmp_dir = TempDir::new().map_err(|err| {
96 let msg = format!("failed to create tempdir since {}", err);
97 Error::Exec(msg)
98 })?;
99 let tgz_path = {
100 let timestamp = Utc::now().format("%Y%m%d-%H%M%S");
101 let target_name = format!("{}-{}.tar.gz", cfg.normal.host.name, timestamp);
102 tmp_dir.path().join(target_name)
103 };
104 let mut command = Command::new("tar");
105 command.current_dir(tmp_dir.path()).args(&[
106 "-czvf",
107 tgz_path.as_path().to_str().expect("tgz_path.to_str()"),
108 ]);
109 if self.peer_store {
110 let src_path = ckb_data_dir.join("network").join("peer_store");
111 let mut options = dir::CopyOptions::new();
112 options.copy_inside = true;
113 dir::copy(&src_path, &tmp_dir, &options).map_err(|err| {
114 let msg = format!(
115 "failed to copy '{}' into '{}' since {}",
116 src_path.display(),
117 tmp_dir.path().display(),
118 err
119 );
120 Error::Exec(msg)
121 })?;
122 command.arg("peer_store");
123 } else {
124 let logs_dir = ckb_data_dir.join("logs");
125 let dst_path = tmp_dir.path().join("ckb.log");
126 let mut write_file = OpenOptions::new()
127 .write(true)
128 .create_new(true)
129 .open(&dst_path)
130 .expect("open log file to write");
131 let re = Regex::new(LOG_TIMESTAMP_REGEX).expect("compile regex");
132 'read_logfile: for entry in WalkDir::new(logs_dir).into_iter().filter_map(|e| e.ok()) {
133 if entry.file_type().is_dir() {
134 continue;
135 }
136 let is_log = entry
137 .file_name()
138 .to_str()
139 .map(|s| s.ends_with(".log"))
140 .unwrap_or(false);
141 if !is_log {
142 continue;
143 }
144 log::trace!("read log file '{}'", entry.path().display());
145 let mut read_file = OpenOptions::new()
146 .read(true)
147 .open(entry.path())
148 .expect("open log file to read");
149 let mut check_tail = false;
150 let mut skip_at_head = 0;
151 for line in BufReader::new(&read_file).lines() {
152 let line_str = line.expect("read line");
153 if let Some(mat) = re.find(&line_str) {
154 let line_ts_str = &line_str.split_at(mat.end() - 1).0;
155 let line_ts = DateTime::parse_from_str(line_ts_str, LOG_TIMESTAMP_FORMAT)
156 .unwrap_or_else(|err| panic!("regex should be right but {}", err));
157 if line_ts > self.logs_around.1 {
158 log::trace!(
159 "skip log file '{}' since after time scope",
160 entry.path().display(),
161 );
162 continue 'read_logfile;
163 } else if line_ts < self.logs_around.0 {
164 check_tail = true;
165 }
166 break;
167 }
168 skip_at_head += 1
169 }
170 if check_tail {
171 read_file.seek(SeekFrom::Start(0)).expect("seek to start");
172 let lines_count = BufReader::new(&read_file).lines().count();
173 read_file.seek(SeekFrom::Start(0)).expect("seek to start");
174 let last_lines: Vec<String> = if lines_count > LOG_MIN_TAIL_CHECK {
175 BufReader::new(&read_file)
176 .lines()
177 .skip(lines_count - LOG_MIN_TAIL_CHECK)
178 .map(|line| line.expect("read line"))
179 .collect()
180 } else {
181 BufReader::new(&read_file)
182 .lines()
183 .map(|line| line.expect("read line"))
184 .collect()
185 };
186 for line_str in last_lines.into_iter().rev() {
187 if let Some(mat) = re.find(&line_str) {
188 let line_ts_str = &line_str.split_at(mat.end() - 1).0;
189 let line_ts =
190 DateTime::parse_from_str(line_ts_str, LOG_TIMESTAMP_FORMAT)
191 .unwrap_or_else(|err| {
192 panic!("regex should be right but {}", err)
193 });
194 if line_ts < self.logs_around.0 {
195 log::trace!(
196 "skip log file '{}' since before time scope",
197 entry.path().display()
198 );
199 continue 'read_logfile;
200 }
201 break;
202 }
203 }
204 }
205 read_file.seek(SeekFrom::Start(0)).expect("seek to start");
206 for line in BufReader::new(&read_file).lines().skip(skip_at_head) {
207 let line_str = line.expect("read line");
208 if let Some(mat) = re.find(&line_str) {
209 let line_ts_str = &line_str.split_at(mat.end() - 1).0;
210 let line_ts = DateTime::parse_from_str(line_ts_str, LOG_TIMESTAMP_FORMAT)
211 .unwrap_or_else(|err| panic!("regex should be right but {}", err));
212 if line_ts < self.logs_around.0 {
213 continue;
214 } else if line_ts > self.logs_around.1 {
215 break;
216 }
217 }
218 write_file
219 .write_all(line_str.as_bytes())
220 .expect("write into log file");
221 write_file
222 .write_all("\n".as_bytes())
223 .expect("write into log file");
224 }
225 }
226 command.arg("ckb.log");
227 }
228 command.stdout(Stdio::inherit()).output().map_err(|err| {
229 let msg = format!("failed to execute `{:?}` since {}", command, err);
230 Error::Exec(msg)
231 })?;
232 let url = qiniu::upload(&cfg.secret.qiniu, tgz_path.as_path())?;
233 println!("Upload {} to {}", tgz_path.as_path().display(), url);
234 drop(tmp_dir);
235 Ok(())
236 }
237}
238
239impl CanExecute for RpcArgs {
240 fn execute(&self, cfg: &Config) -> Result<()> {
241 let cli = RpcClient::new(&cfg.normal.ckb.rpc_url)?;
242 match self {
243 Self::GetPeers { stats } => {
244 let peers = cli.get_peers()?;
245 let output = if *stats {
246 let (inbound_peers_count, outbound_peers_count) =
247 peers.iter().fold((0, 0), |acc, peer| {
248 if peer.is_outbound {
249 (acc.0, acc.1 + 1)
250 } else {
251 (acc.0 + 1, acc.1)
252 }
253 });
254 let output = json!({
255 "inbound_peers_count": inbound_peers_count,
256 "outbound_peers_count": outbound_peers_count
257 });
258 to_string_pretty(&output)
259 } else {
260 to_string_pretty(&peers)
261 }
262 .expect("serde_json::to_string(..)");
263 println!("{}", output)
264 }
265 }
266 Ok(())
267 }
268}