Skip to main content

varpulis_cli/
playground.rs

1//! Playground API — zero-friction VPL execution for the interactive playground.
2//!
3//! Provides ephemeral sessions with no authentication required.
4//! Sessions auto-expire after inactivity and have conservative resource quotas.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use axum::extract::{Json, Path, State};
11use axum::http::StatusCode;
12use axum::response::{IntoResponse, Response};
13use axum::routing::{get, post};
14use axum::Router;
15use serde::{Deserialize, Serialize};
16use tokio::sync::RwLock;
17use uuid::Uuid;
18use varpulis_runtime::event::Event;
19
20// =============================================================================
21// Constants
22// =============================================================================
23
/// Maximum events per playground run request.
///
/// `handle_run` compares the number of events parsed from the request's
/// `.evt` text against this limit and answers `400 too_many_events` when it
/// is exceeded.
const MAX_EVENTS_PER_RUN: usize = 10_000;

/// Maximum execution timeout per request (seconds).
///
/// Used as the deadline passed to `tokio::time::timeout` around the
/// simulation in `handle_run`.
const MAX_EXECUTION_SECS: u64 = 10;

/// Session expiry after inactivity.
///
/// `PlaygroundState::reap_expired` removes every session whose `last_active`
/// timestamp is at least this old.
const SESSION_EXPIRY: Duration = Duration::from_secs(3600); // 1 hour

/// Reaper interval — how often to clean up expired sessions.
///
/// This is the sleep between two passes of the task started by
/// `spawn_session_reaper`.
const REAPER_INTERVAL: Duration = Duration::from_secs(300); // 5 minutes

/// Maximum VPL source length, in bytes (it is compared against `String::len`).
const MAX_VPL_LENGTH: usize = 50_000;
38
39// =============================================================================
40// Types
41// =============================================================================
42
/// Shared playground state.
///
/// A reference-counted handle around a `tokio::sync::RwLock`, so it can be
/// cloned into the axum router state and into the background reaper task.
pub type SharedPlayground = Arc<RwLock<PlaygroundState>>;

/// State for the playground backend.
#[derive(Debug)]
pub struct PlaygroundState {
    // Live sessions, keyed by the session id string.
    sessions: HashMap<String, PlaygroundSession>,
}

/// Bookkeeping for a single ephemeral playground session.
#[derive(Debug)]
struct PlaygroundSession {
    // Timestamp compared against `SESSION_EXPIRY` when reaping.
    last_active: Instant,
}
56
57impl Default for PlaygroundState {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl PlaygroundState {
64    pub fn new() -> Self {
65        Self {
66            sessions: HashMap::new(),
67        }
68    }
69
70    fn get_or_create_session(&mut self, session_id: &str) -> &mut PlaygroundSession {
71        self.sessions
72            .entry(session_id.to_string())
73            .or_insert_with(|| PlaygroundSession {
74                last_active: Instant::now(),
75            })
76    }
77
78    fn reap_expired(&mut self) -> usize {
79        let before = self.sessions.len();
80        self.sessions
81            .retain(|_, s| s.last_active.elapsed() < SESSION_EXPIRY);
82        before - self.sessions.len()
83    }
84}
85
86// =============================================================================
87// Request/Response types
88// =============================================================================
89
/// Body returned by `POST /api/v1/playground/session`.
#[derive(Debug, Serialize)]
pub struct SessionResponse {
    /// Identifier of the newly registered session (a UUID v4 rendered as text).
    pub session_id: String,
}

/// JSON body accepted by `POST /api/v1/playground/run`.
#[derive(Debug, Deserialize)]
pub struct PlaygroundRunRequest {
    /// VPL program source; rejected when longer than `MAX_VPL_LENGTH` bytes.
    pub vpl: String,
    /// Events in .evt text format (e.g., `sensor_reading { temperature: 65.3, zone: "A" }`)
    ///
    /// Defaults to the empty string when the field is omitted.
    #[serde(default)]
    pub events: String,
}

/// JSON body accepted by `POST /api/v1/playground/validate`.
#[derive(Debug, Deserialize)]
pub struct PlaygroundValidateRequest {
    /// VPL program source; rejected when longer than `MAX_VPL_LENGTH` bytes.
    pub vpl: String,
}

/// Result of a playground run, for both successful and failed executions.
#[derive(Debug, Serialize)]
pub struct PlaygroundRunResponse {
    /// `true` only when the program ran to completion.
    pub ok: bool,
    /// Number of input events handed to the engine (0 on failure).
    pub events_processed: usize,
    /// Emitted events, each flattened to one JSON object holding an
    /// `event_type` key plus the event's data fields.
    pub output_events: Vec<serde_json::Value>,
    /// Wall-clock time spent handling the request, in milliseconds.
    pub latency_ms: u64,
    /// Omitted from the JSON when empty; `handle_run` currently never fills it.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub diagnostics: Vec<PlaygroundDiagnostic>,
    /// Human-readable failure description; omitted from the JSON on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

/// Result of validating a VPL program without running it.
#[derive(Debug, Serialize)]
pub struct PlaygroundValidateResponse {
    /// `true` when the program parsed and no diagnostic has severity "error".
    pub ok: bool,
    /// Parsed program serialized to JSON; omitted when parsing or
    /// serialization failed.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ast: Option<serde_json::Value>,
    /// Parser or validator findings (always serialized, possibly empty).
    pub diagnostics: Vec<PlaygroundDiagnostic>,
}

/// One editor-style diagnostic with a source range.
///
/// Lines and columns computed through `position_to_line_col` are zero-based.
// NOTE(review): `ParseError::Located` positions are shifted down by one on the
// assumption that the parser reports them one-based — confirm against the parser.
#[derive(Debug, Serialize)]
pub struct PlaygroundDiagnostic {
    /// Either "error" or "warning".
    pub severity: String,
    pub message: String,
    /// Optional suggestion; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hint: Option<String>,
    /// Optional diagnostic code; omitted from the JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    pub start_line: u32,
    pub start_col: u32,
    pub end_line: u32,
    pub end_col: u32,
}

/// Summary of a built-in example, as returned by the example list endpoint.
#[derive(Debug, Serialize)]
pub struct PlaygroundExample {
    pub id: String,
    pub name: String,
    pub description: String,
    pub category: String,
}

/// Full built-in example, including its program and input events.
#[derive(Debug, Serialize)]
pub struct PlaygroundExampleDetail {
    pub id: String,
    pub name: String,
    pub description: String,
    pub category: String,
    /// VPL program source of the example.
    pub vpl: String,
    /// Events in .evt text format
    pub events: String,
    /// Number of output events the example is expected to produce, when that
    /// number is known; the unit tests check it for every example that sets it.
    pub expected_output_count: Option<usize>,
}

/// JSON error body produced by `pg_error_response`.
#[derive(Debug, Serialize)]
struct PlaygroundError {
    /// Human-readable message.
    error: String,
    /// Short machine-readable code such as `vpl_too_large`.
    code: String,
}
167
168// =============================================================================
169// Built-in examples
170// =============================================================================
171
/// Builds the catalogue of examples shipped with the playground.
///
/// The list is rebuilt on every call. Examples whose
/// `expected_output_count` is `Some(n)` are executed by the unit tests, which
/// assert that running them yields exactly `n` output events.
fn builtin_examples() -> Vec<PlaygroundExampleDetail> {
    vec![
        // Plain threshold filter: three of the six readings exceed 30.
        PlaygroundExampleDetail {
            id: "hvac-alert".into(),
            name: "HVAC Alert".into(),
            description: "Simple temperature threshold alert — the 'Hello World' of CEP.".into(),
            category: "Getting Started".into(),
            vpl: r#"stream HighTemp = TempReading
    .where(temperature > 30)
    .emit(alert: "High temperature detected", sensor: sensor_id, temp: temperature)"#.into(),
            events: r#"@0s TempReading { sensor_id: "HVAC-01", temperature: 22 }
@1s TempReading { sensor_id: "HVAC-02", temperature: 35 }
@2s TempReading { sensor_id: "HVAC-03", temperature: 28 }
@3s TempReading { sensor_id: "HVAC-01", temperature: 41 }
@4s TempReading { sensor_id: "HVAC-04", temperature: 19 }
@5s TempReading { sensor_id: "HVAC-02", temperature: 33 }"#.into(),
            expected_output_count: Some(3),
        },
        // Two-step sequence pattern (login -> transfer) with a time window.
        PlaygroundExampleDetail {
            id: "fraud-detection".into(),
            name: "Fraud Detection".into(),
            description: "Detect login followed by large transfer within 5 minutes — sequence pattern matching.".into(),
            category: "Finance".into(),
            vpl: r#"stream FraudAlert = login as l -> transfer as t .within(5m)
    .where(l.user_id == t.user_id and t.amount > 5000)
    .emit(alert: "Suspicious transfer after login", user: l.user_id, amount: t.amount, city: l.city)"#.into(),
            events: r#"# Alice logs in, then makes a large transfer — triggers alert
@0s  login { user_id: "alice", city: "New York", device: "mobile" }
@10s transfer { user_id: "bob", amount: 200, to_account: "ext_001" }
@30s transfer { user_id: "alice", amount: 15000, to_account: "ext_099" }
# Charlie scenario
@60s login { user_id: "charlie", city: "London", device: "desktop" }
@90s transfer { user_id: "charlie", amount: 8500, to_account: "ext_042" }
@120s transfer { user_id: "alice", amount: 100, to_account: "ext_005" }"#.into(),
            expected_output_count: None,
        },
        // NOTE(review): the description mentions window aggregation, but the
        // VPL below only applies a `.where()` filter — confirm the wording.
        PlaygroundExampleDetail {
            id: "iot-anomaly".into(),
            name: "IoT Sensor Anomaly".into(),
            description: "Detect temperature spikes in sensor data using window aggregation.".into(),
            category: "IoT".into(),
            vpl: r#"stream TempSpike = sensor_reading
    .where(temperature > 50)
    .emit(alert: "Temperature spike", sensor: sensor_id, temp: temperature, zone: zone)"#.into(),
            events: r#"@0s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 22.5 }
@1s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 65.3 }
@2s sensor_reading { sensor_id: "S001", zone: "zone_a", temperature: 23.1 }
@3s sensor_reading { sensor_id: "S003", zone: "zone_c", temperature: 55.0 }
@4s sensor_reading { sensor_id: "S002", zone: "zone_b", temperature: 24.0 }"#.into(),
            expected_output_count: Some(2),
        },
        // Volume filter: three of the five trades exceed 10000.
        PlaygroundExampleDetail {
            id: "trading-signal".into(),
            name: "Trading Signal".into(),
            description: "Detect large trades on a single symbol — volume spike alert.".into(),
            category: "Finance".into(),
            vpl: r#"stream VolumeSpike = trade
    .where(volume > 10000)
    .emit(alert: "Large trade detected", symbol: symbol, vol: volume, price: price, side: side)"#.into(),
            events: r#"@0s trade { symbol: "AAPL", price: 185.50, volume: 500, side: "buy" }
@1s trade { symbol: "GOOGL", price: 142.30, volume: 25000, side: "sell" }
@2s trade { symbol: "AAPL", price: 185.60, volume: 15000, side: "buy" }
@3s trade { symbol: "TSLA", price: 250.10, volume: 800, side: "sell" }
@4s trade { symbol: "MSFT", price: 420.00, volume: 50000, side: "buy" }"#.into(),
            expected_output_count: Some(3),
        },
        // Three-step sequence pattern correlated on IP addresses.
        PlaygroundExampleDetail {
            id: "cyber-killchain".into(),
            name: "Cyber Kill Chain".into(),
            description: "Detect a 3-stage attack sequence: scan → exploit → exfiltrate within 10 minutes.".into(),
            category: "Security".into(),
            vpl: r#"stream KillChain = scan as s -> exploit as e -> exfiltrate as x .within(10m)
    .where(s.target_ip == e.target_ip and e.target_ip == x.source_ip)
    .emit(alert: "Kill chain detected", target: s.target_ip, attacker: s.source_ip)"#.into(),
            events: r#"@0s  scan { source_ip: "10.0.0.5", target_ip: "192.168.1.100", port: 443 }
@30s exploit { source_ip: "10.0.0.5", target_ip: "192.168.1.100", cve: "CVE-2024-1234" }
@60s exfiltrate { source_ip: "192.168.1.100", dest_ip: "10.0.0.5", bytes: 50000000 }
@120s scan { source_ip: "10.0.0.9", target_ip: "192.168.1.200", port: 80 }
@180s login { user_id: "admin", ip: "192.168.1.200" }"#.into(),
            expected_output_count: None,
        },
        // Kleene-plus pattern: one or more failed logins, then a success.
        PlaygroundExampleDetail {
            id: "kleene-pattern".into(),
            name: "Kleene Pattern".into(),
            description: "Match one or more failed logins followed by a successful login — brute force detection.".into(),
            category: "Security".into(),
            vpl: r#"stream BruteForce = failed_login+ as fails -> successful_login as success .within(5m)
    .where(fails.user_id == success.user_id)
    .emit(alert: "Possible brute force", user: success.user_id)"#.into(),
            events: r#"@0s failed_login { user_id: "admin", ip: "10.0.0.5" }
@1s failed_login { user_id: "admin", ip: "10.0.0.5" }
@2s failed_login { user_id: "admin", ip: "10.0.0.5" }
@3s successful_login { user_id: "admin", ip: "10.0.0.5" }
@4s successful_login { user_id: "bob", ip: "10.0.0.9" }"#.into(),
            expected_output_count: None,
        },
        // Two filtered streams merged into a third one.
        PlaygroundExampleDetail {
            id: "merge-stream".into(),
            name: "Merge Streams".into(),
            description: "Combine events from multiple sources into a single alert stream.".into(),
            category: "Getting Started".into(),
            vpl: r#"stream TempAlerts = TempReading
    .where(temperature > 30)
    .emit(alert: "High temp", source: "temp", value: temperature)

stream HumidAlerts = HumidityReading
    .where(humidity > 80)
    .emit(alert: "High humidity", source: "humidity", value: humidity)

stream AllAlerts = merge(TempAlerts, HumidAlerts)
    .emit()"#.into(),
            events: r#"@0s TempReading { sensor_id: "S1", temperature: 35 }
@1s HumidityReading { sensor_id: "S2", humidity: 85 }
@2s TempReading { sensor_id: "S3", temperature: 22 }
@3s HumidityReading { sensor_id: "S4", humidity: 45 }
@4s TempReading { sensor_id: "S5", temperature: 38 }"#.into(),
            expected_output_count: None,
        },
        // Forecasting example; its event list is generated rather than literal.
        PlaygroundExampleDetail {
            id: "forecast-fraud".into(),
            name: "Fraud Forecasting".into(),
            description: "Predict fraud patterns using PST-based forecasting — sequence prediction with confidence scores.".into(),
            category: "Advanced".into(),
            vpl: r"stream FraudForecast = login as l -> transfer as t .within(5m)
    .forecast(confidence: 0.7, horizon: 2m, warmup: 50, max_depth: 3)
    .where(forecast_probability > 0.5)
    .emit(probability: forecast_probability, state: forecast_state)".into(),
            events: {
                // Generate a training sequence: alternating login/transfer pairs
                // (60 pairs at timestamps 0s..=119s, cycling through 10 user ids).
                let mut lines = Vec::new();
                for i in 0..60 {
                    let t = i * 2;
                    lines.push(format!(
                        "@{}s login {{ user_id: \"user_{}\", city: \"NYC\" }}",
                        t,
                        i % 10
                    ));
                    lines.push(format!(
                        "@{}s transfer {{ user_id: \"user_{}\", amount: {} }}",
                        t + 1,
                        i % 10,
                        100 + i * 10
                    ));
                }
                // Add a final login to trigger forecast
                lines.push("@120s login { user_id: \"user_0\", city: \"NYC\" }".into());
                lines.join("\n")
            },
            expected_output_count: None, // Depends on PST learning convergence
        },
    ]
}
324
325// =============================================================================
326// Route construction
327// =============================================================================
328
329pub fn playground_routes(playground: SharedPlayground) -> Router {
330    Router::new()
331        .route("/api/v1/playground/session", post(handle_create_session))
332        .route(
333            "/api/v1/playground/run",
334            post(handle_run).layer(tower_http::limit::RequestBodyLimitLayer::new(1024 * 1024)),
335        )
336        .route(
337            "/api/v1/playground/validate",
338            post(handle_validate).layer(tower_http::limit::RequestBodyLimitLayer::new(256 * 1024)),
339        )
340        .route("/api/v1/playground/examples", get(handle_list_examples))
341        .route("/api/v1/playground/examples/{id}", get(handle_get_example))
342        .with_state(playground)
343}
344
345// =============================================================================
346// Handlers
347// =============================================================================
348
349async fn handle_create_session(State(playground): State<SharedPlayground>) -> impl IntoResponse {
350    let session_id = Uuid::new_v4().to_string();
351    {
352        let mut pg = playground.write().await;
353        pg.get_or_create_session(&session_id);
354    }
355    let resp = SessionResponse { session_id };
356    (StatusCode::CREATED, Json(resp))
357}
358
359async fn handle_run(
360    State(playground): State<SharedPlayground>,
361    Json(body): Json<PlaygroundRunRequest>,
362) -> Response {
363    // Validate input size
364    if body.vpl.len() > MAX_VPL_LENGTH {
365        return pg_error_response(
366            StatusCode::BAD_REQUEST,
367            "vpl_too_large",
368            &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
369        );
370    }
371
372    // Track session (auto-create if needed)
373    {
374        let mut pg = playground.write().await;
375        let session_id = Uuid::new_v4().to_string();
376        pg.get_or_create_session(&session_id);
377    }
378
379    let start = Instant::now();
380
381    // Parse .evt format text into runtime events
382    let timed_events = match varpulis_runtime::event_file::EventFileParser::parse(&body.events) {
383        Ok(te) => te,
384        Err(e) => {
385            let resp = PlaygroundRunResponse {
386                ok: false,
387                events_processed: 0,
388                output_events: vec![],
389                latency_ms: 0,
390                diagnostics: vec![],
391                error: Some(format!("Event parse error: {e}")),
392            };
393            return (StatusCode::OK, Json(resp)).into_response();
394        }
395    };
396
397    if timed_events.len() > MAX_EVENTS_PER_RUN {
398        return pg_error_response(
399            StatusCode::BAD_REQUEST,
400            "too_many_events",
401            &format!("Maximum {MAX_EVENTS_PER_RUN} events per run"),
402        );
403    }
404
405    let events: Vec<Event> = timed_events.into_iter().map(|te| te.event).collect();
406    let event_count = events.len();
407
408    // Execute with timeout
409    let run_result = tokio::time::timeout(
410        Duration::from_secs(MAX_EXECUTION_SECS),
411        crate::simulate_from_source(&body.vpl, events),
412    )
413    .await;
414
415    let latency_ms = start.elapsed().as_millis() as u64;
416
417    match run_result {
418        Ok(Ok(output_events)) => {
419            let output: Vec<serde_json::Value> = output_events
420                .iter()
421                .map(|e| {
422                    let mut flat = serde_json::Map::new();
423                    flat.insert(
424                        "event_type".to_string(),
425                        serde_json::Value::String(e.event_type.to_string()),
426                    );
427                    for (k, v) in &e.data {
428                        flat.insert(k.to_string(), crate::websocket::value_to_json(v));
429                    }
430                    serde_json::Value::Object(flat)
431                })
432                .collect();
433
434            let resp = PlaygroundRunResponse {
435                ok: true,
436                events_processed: event_count,
437                output_events: output,
438                latency_ms,
439                diagnostics: vec![],
440                error: None,
441            };
442            (StatusCode::OK, Json(resp)).into_response()
443        }
444        Ok(Err(e)) => {
445            let resp = PlaygroundRunResponse {
446                ok: false,
447                events_processed: 0,
448                output_events: vec![],
449                latency_ms,
450                diagnostics: vec![],
451                error: Some(e.to_string()),
452            };
453            (StatusCode::OK, Json(resp)).into_response()
454        }
455        Err(_timeout) => {
456            let resp = PlaygroundRunResponse {
457                ok: false,
458                events_processed: 0,
459                output_events: vec![],
460                latency_ms,
461                diagnostics: vec![],
462                error: Some(format!("Execution timed out after {MAX_EXECUTION_SECS}s")),
463            };
464            (StatusCode::REQUEST_TIMEOUT, Json(resp)).into_response()
465        }
466    }
467}
468
469async fn handle_validate(
470    State(_playground): State<SharedPlayground>,
471    Json(body): Json<PlaygroundValidateRequest>,
472) -> Response {
473    if body.vpl.len() > MAX_VPL_LENGTH {
474        return pg_error_response(
475            StatusCode::BAD_REQUEST,
476            "vpl_too_large",
477            &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
478        );
479    }
480
481    let result = match varpulis_parser::parse(&body.vpl) {
482        Ok(program) => {
483            let ast = serde_json::to_value(&program).ok();
484            let validation = varpulis_core::validate::validate(&body.vpl, &program);
485            let diagnostics: Vec<PlaygroundDiagnostic> = validation
486                .diagnostics
487                .iter()
488                .map(|d| {
489                    let (sl, sc) = position_to_line_col(&body.vpl, d.span.start);
490                    let (el, ec) = position_to_line_col(&body.vpl, d.span.end);
491                    PlaygroundDiagnostic {
492                        severity: match d.severity {
493                            varpulis_core::validate::Severity::Error => "error".into(),
494                            varpulis_core::validate::Severity::Warning => "warning".into(),
495                        },
496                        message: d.message.clone(),
497                        hint: d.hint.clone(),
498                        code: d.code.map(|c| c.to_string()),
499                        start_line: sl as u32,
500                        start_col: sc as u32,
501                        end_line: el as u32,
502                        end_col: ec as u32,
503                    }
504                })
505                .collect();
506            let has_errors = diagnostics.iter().any(|d| d.severity == "error");
507            PlaygroundValidateResponse {
508                ok: !has_errors,
509                ast,
510                diagnostics,
511            }
512        }
513        Err(error) => {
514            let diag = parse_error_to_diagnostic(&body.vpl, &error);
515            PlaygroundValidateResponse {
516                ok: false,
517                ast: None,
518                diagnostics: vec![diag],
519            }
520        }
521    };
522
523    (StatusCode::OK, Json(result)).into_response()
524}
525
526async fn handle_list_examples() -> impl IntoResponse {
527    let examples: Vec<PlaygroundExample> = builtin_examples()
528        .into_iter()
529        .map(|e| PlaygroundExample {
530            id: e.id,
531            name: e.name,
532            description: e.description,
533            category: e.category,
534        })
535        .collect();
536    (StatusCode::OK, Json(examples))
537}
538
539async fn handle_get_example(Path(id): Path<String>) -> Response {
540    let examples = builtin_examples();
541    match examples.into_iter().find(|e| e.id == id) {
542        Some(example) => (StatusCode::OK, Json(example)).into_response(),
543        None => pg_error_response(
544            StatusCode::NOT_FOUND,
545            "example_not_found",
546            &format!("Example '{id}' not found"),
547        ),
548    }
549}
550
551// =============================================================================
552// Session reaper task
553// =============================================================================
554
555/// Spawn a background task that periodically cleans up expired sessions.
556pub fn spawn_session_reaper(playground: SharedPlayground) {
557    tokio::spawn(async move {
558        loop {
559            tokio::time::sleep(REAPER_INTERVAL).await;
560            let mut pg = playground.write().await;
561            let reaped = pg.reap_expired();
562            if reaped > 0 {
563                tracing::debug!("Playground: reaped {} expired sessions", reaped);
564            }
565        }
566    });
567}
568
569// =============================================================================
570// Helpers
571// =============================================================================
572
573fn pg_error_response(status: StatusCode, code: &str, message: &str) -> Response {
574    let body = PlaygroundError {
575        error: message.to_string(),
576        code: code.to_string(),
577    };
578    (status, Json(body)).into_response()
579}
580
581fn parse_error_to_diagnostic(
582    source: &str,
583    error: &varpulis_parser::ParseError,
584) -> PlaygroundDiagnostic {
585    use varpulis_parser::ParseError;
586    match error {
587        ParseError::Located {
588            line,
589            column,
590            message,
591            hint,
592            ..
593        } => PlaygroundDiagnostic {
594            severity: "error".into(),
595            message: message.clone(),
596            hint: hint.clone(),
597            code: None,
598            start_line: line.saturating_sub(1) as u32,
599            start_col: column.saturating_sub(1) as u32,
600            end_line: line.saturating_sub(1) as u32,
601            end_col: *column as u32,
602        },
603        ParseError::UnexpectedToken {
604            position,
605            expected,
606            found,
607        } => {
608            let (line, col) = position_to_line_col(source, *position);
609            PlaygroundDiagnostic {
610                severity: "error".into(),
611                message: format!("Unexpected token: expected {expected}, found '{found}'"),
612                hint: None,
613                code: None,
614                start_line: line as u32,
615                start_col: col as u32,
616                end_line: line as u32,
617                end_col: (col + found.len()) as u32,
618            }
619        }
620        ParseError::UnexpectedEof => {
621            let line = source.lines().count().saturating_sub(1);
622            let col = source.lines().last().map_or(0, |l| l.len());
623            PlaygroundDiagnostic {
624                severity: "error".into(),
625                message: "Unexpected end of input".into(),
626                hint: None,
627                code: None,
628                start_line: line as u32,
629                start_col: col as u32,
630                end_line: line as u32,
631                end_col: col as u32,
632            }
633        }
634        ParseError::InvalidToken { position, message } => {
635            let (line, col) = position_to_line_col(source, *position);
636            PlaygroundDiagnostic {
637                severity: "error".into(),
638                message: message.clone(),
639                hint: None,
640                code: None,
641                start_line: line as u32,
642                start_col: col as u32,
643                end_line: line as u32,
644                end_col: (col + 10) as u32,
645            }
646        }
647        ParseError::InvalidNumber(msg)
648        | ParseError::InvalidDuration(msg)
649        | ParseError::InvalidTimestamp(msg)
650        | ParseError::InvalidEscape(msg) => PlaygroundDiagnostic {
651            severity: "error".into(),
652            message: msg.clone(),
653            hint: None,
654            code: None,
655            start_line: 0,
656            start_col: 0,
657            end_line: 0,
658            end_col: 0,
659        },
660        ParseError::UnterminatedString(position) => {
661            let (line, col) = position_to_line_col(source, *position);
662            PlaygroundDiagnostic {
663                severity: "error".into(),
664                message: "Unterminated string literal".into(),
665                hint: None,
666                code: None,
667                start_line: line as u32,
668                start_col: col as u32,
669                end_line: line as u32,
670                end_col: source.lines().nth(line).map_or(col, |l| l.len()) as u32,
671            }
672        }
673        ParseError::Custom { span, message } => {
674            let (sl, sc) = position_to_line_col(source, span.start);
675            let (el, ec) = position_to_line_col(source, span.end);
676            PlaygroundDiagnostic {
677                severity: "error".into(),
678                message: message.clone(),
679                hint: None,
680                code: None,
681                start_line: sl as u32,
682                start_col: sc as u32,
683                end_line: el as u32,
684                end_col: ec as u32,
685            }
686        }
687    }
688}
689
/// Translates a byte offset into a zero-based `(line, column)` pair.
///
/// Every character that starts before `position` is consumed: a `'\n'` moves
/// to the next line and resets the column, any other character advances the
/// column by one. Columns therefore count characters, not bytes. An offset
/// past the end of `source` yields the position just after the last character.
fn position_to_line_col(source: &str, position: usize) -> (usize, usize) {
    source
        .char_indices()
        .take_while(|&(offset, _)| offset < position)
        .fold((0, 0), |(line, col), (_, ch)| {
            if ch == '\n' {
                (line + 1, 0)
            } else {
                (line, col + 1)
            }
        })
}
710
#[cfg(test)]
mod tests {
    use super::*;

    /// Every built-in example carries an id, a name, a program and events.
    #[test]
    fn test_builtin_examples_valid() {
        let catalogue = builtin_examples();
        assert!(!catalogue.is_empty());
        for entry in &catalogue {
            assert!(!entry.id.is_empty());
            assert!(!entry.name.is_empty());
            assert!(!entry.vpl.is_empty());
            assert!(!entry.events.is_empty());
        }
    }

    /// No two built-in examples share an id.
    #[test]
    fn test_builtin_examples_unique_ids() {
        let catalogue = builtin_examples();
        let distinct: std::collections::HashSet<&str> =
            catalogue.iter().map(|entry| entry.id.as_str()).collect();
        assert_eq!(distinct.len(), catalogue.len(), "Duplicate example IDs found");
    }

    /// Freshly created sessions survive a reaper pass.
    #[test]
    fn test_session_reaping() {
        let mut state = PlaygroundState::new();
        for id in ["test-1", "test-2"] {
            state.get_or_create_session(id);
        }
        assert_eq!(state.sessions.len(), 2);

        // Sessions should not be reaped immediately
        assert_eq!(state.reap_expired(), 0);
        assert_eq!(state.sessions.len(), 2);
    }

    /// A small, well-formed program parses and validates without errors.
    #[test]
    fn test_validate_response_for_valid_vpl() {
        let vpl = r#"
event SensorReading:
    temperature: int

stream HighTemp = SensorReading
    .where(temperature > 30)
    .emit(alert: "high_temp")
"#;
        let program =
            varpulis_parser::parse(vpl).unwrap_or_else(|e| panic!("Parse failed: {e}"));
        let validation = varpulis_core::validate::validate(vpl, &program);
        let has_errors = validation
            .diagnostics
            .iter()
            .any(|d| d.severity == varpulis_core::validate::Severity::Error);
        assert!(!has_errors);
    }

    /// A single `.evt` line parses into exactly one event of the right type.
    #[test]
    fn test_evt_format_parsing() {
        let evt_text = r#"sensor_reading { sensor_id: "S001", temperature: 65.3 }"#;
        let parsed = varpulis_runtime::event_file::EventFileParser::parse(evt_text).unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].event.event_type.as_ref(), "sensor_reading");
    }

    /// The IoT anomaly example produces its documented number of matches.
    #[tokio::test]
    async fn test_iot_anomaly_example_produces_matches() {
        let iot = builtin_examples()
            .into_iter()
            .find(|entry| entry.id == "iot-anomaly")
            .unwrap();
        let expected = iot.expected_output_count.unwrap();

        let events: Vec<varpulis_runtime::event::Event> =
            varpulis_runtime::event_file::EventFileParser::parse(&iot.events)
                .unwrap()
                .into_iter()
                .map(|timed| timed.event)
                .collect();

        let results = crate::simulate_from_source(&iot.vpl, events).await.unwrap();
        assert_eq!(
            results.len(),
            expected,
            "IoT anomaly example should produce {} matches, got {}",
            expected,
            results.len()
        );
    }

    /// Every example that declares an expected output count produces exactly
    /// that many output events; examples without one are skipped.
    #[tokio::test]
    async fn test_all_examples_with_expected_count() {
        let counted = builtin_examples()
            .into_iter()
            .filter_map(|entry| entry.expected_output_count.map(|count| (entry, count)));

        for (example, expected) in counted {
            let parsed = varpulis_runtime::event_file::EventFileParser::parse(&example.events)
                .unwrap_or_else(|e| panic!("Event parse failed for '{}': {e}", example.id));
            let events: Vec<varpulis_runtime::event::Event> =
                parsed.into_iter().map(|timed| timed.event).collect();

            let results = crate::simulate_from_source(&example.vpl, events)
                .await
                .unwrap_or_else(|e| panic!("Run failed for '{}': {e}", example.id));
            assert_eq!(
                results.len(),
                expected,
                "Example '{}' expected {} matches, got {}",
                example.id,
                expected,
                results.len()
            );
        }
    }
}