1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use axum::extract::{Json, Path, State};
11use axum::http::StatusCode;
12use axum::response::{IntoResponse, Response};
13use axum::routing::{get, post};
14use axum::Router;
15use serde::{Deserialize, Serialize};
16use tokio::sync::RwLock;
17use uuid::Uuid;
18use varpulis_runtime::event::Event;
19
20const MAX_EVENTS_PER_RUN: usize = 10_000;
26
27const MAX_EXECUTION_SECS: u64 = 10;
29
30const SESSION_EXPIRY: Duration = Duration::from_secs(3600); const REAPER_INTERVAL: Duration = Duration::from_secs(300); const MAX_VPL_LENGTH: usize = 50_000;
38
39pub type SharedPlayground = Arc<RwLock<PlaygroundState>>;
45
46#[derive(Debug)]
48pub struct PlaygroundState {
49 sessions: HashMap<String, PlaygroundSession>,
50}
51
52#[derive(Debug)]
53struct PlaygroundSession {
54 last_active: Instant,
55}
56
57impl Default for PlaygroundState {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl PlaygroundState {
64 pub fn new() -> Self {
65 Self {
66 sessions: HashMap::new(),
67 }
68 }
69
70 fn get_or_create_session(&mut self, session_id: &str) -> &mut PlaygroundSession {
71 self.sessions
72 .entry(session_id.to_string())
73 .or_insert_with(|| PlaygroundSession {
74 last_active: Instant::now(),
75 })
76 }
77
78 fn reap_expired(&mut self) -> usize {
79 let before = self.sessions.len();
80 self.sessions
81 .retain(|_, s| s.last_active.elapsed() < SESSION_EXPIRY);
82 before - self.sessions.len()
83 }
84}
85
86#[derive(Debug, Serialize)]
91pub struct SessionResponse {
92 pub session_id: String,
93}
94
95#[derive(Debug, Deserialize)]
96pub struct PlaygroundRunRequest {
97 pub vpl: String,
98 #[serde(default)]
100 pub events: String,
101}
102
103#[derive(Debug, Deserialize)]
104pub struct PlaygroundValidateRequest {
105 pub vpl: String,
106}
107
108#[derive(Debug, Serialize)]
109pub struct PlaygroundRunResponse {
110 pub ok: bool,
111 pub events_processed: usize,
112 pub output_events: Vec<serde_json::Value>,
113 pub latency_ms: u64,
114 #[serde(skip_serializing_if = "Vec::is_empty")]
115 pub diagnostics: Vec<PlaygroundDiagnostic>,
116 #[serde(skip_serializing_if = "Option::is_none")]
117 pub error: Option<String>,
118}
119
120#[derive(Debug, Serialize)]
121pub struct PlaygroundValidateResponse {
122 pub ok: bool,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub ast: Option<serde_json::Value>,
125 pub diagnostics: Vec<PlaygroundDiagnostic>,
126}
127
128#[derive(Debug, Serialize)]
129pub struct PlaygroundDiagnostic {
130 pub severity: String,
131 pub message: String,
132 #[serde(skip_serializing_if = "Option::is_none")]
133 pub hint: Option<String>,
134 #[serde(skip_serializing_if = "Option::is_none")]
135 pub code: Option<String>,
136 pub start_line: u32,
137 pub start_col: u32,
138 pub end_line: u32,
139 pub end_col: u32,
140}
141
142#[derive(Debug, Serialize)]
143pub struct PlaygroundExample {
144 pub id: String,
145 pub name: String,
146 pub description: String,
147 pub category: String,
148}
149
150#[derive(Debug, Serialize)]
151pub struct PlaygroundExampleDetail {
152 pub id: String,
153 pub name: String,
154 pub description: String,
155 pub category: String,
156 pub vpl: String,
157 pub events: String,
159 pub expected_output_count: Option<usize>,
160}
161
162#[derive(Debug, Serialize)]
163struct PlaygroundError {
164 error: String,
165 code: String,
166}
167
168fn builtin_examples() -> Vec<PlaygroundExampleDetail> {
173 vec![
174 PlaygroundExampleDetail {
175 id: "hvac-alert".into(),
176 name: "HVAC Alert".into(),
177 description: "Simple temperature threshold alert — the 'Hello World' of CEP.".into(),
178 category: "Getting Started".into(),
179 vpl: r#"stream HighTemp = TempReading
180 .where(temperature > 30)
181 .emit(alert: "High temperature detected", sensor: sensor_id, temp: temperature)"#.into(),
182 events: r#"@0s TempReading { sensor_id: "HVAC-01", temperature: 22 }
183@1s TempReading { sensor_id: "HVAC-02", temperature: 35 }
184@2s TempReading { sensor_id: "HVAC-03", temperature: 28 }
185@3s TempReading { sensor_id: "HVAC-01", temperature: 41 }
186@4s TempReading { sensor_id: "HVAC-04", temperature: 19 }
187@5s TempReading { sensor_id: "HVAC-02", temperature: 33 }"#.into(),
188 expected_output_count: Some(3),
189 },
190 PlaygroundExampleDetail {
191 id: "fraud-detection".into(),
192 name: "Fraud Detection".into(),
193 description: "Detect login followed by large transfer within 5 minutes — sequence pattern matching.".into(),
194 category: "Finance".into(),
195 vpl: r#"stream FraudAlert = login as l -> transfer as t .within(5m)
196 .where(l.user_id == t.user_id and t.amount > 5000)
197 .emit(alert: "Suspicious transfer after login", user: l.user_id, amount: t.amount, city: l.city)"#.into(),
198 events: r#"# Alice logs in, then makes a large transfer — triggers alert
199@0s login { user_id: "alice", city: "New York", device: "mobile" }
200@10s transfer { user_id: "bob", amount: 200, to_account: "ext_001" }
201@30s transfer { user_id: "alice", amount: 15000, to_account: "ext_099" }
202# Charlie scenario
203@60s login { user_id: "charlie", city: "London", device: "desktop" }
204@90s transfer { user_id: "charlie", amount: 8500, to_account: "ext_042" }
205@120s transfer { user_id: "alice", amount: 100, to_account: "ext_005" }"#.into(),
206 expected_output_count: None,
207 },
208 PlaygroundExampleDetail {
209 id: "iot-anomaly".into(),
210 name: "IoT Sensor Anomaly".into(),
211 description: "Detect temperature spikes in sensor data using window aggregation.".into(),
212 category: "IoT".into(),
213 vpl: r#"stream TempSpike = sensor_reading
214 .where(temperature > 50)
215 .emit(alert: "Temperature spike", sensor: sensor_id, temp: temperature, zone: zone)"#.into(),
216 events: r#"@0s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 22.5 }
217@1s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 65.3 }
218@2s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 23.1 }
219@3s sensor_reading { sensor_id: "S003", zone: "zone_c", temperature: 55.0 }
220@4s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 24.0 }"#.into(),
221 expected_output_count: Some(2),
222 },
223 PlaygroundExampleDetail {
224 id: "trading-signal".into(),
225 name: "Trading Signal".into(),
226 description: "Detect large trades on a single symbol — volume spike alert.".into(),
227 category: "Finance".into(),
228 vpl: r#"stream VolumeSpike = trade
229 .where(volume > 10000)
230 .emit(alert: "Large trade detected", symbol: symbol, vol: volume, price: price, side: side)"#.into(),
231 events: r#"@0s trade { symbol: "AAPL", price: 185.50, volume: 500, side: "buy" }
232@1s trade { symbol: "GOOGL", price: 142.30, volume: 25000, side: "sell" }
233@2s trade { symbol: "AAPL", price: 185.60, volume: 15000, side: "buy" }
234@3s trade { symbol: "TSLA", price: 250.10, volume: 800, side: "sell" }
235@4s trade { symbol: "MSFT", price: 420.00, volume: 50000, side: "buy" }"#.into(),
236 expected_output_count: Some(3),
237 },
238 PlaygroundExampleDetail {
239 id: "cyber-killchain".into(),
240 name: "Cyber Kill Chain".into(),
241 description: "Detect a 3-stage attack sequence: scan → exploit → exfiltrate within 10 minutes.".into(),
242 category: "Security".into(),
243 vpl: r#"stream KillChain = scan as s -> exploit as e -> exfiltrate as x .within(10m)
244 .where(s.target_ip == e.target_ip and e.target_ip == x.source_ip)
245 .emit(alert: "Kill chain detected", target: s.target_ip, attacker: s.source_ip)"#.into(),
246 events: r#"@0s scan { source_ip: "10.0.0.5", target_ip: "192.168.1.100", port: 443 }
247@30s exploit { source_ip: "10.0.0.5", target_ip: "192.168.1.100", cve: "CVE-2024-1234" }
248@60s exfiltrate { source_ip: "192.168.1.100", dest_ip: "10.0.0.5", bytes: 50000000 }
249@120s scan { source_ip: "10.0.0.9", target_ip: "192.168.1.200", port: 80 }
250@180s login { user_id: "admin", ip: "192.168.1.200" }"#.into(),
251 expected_output_count: None,
252 },
253 PlaygroundExampleDetail {
254 id: "kleene-pattern".into(),
255 name: "Brute Force (Kleene)".into(),
256 description: "Detect repeated failed logins followed by a success — uses 'all' for Kleene+ matching with match_count threshold.".into(),
257 category: "Security".into(),
258 vpl: r#"# Kleene pattern: match ALL failed logins, then a successful login
259# partition_by(user_id) isolates each user's login sequence
260# Filter: at least 3 failed attempts (match_count >= 4)
261stream BruteForce = failed_login -> all failed_login as f -> successful_login as s .within(5m)
262 .partition_by(user_id)
263 .where(match_count >= 4)
264 .emit(alert: "Brute force detected", user: s.user_id, failed_attempts: match_count - 1)"#.into(),
265 events: r#"# Admin: 3 failed logins then success — triggers alert (3 failures >= 3)
266@0s failed_login { user_id: "admin", ip: "10.0.0.5" }
267@1s failed_login { user_id: "admin", ip: "10.0.0.5" }
268@2s failed_login { user_id: "admin", ip: "10.0.0.5" }
269@3s successful_login { user_id: "admin", ip: "10.0.0.5" }
270# Eve: 2 failed logins then success — no alert (2 failures < 3)
271@10s failed_login { user_id: "eve", ip: "10.0.0.7" }
272@11s failed_login { user_id: "eve", ip: "10.0.0.7" }
273@12s successful_login { user_id: "eve", ip: "10.0.0.7" }
274# Bob: just logs in — no alert (no failed logins)
275@20s successful_login { user_id: "bob", ip: "10.0.0.9" }"#.into(),
276 expected_output_count: None,
277 },
278 PlaygroundExampleDetail {
279 id: "merge-stream".into(),
280 name: "Merge Streams".into(),
281 description: "Combine events from multiple sources into a single alert stream.".into(),
282 category: "Getting Started".into(),
283 vpl: r#"stream TempAlerts = TempReading
284 .where(temperature > 30)
285 .emit(alert: "High temp", source: "temp", value: temperature)
286
287stream HumidAlerts = HumidityReading
288 .where(humidity > 80)
289 .emit(alert: "High humidity", source: "humidity", value: humidity)
290
291stream AllAlerts = merge(TempAlerts, HumidAlerts)
292 .emit(alert: alert, source: source, value: value)"#.into(),
293 events: r#"@0s TempReading { sensor_id: "S1", temperature: 35 }
294@1s HumidityReading { sensor_id: "S2", humidity: 85 }
295@2s TempReading { sensor_id: "S3", temperature: 22 }
296@3s HumidityReading { sensor_id: "S4", humidity: 45 }
297@4s TempReading { sensor_id: "S5", temperature: 38 }"#.into(),
298 expected_output_count: None,
299 },
300 PlaygroundExampleDetail {
301 id: "ddos-detection".into(),
302 name: "DDoS Detection (Kleene)".into(),
303 description: "Detect a flood of requests to the same target — Kleene+ with match_count threshold.".into(),
304 category: "Security".into(),
305 vpl: r#"# Detect 5+ requests to the same host within 10 seconds
306# partition_by(host) isolates per-target detection
307stream DDoS = http_request -> all http_request as flood .within(10s)
308 .partition_by(host)
309 .where(match_count >= 5)
310 .emit(alert: "DDoS detected", target: flood.host, request_count: match_count)"#.into(),
311 events: r#"# Burst of 6 requests to api.example.com — triggers alert
312@0s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.1" }
313@1s http_request { host: "api.example.com", method: "POST", ip: "10.0.0.2" }
314@2s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.3" }
315@3s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.1" }
316@4s http_request { host: "api.example.com", method: "DELETE", ip: "10.0.0.4" }
317@5s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.2" }
318# Normal traffic — 2 requests, won't trigger
319@20s http_request { host: "cdn.example.com", method: "GET", ip: "10.0.0.5" }
320@21s http_request { host: "cdn.example.com", method: "GET", ip: "10.0.0.6" }"#.into(),
321 expected_output_count: None,
322 },
323 PlaygroundExampleDetail {
324 id: "error-storm".into(),
325 name: "Error Storm (Kleene)".into(),
326 description: "Detect cascading errors in microservices — repeated errors followed by a timeout.".into(),
327 category: "IoT".into(),
328 vpl: r#"# Cascading failure: 3+ errors then a timeout event
329# partition_by(service) isolates per-service detection
330stream ErrorStorm = service_error -> all service_error as errors -> timeout as t .within(30s)
331 .partition_by(service)
332 .where(match_count >= 4)
333 .emit(alert: "Error storm", service: t.service, error_count: match_count - 1)"#.into(),
334 events: r#"# 4 errors then timeout — triggers (4 errors >= 3)
335@0s service_error { service: "payments", code: 500, msg: "DB connection lost" }
336@2s service_error { service: "payments", code: 503, msg: "Circuit breaker open" }
337@4s service_error { service: "payments", code: 500, msg: "DB connection lost" }
338@6s service_error { service: "payments", code: 502, msg: "Bad gateway" }
339@8s timeout { service: "payments", duration_ms: 30000 }
340# 1 error then timeout — no alert (1 error < 3)
341@20s service_error { service: "auth", code: 401, msg: "Token expired" }
342@22s timeout { service: "auth", duration_ms: 5000 }"#.into(),
343 expected_output_count: None,
344 },
345 PlaygroundExampleDetail {
346 id: "stock-trend".into(),
347 name: "Stock Trend Count".into(),
348 description: "Count rising stock price trends using Hamlet trend aggregation — efficient multi-pattern counting.".into(),
349 category: "Finance".into(),
350 vpl: r"# Count how many rising trends appear in a stock's price series
351stream RisingTrends = StockTick as first
352 -> all StockTick as rising
353 .within(60s)
354 .trend_aggregate(count: count_trends())
355 .emit(trends: count)".into(),
356 events: r#"@0s StockTick { symbol: "AAPL", price: 150.0 }
357@1s StockTick { symbol: "AAPL", price: 152.0 }
358@2s StockTick { symbol: "AAPL", price: 155.0 }
359@3s StockTick { symbol: "AAPL", price: 153.0 }
360@4s StockTick { symbol: "AAPL", price: 158.0 }
361@5s StockTick { symbol: "AAPL", price: 160.0 }
362@6s StockTick { symbol: "AAPL", price: 162.0 }"#.into(),
363 expected_output_count: None,
364 },
365 PlaygroundExampleDetail {
366 id: "sensor-stats".into(),
367 name: "Sensor Statistics".into(),
368 description: "Compute sum, average, and count over sensor readings using trend aggregation.".into(),
369 category: "IoT".into(),
370 vpl: r"# Multiple aggregation functions on a Kleene sequence
371stream SensorStats = sensor_reading as first
372 -> all sensor_reading as readings
373 .within(60s)
374 .trend_aggregate(
375 total: sum_trends(readings.temperature),
376 avg_temp: avg_trends(readings.temperature),
377 reading_count: count_events(readings)
378 )
379 .emit(sum: total, average: avg_temp, count: reading_count)".into(),
380 events: r#"@0s sensor_reading { sensor_id: "S1", temperature: 20.5, zone: "A" }
381@1s sensor_reading { sensor_id: "S1", temperature: 22.0, zone: "A" }
382@2s sensor_reading { sensor_id: "S1", temperature: 24.5, zone: "A" }
383@3s sensor_reading { sensor_id: "S1", temperature: 21.0, zone: "A" }
384@4s sensor_reading { sensor_id: "S1", temperature: 26.0, zone: "A" }
385@5s sensor_reading { sensor_id: "S1", temperature: 28.5, zone: "A" }"#.into(),
386 expected_output_count: None,
387 },
388 PlaygroundExampleDetail {
389 id: "transaction-analysis".into(),
390 name: "Transaction Analysis".into(),
391 description: "Multi-type Kleene: track processing steps between transaction open and close, with aggregated stats.".into(),
392 category: "Advanced".into(),
393 vpl: r"# Open -> multiple steps -> Close, with aggregated statistics
394stream TxStats = TxOpen as open
395 -> all TxStep as steps
396 -> TxClose as close
397 .within(30m)
398 .trend_aggregate(
399 avg_dur: avg_trends(steps.duration),
400 max_errors: max_trends(steps.error_count),
401 step_count: count_events(steps),
402 trend_count: count_trends()
403 )
404 .emit(tx: open.tx_id, steps: step_count, avg_duration: avg_dur, max_err: max_errors)".into(),
405 events: r#"# Transaction TX-001: open, 4 processing steps, close
406@0s TxOpen { tx_id: "TX-001", customer: "acme_corp" }
407@1s TxStep { tx_id: "TX-001", step: "validate", duration: 120.0, error_count: 0.0 }
408@2s TxStep { tx_id: "TX-001", step: "enrich", duration: 350.0, error_count: 1.0 }
409@3s TxStep { tx_id: "TX-001", step: "transform", duration: 90.0, error_count: 0.0 }
410@4s TxStep { tx_id: "TX-001", step: "persist", duration: 200.0, error_count: 2.0 }
411@5s TxClose { tx_id: "TX-001", status: "completed" }"#.into(),
412 expected_output_count: None,
413 },
414 PlaygroundExampleDetail {
415 id: "forecast-fraud".into(),
416 name: "Fraud Forecasting".into(),
417 description: "Predict fraud patterns using PST-based forecasting — sequence prediction with confidence scores.".into(),
418 category: "Advanced".into(),
419 vpl: r"stream FraudForecast = login as l -> transfer as t .within(5m)
420 .forecast(confidence: 0.7, horizon: 2m, warmup: 50, max_depth: 3)
421 .where(forecast_probability > 0.5)
422 .emit(probability: forecast_probability, state: forecast_state)".into(),
423 events: {
424 let mut lines = Vec::new();
426 for i in 0..60 {
427 let t = i * 2;
428 lines.push(format!(
429 "@{}s login {{ user_id: \"user_{}\", city: \"NYC\" }}",
430 t,
431 i % 10
432 ));
433 lines.push(format!(
434 "@{}s transfer {{ user_id: \"user_{}\", amount: {} }}",
435 t + 1,
436 i % 10,
437 100 + i * 10
438 ));
439 }
440 lines.push("@120s login { user_id: \"user_0\", city: \"NYC\" }".into());
442 lines.join("\n")
443 },
444 expected_output_count: None, },
446 PlaygroundExampleDetail {
447 id: "kleene-aggregates".into(),
448 name: "Kleene Aggregates".into(),
449 description: "Inline aggregates on Kleene matches — sum, avg, min, max, count without trend_aggregate.".into(),
450 category: "Advanced".into(),
451 vpl: r"# Detect large cumulative transfers after login
452# partition_by(user_id) isolates per-user sequences
453stream LargeTransfers = login as l -> all transfer as t .within(10m)
454 .partition_by(user_id)
455 .where(sum(t.amount) > 500)
456 .emit(
457 user: l.user_id,
458 total: sum(t.amount),
459 avg_amount: avg(t.amount),
460 largest: max(t.amount),
461 smallest: min(t.amount),
462 num_transfers: count(t)
463 )".into(),
464 events: r#"# Alice: login then 3 transfers totaling 1500 — triggers (1500 > 500)
465@0s login { user_id: "alice", city: "NYC" }
466@10s transfer { user_id: "alice", amount: 200.0, to: "ext_001" }
467@20s transfer { user_id: "alice", amount: 800.0, to: "ext_002" }
468@30s transfer { user_id: "alice", amount: 500.0, to: "ext_003" }
469# Bob: login then 1 small transfer — no alert (100 < 500)
470@60s login { user_id: "bob", city: "London" }
471@70s transfer { user_id: "bob", amount: 100.0, to: "ext_010" }"#.into(),
472 expected_output_count: None,
473 },
474 PlaygroundExampleDetail {
475 id: "first-last".into(),
476 name: "First/Last Access".into(),
477 description: "Access the first and last events in a Kleene match — detect location changes.".into(),
478 category: "Security".into(),
479 vpl: r"# Detect when a user roams across cities
480# partition_by(user_id) isolates per-user login sequences
481stream LocationChange = login as l -> all login as f .within(1h)
482 .partition_by(user_id)
483 .where(count(f) >= 2 and l.city != last(f).city)
484 .emit(
485 user: l.user_id,
486 origin: l.city,
487 first_roam: first(f).city,
488 latest: last(f).city,
489 roam_count: count(f)
490 )".into(),
491 events: r#"# Alice: NYC then roams to London, Tokyo, Berlin — triggers
492@0s login { user_id: "alice", city: "NYC", device: "mobile" }
493@60s login { user_id: "alice", city: "London", device: "laptop" }
494@120s login { user_id: "alice", city: "Tokyo", device: "desktop" }
495@180s login { user_id: "alice", city: "Berlin", device: "tablet" }
496# Bob: stays in Berlin — no location change
497@200s login { user_id: "bob", city: "Berlin", device: "mobile" }
498@260s login { user_id: "bob", city: "Berlin", device: "tablet" }"#.into(),
499 expected_output_count: None,
500 },
501 PlaygroundExampleDetail {
502 id: "distinct-count".into(),
503 name: "Distinct Count".into(),
504 description: "Count unique values across Kleene matches — detect distributed attacks from many IPs.".into(),
505 category: "Security".into(),
506 vpl: r"# Detect attacks from 3+ distinct source IPs
507# partition_by(target_ip) isolates per-target detection
508stream DistributedAttack = scan -> all scan as s .within(1m)
509 .partition_by(target_ip)
510 .where(distinct_count(s.source_ip) >= 3)
511 .emit(
512 target: s.target_ip,
513 unique_sources: distinct_count(s.source_ip),
514 total_scans: count(s)
515 )".into(),
516 events: r#"# Server .100: 5 scans from 4 distinct IPs — triggers (4 >= 3)
517@0s scan { source_ip: "10.0.0.1", target_ip: "192.168.1.100", port: 22 }
518@1s scan { source_ip: "10.0.0.2", target_ip: "192.168.1.100", port: 80 }
519@2s scan { source_ip: "10.0.0.3", target_ip: "192.168.1.100", port: 443 }
520@3s scan { source_ip: "10.0.0.1", target_ip: "192.168.1.100", port: 8080 }
521@4s scan { source_ip: "10.0.0.4", target_ip: "192.168.1.100", port: 3306 }
522# Server .200: 2 scans from 1 IP — no alert (1 < 3)
523@20s scan { source_ip: "10.0.0.5", target_ip: "192.168.1.200", port: 22 }
524@21s scan { source_ip: "10.0.0.5", target_ip: "192.168.1.200", port: 80 }"#.into(),
525 expected_output_count: None,
526 },
527 PlaygroundExampleDetail {
528 id: "absence-detection".into(),
529 name: "Absence Detection".into(),
530 description: "Detect when an expected event does NOT occur — order without shipment.".into(),
531 category: "Getting Started".into(),
532 vpl: r"# Alert if an order is placed but a cancellation arrives before shipment
533# .not(cancellation) invalidates the run if cancellation appears
534stream OrderCancelled = order as o -> shipment as s .within(5m)
535 .not(cancellation)
536 .emit(order_id: o.order_id, status: s.status)".into(),
537 events: r#"# Order 1: order -> shipment (no cancellation) — matches
538@0s order { order_id: "ORD-001", customer: "alice", total: 99.99 }
539@30s shipment { order_id: "ORD-001", status: "shipped", carrier: "ups" }
540# Order 2: order -> cancellation -> shipment — no match (cancellation kills run)
541@60s order { order_id: "ORD-002", customer: "bob", total: 50.00 }
542@70s cancellation { order_id: "ORD-002", reason: "changed mind" }
543@90s shipment { order_id: "ORD-002", status: "shipped", carrier: "fedex" }
544# Order 3: order -> shipment (no cancellation) — matches
545@120s order { order_id: "ORD-003", customer: "charlie", total: 200.00 }
546@150s shipment { order_id: "ORD-003", status: "shipped", carrier: "dhl" }"#.into(),
547 expected_output_count: None,
548 },
549 PlaygroundExampleDetail {
550 id: "rate-detection".into(),
551 name: "Rate Detection".into(),
552 description: "Detect high event rates using match_rate — events per second within a pattern match.".into(),
553 category: "Security".into(),
554 vpl: r"# Alert when request rate exceeds 2 events/second
555# partition_by(endpoint) isolates per-endpoint rate monitoring
556stream HighRate = request -> all request as r .within(30s)
557 .partition_by(endpoint)
558 .where(match_count >= 3 and match_rate > 2.0)
559 .emit(
560 endpoint: r.endpoint,
561 rate: match_rate,
562 total: match_count
563 )".into(),
564 events: r#"# /api/login: 6 requests in 2 seconds — triggers (rate > 2.0)
565@0s request { endpoint: "/api/login", ip: "10.0.0.1" }
566@0s request { endpoint: "/api/login", ip: "10.0.0.1" }
567@1s request { endpoint: "/api/login", ip: "10.0.0.1" }
568@1s request { endpoint: "/api/login", ip: "10.0.0.1" }
569@2s request { endpoint: "/api/login", ip: "10.0.0.1" }
570@2s request { endpoint: "/api/login", ip: "10.0.0.1" }
571# /api/data: 3 requests over 10 seconds — no alert (rate < 2.0)
572@20s request { endpoint: "/api/data", ip: "10.0.0.2" }
573@25s request { endpoint: "/api/data", ip: "10.0.0.2" }
574@30s request { endpoint: "/api/data", ip: "10.0.0.2" }"#.into(),
575 expected_output_count: None,
576 },
577 ]
578}
579
580pub fn playground_routes(playground: SharedPlayground) -> Router {
585 Router::new()
586 .route("/api/v1/playground/session", post(handle_create_session))
587 .route(
588 "/api/v1/playground/run",
589 post(handle_run).layer(tower_http::limit::RequestBodyLimitLayer::new(1024 * 1024)),
590 )
591 .route(
592 "/api/v1/playground/validate",
593 post(handle_validate).layer(tower_http::limit::RequestBodyLimitLayer::new(256 * 1024)),
594 )
595 .route("/api/v1/playground/examples", get(handle_list_examples))
596 .route("/api/v1/playground/examples/{id}", get(handle_get_example))
597 .with_state(playground)
598}
599
600async fn handle_create_session(State(playground): State<SharedPlayground>) -> impl IntoResponse {
605 let session_id = Uuid::new_v4().to_string();
606 {
607 let mut pg = playground.write().await;
608 pg.get_or_create_session(&session_id);
609 }
610 let resp = SessionResponse { session_id };
611 (StatusCode::CREATED, Json(resp))
612}
613
614async fn handle_run(
615 State(playground): State<SharedPlayground>,
616 Json(body): Json<PlaygroundRunRequest>,
617) -> Response {
618 if body.vpl.len() > MAX_VPL_LENGTH {
620 return pg_error_response(
621 StatusCode::BAD_REQUEST,
622 "vpl_too_large",
623 &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
624 );
625 }
626
627 {
629 let mut pg = playground.write().await;
630 let session_id = Uuid::new_v4().to_string();
631 pg.get_or_create_session(&session_id);
632 }
633
634 let start = Instant::now();
635
636 let timed_events = match varpulis_runtime::event_file::EventFileParser::parse(&body.events) {
638 Ok(te) => te,
639 Err(e) => {
640 let resp = PlaygroundRunResponse {
641 ok: false,
642 events_processed: 0,
643 output_events: vec![],
644 latency_ms: 0,
645 diagnostics: vec![],
646 error: Some(format!("Event parse error: {e}")),
647 };
648 return (StatusCode::OK, Json(resp)).into_response();
649 }
650 };
651
652 if timed_events.len() > MAX_EVENTS_PER_RUN {
653 return pg_error_response(
654 StatusCode::BAD_REQUEST,
655 "too_many_events",
656 &format!("Maximum {MAX_EVENTS_PER_RUN} events per run"),
657 );
658 }
659
660 let events: Vec<Event> = timed_events.into_iter().map(|te| te.event).collect();
661 let event_count = events.len();
662
663 let run_result = tokio::time::timeout(
665 Duration::from_secs(MAX_EXECUTION_SECS),
666 crate::simulate_from_source(&body.vpl, events),
667 )
668 .await;
669
670 let latency_ms = start.elapsed().as_millis() as u64;
671
672 match run_result {
673 Ok(Ok(output_events)) => {
674 let output: Vec<serde_json::Value> = output_events
675 .iter()
676 .map(|e| {
677 let mut flat = serde_json::Map::new();
678 flat.insert(
679 "event_type".to_string(),
680 serde_json::Value::String(e.event_type.to_string()),
681 );
682 for (k, v) in &e.data {
683 flat.insert(k.to_string(), crate::websocket::value_to_json(v));
684 }
685 serde_json::Value::Object(flat)
686 })
687 .collect();
688
689 let resp = PlaygroundRunResponse {
690 ok: true,
691 events_processed: event_count,
692 output_events: output,
693 latency_ms,
694 diagnostics: vec![],
695 error: None,
696 };
697 (StatusCode::OK, Json(resp)).into_response()
698 }
699 Ok(Err(e)) => {
700 let resp = PlaygroundRunResponse {
701 ok: false,
702 events_processed: 0,
703 output_events: vec![],
704 latency_ms,
705 diagnostics: vec![],
706 error: Some(e.to_string()),
707 };
708 (StatusCode::OK, Json(resp)).into_response()
709 }
710 Err(_timeout) => {
711 let resp = PlaygroundRunResponse {
712 ok: false,
713 events_processed: 0,
714 output_events: vec![],
715 latency_ms,
716 diagnostics: vec![],
717 error: Some(format!("Execution timed out after {MAX_EXECUTION_SECS}s")),
718 };
719 (StatusCode::REQUEST_TIMEOUT, Json(resp)).into_response()
720 }
721 }
722}
723
724async fn handle_validate(
725 State(_playground): State<SharedPlayground>,
726 Json(body): Json<PlaygroundValidateRequest>,
727) -> Response {
728 if body.vpl.len() > MAX_VPL_LENGTH {
729 return pg_error_response(
730 StatusCode::BAD_REQUEST,
731 "vpl_too_large",
732 &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
733 );
734 }
735
736 let result = match varpulis_parser::parse(&body.vpl) {
737 Ok(program) => {
738 let ast = serde_json::to_value(&program).ok();
739 let validation = varpulis_core::validate::validate(&body.vpl, &program);
740 let diagnostics: Vec<PlaygroundDiagnostic> = validation
743 .diagnostics
744 .iter()
745 .filter(|d| d.code != Some("E033"))
746 .map(|d| {
747 let (sl, sc) = position_to_line_col(&body.vpl, d.span.start);
748 let (el, ec) = position_to_line_col(&body.vpl, d.span.end);
749 PlaygroundDiagnostic {
750 severity: match d.severity {
751 varpulis_core::validate::Severity::Error => "error".into(),
752 varpulis_core::validate::Severity::Warning => "warning".into(),
753 },
754 message: d.message.clone(),
755 hint: d.hint.clone(),
756 code: d.code.map(|c| c.to_string()),
757 start_line: sl as u32,
758 start_col: sc as u32,
759 end_line: el as u32,
760 end_col: ec as u32,
761 }
762 })
763 .collect();
764 let has_errors = diagnostics.iter().any(|d| d.severity == "error");
765 PlaygroundValidateResponse {
766 ok: !has_errors,
767 ast,
768 diagnostics,
769 }
770 }
771 Err(error) => {
772 let diag = parse_error_to_diagnostic(&body.vpl, &error);
773 PlaygroundValidateResponse {
774 ok: false,
775 ast: None,
776 diagnostics: vec![diag],
777 }
778 }
779 };
780
781 (StatusCode::OK, Json(result)).into_response()
782}
783
784async fn handle_list_examples() -> impl IntoResponse {
785 let examples: Vec<PlaygroundExample> = builtin_examples()
786 .into_iter()
787 .map(|e| PlaygroundExample {
788 id: e.id,
789 name: e.name,
790 description: e.description,
791 category: e.category,
792 })
793 .collect();
794 (StatusCode::OK, Json(examples))
795}
796
797async fn handle_get_example(Path(id): Path<String>) -> Response {
798 let examples = builtin_examples();
799 match examples.into_iter().find(|e| e.id == id) {
800 Some(example) => (StatusCode::OK, Json(example)).into_response(),
801 None => pg_error_response(
802 StatusCode::NOT_FOUND,
803 "example_not_found",
804 &format!("Example '{id}' not found"),
805 ),
806 }
807}
808
809pub fn spawn_session_reaper(playground: SharedPlayground) {
815 tokio::spawn(async move {
816 loop {
817 tokio::time::sleep(REAPER_INTERVAL).await;
818 let mut pg = playground.write().await;
819 let reaped = pg.reap_expired();
820 if reaped > 0 {
821 tracing::debug!("Playground: reaped {} expired sessions", reaped);
822 }
823 }
824 });
825}
826
827fn pg_error_response(status: StatusCode, code: &str, message: &str) -> Response {
832 let body = PlaygroundError {
833 error: message.to_string(),
834 code: code.to_string(),
835 };
836 (status, Json(body)).into_response()
837}
838
839fn parse_error_to_diagnostic(
840 source: &str,
841 error: &varpulis_parser::ParseError,
842) -> PlaygroundDiagnostic {
843 use varpulis_parser::ParseError;
844 match error {
845 ParseError::Located {
846 line,
847 column,
848 message,
849 hint,
850 ..
851 } => PlaygroundDiagnostic {
852 severity: "error".into(),
853 message: message.clone(),
854 hint: hint.clone(),
855 code: None,
856 start_line: line.saturating_sub(1) as u32,
857 start_col: column.saturating_sub(1) as u32,
858 end_line: line.saturating_sub(1) as u32,
859 end_col: *column as u32,
860 },
861 ParseError::UnexpectedToken {
862 position,
863 expected,
864 found,
865 } => {
866 let (line, col) = position_to_line_col(source, *position);
867 PlaygroundDiagnostic {
868 severity: "error".into(),
869 message: format!("Unexpected token: expected {expected}, found '{found}'"),
870 hint: None,
871 code: None,
872 start_line: line as u32,
873 start_col: col as u32,
874 end_line: line as u32,
875 end_col: (col + found.len()) as u32,
876 }
877 }
878 ParseError::UnexpectedEof => {
879 let line = source.lines().count().saturating_sub(1);
880 let col = source.lines().last().map_or(0, |l| l.len());
881 PlaygroundDiagnostic {
882 severity: "error".into(),
883 message: "Unexpected end of input".into(),
884 hint: None,
885 code: None,
886 start_line: line as u32,
887 start_col: col as u32,
888 end_line: line as u32,
889 end_col: col as u32,
890 }
891 }
892 ParseError::InvalidToken { position, message } => {
893 let (line, col) = position_to_line_col(source, *position);
894 PlaygroundDiagnostic {
895 severity: "error".into(),
896 message: message.clone(),
897 hint: None,
898 code: None,
899 start_line: line as u32,
900 start_col: col as u32,
901 end_line: line as u32,
902 end_col: (col + 10) as u32,
903 }
904 }
905 ParseError::InvalidNumber(msg)
906 | ParseError::InvalidDuration(msg)
907 | ParseError::InvalidTimestamp(msg)
908 | ParseError::InvalidEscape(msg) => PlaygroundDiagnostic {
909 severity: "error".into(),
910 message: msg.clone(),
911 hint: None,
912 code: None,
913 start_line: 0,
914 start_col: 0,
915 end_line: 0,
916 end_col: 0,
917 },
918 ParseError::UnterminatedString(position) => {
919 let (line, col) = position_to_line_col(source, *position);
920 PlaygroundDiagnostic {
921 severity: "error".into(),
922 message: "Unterminated string literal".into(),
923 hint: None,
924 code: None,
925 start_line: line as u32,
926 start_col: col as u32,
927 end_line: line as u32,
928 end_col: source.lines().nth(line).map_or(col, |l| l.len()) as u32,
929 }
930 }
931 ParseError::Custom { span, message } => {
932 let (sl, sc) = position_to_line_col(source, span.start);
933 let (el, ec) = position_to_line_col(source, span.end);
934 PlaygroundDiagnostic {
935 severity: "error".into(),
936 message: message.clone(),
937 hint: None,
938 code: None,
939 start_line: sl as u32,
940 start_col: sc as u32,
941 end_line: el as u32,
942 end_col: ec as u32,
943 }
944 }
945 }
946}
947
948fn position_to_line_col(source: &str, position: usize) -> (usize, usize) {
949 let mut line = 0;
950 let mut col = 0;
951 let mut pos = 0;
952
953 for ch in source.chars() {
954 if pos >= position {
955 break;
956 }
957 if ch == '\n' {
958 line += 1;
959 col = 0;
960 } else {
961 col += 1;
962 }
963 pos += ch.len_utf8();
964 }
965
966 (line, col)
967}
968
969#[cfg(test)]
970mod tests {
971 use super::*;
972
973 #[test]
974 fn test_builtin_examples_valid() {
975 let examples = builtin_examples();
976 assert!(!examples.is_empty());
977 for example in &examples {
978 assert!(!example.id.is_empty());
979 assert!(!example.name.is_empty());
980 assert!(!example.vpl.is_empty());
981 assert!(!example.events.is_empty());
982 }
983 }
984
985 #[test]
986 fn test_builtin_examples_unique_ids() {
987 let examples = builtin_examples();
988 let mut ids: Vec<&str> = examples.iter().map(|e| e.id.as_str()).collect();
989 ids.sort_unstable();
990 ids.dedup();
991 assert_eq!(ids.len(), examples.len(), "Duplicate example IDs found");
992 }
993
994 #[test]
995 fn test_session_reaping() {
996 let mut state = PlaygroundState::new();
997 state.get_or_create_session("test-1");
998 state.get_or_create_session("test-2");
999 assert_eq!(state.sessions.len(), 2);
1000
1001 let reaped = state.reap_expired();
1003 assert_eq!(reaped, 0);
1004 assert_eq!(state.sessions.len(), 2);
1005 }
1006
1007 #[test]
1008 fn test_validate_response_for_valid_vpl() {
1009 let vpl = r#"
1010event SensorReading:
1011 temperature: int
1012
1013stream HighTemp = SensorReading
1014 .where(temperature > 30)
1015 .emit(alert: "high_temp")
1016"#;
1017 match varpulis_parser::parse(vpl) {
1018 Ok(program) => {
1019 let validation = varpulis_core::validate::validate(vpl, &program);
1020 let has_errors = validation
1021 .diagnostics
1022 .iter()
1023 .any(|d| d.severity == varpulis_core::validate::Severity::Error);
1024 assert!(!has_errors);
1025 }
1026 Err(e) => panic!("Parse failed: {e}"),
1027 }
1028 }
1029
1030 #[test]
1031 fn test_evt_format_parsing() {
1032 let evt_text = r#"sensor_reading { sensor_id: "S001", temperature: 65.3 }"#;
1033 let timed = varpulis_runtime::event_file::EventFileParser::parse(evt_text).unwrap();
1034 assert_eq!(timed.len(), 1);
1035 assert_eq!(timed[0].event.event_type.as_ref(), "sensor_reading");
1036 }
1037
1038 #[tokio::test]
1039 async fn test_iot_anomaly_example_produces_matches() {
1040 let examples = builtin_examples();
1041 let iot = examples.iter().find(|e| e.id == "iot-anomaly").unwrap();
1042
1043 let timed = varpulis_runtime::event_file::EventFileParser::parse(&iot.events).unwrap();
1044 let events: Vec<varpulis_runtime::event::Event> =
1045 timed.into_iter().map(|te| te.event).collect();
1046
1047 let results = crate::simulate_from_source(&iot.vpl, events).await.unwrap();
1048 assert_eq!(
1049 results.len(),
1050 iot.expected_output_count.unwrap(),
1051 "IoT anomaly example should produce {} matches, got {}",
1052 iot.expected_output_count.unwrap(),
1053 results.len()
1054 );
1055 }
1056
1057 #[tokio::test]
1058 async fn test_all_examples_with_expected_count() {
1059 for example in builtin_examples() {
1060 let Some(expected) = example.expected_output_count else {
1061 continue;
1062 };
1063
1064 let timed = varpulis_runtime::event_file::EventFileParser::parse(&example.events)
1065 .unwrap_or_else(|e| panic!("Event parse failed for '{}': {e}", example.id));
1066 let events: Vec<varpulis_runtime::event::Event> =
1067 timed.into_iter().map(|te| te.event).collect();
1068
1069 let results = crate::simulate_from_source(&example.vpl, events)
1070 .await
1071 .unwrap_or_else(|e| panic!("Run failed for '{}': {e}", example.id));
1072 assert_eq!(
1073 results.len(),
1074 expected,
1075 "Example '{}' expected {} matches, got {}",
1076 example.id,
1077 expected,
1078 results.len()
1079 );
1080 }
1081 }
1082}