1use std::path::Path;
17use std::sync::Arc;
18
19use clap::{crate_authors, crate_version, Parser};
20use raft_engine::env::{DefaultFileSystem, FileSystem};
21use raft_engine::internals::LogQueue;
22use raft_engine::{Engine, Error, Result as EngineResult};
23
24#[derive(Debug, clap::Parser)]
25#[clap(
26 name = "ctl",
27 author = crate_authors!(),
28 version = crate_version!(),
29 dont_collapse_args_in_usage = true,
30)]
31pub struct ControlOpt {
32 #[clap(subcommand)]
34 cmd: Option<Cmd>,
35}
36
37#[derive(Debug, Parser)]
38enum Cmd {
39 Dump {
41 #[clap(short, long)]
43 path: String,
44
45 #[clap(short, long, use_value_delimiter = true)]
46 raft_groups: Vec<u64>,
47 },
48
49 Check {
51 #[clap(short, long)]
53 path: String,
54 },
55
56 Repair {
58 #[clap(short, long)]
60 path: String,
61
62 #[clap(
63 short,
64 long,
65 possible_values = &["append", "rewrite", "all"]
66 )]
67 queue: String,
68
69 #[clap(short, long)]
71 script: String,
72 },
73
74 TryPurge {
76 #[clap(short, long)]
78 path: String,
79 },
80}
81
82fn convert_queue(queue: &str) -> Option<LogQueue> {
83 match queue {
84 "append" => Some(LogQueue::Append),
85 "rewrite" => Some(LogQueue::Rewrite),
86 "all" => None,
87 _ => unreachable!(),
88 }
89}
90
91impl ControlOpt {
92 pub fn validate_and_execute(self) -> EngineResult<()> {
93 self.validate_and_execute_with_file_system(Arc::new(DefaultFileSystem))
94 }
95
96 pub fn validate_and_execute_with_file_system<F: FileSystem>(
97 mut self,
98 fs: Arc<F>,
99 ) -> EngineResult<()> {
100 if self.cmd.is_none() {
101 return Err(Error::InvalidArgument("subcommand is needed".to_owned()));
102 }
103
104 match self.cmd.take().unwrap() {
105 Cmd::Dump { path, raft_groups } => {
106 let it = Engine::dump_with_file_system(Path::new(&path), fs)?;
107 for item in it {
108 if let Ok(v) = item {
109 if raft_groups.is_empty() || raft_groups.contains(&v.raft_group_id) {
110 println!("{v:?}")
111 }
112 } else {
113 println!("{item:?}")
115 }
116 }
117 }
118 Cmd::Repair {
119 path,
120 queue,
121 script,
122 } => {
123 let script = std::fs::read_to_string(script)?;
124 Engine::unsafe_repair_with_file_system(
125 Path::new(&path),
126 convert_queue(&queue),
127 script,
128 fs,
129 )?;
130 }
131 Cmd::Check { path } => {
132 let r = Engine::consistency_check_with_file_system(Path::new(&path), fs)?;
133 if r.is_empty() {
134 println!("All data is Ok")
135 } else {
136 println!("Corrupted info are as follows:\nraft_group_id, last_intact_index\n");
137 r.iter().for_each(|(x, y)| println!("{x:?}, {y:?}"))
138 }
139 }
140 Cmd::TryPurge { path } => {
141 let e = Engine::open_with_file_system(
142 raft_engine::Config {
143 dir: path,
144 ..Default::default()
145 },
146 fs,
147 )?;
148 println!(
149 "purge_expired_files() returns {:?}",
150 e.purge_expired_files()?
151 );
152 }
153 }
154 Ok(())
155 }
156}
157
158pub fn run_command<F: FileSystem>(mut args: Vec<String>, fs: Arc<F>) {
159 args.insert(0, "ctl".to_owned());
160 let opts = ControlOpt::parse_from(args);
161 if let Err(e) = opts.validate_and_execute_with_file_system(fs) {
162 println!("{e:?}");
163 }
164}