raft_engine_ctl/
lib.rs

1// Copyright (c) 2017-present, PingCAP, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! # Raft Engine Control
15
16use std::path::Path;
17use std::sync::Arc;
18
19use clap::{crate_authors, crate_version, Parser};
20use raft_engine::env::{DefaultFileSystem, FileSystem};
21use raft_engine::internals::LogQueue;
22use raft_engine::{Engine, Error, Result as EngineResult};
23
// Top-level command-line options of the `ctl` tool, parsed with clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///`
// doc comments on this type and its fields into about/help text, so adding them
// would change the generated CLI output.
#[derive(Debug, clap::Parser)]
#[clap(
    name = "ctl",
    author = crate_authors!(),
    version = crate_version!(),
    dont_collapse_args_in_usage = true,
)]
pub struct ControlOpt {
    // Subcommand to run. It is optional at parse time; a missing subcommand is
    // rejected later by `validate_and_execute_with_file_system`, which returns
    // `Error::InvalidArgument`.
    #[clap(subcommand)]
    cmd: Option<Cmd>,
}
36
37#[derive(Debug, Parser)]
38enum Cmd {
39    /// Dump log entries in data file(s).
40    Dump {
41        /// Path of Raft Engine directory or specific log file.
42        #[clap(short, long)]
43        path: String,
44
45        #[clap(short, long, use_value_delimiter = true)]
46        raft_groups: Vec<u64>,
47    },
48
49    /// Check data files for logical errors.
50    Check {
51        /// Path of Raft Engine directory.
52        #[clap(short, long)]
53        path: String,
54    },
55
56    /// Run Rhai script to repair data files.
57    Repair {
58        /// Path of Raft Engine directory.
59        #[clap(short, long)]
60        path: String,
61
62        #[clap(
63            short,
64            long,
65            possible_values = &["append", "rewrite", "all"]
66        )]
67        queue: String,
68
69        /// Path of Rhai script file.
70        #[clap(short, long)]
71        script: String,
72    },
73
74    /// Try running `purge_expired_files` on existing data directory.
75    TryPurge {
76        /// Path of Raft Engine directory.
77        #[clap(short, long)]
78        path: String,
79    },
80}
81
82fn convert_queue(queue: &str) -> Option<LogQueue> {
83    match queue {
84        "append" => Some(LogQueue::Append),
85        "rewrite" => Some(LogQueue::Rewrite),
86        "all" => None,
87        _ => unreachable!(),
88    }
89}
90
91impl ControlOpt {
92    pub fn validate_and_execute(self) -> EngineResult<()> {
93        self.validate_and_execute_with_file_system(Arc::new(DefaultFileSystem))
94    }
95
96    pub fn validate_and_execute_with_file_system<F: FileSystem>(
97        mut self,
98        fs: Arc<F>,
99    ) -> EngineResult<()> {
100        if self.cmd.is_none() {
101            return Err(Error::InvalidArgument("subcommand is needed".to_owned()));
102        }
103
104        match self.cmd.take().unwrap() {
105            Cmd::Dump { path, raft_groups } => {
106                let it = Engine::dump_with_file_system(Path::new(&path), fs)?;
107                for item in it {
108                    if let Ok(v) = item {
109                        if raft_groups.is_empty() || raft_groups.contains(&v.raft_group_id) {
110                            println!("{v:?}")
111                        }
112                    } else {
113                        // output error message
114                        println!("{item:?}")
115                    }
116                }
117            }
118            Cmd::Repair {
119                path,
120                queue,
121                script,
122            } => {
123                let script = std::fs::read_to_string(script)?;
124                Engine::unsafe_repair_with_file_system(
125                    Path::new(&path),
126                    convert_queue(&queue),
127                    script,
128                    fs,
129                )?;
130            }
131            Cmd::Check { path } => {
132                let r = Engine::consistency_check_with_file_system(Path::new(&path), fs)?;
133                if r.is_empty() {
134                    println!("All data is Ok")
135                } else {
136                    println!("Corrupted info are as follows:\nraft_group_id, last_intact_index\n");
137                    r.iter().for_each(|(x, y)| println!("{x:?}, {y:?}"))
138                }
139            }
140            Cmd::TryPurge { path } => {
141                let e = Engine::open_with_file_system(
142                    raft_engine::Config {
143                        dir: path,
144                        ..Default::default()
145                    },
146                    fs,
147                )?;
148                println!(
149                    "purge_expired_files() returns {:?}",
150                    e.purge_expired_files()?
151                );
152            }
153        }
154        Ok(())
155    }
156}
157
158pub fn run_command<F: FileSystem>(mut args: Vec<String>, fs: Arc<F>) {
159    args.insert(0, "ctl".to_owned());
160    let opts = ControlOpt::parse_from(args);
161    if let Err(e) = opts.validate_and_execute_with_file_system(fs) {
162        println!("{e:?}");
163    }
164}