phlow_runtime/
debug_server.rs

1use phlow_engine::debug::{DebugController, DebugReleaseResult, DebugSnapshot};
2use phlow_sdk::prelude::{JsonMode, ToValueBehavior, Value};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
6use tokio::net::{TcpListener, TcpStream};
7
/// TCP port used by the debug server when `PHLOW_DEBUG_PORT` is unset or
/// does not parse as a `u16`.
const DEFAULT_DEBUG_PORT: u16 = 31_400;
9
10pub async fn spawn(controller: Arc<DebugController>) -> std::io::Result<()> {
11    let addr = debug_addr();
12    let listener = TcpListener::bind(&addr).await?;
13    log::info!("Phlow debug server listening on {}", addr);
14
15    tokio::spawn(async move {
16        if let Err(err) = serve(listener, controller).await {
17            log::error!("Phlow debug server failed: {}", err);
18        }
19    });
20
21    Ok(())
22}
23
24async fn serve(
25    listener: TcpListener,
26    controller: Arc<DebugController>,
27) -> std::io::Result<()> {
28    loop {
29        let (stream, addr) = listener.accept().await?;
30        log::debug!("Debug client connected: {}", addr);
31        let controller = controller.clone();
32        tokio::spawn(async move {
33            if let Err(err) = handle_client(stream, controller).await {
34                log::debug!("Debug client error: {}", err);
35            }
36        });
37    }
38}
39
40async fn handle_client(
41    stream: TcpStream,
42    controller: Arc<DebugController>,
43) -> std::io::Result<()> {
44    let (reader, mut writer) = stream.into_split();
45    let mut lines = BufReader::new(reader).lines();
46
47    while let Some(line) = lines.next_line().await? {
48        let cmd = line.trim();
49        if cmd.is_empty() {
50            continue;
51        }
52
53        let response = match cmd.to_ascii_uppercase().as_str() {
54            "STEP" => match controller.current_snapshot().await {
55                Some(snapshot) => snapshot_to_value(&snapshot),
56                None => error_value("no step waiting"),
57            },
58            "SHOW" => match controller.show_script().await {
59                Some(script) => script,
60                None => error_value("no script loaded"),
61            },
62            "NEXT" => {
63                match controller.release_next().await {
64                    DebugReleaseResult::Released => ok_value(),
65                    DebugReleaseResult::Awaiting => error_value("awaiting step"),
66                    DebugReleaseResult::NoStep => error_value("no step waiting"),
67                }
68            }
69            "ALL" => {
70                let history = controller.history().await;
71                let values: Vec<Value> = history.iter().map(snapshot_to_value).collect();
72                values.to_value()
73            }
74            "RELEASE" => {
75                match controller.release_pipeline().await {
76                    DebugReleaseResult::Released => ok_value(),
77                    DebugReleaseResult::Awaiting => error_value("awaiting step"),
78                    DebugReleaseResult::NoStep => error_value("no step waiting"),
79                }
80            }
81            "PAUSE" => {
82                controller.pause_release().await;
83                ok_value()
84            }
85            _ => error_value("unknown command"),
86        };
87
88        let payload = response.to_json(JsonMode::Inline);
89        writer.write_all(payload.as_bytes()).await?;
90        writer.write_all(b"\n").await?;
91    }
92
93    Ok(())
94}
95
96fn debug_addr() -> String {
97    let port = std::env::var("PHLOW_DEBUG_PORT")
98        .ok()
99        .and_then(|value| value.parse::<u16>().ok())
100        .unwrap_or(DEFAULT_DEBUG_PORT);
101    format!("0.0.0.0:{}", port)
102}
103
104fn snapshot_to_value(snapshot: &DebugSnapshot) -> Value {
105    let payload = snapshot.context.payload.clone().unwrap_or(Value::Null);
106    let main = snapshot.context.main.clone().unwrap_or(Value::Null);
107    let mut context_map = HashMap::new();
108    context_map.insert("payload".to_string(), payload);
109    context_map.insert("main".to_string(), main);
110
111    let mut map = HashMap::new();
112    map.insert("context".to_string(), context_map.to_value());
113    map.insert("step".to_string(), snapshot.step.clone());
114    map.insert(
115        "pipeline".to_string(),
116        (snapshot.pipeline as i64).to_value(),
117    );
118
119    map.to_value()
120}
121
122fn ok_value() -> Value {
123    let mut map = HashMap::new();
124    map.insert("ok".to_string(), true.to_value());
125    map.to_value()
126}
127
128fn error_value(message: &str) -> Value {
129    let mut map = HashMap::new();
130    map.insert("ok".to_string(), false.to_value());
131    map.insert("error".to_string(), message.to_value());
132    map.to_value()
133}