phlow_runtime/
debug_server.rs1use phlow_engine::debug::{DebugController, DebugReleaseResult, DebugSnapshot};
2use phlow_sdk::prelude::{JsonMode, ToValueBehavior, Value};
3use std::collections::HashMap;
4use std::sync::Arc;
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
6use tokio::net::{TcpListener, TcpStream};
7
/// TCP port the debug server listens on when `PHLOW_DEBUG_PORT` does not supply a usable one.
const DEFAULT_DEBUG_PORT: u16 = 31_400;
9
10pub async fn spawn(controller: Arc<DebugController>) -> std::io::Result<()> {
11 let addr = debug_addr();
12 let listener = TcpListener::bind(&addr).await?;
13 log::info!("Phlow debug server listening on {}", addr);
14
15 tokio::spawn(async move {
16 if let Err(err) = serve(listener, controller).await {
17 log::error!("Phlow debug server failed: {}", err);
18 }
19 });
20
21 Ok(())
22}
23
24async fn serve(
25 listener: TcpListener,
26 controller: Arc<DebugController>,
27) -> std::io::Result<()> {
28 loop {
29 let (stream, addr) = listener.accept().await?;
30 log::debug!("Debug client connected: {}", addr);
31 let controller = controller.clone();
32 tokio::spawn(async move {
33 if let Err(err) = handle_client(stream, controller).await {
34 log::debug!("Debug client error: {}", err);
35 }
36 });
37 }
38}
39
40async fn handle_client(
41 stream: TcpStream,
42 controller: Arc<DebugController>,
43) -> std::io::Result<()> {
44 let (reader, mut writer) = stream.into_split();
45 let mut lines = BufReader::new(reader).lines();
46
47 while let Some(line) = lines.next_line().await? {
48 let cmd = line.trim();
49 if cmd.is_empty() {
50 continue;
51 }
52
53 let response = match cmd.to_ascii_uppercase().as_str() {
54 "STEP" => match controller.current_snapshot().await {
55 Some(snapshot) => snapshot_to_value(&snapshot),
56 None => error_value("no step waiting"),
57 },
58 "SHOW" => match controller.show_script().await {
59 Some(script) => script,
60 None => error_value("no script loaded"),
61 },
62 "NEXT" => {
63 match controller.release_next().await {
64 DebugReleaseResult::Released => ok_value(),
65 DebugReleaseResult::Awaiting => error_value("awaiting step"),
66 DebugReleaseResult::NoStep => error_value("no step waiting"),
67 }
68 }
69 "ALL" => {
70 let history = controller.history().await;
71 let values: Vec<Value> = history.iter().map(snapshot_to_value).collect();
72 values.to_value()
73 }
74 "RELEASE" => {
75 match controller.release_pipeline().await {
76 DebugReleaseResult::Released => ok_value(),
77 DebugReleaseResult::Awaiting => error_value("awaiting step"),
78 DebugReleaseResult::NoStep => error_value("no step waiting"),
79 }
80 }
81 "PAUSE" => {
82 controller.pause_release().await;
83 ok_value()
84 }
85 _ => error_value("unknown command"),
86 };
87
88 let payload = response.to_json(JsonMode::Inline);
89 writer.write_all(payload.as_bytes()).await?;
90 writer.write_all(b"\n").await?;
91 }
92
93 Ok(())
94}
95
96fn debug_addr() -> String {
97 let port = std::env::var("PHLOW_DEBUG_PORT")
98 .ok()
99 .and_then(|value| value.parse::<u16>().ok())
100 .unwrap_or(DEFAULT_DEBUG_PORT);
101 format!("0.0.0.0:{}", port)
102}
103
104fn snapshot_to_value(snapshot: &DebugSnapshot) -> Value {
105 let payload = snapshot.context.payload.clone().unwrap_or(Value::Null);
106 let main = snapshot.context.main.clone().unwrap_or(Value::Null);
107 let mut context_map = HashMap::new();
108 context_map.insert("payload".to_string(), payload);
109 context_map.insert("main".to_string(), main);
110
111 let mut map = HashMap::new();
112 map.insert("context".to_string(), context_map.to_value());
113 map.insert("step".to_string(), snapshot.step.clone());
114 map.insert(
115 "pipeline".to_string(),
116 (snapshot.pipeline as i64).to_value(),
117 );
118
119 map.to_value()
120}
121
122fn ok_value() -> Value {
123 let mut map = HashMap::new();
124 map.insert("ok".to_string(), true.to_value());
125 map.to_value()
126}
127
128fn error_value(message: &str) -> Value {
129 let mut map = HashMap::new();
130 map.insert("ok".to_string(), false.to_value());
131 map.insert("error".to_string(), message.to_value());
132 map.to_value()
133}