Skip to main content

varpulis_cli/
lib.rs

1//! Varpulis CLI library - testable functions and modules
2//!
3//! This library provides the core functionality for the Varpulis CLI,
4//! organized into dedicated modules for security, WebSocket handling, and more.
5
6pub mod api;
7pub mod audit;
8pub mod auth;
9pub mod billing;
10pub mod client;
11pub mod config;
12pub mod oauth;
13pub mod oidc;
14#[cfg(feature = "saas")]
15pub mod org;
16pub mod playground;
17pub mod rate_limit;
18pub mod security;
19pub mod users;
20pub mod websocket;
21
22use anyhow::Result;
23use std::path::PathBuf;
24use varpulis_core::ast::{Program, Stmt};
25use varpulis_parser::parse;
26
27/// Parse and validate VPL source code
28pub fn check_syntax(source: &str) -> Result<()> {
29    match parse(source) {
30        Ok(program) => {
31            println!("Syntax OK ({} statements)", program.statements.len());
32            Ok(())
33        }
34        Err(e) => {
35            println!("Syntax error: {}", e);
36            Err(anyhow::anyhow!("Parse error: {}", e))
37        }
38    }
39}
40
41/// Parse and return statement count
42pub fn parse_program(source: &str) -> Result<usize> {
43    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
44    Ok(program.statements.len())
45}
46
47/// Validate program can be loaded by engine
48pub fn validate_program(source: &str) -> Result<usize> {
49    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
50    let statement_count = program.statements.len();
51
52    // Try to create engine and load program
53    let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
54    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
55    engine
56        .load(&program)
57        .map_err(|e| anyhow::anyhow!("Load error: {}", e))?;
58
59    Ok(statement_count)
60}
61
62// =============================================================================
63// Import resolution (moved from main.rs for testability)
64// =============================================================================
65
/// Upper bound on how deeply `@import`s may nest.
///
/// Import resolution fails with an error once the nesting depth exceeds this
/// value, so a runaway chain of imports cannot overflow the stack.
pub const MAX_IMPORT_DEPTH: usize = 10;
68
69/// Resolve `@import` statements by loading and parsing imported files.
70///
71/// Recursively processes imports with cycle detection and depth limiting.
72/// Imported statements are prepended to the program (before main file statements).
73pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
74    use std::collections::HashSet;
75    let mut visited = HashSet::new();
76    resolve_imports_inner(program, base_path, 0, &mut visited)
77}
78
79fn resolve_imports_inner(
80    program: &mut Program,
81    base_path: Option<&PathBuf>,
82    depth: usize,
83    visited: &mut std::collections::HashSet<PathBuf>,
84) -> Result<()> {
85    if depth > MAX_IMPORT_DEPTH {
86        anyhow::bail!(
87            "Import depth limit exceeded (max {}). Check for circular imports.",
88            MAX_IMPORT_DEPTH
89        );
90    }
91
92    let mut imported_statements = Vec::new();
93    let mut imports_to_process = Vec::new();
94
95    for stmt in &program.statements {
96        if let Stmt::Import { path, .. } = &stmt.node {
97            imports_to_process.push(path.clone());
98        }
99    }
100
101    for import_path in imports_to_process {
102        let full_path = if let Some(base) = base_path {
103            base.join(&import_path)
104        } else {
105            PathBuf::from(&import_path)
106        };
107
108        let canonical_path = full_path.canonicalize().map_err(|e| {
109            anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
110        })?;
111
112        if visited.contains(&canonical_path) {
113            continue;
114        }
115        visited.insert(canonical_path.clone());
116
117        let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
118            anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
119        })?;
120
121        let import_program = parse(&import_source).map_err(|e| {
122            anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
123        })?;
124
125        let import_base = full_path.parent().map(|p| p.to_path_buf());
126        let mut imported = import_program;
127        resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
128
129        imported_statements.extend(imported.statements);
130    }
131
132    program
133        .statements
134        .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
135
136    let mut new_statements = imported_statements;
137    new_statements.append(&mut program.statements);
138    program.statements = new_statements;
139
140    Ok(())
141}
142
143// =============================================================================
144// Simulation from source (for testing)
145// =============================================================================
146
147/// Run a VPL program against a list of events and collect output events.
148///
149/// This is the testable core of the `simulate` command — parses VPL source,
150/// loads it into an engine, processes all input events, and returns emitted events.
151pub async fn simulate_from_source(
152    vpl: &str,
153    events: Vec<varpulis_runtime::event::Event>,
154) -> Result<Vec<varpulis_runtime::event::Event>> {
155    use varpulis_runtime::event::Event;
156
157    let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
158
159    let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
160    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
161    engine
162        .load(&program)
163        .map_err(|e| anyhow::anyhow!("Load error: {}", e))?;
164
165    engine
166        .process_batch_sync(events)
167        .map_err(|e| anyhow::anyhow!("Process error: {}", e))?;
168
169    // Drop the sender side so the receiver knows no more events are coming
170    drop(engine);
171
172    let mut results = Vec::new();
173    while let Ok(event) = output_rx.try_recv() {
174        results.push(event);
175    }
176
177    Ok(results)
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// A filter-and-emit stream passes the syntax check.
    #[test]
    fn test_check_syntax_valid() {
        let vpl = r#"
            stream HighTemp = TempReading
                .where(temperature > 30)
                .emit(alert_type: "high_temp")
        "#;
        assert!(matches!(check_syntax(vpl), Ok(())));
    }

    /// A stream with no source and an unclosed call is rejected.
    #[test]
    fn test_check_syntax_invalid() {
        let vpl = r#"
            stream Invalid =
                .where(
        "#;
        assert!(matches!(check_syntax(vpl), Err(_)));
    }

    /// A single stream declaration counts as one statement.
    #[test]
    fn test_parse_program_valid() {
        let vpl = r#"
            stream Test = Events
                .where(value > 10)
        "#;
        assert_eq!(parse_program(vpl).ok(), Some(1));
    }

    /// An unclosed parenthesis is a parse error.
    #[test]
    fn test_parse_program_invalid() {
        let vpl = "stream x = (";
        assert!(matches!(parse_program(vpl), Err(_)));
    }

    /// One stream loads into the engine and is reported as one statement.
    #[tokio::test]
    async fn test_validate_program_simple() {
        let vpl = r#"
            stream Simple = Events
                .where(x > 0)
                .emit(alert_type: "test")
        "#;
        assert_eq!(validate_program(vpl).ok(), Some(1));
    }

    /// Two streams load into the engine and are reported as two statements.
    #[tokio::test]
    async fn test_validate_program_multiple_streams() {
        let vpl = r#"
            stream A = Events
                .where(event_type == "a")
                .emit(alert_type: "a")

            stream B = Events
                .where(event_type == "b")
                .emit(alert_type: "b")
        "#;
        assert_eq!(validate_program(vpl).ok(), Some(2));
    }

    /// A filtered stream is accepted by the engine.
    #[tokio::test]
    async fn test_validate_program_with_filter() {
        let vpl = r#"
            stream Filtered = Metrics
                .where(value > 100)
                .emit(alert_type: "high_value")
        "#;
        assert!(matches!(validate_program(vpl), Ok(_)));
    }

    /// The `A -> B` followed-by form inside `.pattern(...)` parses.
    #[test]
    fn test_check_syntax_followed_by() {
        let vpl = r#"
            stream Pattern = Events
                .pattern(p: A -> B)
                .emit(alert_type: "sequence_match")
        "#;
        assert!(matches!(check_syntax(vpl), Ok(())));
    }

    /// An `event` declaration with typed fields parses.
    #[test]
    fn test_check_syntax_event_declaration() {
        let vpl = r#"
            event TempReading:
                sensor_id: str
                temperature: float
        "#;
        assert!(matches!(check_syntax(vpl), Ok(())));
    }

    /// An `fn` declaration with a typed parameter and return type parses.
    #[test]
    fn test_check_syntax_function_declaration() {
        let vpl = r#"
            fn celsius_to_fahrenheit(c: float) -> float:
                c * 9.0 / 5.0 + 32.0
        "#;
        assert!(matches!(check_syntax(vpl), Ok(())));
    }

    /// A `.pattern(...)` stage followed by `.emit(...)` parses.
    #[test]
    fn test_check_syntax_pattern_matching() {
        let vpl = r#"
            stream PatternMatch = Events
                .pattern(p: A -> B)
                .emit(alert_type: "pattern")
        "#;
        assert!(matches!(check_syntax(vpl), Ok(())));
    }

    /// A `merge(...)` of two streams parses.
    #[test]
    fn test_check_syntax_merge() {
        let vpl = r#"
            stream Merged = merge(StreamA, StreamB)
                .emit(alert_type: "merged")
        "#;
        assert!(matches!(check_syntax(vpl), Ok(())));
    }
}