1use crate::config::FuseRuleConfig;
2use crate::evaluator::{DataFusionEvaluator, RuleEvaluator};
3use crate::rule::Rule;
4use crate::state::PredicateResult;
5use anyhow::{Context, Result};
6use arrow::datatypes::{DataType, Field, Schema};
7use arrow::record_batch::RecordBatch;
8use rustyline::error::ReadlineError;
9use rustyline::DefaultEditor;
10use std::sync::Arc;
11
12pub struct RuleDebugger {
13 evaluator: DataFusionEvaluator,
14 schema: Arc<Schema>,
15 last_batch: Option<RecordBatch>,
16 breakpoints: std::collections::HashSet<String>,
17}
18
19impl RuleDebugger {
20 pub fn new(schema: Arc<Schema>) -> Self {
21 Self {
22 evaluator: DataFusionEvaluator::new(),
23 schema,
24 last_batch: None,
25 breakpoints: std::collections::HashSet::new(),
26 }
27 }
28
29 pub async fn run(&mut self) -> Result<()> {
30 let mut rl = DefaultEditor::new()?;
31 let _ = rl.load_history(".fuserule_debug_history");
32
33 println!("π FuseRule Rule Debugger");
34 println!("Commands:");
35 println!(" load <config> - Load rules from config file");
36 println!(" batch <json> - Load test batch from JSON");
37 println!(" step <rule_id> - Step through rule evaluation");
38 println!(" break <id> - Set breakpoint on rule");
39 println!(" unbreak <id> - Remove breakpoint");
40 println!(" list - List all rules");
41 println!(" breakpoints - Show breakpoints");
42 println!(" run - Run evaluation on current batch");
43 println!(" help - Show this help");
44 println!(" quit/exit - Exit debugger");
45 println!();
46
47 loop {
48 let readline = rl.readline("debugger> ");
49 match readline {
50 Ok(line) => {
51 let _ = rl.add_history_entry(line.as_str());
52 let trimmed = line.trim();
53
54 if trimmed.is_empty() {
55 continue;
56 }
57
58 let parts: Vec<&str> = trimmed.splitn(2, ' ').collect();
59 let command = parts[0];
60 let args = parts.get(1).unwrap_or(&"");
61
62 match command {
63 "load" => {
64 if let Err(e) = self.handle_load(args).await {
65 eprintln!("β Error: {}", e);
66 }
67 }
68 "batch" | "b" => {
69 if let Err(e) = self.handle_batch(args).await {
70 eprintln!("β Error: {}", e);
71 }
72 }
73 "step" | "s" => {
74 if let Err(e) = self.handle_step(args).await {
75 eprintln!("β Error: {}", e);
76 }
77 }
78 "break" => {
79 self.handle_break(args);
80 }
81 "unbreak" => {
82 self.handle_unbreak(args);
83 }
84 "list" | "l" => {
85 self.handle_list();
86 }
87 "breakpoints" | "bp" => {
88 self.handle_breakpoints();
89 }
90 "run" | "r" => {
91 if let Err(e) = self.handle_run().await {
92 eprintln!("β Error: {}", e);
93 }
94 }
95 "help" | "h" | "?" => {
96 self.show_help();
97 }
98 "quit" | "exit" | "q" => {
99 println!("π Goodbye!");
100 break;
101 }
102 _ => {
103 eprintln!("β Unknown command: {}. Type 'help' for commands.", command);
104 }
105 }
106 }
107 Err(ReadlineError::Interrupted) => {
108 println!("CTRL-C");
109 break;
110 }
111 Err(ReadlineError::Eof) => {
112 println!("CTRL-D");
113 break;
114 }
115 Err(err) => {
116 eprintln!("Error: {:?}", err);
117 break;
118 }
119 }
120 }
121
122 let _ = rl.save_history(".fuserule_debug_history");
123 Ok(())
124 }
125
126 async fn handle_load(&mut self, config_path: &str) -> Result<()> {
127 if config_path.is_empty() {
128 eprintln!("Usage: load <config_file>");
129 return Ok(());
130 }
131
132 let config = FuseRuleConfig::from_file(config_path)?;
133
134 let mut fields = Vec::new();
136 for f in config.schema {
137 let dt = match f.data_type.as_str() {
138 "int32" => DataType::Int32,
139 "int64" => DataType::Int64,
140 "float32" => DataType::Float32,
141 "float64" => DataType::Float64,
142 "bool" => DataType::Boolean,
143 "utf8" | "string" => DataType::Utf8,
144 _ => DataType::Utf8,
145 };
146 fields.push(Field::new(f.name, dt, true));
147 }
148 self.schema = Arc::new(Schema::new(fields));
149
150 println!("β
Loaded config from: {}", config_path);
151 println!(" Rules: {}", config.rules.len());
152 println!(" Schema fields: {}", self.schema.fields().len());
153 for field in self.schema.fields() {
154 println!(" - {} ({:?})", field.name(), field.data_type());
155 }
156
157 Ok(())
158 }
159
160 async fn handle_batch(&mut self, json_str: &str) -> Result<()> {
161 if json_str.is_empty() {
162 eprintln!("Usage: batch <json_data>");
163 eprintln!(" Example: batch '[{{\"price\": 150, \"volume\": 1000}}]'");
164 return Ok(());
165 }
166
167 use arrow_json::ReaderBuilder;
169 use std::io::Cursor;
170
171 let json_value: serde_json::Value =
172 serde_json::from_str(json_str).context("Failed to parse JSON")?;
173
174 let json_data = serde_json::to_vec(&json_value)?;
175 let cursor = Cursor::new(json_data);
176
177 let mut reader = ReaderBuilder::new(self.schema.clone())
178 .build(cursor)
179 .context("Failed to create JSON reader")?;
180
181 if let Some(batch_result) = reader.next() {
183 match batch_result {
184 Ok(batch) => {
185 self.last_batch = Some(batch.clone());
186 println!("β
Loaded batch: {} rows", batch.num_rows());
187 println!(" Columns:");
188 for field in batch.schema().fields() {
189 println!(" - {} ({:?})", field.name(), field.data_type());
190 }
191 return Ok(());
192 }
193 Err(e) => {
194 anyhow::bail!("Failed to read batch: {}", e);
195 }
196 }
197 }
198
199 anyhow::bail!("No data found in JSON");
200 }
201
202 async fn handle_step(&mut self, rule_id: &str) -> Result<()> {
203 if rule_id.is_empty() {
204 eprintln!("Usage: step <rule_id> [predicate]");
205 eprintln!(" Example: step r1 'price > 100'");
206 return Ok(());
207 }
208
209 let batch = match &self.last_batch {
210 Some(b) => b,
211 None => {
212 eprintln!("β No batch loaded. Use 'batch <json>' first.");
213 return Ok(());
214 }
215 };
216
217 let parts: Vec<&str> = rule_id.splitn(2, ' ').collect();
219 let rule_id = parts[0];
220 let predicate = parts
221 .get(1)
222 .map(|s| s.trim_matches('\'').trim_matches('"').to_string())
223 .unwrap_or_else(|| "price > 100".to_string());
224
225 let rule = Rule {
227 id: rule_id.to_string(),
228 name: format!("Debug Rule {}", rule_id),
229 predicate,
230 action: "logger".to_string(),
231 window_seconds: None,
232 version: 1,
233 enabled: true,
234 };
235
236 println!("π Stepping through rule: {}", rule_id);
237 println!(" Predicate: {}", rule.predicate);
238 println!(" Batch: {} rows", batch.num_rows());
239
240 println!("π Compiling predicate...");
242 let compiled = self.evaluator.compile(rule.clone(), &self.schema)?;
243 println!(" β
Compiled successfully");
244 if compiled.has_aggregates {
245 println!(" β οΈ Contains aggregate functions");
246 }
247
248 if self.breakpoints.contains(rule_id) {
250 println!("βΈοΈ Breakpoint hit at rule: {}", rule_id);
251 println!(" Press Enter to continue...");
252 let mut line = String::new();
253 std::io::stdin().read_line(&mut line)?;
254 }
255
256 println!("βοΈ Evaluating predicate...");
258 let results = self
259 .evaluator
260 .evaluate_batch(batch, &[compiled], &[vec![]])
261 .await?;
262
263 let result = results[0].0;
264 println!("π Result: {:?}", result);
265 println!(
266 " {}",
267 if matches!(result, PredicateResult::True) {
268 "β
Predicate is TRUE"
269 } else {
270 "β Predicate is FALSE"
271 }
272 );
273
274 if let Some(matched_batch) = &results[0].1 {
275 println!(" Matched rows: {}", matched_batch.num_rows());
276 }
277
278 Ok(())
279 }
280
281 fn handle_break(&mut self, rule_id: &str) {
282 if rule_id.is_empty() {
283 eprintln!("Usage: break <rule_id>");
284 return;
285 }
286 self.breakpoints.insert(rule_id.to_string());
287 println!("β
Breakpoint set on rule: {}", rule_id);
288 }
289
290 fn handle_unbreak(&mut self, rule_id: &str) {
291 if rule_id.is_empty() {
292 eprintln!("Usage: unbreak <rule_id>");
293 return;
294 }
295 if self.breakpoints.remove(rule_id) {
296 println!("β
Breakpoint removed from rule: {}", rule_id);
297 } else {
298 println!("β οΈ No breakpoint found on rule: {}", rule_id);
299 }
300 }
301
302 fn handle_list(&self) {
303 println!("π Loaded rules:");
304 println!(" (Use 'load <config>' to load rules from config)");
305 }
306
307 fn handle_breakpoints(&self) {
308 if self.breakpoints.is_empty() {
309 println!("No breakpoints set");
310 } else {
311 println!("βΈοΈ Breakpoints:");
312 for bp in &self.breakpoints {
313 println!(" - {}", bp);
314 }
315 }
316 }
317
318 async fn handle_run(&mut self) -> Result<()> {
319 let batch = match &self.last_batch {
320 Some(b) => b,
321 None => {
322 eprintln!("β No batch loaded. Use 'batch <json>' first.");
323 return Ok(());
324 }
325 };
326
327 println!(
328 "π Running evaluation on batch ({} rows)...",
329 batch.num_rows()
330 );
331 println!(" (This is a simplified run - use 'step' for detailed debugging)");
332
333 Ok(())
334 }
335
336 fn show_help(&self) {
337 println!("Commands:");
338 println!(" load <config> - Load rules from config file");
339 println!(" batch <json> - Load test batch from JSON");
340 println!(" step <rule_id> - Step through rule evaluation");
341 println!(" break <id> - Set breakpoint on rule");
342 println!(" unbreak <id> - Remove breakpoint");
343 println!(" list - List all rules");
344 println!(" breakpoints - Show breakpoints");
345 println!(" run - Run evaluation on current batch");
346 println!(" help - Show this help");
347 println!(" quit/exit - Exit debugger");
348 }
349}