1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use axum::extract::{Json, Path, State};
11use axum::http::StatusCode;
12use axum::response::{IntoResponse, Response};
13use axum::routing::{get, post};
14use axum::Router;
15use serde::{Deserialize, Serialize};
16use tokio::sync::RwLock;
17use uuid::Uuid;
18use varpulis_runtime::event::Event;
19
/// Upper bound on the number of parsed events accepted by one run request.
const MAX_EVENTS_PER_RUN: usize = 10_000;

/// Wall-clock budget, in seconds, granted to a single simulation run.
const MAX_EXECUTION_SECS: u64 = 10;

/// Age of `last_active` beyond which a session is removed by the reaper.
const SESSION_EXPIRY: Duration = Duration::from_secs(60 * 60);

/// Pause between two consecutive passes of the background session reaper.
const REAPER_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Maximum accepted size of a VPL program, in bytes.
const MAX_VPL_LENGTH: usize = 50_000;
38
39pub type SharedPlayground = Arc<RwLock<PlaygroundState>>;
45
46#[derive(Debug)]
48pub struct PlaygroundState {
49 sessions: HashMap<String, PlaygroundSession>,
50}
51
52#[derive(Debug)]
53struct PlaygroundSession {
54 last_active: Instant,
55}
56
57impl Default for PlaygroundState {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl PlaygroundState {
64 pub fn new() -> Self {
65 Self {
66 sessions: HashMap::new(),
67 }
68 }
69
70 fn get_or_create_session(&mut self, session_id: &str) -> &mut PlaygroundSession {
71 self.sessions
72 .entry(session_id.to_string())
73 .or_insert_with(|| PlaygroundSession {
74 last_active: Instant::now(),
75 })
76 }
77
78 fn reap_expired(&mut self) -> usize {
79 let before = self.sessions.len();
80 self.sessions
81 .retain(|_, s| s.last_active.elapsed() < SESSION_EXPIRY);
82 before - self.sessions.len()
83 }
84}
85
86#[derive(Debug, Serialize)]
91pub struct SessionResponse {
92 pub session_id: String,
93}
94
95#[derive(Debug, Deserialize)]
96pub struct PlaygroundRunRequest {
97 pub vpl: String,
98 #[serde(default)]
100 pub events: String,
101}
102
103#[derive(Debug, Deserialize)]
104pub struct PlaygroundValidateRequest {
105 pub vpl: String,
106}
107
108#[derive(Debug, Serialize)]
109pub struct PlaygroundRunResponse {
110 pub ok: bool,
111 pub events_processed: usize,
112 pub output_events: Vec<serde_json::Value>,
113 pub latency_ms: u64,
114 #[serde(skip_serializing_if = "Vec::is_empty")]
115 pub diagnostics: Vec<PlaygroundDiagnostic>,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub error: Option<String>,
118}
119
120#[derive(Debug, Serialize)]
121pub struct PlaygroundValidateResponse {
122 pub ok: bool,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub ast: Option<serde_json::Value>,
125 pub diagnostics: Vec<PlaygroundDiagnostic>,
126}
127
128#[derive(Debug, Serialize)]
129pub struct PlaygroundDiagnostic {
130 pub severity: String,
131 pub message: String,
132 #[serde(skip_serializing_if = "Option::is_none")]
133 pub hint: Option<String>,
134 #[serde(skip_serializing_if = "Option::is_none")]
135 pub code: Option<String>,
136 pub start_line: u32,
137 pub start_col: u32,
138 pub end_line: u32,
139 pub end_col: u32,
140}
141
142#[derive(Debug, Serialize)]
143pub struct PlaygroundExample {
144 pub id: String,
145 pub name: String,
146 pub description: String,
147 pub category: String,
148}
149
150#[derive(Debug, Serialize)]
151pub struct PlaygroundExampleDetail {
152 pub id: String,
153 pub name: String,
154 pub description: String,
155 pub category: String,
156 pub vpl: String,
157 pub events: String,
159 pub expected_output_count: Option<usize>,
160}
161
162#[derive(Debug, Serialize)]
163struct PlaygroundError {
164 error: String,
165 code: String,
166}
167
168fn builtin_examples() -> Vec<PlaygroundExampleDetail> {
173 vec![
174 PlaygroundExampleDetail {
175 id: "hvac-alert".into(),
176 name: "HVAC Alert".into(),
177 description: "Simple temperature threshold alert — the 'Hello World' of CEP.".into(),
178 category: "Getting Started".into(),
179 vpl: r#"stream HighTemp = TempReading
180 .where(temperature > 30)
181 .emit(alert: "High temperature detected", sensor: sensor_id, temp: temperature)"#.into(),
182 events: r#"@0s TempReading { sensor_id: "HVAC-01", temperature: 22 }
183@1s TempReading { sensor_id: "HVAC-02", temperature: 35 }
184@2s TempReading { sensor_id: "HVAC-03", temperature: 28 }
185@3s TempReading { sensor_id: "HVAC-01", temperature: 41 }
186@4s TempReading { sensor_id: "HVAC-04", temperature: 19 }
187@5s TempReading { sensor_id: "HVAC-02", temperature: 33 }"#.into(),
188 expected_output_count: Some(3),
189 },
190 PlaygroundExampleDetail {
191 id: "fraud-detection".into(),
192 name: "Fraud Detection".into(),
193 description: "Detect login followed by large transfer within 5 minutes — sequence pattern matching.".into(),
194 category: "Finance".into(),
195 vpl: r#"stream FraudAlert = login as l -> transfer as t .within(5m)
196 .where(l.user_id == t.user_id and t.amount > 5000)
197 .emit(alert: "Suspicious transfer after login", user: l.user_id, amount: t.amount, city: l.city)"#.into(),
198 events: r#"# Alice logs in, then makes a large transfer — triggers alert
199@0s login { user_id: "alice", city: "New York", device: "mobile" }
200@10s transfer { user_id: "bob", amount: 200, to_account: "ext_001" }
201@30s transfer { user_id: "alice", amount: 15000, to_account: "ext_099" }
202# Charlie scenario
203@60s login { user_id: "charlie", city: "London", device: "desktop" }
204@90s transfer { user_id: "charlie", amount: 8500, to_account: "ext_042" }
205@120s transfer { user_id: "alice", amount: 100, to_account: "ext_005" }"#.into(),
206 expected_output_count: None,
207 },
208 PlaygroundExampleDetail {
209 id: "iot-anomaly".into(),
210 name: "IoT Sensor Anomaly".into(),
211 description: "Detect temperature spikes in sensor data using window aggregation.".into(),
212 category: "IoT".into(),
213 vpl: r#"stream TempSpike = sensor_reading
214 .where(temperature > 50)
215 .emit(alert: "Temperature spike", sensor: sensor_id, temp: temperature, zone: zone)"#.into(),
216 events: r#"@0s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 22.5 }
217@1s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 65.3 }
218@2s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 23.1 }
219@3s sensor_reading { sensor_id: "S003", zone: "zone_c", temperature: 55.0 }
220@4s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 24.0 }"#.into(),
221 expected_output_count: Some(2),
222 },
223 PlaygroundExampleDetail {
224 id: "trading-signal".into(),
225 name: "Trading Signal".into(),
226 description: "Detect large trades on a single symbol — volume spike alert.".into(),
227 category: "Finance".into(),
228 vpl: r#"stream VolumeSpike = trade
229 .where(volume > 10000)
230 .emit(alert: "Large trade detected", symbol: symbol, vol: volume, price: price, side: side)"#.into(),
231 events: r#"@0s trade { symbol: "AAPL", price: 185.50, volume: 500, side: "buy" }
232@1s trade { symbol: "GOOGL", price: 142.30, volume: 25000, side: "sell" }
233@2s trade { symbol: "AAPL", price: 185.60, volume: 15000, side: "buy" }
234@3s trade { symbol: "TSLA", price: 250.10, volume: 800, side: "sell" }
235@4s trade { symbol: "MSFT", price: 420.00, volume: 50000, side: "buy" }"#.into(),
236 expected_output_count: Some(3),
237 },
238 PlaygroundExampleDetail {
239 id: "cyber-killchain".into(),
240 name: "Cyber Kill Chain".into(),
241 description: "Detect a 3-stage attack sequence: scan → exploit → exfiltrate within 10 minutes.".into(),
242 category: "Security".into(),
243 vpl: r#"stream KillChain = scan as s -> exploit as e -> exfiltrate as x .within(10m)
244 .where(s.target_ip == e.target_ip and e.target_ip == x.source_ip)
245 .emit(alert: "Kill chain detected", target: s.target_ip, attacker: s.source_ip)"#.into(),
246 events: r#"@0s scan { source_ip: "10.0.0.5", target_ip: "192.168.1.100", port: 443 }
247@30s exploit { source_ip: "10.0.0.5", target_ip: "192.168.1.100", cve: "CVE-2024-1234" }
248@60s exfiltrate { source_ip: "192.168.1.100", dest_ip: "10.0.0.5", bytes: 50000000 }
249@120s scan { source_ip: "10.0.0.9", target_ip: "192.168.1.200", port: 80 }
250@180s login { user_id: "admin", ip: "192.168.1.200" }"#.into(),
251 expected_output_count: None,
252 },
253 PlaygroundExampleDetail {
254 id: "kleene-pattern".into(),
255 name: "Kleene Pattern".into(),
256 description: "Match one or more failed logins followed by a successful login — brute force detection.".into(),
257 category: "Security".into(),
258 vpl: r#"stream BruteForce = failed_login+ as fails -> successful_login as success .within(5m)
259 .where(fails.user_id == success.user_id)
260 .emit(alert: "Possible brute force", user: success.user_id)"#.into(),
261 events: r#"@0s failed_login { user_id: "admin", ip: "10.0.0.5" }
262@1s failed_login { user_id: "admin", ip: "10.0.0.5" }
263@2s failed_login { user_id: "admin", ip: "10.0.0.5" }
264@3s successful_login { user_id: "admin", ip: "10.0.0.5" }
265@4s successful_login { user_id: "bob", ip: "10.0.0.9" }"#.into(),
266 expected_output_count: None,
267 },
268 PlaygroundExampleDetail {
269 id: "merge-stream".into(),
270 name: "Merge Streams".into(),
271 description: "Combine events from multiple sources into a single alert stream.".into(),
272 category: "Getting Started".into(),
273 vpl: r#"stream TempAlerts = TempReading
274 .where(temperature > 30)
275 .emit(alert: "High temp", source: "temp", value: temperature)
276
277stream HumidAlerts = HumidityReading
278 .where(humidity > 80)
279 .emit(alert: "High humidity", source: "humidity", value: humidity)
280
281stream AllAlerts = merge(TempAlerts, HumidAlerts)
282 .emit()"#.into(),
283 events: r#"@0s TempReading { sensor_id: "S1", temperature: 35 }
284@1s HumidityReading { sensor_id: "S2", humidity: 85 }
285@2s TempReading { sensor_id: "S3", temperature: 22 }
286@3s HumidityReading { sensor_id: "S4", humidity: 45 }
287@4s TempReading { sensor_id: "S5", temperature: 38 }"#.into(),
288 expected_output_count: None,
289 },
290 PlaygroundExampleDetail {
291 id: "forecast-fraud".into(),
292 name: "Fraud Forecasting".into(),
293 description: "Predict fraud patterns using PST-based forecasting — sequence prediction with confidence scores.".into(),
294 category: "Advanced".into(),
295 vpl: r"stream FraudForecast = login as l -> transfer as t .within(5m)
296 .forecast(confidence: 0.7, horizon: 2m, warmup: 50, max_depth: 3)
297 .where(forecast_probability > 0.5)
298 .emit(probability: forecast_probability, state: forecast_state)".into(),
299 events: {
300 let mut lines = Vec::new();
302 for i in 0..60 {
303 let t = i * 2;
304 lines.push(format!(
305 "@{}s login {{ user_id: \"user_{}\", city: \"NYC\" }}",
306 t,
307 i % 10
308 ));
309 lines.push(format!(
310 "@{}s transfer {{ user_id: \"user_{}\", amount: {} }}",
311 t + 1,
312 i % 10,
313 100 + i * 10
314 ));
315 }
316 lines.push("@120s login { user_id: \"user_0\", city: \"NYC\" }".into());
318 lines.join("\n")
319 },
320 expected_output_count: None, },
322 ]
323}
324
325pub fn playground_routes(playground: SharedPlayground) -> Router {
330 Router::new()
331 .route("/api/v1/playground/session", post(handle_create_session))
332 .route(
333 "/api/v1/playground/run",
334 post(handle_run).layer(tower_http::limit::RequestBodyLimitLayer::new(1024 * 1024)),
335 )
336 .route(
337 "/api/v1/playground/validate",
338 post(handle_validate).layer(tower_http::limit::RequestBodyLimitLayer::new(256 * 1024)),
339 )
340 .route("/api/v1/playground/examples", get(handle_list_examples))
341 .route("/api/v1/playground/examples/{id}", get(handle_get_example))
342 .with_state(playground)
343}
344
345async fn handle_create_session(State(playground): State<SharedPlayground>) -> impl IntoResponse {
350 let session_id = Uuid::new_v4().to_string();
351 {
352 let mut pg = playground.write().await;
353 pg.get_or_create_session(&session_id);
354 }
355 let resp = SessionResponse { session_id };
356 (StatusCode::CREATED, Json(resp))
357}
358
359async fn handle_run(
360 State(playground): State<SharedPlayground>,
361 Json(body): Json<PlaygroundRunRequest>,
362) -> Response {
363 if body.vpl.len() > MAX_VPL_LENGTH {
365 return pg_error_response(
366 StatusCode::BAD_REQUEST,
367 "vpl_too_large",
368 &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
369 );
370 }
371
372 {
374 let mut pg = playground.write().await;
375 let session_id = Uuid::new_v4().to_string();
376 pg.get_or_create_session(&session_id);
377 }
378
379 let start = Instant::now();
380
381 let timed_events = match varpulis_runtime::event_file::EventFileParser::parse(&body.events) {
383 Ok(te) => te,
384 Err(e) => {
385 let resp = PlaygroundRunResponse {
386 ok: false,
387 events_processed: 0,
388 output_events: vec![],
389 latency_ms: 0,
390 diagnostics: vec![],
391 error: Some(format!("Event parse error: {e}")),
392 };
393 return (StatusCode::OK, Json(resp)).into_response();
394 }
395 };
396
397 if timed_events.len() > MAX_EVENTS_PER_RUN {
398 return pg_error_response(
399 StatusCode::BAD_REQUEST,
400 "too_many_events",
401 &format!("Maximum {MAX_EVENTS_PER_RUN} events per run"),
402 );
403 }
404
405 let events: Vec<Event> = timed_events.into_iter().map(|te| te.event).collect();
406 let event_count = events.len();
407
408 let run_result = tokio::time::timeout(
410 Duration::from_secs(MAX_EXECUTION_SECS),
411 crate::simulate_from_source(&body.vpl, events),
412 )
413 .await;
414
415 let latency_ms = start.elapsed().as_millis() as u64;
416
417 match run_result {
418 Ok(Ok(output_events)) => {
419 let output: Vec<serde_json::Value> = output_events
420 .iter()
421 .map(|e| {
422 let mut flat = serde_json::Map::new();
423 flat.insert(
424 "event_type".to_string(),
425 serde_json::Value::String(e.event_type.to_string()),
426 );
427 for (k, v) in &e.data {
428 flat.insert(k.to_string(), crate::websocket::value_to_json(v));
429 }
430 serde_json::Value::Object(flat)
431 })
432 .collect();
433
434 let resp = PlaygroundRunResponse {
435 ok: true,
436 events_processed: event_count,
437 output_events: output,
438 latency_ms,
439 diagnostics: vec![],
440 error: None,
441 };
442 (StatusCode::OK, Json(resp)).into_response()
443 }
444 Ok(Err(e)) => {
445 let resp = PlaygroundRunResponse {
446 ok: false,
447 events_processed: 0,
448 output_events: vec![],
449 latency_ms,
450 diagnostics: vec![],
451 error: Some(e.to_string()),
452 };
453 (StatusCode::OK, Json(resp)).into_response()
454 }
455 Err(_timeout) => {
456 let resp = PlaygroundRunResponse {
457 ok: false,
458 events_processed: 0,
459 output_events: vec![],
460 latency_ms,
461 diagnostics: vec![],
462 error: Some(format!("Execution timed out after {MAX_EXECUTION_SECS}s")),
463 };
464 (StatusCode::REQUEST_TIMEOUT, Json(resp)).into_response()
465 }
466 }
467}
468
469async fn handle_validate(
470 State(_playground): State<SharedPlayground>,
471 Json(body): Json<PlaygroundValidateRequest>,
472) -> Response {
473 if body.vpl.len() > MAX_VPL_LENGTH {
474 return pg_error_response(
475 StatusCode::BAD_REQUEST,
476 "vpl_too_large",
477 &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
478 );
479 }
480
481 let result = match varpulis_parser::parse(&body.vpl) {
482 Ok(program) => {
483 let ast = serde_json::to_value(&program).ok();
484 let validation = varpulis_core::validate::validate(&body.vpl, &program);
485 let diagnostics: Vec<PlaygroundDiagnostic> = validation
486 .diagnostics
487 .iter()
488 .map(|d| {
489 let (sl, sc) = position_to_line_col(&body.vpl, d.span.start);
490 let (el, ec) = position_to_line_col(&body.vpl, d.span.end);
491 PlaygroundDiagnostic {
492 severity: match d.severity {
493 varpulis_core::validate::Severity::Error => "error".into(),
494 varpulis_core::validate::Severity::Warning => "warning".into(),
495 },
496 message: d.message.clone(),
497 hint: d.hint.clone(),
498 code: d.code.map(|c| c.to_string()),
499 start_line: sl as u32,
500 start_col: sc as u32,
501 end_line: el as u32,
502 end_col: ec as u32,
503 }
504 })
505 .collect();
506 let has_errors = diagnostics.iter().any(|d| d.severity == "error");
507 PlaygroundValidateResponse {
508 ok: !has_errors,
509 ast,
510 diagnostics,
511 }
512 }
513 Err(error) => {
514 let diag = parse_error_to_diagnostic(&body.vpl, &error);
515 PlaygroundValidateResponse {
516 ok: false,
517 ast: None,
518 diagnostics: vec![diag],
519 }
520 }
521 };
522
523 (StatusCode::OK, Json(result)).into_response()
524}
525
526async fn handle_list_examples() -> impl IntoResponse {
527 let examples: Vec<PlaygroundExample> = builtin_examples()
528 .into_iter()
529 .map(|e| PlaygroundExample {
530 id: e.id,
531 name: e.name,
532 description: e.description,
533 category: e.category,
534 })
535 .collect();
536 (StatusCode::OK, Json(examples))
537}
538
539async fn handle_get_example(Path(id): Path<String>) -> Response {
540 let examples = builtin_examples();
541 match examples.into_iter().find(|e| e.id == id) {
542 Some(example) => (StatusCode::OK, Json(example)).into_response(),
543 None => pg_error_response(
544 StatusCode::NOT_FOUND,
545 "example_not_found",
546 &format!("Example '{id}' not found"),
547 ),
548 }
549}
550
551pub fn spawn_session_reaper(playground: SharedPlayground) {
557 tokio::spawn(async move {
558 loop {
559 tokio::time::sleep(REAPER_INTERVAL).await;
560 let mut pg = playground.write().await;
561 let reaped = pg.reap_expired();
562 if reaped > 0 {
563 tracing::debug!("Playground: reaped {} expired sessions", reaped);
564 }
565 }
566 });
567}
568
569fn pg_error_response(status: StatusCode, code: &str, message: &str) -> Response {
574 let body = PlaygroundError {
575 error: message.to_string(),
576 code: code.to_string(),
577 };
578 (status, Json(body)).into_response()
579}
580
581fn parse_error_to_diagnostic(
582 source: &str,
583 error: &varpulis_parser::ParseError,
584) -> PlaygroundDiagnostic {
585 use varpulis_parser::ParseError;
586 match error {
587 ParseError::Located {
588 line,
589 column,
590 message,
591 hint,
592 ..
593 } => PlaygroundDiagnostic {
594 severity: "error".into(),
595 message: message.clone(),
596 hint: hint.clone(),
597 code: None,
598 start_line: line.saturating_sub(1) as u32,
599 start_col: column.saturating_sub(1) as u32,
600 end_line: line.saturating_sub(1) as u32,
601 end_col: *column as u32,
602 },
603 ParseError::UnexpectedToken {
604 position,
605 expected,
606 found,
607 } => {
608 let (line, col) = position_to_line_col(source, *position);
609 PlaygroundDiagnostic {
610 severity: "error".into(),
611 message: format!("Unexpected token: expected {expected}, found '{found}'"),
612 hint: None,
613 code: None,
614 start_line: line as u32,
615 start_col: col as u32,
616 end_line: line as u32,
617 end_col: (col + found.len()) as u32,
618 }
619 }
620 ParseError::UnexpectedEof => {
621 let line = source.lines().count().saturating_sub(1);
622 let col = source.lines().last().map_or(0, |l| l.len());
623 PlaygroundDiagnostic {
624 severity: "error".into(),
625 message: "Unexpected end of input".into(),
626 hint: None,
627 code: None,
628 start_line: line as u32,
629 start_col: col as u32,
630 end_line: line as u32,
631 end_col: col as u32,
632 }
633 }
634 ParseError::InvalidToken { position, message } => {
635 let (line, col) = position_to_line_col(source, *position);
636 PlaygroundDiagnostic {
637 severity: "error".into(),
638 message: message.clone(),
639 hint: None,
640 code: None,
641 start_line: line as u32,
642 start_col: col as u32,
643 end_line: line as u32,
644 end_col: (col + 10) as u32,
645 }
646 }
647 ParseError::InvalidNumber(msg)
648 | ParseError::InvalidDuration(msg)
649 | ParseError::InvalidTimestamp(msg)
650 | ParseError::InvalidEscape(msg) => PlaygroundDiagnostic {
651 severity: "error".into(),
652 message: msg.clone(),
653 hint: None,
654 code: None,
655 start_line: 0,
656 start_col: 0,
657 end_line: 0,
658 end_col: 0,
659 },
660 ParseError::UnterminatedString(position) => {
661 let (line, col) = position_to_line_col(source, *position);
662 PlaygroundDiagnostic {
663 severity: "error".into(),
664 message: "Unterminated string literal".into(),
665 hint: None,
666 code: None,
667 start_line: line as u32,
668 start_col: col as u32,
669 end_line: line as u32,
670 end_col: source.lines().nth(line).map_or(col, |l| l.len()) as u32,
671 }
672 }
673 ParseError::Custom { span, message } => {
674 let (sl, sc) = position_to_line_col(source, span.start);
675 let (el, ec) = position_to_line_col(source, span.end);
676 PlaygroundDiagnostic {
677 severity: "error".into(),
678 message: message.clone(),
679 hint: None,
680 code: None,
681 start_line: sl as u32,
682 start_col: sc as u32,
683 end_line: el as u32,
684 end_col: ec as u32,
685 }
686 }
687 }
688}
689
/// Translates a byte offset into `source` into a zero-based `(line, column)`
/// pair.
///
/// Every character that starts before `position` is consumed: a `'\n'`
/// moves to the next line and resets the column, any other character
/// advances the column by one. Columns therefore count characters, not
/// bytes, and an offset past the end yields the position just after the
/// last character.
fn position_to_line_col(source: &str, position: usize) -> (usize, usize) {
    source
        .char_indices()
        .take_while(|&(offset, _)| offset < position)
        .fold((0, 0), |(line, col), (_, ch)| {
            if ch == '\n' {
                (line + 1, 0)
            } else {
                (line, col + 1)
            }
        })
}
710
#[cfg(test)]
mod tests {
    use super::*;

    /// Every built-in example must carry the fields the UI relies on.
    #[test]
    fn test_builtin_examples_valid() {
        let examples = builtin_examples();
        assert!(!examples.is_empty());
        for example in &examples {
            assert!(!example.id.is_empty());
            assert!(!example.name.is_empty());
            assert!(!example.vpl.is_empty());
            assert!(!example.events.is_empty());
        }
    }

    /// Example ids are used as lookup keys and must not collide.
    #[test]
    fn test_builtin_examples_unique_ids() {
        let examples = builtin_examples();
        let mut ids: Vec<&str> = examples.iter().map(|e| e.id.as_str()).collect();
        ids.sort_unstable();
        ids.dedup();
        assert_eq!(ids.len(), examples.len(), "Duplicate example IDs found");
    }

    /// Fresh sessions survive a reaping pass; a session whose `last_active`
    /// lies beyond the expiry window is removed.
    #[test]
    fn test_session_reaping() {
        let mut state = PlaygroundState::new();
        state.get_or_create_session("test-1");
        state.get_or_create_session("test-2");
        assert_eq!(state.sessions.len(), 2);

        let reaped = state.reap_expired();
        assert_eq!(reaped, 0);
        assert_eq!(state.sessions.len(), 2);

        // Backdate one session instead of sleeping. `checked_sub` can fail
        // when the monotonic clock is younger than the expiry window; the
        // expiry half of the test is skipped in that case.
        if let Some(stale) = Instant::now().checked_sub(SESSION_EXPIRY + Duration::from_secs(1)) {
            state
                .sessions
                .get_mut("test-1")
                .expect("session was created above")
                .last_active = stale;

            assert_eq!(state.reap_expired(), 1);
            assert!(!state.sessions.contains_key("test-1"));
            assert!(state.sessions.contains_key("test-2"));
        }
    }

    /// Byte offsets map to zero-based lines and character columns.
    #[test]
    fn test_position_to_line_col() {
        assert_eq!(position_to_line_col("", 0), (0, 0));
        assert_eq!(position_to_line_col("abc", 2), (0, 2));
        assert_eq!(position_to_line_col("ab\ncd", 3), (1, 0));
        assert_eq!(position_to_line_col("ab\ncd", 4), (1, 1));
        // Offsets past the end clamp to the end of the source.
        assert_eq!(position_to_line_col("ab\ncd", 100), (1, 2));
        // 'é' occupies two bytes but a single column.
        assert_eq!(position_to_line_col("é\nx", 3), (1, 0));
    }

    /// A well-formed program parses and validates without errors.
    #[test]
    fn test_validate_response_for_valid_vpl() {
        let vpl = r#"
event SensorReading:
    temperature: int

stream HighTemp = SensorReading
    .where(temperature > 30)
    .emit(alert: "high_temp")
"#;
        match varpulis_parser::parse(vpl) {
            Ok(program) => {
                let validation = varpulis_core::validate::validate(vpl, &program);
                let has_errors = validation
                    .diagnostics
                    .iter()
                    .any(|d| d.severity == varpulis_core::validate::Severity::Error);
                assert!(!has_errors);
            }
            Err(e) => panic!("Parse failed: {e}"),
        }
    }

    /// The event-file parser accepts a line without a leading timestamp.
    #[test]
    fn test_evt_format_parsing() {
        let evt_text = r#"sensor_reading { sensor_id: "S001", temperature: 65.3 }"#;
        let timed = varpulis_runtime::event_file::EventFileParser::parse(evt_text).unwrap();
        assert_eq!(timed.len(), 1);
        assert_eq!(timed[0].event.event_type.as_ref(), "sensor_reading");
    }

    /// The IoT example yields exactly its advertised number of matches.
    #[tokio::test]
    async fn test_iot_anomaly_example_produces_matches() {
        let examples = builtin_examples();
        let iot = examples.iter().find(|e| e.id == "iot-anomaly").unwrap();

        let timed = varpulis_runtime::event_file::EventFileParser::parse(&iot.events).unwrap();
        let events: Vec<varpulis_runtime::event::Event> =
            timed.into_iter().map(|te| te.event).collect();

        let results = crate::simulate_from_source(&iot.vpl, events).await.unwrap();
        assert_eq!(
            results.len(),
            iot.expected_output_count.unwrap(),
            "IoT anomaly example should produce {} matches, got {}",
            iot.expected_output_count.unwrap(),
            results.len()
        );
    }

    /// Every example that pins an output count must reproduce it.
    #[tokio::test]
    async fn test_all_examples_with_expected_count() {
        for example in builtin_examples() {
            let Some(expected) = example.expected_output_count else {
                continue;
            };

            let timed = varpulis_runtime::event_file::EventFileParser::parse(&example.events)
                .unwrap_or_else(|e| panic!("Event parse failed for '{}': {e}", example.id));
            let events: Vec<varpulis_runtime::event::Event> =
                timed.into_iter().map(|te| te.event).collect();

            let results = crate::simulate_from_source(&example.vpl, events)
                .await
                .unwrap_or_else(|e| panic!("Run failed for '{}': {e}", example.id));
            assert_eq!(
                results.len(),
                expected,
                "Example '{}' expected {} matches, got {}",
                example.id,
                expected,
                results.len()
            );
        }
    }
}