1pub mod api;
7pub mod audit;
8pub mod auth;
9pub mod billing;
10pub mod client;
11pub mod config;
12pub mod oauth;
13pub mod oidc;
14#[cfg(feature = "saas")]
15pub mod org;
16pub mod playground;
17pub mod rate_limit;
18pub mod security;
19pub mod users;
20pub mod websocket;
21
22use anyhow::Result;
23use std::path::PathBuf;
24use varpulis_core::ast::{Program, Stmt};
25use varpulis_parser::parse;
26
27pub fn check_syntax(source: &str) -> Result<()> {
29 match parse(source) {
30 Ok(program) => {
31 println!("Syntax OK ({} statements)", program.statements.len());
32 Ok(())
33 }
34 Err(e) => {
35 println!("Syntax error: {}", e);
36 Err(anyhow::anyhow!("Parse error: {}", e))
37 }
38 }
39}
40
41pub fn parse_program(source: &str) -> Result<usize> {
43 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
44 Ok(program.statements.len())
45}
46
47pub fn validate_program(source: &str) -> Result<usize> {
49 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
50 let statement_count = program.statements.len();
51
52 let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
54 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
55 engine
56 .load(&program)
57 .map_err(|e| anyhow::anyhow!("Load error: {}", e))?;
58
59 Ok(statement_count)
60}
61
/// Deepest level of nested imports that import resolution will follow.
///
/// `resolve_imports_inner` starts at depth 0 for the root program and bails
/// out with an error as soon as its `depth` argument is greater than this
/// value.
pub const MAX_IMPORT_DEPTH: usize = 10;
68
69pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
74 use std::collections::HashSet;
75 let mut visited = HashSet::new();
76 resolve_imports_inner(program, base_path, 0, &mut visited)
77}
78
79fn resolve_imports_inner(
80 program: &mut Program,
81 base_path: Option<&PathBuf>,
82 depth: usize,
83 visited: &mut std::collections::HashSet<PathBuf>,
84) -> Result<()> {
85 if depth > MAX_IMPORT_DEPTH {
86 anyhow::bail!(
87 "Import depth limit exceeded (max {}). Check for circular imports.",
88 MAX_IMPORT_DEPTH
89 );
90 }
91
92 let mut imported_statements = Vec::new();
93 let mut imports_to_process = Vec::new();
94
95 for stmt in &program.statements {
96 if let Stmt::Import { path, .. } = &stmt.node {
97 imports_to_process.push(path.clone());
98 }
99 }
100
101 for import_path in imports_to_process {
102 let full_path = if let Some(base) = base_path {
103 base.join(&import_path)
104 } else {
105 PathBuf::from(&import_path)
106 };
107
108 let canonical_path = full_path.canonicalize().map_err(|e| {
109 anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
110 })?;
111
112 if visited.contains(&canonical_path) {
113 continue;
114 }
115 visited.insert(canonical_path.clone());
116
117 let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
118 anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
119 })?;
120
121 let import_program = parse(&import_source).map_err(|e| {
122 anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
123 })?;
124
125 let import_base = full_path.parent().map(|p| p.to_path_buf());
126 let mut imported = import_program;
127 resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
128
129 imported_statements.extend(imported.statements);
130 }
131
132 program
133 .statements
134 .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
135
136 let mut new_statements = imported_statements;
137 new_statements.append(&mut program.statements);
138 program.statements = new_statements;
139
140 Ok(())
141}
142
143pub async fn simulate_from_source(
152 vpl: &str,
153 events: Vec<varpulis_runtime::event::Event>,
154) -> Result<Vec<varpulis_runtime::event::Event>> {
155 use varpulis_runtime::event::Event;
156
157 let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {}", e))?;
158
159 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
160 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
161 engine
162 .load(&program)
163 .map_err(|e| anyhow::anyhow!("Load error: {}", e))?;
164
165 engine
166 .process_batch_sync(events)
167 .map_err(|e| anyhow::anyhow!("Process error: {}", e))?;
168
169 drop(engine);
171
172 let mut results = Vec::new();
173 while let Ok(event) = output_rx.try_recv() {
174 results.push(event);
175 }
176
177 Ok(results)
178}
179
// Unit tests for the syntax-check, parse and validate helpers defined in this
// file. Each test feeds a small VPL snippet to one helper and checks only
// success/failure and, where asserted, the returned statement count.
#[cfg(test)]
mod tests {
    use super::*;

    // A complete stream definition with a filter and an emit is accepted.
    #[test]
    fn test_check_syntax_valid() {
        let source = r#"
            stream HighTemp = TempReading
                .where(temperature > 30)
                .emit(alert_type: "high_temp")
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // A stream with no source and an unterminated `.where(` is rejected.
    #[test]
    fn test_check_syntax_invalid() {
        let source = r#"
            stream Invalid =
                .where(
        "#;
        assert!(check_syntax(source).is_err());
    }

    // One stream definition is reported as exactly one statement.
    #[test]
    fn test_parse_program_valid() {
        let source = r#"
            stream Test = Events
                .where(value > 10)
        "#;
        let result = parse_program(source);
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 1);
    }

    // An unclosed parenthesis makes `parse_program` return an error.
    #[test]
    fn test_parse_program_invalid() {
        let source = "stream x = (";
        assert!(parse_program(source).is_err());
    }

    // NOTE(review): `validate_program` is a synchronous function, yet its
    // tests run under `#[tokio::test]`; presumably creating or loading the
    // engine expects a Tokio context — confirm.
    #[tokio::test]
    async fn test_validate_program_simple() {
        let source = r#"
            stream Simple = Events
                .where(x > 0)
                .emit(alert_type: "test")
        "#;
        let result = validate_program(source);
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 1);
    }

    // Two stream definitions load successfully and are counted as two statements.
    #[tokio::test]
    async fn test_validate_program_multiple_streams() {
        let source = r#"
            stream A = Events
                .where(event_type == "a")
                .emit(alert_type: "a")

            stream B = Events
                .where(event_type == "b")
                .emit(alert_type: "b")
        "#;
        let result = validate_program(source);
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 2);
    }

    // Only success is checked here; the statement count is not asserted.
    #[tokio::test]
    async fn test_validate_program_with_filter() {
        let source = r#"
            stream Filtered = Metrics
                .where(value > 100)
                .emit(alert_type: "high_value")
        "#;
        let result = validate_program(source);
        assert!(result.is_ok());
    }

    // The `A -> B` form inside `.pattern(...)` is accepted by the parser.
    #[test]
    fn test_check_syntax_followed_by() {
        let source = r#"
            stream Pattern = Events
                .pattern(p: A -> B)
                .emit(alert_type: "sequence_match")
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // An `event` declaration with typed fields is accepted by the parser.
    #[test]
    fn test_check_syntax_event_declaration() {
        let source = r#"
            event TempReading:
                sensor_id: str
                temperature: float
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // A `fn` declaration with a typed parameter and return type is accepted.
    #[test]
    fn test_check_syntax_function_declaration() {
        let source = r#"
            fn celsius_to_fahrenheit(c: float) -> float:
                c * 9.0 / 5.0 + 32.0
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // Uses the same `.pattern(p: A -> B)` shape as
    // `test_check_syntax_followed_by`; only the stream name and the emitted
    // alert type differ.
    #[test]
    fn test_check_syntax_pattern_matching() {
        let source = r#"
            stream PatternMatch = Events
                .pattern(p: A -> B)
                .emit(alert_type: "pattern")
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // A stream built from `merge(...)` over two inputs is accepted.
    #[test]
    fn test_check_syntax_merge() {
        let source = r#"
            stream Merged = merge(StreamA, StreamB)
                .emit(alert_type: "merged")
        "#;
        assert!(check_syntax(source).is_ok());
    }
}