1use crate::RuleEngine;
2use anyhow::{Context, Result};
3use arrow::datatypes::Schema;
4use arrow_json::ReaderBuilder;
5use rustyline::error::ReadlineError;
6use rustyline::DefaultEditor;
7use serde_json::Value;
8use std::io::Cursor;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12pub struct Repl {
13 engine: Arc<RwLock<RuleEngine>>,
14 schema: Arc<Schema>,
15}
16
17impl Repl {
18 pub fn new(engine: Arc<RwLock<RuleEngine>>, schema: Arc<Schema>) -> Self {
19 Self { engine, schema }
20 }
21
22 pub async fn run(&mut self) -> Result<()> {
23 let mut rl = DefaultEditor::new()?;
24 let _ = rl.load_history(".fuserule_history");
25
26 println!("🔥 FuseRule Interactive REPL");
27 println!("Commands:");
28 println!(" ingest <json> - Ingest JSON data");
29 println!(" state - Show all rule states");
30 println!(" state <id> - Show state for specific rule");
31 println!(" rules - List all rules");
32 println!(" eval <pred> - Evaluate predicate on last batch");
33 println!(" help - Show this help");
34 println!(" quit/exit - Exit REPL");
35 println!();
36
37 loop {
38 let readline = rl.readline("fuserule> ");
39 match readline {
40 Ok(line) => {
41 let _ = rl.add_history_entry(line.as_str());
42 let trimmed = line.trim();
43
44 if trimmed.is_empty() {
45 continue;
46 }
47
48 let parts: Vec<&str> = trimmed.splitn(2, ' ').collect();
49 let command = parts[0];
50 let args = parts.get(1).unwrap_or(&"");
51
52 match command {
53 "ingest" | "i" => {
54 if let Err(e) = self.handle_ingest(args).await {
55 eprintln!("❌ Error: {}", e);
56 }
57 }
58 "state" | "s" => {
59 if let Err(e) = self.handle_state(args).await {
60 eprintln!("❌ Error: {}", e);
61 }
62 }
63 "rules" | "r" => {
64 if let Err(e) = self.handle_rules().await {
65 eprintln!("❌ Error: {}", e);
66 }
67 }
68 "eval" | "e" => {
69 if let Err(e) = self.handle_eval(args).await {
70 eprintln!("❌ Error: {}", e);
71 }
72 }
73 "help" | "h" | "?" => {
74 self.show_help();
75 }
76 "quit" | "exit" | "q" => {
77 println!("👋 Goodbye!");
78 break;
79 }
80 _ => {
81 eprintln!("❌ Unknown command: {}. Type 'help' for commands.", command);
82 }
83 }
84 }
85 Err(ReadlineError::Interrupted) => {
86 println!("CTRL-C");
87 break;
88 }
89 Err(ReadlineError::Eof) => {
90 println!("CTRL-D");
91 break;
92 }
93 Err(err) => {
94 eprintln!("Error: {:?}", err);
95 break;
96 }
97 }
98 }
99
100 let _ = rl.save_history(".fuserule_history");
101 Ok(())
102 }
103
104 async fn handle_ingest(&self, json_str: &str) -> Result<()> {
105 if json_str.is_empty() {
106 eprintln!("Usage: ingest <json_data>");
107 return Ok(());
108 }
109
110 let json_value: Value = serde_json::from_str(json_str).context("Failed to parse JSON")?;
112
113 let json_data = serde_json::to_vec(&json_value)?;
115 let cursor = Cursor::new(json_data);
116
117 let reader = ReaderBuilder::new(self.schema.clone())
118 .build(cursor)
119 .context("Failed to create JSON reader")?;
120
121 let mut engine = self.engine.write().await;
122 let mut batch_count = 0;
123
124 for batch_result in reader {
125 match batch_result {
126 Ok(batch) => {
127 batch_count += 1;
128 let traces = engine.process_batch(&batch).await?;
129
130 println!(
131 "✅ Ingested batch #{} ({} rows)",
132 batch_count,
133 batch.num_rows()
134 );
135
136 for trace in traces {
137 if trace.action_fired {
138 println!(
139 " 🔔 Rule '{}' ({}) {} -> {}",
140 trace.rule_name,
141 trace.rule_id,
142 match trace.transition.as_str() {
143 "Activated" => "⚡ ACTIVATED",
144 "Deactivated" => "🔻 DEACTIVATED",
145 _ => "",
146 },
147 if matches!(trace.result, crate::state::PredicateResult::True) {
148 "TRUE"
149 } else {
150 "FALSE"
151 }
152 );
153 }
154 }
155 }
156 Err(e) => {
157 eprintln!("❌ Failed to read batch: {}", e);
158 }
159 }
160 }
161
162 Ok(())
163 }
164
165 async fn handle_state(&self, rule_id: &str) -> Result<()> {
166 let engine = self.engine.read().await;
167
168 if rule_id.is_empty() {
169 println!("📊 Rule States:");
171 for rule in &engine.rules {
172 let last_result = engine.state.get_last_result(rule.rule.id.as_str()).await?;
173 let window_size = engine
174 .window_buffers
175 .get(rule.rule.id.as_str())
176 .map(|b| {
177 b.get_batches()
178 .iter()
179 .map(|batch| batch.num_rows())
180 .sum::<usize>()
181 })
182 .unwrap_or(0);
183
184 println!(
185 " {} {} [{}] - Window: {} rows",
186 if matches!(last_result, crate::state::PredicateResult::True) {
187 "🟢"
188 } else {
189 "⚪"
190 },
191 rule.rule.name,
192 rule.rule.id,
193 window_size
194 );
195 }
196 } else {
197 let rule = engine.rules.iter().find(|r| r.rule.id == rule_id);
199 if let Some(rule) = rule {
200 let last_result = engine.state.get_last_result(rule_id).await?;
201 let last_transition = engine.state.get_last_transition_time(rule_id).await?;
202 let window_size = engine
203 .window_buffers
204 .get(rule_id)
205 .map(|b| {
206 b.get_batches()
207 .iter()
208 .map(|batch| batch.num_rows())
209 .sum::<usize>()
210 })
211 .unwrap_or(0);
212
213 println!("📊 Rule State: {}", rule.rule.name);
214 println!(" ID: {}", rule.rule.id);
215 println!(" Predicate: {}", rule.rule.predicate);
216 println!(" Current State: {:?}", last_result);
217 println!(" Window Size: {} rows", window_size);
218 println!(" Enabled: {}", rule.rule.enabled);
219 if let Some(ts) = last_transition {
220 println!(" Last Transition: {}", ts);
221 }
222 } else {
223 eprintln!("❌ Rule not found: {}", rule_id);
224 }
225 }
226
227 Ok(())
228 }
229
230 async fn handle_rules(&self) -> Result<()> {
231 let engine = self.engine.read().await;
232
233 println!("📋 Rules:");
234 for (i, rule) in engine.rules.iter().enumerate() {
235 println!(
236 " {}. {} ({}) - {} [{}]",
237 i + 1,
238 rule.rule.name,
239 rule.rule.id,
240 if rule.rule.enabled { "✅" } else { "❌" },
241 rule.rule.predicate
242 );
243 }
244
245 Ok(())
246 }
247
248 async fn handle_eval(&self, predicate: &str) -> Result<()> {
249 if predicate.is_empty() {
250 eprintln!("Usage: eval <predicate>");
251 return Ok(());
252 }
253
254 eprintln!("⚠️ Eval command requires last batch - use 'ingest' first");
255 eprintln!(" This feature will be enhanced in the debugger mode");
256 Ok(())
257 }
258
259 fn show_help(&self) {
260 println!("Commands:");
261 println!(" ingest <json> - Ingest JSON data");
262 println!(" state - Show all rule states");
263 println!(" state <id> - Show state for specific rule");
264 println!(" rules - List all rules");
265 println!(" eval <pred> - Evaluate predicate on last batch");
266 println!(" help - Show this help");
267 println!(" quit/exit - Exit REPL");
268 }
269}