Skip to main content

varpulis_cli/
websocket.rs

1//! WebSocket module for Varpulis CLI
2//!
3//! Provides WebSocket server functionality for the VS Code extension and other clients.
4
5use std::path::PathBuf;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8
9use axum::extract::ws::{Message, WebSocket};
10use futures_util::{SinkExt, StreamExt};
11use indexmap::IndexMap;
12use rustc_hash::FxBuildHasher;
13use serde::{Deserialize, Serialize};
14use tokio::sync::{mpsc, RwLock};
15use varpulis_parser::parse;
16use varpulis_runtime::engine::Engine;
17use varpulis_runtime::event::Event;
18
19use crate::security;
20
21// =============================================================================
22// Relay Metrics
23// =============================================================================
24
25/// Metrics for the output event relay pipeline.
26///
27/// Thread-safe counters tracking events forwarded to WebSocket clients,
28/// events dropped (no subscribers or coordinator unreachable), forwarding
29/// errors, and coordinator health status.
30#[derive(Debug)]
31pub struct RelayMetrics {
32    pub events_forwarded: AtomicU64,
33    pub events_dropped: AtomicU64,
34    pub forwarding_errors: AtomicU64,
35    pub coordinator_healthy: AtomicBool,
36}
37
38impl RelayMetrics {
39    pub const fn new() -> Self {
40        Self {
41            events_forwarded: AtomicU64::new(0),
42            events_dropped: AtomicU64::new(0),
43            forwarding_errors: AtomicU64::new(0),
44            coordinator_healthy: AtomicBool::new(true),
45        }
46    }
47
48    pub fn snapshot(&self) -> serde_json::Value {
49        serde_json::json!({
50            "relay_events_forwarded": self.events_forwarded.load(Ordering::Relaxed),
51            "relay_events_dropped": self.events_dropped.load(Ordering::Relaxed),
52            "relay_forwarding_errors": self.forwarding_errors.load(Ordering::Relaxed),
53            "relay_coordinator_healthy": self.coordinator_healthy.load(Ordering::Relaxed),
54        })
55    }
56}
57
58impl Default for RelayMetrics {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64// =============================================================================
65// Message Types
66// =============================================================================
67
68/// WebSocket message types for client-server communication
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
70#[serde(tag = "type", rename_all = "snake_case")]
71pub enum WsMessage {
72    // Client -> Server messages
73    /// Load a VPL file
74    LoadFile { path: String },
75    /// Inject an event into the engine
76    InjectEvent {
77        event_type: String,
78        data: serde_json::Value,
79    },
80    /// Request list of streams
81    GetStreams,
82    /// Request current metrics
83    GetMetrics,
84
85    // Server -> Client messages
86    /// Result of loading a file
87    LoadResult {
88        success: bool,
89        streams_loaded: usize,
90        error: Option<String>,
91    },
92    /// List of streams
93    Streams { data: Vec<StreamInfo> },
94    /// Event notification
95    Event {
96        id: String,
97        event_type: String,
98        timestamp: String,
99        data: serde_json::Value,
100    },
101    /// Output event notification
102    OutputEvent {
103        event_type: String,
104        data: serde_json::Value,
105        timestamp: String,
106    },
107    /// Current metrics
108    Metrics {
109        events_processed: u64,
110        output_events_emitted: u64,
111        active_streams: usize,
112        uptime: f64,
113        memory_usage: u64,
114        cpu_usage: f64,
115    },
116    /// Result of event injection
117    EventInjected { event_type: String, success: bool },
118    /// Error message
119    Error { message: String },
120}
121
122/// Information about a stream
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
124pub struct StreamInfo {
125    pub name: String,
126    pub source: String,
127    pub operations: Vec<String>,
128    pub events_per_second: f64,
129    pub status: String,
130}
131
132// =============================================================================
133// Server State
134// =============================================================================
135
136/// Shared state for the WebSocket server
137#[derive(Debug)]
138pub struct ServerState {
139    /// The loaded engine (if any)
140    pub engine: Option<Engine>,
141    /// List of streams
142    pub streams: Vec<StreamInfo>,
143    /// Server start time
144    pub start_time: std::time::Instant,
145    /// Channel to send output events
146    pub output_tx: mpsc::Sender<Event>,
147    /// Allowed working directory for file operations
148    pub workdir: PathBuf,
149}
150
151impl ServerState {
152    /// Create a new server state
153    pub fn new(output_tx: mpsc::Sender<Event>, workdir: PathBuf) -> Self {
154        Self {
155            engine: None,
156            streams: Vec::new(),
157            start_time: std::time::Instant::now(),
158            output_tx,
159            workdir,
160        }
161    }
162
163    /// Get the uptime in seconds
164    pub fn uptime_secs(&self) -> f64 {
165        self.start_time.elapsed().as_secs_f64()
166    }
167}
168
169// =============================================================================
170// Message Handlers
171// =============================================================================
172
173/// Handle a WebSocket message and return a response
174pub async fn handle_message(msg: WsMessage, state: &Arc<RwLock<ServerState>>) -> WsMessage {
175    match msg {
176        WsMessage::LoadFile { path } => handle_load_file(&path, state).await,
177        WsMessage::GetStreams => handle_get_streams(state).await,
178        WsMessage::GetMetrics => handle_get_metrics(state).await,
179        WsMessage::InjectEvent { event_type, data } => {
180            handle_inject_event(&event_type, data, state).await
181        }
182        _ => WsMessage::Error {
183            message: "Unknown message type".to_string(),
184        },
185    }
186}
187
188/// Handle LoadFile message
189async fn handle_load_file(path: &str, state: &Arc<RwLock<ServerState>>) -> WsMessage {
190    // Get workdir from state
191    let workdir = {
192        let state = state.read().await;
193        state.workdir.clone()
194    };
195
196    // Validate path
197    let validated_path = match security::validate_path(path, &workdir) {
198        Ok(p) => p,
199        Err(e) => {
200            return WsMessage::LoadResult {
201                success: false,
202                streams_loaded: 0,
203                error: Some(e.to_string()),
204            };
205        }
206    };
207
208    // Read file
209    let source = match std::fs::read_to_string(&validated_path) {
210        Ok(s) => s,
211        Err(_) => {
212            return WsMessage::LoadResult {
213                success: false,
214                streams_loaded: 0,
215                // Generic error to avoid information disclosure
216                error: Some("Failed to read file".to_string()),
217            };
218        }
219    };
220
221    // Parse program
222    let program = match parse(&source) {
223        Ok(p) => p,
224        Err(e) => {
225            return WsMessage::LoadResult {
226                success: false,
227                streams_loaded: 0,
228                error: Some(format!("Parse error: {e}")),
229            };
230        }
231    };
232
233    // Load into engine
234    let mut state = state.write().await;
235    let output_tx = state.output_tx.clone();
236    let mut engine = Engine::new(output_tx);
237
238    match engine.load(&program) {
239        Ok(()) => {
240            let streams_count = engine.metrics().streams_count;
241            let stream_infos: Vec<StreamInfo> = engine
242                .stream_names()
243                .into_iter()
244                .map(|name| StreamInfo {
245                    name: name.to_string(),
246                    source: String::new(),
247                    operations: Vec::new(),
248                    events_per_second: 0.0,
249                    status: "active".into(),
250                })
251                .collect();
252            state.engine = Some(engine);
253            state.streams = stream_infos;
254
255            WsMessage::LoadResult {
256                success: true,
257                streams_loaded: streams_count,
258                error: None,
259            }
260        }
261        Err(e) => WsMessage::LoadResult {
262            success: false,
263            streams_loaded: 0,
264            error: Some(e.to_string()),
265        },
266    }
267}
268
269/// Handle GetStreams message
270async fn handle_get_streams(state: &Arc<RwLock<ServerState>>) -> WsMessage {
271    let state = state.read().await;
272    WsMessage::Streams {
273        data: state.streams.clone(),
274    }
275}
276
277/// Handle GetMetrics message
278async fn handle_get_metrics(state: &Arc<RwLock<ServerState>>) -> WsMessage {
279    let state = state.read().await;
280    let (events_processed, output_events_emitted, active_streams) =
281        state.engine.as_ref().map_or((0, 0, 0), |engine| {
282            let m = engine.metrics();
283            (m.events_processed, m.output_events_emitted, m.streams_count)
284        });
285
286    WsMessage::Metrics {
287        events_processed,
288        output_events_emitted,
289        active_streams,
290        uptime: state.uptime_secs(),
291        memory_usage: process_rss_bytes(),
292        cpu_usage: 0.0, // CPU usage requires sampling over time; not meaningful in a snapshot
293    }
294}
295
296/// Maximum number of fields per injected event.
297const MAX_EVENT_FIELDS: usize = 256;
298
299/// Maximum JSON nesting depth for injected event data.
300const MAX_JSON_DEPTH: usize = 16;
301
302/// Handle InjectEvent message
303async fn handle_inject_event(
304    event_type: &str,
305    data: serde_json::Value,
306    state: &Arc<RwLock<ServerState>>,
307) -> WsMessage {
308    let mut state = state.write().await;
309
310    let engine = match state.engine.as_mut() {
311        Some(e) => e,
312        None => {
313            return WsMessage::Error {
314                message: "No engine loaded. Load a .vpl file first.".to_string(),
315            };
316        }
317    };
318
319    // Create event from injected data
320    let mut event = Event::new(event_type);
321
322    // Convert JSON data to event fields with safety limits
323    if let Some(obj) = data.as_object() {
324        if obj.len() > MAX_EVENT_FIELDS {
325            return WsMessage::Error {
326                message: format!(
327                    "Event exceeds maximum field count ({} > {})",
328                    obj.len(),
329                    MAX_EVENT_FIELDS
330                ),
331            };
332        }
333        for (key, value) in obj {
334            let v = json_to_value_bounded(value, MAX_JSON_DEPTH);
335            event.data.insert(key.as_str().into(), v);
336        }
337    }
338
339    // Process the event
340    match engine.process(event).await {
341        Ok(()) => WsMessage::EventInjected {
342            event_type: event_type.to_string(),
343            success: true,
344        },
345        Err(e) => WsMessage::Error {
346            message: format!("Failed to process event: {e}"),
347        },
348    }
349}
350
351/// Read process RSS (Resident Set Size) in bytes from /proc/self/statm.
352/// Returns 0 on non-Linux platforms or if the read fails.
353fn process_rss_bytes() -> u64 {
354    #[cfg(target_os = "linux")]
355    {
356        if let Ok(statm) = std::fs::read_to_string("/proc/self/statm") {
357            // statm format: size resident shared text lib data dt (all in pages)
358            if let Some(rss_pages) = statm.split_whitespace().nth(1) {
359                if let Ok(pages) = rss_pages.parse::<u64>() {
360                    return pages * 4096; // page size is typically 4KB
361                }
362            }
363        }
364        0
365    }
366    #[cfg(not(target_os = "linux"))]
367    {
368        0
369    }
370}
371
372// =============================================================================
373// WebSocket Connection Handler
374// =============================================================================
375
376/// Handle a WebSocket connection
377pub async fn handle_connection(
378    ws: WebSocket,
379    state: Arc<RwLock<ServerState>>,
380    broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
381) {
382    let (ws_tx, mut ws_rx) = ws.split();
383    let ws_tx = Arc::new(tokio::sync::Mutex::new(ws_tx));
384    let mut broadcast_rx = broadcast_tx.subscribe();
385
386    // Spawn task to forward broadcasts to this client
387    let ws_tx_clone = ws_tx.clone();
388    let forward_task = tokio::spawn(async move {
389        while let Ok(msg) = broadcast_rx.recv().await {
390            let mut tx = ws_tx_clone.lock().await;
391            if tx.send(Message::Text(msg.into())).await.is_err() {
392                break;
393            }
394        }
395    });
396
397    // Handle incoming messages
398    while let Some(result) = ws_rx.next().await {
399        let msg = match result {
400            Ok(msg) => msg,
401            Err(e) => {
402                tracing::warn!("WebSocket error: {}", e);
403                break;
404            }
405        };
406
407        match msg {
408            Message::Text(text) => {
409                if let Ok(ws_msg) = serde_json::from_str::<WsMessage>(&text) {
410                    let response = handle_message(ws_msg, &state).await;
411                    if let Ok(json) = serde_json::to_string(&response) {
412                        let mut tx = ws_tx.lock().await;
413                        if tx.send(Message::Text(json.into())).await.is_err() {
414                            break;
415                        }
416                    }
417                }
418            }
419            Message::Close(_) => break,
420            _ => {} // ignore binary, ping, pong
421        }
422    }
423
424    forward_task.abort();
425    tracing::info!("WebSocket client disconnected");
426}
427
428// =============================================================================
429// Output Event Forwarder
430// =============================================================================
431
432/// Create an output event message from an Event
433pub fn create_output_event_message(event: &Event) -> WsMessage {
434    WsMessage::OutputEvent {
435        event_type: event.event_type.to_string(),
436        data: serde_json::to_value(&event.data).unwrap_or_default(),
437        timestamp: event.timestamp.to_rfc3339(),
438    }
439}
440
441/// Spawn an output event forwarder task
442pub fn forward_output_events_to_websocket(
443    mut output_rx: mpsc::Receiver<Event>,
444    broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
445    metrics: Arc<RelayMetrics>,
446) -> tokio::task::JoinHandle<()> {
447    tokio::spawn(async move {
448        while let Some(event) = output_rx.recv().await {
449            let msg = create_output_event_message(&event);
450            if let Ok(json) = serde_json::to_string(&msg) {
451                match broadcast_tx.send(json) {
452                    Ok(_) => {
453                        metrics.events_forwarded.fetch_add(1, Ordering::Relaxed);
454                    }
455                    Err(_) => {
456                        metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
457                        tracing::debug!("No WS subscribers for output event");
458                    }
459                }
460            }
461        }
462    })
463}
464
465// =============================================================================
466// Coordinator WebSocket Handler (broadcast-only, no engine)
467// =============================================================================
468
469/// Handle a coordinator WebSocket connection (output events only, no engine).
470///
471/// The coordinator doesn't run pipelines — it only relays output events received
472/// from workers via the internal `/api/v1/internal/output-events` endpoint.
473pub async fn handle_coordinator_connection(
474    ws: WebSocket,
475    broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
476) {
477    let (ws_tx, mut ws_rx) = ws.split();
478    let ws_tx = Arc::new(tokio::sync::Mutex::new(ws_tx));
479    let mut broadcast_rx = broadcast_tx.subscribe();
480
481    // Forward broadcast messages to this WS client
482    let ws_tx_clone = ws_tx.clone();
483    let forward_task = tokio::spawn(async move {
484        while let Ok(msg) = broadcast_rx.recv().await {
485            let mut tx = ws_tx_clone.lock().await;
486            if tx.send(Message::Text(msg.into())).await.is_err() {
487                break;
488            }
489        }
490    });
491
492    // Consume incoming messages (keep connection alive, ignore content)
493    while let Some(result) = ws_rx.next().await {
494        match result {
495            Ok(Message::Close(_)) => break,
496            Err(_) => break,
497            _ => {} // ignore pings/text from client
498        }
499    }
500
501    forward_task.abort();
502    tracing::info!("Coordinator WebSocket client disconnected");
503}
504
505// =============================================================================
506// Worker → Coordinator Output Event Forwarder
507// =============================================================================
508
509/// Spawn a task that forwards output events from the worker's broadcast channel
510/// to the coordinator's internal endpoint for relaying to WebSocket clients.
511///
512/// Events are batched (up to 50 events or 200ms, whichever comes first) and
513/// sent as a JSON array of pre-serialized event strings.
514///
515/// Includes retry with exponential backoff (3 attempts: 100ms, 200ms, 400ms)
516/// and coordinator health-check gating after 5 consecutive failures.
517pub fn forward_output_events_to_coordinator(
518    broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
519    coordinator_url: String,
520    api_key: String,
521    metrics: Arc<RelayMetrics>,
522) -> tokio::task::JoinHandle<()> {
523    tokio::spawn(async move {
524        let client = reqwest::Client::builder()
525            .timeout(std::time::Duration::from_secs(5))
526            .build()
527            .unwrap_or_default();
528        let url = format!(
529            "{}/api/v1/internal/output-events",
530            coordinator_url.trim_end_matches('/')
531        );
532        let health_url = format!("{}/health", coordinator_url.trim_end_matches('/'));
533        let mut rx = broadcast_tx.subscribe();
534        let mut batch: Vec<String> = Vec::with_capacity(50);
535        let mut consecutive_failures: u32 = 0;
536
537        loop {
538            // Health-check gating: after 5 consecutive failures, probe before continuing
539            if consecutive_failures >= 5 {
540                metrics.coordinator_healthy.store(false, Ordering::Relaxed);
541                tracing::warn!(
542                    "Coordinator relay: {} consecutive failures, entering cooldown",
543                    consecutive_failures
544                );
545                loop {
546                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
547                    match client.get(&health_url).send().await {
548                        Ok(resp) if resp.status().is_success() => {
549                            tracing::info!("Coordinator relay: health check passed, resuming");
550                            consecutive_failures = 0;
551                            metrics.coordinator_healthy.store(true, Ordering::Relaxed);
552                            break;
553                        }
554                        _ => {
555                            tracing::debug!(
556                                "Coordinator relay: health check failed, retrying in 5s"
557                            );
558                        }
559                    }
560                }
561            }
562
563            // Collect first event (blocking wait)
564            match rx.recv().await {
565                Ok(msg) => batch.push(msg),
566                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
567                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
568                    tracing::warn!("Output event forwarder lagged by {} messages", n);
569                    continue;
570                }
571            }
572
573            // Drain remaining available events up to batch limit
574            let deadline = tokio::time::sleep(std::time::Duration::from_millis(200));
575            tokio::pin!(deadline);
576
577            loop {
578                if batch.len() >= 50 {
579                    break;
580                }
581                tokio::select! {
582                    result = rx.recv() => {
583                        match result {
584                            Ok(msg) => batch.push(msg),
585                            Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
586                            Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
587                        }
588                    }
589                    () = &mut deadline => break,
590                }
591            }
592
593            // Send batch to coordinator with retry
594            if !batch.is_empty() {
595                let batch_len = batch.len() as u64;
596                let mut sent = false;
597
598                for attempt in 0..3u32 {
599                    match client
600                        .post(&url)
601                        .header("x-api-key", &api_key)
602                        .json(&batch)
603                        .send()
604                        .await
605                    {
606                        Ok(resp) if resp.status().is_success() => {
607                            metrics
608                                .events_forwarded
609                                .fetch_add(batch_len, Ordering::Relaxed);
610                            metrics.coordinator_healthy.store(true, Ordering::Relaxed);
611                            consecutive_failures = 0;
612                            sent = true;
613                            break;
614                        }
615                        Ok(resp) => {
616                            metrics.forwarding_errors.fetch_add(1, Ordering::Relaxed);
617                            tracing::warn!(
618                                "Coordinator relay: attempt {}/3 got status {} for {} events",
619                                attempt + 1,
620                                resp.status(),
621                                batch_len
622                            );
623                        }
624                        Err(e) => {
625                            metrics.forwarding_errors.fetch_add(1, Ordering::Relaxed);
626                            tracing::warn!(
627                                "Coordinator relay: attempt {}/3 failed: {}",
628                                attempt + 1,
629                                e
630                            );
631                        }
632                    }
633                    // Exponential backoff: 100ms, 200ms, 400ms
634                    let backoff = std::time::Duration::from_millis(100 * 2u64.pow(attempt));
635                    tokio::time::sleep(backoff).await;
636                }
637
638                if !sent {
639                    consecutive_failures += 1;
640                    metrics
641                        .events_dropped
642                        .fetch_add(batch_len, Ordering::Relaxed);
643                    metrics.coordinator_healthy.store(false, Ordering::Relaxed);
644                    tracing::error!(
645                        "Coordinator relay: dropped {} events after 3 retries (consecutive failures: {})",
646                        batch_len,
647                        consecutive_failures
648                    );
649                }
650
651                batch.clear();
652            }
653        }
654    })
655}
656
657// =============================================================================
658// Utility Functions
659// =============================================================================
660
661/// Convert a serde_json::Value to varpulis_core::Value
662pub fn json_to_value(json: &serde_json::Value) -> varpulis_core::Value {
663    json_to_value_bounded(json, MAX_JSON_DEPTH)
664}
665
666/// Depth-bounded JSON to Value conversion. Returns Null if depth exceeded.
667fn json_to_value_bounded(json: &serde_json::Value, depth: usize) -> varpulis_core::Value {
668    use varpulis_core::Value;
669    if depth == 0 {
670        return Value::Null;
671    }
672    match json {
673        serde_json::Value::Null => Value::Null,
674        serde_json::Value::Bool(b) => Value::Bool(*b),
675        serde_json::Value::Number(n) => {
676            if let Some(i) = n.as_i64() {
677                Value::Int(i)
678            } else if let Some(f) = n.as_f64() {
679                Value::Float(f)
680            } else {
681                Value::Null
682            }
683        }
684        serde_json::Value::String(s) => Value::Str(s.clone().into()),
685        serde_json::Value::Array(arr) => Value::array(
686            arr.iter()
687                .map(|v| json_to_value_bounded(v, depth - 1))
688                .collect(),
689        ),
690        serde_json::Value::Object(obj) => {
691            let map: IndexMap<std::sync::Arc<str>, Value, FxBuildHasher> = obj
692                .iter()
693                .map(|(k, v)| {
694                    (
695                        std::sync::Arc::from(k.as_str()),
696                        json_to_value_bounded(v, depth - 1),
697                    )
698                })
699                .collect();
700            Value::map(map)
701        }
702    }
703}
704
705/// Convert a varpulis_core::Value to serde_json::Value
706pub fn value_to_json(value: &varpulis_core::Value) -> serde_json::Value {
707    use varpulis_core::Value;
708    match value {
709        Value::Null => serde_json::Value::Null,
710        Value::Bool(b) => serde_json::Value::Bool(*b),
711        Value::Int(i) => serde_json::json!(*i),
712        Value::Float(f) => serde_json::json!(*f),
713        Value::Str(s) => serde_json::Value::String(s.to_string()),
714        Value::Array(arr) => serde_json::Value::Array(arr.iter().map(value_to_json).collect()),
715        Value::Map(map) => {
716            let obj: serde_json::Map<String, serde_json::Value> = map
717                .iter()
718                .map(|(k, v)| (k.to_string(), value_to_json(v)))
719                .collect();
720            serde_json::Value::Object(obj)
721        }
722        // Timestamp is nanoseconds since epoch (i64)
723        Value::Timestamp(ts) => serde_json::json!(*ts),
724        // Duration is nanoseconds (u64)
725        Value::Duration(d) => serde_json::json!(*d),
726    }
727}
728
729// =============================================================================
730// Tests - TDD approach
731// =============================================================================
732
733#[cfg(test)]
734mod tests {
735    use super::*;
736
737    // -------------------------------------------------------------------------
738    // WsMessage serialization tests
739    // -------------------------------------------------------------------------
740
741    #[test]
742    fn test_ws_message_serialize_load_file() {
743        let msg = WsMessage::LoadFile {
744            path: "test.vpl".to_string(),
745        };
746        let json = serde_json::to_string(&msg).expect("should serialize");
747        assert!(json.contains("load_file"));
748        assert!(json.contains("test.vpl"));
749    }
750
751    #[test]
752    fn test_ws_message_deserialize_load_file() {
753        let json = r#"{"type": "load_file", "path": "example.vpl"}"#;
754        let msg: WsMessage = serde_json::from_str(json).expect("should deserialize");
755
756        match msg {
757            WsMessage::LoadFile { path } => {
758                assert_eq!(path, "example.vpl");
759            }
760            _ => panic!("Expected LoadFile message"),
761        }
762    }
763
764    #[test]
765    fn test_ws_message_serialize_inject_event() {
766        let msg = WsMessage::InjectEvent {
767            event_type: "Temperature".to_string(),
768            data: serde_json::json!({"value": 25.5}),
769        };
770        let json = serde_json::to_string(&msg).expect("should serialize");
771        assert!(json.contains("inject_event"));
772        assert!(json.contains("Temperature"));
773        assert!(json.contains("25.5"));
774    }
775
776    #[test]
777    fn test_ws_message_deserialize_inject_event() {
778        let json = r#"{"type": "inject_event", "event_type": "Sensor", "data": {"id": 42}}"#;
779        let msg: WsMessage = serde_json::from_str(json).expect("should deserialize");
780
781        match msg {
782            WsMessage::InjectEvent { event_type, data } => {
783                assert_eq!(event_type, "Sensor");
784                assert_eq!(data["id"], 42);
785            }
786            _ => panic!("Expected InjectEvent message"),
787        }
788    }
789
790    #[test]
791    fn test_ws_message_serialize_get_streams() {
792        let msg = WsMessage::GetStreams;
793        let json = serde_json::to_string(&msg).expect("should serialize");
794        assert!(json.contains("get_streams"));
795    }
796
797    #[test]
798    fn test_ws_message_serialize_get_metrics() {
799        let msg = WsMessage::GetMetrics;
800        let json = serde_json::to_string(&msg).expect("should serialize");
801        assert!(json.contains("get_metrics"));
802    }
803
804    #[test]
805    fn test_ws_message_serialize_load_result_success() {
806        let msg = WsMessage::LoadResult {
807            success: true,
808            streams_loaded: 5,
809            error: None,
810        };
811        let json = serde_json::to_string(&msg).expect("should serialize");
812        assert!(json.contains("load_result"));
813        assert!(json.contains("true"));
814        assert!(json.contains('5'));
815    }
816
817    #[test]
818    fn test_ws_message_serialize_load_result_error() {
819        let msg = WsMessage::LoadResult {
820            success: false,
821            streams_loaded: 0,
822            error: Some("Parse error at line 5".to_string()),
823        };
824        let json = serde_json::to_string(&msg).expect("should serialize");
825        assert!(json.contains("false"));
826        assert!(json.contains("Parse error"));
827    }
828
829    #[test]
830    fn test_ws_message_serialize_streams() {
831        let msg = WsMessage::Streams {
832            data: vec![StreamInfo {
833                name: "HighTemp".to_string(),
834                source: "TempReading".to_string(),
835                operations: vec!["where".to_string(), "emit".to_string()],
836                events_per_second: 100.5,
837                status: "active".to_string(),
838            }],
839        };
840        let json = serde_json::to_string(&msg).expect("should serialize");
841        assert!(json.contains("HighTemp"));
842        assert!(json.contains("TempReading"));
843    }
844
845    #[test]
846    fn test_ws_message_serialize_output_event() {
847        let msg = WsMessage::OutputEvent {
848            event_type: "HighTemperature".to_string(),
849            data: serde_json::json!({"value": 35.5}),
850            timestamp: "2026-01-29T12:00:00Z".to_string(),
851        };
852        let json = serde_json::to_string(&msg).expect("should serialize");
853        assert!(json.contains("output_event"));
854        assert!(json.contains("HighTemperature"));
855        assert!(json.contains("35.5"));
856    }
857
858    #[test]
859    fn test_ws_message_serialize_metrics() {
860        let msg = WsMessage::Metrics {
861            events_processed: 1000,
862            output_events_emitted: 50,
863            active_streams: 3,
864            uptime: 3600.5,
865            memory_usage: 1024000,
866            cpu_usage: 25.5,
867        };
868        let json = serde_json::to_string(&msg).expect("should serialize");
869        assert!(json.contains("metrics"));
870        assert!(json.contains("1000"));
871        assert!(json.contains("50"));
872    }
873
874    #[test]
875    fn test_ws_message_serialize_error() {
876        let msg = WsMessage::Error {
877            message: "Something went wrong".to_string(),
878        };
879        let json = serde_json::to_string(&msg).expect("should serialize");
880        assert!(json.contains("error"));
881        assert!(json.contains("Something went wrong"));
882    }
883
884    // -------------------------------------------------------------------------
885    // json_to_value tests
886    // -------------------------------------------------------------------------
887
888    #[test]
889    fn test_json_to_value_null() {
890        let json = serde_json::Value::Null;
891        let value = json_to_value(&json);
892        assert_eq!(value, varpulis_core::Value::Null);
893    }
894
895    #[test]
896    fn test_json_to_value_bool() {
897        let json = serde_json::json!(true);
898        let value = json_to_value(&json);
899        assert_eq!(value, varpulis_core::Value::Bool(true));
900    }
901
902    #[test]
903    fn test_json_to_value_int() {
904        let json = serde_json::json!(42);
905        let value = json_to_value(&json);
906        assert_eq!(value, varpulis_core::Value::Int(42));
907    }
908
909    #[test]
910    fn test_json_to_value_float() {
911        let json = serde_json::json!(3.15);
912        let value = json_to_value(&json);
913        match value {
914            varpulis_core::Value::Float(f) => {
915                assert!((f - 3.15).abs() < 0.001);
916            }
917            _ => panic!("Expected Float"),
918        }
919    }
920
921    #[test]
922    fn test_json_to_value_string() {
923        let json = serde_json::json!("hello");
924        let value = json_to_value(&json);
925        assert_eq!(value, varpulis_core::Value::Str("hello".into()));
926    }
927
928    #[test]
929    fn test_json_to_value_array() {
930        let json = serde_json::json!([1, 2, 3]);
931        let value = json_to_value(&json);
932        match value {
933            varpulis_core::Value::Array(arr) => {
934                assert_eq!(arr.len(), 3);
935                assert_eq!(arr[0], varpulis_core::Value::Int(1));
936            }
937            _ => panic!("Expected Array"),
938        }
939    }
940
941    #[test]
942    fn test_json_to_value_object() {
943        let json = serde_json::json!({"key": "value"});
944        let value = json_to_value(&json);
945        match value {
946            varpulis_core::Value::Map(map) => {
947                assert_eq!(
948                    map.get("key"),
949                    Some(&varpulis_core::Value::Str("value".into()))
950                );
951            }
952            _ => panic!("Expected Map"),
953        }
954    }
955
956    #[test]
957    fn test_json_to_value_nested() {
958        let json = serde_json::json!({
959            "name": "sensor",
960            "values": [1, 2, 3],
961            "config": {"enabled": true}
962        });
963        let value = json_to_value(&json);
964        match value {
965            varpulis_core::Value::Map(map) => {
966                assert!(map.contains_key("name"));
967                assert!(map.contains_key("values"));
968                assert!(map.contains_key("config"));
969            }
970            _ => panic!("Expected Map"),
971        }
972    }
973
974    // -------------------------------------------------------------------------
975    // value_to_json tests
976    // -------------------------------------------------------------------------
977
978    #[test]
979    fn test_value_to_json_null() {
980        let value = varpulis_core::Value::Null;
981        let json = value_to_json(&value);
982        assert_eq!(json, serde_json::Value::Null);
983    }
984
985    #[test]
986    fn test_value_to_json_bool() {
987        let value = varpulis_core::Value::Bool(false);
988        let json = value_to_json(&value);
989        assert_eq!(json, serde_json::json!(false));
990    }
991
992    #[test]
993    fn test_value_to_json_int() {
994        let value = varpulis_core::Value::Int(100);
995        let json = value_to_json(&value);
996        assert_eq!(json, serde_json::json!(100));
997    }
998
999    #[test]
1000    fn test_value_to_json_float() {
1001        let value = varpulis_core::Value::Float(2.72);
1002        let json = value_to_json(&value);
1003        assert_eq!(json, serde_json::json!(2.72));
1004    }
1005
1006    #[test]
1007    fn test_value_to_json_string() {
1008        let value = varpulis_core::Value::Str("world".into());
1009        let json = value_to_json(&value);
1010        assert_eq!(json, serde_json::json!("world"));
1011    }
1012
1013    #[test]
1014    fn test_value_to_json_array() {
1015        let value = varpulis_core::Value::array(vec![
1016            varpulis_core::Value::Int(1),
1017            varpulis_core::Value::Int(2),
1018        ]);
1019        let json = value_to_json(&value);
1020        assert_eq!(json, serde_json::json!([1, 2]));
1021    }
1022
1023    // -------------------------------------------------------------------------
1024    // StreamInfo tests
1025    // -------------------------------------------------------------------------
1026
1027    #[test]
1028    fn test_stream_info_serialize() {
1029        let info = StreamInfo {
1030            name: "TestStream".to_string(),
1031            source: "Events".to_string(),
1032            operations: vec!["where".to_string(), "select".to_string()],
1033            events_per_second: 50.0,
1034            status: "active".to_string(),
1035        };
1036        let json = serde_json::to_string(&info).expect("should serialize");
1037        assert!(json.contains("TestStream"));
1038        assert!(json.contains("Events"));
1039        assert!(json.contains("active"));
1040    }
1041
1042    #[test]
1043    fn test_stream_info_deserialize() {
1044        let json = r#"{
1045            "name": "MyStream",
1046            "source": "Sensors",
1047            "operations": ["filter"],
1048            "events_per_second": 10.0,
1049            "status": "idle"
1050        }"#;
1051        let info: StreamInfo = serde_json::from_str(json).expect("should deserialize");
1052        assert_eq!(info.name, "MyStream");
1053        assert_eq!(info.source, "Sensors");
1054        assert_eq!(info.status, "idle");
1055    }
1056
1057    // -------------------------------------------------------------------------
1058    // ServerState tests
1059    // -------------------------------------------------------------------------
1060
1061    #[tokio::test]
1062    async fn test_server_state_new() {
1063        let (output_tx, _) = mpsc::channel(100);
1064        let workdir = std::env::current_dir().expect("should get current dir");
1065        let state = ServerState::new(output_tx, workdir.clone());
1066
1067        assert!(state.engine.is_none());
1068        assert!(state.streams.is_empty());
1069        assert_eq!(state.workdir, workdir);
1070    }
1071
1072    #[tokio::test]
1073    async fn test_server_state_uptime() {
1074        let (output_tx, _) = mpsc::channel(100);
1075        let workdir = std::env::current_dir().expect("should get current dir");
1076        let state = ServerState::new(output_tx, workdir);
1077
1078        // Small delay
1079        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1080
1081        let uptime = state.uptime_secs();
1082        assert!(uptime >= 0.01);
1083    }
1084
1085    // -------------------------------------------------------------------------
1086    // handle_message tests
1087    // -------------------------------------------------------------------------
1088
1089    #[tokio::test]
1090    async fn test_handle_message_unknown_type() {
1091        let (output_tx, _) = mpsc::channel(100);
1092        let workdir = std::env::current_dir().expect("should get current dir");
1093        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));
1094
1095        // Create a response-type message (which shouldn't be sent from client)
1096        let msg = WsMessage::Error {
1097            message: "test".to_string(),
1098        };
1099
1100        let response = handle_message(msg, &state).await;
1101
1102        match response {
1103            WsMessage::Error { message } => {
1104                assert!(message.contains("Unknown"));
1105            }
1106            _ => panic!("Expected Error response"),
1107        }
1108    }
1109
1110    #[tokio::test]
1111    async fn test_handle_get_metrics_no_engine() {
1112        let (output_tx, _) = mpsc::channel(100);
1113        let workdir = std::env::current_dir().expect("should get current dir");
1114        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));
1115
1116        let msg = WsMessage::GetMetrics;
1117        let response = handle_message(msg, &state).await;
1118
1119        match response {
1120            WsMessage::Metrics {
1121                events_processed,
1122                output_events_emitted,
1123                active_streams,
1124                ..
1125            } => {
1126                assert_eq!(events_processed, 0);
1127                assert_eq!(output_events_emitted, 0);
1128                assert_eq!(active_streams, 0);
1129            }
1130            _ => panic!("Expected Metrics response"),
1131        }
1132    }
1133
1134    #[tokio::test]
1135    async fn test_handle_get_streams_empty() {
1136        let (output_tx, _) = mpsc::channel(100);
1137        let workdir = std::env::current_dir().expect("should get current dir");
1138        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));
1139
1140        let msg = WsMessage::GetStreams;
1141        let response = handle_message(msg, &state).await;
1142
1143        match response {
1144            WsMessage::Streams { data } => {
1145                assert!(data.is_empty());
1146            }
1147            _ => panic!("Expected Streams response"),
1148        }
1149    }
1150
1151    #[tokio::test]
1152    async fn test_handle_inject_event_no_engine() {
1153        let (output_tx, _) = mpsc::channel(100);
1154        let workdir = std::env::current_dir().expect("should get current dir");
1155        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));
1156
1157        let msg = WsMessage::InjectEvent {
1158            event_type: "Test".to_string(),
1159            data: serde_json::json!({}),
1160        };
1161        let response = handle_message(msg, &state).await;
1162
1163        match response {
1164            WsMessage::Error { message } => {
1165                assert!(message.contains("No engine loaded"));
1166            }
1167            _ => panic!("Expected Error response"),
1168        }
1169    }
1170
1171    #[tokio::test]
1172    async fn test_handle_load_file_path_traversal() {
1173        use tempfile::TempDir;
1174
1175        let temp_dir = TempDir::new().expect("Failed to create temp dir");
1176        let workdir = temp_dir.path().join("allowed");
1177        std::fs::create_dir(&workdir).expect("Failed to create workdir");
1178
1179        let (output_tx, _) = mpsc::channel(100);
1180        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));
1181
1182        let msg = WsMessage::LoadFile {
1183            path: "../../../etc/passwd".to_string(),
1184        };
1185        let response = handle_message(msg, &state).await;
1186
1187        match response {
1188            WsMessage::LoadResult { success, error, .. } => {
1189                assert!(!success);
1190                assert!(error.is_some());
1191                // Should NOT reveal the actual path in error
1192                let err = error.expect("should have error");
1193                assert!(!err.contains("passwd"));
1194            }
1195            _ => panic!("Expected LoadResult response"),
1196        }
1197    }
1198
1199    // -------------------------------------------------------------------------
1200    // create_output_event_message tests
1201    // -------------------------------------------------------------------------
1202
1203    #[test]
1204    fn test_create_output_event_message() {
1205        let mut event = Event::new("HighTemp");
1206        event
1207            .data
1208            .insert("sensor_id".into(), varpulis_core::Value::Str("S1".into()));
1209
1210        let msg = create_output_event_message(&event);
1211
1212        match msg {
1213            WsMessage::OutputEvent {
1214                event_type,
1215                data,
1216                timestamp,
1217            } => {
1218                assert_eq!(event_type, "HighTemp");
1219                assert!(data.get("sensor_id").is_some());
1220                assert!(!timestamp.is_empty());
1221            }
1222            _ => panic!("Expected OutputEvent message"),
1223        }
1224    }
1225
1226    // -------------------------------------------------------------------------
1227    // Relay metrics tests
1228    // -------------------------------------------------------------------------
1229
1230    #[test]
1231    fn test_relay_metrics_new() {
1232        let metrics = RelayMetrics::new();
1233        assert_eq!(metrics.events_forwarded.load(Ordering::Relaxed), 0);
1234        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 0);
1235        assert_eq!(metrics.forwarding_errors.load(Ordering::Relaxed), 0);
1236        assert!(metrics.coordinator_healthy.load(Ordering::Relaxed));
1237    }
1238
1239    #[test]
1240    fn test_relay_metrics_snapshot() {
1241        let metrics = RelayMetrics::new();
1242        metrics.events_forwarded.store(100, Ordering::Relaxed);
1243        metrics.events_dropped.store(5, Ordering::Relaxed);
1244        metrics.forwarding_errors.store(2, Ordering::Relaxed);
1245        metrics.coordinator_healthy.store(false, Ordering::Relaxed);
1246
1247        let snap = metrics.snapshot();
1248        assert_eq!(snap["relay_events_forwarded"], 100);
1249        assert_eq!(snap["relay_events_dropped"], 5);
1250        assert_eq!(snap["relay_forwarding_errors"], 2);
1251        assert_eq!(snap["relay_coordinator_healthy"], false);
1252    }
1253
1254    #[tokio::test]
1255    async fn test_forward_to_ws_broadcasts() {
1256        let (output_tx, output_rx) = mpsc::channel(100);
1257        let (broadcast_tx, _) = tokio::sync::broadcast::channel::<String>(100);
1258        let broadcast_tx = Arc::new(broadcast_tx);
1259        let metrics = Arc::new(RelayMetrics::new());
1260
1261        // Subscribe before starting forwarder
1262        let mut sub = broadcast_tx.subscribe();
1263
1264        let _handle =
1265            forward_output_events_to_websocket(output_rx, broadcast_tx.clone(), metrics.clone());
1266
1267        // Send an event
1268        let event = Event::new("TestType");
1269        output_tx.send(event).await.unwrap();
1270
1271        // Receive on subscriber
1272        let msg = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv())
1273            .await
1274            .expect("timeout")
1275            .expect("recv error");
1276
1277        assert!(msg.contains("TestType"));
1278        assert_eq!(metrics.events_forwarded.load(Ordering::Relaxed), 1);
1279        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 0);
1280    }
1281
1282    #[tokio::test]
1283    async fn test_forward_to_ws_counts_drops() {
1284        let (output_tx, output_rx) = mpsc::channel(100);
1285        let (broadcast_tx, _) = tokio::sync::broadcast::channel::<String>(100);
1286        let broadcast_tx = Arc::new(broadcast_tx);
1287        let metrics = Arc::new(RelayMetrics::new());
1288
1289        // No subscribers — drops are expected
1290        let _handle =
1291            forward_output_events_to_websocket(output_rx, broadcast_tx.clone(), metrics.clone());
1292
1293        let event = Event::new("Dropped");
1294        output_tx.send(event).await.unwrap();
1295
1296        // Give forwarder time to process
1297        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1298
1299        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 1);
1300        assert_eq!(metrics.events_forwarded.load(Ordering::Relaxed), 0);
1301    }
1302
1303    // NOTE: The coordinator WS relay integration test has been removed during the
1304    // axum migration. The old test framework has no direct axum equivalent without
1305    // adding tokio-tungstenite as a dev-dependency.
1306    // The broadcast relay logic is exercised by the forwarder tests above.
1307}