Skip to main content

varpulis_cli/
playground.rs

1//! Playground API — zero-friction VPL execution for the interactive playground.
2//!
3//! Provides ephemeral sessions with no authentication required.
4//! Sessions auto-expire after inactivity and have conservative resource quotas.
5
6use axum::extract::{Json, Path, State};
7use axum::http::StatusCode;
8use axum::response::{IntoResponse, Response};
9use axum::routing::{get, post};
10use axum::Router;
11use indexmap::IndexMap;
12use rustc_hash::FxBuildHasher;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::RwLock;
18use uuid::Uuid;
19use varpulis_core::Value;
20use varpulis_runtime::event::Event;
21
22// =============================================================================
23// Constants
24// =============================================================================
25
/// Maximum number of input events accepted per playground run request.
/// `handle_run` answers `400 Bad Request` when a request carries more.
const MAX_EVENTS_PER_RUN: usize = 10_000;

/// Maximum execution timeout per request (seconds); `handle_run` wraps the
/// simulation in a `tokio::time::timeout` of this length.
const MAX_EXECUTION_SECS: u64 = 10;

/// Age of a session's `last_active` timestamp at which `reap_expired` drops it.
const SESSION_EXPIRY: Duration = Duration::from_secs(3600); // 1 hour

/// Reaper interval — how long `spawn_session_reaper` sleeps between sweeps.
const REAPER_INTERVAL: Duration = Duration::from_secs(300); // 5 minutes

/// Maximum VPL source length, in bytes (it is compared against `String::len`).
const MAX_VPL_LENGTH: usize = 50_000;
40
41// =============================================================================
42// Types
43// =============================================================================
44
/// Shared playground state: a reference-counted handle to the session table
/// behind an async read/write lock. This is the state type handed to the
/// router in `playground_routes` and to `spawn_session_reaper`.
pub type SharedPlayground = Arc<RwLock<PlaygroundState>>;
47
48/// State for the playground backend.
49#[derive(Debug)]
50pub struct PlaygroundState {
51    sessions: HashMap<String, PlaygroundSession>,
52}
53
54#[derive(Debug)]
55struct PlaygroundSession {
56    last_active: Instant,
57}
58
59impl Default for PlaygroundState {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65impl PlaygroundState {
66    pub fn new() -> Self {
67        Self {
68            sessions: HashMap::new(),
69        }
70    }
71
72    fn get_or_create_session(&mut self, session_id: &str) -> &mut PlaygroundSession {
73        self.sessions
74            .entry(session_id.to_string())
75            .or_insert_with(|| PlaygroundSession {
76                last_active: Instant::now(),
77            })
78    }
79
80    fn reap_expired(&mut self) -> usize {
81        let before = self.sessions.len();
82        self.sessions
83            .retain(|_, s| s.last_active.elapsed() < SESSION_EXPIRY);
84        before - self.sessions.len()
85    }
86}
87
88// =============================================================================
89// Request/Response types
90// =============================================================================
91
/// Response body of the session-creation endpoint (`handle_create_session`).
#[derive(Debug, Serialize)]
pub struct SessionResponse {
    /// Identifier of the newly registered session (a UUID v4 rendered as text).
    pub session_id: String,
}
96
/// Request body of the run endpoint (`handle_run`).
#[derive(Debug, Deserialize)]
pub struct PlaygroundRunRequest {
    /// VPL program source; rejected when longer than `MAX_VPL_LENGTH` bytes.
    pub vpl: String,
    /// Input events to feed the program. Defaults to an empty list when the
    /// key is absent; rejected when longer than `MAX_EVENTS_PER_RUN`.
    #[serde(default)]
    pub events: Vec<PlaygroundEvent>,
}
103
/// One input event as submitted to the run endpoint.
///
/// NOTE(review): the built-in examples in `builtin_examples` store their
/// events as flat objects (`{"event_type": ..., "temperature": ...}`), whereas
/// this type reads attributes from a nested `fields` object — presumably the
/// client reshapes them before posting; confirm.
#[derive(Debug, Deserialize)]
pub struct PlaygroundEvent {
    /// Event type name, passed to `Event::new` by `handle_run`.
    pub event_type: String,
    /// Event attributes by name; defaults to an empty map when omitted.
    #[serde(default)]
    pub fields: serde_json::Map<String, serde_json::Value>,
}
110
/// Request body of the validate endpoint (`handle_validate`).
#[derive(Debug, Deserialize)]
pub struct PlaygroundValidateRequest {
    /// VPL program source; rejected when longer than `MAX_VPL_LENGTH` bytes.
    pub vpl: String,
}
115
/// Response body of the run endpoint (`handle_run`).
#[derive(Debug, Serialize)]
pub struct PlaygroundRunResponse {
    /// `true` when the simulation completed; `false` on error or timeout.
    pub ok: bool,
    /// Number of input events submitted on success; `handle_run` reports 0 on
    /// error and on timeout.
    pub events_processed: usize,
    /// Emitted events, each flattened to one JSON object with an
    /// `event_type` key alongside its data fields.
    pub output_events: Vec<serde_json::Value>,
    /// Wall-clock time from event conversion through the end of the run.
    pub latency_ms: u64,
    /// Omitted from the JSON when empty; `handle_run` currently always leaves
    /// it empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub diagnostics: Vec<PlaygroundDiagnostic>,
    /// Error or timeout message; omitted from the JSON on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}
127
/// Response body of the validate endpoint (`handle_validate`).
#[derive(Debug, Serialize)]
pub struct PlaygroundValidateResponse {
    /// `true` when the source parsed and no diagnostic has severity "error".
    pub ok: bool,
    /// JSON rendering of the parsed program; omitted when parsing failed or
    /// the program could not be serialized.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ast: Option<serde_json::Value>,
    /// Parse error (at most one) or validation findings; always serialized,
    /// even when empty.
    pub diagnostics: Vec<PlaygroundDiagnostic>,
}
135
/// A single parse or validation finding with its source range.
///
/// Positions produced in this file are 0-based: they come from
/// `position_to_line_col`, or from 1-based parser locations minus one.
#[derive(Debug, Serialize)]
pub struct PlaygroundDiagnostic {
    /// Either "error" or "warning" as produced by this module.
    pub severity: String,
    /// Human-readable description of the problem.
    pub message: String,
    /// Optional suggestion for fixing the problem; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hint: Option<String>,
    /// Optional diagnostic code; omitted when absent (parse errors carry none).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub code: Option<String>,
    /// Line on which the range starts.
    pub start_line: u32,
    /// Column at which the range starts.
    pub start_col: u32,
    /// Line on which the range ends.
    pub end_line: u32,
    /// Column at which the range ends.
    pub end_col: u32,
}
149
/// Summary of a built-in example, as returned by the example listing
/// (`handle_list_examples`); the VPL source and events are left out.
#[derive(Debug, Serialize)]
pub struct PlaygroundExample {
    /// Stable identifier used in the `/examples/{id}` route.
    pub id: String,
    /// Display name.
    pub name: String,
    /// One-sentence description of what the example demonstrates.
    pub description: String,
    /// Grouping label, e.g. "Getting Started" or "Finance".
    pub category: String,
}
157
/// Full built-in example, as returned by `handle_get_example`.
#[derive(Debug, Serialize)]
pub struct PlaygroundExampleDetail {
    /// Stable identifier used in the `/examples/{id}` route.
    pub id: String,
    /// Display name.
    pub name: String,
    /// One-sentence description of what the example demonstrates.
    pub description: String,
    /// Grouping label, e.g. "Getting Started" or "Finance".
    pub category: String,
    /// VPL program source for the example.
    pub vpl: String,
    /// Sample input events, stored as flat JSON objects that carry an
    /// `event_type` key next to the event's attributes.
    pub events: Vec<serde_json::Value>,
    /// Number of output events the example is expected to emit, or `None`
    /// where the built-in table does not pin a count.
    pub expected_output_count: Option<usize>,
}
168
/// JSON body of an error response built by `pg_error_response`.
#[derive(Debug, Serialize)]
struct PlaygroundError {
    /// Human-readable message.
    error: String,
    /// Short machine-readable identifier such as "vpl_too_large".
    code: String,
}
174
175// =============================================================================
176// Built-in examples
177// =============================================================================
178
/// Builds the table of built-in playground examples.
///
/// The vector is constructed from scratch on every call. Each entry bundles a
/// VPL program with sample input events; `expected_output_count` is `None`
/// where the table does not pin an output count.
fn builtin_examples() -> Vec<PlaygroundExampleDetail> {
    vec![
        // Threshold filter: three of the six readings exceed 30.
        PlaygroundExampleDetail {
            id: "hvac-alert".into(),
            name: "HVAC Alert".into(),
            description: "Simple temperature threshold alert — the 'Hello World' of CEP.".into(),
            category: "Getting Started".into(),
            vpl: r#"stream HighTemp = TempReading
    .where(temperature > 30)
    .emit(alert: "High temperature detected", sensor: sensor_id, temp: temperature)"#.into(),
            events: vec![
                serde_json::json!({"event_type": "TempReading", "sensor_id": "HVAC-01", "temperature": 22}),
                serde_json::json!({"event_type": "TempReading", "sensor_id": "HVAC-02", "temperature": 35}),
                serde_json::json!({"event_type": "TempReading", "sensor_id": "HVAC-03", "temperature": 28}),
                serde_json::json!({"event_type": "TempReading", "sensor_id": "HVAC-01", "temperature": 41}),
                serde_json::json!({"event_type": "TempReading", "sensor_id": "HVAC-04", "temperature": 19}),
                serde_json::json!({"event_type": "TempReading", "sensor_id": "HVAC-02", "temperature": 33}),
            ],
            expected_output_count: Some(3),
        },
        // Two-step sequence (login -> transfer) correlated on user_id.
        PlaygroundExampleDetail {
            id: "fraud-detection".into(),
            name: "Fraud Detection".into(),
            description: "Detect login followed by large transfer within 5 minutes — sequence pattern matching.".into(),
            category: "Finance".into(),
            vpl: r#"stream FraudAlert = login as l -> transfer as t .within(5m)
    .where(l.user_id == t.user_id && t.amount > 5000)
    .emit(alert: "Suspicious transfer after login", user: l.user_id, amount: t.amount, city: l.city)"#.into(),
            events: vec![
                serde_json::json!({"event_type": "login", "user_id": "alice", "city": "New York", "device": "mobile"}),
                serde_json::json!({"event_type": "transfer", "user_id": "bob", "amount": 200, "to_account": "ext_001"}),
                serde_json::json!({"event_type": "transfer", "user_id": "alice", "amount": 15000, "to_account": "ext_099"}),
                serde_json::json!({"event_type": "login", "user_id": "charlie", "city": "London", "device": "desktop"}),
                serde_json::json!({"event_type": "transfer", "user_id": "charlie", "amount": 8500, "to_account": "ext_042"}),
                serde_json::json!({"event_type": "transfer", "user_id": "alice", "amount": 100, "to_account": "ext_005"}),
            ],
            expected_output_count: Some(2),
        },
        // NOTE(review): the description mentions window aggregation, but the
        // VPL below is a plain threshold filter — confirm the wording.
        PlaygroundExampleDetail {
            id: "iot-anomaly".into(),
            name: "IoT Sensor Anomaly".into(),
            description: "Detect temperature spikes in sensor data using window aggregation.".into(),
            category: "IoT".into(),
            vpl: r#"stream TempSpike = sensor_reading
    .where(temperature > 50)
    .emit(alert: "Temperature spike", sensor: sensor_id, temp: temperature, zone: zone)"#.into(),
            events: vec![
                serde_json::json!({"event_type": "sensor_reading", "sensor_id": "S001", "zone": "zone_a", "temperature": 22.5}),
                serde_json::json!({"event_type": "sensor_reading", "sensor_id": "S002", "zone": "zone_b", "temperature": 65.3}),
                serde_json::json!({"event_type": "sensor_reading", "sensor_id": "S001", "zone": "zone_a", "temperature": 23.1}),
                serde_json::json!({"event_type": "sensor_reading", "sensor_id": "S003", "zone": "zone_c", "temperature": 55.0}),
                serde_json::json!({"event_type": "sensor_reading", "sensor_id": "S002", "zone": "zone_b", "temperature": 24.0}),
            ],
            expected_output_count: Some(2),
        },
        // Threshold filter on trade volume: three trades exceed 10000.
        PlaygroundExampleDetail {
            id: "trading-signal".into(),
            name: "Trading Signal".into(),
            description: "Detect large trades on a single symbol — volume spike alert.".into(),
            category: "Finance".into(),
            vpl: r#"stream VolumeSpike = trade
    .where(volume > 10000)
    .emit(alert: "Large trade detected", symbol: symbol, vol: volume, price: price, side: side)"#.into(),
            events: vec![
                serde_json::json!({"event_type": "trade", "symbol": "AAPL", "price": 185.50, "volume": 500, "side": "buy"}),
                serde_json::json!({"event_type": "trade", "symbol": "GOOGL", "price": 142.30, "volume": 25000, "side": "sell"}),
                serde_json::json!({"event_type": "trade", "symbol": "AAPL", "price": 185.60, "volume": 15000, "side": "buy"}),
                serde_json::json!({"event_type": "trade", "symbol": "TSLA", "price": 250.10, "volume": 800, "side": "sell"}),
                serde_json::json!({"event_type": "trade", "symbol": "MSFT", "price": 420.00, "volume": 50000, "side": "buy"}),
            ],
            expected_output_count: Some(1 + 2),
        },
        // Three-step sequence correlated on the target IP.
        PlaygroundExampleDetail {
            id: "cyber-killchain".into(),
            name: "Cyber Kill Chain".into(),
            description: "Detect a 3-stage attack sequence: scan → exploit → exfiltrate within 10 minutes.".into(),
            category: "Security".into(),
            vpl: r#"stream KillChain = scan as s -> exploit as e -> exfiltrate as x .within(10m)
    .where(s.target_ip == e.target_ip && e.target_ip == x.source_ip)
    .emit(alert: "Kill chain detected", target: s.target_ip, attacker: s.source_ip)"#.into(),
            events: vec![
                serde_json::json!({"event_type": "scan", "source_ip": "10.0.0.5", "target_ip": "192.168.1.100", "port": 443}),
                serde_json::json!({"event_type": "exploit", "source_ip": "10.0.0.5", "target_ip": "192.168.1.100", "cve": "CVE-2024-1234"}),
                serde_json::json!({"event_type": "exfiltrate", "source_ip": "192.168.1.100", "dest_ip": "10.0.0.5", "bytes": 50000000}),
                serde_json::json!({"event_type": "scan", "source_ip": "10.0.0.9", "target_ip": "192.168.1.200", "port": 80}),
                serde_json::json!({"event_type": "login", "user_id": "admin", "ip": "192.168.1.200"}),
            ],
            expected_output_count: Some(1),
        },
        // Kleene-plus pattern; no expected output count is pinned.
        PlaygroundExampleDetail {
            id: "kleene-pattern".into(),
            name: "Kleene Pattern".into(),
            description: "Match one or more failed logins followed by a successful login — brute force detection.".into(),
            category: "Security".into(),
            vpl: r#"stream BruteForce = failed_login+ as fails -> successful_login as success .within(5m)
    .where(fails.user_id == success.user_id)
    .emit(alert: "Possible brute force", user: success.user_id)"#.into(),
            events: vec![
                serde_json::json!({"event_type": "failed_login", "user_id": "admin", "ip": "10.0.0.5"}),
                serde_json::json!({"event_type": "failed_login", "user_id": "admin", "ip": "10.0.0.5"}),
                serde_json::json!({"event_type": "failed_login", "user_id": "admin", "ip": "10.0.0.5"}),
                serde_json::json!({"event_type": "successful_login", "user_id": "admin", "ip": "10.0.0.5"}),
                serde_json::json!({"event_type": "successful_login", "user_id": "bob", "ip": "10.0.0.9"}),
            ],
            expected_output_count: None,
        },
        // Two filtered streams merged into one.
        PlaygroundExampleDetail {
            id: "merge-stream".into(),
            name: "Merge Streams".into(),
            description: "Combine events from multiple sources into a single alert stream.".into(),
            category: "Getting Started".into(),
            vpl: r#"stream TempAlerts = TempReading
    .where(temperature > 30)
    .emit(alert: "High temp", source: "temp", value: temperature)

stream HumidAlerts = HumidityReading
    .where(humidity > 80)
    .emit(alert: "High humidity", source: "humidity", value: humidity)

stream AllAlerts = merge(TempAlerts, HumidAlerts)
    .emit()"#.into(),
            events: vec![
                serde_json::json!({"event_type": "TempReading", "sensor_id": "S1", "temperature": 35}),
                serde_json::json!({"event_type": "HumidityReading", "sensor_id": "S2", "humidity": 85}),
                serde_json::json!({"event_type": "TempReading", "sensor_id": "S3", "temperature": 22}),
                serde_json::json!({"event_type": "HumidityReading", "sensor_id": "S4", "humidity": 45}),
                serde_json::json!({"event_type": "TempReading", "sensor_id": "S5", "temperature": 38}),
            ],
            expected_output_count: Some(3),
        },
        // Forecasting example: its 121 events are generated in code rather
        // than listed literally.
        PlaygroundExampleDetail {
            id: "forecast-fraud".into(),
            name: "Fraud Forecasting".into(),
            description: "Predict fraud patterns using PST-based forecasting — sequence prediction with confidence scores.".into(),
            category: "Advanced".into(),
            vpl: r"stream FraudForecast = login as l -> transfer as t .within(5m)
    .forecast(confidence: 0.7, horizon: 2m, warmup: 50, max_depth: 3)
    .where(forecast_probability > 0.5)
    .emit(probability: forecast_probability, state: forecast_state)".into(),
            events: {
                // Generate a training sequence: alternating login/transfer pairs
                let mut events = Vec::new();
                for i in 0..60 {
                    events.push(serde_json::json!({"event_type": "login", "user_id": format!("user_{}", i % 10), "city": "NYC"}));
                    events.push(serde_json::json!({"event_type": "transfer", "user_id": format!("user_{}", i % 10), "amount": 100 + i * 10}));
                }
                // Add a final login to trigger forecast
                events.push(serde_json::json!({"event_type": "login", "user_id": "user_0", "city": "NYC"}));
                events
            },
            expected_output_count: None, // Depends on PST learning convergence
        },
    ]
}
333
334// =============================================================================
335// Route construction
336// =============================================================================
337
338pub fn playground_routes(playground: SharedPlayground) -> Router {
339    Router::new()
340        .route("/api/v1/playground/session", post(handle_create_session))
341        .route(
342            "/api/v1/playground/run",
343            post(handle_run).layer(tower_http::limit::RequestBodyLimitLayer::new(1024 * 1024)),
344        )
345        .route(
346            "/api/v1/playground/validate",
347            post(handle_validate).layer(tower_http::limit::RequestBodyLimitLayer::new(256 * 1024)),
348        )
349        .route("/api/v1/playground/examples", get(handle_list_examples))
350        .route("/api/v1/playground/examples/{id}", get(handle_get_example))
351        .with_state(playground)
352}
353
354// =============================================================================
355// Handlers
356// =============================================================================
357
358async fn handle_create_session(State(playground): State<SharedPlayground>) -> impl IntoResponse {
359    let session_id = Uuid::new_v4().to_string();
360    {
361        let mut pg = playground.write().await;
362        pg.get_or_create_session(&session_id);
363    }
364    let resp = SessionResponse { session_id };
365    (StatusCode::CREATED, Json(resp))
366}
367
368async fn handle_run(
369    State(playground): State<SharedPlayground>,
370    Json(body): Json<PlaygroundRunRequest>,
371) -> Response {
372    // Validate input size
373    if body.vpl.len() > MAX_VPL_LENGTH {
374        return pg_error_response(
375            StatusCode::BAD_REQUEST,
376            "vpl_too_large",
377            &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
378        );
379    }
380
381    if body.events.len() > MAX_EVENTS_PER_RUN {
382        return pg_error_response(
383            StatusCode::BAD_REQUEST,
384            "too_many_events",
385            &format!("Maximum {MAX_EVENTS_PER_RUN} events per run"),
386        );
387    }
388
389    // Track session (auto-create if needed)
390    {
391        let mut pg = playground.write().await;
392        let session_id = Uuid::new_v4().to_string();
393        pg.get_or_create_session(&session_id);
394    }
395
396    let start = Instant::now();
397
398    // Convert playground events to runtime events
399    let events: Vec<Event> = body
400        .events
401        .iter()
402        .map(|pe| {
403            let mut event = Event::new(pe.event_type.clone());
404            for (key, value) in &pe.fields {
405                let v = json_to_runtime_value(value);
406                event = event.with_field(key.as_str(), v);
407            }
408            event
409        })
410        .collect();
411    let event_count = events.len();
412
413    // Execute with timeout
414    let run_result = tokio::time::timeout(
415        Duration::from_secs(MAX_EXECUTION_SECS),
416        crate::simulate_from_source(&body.vpl, events),
417    )
418    .await;
419
420    let latency_ms = start.elapsed().as_millis() as u64;
421
422    match run_result {
423        Ok(Ok(output_events)) => {
424            let output: Vec<serde_json::Value> = output_events
425                .iter()
426                .map(|e| {
427                    let mut flat = serde_json::Map::new();
428                    flat.insert(
429                        "event_type".to_string(),
430                        serde_json::Value::String(e.event_type.to_string()),
431                    );
432                    for (k, v) in &e.data {
433                        flat.insert(k.to_string(), crate::websocket::value_to_json(v));
434                    }
435                    serde_json::Value::Object(flat)
436                })
437                .collect();
438
439            let resp = PlaygroundRunResponse {
440                ok: true,
441                events_processed: event_count,
442                output_events: output,
443                latency_ms,
444                diagnostics: vec![],
445                error: None,
446            };
447            (StatusCode::OK, Json(resp)).into_response()
448        }
449        Ok(Err(e)) => {
450            let resp = PlaygroundRunResponse {
451                ok: false,
452                events_processed: 0,
453                output_events: vec![],
454                latency_ms,
455                diagnostics: vec![],
456                error: Some(e.to_string()),
457            };
458            (StatusCode::OK, Json(resp)).into_response()
459        }
460        Err(_timeout) => {
461            let resp = PlaygroundRunResponse {
462                ok: false,
463                events_processed: 0,
464                output_events: vec![],
465                latency_ms,
466                diagnostics: vec![],
467                error: Some(format!("Execution timed out after {MAX_EXECUTION_SECS}s")),
468            };
469            (StatusCode::REQUEST_TIMEOUT, Json(resp)).into_response()
470        }
471    }
472}
473
474async fn handle_validate(
475    State(_playground): State<SharedPlayground>,
476    Json(body): Json<PlaygroundValidateRequest>,
477) -> Response {
478    if body.vpl.len() > MAX_VPL_LENGTH {
479        return pg_error_response(
480            StatusCode::BAD_REQUEST,
481            "vpl_too_large",
482            &format!("VPL source exceeds maximum size of {MAX_VPL_LENGTH} bytes"),
483        );
484    }
485
486    let result = match varpulis_parser::parse(&body.vpl) {
487        Ok(program) => {
488            let ast = serde_json::to_value(&program).ok();
489            let validation = varpulis_core::validate::validate(&body.vpl, &program);
490            let diagnostics: Vec<PlaygroundDiagnostic> = validation
491                .diagnostics
492                .iter()
493                .map(|d| {
494                    let (sl, sc) = position_to_line_col(&body.vpl, d.span.start);
495                    let (el, ec) = position_to_line_col(&body.vpl, d.span.end);
496                    PlaygroundDiagnostic {
497                        severity: match d.severity {
498                            varpulis_core::validate::Severity::Error => "error".into(),
499                            varpulis_core::validate::Severity::Warning => "warning".into(),
500                        },
501                        message: d.message.clone(),
502                        hint: d.hint.clone(),
503                        code: d.code.map(|c| c.to_string()),
504                        start_line: sl as u32,
505                        start_col: sc as u32,
506                        end_line: el as u32,
507                        end_col: ec as u32,
508                    }
509                })
510                .collect();
511            let has_errors = diagnostics.iter().any(|d| d.severity == "error");
512            PlaygroundValidateResponse {
513                ok: !has_errors,
514                ast,
515                diagnostics,
516            }
517        }
518        Err(error) => {
519            let diag = parse_error_to_diagnostic(&body.vpl, &error);
520            PlaygroundValidateResponse {
521                ok: false,
522                ast: None,
523                diagnostics: vec![diag],
524            }
525        }
526    };
527
528    (StatusCode::OK, Json(result)).into_response()
529}
530
531async fn handle_list_examples() -> impl IntoResponse {
532    let examples: Vec<PlaygroundExample> = builtin_examples()
533        .into_iter()
534        .map(|e| PlaygroundExample {
535            id: e.id,
536            name: e.name,
537            description: e.description,
538            category: e.category,
539        })
540        .collect();
541    (StatusCode::OK, Json(examples))
542}
543
544async fn handle_get_example(Path(id): Path<String>) -> Response {
545    let examples = builtin_examples();
546    match examples.into_iter().find(|e| e.id == id) {
547        Some(example) => (StatusCode::OK, Json(example)).into_response(),
548        None => pg_error_response(
549            StatusCode::NOT_FOUND,
550            "example_not_found",
551            &format!("Example '{id}' not found"),
552        ),
553    }
554}
555
556// =============================================================================
557// Session reaper task
558// =============================================================================
559
560/// Spawn a background task that periodically cleans up expired sessions.
561pub fn spawn_session_reaper(playground: SharedPlayground) {
562    tokio::spawn(async move {
563        loop {
564            tokio::time::sleep(REAPER_INTERVAL).await;
565            let mut pg = playground.write().await;
566            let reaped = pg.reap_expired();
567            if reaped > 0 {
568                tracing::debug!("Playground: reaped {} expired sessions", reaped);
569            }
570        }
571    });
572}
573
574// =============================================================================
575// Helpers
576// =============================================================================
577
578fn pg_error_response(status: StatusCode, code: &str, message: &str) -> Response {
579    let body = PlaygroundError {
580        error: message.to_string(),
581        code: code.to_string(),
582    };
583    (status, Json(body)).into_response()
584}
585
586fn json_to_runtime_value(v: &serde_json::Value) -> Value {
587    match v {
588        serde_json::Value::Null => Value::Null,
589        serde_json::Value::Bool(b) => Value::Bool(*b),
590        serde_json::Value::Number(n) => {
591            if let Some(i) = n.as_i64() {
592                Value::Int(i)
593            } else if let Some(f) = n.as_f64() {
594                Value::Float(f)
595            } else {
596                Value::Null
597            }
598        }
599        serde_json::Value::String(s) => Value::Str(s.clone().into()),
600        serde_json::Value::Array(arr) => {
601            Value::array(arr.iter().map(json_to_runtime_value).collect())
602        }
603        serde_json::Value::Object(map) => {
604            let mut m: IndexMap<std::sync::Arc<str>, Value, FxBuildHasher> =
605                IndexMap::with_hasher(FxBuildHasher);
606            for (k, v) in map {
607                m.insert(k.as_str().into(), json_to_runtime_value(v));
608            }
609            Value::map(m)
610        }
611    }
612}
613
614fn parse_error_to_diagnostic(
615    source: &str,
616    error: &varpulis_parser::ParseError,
617) -> PlaygroundDiagnostic {
618    use varpulis_parser::ParseError;
619    match error {
620        ParseError::Located {
621            line,
622            column,
623            message,
624            hint,
625            ..
626        } => PlaygroundDiagnostic {
627            severity: "error".into(),
628            message: message.clone(),
629            hint: hint.clone(),
630            code: None,
631            start_line: line.saturating_sub(1) as u32,
632            start_col: column.saturating_sub(1) as u32,
633            end_line: line.saturating_sub(1) as u32,
634            end_col: *column as u32,
635        },
636        ParseError::UnexpectedToken {
637            position,
638            expected,
639            found,
640        } => {
641            let (line, col) = position_to_line_col(source, *position);
642            PlaygroundDiagnostic {
643                severity: "error".into(),
644                message: format!("Unexpected token: expected {expected}, found '{found}'"),
645                hint: None,
646                code: None,
647                start_line: line as u32,
648                start_col: col as u32,
649                end_line: line as u32,
650                end_col: (col + found.len()) as u32,
651            }
652        }
653        ParseError::UnexpectedEof => {
654            let line = source.lines().count().saturating_sub(1);
655            let col = source.lines().last().map_or(0, |l| l.len());
656            PlaygroundDiagnostic {
657                severity: "error".into(),
658                message: "Unexpected end of input".into(),
659                hint: None,
660                code: None,
661                start_line: line as u32,
662                start_col: col as u32,
663                end_line: line as u32,
664                end_col: col as u32,
665            }
666        }
667        ParseError::InvalidToken { position, message } => {
668            let (line, col) = position_to_line_col(source, *position);
669            PlaygroundDiagnostic {
670                severity: "error".into(),
671                message: message.clone(),
672                hint: None,
673                code: None,
674                start_line: line as u32,
675                start_col: col as u32,
676                end_line: line as u32,
677                end_col: (col + 10) as u32,
678            }
679        }
680        ParseError::InvalidNumber(msg)
681        | ParseError::InvalidDuration(msg)
682        | ParseError::InvalidTimestamp(msg)
683        | ParseError::InvalidEscape(msg) => PlaygroundDiagnostic {
684            severity: "error".into(),
685            message: msg.clone(),
686            hint: None,
687            code: None,
688            start_line: 0,
689            start_col: 0,
690            end_line: 0,
691            end_col: 0,
692        },
693        ParseError::UnterminatedString(position) => {
694            let (line, col) = position_to_line_col(source, *position);
695            PlaygroundDiagnostic {
696                severity: "error".into(),
697                message: "Unterminated string literal".into(),
698                hint: None,
699                code: None,
700                start_line: line as u32,
701                start_col: col as u32,
702                end_line: line as u32,
703                end_col: source.lines().nth(line).map_or(col, |l| l.len()) as u32,
704            }
705        }
706        ParseError::Custom { span, message } => {
707            let (sl, sc) = position_to_line_col(source, span.start);
708            let (el, ec) = position_to_line_col(source, span.end);
709            PlaygroundDiagnostic {
710                severity: "error".into(),
711                message: message.clone(),
712                hint: None,
713                code: None,
714                start_line: sl as u32,
715                start_col: sc as u32,
716                end_line: el as u32,
717                end_col: ec as u32,
718            }
719        }
720    }
721}
722
/// Maps a byte offset in `source` to a 0-based `(line, column)` pair.
///
/// Columns are counted in characters, not bytes. An offset past the end of
/// the text yields the position just after the last character, and an offset
/// inside a multi-byte character counts that character as already consumed.
fn position_to_line_col(source: &str, position: usize) -> (usize, usize) {
    source
        .char_indices()
        // Only characters that start before the target offset are consumed.
        .take_while(|&(offset, _)| offset < position)
        .fold((0usize, 0usize), |(line, col), (_, ch)| {
            if ch == '\n' {
                (line + 1, 0)
            } else {
                (line, col + 1)
            }
        })
}
743
// Unit tests for the built-in example table, session bookkeeping, and the
// parse + validate pipeline used by the validate endpoint.
#[cfg(test)]
mod tests {
    use super::*;

    /// Every built-in example must have an id, a name, VPL source and at
    /// least one input event.
    #[test]
    fn test_builtin_examples_valid() {
        let examples = builtin_examples();
        assert!(!examples.is_empty());
        for example in &examples {
            assert!(!example.id.is_empty());
            assert!(!example.name.is_empty());
            assert!(!example.vpl.is_empty());
            assert!(!example.events.is_empty());
        }
    }

    /// Example ids are used as lookup keys, so they must be unique.
    #[test]
    fn test_builtin_examples_unique_ids() {
        let examples = builtin_examples();
        let mut ids: Vec<&str> = examples.iter().map(|e| e.id.as_str()).collect();
        // Sort first: `dedup` only removes adjacent duplicates.
        ids.sort_unstable();
        ids.dedup();
        assert_eq!(ids.len(), examples.len(), "Duplicate example IDs found");
    }

    /// Freshly created sessions survive a reaper pass. The expired case is
    /// not exercised here.
    #[test]
    fn test_session_reaping() {
        let mut state = PlaygroundState::new();
        state.get_or_create_session("test-1");
        state.get_or_create_session("test-2");
        assert_eq!(state.sessions.len(), 2);

        // Sessions should not be reaped immediately
        let reaped = state.reap_expired();
        assert_eq!(reaped, 0);
        assert_eq!(state.sessions.len(), 2);
    }

    /// A well-formed program parses and validates without error-severity
    /// diagnostics. This calls the parser and validator directly rather than
    /// going through `handle_validate`.
    #[test]
    fn test_validate_response_for_valid_vpl() {
        let vpl = r#"
event SensorReading:
    temperature: int

stream HighTemp = SensorReading
    .where(temperature > 30)
    .emit(alert: "high_temp")
"#;
        match varpulis_parser::parse(vpl) {
            Ok(program) => {
                let validation = varpulis_core::validate::validate(vpl, &program);
                let has_errors = validation
                    .diagnostics
                    .iter()
                    .any(|d| d.severity == varpulis_core::validate::Severity::Error);
                assert!(!has_errors);
            }
            Err(e) => panic!("Parse failed: {e}"),
        }
    }
}
804}