Skip to main content

varpulis_cli/
lib.rs

1#![allow(missing_docs)]
2//! Varpulis CLI library - testable functions and modules
3//!
4//! This library provides the core functionality for the Varpulis CLI,
5//! organized into dedicated modules for security, WebSocket handling, and more.
6
7#[cfg(feature = "saas")]
8pub mod admin;
9pub mod api;
10pub mod audit;
11pub mod auth;
12pub mod billing;
13pub mod client;
14pub mod config;
15#[cfg(feature = "saas")]
16pub mod email;
17pub mod oauth;
18pub mod oidc;
19#[cfg(feature = "saas")]
20pub mod org;
21pub mod playground;
22pub mod rate_limit;
23pub mod security;
24#[cfg(feature = "saas")]
25pub mod tenant_context;
26pub mod users;
27pub mod websocket;
28
29use std::path::PathBuf;
30
31use anyhow::Result;
32use varpulis_core::ast::{Program, Stmt};
33use varpulis_parser::parse;
34
35/// Parse and validate VPL source code
36pub fn check_syntax(source: &str) -> Result<()> {
37    match parse(source) {
38        Ok(program) => {
39            println!("Syntax OK ({} statements)", program.statements.len());
40            Ok(())
41        }
42        Err(e) => {
43            println!("Syntax error: {e}");
44            Err(anyhow::anyhow!("Parse error: {e}"))
45        }
46    }
47}
48
49/// Parse and return statement count
50pub fn parse_program(source: &str) -> Result<usize> {
51    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
52    Ok(program.statements.len())
53}
54
55/// Validate program can be loaded by engine
56pub fn validate_program(source: &str) -> Result<usize> {
57    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
58    let statement_count = program.statements.len();
59
60    // Try to create engine and load program
61    let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
62    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
63    engine
64        .load(&program)
65        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
66
67    Ok(statement_count)
68}
69
70// =============================================================================
71// Import resolution (moved from main.rs for testability)
72// =============================================================================
73
74/// Maximum depth for nested imports to prevent stack overflow
75pub const MAX_IMPORT_DEPTH: usize = 10;
76
77/// Resolve `@import` statements by loading and parsing imported files.
78///
79/// Recursively processes imports with cycle detection and depth limiting.
80/// Imported statements are prepended to the program (before main file statements).
81pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
82    use std::collections::HashSet;
83    let mut visited = HashSet::new();
84    resolve_imports_inner(program, base_path, 0, &mut visited)
85}
86
87fn resolve_imports_inner(
88    program: &mut Program,
89    base_path: Option<&PathBuf>,
90    depth: usize,
91    visited: &mut std::collections::HashSet<PathBuf>,
92) -> Result<()> {
93    if depth > MAX_IMPORT_DEPTH {
94        anyhow::bail!(
95            "Import depth limit exceeded (max {MAX_IMPORT_DEPTH}). Check for circular imports."
96        );
97    }
98
99    let mut imported_statements = Vec::new();
100    let mut imports_to_process = Vec::new();
101
102    for stmt in &program.statements {
103        if let Stmt::Import { path, .. } = &stmt.node {
104            imports_to_process.push(path.clone());
105        }
106    }
107
108    for import_path in imports_to_process {
109        let full_path = if let Some(base) = base_path {
110            base.join(&import_path)
111        } else {
112            PathBuf::from(&import_path)
113        };
114
115        let canonical_path = full_path.canonicalize().map_err(|e| {
116            anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
117        })?;
118
119        if visited.contains(&canonical_path) {
120            continue;
121        }
122        visited.insert(canonical_path.clone());
123
124        let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
125            anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
126        })?;
127
128        let import_program = parse(&import_source).map_err(|e| {
129            anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
130        })?;
131
132        let import_base = full_path.parent().map(|p| p.to_path_buf());
133        let mut imported = import_program;
134        resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
135
136        imported_statements.extend(imported.statements);
137    }
138
139    program
140        .statements
141        .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
142
143    let mut new_statements = imported_statements;
144    new_statements.append(&mut program.statements);
145    program.statements = new_statements;
146
147    Ok(())
148}
149
150// =============================================================================
151// Simulation from source (for testing)
152// =============================================================================
153
154/// Run a VPL program against a list of events and collect output events.
155///
156/// This is the testable core of the `simulate` command — parses VPL source,
157/// loads it into an engine, processes all input events, and returns emitted events.
158pub async fn simulate_from_source(
159    vpl: &str,
160    events: Vec<varpulis_runtime::event::Event>,
161) -> Result<Vec<varpulis_runtime::event::Event>> {
162    use varpulis_runtime::event::Event;
163
164    let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
165
166    let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
167    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
168    engine
169        .load(&program)
170        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
171
172    engine
173        .process_batch_sync(events)
174        .map_err(|e| anyhow::anyhow!("Process error: {e}"))?;
175
176    // Drop the sender side so the receiver knows no more events are coming
177    drop(engine);
178
179    let mut results = Vec::new();
180    while let Ok(event) = output_rx.try_recv() {
181        results.push(event);
182    }
183
184    Ok(results)
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190
191    #[test]
192    fn test_check_syntax_valid() {
193        let source = r#"
194            stream HighTemp = TempReading
195                .where(temperature > 30)
196                .emit(alert_type: "high_temp")
197        "#;
198        assert!(check_syntax(source).is_ok());
199    }
200
201    #[test]
202    fn test_check_syntax_invalid() {
203        let source = r"
204            stream Invalid =
205                .where(
206        ";
207        assert!(check_syntax(source).is_err());
208    }
209
210    #[test]
211    fn test_parse_program_valid() {
212        let source = r"
213            stream Test = Events
214                .where(value > 10)
215        ";
216        let result = parse_program(source);
217        assert!(result.is_ok());
218        assert_eq!(result.expect("should succeed"), 1);
219    }
220
221    #[test]
222    fn test_parse_program_invalid() {
223        // Use truly invalid syntax (unclosed parenthesis)
224        let source = "stream x = (";
225        assert!(parse_program(source).is_err());
226    }
227
228    #[tokio::test]
229    async fn test_validate_program_simple() {
230        let source = r#"
231            stream Simple = Events
232                .where(x > 0)
233                .emit(alert_type: "test")
234        "#;
235        let result = validate_program(source);
236        assert!(result.is_ok());
237        assert_eq!(result.expect("should succeed"), 1);
238    }
239
240    #[tokio::test]
241    async fn test_validate_program_multiple_streams() {
242        let source = r#"
243            stream A = Events
244                .where(event_type == "a")
245                .emit(alert_type: "a")
246
247            stream B = Events
248                .where(event_type == "b")
249                .emit(alert_type: "b")
250        "#;
251        let result = validate_program(source);
252        assert!(result.is_ok());
253        assert_eq!(result.expect("should succeed"), 2);
254    }
255
256    #[tokio::test]
257    async fn test_validate_program_with_filter() {
258        let source = r#"
259            stream Filtered = Metrics
260                .where(value > 100)
261                .emit(alert_type: "high_value")
262        "#;
263        let result = validate_program(source);
264        assert!(result.is_ok());
265    }
266
267    #[test]
268    fn test_check_syntax_followed_by() {
269        let source = r#"
270            stream Pattern = Events
271                .pattern(p: A -> B)
272                .emit(alert_type: "sequence_match")
273        "#;
274        assert!(check_syntax(source).is_ok());
275    }
276
277    #[test]
278    fn test_check_syntax_event_declaration() {
279        let source = r"
280            event TempReading:
281                sensor_id: str
282                temperature: float
283        ";
284        assert!(check_syntax(source).is_ok());
285    }
286
287    #[test]
288    fn test_check_syntax_function_declaration() {
289        let source = r"
290            fn celsius_to_fahrenheit(c: float) -> float:
291                c * 9.0 / 5.0 + 32.0
292        ";
293        assert!(check_syntax(source).is_ok());
294    }
295
296    #[test]
297    fn test_check_syntax_pattern_matching() {
298        let source = r#"
299            stream PatternMatch = Events
300                .pattern(p: A -> B)
301                .emit(alert_type: "pattern")
302        "#;
303        assert!(check_syntax(source).is_ok());
304    }
305
306    #[test]
307    fn test_check_syntax_merge() {
308        let source = r#"
309            stream Merged = merge(StreamA, StreamB)
310                .emit(alert_type: "merged")
311        "#;
312        assert!(check_syntax(source).is_ok());
313    }
314}