Skip to main content

varpulis_cli/
lib.rs

1//! Varpulis CLI library - testable functions and modules
2//!
3//! This library provides the core functionality for the Varpulis CLI,
4//! organized into dedicated modules for security, WebSocket handling, and more.
5
6pub mod api;
7pub mod audit;
8pub mod auth;
9pub mod billing;
10pub mod client;
11pub mod config;
12pub mod oauth;
13pub mod oidc;
14#[cfg(feature = "saas")]
15pub mod org;
16pub mod playground;
17pub mod rate_limit;
18pub mod security;
19pub mod users;
20pub mod websocket;
21
22use anyhow::Result;
23use std::path::PathBuf;
24use varpulis_core::ast::{Program, Stmt};
25use varpulis_parser::parse;
26
27/// Parse and validate VPL source code
28pub fn check_syntax(source: &str) -> Result<()> {
29    match parse(source) {
30        Ok(program) => {
31            println!("Syntax OK ({} statements)", program.statements.len());
32            Ok(())
33        }
34        Err(e) => {
35            println!("Syntax error: {e}");
36            Err(anyhow::anyhow!("Parse error: {e}"))
37        }
38    }
39}
40
41/// Parse and return statement count
42pub fn parse_program(source: &str) -> Result<usize> {
43    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
44    Ok(program.statements.len())
45}
46
47/// Validate program can be loaded by engine
48pub fn validate_program(source: &str) -> Result<usize> {
49    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
50    let statement_count = program.statements.len();
51
52    // Try to create engine and load program
53    let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
54    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
55    engine
56        .load(&program)
57        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
58
59    Ok(statement_count)
60}
61
62// =============================================================================
63// Import resolution (moved from main.rs for testability)
64// =============================================================================
65
/// Maximum depth for nested imports to prevent stack overflow.
///
/// `resolve_imports` processes the main program at depth 0 and every level of
/// nested import adds one; resolution fails with an error as soon as the
/// depth is greater than this value.
pub const MAX_IMPORT_DEPTH: usize = 10;
68
69/// Resolve `@import` statements by loading and parsing imported files.
70///
71/// Recursively processes imports with cycle detection and depth limiting.
72/// Imported statements are prepended to the program (before main file statements).
73pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
74    use std::collections::HashSet;
75    let mut visited = HashSet::new();
76    resolve_imports_inner(program, base_path, 0, &mut visited)
77}
78
79fn resolve_imports_inner(
80    program: &mut Program,
81    base_path: Option<&PathBuf>,
82    depth: usize,
83    visited: &mut std::collections::HashSet<PathBuf>,
84) -> Result<()> {
85    if depth > MAX_IMPORT_DEPTH {
86        anyhow::bail!(
87            "Import depth limit exceeded (max {MAX_IMPORT_DEPTH}). Check for circular imports."
88        );
89    }
90
91    let mut imported_statements = Vec::new();
92    let mut imports_to_process = Vec::new();
93
94    for stmt in &program.statements {
95        if let Stmt::Import { path, .. } = &stmt.node {
96            imports_to_process.push(path.clone());
97        }
98    }
99
100    for import_path in imports_to_process {
101        let full_path = if let Some(base) = base_path {
102            base.join(&import_path)
103        } else {
104            PathBuf::from(&import_path)
105        };
106
107        let canonical_path = full_path.canonicalize().map_err(|e| {
108            anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
109        })?;
110
111        if visited.contains(&canonical_path) {
112            continue;
113        }
114        visited.insert(canonical_path.clone());
115
116        let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
117            anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
118        })?;
119
120        let import_program = parse(&import_source).map_err(|e| {
121            anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
122        })?;
123
124        let import_base = full_path.parent().map(|p| p.to_path_buf());
125        let mut imported = import_program;
126        resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
127
128        imported_statements.extend(imported.statements);
129    }
130
131    program
132        .statements
133        .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
134
135    let mut new_statements = imported_statements;
136    new_statements.append(&mut program.statements);
137    program.statements = new_statements;
138
139    Ok(())
140}
141
142// =============================================================================
143// Simulation from source (for testing)
144// =============================================================================
145
146/// Run a VPL program against a list of events and collect output events.
147///
148/// This is the testable core of the `simulate` command — parses VPL source,
149/// loads it into an engine, processes all input events, and returns emitted events.
150pub async fn simulate_from_source(
151    vpl: &str,
152    events: Vec<varpulis_runtime::event::Event>,
153) -> Result<Vec<varpulis_runtime::event::Event>> {
154    use varpulis_runtime::event::Event;
155
156    let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
157
158    let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
159    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
160    engine
161        .load(&program)
162        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
163
164    engine
165        .process_batch_sync(events)
166        .map_err(|e| anyhow::anyhow!("Process error: {e}"))?;
167
168    // Drop the sender side so the receiver knows no more events are coming
169    drop(engine);
170
171    let mut results = Vec::new();
172    while let Ok(event) = output_rx.try_recv() {
173        results.push(event);
174    }
175
176    Ok(results)
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed stream with a filter and an emit passes the syntax check.
    #[test]
    fn test_check_syntax_valid() {
        let vpl = r#"
            stream HighTemp = TempReading
                .where(temperature > 30)
                .emit(alert_type: "high_temp")
        "#;
        assert!(check_syntax(vpl).is_ok());
    }

    /// A stream with no source and an unterminated `.where(` is rejected.
    #[test]
    fn test_check_syntax_invalid() {
        let vpl = r"
            stream Invalid =
                .where(
        ";
        assert!(check_syntax(vpl).is_err());
    }

    /// A single stream definition is counted as one statement.
    #[test]
    fn test_parse_program_valid() {
        let vpl = r"
            stream Test = Events
                .where(value > 10)
        ";
        let count = parse_program(vpl).expect("should succeed");
        assert_eq!(count, 1);
    }

    /// An unclosed parenthesis is genuinely invalid syntax and must fail.
    #[test]
    fn test_parse_program_invalid() {
        let vpl = "stream x = (";
        assert!(parse_program(vpl).is_err());
    }

    /// A minimal one-stream program loads into the engine.
    #[tokio::test]
    async fn test_validate_program_simple() {
        let vpl = r#"
            stream Simple = Events
                .where(x > 0)
                .emit(alert_type: "test")
        "#;
        let count = validate_program(vpl).expect("should succeed");
        assert_eq!(count, 1);
    }

    /// Two streams in one program load and are both counted.
    #[tokio::test]
    async fn test_validate_program_multiple_streams() {
        let vpl = r#"
            stream A = Events
                .where(event_type == "a")
                .emit(alert_type: "a")

            stream B = Events
                .where(event_type == "b")
                .emit(alert_type: "b")
        "#;
        let count = validate_program(vpl).expect("should succeed");
        assert_eq!(count, 2);
    }

    /// A filtered stream loads without error.
    #[tokio::test]
    async fn test_validate_program_with_filter() {
        let vpl = r#"
            stream Filtered = Metrics
                .where(value > 100)
                .emit(alert_type: "high_value")
        "#;
        assert!(validate_program(vpl).is_ok());
    }

    /// The `A -> B` followed-by pattern syntax is accepted.
    #[test]
    fn test_check_syntax_followed_by() {
        let vpl = r#"
            stream Pattern = Events
                .pattern(p: A -> B)
                .emit(alert_type: "sequence_match")
        "#;
        assert!(check_syntax(vpl).is_ok());
    }

    /// An `event` declaration with typed fields is accepted.
    #[test]
    fn test_check_syntax_event_declaration() {
        let vpl = r"
            event TempReading:
                sensor_id: str
                temperature: float
        ";
        assert!(check_syntax(vpl).is_ok());
    }

    /// A `fn` declaration with a typed parameter and return type is accepted.
    #[test]
    fn test_check_syntax_function_declaration() {
        let vpl = r"
            fn celsius_to_fahrenheit(c: float) -> float:
                c * 9.0 / 5.0 + 32.0
        ";
        assert!(check_syntax(vpl).is_ok());
    }

    /// A `.pattern(...)` stage followed by an emit is accepted.
    #[test]
    fn test_check_syntax_pattern_matching() {
        let vpl = r#"
            stream PatternMatch = Events
                .pattern(p: A -> B)
                .emit(alert_type: "pattern")
        "#;
        assert!(check_syntax(vpl).is_ok());
    }

    /// A stream built from `merge(...)` of two streams is accepted.
    #[test]
    fn test_check_syntax_merge() {
        let vpl = r#"
            stream Merged = merge(StreamA, StreamB)
                .emit(alert_type: "merged")
        "#;
        assert!(check_syntax(vpl).is_ok());
    }
}