1use anyhow::Result;
7use colored::Colorize;
8use rustyline::error::ReadlineError;
9use rustyline::history::DefaultHistory;
10use rustyline::{Editor, Result as RustylineResult};
11
12use crate::config::Config;
13
14pub struct InteractiveSession {
16 editor: Editor<(), DefaultHistory>,
18 pub broker_url: String,
20 pub queue_name: String,
22}
23
24impl InteractiveSession {
25 pub fn new(config: Config) -> RustylineResult<Self> {
27 let mut editor = Editor::<(), DefaultHistory>::new()?;
28
29 let history_path = dirs::home_dir().map(|mut p| {
31 p.push(".celers_history");
32 p
33 });
34
35 if let Some(ref path) = history_path {
36 let _ = editor.load_history(path);
37 }
38
39 let broker_url = config.broker.url;
40 let queue_name = config.broker.queue;
41
42 Ok(Self {
43 editor,
44 broker_url,
45 queue_name,
46 })
47 }
48
49 fn get_prompt(&self) -> String {
51 format!(
52 "{}@{} {} ",
53 "celers".cyan().bold(),
54 self.queue_name.yellow(),
55 "❯".green().bold()
56 )
57 }
58
59 pub async fn run(&mut self) -> Result<()> {
61 println!("{}", "CeleRS Interactive Mode".green().bold());
62 println!(
63 "Type {} for help, {} to exit\n",
64 "help".cyan(),
65 "exit".cyan()
66 );
67 println!("Current broker: {}", self.broker_url.yellow());
68 println!("Current queue: {}\n", self.queue_name.yellow());
69
70 loop {
71 let prompt = self.get_prompt();
72
73 match self.editor.readline(&prompt) {
74 Ok(line) => {
75 let line = line.trim();
76
77 if line.is_empty() {
79 continue;
80 }
81
82 let _ = self.editor.add_history_entry(line);
84
85 if matches!(line, "exit" | "quit" | "q") {
87 println!("{}", "Goodbye!".green());
88 break;
89 }
90
91 if let Err(e) = self.process_command(line).await {
93 eprintln!("{} {}", "Error:".red().bold(), e);
94 }
95 }
96 Err(ReadlineError::Interrupted) => {
97 println!("{}", "^C".yellow());
98 continue;
99 }
100 Err(ReadlineError::Eof) => {
101 println!("{}", "Goodbye!".green());
102 break;
103 }
104 Err(err) => {
105 eprintln!("{} {}", "Error:".red().bold(), err);
106 break;
107 }
108 }
109 }
110
111 if let Some(mut path) = dirs::home_dir() {
113 path.push(".celers_history");
114 let _ = self.editor.save_history(&path);
115 }
116
117 Ok(())
118 }
119
120 async fn process_command(&mut self, line: &str) -> Result<()> {
122 let parts: Vec<&str> = line.split_whitespace().collect();
123
124 if parts.is_empty() {
125 return Ok(());
126 }
127
128 match parts[0] {
129 "help" | "?" => {
130 self.print_help();
131 }
132 "status" | "st" => {
133 crate::commands::show_status(&self.broker_url, &self.queue_name).await?;
134 }
135 "queues" | "ls" => {
136 crate::commands::list_queues(&self.broker_url).await?;
137 }
138 "workers" | "w" => {
139 crate::commands::list_workers(&self.broker_url).await?;
140 }
141 "health" | "h" => {
142 crate::commands::health_check(&self.broker_url, &self.queue_name).await?;
143 }
144 "doctor" | "d" => {
145 crate::commands::doctor(&self.broker_url, &self.queue_name).await?;
146 }
147 "metrics" | "m" => {
148 crate::commands::show_metrics("text", None, None, None).await?;
149 }
150 "dlq" => {
151 if parts.len() < 2 {
152 println!("{} dlq <inspect|clear>", "Usage:".yellow());
153 return Ok(());
154 }
155 match parts[1] {
156 "inspect" | "i" => {
157 let limit = if parts.len() > 2 {
158 parts[2].parse().unwrap_or(10)
159 } else {
160 10
161 };
162 crate::commands::inspect_dlq(&self.broker_url, &self.queue_name, limit)
163 .await?;
164 }
165 "clear" | "c" => {
166 println!(
167 "{}",
168 "This will delete all DLQ tasks. Are you sure? (yes/no)".yellow()
169 );
170 let confirm_prompt = format!("{} ", "❯".green());
171 if let Ok(response) = self.editor.readline(&confirm_prompt) {
172 if response.trim() == "yes" {
173 crate::commands::clear_dlq(
174 &self.broker_url,
175 &self.queue_name,
176 true,
177 )
178 .await?;
179 } else {
180 println!("{}", "Cancelled".yellow());
181 }
182 }
183 }
184 _ => println!("{} dlq <inspect|clear>", "Usage:".yellow()),
185 }
186 }
187 "use" => {
188 if parts.len() < 2 {
189 println!("{} use <queue_name>", "Usage:".yellow());
190 return Ok(());
191 }
192 self.queue_name = parts[1].to_string();
193 println!(
194 "{} {}",
195 "Switched to queue:".green(),
196 self.queue_name.yellow()
197 );
198 }
199 "broker" => {
200 if parts.len() < 2 {
201 println!("{} Current: {}", "Broker:".cyan(), self.broker_url.yellow());
202 return Ok(());
203 }
204 self.broker_url = parts[1].to_string();
205 println!(
206 "{} {}",
207 "Switched to broker:".green(),
208 self.broker_url.yellow()
209 );
210 }
211 "clear" | "cls" => {
212 print!("\x1B[2J\x1B[1;1H");
213 }
214 _ => {
215 println!("{} Unknown command: {}", "Error:".red().bold(), parts[0]);
216 println!("Type {} for available commands", "help".cyan());
217 }
218 }
219
220 Ok(())
221 }
222
223 fn print_help(&self) {
225 println!("\n{}", "Available Commands:".green().bold());
226 println!();
227
228 let commands = vec![
229 ("status, st", "Show queue status"),
230 ("queues, ls", "List all queues"),
231 ("workers, w", "List all workers"),
232 ("health, h", "Run health diagnostics"),
233 ("doctor, d", "Automatic problem detection"),
234 ("metrics, m", "Display metrics"),
235 ("dlq inspect [limit]", "Inspect DLQ tasks"),
236 ("dlq clear", "Clear all DLQ tasks"),
237 ("use <queue>", "Switch to different queue"),
238 ("broker [url]", "Show/set broker URL"),
239 ("clear, cls", "Clear screen"),
240 ("help, ?", "Show this help"),
241 ("exit, quit, q", "Exit interactive mode"),
242 ];
243
244 for (cmd, desc) in commands {
245 println!(" {:<25} {}", cmd.cyan(), desc);
246 }
247 println!();
248 }
249}
250
251pub async fn start_interactive(config: Config) -> Result<()> {
266 let mut session = InteractiveSession::new(config)?;
267 session.run().await
268}