1use std::path::PathBuf;
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Arc;
8
9use axum::extract::ws::{Message, WebSocket};
10use futures_util::{SinkExt, StreamExt};
11use indexmap::IndexMap;
12use rustc_hash::FxBuildHasher;
13use serde::{Deserialize, Serialize};
14use tokio::sync::{mpsc, RwLock};
15use varpulis_parser::parse;
16use varpulis_runtime::engine::Engine;
17use varpulis_runtime::event::Event;
18
19use crate::security;
20
/// Counters describing how output events are being relayed.
///
/// Every field is an atomic, so one instance can be shared between tasks
/// (for example behind an `Arc`) and updated without extra locking.
#[derive(Debug)]
pub struct RelayMetrics {
    /// Events that were handed off successfully (broadcast to WebSocket
    /// subscribers or accepted by the coordinator).
    pub events_forwarded: AtomicU64,
    /// Events that could not be delivered and were discarded.
    pub events_dropped: AtomicU64,
    /// Failed coordinator delivery attempts (one per failed HTTP attempt).
    pub forwarding_errors: AtomicU64,
    /// Result of the most recent coordinator interaction; starts out `true`.
    pub coordinator_healthy: AtomicBool,
}
37
38impl RelayMetrics {
39 pub const fn new() -> Self {
40 Self {
41 events_forwarded: AtomicU64::new(0),
42 events_dropped: AtomicU64::new(0),
43 forwarding_errors: AtomicU64::new(0),
44 coordinator_healthy: AtomicBool::new(true),
45 }
46 }
47
48 pub fn snapshot(&self) -> serde_json::Value {
49 serde_json::json!({
50 "relay_events_forwarded": self.events_forwarded.load(Ordering::Relaxed),
51 "relay_events_dropped": self.events_dropped.load(Ordering::Relaxed),
52 "relay_forwarding_errors": self.forwarding_errors.load(Ordering::Relaxed),
53 "relay_coordinator_healthy": self.coordinator_healthy.load(Ordering::Relaxed),
54 })
55 }
56}
57
58impl Default for RelayMetrics {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
/// Messages exchanged with WebSocket clients, encoded as JSON.
///
/// Serialized with an internal `"type"` tag whose value is the variant name
/// in `snake_case`, e.g. `{"type": "load_file", "path": "example.vpl"}`.
///
/// `handle_message` acts only on `LoadFile`, `InjectEvent`, `GetStreams` and
/// `GetMetrics`; any other variant received from a client is answered with
/// `Error`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WsMessage {
    /// Request: load the program stored at `path` (validated against the
    /// server's working directory).
    LoadFile { path: String },
    /// Request: feed one event into the loaded engine.
    InjectEvent {
        /// Name of the event type.
        event_type: String,
        /// Event payload; the fields of a JSON object become event fields.
        data: serde_json::Value,
    },
    /// Request: list the streams of the loaded program.
    GetStreams,
    /// Request: report engine and process metrics.
    GetMetrics,

    /// Response to `LoadFile`.
    LoadResult {
        /// Whether the program was loaded.
        success: bool,
        /// Number of streams reported by the engine (0 on failure).
        streams_loaded: usize,
        /// Failure description; `None` on success.
        error: Option<String>,
    },
    /// Response to `GetStreams`.
    Streams { data: Vec<StreamInfo> },
    /// An event notification. Not constructed in this file — presumably
    /// produced or consumed elsewhere; confirm before relying on it.
    Event {
        id: String,
        event_type: String,
        timestamp: String,
        data: serde_json::Value,
    },
    /// An event emitted by the engine, broadcast to connected clients.
    OutputEvent {
        /// Name of the emitted event type.
        event_type: String,
        /// Event fields as JSON.
        data: serde_json::Value,
        /// Event timestamp formatted as RFC 3339.
        timestamp: String,
    },
    /// Response to `GetMetrics`.
    Metrics {
        events_processed: u64,
        output_events_emitted: u64,
        active_streams: usize,
        /// Seconds since the server state was created.
        uptime: f64,
        /// Resident memory of the server process in bytes (0 if unknown).
        memory_usage: u64,
        /// CPU usage; `handle_get_metrics` currently always reports 0.0.
        cpu_usage: f64,
    },
    /// Response to a successfully processed `InjectEvent`.
    EventInjected { event_type: String, success: bool },
    /// Generic failure response.
    Error { message: String },
}
121
/// Description of one stream, as sent to clients in `WsMessage::Streams`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct StreamInfo {
    /// Stream name as reported by the engine.
    pub name: String,
    /// Source the stream reads from (left empty by `handle_load_file`).
    pub source: String,
    /// Operations applied by the stream (left empty by `handle_load_file`).
    pub operations: Vec<String>,
    /// Throughput figure (set to 0.0 by `handle_load_file`).
    pub events_per_second: f64,
    /// Free-form status label; `handle_load_file` sets it to `"active"`.
    pub status: String,
}
131
/// Mutable state shared by the WebSocket request handlers.
///
/// Handlers receive it wrapped as `Arc<RwLock<ServerState>>`.
#[derive(Debug)]
pub struct ServerState {
    /// Currently loaded engine; `None` until a program loads successfully.
    pub engine: Option<Engine>,
    /// Descriptions of the streams of the loaded program.
    pub streams: Vec<StreamInfo>,
    /// When this state was created; basis for the reported uptime.
    pub start_time: std::time::Instant,
    /// Sender handed to every newly created engine for its output events.
    pub output_tx: mpsc::Sender<Event>,
    /// Directory against which `LoadFile` paths are validated.
    pub workdir: PathBuf,
}
150
151impl ServerState {
152 pub fn new(output_tx: mpsc::Sender<Event>, workdir: PathBuf) -> Self {
154 Self {
155 engine: None,
156 streams: Vec::new(),
157 start_time: std::time::Instant::now(),
158 output_tx,
159 workdir,
160 }
161 }
162
163 pub fn uptime_secs(&self) -> f64 {
165 self.start_time.elapsed().as_secs_f64()
166 }
167}
168
169pub async fn handle_message(msg: WsMessage, state: &Arc<RwLock<ServerState>>) -> WsMessage {
175 match msg {
176 WsMessage::LoadFile { path } => handle_load_file(&path, state).await,
177 WsMessage::GetStreams => handle_get_streams(state).await,
178 WsMessage::GetMetrics => handle_get_metrics(state).await,
179 WsMessage::InjectEvent { event_type, data } => {
180 handle_inject_event(&event_type, data, state).await
181 }
182 _ => WsMessage::Error {
183 message: "Unknown message type".to_string(),
184 },
185 }
186}
187
188async fn handle_load_file(path: &str, state: &Arc<RwLock<ServerState>>) -> WsMessage {
190 let workdir = {
192 let state = state.read().await;
193 state.workdir.clone()
194 };
195
196 let validated_path = match security::validate_path(path, &workdir) {
198 Ok(p) => p,
199 Err(e) => {
200 return WsMessage::LoadResult {
201 success: false,
202 streams_loaded: 0,
203 error: Some(e.to_string()),
204 };
205 }
206 };
207
208 let source = match std::fs::read_to_string(&validated_path) {
210 Ok(s) => s,
211 Err(_) => {
212 return WsMessage::LoadResult {
213 success: false,
214 streams_loaded: 0,
215 error: Some("Failed to read file".to_string()),
217 };
218 }
219 };
220
221 let program = match parse(&source) {
223 Ok(p) => p,
224 Err(e) => {
225 return WsMessage::LoadResult {
226 success: false,
227 streams_loaded: 0,
228 error: Some(format!("Parse error: {e}")),
229 };
230 }
231 };
232
233 let mut state = state.write().await;
235 let output_tx = state.output_tx.clone();
236 let mut engine = Engine::new(output_tx);
237
238 match engine.load(&program) {
239 Ok(()) => {
240 let streams_count = engine.metrics().streams_count;
241 let stream_infos: Vec<StreamInfo> = engine
242 .stream_names()
243 .into_iter()
244 .map(|name| StreamInfo {
245 name: name.to_string(),
246 source: String::new(),
247 operations: Vec::new(),
248 events_per_second: 0.0,
249 status: "active".into(),
250 })
251 .collect();
252 state.engine = Some(engine);
253 state.streams = stream_infos;
254
255 WsMessage::LoadResult {
256 success: true,
257 streams_loaded: streams_count,
258 error: None,
259 }
260 }
261 Err(e) => WsMessage::LoadResult {
262 success: false,
263 streams_loaded: 0,
264 error: Some(e.to_string()),
265 },
266 }
267}
268
269async fn handle_get_streams(state: &Arc<RwLock<ServerState>>) -> WsMessage {
271 let state = state.read().await;
272 WsMessage::Streams {
273 data: state.streams.clone(),
274 }
275}
276
277async fn handle_get_metrics(state: &Arc<RwLock<ServerState>>) -> WsMessage {
279 let state = state.read().await;
280 let (events_processed, output_events_emitted, active_streams) =
281 state.engine.as_ref().map_or((0, 0, 0), |engine| {
282 let m = engine.metrics();
283 (m.events_processed, m.output_events_emitted, m.streams_count)
284 });
285
286 WsMessage::Metrics {
287 events_processed,
288 output_events_emitted,
289 active_streams,
290 uptime: state.uptime_secs(),
291 memory_usage: process_rss_bytes(),
292 cpu_usage: 0.0, }
294}
295
/// Largest number of top-level fields accepted in the `data` object of an
/// injected event; larger payloads are rejected.
const MAX_EVENT_FIELDS: usize = 256;

/// Nesting budget for JSON-to-engine-value conversion; values nested deeper
/// than this are replaced with null.
const MAX_JSON_DEPTH: usize = 16;
301
302async fn handle_inject_event(
304 event_type: &str,
305 data: serde_json::Value,
306 state: &Arc<RwLock<ServerState>>,
307) -> WsMessage {
308 let mut state = state.write().await;
309
310 let engine = match state.engine.as_mut() {
311 Some(e) => e,
312 None => {
313 return WsMessage::Error {
314 message: "No engine loaded. Load a .vpl file first.".to_string(),
315 };
316 }
317 };
318
319 let mut event = Event::new(event_type);
321
322 if let Some(obj) = data.as_object() {
324 if obj.len() > MAX_EVENT_FIELDS {
325 return WsMessage::Error {
326 message: format!(
327 "Event exceeds maximum field count ({} > {})",
328 obj.len(),
329 MAX_EVENT_FIELDS
330 ),
331 };
332 }
333 for (key, value) in obj {
334 let v = json_to_value_bounded(value, MAX_JSON_DEPTH);
335 event.data.insert(key.as_str().into(), v);
336 }
337 }
338
339 match engine.process(event).await {
341 Ok(()) => WsMessage::EventInjected {
342 event_type: event_type.to_string(),
343 success: true,
344 },
345 Err(e) => WsMessage::Error {
346 message: format!("Failed to process event: {e}"),
347 },
348 }
349}
350
/// Returns the resident set size of the current process in bytes.
///
/// On Linux the value comes from the `VmRSS` line of `/proc/self/status`.
/// That line is expressed in kB, so the result does not depend on the
/// kernel's page size (which is not 4096 on every platform).
///
/// Returns 0 on other operating systems, or when the file cannot be read or
/// parsed.
fn process_rss_bytes() -> u64 {
    #[cfg(target_os = "linux")]
    {
        std::fs::read_to_string("/proc/self/status")
            .ok()
            .and_then(|status| parse_vm_rss_bytes(&status))
            .unwrap_or(0)
    }
    #[cfg(not(target_os = "linux"))]
    {
        0
    }
}

/// Extracts the `VmRSS` figure from the text of a `/proc/<pid>/status` file
/// and converts it from kB to bytes.
#[cfg(target_os = "linux")]
fn parse_vm_rss_bytes(status: &str) -> Option<u64> {
    // The line looks like "VmRSS:\t    1234 kB".
    let rest = status
        .lines()
        .find_map(|line| line.strip_prefix("VmRSS:"))?;
    let kib: u64 = rest.split_whitespace().next()?.parse().ok()?;
    Some(kib.saturating_mul(1024))
}
371
372pub async fn handle_connection(
378 ws: WebSocket,
379 state: Arc<RwLock<ServerState>>,
380 broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
381) {
382 let (ws_tx, mut ws_rx) = ws.split();
383 let ws_tx = Arc::new(tokio::sync::Mutex::new(ws_tx));
384 let mut broadcast_rx = broadcast_tx.subscribe();
385
386 let ws_tx_clone = ws_tx.clone();
388 let forward_task = tokio::spawn(async move {
389 while let Ok(msg) = broadcast_rx.recv().await {
390 let mut tx = ws_tx_clone.lock().await;
391 if tx.send(Message::Text(msg.into())).await.is_err() {
392 break;
393 }
394 }
395 });
396
397 while let Some(result) = ws_rx.next().await {
399 let msg = match result {
400 Ok(msg) => msg,
401 Err(e) => {
402 tracing::warn!("WebSocket error: {}", e);
403 break;
404 }
405 };
406
407 match msg {
408 Message::Text(text) => {
409 if let Ok(ws_msg) = serde_json::from_str::<WsMessage>(&text) {
410 let response = handle_message(ws_msg, &state).await;
411 if let Ok(json) = serde_json::to_string(&response) {
412 let mut tx = ws_tx.lock().await;
413 if tx.send(Message::Text(json.into())).await.is_err() {
414 break;
415 }
416 }
417 }
418 }
419 Message::Close(_) => break,
420 _ => {} }
422 }
423
424 forward_task.abort();
425 tracing::info!("WebSocket client disconnected");
426}
427
428pub fn create_output_event_message(event: &Event) -> WsMessage {
434 WsMessage::OutputEvent {
435 event_type: event.event_type.to_string(),
436 data: serde_json::to_value(&event.data).unwrap_or_default(),
437 timestamp: event.timestamp.to_rfc3339(),
438 }
439}
440
441pub fn forward_output_events_to_websocket(
443 mut output_rx: mpsc::Receiver<Event>,
444 broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
445 metrics: Arc<RelayMetrics>,
446) -> tokio::task::JoinHandle<()> {
447 tokio::spawn(async move {
448 while let Some(event) = output_rx.recv().await {
449 let msg = create_output_event_message(&event);
450 if let Ok(json) = serde_json::to_string(&msg) {
451 match broadcast_tx.send(json) {
452 Ok(_) => {
453 metrics.events_forwarded.fetch_add(1, Ordering::Relaxed);
454 }
455 Err(_) => {
456 metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
457 tracing::debug!("No WS subscribers for output event");
458 }
459 }
460 }
461 }
462 })
463}
464
465pub async fn handle_coordinator_connection(
474 ws: WebSocket,
475 broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
476) {
477 let (ws_tx, mut ws_rx) = ws.split();
478 let ws_tx = Arc::new(tokio::sync::Mutex::new(ws_tx));
479 let mut broadcast_rx = broadcast_tx.subscribe();
480
481 let ws_tx_clone = ws_tx.clone();
483 let forward_task = tokio::spawn(async move {
484 while let Ok(msg) = broadcast_rx.recv().await {
485 let mut tx = ws_tx_clone.lock().await;
486 if tx.send(Message::Text(msg.into())).await.is_err() {
487 break;
488 }
489 }
490 });
491
492 while let Some(result) = ws_rx.next().await {
494 match result {
495 Ok(Message::Close(_)) => break,
496 Err(_) => break,
497 _ => {} }
499 }
500
501 forward_task.abort();
502 tracing::info!("Coordinator WebSocket client disconnected");
503}
504
505pub fn forward_output_events_to_coordinator(
518 broadcast_tx: Arc<tokio::sync::broadcast::Sender<String>>,
519 coordinator_url: String,
520 api_key: String,
521 metrics: Arc<RelayMetrics>,
522) -> tokio::task::JoinHandle<()> {
523 tokio::spawn(async move {
524 let client = reqwest::Client::builder()
525 .timeout(std::time::Duration::from_secs(5))
526 .build()
527 .unwrap_or_default();
528 let url = format!(
529 "{}/api/v1/internal/output-events",
530 coordinator_url.trim_end_matches('/')
531 );
532 let health_url = format!("{}/health", coordinator_url.trim_end_matches('/'));
533 let mut rx = broadcast_tx.subscribe();
534 let mut batch: Vec<String> = Vec::with_capacity(50);
535 let mut consecutive_failures: u32 = 0;
536
537 loop {
538 if consecutive_failures >= 5 {
540 metrics.coordinator_healthy.store(false, Ordering::Relaxed);
541 tracing::warn!(
542 "Coordinator relay: {} consecutive failures, entering cooldown",
543 consecutive_failures
544 );
545 loop {
546 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
547 match client.get(&health_url).send().await {
548 Ok(resp) if resp.status().is_success() => {
549 tracing::info!("Coordinator relay: health check passed, resuming");
550 consecutive_failures = 0;
551 metrics.coordinator_healthy.store(true, Ordering::Relaxed);
552 break;
553 }
554 _ => {
555 tracing::debug!(
556 "Coordinator relay: health check failed, retrying in 5s"
557 );
558 }
559 }
560 }
561 }
562
563 match rx.recv().await {
565 Ok(msg) => batch.push(msg),
566 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
567 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
568 tracing::warn!("Output event forwarder lagged by {} messages", n);
569 continue;
570 }
571 }
572
573 let deadline = tokio::time::sleep(std::time::Duration::from_millis(200));
575 tokio::pin!(deadline);
576
577 loop {
578 if batch.len() >= 50 {
579 break;
580 }
581 tokio::select! {
582 result = rx.recv() => {
583 match result {
584 Ok(msg) => batch.push(msg),
585 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
586 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
587 }
588 }
589 () = &mut deadline => break,
590 }
591 }
592
593 if !batch.is_empty() {
595 let batch_len = batch.len() as u64;
596 let mut sent = false;
597
598 for attempt in 0..3u32 {
599 match client
600 .post(&url)
601 .header("x-api-key", &api_key)
602 .json(&batch)
603 .send()
604 .await
605 {
606 Ok(resp) if resp.status().is_success() => {
607 metrics
608 .events_forwarded
609 .fetch_add(batch_len, Ordering::Relaxed);
610 metrics.coordinator_healthy.store(true, Ordering::Relaxed);
611 consecutive_failures = 0;
612 sent = true;
613 break;
614 }
615 Ok(resp) => {
616 metrics.forwarding_errors.fetch_add(1, Ordering::Relaxed);
617 tracing::warn!(
618 "Coordinator relay: attempt {}/3 got status {} for {} events",
619 attempt + 1,
620 resp.status(),
621 batch_len
622 );
623 }
624 Err(e) => {
625 metrics.forwarding_errors.fetch_add(1, Ordering::Relaxed);
626 tracing::warn!(
627 "Coordinator relay: attempt {}/3 failed: {}",
628 attempt + 1,
629 e
630 );
631 }
632 }
633 let backoff = std::time::Duration::from_millis(100 * 2u64.pow(attempt));
635 tokio::time::sleep(backoff).await;
636 }
637
638 if !sent {
639 consecutive_failures += 1;
640 metrics
641 .events_dropped
642 .fetch_add(batch_len, Ordering::Relaxed);
643 metrics.coordinator_healthy.store(false, Ordering::Relaxed);
644 tracing::error!(
645 "Coordinator relay: dropped {} events after 3 retries (consecutive failures: {})",
646 batch_len,
647 consecutive_failures
648 );
649 }
650
651 batch.clear();
652 }
653 }
654 })
655}
656
657pub fn json_to_value(json: &serde_json::Value) -> varpulis_core::Value {
663 json_to_value_bounded(json, MAX_JSON_DEPTH)
664}
665
666fn json_to_value_bounded(json: &serde_json::Value, depth: usize) -> varpulis_core::Value {
668 use varpulis_core::Value;
669 if depth == 0 {
670 return Value::Null;
671 }
672 match json {
673 serde_json::Value::Null => Value::Null,
674 serde_json::Value::Bool(b) => Value::Bool(*b),
675 serde_json::Value::Number(n) => {
676 if let Some(i) = n.as_i64() {
677 Value::Int(i)
678 } else if let Some(f) = n.as_f64() {
679 Value::Float(f)
680 } else {
681 Value::Null
682 }
683 }
684 serde_json::Value::String(s) => Value::Str(s.clone().into()),
685 serde_json::Value::Array(arr) => Value::array(
686 arr.iter()
687 .map(|v| json_to_value_bounded(v, depth - 1))
688 .collect(),
689 ),
690 serde_json::Value::Object(obj) => {
691 let map: IndexMap<std::sync::Arc<str>, Value, FxBuildHasher> = obj
692 .iter()
693 .map(|(k, v)| {
694 (
695 std::sync::Arc::from(k.as_str()),
696 json_to_value_bounded(v, depth - 1),
697 )
698 })
699 .collect();
700 Value::map(map)
701 }
702 }
703}
704
705pub fn value_to_json(value: &varpulis_core::Value) -> serde_json::Value {
707 use varpulis_core::Value;
708 match value {
709 Value::Null => serde_json::Value::Null,
710 Value::Bool(b) => serde_json::Value::Bool(*b),
711 Value::Int(i) => serde_json::json!(*i),
712 Value::Float(f) => serde_json::json!(*f),
713 Value::Str(s) => serde_json::Value::String(s.to_string()),
714 Value::Array(arr) => serde_json::Value::Array(arr.iter().map(value_to_json).collect()),
715 Value::Map(map) => {
716 let obj: serde_json::Map<String, serde_json::Value> = map
717 .iter()
718 .map(|(k, v)| (k.to_string(), value_to_json(v)))
719 .collect();
720 serde_json::Value::Object(obj)
721 }
722 Value::Timestamp(ts) => serde_json::json!(*ts),
724 Value::Duration(d) => serde_json::json!(*d),
726 }
727}
728
// Unit tests: message (de)serialization, JSON <-> engine value conversion,
// server state, request handling and output-event relaying.
#[cfg(test)]
mod tests {
    use super::*;

    // ------------------------------------------------------------------
    // WsMessage serialization / deserialization
    // ------------------------------------------------------------------

    #[test]
    fn test_ws_message_serialize_load_file() {
        let msg = WsMessage::LoadFile {
            path: "test.vpl".to_string(),
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        // The variant name is emitted in snake_case as the "type" tag.
        assert!(json.contains("load_file"));
        assert!(json.contains("test.vpl"));
    }

    #[test]
    fn test_ws_message_deserialize_load_file() {
        let json = r#"{"type": "load_file", "path": "example.vpl"}"#;
        let msg: WsMessage = serde_json::from_str(json).expect("should deserialize");

        match msg {
            WsMessage::LoadFile { path } => {
                assert_eq!(path, "example.vpl");
            }
            _ => panic!("Expected LoadFile message"),
        }
    }

    #[test]
    fn test_ws_message_serialize_inject_event() {
        let msg = WsMessage::InjectEvent {
            event_type: "Temperature".to_string(),
            data: serde_json::json!({"value": 25.5}),
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("inject_event"));
        assert!(json.contains("Temperature"));
        assert!(json.contains("25.5"));
    }

    #[test]
    fn test_ws_message_deserialize_inject_event() {
        let json = r#"{"type": "inject_event", "event_type": "Sensor", "data": {"id": 42}}"#;
        let msg: WsMessage = serde_json::from_str(json).expect("should deserialize");

        match msg {
            WsMessage::InjectEvent { event_type, data } => {
                assert_eq!(event_type, "Sensor");
                assert_eq!(data["id"], 42);
            }
            _ => panic!("Expected InjectEvent message"),
        }
    }

    #[test]
    fn test_ws_message_serialize_get_streams() {
        let msg = WsMessage::GetStreams;
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("get_streams"));
    }

    #[test]
    fn test_ws_message_serialize_get_metrics() {
        let msg = WsMessage::GetMetrics;
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("get_metrics"));
    }

    #[test]
    fn test_ws_message_serialize_load_result_success() {
        let msg = WsMessage::LoadResult {
            success: true,
            streams_loaded: 5,
            error: None,
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("load_result"));
        assert!(json.contains("true"));
        assert!(json.contains('5'));
    }

    #[test]
    fn test_ws_message_serialize_load_result_error() {
        let msg = WsMessage::LoadResult {
            success: false,
            streams_loaded: 0,
            error: Some("Parse error at line 5".to_string()),
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("false"));
        assert!(json.contains("Parse error"));
    }

    #[test]
    fn test_ws_message_serialize_streams() {
        let msg = WsMessage::Streams {
            data: vec![StreamInfo {
                name: "HighTemp".to_string(),
                source: "TempReading".to_string(),
                operations: vec!["where".to_string(), "emit".to_string()],
                events_per_second: 100.5,
                status: "active".to_string(),
            }],
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("HighTemp"));
        assert!(json.contains("TempReading"));
    }

    #[test]
    fn test_ws_message_serialize_output_event() {
        let msg = WsMessage::OutputEvent {
            event_type: "HighTemperature".to_string(),
            data: serde_json::json!({"value": 35.5}),
            timestamp: "2026-01-29T12:00:00Z".to_string(),
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("output_event"));
        assert!(json.contains("HighTemperature"));
        assert!(json.contains("35.5"));
    }

    #[test]
    fn test_ws_message_serialize_metrics() {
        let msg = WsMessage::Metrics {
            events_processed: 1000,
            output_events_emitted: 50,
            active_streams: 3,
            uptime: 3600.5,
            memory_usage: 1024000,
            cpu_usage: 25.5,
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("metrics"));
        assert!(json.contains("1000"));
        assert!(json.contains("50"));
    }

    #[test]
    fn test_ws_message_serialize_error() {
        let msg = WsMessage::Error {
            message: "Something went wrong".to_string(),
        };
        let json = serde_json::to_string(&msg).expect("should serialize");
        assert!(json.contains("error"));
        assert!(json.contains("Something went wrong"));
    }

    // ------------------------------------------------------------------
    // json_to_value
    // ------------------------------------------------------------------

    #[test]
    fn test_json_to_value_null() {
        let json = serde_json::Value::Null;
        let value = json_to_value(&json);
        assert_eq!(value, varpulis_core::Value::Null);
    }

    #[test]
    fn test_json_to_value_bool() {
        let json = serde_json::json!(true);
        let value = json_to_value(&json);
        assert_eq!(value, varpulis_core::Value::Bool(true));
    }

    #[test]
    fn test_json_to_value_int() {
        let json = serde_json::json!(42);
        let value = json_to_value(&json);
        assert_eq!(value, varpulis_core::Value::Int(42));
    }

    #[test]
    fn test_json_to_value_float() {
        let json = serde_json::json!(3.15);
        let value = json_to_value(&json);
        match value {
            varpulis_core::Value::Float(f) => {
                // Compare with a tolerance rather than exact float equality.
                assert!((f - 3.15).abs() < 0.001);
            }
            _ => panic!("Expected Float"),
        }
    }

    #[test]
    fn test_json_to_value_string() {
        let json = serde_json::json!("hello");
        let value = json_to_value(&json);
        assert_eq!(value, varpulis_core::Value::Str("hello".into()));
    }

    #[test]
    fn test_json_to_value_array() {
        let json = serde_json::json!([1, 2, 3]);
        let value = json_to_value(&json);
        match value {
            varpulis_core::Value::Array(arr) => {
                assert_eq!(arr.len(), 3);
                assert_eq!(arr[0], varpulis_core::Value::Int(1));
            }
            _ => panic!("Expected Array"),
        }
    }

    #[test]
    fn test_json_to_value_object() {
        let json = serde_json::json!({"key": "value"});
        let value = json_to_value(&json);
        match value {
            varpulis_core::Value::Map(map) => {
                assert_eq!(
                    map.get("key"),
                    Some(&varpulis_core::Value::Str("value".into()))
                );
            }
            _ => panic!("Expected Map"),
        }
    }

    #[test]
    fn test_json_to_value_nested() {
        let json = serde_json::json!({
            "name": "sensor",
            "values": [1, 2, 3],
            "config": {"enabled": true}
        });
        let value = json_to_value(&json);
        match value {
            varpulis_core::Value::Map(map) => {
                assert!(map.contains_key("name"));
                assert!(map.contains_key("values"));
                assert!(map.contains_key("config"));
            }
            _ => panic!("Expected Map"),
        }
    }

    // ------------------------------------------------------------------
    // value_to_json
    // ------------------------------------------------------------------

    #[test]
    fn test_value_to_json_null() {
        let value = varpulis_core::Value::Null;
        let json = value_to_json(&value);
        assert_eq!(json, serde_json::Value::Null);
    }

    #[test]
    fn test_value_to_json_bool() {
        let value = varpulis_core::Value::Bool(false);
        let json = value_to_json(&value);
        assert_eq!(json, serde_json::json!(false));
    }

    #[test]
    fn test_value_to_json_int() {
        let value = varpulis_core::Value::Int(100);
        let json = value_to_json(&value);
        assert_eq!(json, serde_json::json!(100));
    }

    #[test]
    fn test_value_to_json_float() {
        let value = varpulis_core::Value::Float(2.72);
        let json = value_to_json(&value);
        assert_eq!(json, serde_json::json!(2.72));
    }

    #[test]
    fn test_value_to_json_string() {
        let value = varpulis_core::Value::Str("world".into());
        let json = value_to_json(&value);
        assert_eq!(json, serde_json::json!("world"));
    }

    #[test]
    fn test_value_to_json_array() {
        let value = varpulis_core::Value::array(vec![
            varpulis_core::Value::Int(1),
            varpulis_core::Value::Int(2),
        ]);
        let json = value_to_json(&value);
        assert_eq!(json, serde_json::json!([1, 2]));
    }

    // ------------------------------------------------------------------
    // StreamInfo
    // ------------------------------------------------------------------

    #[test]
    fn test_stream_info_serialize() {
        let info = StreamInfo {
            name: "TestStream".to_string(),
            source: "Events".to_string(),
            operations: vec!["where".to_string(), "select".to_string()],
            events_per_second: 50.0,
            status: "active".to_string(),
        };
        let json = serde_json::to_string(&info).expect("should serialize");
        assert!(json.contains("TestStream"));
        assert!(json.contains("Events"));
        assert!(json.contains("active"));
    }

    #[test]
    fn test_stream_info_deserialize() {
        let json = r#"{
            "name": "MyStream",
            "source": "Sensors",
            "operations": ["filter"],
            "events_per_second": 10.0,
            "status": "idle"
        }"#;
        let info: StreamInfo = serde_json::from_str(json).expect("should deserialize");
        assert_eq!(info.name, "MyStream");
        assert_eq!(info.source, "Sensors");
        assert_eq!(info.status, "idle");
    }

    // ------------------------------------------------------------------
    // ServerState
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_server_state_new() {
        let (output_tx, _) = mpsc::channel(100);
        let workdir = std::env::current_dir().expect("should get current dir");
        let state = ServerState::new(output_tx, workdir.clone());

        assert!(state.engine.is_none());
        assert!(state.streams.is_empty());
        assert_eq!(state.workdir, workdir);
    }

    #[tokio::test]
    async fn test_server_state_uptime() {
        let (output_tx, _) = mpsc::channel(100);
        let workdir = std::env::current_dir().expect("should get current dir");
        let state = ServerState::new(output_tx, workdir);

        // Let a measurable amount of time pass before reading the uptime.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        let uptime = state.uptime_secs();
        assert!(uptime >= 0.01);
    }

    // ------------------------------------------------------------------
    // handle_message
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_handle_message_unknown_type() {
        let (output_tx, _) = mpsc::channel(100);
        let workdir = std::env::current_dir().expect("should get current dir");
        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));

        // `Error` is a reply variant, so it is not a valid request.
        let msg = WsMessage::Error {
            message: "test".to_string(),
        };

        let response = handle_message(msg, &state).await;

        match response {
            WsMessage::Error { message } => {
                assert!(message.contains("Unknown"));
            }
            _ => panic!("Expected Error response"),
        }
    }

    #[tokio::test]
    async fn test_handle_get_metrics_no_engine() {
        let (output_tx, _) = mpsc::channel(100);
        let workdir = std::env::current_dir().expect("should get current dir");
        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));

        let msg = WsMessage::GetMetrics;
        let response = handle_message(msg, &state).await;

        match response {
            WsMessage::Metrics {
                events_processed,
                output_events_emitted,
                active_streams,
                ..
            } => {
                // Without an engine all engine counters are zero.
                assert_eq!(events_processed, 0);
                assert_eq!(output_events_emitted, 0);
                assert_eq!(active_streams, 0);
            }
            _ => panic!("Expected Metrics response"),
        }
    }

    #[tokio::test]
    async fn test_handle_get_streams_empty() {
        let (output_tx, _) = mpsc::channel(100);
        let workdir = std::env::current_dir().expect("should get current dir");
        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));

        let msg = WsMessage::GetStreams;
        let response = handle_message(msg, &state).await;

        match response {
            WsMessage::Streams { data } => {
                assert!(data.is_empty());
            }
            _ => panic!("Expected Streams response"),
        }
    }

    #[tokio::test]
    async fn test_handle_inject_event_no_engine() {
        let (output_tx, _) = mpsc::channel(100);
        let workdir = std::env::current_dir().expect("should get current dir");
        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));

        let msg = WsMessage::InjectEvent {
            event_type: "Test".to_string(),
            data: serde_json::json!({}),
        };
        let response = handle_message(msg, &state).await;

        match response {
            WsMessage::Error { message } => {
                assert!(message.contains("No engine loaded"));
            }
            _ => panic!("Expected Error response"),
        }
    }

    #[tokio::test]
    async fn test_handle_load_file_path_traversal() {
        use tempfile::TempDir;

        // Use a nested working directory so that "../" escapes it.
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let workdir = temp_dir.path().join("allowed");
        std::fs::create_dir(&workdir).expect("Failed to create workdir");

        let (output_tx, _) = mpsc::channel(100);
        let state = Arc::new(RwLock::new(ServerState::new(output_tx, workdir)));

        let msg = WsMessage::LoadFile {
            path: "../../../etc/passwd".to_string(),
        };
        let response = handle_message(msg, &state).await;

        match response {
            WsMessage::LoadResult { success, error, .. } => {
                assert!(!success);
                assert!(error.is_some());
                let err = error.expect("should have error");
                // The reply must not echo the requested path back.
                assert!(!err.contains("passwd"));
            }
            _ => panic!("Expected LoadResult response"),
        }
    }

    // ------------------------------------------------------------------
    // create_output_event_message
    // ------------------------------------------------------------------

    #[test]
    fn test_create_output_event_message() {
        let mut event = Event::new("HighTemp");
        event
            .data
            .insert("sensor_id".into(), varpulis_core::Value::Str("S1".into()));

        let msg = create_output_event_message(&event);

        match msg {
            WsMessage::OutputEvent {
                event_type,
                data,
                timestamp,
            } => {
                assert_eq!(event_type, "HighTemp");
                assert!(data.get("sensor_id").is_some());
                assert!(!timestamp.is_empty());
            }
            _ => panic!("Expected OutputEvent message"),
        }
    }

    // ------------------------------------------------------------------
    // RelayMetrics and output-event forwarding
    // ------------------------------------------------------------------

    #[test]
    fn test_relay_metrics_new() {
        let metrics = RelayMetrics::new();
        assert_eq!(metrics.events_forwarded.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.forwarding_errors.load(Ordering::Relaxed), 0);
        assert!(metrics.coordinator_healthy.load(Ordering::Relaxed));
    }

    #[test]
    fn test_relay_metrics_snapshot() {
        let metrics = RelayMetrics::new();
        metrics.events_forwarded.store(100, Ordering::Relaxed);
        metrics.events_dropped.store(5, Ordering::Relaxed);
        metrics.forwarding_errors.store(2, Ordering::Relaxed);
        metrics.coordinator_healthy.store(false, Ordering::Relaxed);

        let snap = metrics.snapshot();
        assert_eq!(snap["relay_events_forwarded"], 100);
        assert_eq!(snap["relay_events_dropped"], 5);
        assert_eq!(snap["relay_forwarding_errors"], 2);
        assert_eq!(snap["relay_coordinator_healthy"], false);
    }

    #[tokio::test]
    async fn test_forward_to_ws_broadcasts() {
        let (output_tx, output_rx) = mpsc::channel(100);
        let (broadcast_tx, _) = tokio::sync::broadcast::channel::<String>(100);
        let broadcast_tx = Arc::new(broadcast_tx);
        let metrics = Arc::new(RelayMetrics::new());

        // Subscribe before sending so the broadcast has a receiver.
        let mut sub = broadcast_tx.subscribe();

        let _handle =
            forward_output_events_to_websocket(output_rx, broadcast_tx.clone(), metrics.clone());

        let event = Event::new("TestType");
        output_tx.send(event).await.unwrap();

        // Bound the wait so a broken forwarder fails the test instead of hanging it.
        let msg = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv())
            .await
            .expect("timeout")
            .expect("recv error");

        assert!(msg.contains("TestType"));
        assert_eq!(metrics.events_forwarded.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn test_forward_to_ws_counts_drops() {
        let (output_tx, output_rx) = mpsc::channel(100);
        // The initial receiver is discarded and nobody subscribes, so every
        // broadcast send fails.
        let (broadcast_tx, _) = tokio::sync::broadcast::channel::<String>(100);
        let broadcast_tx = Arc::new(broadcast_tx);
        let metrics = Arc::new(RelayMetrics::new());

        let _handle =
            forward_output_events_to_websocket(output_rx, broadcast_tx.clone(), metrics.clone());

        let event = Event::new("Dropped");
        output_tx.send(event).await.unwrap();

        // Give the forwarder task time to pick the event up.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_forwarded.load(Ordering::Relaxed), 0);
    }

}