fuse_rule/
repl.rs

1use crate::RuleEngine;
2use anyhow::{Context, Result};
3use arrow::datatypes::Schema;
4use arrow_json::ReaderBuilder;
5use rustyline::error::ReadlineError;
6use rustyline::DefaultEditor;
7use serde_json::Value;
8use std::io::Cursor;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12pub struct Repl {
13    engine: Arc<RwLock<RuleEngine>>,
14    schema: Arc<Schema>,
15}
16
17impl Repl {
18    pub fn new(engine: Arc<RwLock<RuleEngine>>, schema: Arc<Schema>) -> Self {
19        Self { engine, schema }
20    }
21
22    pub async fn run(&mut self) -> Result<()> {
23        let mut rl = DefaultEditor::new()?;
24        let _ = rl.load_history(".fuserule_history");
25
26        println!("🔥 FuseRule Interactive REPL");
27        println!("Commands:");
28        println!("  ingest <json>  - Ingest JSON data");
29        println!("  state          - Show all rule states");
30        println!("  state <id>     - Show state for specific rule");
31        println!("  rules          - List all rules");
32        println!("  eval <pred>    - Evaluate predicate on last batch");
33        println!("  help           - Show this help");
34        println!("  quit/exit      - Exit REPL");
35        println!();
36
37        loop {
38            let readline = rl.readline("fuserule> ");
39            match readline {
40                Ok(line) => {
41                    let _ = rl.add_history_entry(line.as_str());
42                    let trimmed = line.trim();
43
44                    if trimmed.is_empty() {
45                        continue;
46                    }
47
48                    let parts: Vec<&str> = trimmed.splitn(2, ' ').collect();
49                    let command = parts[0];
50                    let args = parts.get(1).unwrap_or(&"");
51
52                    match command {
53                        "ingest" | "i" => {
54                            if let Err(e) = self.handle_ingest(args).await {
55                                eprintln!("❌ Error: {}", e);
56                            }
57                        }
58                        "state" | "s" => {
59                            if let Err(e) = self.handle_state(args).await {
60                                eprintln!("❌ Error: {}", e);
61                            }
62                        }
63                        "rules" | "r" => {
64                            if let Err(e) = self.handle_rules().await {
65                                eprintln!("❌ Error: {}", e);
66                            }
67                        }
68                        "eval" | "e" => {
69                            if let Err(e) = self.handle_eval(args).await {
70                                eprintln!("❌ Error: {}", e);
71                            }
72                        }
73                        "help" | "h" | "?" => {
74                            self.show_help();
75                        }
76                        "quit" | "exit" | "q" => {
77                            println!("👋 Goodbye!");
78                            break;
79                        }
80                        _ => {
81                            eprintln!("❌ Unknown command: {}. Type 'help' for commands.", command);
82                        }
83                    }
84                }
85                Err(ReadlineError::Interrupted) => {
86                    println!("CTRL-C");
87                    break;
88                }
89                Err(ReadlineError::Eof) => {
90                    println!("CTRL-D");
91                    break;
92                }
93                Err(err) => {
94                    eprintln!("Error: {:?}", err);
95                    break;
96                }
97            }
98        }
99
100        let _ = rl.save_history(".fuserule_history");
101        Ok(())
102    }
103
104    async fn handle_ingest(&self, json_str: &str) -> Result<()> {
105        if json_str.is_empty() {
106            eprintln!("Usage: ingest <json_data>");
107            return Ok(());
108        }
109
110        // Parse JSON
111        let json_value: Value = serde_json::from_str(json_str).context("Failed to parse JSON")?;
112
113        // Convert to RecordBatch
114        let json_data = serde_json::to_vec(&json_value)?;
115        let cursor = Cursor::new(json_data);
116
117        let reader = ReaderBuilder::new(self.schema.clone())
118            .build(cursor)
119            .context("Failed to create JSON reader")?;
120
121        let mut engine = self.engine.write().await;
122        let mut batch_count = 0;
123
124        for batch_result in reader {
125            match batch_result {
126                Ok(batch) => {
127                    batch_count += 1;
128                    let traces = engine.process_batch(&batch).await?;
129
130                    println!(
131                        "✅ Ingested batch #{} ({} rows)",
132                        batch_count,
133                        batch.num_rows()
134                    );
135
136                    for trace in traces {
137                        if trace.action_fired {
138                            println!(
139                                "  🔔 Rule '{}' ({}) {} -> {}",
140                                trace.rule_name,
141                                trace.rule_id,
142                                match trace.transition.as_str() {
143                                    "Activated" => "⚡ ACTIVATED",
144                                    "Deactivated" => "🔻 DEACTIVATED",
145                                    _ => "",
146                                },
147                                if matches!(trace.result, crate::state::PredicateResult::True) {
148                                    "TRUE"
149                                } else {
150                                    "FALSE"
151                                }
152                            );
153                        }
154                    }
155                }
156                Err(e) => {
157                    eprintln!("❌ Failed to read batch: {}", e);
158                }
159            }
160        }
161
162        Ok(())
163    }
164
165    async fn handle_state(&self, rule_id: &str) -> Result<()> {
166        let engine = self.engine.read().await;
167
168        if rule_id.is_empty() {
169            // Show all states
170            println!("📊 Rule States:");
171            for rule in &engine.rules {
172                let last_result = engine.state.get_last_result(rule.rule.id.as_str()).await?;
173                let window_size = engine
174                    .window_buffers
175                    .get(rule.rule.id.as_str())
176                    .map(|b| {
177                        b.get_batches()
178                            .iter()
179                            .map(|batch| batch.num_rows())
180                            .sum::<usize>()
181                    })
182                    .unwrap_or(0);
183
184                println!(
185                    "  {} {} [{}] - Window: {} rows",
186                    if matches!(last_result, crate::state::PredicateResult::True) {
187                        "🟢"
188                    } else {
189                        "⚪"
190                    },
191                    rule.rule.name,
192                    rule.rule.id,
193                    window_size
194                );
195            }
196        } else {
197            // Show specific rule state
198            let rule = engine.rules.iter().find(|r| r.rule.id == rule_id);
199            if let Some(rule) = rule {
200                let last_result = engine.state.get_last_result(rule_id).await?;
201                let last_transition = engine.state.get_last_transition_time(rule_id).await?;
202                let window_size = engine
203                    .window_buffers
204                    .get(rule_id)
205                    .map(|b| {
206                        b.get_batches()
207                            .iter()
208                            .map(|batch| batch.num_rows())
209                            .sum::<usize>()
210                    })
211                    .unwrap_or(0);
212
213                println!("📊 Rule State: {}", rule.rule.name);
214                println!("  ID: {}", rule.rule.id);
215                println!("  Predicate: {}", rule.rule.predicate);
216                println!("  Current State: {:?}", last_result);
217                println!("  Window Size: {} rows", window_size);
218                println!("  Enabled: {}", rule.rule.enabled);
219                if let Some(ts) = last_transition {
220                    println!("  Last Transition: {}", ts);
221                }
222            } else {
223                eprintln!("❌ Rule not found: {}", rule_id);
224            }
225        }
226
227        Ok(())
228    }
229
230    async fn handle_rules(&self) -> Result<()> {
231        let engine = self.engine.read().await;
232
233        println!("📋 Rules:");
234        for (i, rule) in engine.rules.iter().enumerate() {
235            println!(
236                "  {}. {} ({}) - {} [{}]",
237                i + 1,
238                rule.rule.name,
239                rule.rule.id,
240                if rule.rule.enabled { "✅" } else { "❌" },
241                rule.rule.predicate
242            );
243        }
244
245        Ok(())
246    }
247
248    async fn handle_eval(&self, predicate: &str) -> Result<()> {
249        if predicate.is_empty() {
250            eprintln!("Usage: eval <predicate>");
251            return Ok(());
252        }
253
254        eprintln!("⚠️  Eval command requires last batch - use 'ingest' first");
255        eprintln!("   This feature will be enhanced in the debugger mode");
256        Ok(())
257    }
258
259    fn show_help(&self) {
260        println!("Commands:");
261        println!("  ingest <json>  - Ingest JSON data");
262        println!("  state          - Show all rule states");
263        println!("  state <id>     - Show state for specific rule");
264        println!("  rules          - List all rules");
265        println!("  eval <pred>    - Evaluate predicate on last batch");
266        println!("  help           - Show this help");
267        println!("  quit/exit      - Exit REPL");
268    }
269}