// varpulis_cli/playground.rs

1//! Playground API — zero-friction VPL execution for the interactive playground.
2//!
3//! Provides ephemeral sessions with no authentication required.
4//! Sessions auto-expire after inactivity and have conservative resource quotas.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use axum::extract::{Json, Path, State};
11use axum::http::StatusCode;
12use axum::response::{IntoResponse, Response};
13use axum::routing::{get, post};
14use axum::Router;
15use serde::{Deserialize, Serialize};
16use tokio::sync::RwLock;
17use uuid::Uuid;
18use varpulis_runtime::event::Event;
19
20// =============================================================================
21// Constants
22// =============================================================================
23
/// Maximum number of events accepted in a single playground run request.
const MAX_EVENTS_PER_RUN: usize = 10_000;

/// Hard wall-clock timeout for a single run request, in seconds.
const MAX_EXECUTION_SECS: u64 = 10;

/// Sessions are reaped once idle for this long.
const SESSION_EXPIRY: Duration = Duration::from_secs(3600); // 1 hour

/// Reaper interval — how often to clean up expired sessions.
const REAPER_INTERVAL: Duration = Duration::from_secs(300); // 5 minutes

/// Maximum VPL source length, in bytes (checked against `String::len`).
const MAX_VPL_LENGTH: usize = 50_000;
38
39// =============================================================================
40// Types
41// =============================================================================
42
/// Shared, concurrently-accessible playground state (async `RwLock` inside an `Arc`).
pub type SharedPlayground = Arc<RwLock<PlaygroundState>>;
45
/// State for the playground backend.
///
/// Holds all live ephemeral sessions, keyed by session id (UUID string).
#[derive(Debug)]
pub struct PlaygroundState {
    // session_id -> per-session bookkeeping; pruned by `reap_expired`.
    sessions: HashMap<String, PlaygroundSession>,
}
51
/// Per-session bookkeeping; currently only tracks activity for expiry.
#[derive(Debug)]
struct PlaygroundSession {
    // Most recent touch; compared against SESSION_EXPIRY by the reaper.
    last_active: Instant,
}
56
impl Default for PlaygroundState {
    /// Delegates to [`PlaygroundState::new`].
    fn default() -> Self {
        Self::new()
    }
}
62
63impl PlaygroundState {
64    pub fn new() -> Self {
65        Self {
66            sessions: HashMap::new(),
67        }
68    }
69
70    fn get_or_create_session(&mut self, session_id: &str) -> &mut PlaygroundSession {
71        self.sessions
72            .entry(session_id.to_string())
73            .or_insert_with(|| PlaygroundSession {
74                last_active: Instant::now(),
75            })
76    }
77
78    fn reap_expired(&mut self) -> usize {
79        let before = self.sessions.len();
80        self.sessions
81            .retain(|_, s| s.last_active.elapsed() < SESSION_EXPIRY);
82        before - self.sessions.len()
83    }
84}
85
86// =============================================================================
87// Request/Response types
88// =============================================================================
89
/// Response body for `POST /api/v1/playground/session`.
#[derive(Debug, Serialize)]
pub struct SessionResponse {
    /// Freshly minted session id (UUID v4, string form).
    pub session_id: String,
}
94
/// Request body for `POST /api/v1/playground/run`.
#[derive(Debug, Deserialize)]
pub struct PlaygroundRunRequest {
    /// VPL program source; rejected if longer than `MAX_VPL_LENGTH` bytes.
    pub vpl: String,
    /// Events in .evt text format (e.g., `sensor_reading { temperature: 65.3, zone: "A" }`)
    /// Defaults to the empty string when omitted from the JSON body.
    #[serde(default)]
    pub events: String,
}
102
/// Request body for `POST /api/v1/playground/validate`.
#[derive(Debug, Deserialize)]
pub struct PlaygroundValidateRequest {
    /// VPL program source to validate (not executed).
    pub vpl: String,
}
107
/// Response body for `POST /api/v1/playground/run`.
#[derive(Debug, Serialize)]
pub struct PlaygroundRunResponse {
    /// `true` when the program compiled and the run completed.
    pub ok: bool,
    /// Number of input events fed to the runtime (0 on failure paths).
    pub events_processed: usize,
    /// Emitted events flattened to JSON objects (`event_type` plus data fields).
    pub output_events: Vec<serde_json::Value>,
    /// Wall-clock duration of the run as measured by the handler.
    pub latency_ms: u64,
    /// Diagnostics; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub diagnostics: Vec<PlaygroundDiagnostic>,
    /// Human-readable failure description; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
119
/// Response body for `POST /api/v1/playground/validate`.
#[derive(Debug, Serialize)]
pub struct PlaygroundValidateResponse {
    /// `true` when the source was accepted by the validator.
    pub ok: bool,
    /// Parsed AST rendered as JSON, when available; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ast: Option<serde_json::Value>,
    /// Validation diagnostics (always serialized, even when empty).
    pub diagnostics: Vec<PlaygroundDiagnostic>,
}
127
/// A single compiler/validator diagnostic with its source span.
#[derive(Debug, Serialize)]
pub struct PlaygroundDiagnostic {
    /// Severity label; exact values are produced elsewhere in the crate.
    pub severity: String,
    /// Human-readable diagnostic message.
    pub message: String,
    /// Optional fix-it hint; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hint: Option<String>,
    /// Optional machine-readable diagnostic code; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    // Source span. NOTE(review): whether line/col are 0- or 1-based is not
    // visible from this file — confirm against the diagnostic producer.
    pub start_line: u32,
    pub start_col: u32,
    pub end_line: u32,
    pub end_col: u32,
}
141
/// Summary of a built-in example (list view — no source or events payload).
#[derive(Debug, Serialize)]
pub struct PlaygroundExample {
    pub id: String,
    pub name: String,
    pub description: String,
    pub category: String,
}
149
/// Full payload of a built-in example, including runnable source and events.
#[derive(Debug, Serialize)]
pub struct PlaygroundExampleDetail {
    /// Stable identifier used by the `/examples/{id}` route.
    pub id: String,
    pub name: String,
    pub description: String,
    pub category: String,
    /// VPL program source for this example.
    pub vpl: String,
    /// Events in .evt text format
    pub events: String,
    /// Expected number of output events, where the example pins one.
    pub expected_output_count: Option<usize>,
}
161
/// JSON error payload for request-level rejections (see `pg_error_response`
/// call sites, e.g. code "vpl_too_large").
#[derive(Debug, Serialize)]
struct PlaygroundError {
    // Human-readable description.
    error: String,
    // Machine-readable error code.
    code: String,
}
167
168// =============================================================================
169// Built-in examples
170// =============================================================================
171
/// The built-in example catalogue served by the playground example endpoints.
///
/// Rebuilt on every call. Each entry pairs a VPL program with a matching
/// `.evt` event script; `expected_output_count` is filled in only where the
/// author pinned the exact output count (consumers of that field are not
/// visible from this file — presumably used for self-checks in the UI).
fn builtin_examples() -> Vec<PlaygroundExampleDetail> {
    vec![
        PlaygroundExampleDetail {
            id: "hvac-alert".into(),
            name: "HVAC Alert".into(),
            description: "Simple temperature threshold alert — the 'Hello World' of CEP.".into(),
            category: "Getting Started".into(),
            vpl: r#"stream HighTemp = TempReading
    .where(temperature > 30)
    .emit(alert: "High temperature detected", sensor: sensor_id, temp: temperature)"#.into(),
            events: r#"@0s TempReading { sensor_id: "HVAC-01", temperature: 22 }
@1s TempReading { sensor_id: "HVAC-02", temperature: 35 }
@2s TempReading { sensor_id: "HVAC-03", temperature: 28 }
@3s TempReading { sensor_id: "HVAC-01", temperature: 41 }
@4s TempReading { sensor_id: "HVAC-04", temperature: 19 }
@5s TempReading { sensor_id: "HVAC-02", temperature: 33 }"#.into(),
            expected_output_count: Some(3),
        },
        PlaygroundExampleDetail {
            id: "fraud-detection".into(),
            name: "Fraud Detection".into(),
            description: "Detect login followed by large transfer within 5 minutes — sequence pattern matching.".into(),
            category: "Finance".into(),
            vpl: r#"stream FraudAlert = login as l -> transfer as t .within(5m)
    .where(l.user_id == t.user_id and t.amount > 5000)
    .emit(alert: "Suspicious transfer after login", user: l.user_id, amount: t.amount, city: l.city)"#.into(),
            events: r#"# Alice logs in, then makes a large transfer — triggers alert
@0s  login { user_id: "alice", city: "New York", device: "mobile" }
@10s transfer { user_id: "bob", amount: 200, to_account: "ext_001" }
@30s transfer { user_id: "alice", amount: 15000, to_account: "ext_099" }
# Charlie scenario
@60s login { user_id: "charlie", city: "London", device: "desktop" }
@90s transfer { user_id: "charlie", amount: 8500, to_account: "ext_042" }
@120s transfer { user_id: "alice", amount: 100, to_account: "ext_005" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "iot-anomaly".into(),
            name: "IoT Sensor Anomaly".into(),
            description: "Detect temperature spikes in sensor data using window aggregation.".into(),
            category: "IoT".into(),
            vpl: r#"stream TempSpike = sensor_reading
    .where(temperature > 50)
    .emit(alert: "Temperature spike", sensor: sensor_id, temp: temperature, zone: zone)"#.into(),
            events: r#"@0s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 22.5 }
@1s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 65.3 }
@2s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 23.1 }
@3s sensor_reading { sensor_id: "S003", zone: "zone_c", temperature: 55.0 }
@4s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 24.0 }"#.into(),
            expected_output_count: Some(2),
        },
        PlaygroundExampleDetail {
            id: "trading-signal".into(),
            name: "Trading Signal".into(),
            description: "Detect large trades on a single symbol — volume spike alert.".into(),
            category: "Finance".into(),
            vpl: r#"stream VolumeSpike = trade
    .where(volume > 10000)
    .emit(alert: "Large trade detected", symbol: symbol, vol: volume, price: price, side: side)"#.into(),
            events: r#"@0s trade { symbol: "AAPL", price: 185.50, volume: 500, side: "buy" }
@1s trade { symbol: "GOOGL", price: 142.30, volume: 25000, side: "sell" }
@2s trade { symbol: "AAPL", price: 185.60, volume: 15000, side: "buy" }
@3s trade { symbol: "TSLA", price: 250.10, volume: 800, side: "sell" }
@4s trade { symbol: "MSFT", price: 420.00, volume: 50000, side: "buy" }"#.into(),
            expected_output_count: Some(3),
        },
        PlaygroundExampleDetail {
            id: "cyber-killchain".into(),
            name: "Cyber Kill Chain".into(),
            description: "Detect a 3-stage attack sequence: scan → exploit → exfiltrate within 10 minutes.".into(),
            category: "Security".into(),
            vpl: r#"stream KillChain = scan as s -> exploit as e -> exfiltrate as x .within(10m)
    .where(s.target_ip == e.target_ip and e.target_ip == x.source_ip)
    .emit(alert: "Kill chain detected", target: s.target_ip, attacker: s.source_ip)"#.into(),
            events: r#"@0s  scan { source_ip: "10.0.0.5", target_ip: "192.168.1.100", port: 443 }
@30s exploit { source_ip: "10.0.0.5", target_ip: "192.168.1.100", cve: "CVE-2024-1234" }
@60s exfiltrate { source_ip: "192.168.1.100", dest_ip: "10.0.0.5", bytes: 50000000 }
@120s scan { source_ip: "10.0.0.9", target_ip: "192.168.1.200", port: 80 }
@180s login { user_id: "admin", ip: "192.168.1.200" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "kleene-pattern".into(),
            name: "Brute Force (Kleene)".into(),
            description: "Detect repeated failed logins followed by a success — uses 'all' for Kleene+ matching with match_count threshold.".into(),
            category: "Security".into(),
            vpl: r#"# Kleene pattern: match ALL failed logins, then a successful login
# partition_by(user_id) isolates each user's login sequence
# Filter: at least 3 failed attempts (match_count >= 4)
stream BruteForce = failed_login -> all failed_login as f -> successful_login as s .within(5m)
    .partition_by(user_id)
    .where(match_count >= 4)
    .emit(alert: "Brute force detected", user: s.user_id, failed_attempts: match_count - 1)"#.into(),
            events: r#"# Admin: 3 failed logins then success — triggers alert (3 failures >= 3)
@0s failed_login { user_id: "admin", ip: "10.0.0.5" }
@1s failed_login { user_id: "admin", ip: "10.0.0.5" }
@2s failed_login { user_id: "admin", ip: "10.0.0.5" }
@3s successful_login { user_id: "admin", ip: "10.0.0.5" }
# Eve: 2 failed logins then success — no alert (2 failures < 3)
@10s failed_login { user_id: "eve", ip: "10.0.0.7" }
@11s failed_login { user_id: "eve", ip: "10.0.0.7" }
@12s successful_login { user_id: "eve", ip: "10.0.0.7" }
# Bob: just logs in — no alert (no failed logins)
@20s successful_login { user_id: "bob", ip: "10.0.0.9" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "merge-stream".into(),
            name: "Merge Streams".into(),
            description: "Combine events from multiple sources into a single alert stream.".into(),
            category: "Getting Started".into(),
            vpl: r#"stream TempAlerts = TempReading
    .where(temperature > 30)
    .emit(alert: "High temp", source: "temp", value: temperature)

stream HumidAlerts = HumidityReading
    .where(humidity > 80)
    .emit(alert: "High humidity", source: "humidity", value: humidity)

stream AllAlerts = merge(TempAlerts, HumidAlerts)
    .emit(alert: alert, source: source, value: value)"#.into(),
            events: r#"@0s TempReading { sensor_id: "S1", temperature: 35 }
@1s HumidityReading { sensor_id: "S2", humidity: 85 }
@2s TempReading { sensor_id: "S3", temperature: 22 }
@3s HumidityReading { sensor_id: "S4", humidity: 45 }
@4s TempReading { sensor_id: "S5", temperature: 38 }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "ddos-detection".into(),
            name: "DDoS Detection (Kleene)".into(),
            description: "Detect a flood of requests to the same target — Kleene+ with match_count threshold.".into(),
            category: "Security".into(),
            vpl: r#"# Detect 5+ requests to the same host within 10 seconds
# partition_by(host) isolates per-target detection
stream DDoS = http_request -> all http_request as flood .within(10s)
    .partition_by(host)
    .where(match_count >= 5)
    .emit(alert: "DDoS detected", target: flood.host, request_count: match_count)"#.into(),
            events: r#"# Burst of 6 requests to api.example.com — triggers alert
@0s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.1" }
@1s http_request { host: "api.example.com", method: "POST", ip: "10.0.0.2" }
@2s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.3" }
@3s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.1" }
@4s http_request { host: "api.example.com", method: "DELETE", ip: "10.0.0.4" }
@5s http_request { host: "api.example.com", method: "GET", ip: "10.0.0.2" }
# Normal traffic — 2 requests, won't trigger
@20s http_request { host: "cdn.example.com", method: "GET", ip: "10.0.0.5" }
@21s http_request { host: "cdn.example.com", method: "GET", ip: "10.0.0.6" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "error-storm".into(),
            name: "Error Storm (Kleene)".into(),
            description: "Detect cascading errors in microservices — repeated errors followed by a timeout.".into(),
            category: "IoT".into(),
            vpl: r#"# Cascading failure: 3+ errors then a timeout event
# partition_by(service) isolates per-service detection
stream ErrorStorm = service_error -> all service_error as errors -> timeout as t .within(30s)
    .partition_by(service)
    .where(match_count >= 4)
    .emit(alert: "Error storm", service: t.service, error_count: match_count - 1)"#.into(),
            events: r#"# 4 errors then timeout — triggers (4 errors >= 3)
@0s service_error { service: "payments", code: 500, msg: "DB connection lost" }
@2s service_error { service: "payments", code: 503, msg: "Circuit breaker open" }
@4s service_error { service: "payments", code: 500, msg: "DB connection lost" }
@6s service_error { service: "payments", code: 502, msg: "Bad gateway" }
@8s timeout { service: "payments", duration_ms: 30000 }
# 1 error then timeout — no alert (1 error < 3)
@20s service_error { service: "auth", code: 401, msg: "Token expired" }
@22s timeout { service: "auth", duration_ms: 5000 }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "stock-trend".into(),
            name: "Stock Trend Count".into(),
            description: "Count rising stock price trends using Hamlet trend aggregation — efficient multi-pattern counting.".into(),
            category: "Finance".into(),
            vpl: r"# Count how many rising trends appear in a stock's price series
stream RisingTrends = StockTick as first
    -> all StockTick as rising
    .within(60s)
    .trend_aggregate(count: count_trends())
    .emit(trends: count)".into(),
            events: r#"@0s StockTick { symbol: "AAPL", price: 150.0 }
@1s StockTick { symbol: "AAPL", price: 152.0 }
@2s StockTick { symbol: "AAPL", price: 155.0 }
@3s StockTick { symbol: "AAPL", price: 153.0 }
@4s StockTick { symbol: "AAPL", price: 158.0 }
@5s StockTick { symbol: "AAPL", price: 160.0 }
@6s StockTick { symbol: "AAPL", price: 162.0 }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "sensor-stats".into(),
            name: "Sensor Statistics".into(),
            description: "Compute sum, average, and count over sensor readings using trend aggregation.".into(),
            category: "IoT".into(),
            vpl: r"# Multiple aggregation functions on a Kleene sequence
stream SensorStats = sensor_reading as first
    -> all sensor_reading as readings
    .within(60s)
    .trend_aggregate(
        total: sum_trends(readings.temperature),
        avg_temp: avg_trends(readings.temperature),
        reading_count: count_events(readings)
    )
    .emit(sum: total, average: avg_temp, count: reading_count)".into(),
            events: r#"@0s sensor_reading { sensor_id: "S1", temperature: 20.5, zone: "A" }
@1s sensor_reading { sensor_id: "S1", temperature: 22.0, zone: "A" }
@2s sensor_reading { sensor_id: "S1", temperature: 24.5, zone: "A" }
@3s sensor_reading { sensor_id: "S1", temperature: 21.0, zone: "A" }
@4s sensor_reading { sensor_id: "S1", temperature: 26.0, zone: "A" }
@5s sensor_reading { sensor_id: "S1", temperature: 28.5, zone: "A" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "transaction-analysis".into(),
            name: "Transaction Analysis".into(),
            description: "Multi-type Kleene: track processing steps between transaction open and close, with aggregated stats.".into(),
            category: "Advanced".into(),
            vpl: r"# Open -> multiple steps -> Close, with aggregated statistics
stream TxStats = TxOpen as open
    -> all TxStep as steps
    -> TxClose as close
    .within(30m)
    .trend_aggregate(
        avg_dur: avg_trends(steps.duration),
        max_errors: max_trends(steps.error_count),
        step_count: count_events(steps),
        trend_count: count_trends()
    )
    .emit(tx: open.tx_id, steps: step_count, avg_duration: avg_dur, max_err: max_errors)".into(),
            events: r#"# Transaction TX-001: open, 4 processing steps, close
@0s TxOpen { tx_id: "TX-001", customer: "acme_corp" }
@1s TxStep { tx_id: "TX-001", step: "validate", duration: 120.0, error_count: 0.0 }
@2s TxStep { tx_id: "TX-001", step: "enrich", duration: 350.0, error_count: 1.0 }
@3s TxStep { tx_id: "TX-001", step: "transform", duration: 90.0, error_count: 0.0 }
@4s TxStep { tx_id: "TX-001", step: "persist", duration: 200.0, error_count: 2.0 }
@5s TxClose { tx_id: "TX-001", status: "completed" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "forecast-fraud".into(),
            name: "Fraud Forecasting".into(),
            description: "Predict fraud patterns using PST-based forecasting — sequence prediction with confidence scores.".into(),
            category: "Advanced".into(),
            vpl: r"stream FraudForecast = login as l -> transfer as t .within(5m)
    .forecast(confidence: 0.7, horizon: 2m, warmup: 50, max_depth: 3)
    .where(forecast_probability > 0.5)
    .emit(probability: forecast_probability, state: forecast_state)".into(),
            // The only example whose events are generated rather than written
            // out literally: the forecaster needs a warm-up corpus.
            events: {
                // Generate a training sequence: alternating login/transfer pairs
                let mut lines = Vec::new();
                for i in 0..60 {
                    let t = i * 2;
                    lines.push(format!(
                        "@{}s login {{ user_id: \"user_{}\", city: \"NYC\" }}",
                        t,
                        i % 10
                    ));
                    lines.push(format!(
                        "@{}s transfer {{ user_id: \"user_{}\", amount: {} }}",
                        t + 1,
                        i % 10,
                        100 + i * 10
                    ));
                }
                // Add a final login to trigger forecast
                lines.push("@120s login { user_id: \"user_0\", city: \"NYC\" }".into());
                lines.join("\n")
            },
            expected_output_count: None, // Depends on PST learning convergence
        },
        PlaygroundExampleDetail {
            id: "kleene-aggregates".into(),
            name: "Kleene Aggregates".into(),
            description: "Inline aggregates on Kleene matches — sum, avg, min, max, count without trend_aggregate.".into(),
            category: "Advanced".into(),
            vpl: r"# Detect large cumulative transfers after login
# partition_by(user_id) isolates per-user sequences
stream LargeTransfers = login as l -> all transfer as t .within(10m)
    .partition_by(user_id)
    .where(sum(t.amount) > 500)
    .emit(
        user: l.user_id,
        total: sum(t.amount),
        avg_amount: avg(t.amount),
        largest: max(t.amount),
        smallest: min(t.amount),
        num_transfers: count(t)
    )".into(),
            events: r#"# Alice: login then 3 transfers totaling 1500 — triggers (1500 > 500)
@0s login { user_id: "alice", city: "NYC" }
@10s transfer { user_id: "alice", amount: 200.0, to: "ext_001" }
@20s transfer { user_id: "alice", amount: 800.0, to: "ext_002" }
@30s transfer { user_id: "alice", amount: 500.0, to: "ext_003" }
# Bob: login then 1 small transfer — no alert (100 < 500)
@60s login { user_id: "bob", city: "London" }
@70s transfer { user_id: "bob", amount: 100.0, to: "ext_010" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "first-last".into(),
            name: "First/Last Access".into(),
            description: "Access the first and last events in a Kleene match — detect location changes.".into(),
            category: "Security".into(),
            vpl: r"# Detect when a user roams across cities
# partition_by(user_id) isolates per-user login sequences
stream LocationChange = login as l -> all login as f .within(1h)
    .partition_by(user_id)
    .where(count(f) >= 2 and l.city != last(f).city)
    .emit(
        user: l.user_id,
        origin: l.city,
        first_roam: first(f).city,
        latest: last(f).city,
        roam_count: count(f)
    )".into(),
            events: r#"# Alice: NYC then roams to London, Tokyo, Berlin — triggers
@0s login { user_id: "alice", city: "NYC", device: "mobile" }
@60s login { user_id: "alice", city: "London", device: "laptop" }
@120s login { user_id: "alice", city: "Tokyo", device: "desktop" }
@180s login { user_id: "alice", city: "Berlin", device: "tablet" }
# Bob: stays in Berlin — no location change
@200s login { user_id: "bob", city: "Berlin", device: "mobile" }
@260s login { user_id: "bob", city: "Berlin", device: "tablet" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "distinct-count".into(),
            name: "Distinct Count".into(),
            description: "Count unique values across Kleene matches — detect distributed attacks from many IPs.".into(),
            category: "Security".into(),
            vpl: r"# Detect attacks from 3+ distinct source IPs
# partition_by(target_ip) isolates per-target detection
stream DistributedAttack = scan -> all scan as s .within(1m)
    .partition_by(target_ip)
    .where(distinct_count(s.source_ip) >= 3)
    .emit(
        target: s.target_ip,
        unique_sources: distinct_count(s.source_ip),
        total_scans: count(s)
    )".into(),
            events: r#"# Server .100: 5 scans from 4 distinct IPs — triggers (4 >= 3)
@0s scan { source_ip: "10.0.0.1", target_ip: "192.168.1.100", port: 22 }
@1s scan { source_ip: "10.0.0.2", target_ip: "192.168.1.100", port: 80 }
@2s scan { source_ip: "10.0.0.3", target_ip: "192.168.1.100", port: 443 }
@3s scan { source_ip: "10.0.0.1", target_ip: "192.168.1.100", port: 8080 }
@4s scan { source_ip: "10.0.0.4", target_ip: "192.168.1.100", port: 3306 }
# Server .200: 2 scans from 1 IP — no alert (1 < 3)
@20s scan { source_ip: "10.0.0.5", target_ip: "192.168.1.200", port: 22 }
@21s scan { source_ip: "10.0.0.5", target_ip: "192.168.1.200", port: 80 }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "absence-detection".into(),
            name: "Absence Detection".into(),
            description: "Detect when an expected event does NOT occur — order without shipment.".into(),
            category: "Getting Started".into(),
            vpl: r"# Alert if an order is placed but a cancellation arrives before shipment
# .not(cancellation) invalidates the run if cancellation appears
stream OrderCancelled = order as o -> shipment as s .within(5m)
    .not(cancellation)
    .emit(order_id: o.order_id, status: s.status)".into(),
            events: r#"# Order 1: order -> shipment (no cancellation) — matches
@0s order { order_id: "ORD-001", customer: "alice", total: 99.99 }
@30s shipment { order_id: "ORD-001", status: "shipped", carrier: "ups" }
# Order 2: order -> cancellation -> shipment — no match (cancellation kills run)
@60s order { order_id: "ORD-002", customer: "bob", total: 50.00 }
@70s cancellation { order_id: "ORD-002", reason: "changed mind" }
@90s shipment { order_id: "ORD-002", status: "shipped", carrier: "fedex" }
# Order 3: order -> shipment (no cancellation) — matches
@120s order { order_id: "ORD-003", customer: "charlie", total: 200.00 }
@150s shipment { order_id: "ORD-003", status: "shipped", carrier: "dhl" }"#.into(),
            expected_output_count: None,
        },
        PlaygroundExampleDetail {
            id: "rate-detection".into(),
            name: "Rate Detection".into(),
            description: "Detect high event rates using match_rate — events per second within a pattern match.".into(),
            category: "Security".into(),
            vpl: r"# Alert when request rate exceeds 2 events/second
# partition_by(endpoint) isolates per-endpoint rate monitoring
stream HighRate = request -> all request as r .within(30s)
    .partition_by(endpoint)
    .where(match_count >= 3 and match_rate > 2.0)
    .emit(
        endpoint: r.endpoint,
        rate: match_rate,
        total: match_count
    )".into(),
            events: r#"# /api/login: 6 requests in 2 seconds — triggers (rate > 2.0)
@0s request { endpoint: "/api/login", ip: "10.0.0.1" }
@0s request { endpoint: "/api/login", ip: "10.0.0.1" }
@1s request { endpoint: "/api/login", ip: "10.0.0.1" }
@1s request { endpoint: "/api/login", ip: "10.0.0.1" }
@2s request { endpoint: "/api/login", ip: "10.0.0.1" }
@2s request { endpoint: "/api/login", ip: "10.0.0.1" }
# /api/data: 3 requests over 10 seconds — no alert (rate < 2.0)
@20s request { endpoint: "/api/data", ip: "10.0.0.2" }
@25s request { endpoint: "/api/data", ip: "10.0.0.2" }
@30s request { endpoint: "/api/data", ip: "10.0.0.2" }"#.into(),
            expected_output_count: None,
        },
    ]
}
579
580// =============================================================================
581// Route construction
582// =============================================================================
583
584pub fn playground_routes(playground: SharedPlayground) -> Router {
585    Router::new()
586        .route("/api/v1/playground/session", post(handle_create_session))
587        .route(
588            "/api/v1/playground/run",
589            post(handle_run).layer(tower_http::limit::RequestBodyLimitLayer::new(1024 * 1024)),
590        )
591        .route(
592            "/api/v1/playground/validate",
593            post(handle_validate).layer(tower_http::limit::RequestBodyLimitLayer::new(256 * 1024)),
594        )
595        .route("/api/v1/playground/examples", get(handle_list_examples))
596        .route("/api/v1/playground/examples/{id}", get(handle_get_example))
597        .with_state(playground)
598}
599
600// =============================================================================
601// Handlers
602// =============================================================================
603
604async fn handle_create_session(State(playground): State<SharedPlayground>) -> impl IntoResponse {
605    let session_id = Uuid::new_v4().to_string();
606    {
607        let mut pg = playground.write().await;
608        pg.get_or_create_session(&session_id);
609    }
610    let resp = SessionResponse { session_id };
611    (StatusCode::CREATED, Json(resp))
612}
613
614async fn handle_run(
615    State(playground): State<SharedPlayground>,
616    Json(body): Json<PlaygroundRunRequest>,
617) -> Response {
618    // Validate input size
619    if body.vpl.len() > MAX_VPL_LENGTH {
620        return pg_error_response(
621            StatusCode::BAD_REQUEST,
622            "vpl_too_large",
623            &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
624        );
625    }
626
627    // Track session (auto-create if needed)
628    {
629        let mut pg = playground.write().await;
630        let session_id = Uuid::new_v4().to_string();
631        pg.get_or_create_session(&session_id);
632    }
633
634    let start = Instant::now();
635
636    // Parse .evt format text into runtime events
637    let timed_events = match varpulis_runtime::event_file::EventFileParser::parse(&body.events) {
638        Ok(te) => te,
639        Err(e) => {
640            let resp = PlaygroundRunResponse {
641                ok: false,
642                events_processed: 0,
643                output_events: vec![],
644                latency_ms: 0,
645                diagnostics: vec![],
646                error: Some(format!("Event parse error: {e}")),
647            };
648            return (StatusCode::OK, Json(resp)).into_response();
649        }
650    };
651
652    if timed_events.len() > MAX_EVENTS_PER_RUN {
653        return pg_error_response(
654            StatusCode::BAD_REQUEST,
655            "too_many_events",
656            &format!("Maximum {MAX_EVENTS_PER_RUN} events per run"),
657        );
658    }
659
660    let events: Vec<Event> = timed_events.into_iter().map(|te| te.event).collect();
661    let event_count = events.len();
662
663    // Execute with timeout
664    let run_result = tokio::time::timeout(
665        Duration::from_secs(MAX_EXECUTION_SECS),
666        crate::simulate_from_source(&body.vpl, events),
667    )
668    .await;
669
670    let latency_ms = start.elapsed().as_millis() as u64;
671
672    match run_result {
673        Ok(Ok(output_events)) => {
674            let output: Vec<serde_json::Value> = output_events
675                .iter()
676                .map(|e| {
677                    let mut flat = serde_json::Map::new();
678                    flat.insert(
679                        "event_type".to_string(),
680                        serde_json::Value::String(e.event_type.to_string()),
681                    );
682                    for (k, v) in &e.data {
683                        flat.insert(k.to_string(), crate::websocket::value_to_json(v));
684                    }
685                    serde_json::Value::Object(flat)
686                })
687                .collect();
688
689            let resp = PlaygroundRunResponse {
690                ok: true,
691                events_processed: event_count,
692                output_events: output,
693                latency_ms,
694                diagnostics: vec![],
695                error: None,
696            };
697            (StatusCode::OK, Json(resp)).into_response()
698        }
699        Ok(Err(e)) => {
700            let resp = PlaygroundRunResponse {
701                ok: false,
702                events_processed: 0,
703                output_events: vec![],
704                latency_ms,
705                diagnostics: vec![],
706                error: Some(e.to_string()),
707            };
708            (StatusCode::OK, Json(resp)).into_response()
709        }
710        Err(_timeout) => {
711            let resp = PlaygroundRunResponse {
712                ok: false,
713                events_processed: 0,
714                output_events: vec![],
715                latency_ms,
716                diagnostics: vec![],
717                error: Some(format!("Execution timed out after {MAX_EXECUTION_SECS}s")),
718            };
719            (StatusCode::REQUEST_TIMEOUT, Json(resp)).into_response()
720        }
721    }
722}
723
724async fn handle_validate(
725    State(_playground): State<SharedPlayground>,
726    Json(body): Json<PlaygroundValidateRequest>,
727) -> Response {
728    if body.vpl.len() > MAX_VPL_LENGTH {
729        return pg_error_response(
730            StatusCode::BAD_REQUEST,
731            "vpl_too_large",
732            &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
733        );
734    }
735
736    let result = match varpulis_parser::parse(&body.vpl) {
737        Ok(program) => {
738            let ast = serde_json::to_value(&program).ok();
739            let validation = varpulis_core::validate::validate(&body.vpl, &program);
740            // Filter out E033 (undefined event type) — playground events are
741            // provided at runtime via .evt format, not declared in VPL.
742            let diagnostics: Vec<PlaygroundDiagnostic> = validation
743                .diagnostics
744                .iter()
745                .filter(|d| d.code != Some("E033"))
746                .map(|d| {
747                    let (sl, sc) = position_to_line_col(&body.vpl, d.span.start);
748                    let (el, ec) = position_to_line_col(&body.vpl, d.span.end);
749                    PlaygroundDiagnostic {
750                        severity: match d.severity {
751                            varpulis_core::validate::Severity::Error => "error".into(),
752                            varpulis_core::validate::Severity::Warning => "warning".into(),
753                        },
754                        message: d.message.clone(),
755                        hint: d.hint.clone(),
756                        code: d.code.map(|c| c.to_string()),
757                        start_line: sl as u32,
758                        start_col: sc as u32,
759                        end_line: el as u32,
760                        end_col: ec as u32,
761                    }
762                })
763                .collect();
764            let has_errors = diagnostics.iter().any(|d| d.severity == "error");
765            PlaygroundValidateResponse {
766                ok: !has_errors,
767                ast,
768                diagnostics,
769            }
770        }
771        Err(error) => {
772            let diag = parse_error_to_diagnostic(&body.vpl, &error);
773            PlaygroundValidateResponse {
774                ok: false,
775                ast: None,
776                diagnostics: vec![diag],
777            }
778        }
779    };
780
781    (StatusCode::OK, Json(result)).into_response()
782}
783
784async fn handle_list_examples() -> impl IntoResponse {
785    let examples: Vec<PlaygroundExample> = builtin_examples()
786        .into_iter()
787        .map(|e| PlaygroundExample {
788            id: e.id,
789            name: e.name,
790            description: e.description,
791            category: e.category,
792        })
793        .collect();
794    (StatusCode::OK, Json(examples))
795}
796
797async fn handle_get_example(Path(id): Path<String>) -> Response {
798    let examples = builtin_examples();
799    match examples.into_iter().find(|e| e.id == id) {
800        Some(example) => (StatusCode::OK, Json(example)).into_response(),
801        None => pg_error_response(
802            StatusCode::NOT_FOUND,
803            "example_not_found",
804            &format!("Example '{id}' not found"),
805        ),
806    }
807}
808
809// =============================================================================
810// Session reaper task
811// =============================================================================
812
813/// Spawn a background task that periodically cleans up expired sessions.
814pub fn spawn_session_reaper(playground: SharedPlayground) {
815    tokio::spawn(async move {
816        loop {
817            tokio::time::sleep(REAPER_INTERVAL).await;
818            let mut pg = playground.write().await;
819            let reaped = pg.reap_expired();
820            if reaped > 0 {
821                tracing::debug!("Playground: reaped {} expired sessions", reaped);
822            }
823        }
824    });
825}
826
827// =============================================================================
828// Helpers
829// =============================================================================
830
831fn pg_error_response(status: StatusCode, code: &str, message: &str) -> Response {
832    let body = PlaygroundError {
833        error: message.to_string(),
834        code: code.to_string(),
835    };
836    (status, Json(body)).into_response()
837}
838
/// Convert a parser error into a single editor-facing diagnostic.
///
/// All positions are 0-indexed (line, column) to match the playground
/// editor. Span ends are best-effort: several error variants carry no
/// precise extent, so the end column is approximated (token length, line
/// end, a fixed width, or pinned to the origin when no position exists).
fn parse_error_to_diagnostic(
    source: &str,
    error: &varpulis_parser::ParseError,
) -> PlaygroundDiagnostic {
    use varpulis_parser::ParseError;
    match error {
        // Parser already resolved a 1-based line/column; shift to 0-based.
        ParseError::Located {
            line,
            column,
            message,
            hint,
            ..
        } => PlaygroundDiagnostic {
            severity: "error".into(),
            message: message.clone(),
            hint: hint.clone(),
            code: None,
            start_line: line.saturating_sub(1) as u32,
            start_col: column.saturating_sub(1) as u32,
            end_line: line.saturating_sub(1) as u32,
            // One-column span: ends at the original (1-based) column.
            end_col: *column as u32,
        },
        ParseError::UnexpectedToken {
            position,
            expected,
            found,
        } => {
            let (line, col) = position_to_line_col(source, *position);
            PlaygroundDiagnostic {
                severity: "error".into(),
                message: format!("Unexpected token: expected {expected}, found '{found}'"),
                hint: None,
                code: None,
                start_line: line as u32,
                start_col: col as u32,
                end_line: line as u32,
                // Highlights the offending token text.
                // NOTE(review): `found.len()` counts bytes while columns are
                // counted in chars — may over-extend for non-ASCII tokens;
                // confirm whether `found` is always ASCII.
                end_col: (col + found.len()) as u32,
            }
        }
        // No position available: point at the end of the last line.
        ParseError::UnexpectedEof => {
            let line = source.lines().count().saturating_sub(1);
            let col = source.lines().last().map_or(0, |l| l.len());
            PlaygroundDiagnostic {
                severity: "error".into(),
                message: "Unexpected end of input".into(),
                hint: None,
                code: None,
                start_line: line as u32,
                start_col: col as u32,
                end_line: line as u32,
                end_col: col as u32,
            }
        }
        ParseError::InvalidToken { position, message } => {
            let (line, col) = position_to_line_col(source, *position);
            PlaygroundDiagnostic {
                severity: "error".into(),
                message: message.clone(),
                hint: None,
                code: None,
                start_line: line as u32,
                start_col: col as u32,
                end_line: line as u32,
                // Fixed 10-column highlight — presumably the lexer reports
                // no token length for this variant.
                end_col: (col + 10) as u32,
            }
        }
        // These variants carry only a message, no position: pin to (0, 0).
        ParseError::InvalidNumber(msg)
        | ParseError::InvalidDuration(msg)
        | ParseError::InvalidTimestamp(msg)
        | ParseError::InvalidEscape(msg) => PlaygroundDiagnostic {
            severity: "error".into(),
            message: msg.clone(),
            hint: None,
            code: None,
            start_line: 0,
            start_col: 0,
            end_line: 0,
            end_col: 0,
        },
        // Highlight from the opening quote to the end of that line.
        ParseError::UnterminatedString(position) => {
            let (line, col) = position_to_line_col(source, *position);
            PlaygroundDiagnostic {
                severity: "error".into(),
                message: "Unterminated string literal".into(),
                hint: None,
                code: None,
                start_line: line as u32,
                start_col: col as u32,
                end_line: line as u32,
                end_col: source.lines().nth(line).map_or(col, |l| l.len()) as u32,
            }
        }
        // Error already carries a byte span; translate both endpoints.
        ParseError::Custom { span, message } => {
            let (sl, sc) = position_to_line_col(source, span.start);
            let (el, ec) = position_to_line_col(source, span.end);
            PlaygroundDiagnostic {
                severity: "error".into(),
                message: message.clone(),
                hint: None,
                code: None,
                start_line: sl as u32,
                start_col: sc as u32,
                end_line: el as u32,
                end_col: ec as u32,
            }
        }
    }
}
947
/// Map a byte offset in `source` to a 0-indexed (line, column) pair.
///
/// Columns count characters, not bytes. An offset past the end of `source`
/// (or inside a multi-byte character) resolves to the position just after
/// the last fully consumed character.
fn position_to_line_col(source: &str, position: usize) -> (usize, usize) {
    let mut consumed = 0usize;
    source
        .chars()
        // Take characters whose starting byte offset lies before `position`.
        .take_while(|ch| {
            if consumed >= position {
                return false;
            }
            consumed += ch.len_utf8();
            true
        })
        // Newlines advance the line and reset the column; everything else
        // advances the column by one character.
        .fold((0, 0), |(line, col), ch| match ch {
            '\n' => (line + 1, 0),
            _ => (line, col + 1),
        })
}
968
#[cfg(test)]
mod tests {
    use super::*;

    // Every built-in example must ship complete metadata plus non-empty
    // VPL source and event fixtures, or the playground UI would render
    // broken entries.
    #[test]
    fn test_builtin_examples_valid() {
        let examples = builtin_examples();
        assert!(!examples.is_empty());
        for example in &examples {
            assert!(!example.id.is_empty());
            assert!(!example.name.is_empty());
            assert!(!example.vpl.is_empty());
            assert!(!example.events.is_empty());
        }
    }

    // Example ids double as API path segments (GET /examples/{id}),
    // so they must be unique.
    #[test]
    fn test_builtin_examples_unique_ids() {
        let examples = builtin_examples();
        let mut ids: Vec<&str> = examples.iter().map(|e| e.id.as_str()).collect();
        ids.sort_unstable();
        ids.dedup();
        assert_eq!(ids.len(), examples.len(), "Duplicate example IDs found");
    }

    // Freshly created sessions are within the expiry window, so a reap
    // pass must leave them untouched.
    #[test]
    fn test_session_reaping() {
        let mut state = PlaygroundState::new();
        state.get_or_create_session("test-1");
        state.get_or_create_session("test-2");
        assert_eq!(state.sessions.len(), 2);

        // Sessions should not be reaped immediately
        let reaped = state.reap_expired();
        assert_eq!(reaped, 0);
        assert_eq!(state.sessions.len(), 2);
    }

    // A well-formed VPL program should parse and validate with zero
    // error-severity diagnostics (mirrors the happy path of handle_validate).
    #[test]
    fn test_validate_response_for_valid_vpl() {
        let vpl = r#"
event SensorReading:
    temperature: int

stream HighTemp = SensorReading
    .where(temperature > 30)
    .emit(alert: "high_temp")
"#;
        match varpulis_parser::parse(vpl) {
            Ok(program) => {
                let validation = varpulis_core::validate::validate(vpl, &program);
                let has_errors = validation
                    .diagnostics
                    .iter()
                    .any(|d| d.severity == varpulis_core::validate::Severity::Error);
                assert!(!has_errors);
            }
            Err(e) => panic!("Parse failed: {e}"),
        }
    }

    // Sanity check for the .evt text format accepted by the run endpoint:
    // one line parses into one typed event.
    #[test]
    fn test_evt_format_parsing() {
        let evt_text = r#"sensor_reading { sensor_id: "S001", temperature: 65.3 }"#;
        let timed = varpulis_runtime::event_file::EventFileParser::parse(evt_text).unwrap();
        assert_eq!(timed.len(), 1);
        assert_eq!(timed[0].event.event_type.as_ref(), "sensor_reading");
    }

    // End-to-end check on a specific example: the bundled "iot-anomaly"
    // fixture must produce exactly its declared match count.
    #[tokio::test]
    async fn test_iot_anomaly_example_produces_matches() {
        let examples = builtin_examples();
        let iot = examples.iter().find(|e| e.id == "iot-anomaly").unwrap();

        let timed = varpulis_runtime::event_file::EventFileParser::parse(&iot.events).unwrap();
        let events: Vec<varpulis_runtime::event::Event> =
            timed.into_iter().map(|te| te.event).collect();

        let results = crate::simulate_from_source(&iot.vpl, events).await.unwrap();
        assert_eq!(
            results.len(),
            iot.expected_output_count.unwrap(),
            "IoT anomaly example should produce {} matches, got {}",
            iot.expected_output_count.unwrap(),
            results.len()
        );
    }

    // Generalization of the test above: every example that declares an
    // expected output count is run end-to-end and must hit that count.
    // Examples without a declared count are skipped.
    #[tokio::test]
    async fn test_all_examples_with_expected_count() {
        for example in builtin_examples() {
            let Some(expected) = example.expected_output_count else {
                continue;
            };

            let timed = varpulis_runtime::event_file::EventFileParser::parse(&example.events)
                .unwrap_or_else(|e| panic!("Event parse failed for '{}': {e}", example.id));
            let events: Vec<varpulis_runtime::event::Event> =
                timed.into_iter().map(|te| te.event).collect();

            let results = crate::simulate_from_source(&example.vpl, events)
                .await
                .unwrap_or_else(|e| panic!("Run failed for '{}': {e}", example.id));
            assert_eq!(
                results.len(),
                expected,
                "Example '{}' expected {} matches, got {}",
                example.id,
                expected,
                results.len()
            );
        }
    }
}