1#![allow(missing_docs)]
2#[cfg(feature = "saas")]
8pub mod admin;
9pub mod api;
10pub mod audit;
11pub mod auth;
12pub mod billing;
13pub mod client;
14pub mod config;
15#[cfg(feature = "saas")]
16pub mod email;
17pub mod oauth;
18pub mod oidc;
19#[cfg(feature = "saas")]
20pub mod org;
21pub mod playground;
22pub mod rate_limit;
23pub mod security;
24#[cfg(feature = "saas")]
25pub mod tenant_context;
26pub mod users;
27pub mod websocket;
28
29use std::path::PathBuf;
30
31use anyhow::Result;
32use varpulis_core::ast::{Program, Stmt};
33use varpulis_parser::parse;
34
35pub fn check_syntax(source: &str) -> Result<()> {
37 match parse(source) {
38 Ok(program) => {
39 println!("Syntax OK ({} statements)", program.statements.len());
40 Ok(())
41 }
42 Err(e) => {
43 println!("Syntax error: {e}");
44 Err(anyhow::anyhow!("Parse error: {e}"))
45 }
46 }
47}
48
49pub fn parse_program(source: &str) -> Result<usize> {
51 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
52 Ok(program.statements.len())
53}
54
55pub fn validate_program(source: &str) -> Result<usize> {
57 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
58 let statement_count = program.statements.len();
59
60 let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
62 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
63 engine
64 .load(&program)
65 .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
66
67 Ok(statement_count)
68}
69
/// Upper bound on import nesting: import resolution fails with an
/// "Import depth limit exceeded" error once the recursion depth is greater
/// than this value.
pub const MAX_IMPORT_DEPTH: usize = 10;
76
77pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
82 use std::collections::HashSet;
83 let mut visited = HashSet::new();
84 resolve_imports_inner(program, base_path, 0, &mut visited)
85}
86
87fn resolve_imports_inner(
88 program: &mut Program,
89 base_path: Option<&PathBuf>,
90 depth: usize,
91 visited: &mut std::collections::HashSet<PathBuf>,
92) -> Result<()> {
93 if depth > MAX_IMPORT_DEPTH {
94 anyhow::bail!(
95 "Import depth limit exceeded (max {MAX_IMPORT_DEPTH}). Check for circular imports."
96 );
97 }
98
99 let mut imported_statements = Vec::new();
100 let mut imports_to_process = Vec::new();
101
102 for stmt in &program.statements {
103 if let Stmt::Import { path, .. } = &stmt.node {
104 imports_to_process.push(path.clone());
105 }
106 }
107
108 for import_path in imports_to_process {
109 let full_path = if let Some(base) = base_path {
110 base.join(&import_path)
111 } else {
112 PathBuf::from(&import_path)
113 };
114
115 let canonical_path = full_path.canonicalize().map_err(|e| {
116 anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
117 })?;
118
119 if visited.contains(&canonical_path) {
120 continue;
121 }
122 visited.insert(canonical_path.clone());
123
124 let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
125 anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
126 })?;
127
128 let import_program = parse(&import_source).map_err(|e| {
129 anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
130 })?;
131
132 let import_base = full_path.parent().map(|p| p.to_path_buf());
133 let mut imported = import_program;
134 resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
135
136 imported_statements.extend(imported.statements);
137 }
138
139 program
140 .statements
141 .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
142
143 let mut new_statements = imported_statements;
144 new_statements.append(&mut program.statements);
145 program.statements = new_statements;
146
147 Ok(())
148}
149
150pub async fn simulate_from_source(
159 vpl: &str,
160 events: Vec<varpulis_runtime::event::Event>,
161) -> Result<Vec<varpulis_runtime::event::Event>> {
162 use varpulis_runtime::event::Event;
163
164 let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
165
166 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
167 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
168 engine
169 .load(&program)
170 .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
171
172 engine
173 .process_batch_sync(events)
174 .map_err(|e| anyhow::anyhow!("Process error: {e}"))?;
175
176 drop(engine);
178
179 let mut results = Vec::new();
180 while let Ok(event) = output_rx.try_recv() {
181 results.push(event);
182 }
183
184 Ok(results)
185}
186
187pub fn parse_duration_str(s: &str) -> Result<u64> {
192 let s = s.trim();
193 if s.is_empty() {
194 anyhow::bail!("Empty duration string");
195 }
196 let (num_part, suffix) = if let Some(stripped) = s.strip_suffix('s') {
197 (stripped, "s")
198 } else if let Some(stripped) = s.strip_suffix('m') {
199 (stripped, "m")
200 } else if let Some(stripped) = s.strip_suffix('h') {
201 (stripped, "h")
202 } else {
203 (s, "s")
204 };
205 let value: u64 = num_part
206 .parse()
207 .map_err(|_| anyhow::anyhow!("Invalid duration number: '{num_part}'"))?;
208 match suffix {
209 "s" => Ok(value),
210 "m" => Ok(value * 60),
211 "h" => Ok(value * 3600),
212 _ => anyhow::bail!("Unknown duration suffix: '{suffix}'"),
213 }
214}
215
// Unit tests for the syntax-check, parse and validate helpers defined above.
#[cfg(test)]
mod tests {
    use super::*;

    // A well-formed stream with a filter and an emit clause passes the syntax check.
    #[test]
    fn test_check_syntax_valid() {
        let source = r#"
            stream HighTemp = TempReading
                .where(temperature > 30)
                .emit(alert_type: "high_temp")
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // A stream with no source and an unterminated `.where(` is rejected.
    #[test]
    fn test_check_syntax_invalid() {
        let source = r"
            stream Invalid =
                .where(
        ";
        assert!(check_syntax(source).is_err());
    }

    // A single stream definition is counted as exactly one statement.
    #[test]
    fn test_parse_program_valid() {
        let source = r"
            stream Test = Events
                .where(value > 10)
        ";
        let result = parse_program(source);
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 1);
    }

    // An unbalanced parenthesis surfaces as an error from `parse_program`.
    #[test]
    fn test_parse_program_invalid() {
        let source = "stream x = (";
        assert!(parse_program(source).is_err());
    }

    // One stream loads into the engine and is reported as one statement.
    // NOTE(review): run under `#[tokio::test]`, presumably because the engine
    // expects a Tokio runtime context; confirm.
    #[tokio::test]
    async fn test_validate_program_simple() {
        let source = r#"
            stream Simple = Events
                .where(x > 0)
                .emit(alert_type: "test")
        "#;
        let result = validate_program(source);
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 1);
    }

    // Two independent streams over the same source yield a statement count of 2.
    #[tokio::test]
    async fn test_validate_program_multiple_streams() {
        let source = r#"
            stream A = Events
                .where(event_type == "a")
                .emit(alert_type: "a")

            stream B = Events
                .where(event_type == "b")
                .emit(alert_type: "b")
        "#;
        let result = validate_program(source);
        assert!(result.is_ok());
        assert_eq!(result.expect("should succeed"), 2);
    }

    // Only checks that validation succeeds; the statement count is not asserted.
    #[tokio::test]
    async fn test_validate_program_with_filter() {
        let source = r#"
            stream Filtered = Metrics
                .where(value > 100)
                .emit(alert_type: "high_value")
        "#;
        let result = validate_program(source);
        assert!(result.is_ok());
    }

    // The `A -> B` form inside `.pattern(...)` passes the syntax check.
    #[test]
    fn test_check_syntax_followed_by() {
        let source = r#"
            stream Pattern = Events
                .pattern(p: A -> B)
                .emit(alert_type: "sequence_match")
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // An `event` declaration with typed fields passes the syntax check.
    #[test]
    fn test_check_syntax_event_declaration() {
        let source = r"
            event TempReading:
                sensor_id: str
                temperature: float
        ";
        assert!(check_syntax(source).is_ok());
    }

    // A `fn` declaration with a typed parameter, a return type and an
    // expression body passes the syntax check.
    #[test]
    fn test_check_syntax_function_declaration() {
        let source = r"
            fn celsius_to_fahrenheit(c: float) -> float:
                c * 9.0 / 5.0 + 32.0
        ";
        assert!(check_syntax(source).is_ok());
    }

    // Same `.pattern(p: A -> B)` shape as `test_check_syntax_followed_by`;
    // only the stream name and the emitted alert type differ.
    #[test]
    fn test_check_syntax_pattern_matching() {
        let source = r#"
            stream PatternMatch = Events
                .pattern(p: A -> B)
                .emit(alert_type: "pattern")
        "#;
        assert!(check_syntax(source).is_ok());
    }

    // A stream built from `merge(...)` of two sources passes the syntax check.
    #[test]
    fn test_check_syntax_merge() {
        let source = r#"
            stream Merged = merge(StreamA, StreamB)
                .emit(alert_type: "merged")
        "#;
        assert!(check_syntax(source).is_ok());
    }
}