Skip to main content

varpulis_cli/
lib.rs

1#![allow(missing_docs)]
2//! Varpulis CLI library - testable functions and modules
3//!
4//! This library provides the core functionality for the Varpulis CLI,
5//! organized into dedicated modules for security, WebSocket handling, and more.
6
7#[cfg(feature = "saas")]
8pub mod admin;
9pub mod api;
10pub mod audit;
11pub mod auth;
12pub mod billing;
13pub mod client;
14pub mod config;
15#[cfg(feature = "saas")]
16pub mod email;
17pub mod oauth;
18pub mod oidc;
19#[cfg(feature = "saas")]
20pub mod org;
21pub mod playground;
22pub mod rate_limit;
23pub mod security;
24#[cfg(feature = "saas")]
25pub mod tenant_context;
26pub mod users;
27pub mod websocket;
28
29use std::path::PathBuf;
30
31use anyhow::Result;
32use varpulis_core::ast::{Program, Stmt};
33use varpulis_parser::parse;
34
35/// Parse and validate VPL source code
36pub fn check_syntax(source: &str) -> Result<()> {
37    match parse(source) {
38        Ok(program) => {
39            println!("Syntax OK ({} statements)", program.statements.len());
40            Ok(())
41        }
42        Err(e) => {
43            println!("Syntax error: {e}");
44            Err(anyhow::anyhow!("Parse error: {e}"))
45        }
46    }
47}
48
49/// Parse and return statement count
50pub fn parse_program(source: &str) -> Result<usize> {
51    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
52    Ok(program.statements.len())
53}
54
55/// Validate program can be loaded by engine
56pub fn validate_program(source: &str) -> Result<usize> {
57    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
58    let statement_count = program.statements.len();
59
60    // Try to create engine and load program
61    let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
62    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
63    engine
64        .load(&program)
65        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
66
67    Ok(statement_count)
68}
69
70// =============================================================================
71// Import resolution (moved from main.rs for testability)
72// =============================================================================
73
74/// Maximum depth for nested imports to prevent stack overflow
75pub const MAX_IMPORT_DEPTH: usize = 10;
76
77/// Resolve `@import` statements by loading and parsing imported files.
78///
79/// Recursively processes imports with cycle detection and depth limiting.
80/// Imported statements are prepended to the program (before main file statements).
81pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
82    use std::collections::HashSet;
83    let mut visited = HashSet::new();
84    resolve_imports_inner(program, base_path, 0, &mut visited)
85}
86
87fn resolve_imports_inner(
88    program: &mut Program,
89    base_path: Option<&PathBuf>,
90    depth: usize,
91    visited: &mut std::collections::HashSet<PathBuf>,
92) -> Result<()> {
93    if depth > MAX_IMPORT_DEPTH {
94        anyhow::bail!(
95            "Import depth limit exceeded (max {MAX_IMPORT_DEPTH}). Check for circular imports."
96        );
97    }
98
99    let mut imported_statements = Vec::new();
100    let mut imports_to_process = Vec::new();
101
102    for stmt in &program.statements {
103        if let Stmt::Import { path, .. } = &stmt.node {
104            imports_to_process.push(path.clone());
105        }
106    }
107
108    for import_path in imports_to_process {
109        let full_path = if let Some(base) = base_path {
110            base.join(&import_path)
111        } else {
112            PathBuf::from(&import_path)
113        };
114
115        let canonical_path = full_path.canonicalize().map_err(|e| {
116            anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
117        })?;
118
119        if visited.contains(&canonical_path) {
120            continue;
121        }
122        visited.insert(canonical_path.clone());
123
124        let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
125            anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
126        })?;
127
128        let import_program = parse(&import_source).map_err(|e| {
129            anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
130        })?;
131
132        let import_base = full_path.parent().map(|p| p.to_path_buf());
133        let mut imported = import_program;
134        resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
135
136        imported_statements.extend(imported.statements);
137    }
138
139    program
140        .statements
141        .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
142
143    let mut new_statements = imported_statements;
144    new_statements.append(&mut program.statements);
145    program.statements = new_statements;
146
147    Ok(())
148}
149
150// =============================================================================
151// Simulation from source (for testing)
152// =============================================================================
153
154/// Run a VPL program against a list of events and collect output events.
155///
156/// This is the testable core of the `simulate` command — parses VPL source,
157/// loads it into an engine, processes all input events, and returns emitted events.
158pub async fn simulate_from_source(
159    vpl: &str,
160    events: Vec<varpulis_runtime::event::Event>,
161) -> Result<Vec<varpulis_runtime::event::Event>> {
162    use varpulis_runtime::event::Event;
163
164    let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
165
166    let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
167    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
168    engine
169        .load(&program)
170        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
171
172    engine
173        .process_batch_sync(events)
174        .map_err(|e| anyhow::anyhow!("Process error: {e}"))?;
175
176    // Drop the sender side so the receiver knows no more events are coming
177    drop(engine);
178
179    let mut results = Vec::new();
180    while let Ok(event) = output_rx.try_recv() {
181        results.push(event);
182    }
183
184    Ok(results)
185}
186
187/// Parse a duration string like "60s", "5m", "1h" into seconds.
188///
189/// Supports suffixes: `s` (seconds), `m` (minutes), `h` (hours).
190/// If no suffix is provided, assumes seconds.
191pub fn parse_duration_str(s: &str) -> Result<u64> {
192    let s = s.trim();
193    if s.is_empty() {
194        anyhow::bail!("Empty duration string");
195    }
196    let (num_part, suffix) = if let Some(stripped) = s.strip_suffix('s') {
197        (stripped, "s")
198    } else if let Some(stripped) = s.strip_suffix('m') {
199        (stripped, "m")
200    } else if let Some(stripped) = s.strip_suffix('h') {
201        (stripped, "h")
202    } else {
203        (s, "s")
204    };
205    let value: u64 = num_part
206        .parse()
207        .map_err(|_| anyhow::anyhow!("Invalid duration number: '{num_part}'"))?;
208    match suffix {
209        "s" => Ok(value),
210        "m" => Ok(value * 60),
211        "h" => Ok(value * 3600),
212        _ => anyhow::bail!("Unknown duration suffix: '{suffix}'"),
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219
220    #[test]
221    fn test_check_syntax_valid() {
222        let source = r#"
223            stream HighTemp = TempReading
224                .where(temperature > 30)
225                .emit(alert_type: "high_temp")
226        "#;
227        assert!(check_syntax(source).is_ok());
228    }
229
230    #[test]
231    fn test_check_syntax_invalid() {
232        let source = r"
233            stream Invalid =
234                .where(
235        ";
236        assert!(check_syntax(source).is_err());
237    }
238
239    #[test]
240    fn test_parse_program_valid() {
241        let source = r"
242            stream Test = Events
243                .where(value > 10)
244        ";
245        let result = parse_program(source);
246        assert!(result.is_ok());
247        assert_eq!(result.expect("should succeed"), 1);
248    }
249
250    #[test]
251    fn test_parse_program_invalid() {
252        // Use truly invalid syntax (unclosed parenthesis)
253        let source = "stream x = (";
254        assert!(parse_program(source).is_err());
255    }
256
257    #[tokio::test]
258    async fn test_validate_program_simple() {
259        let source = r#"
260            stream Simple = Events
261                .where(x > 0)
262                .emit(alert_type: "test")
263        "#;
264        let result = validate_program(source);
265        assert!(result.is_ok());
266        assert_eq!(result.expect("should succeed"), 1);
267    }
268
269    #[tokio::test]
270    async fn test_validate_program_multiple_streams() {
271        let source = r#"
272            stream A = Events
273                .where(event_type == "a")
274                .emit(alert_type: "a")
275
276            stream B = Events
277                .where(event_type == "b")
278                .emit(alert_type: "b")
279        "#;
280        let result = validate_program(source);
281        assert!(result.is_ok());
282        assert_eq!(result.expect("should succeed"), 2);
283    }
284
285    #[tokio::test]
286    async fn test_validate_program_with_filter() {
287        let source = r#"
288            stream Filtered = Metrics
289                .where(value > 100)
290                .emit(alert_type: "high_value")
291        "#;
292        let result = validate_program(source);
293        assert!(result.is_ok());
294    }
295
296    #[test]
297    fn test_check_syntax_followed_by() {
298        let source = r#"
299            stream Pattern = Events
300                .pattern(p: A -> B)
301                .emit(alert_type: "sequence_match")
302        "#;
303        assert!(check_syntax(source).is_ok());
304    }
305
306    #[test]
307    fn test_check_syntax_event_declaration() {
308        let source = r"
309            event TempReading:
310                sensor_id: str
311                temperature: float
312        ";
313        assert!(check_syntax(source).is_ok());
314    }
315
316    #[test]
317    fn test_check_syntax_function_declaration() {
318        let source = r"
319            fn celsius_to_fahrenheit(c: float) -> float:
320                c * 9.0 / 5.0 + 32.0
321        ";
322        assert!(check_syntax(source).is_ok());
323    }
324
325    #[test]
326    fn test_check_syntax_pattern_matching() {
327        let source = r#"
328            stream PatternMatch = Events
329                .pattern(p: A -> B)
330                .emit(alert_type: "pattern")
331        "#;
332        assert!(check_syntax(source).is_ok());
333    }
334
335    #[test]
336    fn test_check_syntax_merge() {
337        let source = r#"
338            stream Merged = merge(StreamA, StreamB)
339                .emit(alert_type: "merged")
340        "#;
341        assert!(check_syntax(source).is_ok());
342    }
343}