1#![allow(missing_docs)]
2#[cfg(feature = "saas")]
8pub mod admin;
9pub mod api;
10pub mod audit;
11pub mod auth;
12pub mod billing;
13pub mod client;
14pub mod config;
15#[cfg(feature = "saas")]
16pub mod email;
17pub mod oauth;
18pub mod oidc;
19#[cfg(feature = "saas")]
20pub mod org;
21pub mod output;
22pub mod playground;
23pub mod rate_limit;
24pub mod security;
25#[cfg(feature = "saas")]
26pub mod tenant_context;
27pub mod users;
28pub mod websocket;
29
30use std::path::PathBuf;
31
32use anyhow::Result;
33use varpulis_core::ast::{Program, Stmt};
34use varpulis_parser::parse;
35
36pub fn check_syntax(source: &str) -> Result<()> {
38 match parse(source) {
39 Ok(program) => {
40 println!("Syntax OK ({} statements)", program.statements.len());
41 Ok(())
42 }
43 Err(e) => {
44 println!("Syntax error: {e}");
45 Err(anyhow::anyhow!("Parse error: {e}"))
46 }
47 }
48}
49
50pub fn parse_program(source: &str) -> Result<usize> {
52 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
53 Ok(program.statements.len())
54}
55
56pub fn validate_program(source: &str) -> Result<usize> {
58 let program = parse(source).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
59 let statement_count = program.statements.len();
60
61 let (output_tx, _output_rx) = tokio::sync::mpsc::channel(100);
63 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
64 engine
65 .load(&program)
66 .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
67
68 Ok(statement_count)
69}
70
/// Maximum import nesting depth that import resolution will follow.
///
/// The program passed to [`resolve_imports`] is at depth 0 and every nested
/// import adds one; resolution fails once the depth exceeds this value.
pub const MAX_IMPORT_DEPTH: usize = 10;
77
78pub fn resolve_imports(program: &mut Program, base_path: Option<&PathBuf>) -> Result<()> {
83 use std::collections::HashSet;
84 let mut visited = HashSet::new();
85 resolve_imports_inner(program, base_path, 0, &mut visited)
86}
87
88fn resolve_imports_inner(
89 program: &mut Program,
90 base_path: Option<&PathBuf>,
91 depth: usize,
92 visited: &mut std::collections::HashSet<PathBuf>,
93) -> Result<()> {
94 if depth > MAX_IMPORT_DEPTH {
95 anyhow::bail!(
96 "Import depth limit exceeded (max {MAX_IMPORT_DEPTH}). Check for circular imports."
97 );
98 }
99
100 let mut imported_statements = Vec::new();
101 let mut imports_to_process = Vec::new();
102
103 for stmt in &program.statements {
104 if let Stmt::Import { path, .. } = &stmt.node {
105 imports_to_process.push(path.clone());
106 }
107 }
108
109 for import_path in imports_to_process {
110 let full_path = if let Some(base) = base_path {
111 base.join(&import_path)
112 } else {
113 PathBuf::from(&import_path)
114 };
115
116 let canonical_path = full_path.canonicalize().map_err(|e| {
117 anyhow::anyhow!("Failed to resolve import '{}': {}", full_path.display(), e)
118 })?;
119
120 if visited.contains(&canonical_path) {
121 continue;
122 }
123 visited.insert(canonical_path.clone());
124
125 let import_source = std::fs::read_to_string(&full_path).map_err(|e| {
126 anyhow::anyhow!("Failed to read import '{}': {}", full_path.display(), e)
127 })?;
128
129 let import_program = parse(&import_source).map_err(|e| {
130 anyhow::anyhow!("Parse error in import '{}': {}", full_path.display(), e)
131 })?;
132
133 let import_base = full_path.parent().map(|p| p.to_path_buf());
134 let mut imported = import_program;
135 resolve_imports_inner(&mut imported, import_base.as_ref(), depth + 1, visited)?;
136
137 imported_statements.extend(imported.statements);
138 }
139
140 program
141 .statements
142 .retain(|stmt| !matches!(&stmt.node, Stmt::Import { .. }));
143
144 let mut new_statements = imported_statements;
145 new_statements.append(&mut program.statements);
146 program.statements = new_statements;
147
148 Ok(())
149}
150
151pub async fn simulate_from_source(
160 vpl: &str,
161 events: Vec<varpulis_runtime::event::Event>,
162) -> Result<Vec<varpulis_runtime::event::Event>> {
163 use varpulis_runtime::event::Event;
164
165 let program = parse(vpl).map_err(|e| anyhow::anyhow!("Parse error: {e}"))?;
166
167 let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::<Event>(10_000);
168 let mut engine = varpulis_runtime::engine::Engine::new(output_tx);
169 engine
170 .load(&program)
171 .map_err(|e| anyhow::anyhow!("Load error: {e}"))?;
172
173 engine
174 .process_batch_sync(events)
175 .map_err(|e| anyhow::anyhow!("Process error: {e}"))?;
176
177 drop(engine);
179
180 let mut results = Vec::new();
181 while let Ok(event) = output_rx.try_recv() {
182 results.push(event);
183 }
184
185 Ok(results)
186}
187
188pub fn parse_duration_str(s: &str) -> Result<u64> {
193 let s = s.trim();
194 if s.is_empty() {
195 anyhow::bail!("Empty duration string");
196 }
197 let (num_part, suffix) = if let Some(stripped) = s.strip_suffix('s') {
198 (stripped, "s")
199 } else if let Some(stripped) = s.strip_suffix('m') {
200 (stripped, "m")
201 } else if let Some(stripped) = s.strip_suffix('h') {
202 (stripped, "h")
203 } else {
204 (s, "s")
205 };
206 let value: u64 = num_part
207 .parse()
208 .map_err(|_| anyhow::anyhow!("Invalid duration number: '{num_part}'"))?;
209 match suffix {
210 "s" => Ok(value),
211 "m" => Ok(value * 60),
212 "h" => Ok(value * 3600),
213 _ => anyhow::bail!("Unknown duration suffix: '{suffix}'"),
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// A stream with `.where` and `.emit` stages passes the syntax check.
    #[test]
    fn test_check_syntax_valid() {
        let vpl = r#"
            stream HighTemp = TempReading
                .where(temperature > 30)
                .emit(alert_type: "high_temp")
        "#;
        let outcome = check_syntax(vpl);
        assert!(outcome.is_ok());
    }

    /// A truncated stream definition is rejected by the syntax check.
    #[test]
    fn test_check_syntax_invalid() {
        let vpl = r"
            stream Invalid =
                .where(
        ";
        let outcome = check_syntax(vpl);
        assert!(outcome.is_err());
    }

    /// A single stream definition parses into exactly one statement.
    #[test]
    fn test_parse_program_valid() {
        let vpl = r"
            stream Test = Events
                .where(value > 10)
        ";
        let parsed = parse_program(vpl);
        assert!(parsed.is_ok());
        let statement_count = parsed.expect("should succeed");
        assert_eq!(statement_count, 1);
    }

    /// An unterminated expression makes `parse_program` fail.
    #[test]
    fn test_parse_program_invalid() {
        let vpl = "stream x = (";
        let parsed = parse_program(vpl);
        assert!(parsed.is_err());
    }

    /// A single stream validates and is counted as one statement.
    #[tokio::test]
    async fn test_validate_program_simple() {
        let vpl = r#"
            stream Simple = Events
                .where(x > 0)
                .emit(alert_type: "test")
        "#;
        let validated = validate_program(vpl);
        assert!(validated.is_ok());
        let statement_count = validated.expect("should succeed");
        assert_eq!(statement_count, 1);
    }

    /// Two stream definitions validate and are counted as two statements.
    #[tokio::test]
    async fn test_validate_program_multiple_streams() {
        let vpl = r#"
            stream A = Events
                .where(event_type == "a")
                .emit(alert_type: "a")

            stream B = Events
                .where(event_type == "b")
                .emit(alert_type: "b")
        "#;
        let validated = validate_program(vpl);
        assert!(validated.is_ok());
        let statement_count = validated.expect("should succeed");
        assert_eq!(statement_count, 2);
    }

    /// A filtered stream validates successfully.
    #[tokio::test]
    async fn test_validate_program_with_filter() {
        let vpl = r#"
            stream Filtered = Metrics
                .where(value > 100)
                .emit(alert_type: "high_value")
        "#;
        let validated = validate_program(vpl);
        assert!(validated.is_ok());
    }

    /// The `A -> B` sequence form inside `.pattern` passes the syntax check.
    #[test]
    fn test_check_syntax_followed_by() {
        let vpl = r#"
            stream Pattern = Events
                .pattern(p: A -> B)
                .emit(alert_type: "sequence_match")
        "#;
        let outcome = check_syntax(vpl);
        assert!(outcome.is_ok());
    }

    /// An `event` declaration with typed fields passes the syntax check.
    #[test]
    fn test_check_syntax_event_declaration() {
        let vpl = r"
            event TempReading:
                sensor_id: str
                temperature: float
        ";
        let outcome = check_syntax(vpl);
        assert!(outcome.is_ok());
    }

    /// A `fn` declaration with a typed parameter and return passes the syntax check.
    #[test]
    fn test_check_syntax_function_declaration() {
        let vpl = r"
            fn celsius_to_fahrenheit(c: float) -> float:
                c * 9.0 / 5.0 + 32.0
        ";
        let outcome = check_syntax(vpl);
        assert!(outcome.is_ok());
    }

    /// A `.pattern` stage followed by `.emit` passes the syntax check.
    #[test]
    fn test_check_syntax_pattern_matching() {
        let vpl = r#"
            stream PatternMatch = Events
                .pattern(p: A -> B)
                .emit(alert_type: "pattern")
        "#;
        let outcome = check_syntax(vpl);
        assert!(outcome.is_ok());
    }

    /// A `merge(...)` source followed by `.emit` passes the syntax check.
    #[test]
    fn test_check_syntax_merge() {
        let vpl = r#"
            stream Merged = merge(StreamA, StreamB)
                .emit(alert_type: "merged")
        "#;
        let outcome = check_syntax(vpl);
        assert!(outcome.is_ok());
    }
}