1pub mod api;
7pub mod audit;
8pub mod auth;
9pub mod billing;
10pub mod client;
11pub mod config;
12pub mod oauth;
13pub mod oidc;
14#[cfg(feature = "saas")]
15pub mod org;
16pub mod playground;
17pub mod rate_limit;
18pub mod security;
19pub mod users;
20pub mod websocket;
21
22use anyhow::Result;
23use std::path::PathBuf;
24use varpulis_core::ast::{Program, Stmt};
25use varpulis_parser::parse;
26
27pub fn check_syntax(source: &str) -> Result<()> {
29 match parse(source) {
30 Ok(program) => {
31 println!("Syntax OK ({} statements)", program.statements.len());
32 Ok(())
33 }
34 Err(e) => {
35 println!("Syntax error: {e}");
36 Err(anyhow::anyhow!("Parse error: {e}"))
37 }
38 }
39}
40
41pub fn parse_program(source: &str) -> Result<usize> {
43 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
44 Ok(program.statements.len())
45}
46
47pub fn validate_program(source: &str) -> Result<usize> {
49 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
50 let statement_count = program.statements.len();
51
52 let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
54 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
55 engine
56 .load(&program)
57 .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
58
59 Ok(statement_count)
60}
61
62pub const MAX_IMPORT_DEPTH: usize = 10;
68
69pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
74 use std::collections::HashSet;
75 let mut visited = HashSet::new();
76 resolve_imports_inner(program, base_path, 0, &mut visited)
77}
78
79fn resolve_imports_inner(
80 program: &mut Program,
81 base_path: Option<&PathBuf>,
82 depth: usize,
83 visited: &mut std::collections::HashSet<PathBuf>,
84) -> Result<()> {
85 if depth > MAX_IMPORT_DEPTH {
86 anyhow::bail!(
87 "Import depth limit exceeded (max {MAX_IMPORT_DEPTH}). Check for circular imports."
88 );
89 }
90
91 let mut imported_statements = Vec::new();
92 let mut imports_to_process = Vec::new();
93
94 for stmt in &program.statements {
95 if let Stmt::Import { path, .. } = &stmt.node {
96 imports_to_process.push(path.clone());
97 }
98 }
99
100 for import_path in imports_to_process {
101 let full_path = if let Some(base) = base_path {
102 base.join(&import_path)
103 } else {
104 PathBuf::from(&import_path)
105 };
106
107 let canonical_path = full_path.canonicalize().map_err(|e| {
108 anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
109 })?;
110
111 if visited.contains(&canonical_path) {
112 continue;
113 }
114 visited.insert(canonical_path.clone());
115
116 let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
117 anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
118 })?;
119
120 let import_program = parse(&import_source).map_err(|e| {
121 anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
122 })?;
123
124 let import_base = full_path.parent().map(|p| p.to_path_buf());
125 let mut imported = import_program;
126 resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
127
128 imported_statements.extend(imported.statements);
129 }
130
131 program
132 .statements
133 .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
134
135 let mut new_statements = imported_statements;
136 new_statements.append(&mut program.statements);
137 program.statements = new_statements;
138
139 Ok(())
140}
141
142pub async fn simulate_from_source(
151 vpl: &str,
152 events: Vec<varpulis_runtime::event::Event>,
153) -> Result<Vec<varpulis_runtime::event::Event>> {
154 use varpulis_runtime::event::Event;
155
156 let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
157
158 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
159 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
160 engine
161 .load(&program)
162 .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
163
164 engine
165 .process_batch_sync(events)
166 .map_err(|e| anyhow::anyhow!("Process error: {e}"))?;
167
168 drop(engine);
170
171 let mut results = Vec::new();
172 while let Ok(event) = output_rx.try_recv() {
173 results.push(event);
174 }
175
176 Ok(results)
177}
178
// Unit tests for the syntax-check, parse, and validate helpers defined above.
// The import resolver and `simulate_from_source` are not exercised here.
179#[cfg(test)]
180mod tests {
181 use super::*;
182
    // A stream with `.where` and `.emit` stages passes the syntax check.
183 #[test]
184 fn test_check_syntax_valid() {
185 let source = r#"
186 stream HighTemp = TempReading
187 .where(temperature > 30)
188 .emit(alert_type: "high_temp")
189 "#;
190 assert!(check_syntax(source).is_ok());
191 }
192
    // A stream definition that breaks off inside `.where(` is rejected.
193 #[test]
194 fn test_check_syntax_invalid() {
195 let source = r"
196 stream Invalid =
197 .where(
198 ";
199 assert!(check_syntax(source).is_err());
200 }
201
    // One stream definition is reported as exactly one statement.
202 #[test]
203 fn test_parse_program_valid() {
204 let source = r"
205 stream Test = Events
206 .where(value > 10)
207 ";
208 let result = parse_program(source);
209 assert!(result.is_ok());
210 assert_eq!(result.expect("should succeed"), 1);
211 }
212
    // An unclosed parenthesis makes `parse_program` return an error.
213 #[test]
214 fn test_parse_program_invalid() {
215 let source = "stream x = (";
217 assert!(parse_program(source).is_err());
218 }
219
    // `validate_program` is synchronous but is called from a Tokio test here.
    // NOTE(review): presumably the engine needs a runtime context; confirm.
220 #[tokio::test]
221 async fn test_validate_program_simple() {
222 let source = r#"
223 stream Simple = Events
224 .where(x > 0)
225 .emit(alert_type: "test")
226 "#;
227 let result = validate_program(source);
228 assert!(result.is_ok());
229 assert_eq!(result.expect("should succeed"), 1);
230 }
231
    // Two stream definitions in one source validate and count as two statements.
232 #[tokio::test]
233 async fn test_validate_program_multiple_streams() {
234 let source = r#"
235 stream A = Events
236 .where(event_type == "a")
237 .emit(alert_type: "a")
238
239 stream B = Events
240 .where(event_type == "b")
241 .emit(alert_type: "b")
242 "#;
243 let result = validate_program(source);
244 assert!(result.is_ok());
245 assert_eq!(result.expect("should succeed"), 2);
246 }
247
    // Only checks that validation succeeds; the statement count is not asserted.
248 #[tokio::test]
249 async fn test_validate_program_with_filter() {
250 let source = r#"
251 stream Filtered = Metrics
252 .where(value > 100)
253 .emit(alert_type: "high_value")
254 "#;
255 let result = validate_program(source);
256 assert!(result.is_ok());
257 }
258
    // The `A -> B` form inside `.pattern(...)` passes the syntax check.
259 #[test]
260 fn test_check_syntax_followed_by() {
261 let source = r#"
262 stream Pattern = Events
263 .pattern(p: A -> B)
264 .emit(alert_type: "sequence_match")
265 "#;
266 assert!(check_syntax(source).is_ok());
267 }
268
    // An `event` declaration with two typed fields passes the syntax check.
269 #[test]
270 fn test_check_syntax_event_declaration() {
271 let source = r"
272 event TempReading:
273 sensor_id: str
274 temperature: float
275 ";
276 assert!(check_syntax(source).is_ok());
277 }
278
    // A `fn` declaration with a typed parameter, return type, and expression body
    // passes the syntax check.
279 #[test]
280 fn test_check_syntax_function_declaration() {
281 let source = r"
282 fn celsius_to_fahrenheit(c: float) -> float:
283 c * 9.0 / 5.0 + 32.0
284 ";
285 assert!(check_syntax(source).is_ok());
286 }
287
    // Same `.pattern(p: A -> B)` shape as `test_check_syntax_followed_by`; only the
    // stream name and the emitted `alert_type` differ.
288 #[test]
289 fn test_check_syntax_pattern_matching() {
290 let source = r#"
291 stream PatternMatch = Events
292 .pattern(p: A -> B)
293 .emit(alert_type: "pattern")
294 "#;
295 assert!(check_syntax(source).is_ok());
296 }
297
    // A stream built from `merge(StreamA, StreamB)` passes the syntax check.
298 #[test]
299 fn test_check_syntax_merge() {
300 let source = r#"
301 stream Merged = merge(StreamA, StreamB)
302 .emit(alert_type: "merged")
303 "#;
304 assert!(check_syntax(source).is_ok());
305 }
306}