Skip to main content

varpulis_cli/
lib.rs

1#![allow(missing_docs)]
2//! Varpulis CLI library - testable functions and modules
3//!
4//! This library provides the core functionality for the Varpulis CLI,
5//! organized into dedicated modules for security, WebSocket handling, and more.
6
7#[cfg(feature = "saas")]
8pub mod admin;
9pub mod api;
10pub mod audit;
11pub mod auth;
12pub mod billing;
13pub mod client;
14pub mod config;
15#[cfg(feature = "saas")]
16pub mod email;
17pub mod oauth;
18pub mod oidc;
19#[cfg(feature = "saas")]
20pub mod org;
21pub mod output;
22pub mod playground;
23pub mod rate_limit;
24pub mod security;
25#[cfg(feature = "saas")]
26pub mod tenant_context;
27pub mod users;
28pub mod websocket;
29
30use std::path::PathBuf;
31
32use anyhow::Result;
33use varpulis_core::ast::{Program, Stmt};
34use varpulis_parser::parse;
35
36/// Parse and validate VPL source code
37pub fn check_syntax(source: &str) -> Result<()> {
38    match parse(source) {
39        Ok(program) => {
40            println!("Syntax OK ({} statements)", program.statements.len());
41            Ok(())
42        }
43        Err(e) => {
44            println!("Syntax error: {e}");
45            Err(anyhow::anyhow!("Parse error: {e}"))
46        }
47    }
48}
49
50/// Parse and return statement count
51pub fn parse_program(source: &str) -> Result<usize> {
52    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
53    Ok(program.statements.len())
54}
55
56/// Validate program can be loaded by engine
57pub fn validate_program(source: &str) -> Result<usize> {
58    let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
59    let statement_count = program.statements.len();
60
61    // Try to create engine and load program
62    let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
63    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
64    engine
65        .load(&program)
66        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
67
68    Ok(statement_count)
69}
70
71// =============================================================================
72// Import resolution (moved from main.rs for testability)
73// =============================================================================
74
75/// Maximum depth for nested imports to prevent stack overflow
76pub const MAX_IMPORT_DEPTH: usize = 10;
77
78/// Resolve `@import` statements by loading and parsing imported files.
79///
80/// Recursively processes imports with cycle detection and depth limiting.
81/// Imported statements are prepended to the program (before main file statements).
82pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
83    use std::collections::HashSet;
84    let mut visited = HashSet::new();
85    resolve_imports_inner(program, base_path, 0, &mut visited)
86}
87
88fn resolve_imports_inner(
89    program: &mut Program,
90    base_path: Option<&PathBuf>,
91    depth: usize,
92    visited: &mut std::collections::HashSet<PathBuf>,
93) -> Result<()> {
94    if depth > MAX_IMPORT_DEPTH {
95        anyhow::bail!(
96            "Import depth limit exceeded (max {MAX_IMPORT_DEPTH}). Check for circular imports."
97        );
98    }
99
100    let mut imported_statements = Vec::new();
101    let mut imports_to_process = Vec::new();
102
103    for stmt in &program.statements {
104        if let Stmt::Import { path, .. } = &stmt.node {
105            imports_to_process.push(path.clone());
106        }
107    }
108
109    for import_path in imports_to_process {
110        let full_path = if let Some(base) = base_path {
111            base.join(&import_path)
112        } else {
113            PathBuf::from(&import_path)
114        };
115
116        let canonical_path = full_path.canonicalize().map_err(|e| {
117            anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
118        })?;
119
120        if visited.contains(&canonical_path) {
121            continue;
122        }
123        visited.insert(canonical_path.clone());
124
125        let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
126            anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
127        })?;
128
129        let import_program = parse(&import_source).map_err(|e| {
130            anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
131        })?;
132
133        let import_base = full_path.parent().map(|p| p.to_path_buf());
134        let mut imported = import_program;
135        resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
136
137        imported_statements.extend(imported.statements);
138    }
139
140    program
141        .statements
142        .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
143
144    let mut new_statements = imported_statements;
145    new_statements.append(&mut program.statements);
146    program.statements = new_statements;
147
148    Ok(())
149}
150
151// =============================================================================
152// Simulation from source (for testing)
153// =============================================================================
154
155/// Run a VPL program against a list of events and collect output events.
156///
157/// This is the testable core of the `simulate` command — parses VPL source,
158/// loads it into an engine, processes all input events, and returns emitted events.
159pub async fn simulate_from_source(
160    vpl: &str,
161    events: Vec<varpulis_runtime::event::Event>,
162) -> Result<Vec<varpulis_runtime::event::Event>> {
163    use varpulis_runtime::event::Event;
164
165    let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
166
167    let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
168    let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
169    engine
170        .load(&program)
171        .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
172
173    engine
174        .process_batch_sync(events)
175        .map_err(|e| anyhow::anyhow!("Process error: {e}"))?;
176
177    // Drop the sender side so the receiver knows no more events are coming
178    drop(engine);
179
180    let mut results = Vec::new();
181    while let Ok(event) = output_rx.try_recv() {
182        results.push(event);
183    }
184
185    Ok(results)
186}
187
188/// Parse a duration string like "60s", "5m", "1h" into seconds.
189///
190/// Supports suffixes: `s` (seconds), `m` (minutes), `h` (hours).
191/// If no suffix is provided, assumes seconds.
192pub fn parse_duration_str(s: &str) -> Result<u64> {
193    let s = s.trim();
194    if s.is_empty() {
195        anyhow::bail!("Empty duration string");
196    }
197    let (num_part, suffix) = if let Some(stripped) = s.strip_suffix('s') {
198        (stripped, "s")
199    } else if let Some(stripped) = s.strip_suffix('m') {
200        (stripped, "m")
201    } else if let Some(stripped) = s.strip_suffix('h') {
202        (stripped, "h")
203    } else {
204        (s, "s")
205    };
206    let value: u64 = num_part
207        .parse()
208        .map_err(|_| anyhow::anyhow!("Invalid duration number: '{num_part}'"))?;
209    match suffix {
210        "s" => Ok(value),
211        "m" => Ok(value * 60),
212        "h" => Ok(value * 3600),
213        _ => anyhow::bail!("Unknown duration suffix: '{suffix}'"),
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220
221    #[test]
222    fn test_check_syntax_valid() {
223        let source = r#"
224            stream HighTemp = TempReading
225                .where(temperature > 30)
226                .emit(alert_type: "high_temp")
227        "#;
228        assert!(check_syntax(source).is_ok());
229    }
230
231    #[test]
232    fn test_check_syntax_invalid() {
233        let source = r"
234            stream Invalid =
235                .where(
236        ";
237        assert!(check_syntax(source).is_err());
238    }
239
240    #[test]
241    fn test_parse_program_valid() {
242        let source = r"
243            stream Test = Events
244                .where(value > 10)
245        ";
246        let result = parse_program(source);
247        assert!(result.is_ok());
248        assert_eq!(result.expect("should succeed"), 1);
249    }
250
251    #[test]
252    fn test_parse_program_invalid() {
253        // Use truly invalid syntax (unclosed parenthesis)
254        let source = "stream x = (";
255        assert!(parse_program(source).is_err());
256    }
257
258    #[tokio::test]
259    async fn test_validate_program_simple() {
260        let source = r#"
261            stream Simple = Events
262                .where(x > 0)
263                .emit(alert_type: "test")
264        "#;
265        let result = validate_program(source);
266        assert!(result.is_ok());
267        assert_eq!(result.expect("should succeed"), 1);
268    }
269
270    #[tokio::test]
271    async fn test_validate_program_multiple_streams() {
272        let source = r#"
273            stream A = Events
274                .where(event_type == "a")
275                .emit(alert_type: "a")
276
277            stream B = Events
278                .where(event_type == "b")
279                .emit(alert_type: "b")
280        "#;
281        let result = validate_program(source);
282        assert!(result.is_ok());
283        assert_eq!(result.expect("should succeed"), 2);
284    }
285
286    #[tokio::test]
287    async fn test_validate_program_with_filter() {
288        let source = r#"
289            stream Filtered = Metrics
290                .where(value > 100)
291                .emit(alert_type: "high_value")
292        "#;
293        let result = validate_program(source);
294        assert!(result.is_ok());
295    }
296
297    #[test]
298    fn test_check_syntax_followed_by() {
299        let source = r#"
300            stream Pattern = Events
301                .pattern(p: A -> B)
302                .emit(alert_type: "sequence_match")
303        "#;
304        assert!(check_syntax(source).is_ok());
305    }
306
307    #[test]
308    fn test_check_syntax_event_declaration() {
309        let source = r"
310            event TempReading:
311                sensor_id: str
312                temperature: float
313        ";
314        assert!(check_syntax(source).is_ok());
315    }
316
317    #[test]
318    fn test_check_syntax_function_declaration() {
319        let source = r"
320            fn celsius_to_fahrenheit(c: float) -> float:
321                c * 9.0 / 5.0 + 32.0
322        ";
323        assert!(check_syntax(source).is_ok());
324    }
325
326    #[test]
327    fn test_check_syntax_pattern_matching() {
328        let source = r#"
329            stream PatternMatch = Events
330                .pattern(p: A -> B)
331                .emit(alert_type: "pattern")
332        "#;
333        assert!(check_syntax(source).is_ok());
334    }
335
336    #[test]
337    fn test_check_syntax_merge() {
338        let source = r#"
339            stream Merged = merge(StreamA, StreamB)
340                .emit(alert_type: "merged")
341        "#;
342        assert!(check_syntax(source).is_ok());
343    }
344}