fuse_rule/
debugger.rs

1use crate::config::FuseRuleConfig;
2use crate::evaluator::{DataFusionEvaluator, RuleEvaluator};
3use crate::rule::Rule;
4use crate::state::PredicateResult;
5use anyhow::{Context, Result};
6use arrow::datatypes::{DataType, Field, Schema};
7use arrow::record_batch::RecordBatch;
8use rustyline::error::ReadlineError;
9use rustyline::DefaultEditor;
10use std::sync::Arc;
11
12pub struct RuleDebugger {
13    evaluator: DataFusionEvaluator,
14    schema: Arc<Schema>,
15    last_batch: Option<RecordBatch>,
16    breakpoints: std::collections::HashSet<String>,
17}
18
19impl RuleDebugger {
20    pub fn new(schema: Arc<Schema>) -> Self {
21        Self {
22            evaluator: DataFusionEvaluator::new(),
23            schema,
24            last_batch: None,
25            breakpoints: std::collections::HashSet::new(),
26        }
27    }
28
29    pub async fn run(&mut self) -> Result<()> {
30        let mut rl = DefaultEditor::new()?;
31        let _ = rl.load_history(".fuserule_debug_history");
32
33        println!("πŸ› FuseRule Rule Debugger");
34        println!("Commands:");
35        println!("  load <config>  - Load rules from config file");
36        println!("  batch <json>   - Load test batch from JSON");
37        println!("  step <rule_id> - Step through rule evaluation");
38        println!("  break <id>     - Set breakpoint on rule");
39        println!("  unbreak <id>   - Remove breakpoint");
40        println!("  list           - List all rules");
41        println!("  breakpoints    - Show breakpoints");
42        println!("  run            - Run evaluation on current batch");
43        println!("  help           - Show this help");
44        println!("  quit/exit      - Exit debugger");
45        println!();
46
47        loop {
48            let readline = rl.readline("debugger> ");
49            match readline {
50                Ok(line) => {
51                    let _ = rl.add_history_entry(line.as_str());
52                    let trimmed = line.trim();
53
54                    if trimmed.is_empty() {
55                        continue;
56                    }
57
58                    let parts: Vec<&str> = trimmed.splitn(2, ' ').collect();
59                    let command = parts[0];
60                    let args = parts.get(1).unwrap_or(&"");
61
62                    match command {
63                        "load" => {
64                            if let Err(e) = self.handle_load(args).await {
65                                eprintln!("❌ Error: {}", e);
66                            }
67                        }
68                        "batch" | "b" => {
69                            if let Err(e) = self.handle_batch(args).await {
70                                eprintln!("❌ Error: {}", e);
71                            }
72                        }
73                        "step" | "s" => {
74                            if let Err(e) = self.handle_step(args).await {
75                                eprintln!("❌ Error: {}", e);
76                            }
77                        }
78                        "break" => {
79                            self.handle_break(args);
80                        }
81                        "unbreak" => {
82                            self.handle_unbreak(args);
83                        }
84                        "list" | "l" => {
85                            self.handle_list();
86                        }
87                        "breakpoints" | "bp" => {
88                            self.handle_breakpoints();
89                        }
90                        "run" | "r" => {
91                            if let Err(e) = self.handle_run().await {
92                                eprintln!("❌ Error: {}", e);
93                            }
94                        }
95                        "help" | "h" | "?" => {
96                            self.show_help();
97                        }
98                        "quit" | "exit" | "q" => {
99                            println!("πŸ‘‹ Goodbye!");
100                            break;
101                        }
102                        _ => {
103                            eprintln!("❌ Unknown command: {}. Type 'help' for commands.", command);
104                        }
105                    }
106                }
107                Err(ReadlineError::Interrupted) => {
108                    println!("CTRL-C");
109                    break;
110                }
111                Err(ReadlineError::Eof) => {
112                    println!("CTRL-D");
113                    break;
114                }
115                Err(err) => {
116                    eprintln!("Error: {:?}", err);
117                    break;
118                }
119            }
120        }
121
122        let _ = rl.save_history(".fuserule_debug_history");
123        Ok(())
124    }
125
126    async fn handle_load(&mut self, config_path: &str) -> Result<()> {
127        if config_path.is_empty() {
128            eprintln!("Usage: load <config_file>");
129            return Ok(());
130        }
131
132        let config = FuseRuleConfig::from_file(config_path)?;
133
134        // Update schema from config
135        let mut fields = Vec::new();
136        for f in config.schema {
137            let dt = match f.data_type.as_str() {
138                "int32" => DataType::Int32,
139                "int64" => DataType::Int64,
140                "float32" => DataType::Float32,
141                "float64" => DataType::Float64,
142                "bool" => DataType::Boolean,
143                "utf8" | "string" => DataType::Utf8,
144                _ => DataType::Utf8,
145            };
146            fields.push(Field::new(f.name, dt, true));
147        }
148        self.schema = Arc::new(Schema::new(fields));
149
150        println!("βœ… Loaded config from: {}", config_path);
151        println!("   Rules: {}", config.rules.len());
152        println!("   Schema fields: {}", self.schema.fields().len());
153        for field in self.schema.fields() {
154            println!("     - {} ({:?})", field.name(), field.data_type());
155        }
156
157        Ok(())
158    }
159
160    async fn handle_batch(&mut self, json_str: &str) -> Result<()> {
161        if json_str.is_empty() {
162            eprintln!("Usage: batch <json_data>");
163            eprintln!("   Example: batch '[{{\"price\": 150, \"volume\": 1000}}]'");
164            return Ok(());
165        }
166
167        // Use arrow-json to parse JSON into RecordBatch
168        use arrow_json::ReaderBuilder;
169        use std::io::Cursor;
170
171        let json_value: serde_json::Value =
172            serde_json::from_str(json_str).context("Failed to parse JSON")?;
173
174        let json_data = serde_json::to_vec(&json_value)?;
175        let cursor = Cursor::new(json_data);
176
177        let mut reader = ReaderBuilder::new(self.schema.clone())
178            .build(cursor)
179            .context("Failed to create JSON reader")?;
180
181        // Read first batch
182        if let Some(batch_result) = reader.next() {
183            match batch_result {
184                Ok(batch) => {
185                    self.last_batch = Some(batch.clone());
186                    println!("βœ… Loaded batch: {} rows", batch.num_rows());
187                    println!("   Columns:");
188                    for field in batch.schema().fields() {
189                        println!("     - {} ({:?})", field.name(), field.data_type());
190                    }
191                    return Ok(());
192                }
193                Err(e) => {
194                    anyhow::bail!("Failed to read batch: {}", e);
195                }
196            }
197        }
198
199        anyhow::bail!("No data found in JSON");
200    }
201
202    async fn handle_step(&mut self, rule_id: &str) -> Result<()> {
203        if rule_id.is_empty() {
204            eprintln!("Usage: step <rule_id> [predicate]");
205            eprintln!("   Example: step r1 'price > 100'");
206            return Ok(());
207        }
208
209        let batch = match &self.last_batch {
210            Some(b) => b,
211            None => {
212                eprintln!("❌ No batch loaded. Use 'batch <json>' first.");
213                return Ok(());
214            }
215        };
216
217        // Parse predicate if provided, otherwise use default
218        let parts: Vec<&str> = rule_id.splitn(2, ' ').collect();
219        let rule_id = parts[0];
220        let predicate = parts
221            .get(1)
222            .map(|s| s.trim_matches('\'').trim_matches('"').to_string())
223            .unwrap_or_else(|| "price > 100".to_string());
224
225        // Create a test rule for stepping
226        let rule = Rule {
227            id: rule_id.to_string(),
228            name: format!("Debug Rule {}", rule_id),
229            predicate,
230            action: "logger".to_string(),
231            window_seconds: None,
232            version: 1,
233            enabled: true,
234        };
235
236        println!("πŸ” Stepping through rule: {}", rule_id);
237        println!("   Predicate: {}", rule.predicate);
238        println!("   Batch: {} rows", batch.num_rows());
239
240        // Compile
241        println!("πŸ“ Compiling predicate...");
242        let compiled = self.evaluator.compile(rule.clone(), &self.schema)?;
243        println!("   βœ… Compiled successfully");
244        if compiled.has_aggregates {
245            println!("   ⚠️  Contains aggregate functions");
246        }
247
248        // Check breakpoint
249        if self.breakpoints.contains(rule_id) {
250            println!("⏸️  Breakpoint hit at rule: {}", rule_id);
251            println!("   Press Enter to continue...");
252            let mut line = String::new();
253            std::io::stdin().read_line(&mut line)?;
254        }
255
256        // Evaluate
257        println!("βš™οΈ  Evaluating predicate...");
258        let results = self
259            .evaluator
260            .evaluate_batch(batch, &[compiled], &[vec![]])
261            .await?;
262
263        let result = results[0].0;
264        println!("πŸ“Š Result: {:?}", result);
265        println!(
266            "   {}",
267            if matches!(result, PredicateResult::True) {
268                "βœ… Predicate is TRUE"
269            } else {
270                "❌ Predicate is FALSE"
271            }
272        );
273
274        if let Some(matched_batch) = &results[0].1 {
275            println!("   Matched rows: {}", matched_batch.num_rows());
276        }
277
278        Ok(())
279    }
280
281    fn handle_break(&mut self, rule_id: &str) {
282        if rule_id.is_empty() {
283            eprintln!("Usage: break <rule_id>");
284            return;
285        }
286        self.breakpoints.insert(rule_id.to_string());
287        println!("βœ… Breakpoint set on rule: {}", rule_id);
288    }
289
290    fn handle_unbreak(&mut self, rule_id: &str) {
291        if rule_id.is_empty() {
292            eprintln!("Usage: unbreak <rule_id>");
293            return;
294        }
295        if self.breakpoints.remove(rule_id) {
296            println!("βœ… Breakpoint removed from rule: {}", rule_id);
297        } else {
298            println!("⚠️  No breakpoint found on rule: {}", rule_id);
299        }
300    }
301
302    fn handle_list(&self) {
303        println!("πŸ“‹ Loaded rules:");
304        println!("   (Use 'load <config>' to load rules from config)");
305    }
306
307    fn handle_breakpoints(&self) {
308        if self.breakpoints.is_empty() {
309            println!("No breakpoints set");
310        } else {
311            println!("⏸️  Breakpoints:");
312            for bp in &self.breakpoints {
313                println!("   - {}", bp);
314            }
315        }
316    }
317
318    async fn handle_run(&mut self) -> Result<()> {
319        let batch = match &self.last_batch {
320            Some(b) => b,
321            None => {
322                eprintln!("❌ No batch loaded. Use 'batch <json>' first.");
323                return Ok(());
324            }
325        };
326
327        println!(
328            "πŸš€ Running evaluation on batch ({} rows)...",
329            batch.num_rows()
330        );
331        println!("   (This is a simplified run - use 'step' for detailed debugging)");
332
333        Ok(())
334    }
335
336    fn show_help(&self) {
337        println!("Commands:");
338        println!("  load <config>  - Load rules from config file");
339        println!("  batch <json>   - Load test batch from JSON");
340        println!("  step <rule_id> - Step through rule evaluation");
341        println!("  break <id>     - Set breakpoint on rule");
342        println!("  unbreak <id>   - Remove breakpoint");
343        println!("  list           - List all rules");
344        println!("  breakpoints    - Show breakpoints");
345        println!("  run            - Run evaluation on current batch");
346        println!("  help           - Show this help");
347        println!("  quit/exit      - Exit debugger");
348    }
349}