1use std::collections::HashMap;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicI64, Ordering};
11
12use futures_util::{SinkExt, StreamExt};
13use tokio::process::{Child, ChildStdout, Command};
14use tokio::sync::{Mutex, RwLock, oneshot};
15use tokio_util::codec::{FramedRead, FramedWrite};
16
17use crate::codec::ContentLengthCodec;
18use crate::error::DapError;
19
/// Callback invoked with the raw JSON of every adapter message whose `type`
/// field is `"event"`.
///
/// The handler is shared behind an [`Arc`] and must be `Send + Sync` because
/// the transport calls it from its spawned reader task.
pub type EventHandler = Arc<dyn Fn(serde_json::Value) + Send + Sync>;
22
/// In-flight requests keyed by request `seq`; each sender delivers the
/// matching response JSON to the caller waiting on the other end.
type PendingMap = HashMap<i64, oneshot::Sender<serde_json::Value>>;
25
/// Transport to a debug adapter running as a child process.
///
/// Requests are written to the adapter's stdin and responses/events are read
/// from its stdout by a background task; both directions are framed by
/// `ContentLengthCodec`.
pub struct DapTransport {
    /// Framed sink over the adapter's stdin; the mutex serializes writers.
    writer: Arc<Mutex<FramedWrite<tokio::process::ChildStdin, ContentLengthCodec>>>,
    /// Requests that have been registered and are awaiting their response.
    pending: Arc<RwLock<PendingMap>>,
    /// Callback that receives every `"event"` message from the adapter.
    event_handler: EventHandler,
    /// Next request sequence number to hand out (initialized to 1).
    next_seq: AtomicI64,
    /// Handle to the adapter process; never read, presumably held so the
    /// handle lives as long as the transport.
    _child: Arc<Mutex<Child>>,
    /// Join handle of the background reader task; never read.
    _read_handle: tokio::task::JoinHandle<()>,
}
39
40impl DapTransport {
41 pub fn spawn(
48 command: &str,
49 args: &[String],
50 env: &HashMap<String, String>,
51 event_handler: EventHandler,
52 ) -> Result<Self, DapError> {
53 let mut cmd = Command::new(command);
54 let _ = cmd
55 .args(args)
56 .envs(env)
57 .stdin(Stdio::piped())
58 .stdout(Stdio::piped())
59 .stderr(Stdio::null());
60
61 let mut child = cmd.spawn()?;
62
63 let stdin = child
64 .stdin
65 .take()
66 .ok_or_else(|| DapError::Transport("failed to capture adapter stdin".into()))?;
67 let stdout = child
68 .stdout
69 .take()
70 .ok_or_else(|| DapError::Transport("failed to capture adapter stdout".into()))?;
71
72 let writer = Arc::new(Mutex::new(FramedWrite::new(
73 stdin,
74 ContentLengthCodec::new(),
75 )));
76 let reader = FramedRead::new(stdout, ContentLengthCodec::new());
77
78 let pending: Arc<RwLock<PendingMap>> = Arc::new(RwLock::new(HashMap::new()));
79 let child_arc = Arc::new(Mutex::new(child));
80
81 let read_handle =
82 Self::spawn_reader(reader, Arc::clone(&pending), Arc::clone(&event_handler));
83
84 Ok(Self {
85 writer,
86 pending,
87 event_handler,
88 next_seq: AtomicI64::new(1),
89 _child: child_arc,
90 _read_handle: read_handle,
91 })
92 }
93
94 pub async fn send_request(
102 &self,
103 command: &str,
104 arguments: serde_json::Value,
105 ) -> Result<serde_json::Value, DapError> {
106 let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
107
108 let request = serde_json::json!({
109 "seq": seq,
110 "type": "request",
111 "command": command,
112 "arguments": arguments,
113 });
114
115 let (tx, rx) = oneshot::channel();
116
117 {
118 let mut pending = self.pending.write().await;
119 let _ = pending.insert(seq, tx);
120 }
121
122 self.writer
123 .lock()
124 .await
125 .send(request)
126 .await
127 .map_err(|e| DapError::Transport(format!("failed to send request: {e}")))?;
128
129 tracing::debug!(seq, command, "DAP request sent");
130
131 let response = rx.await.map_err(|_| DapError::Timeout)?;
132
133 let success = response
135 .get("success")
136 .and_then(serde_json::Value::as_bool)
137 .unwrap_or(false);
138
139 if !success {
140 let message = response
141 .get("message")
142 .and_then(serde_json::Value::as_str)
143 .unwrap_or("unknown error")
144 .to_string();
145 return Err(DapError::RequestFailed {
146 command: command.to_string(),
147 message,
148 });
149 }
150
151 Ok(response)
152 }
153
154 #[must_use]
156 pub fn event_handler(&self) -> &EventHandler {
157 &self.event_handler
158 }
159
160 fn spawn_reader(
162 mut reader: FramedRead<ChildStdout, ContentLengthCodec>,
163 pending: Arc<RwLock<PendingMap>>,
164 event_handler: EventHandler,
165 ) -> tokio::task::JoinHandle<()> {
166 tokio::spawn(async move {
167 while let Some(result) = reader.next().await {
168 match result {
169 Ok(message) => {
170 let msg_type = message
171 .get("type")
172 .and_then(serde_json::Value::as_str)
173 .unwrap_or("");
174
175 match msg_type {
176 "response" => {
177 let request_seq = message
178 .get("request_seq")
179 .and_then(serde_json::Value::as_i64)
180 .unwrap_or(-1);
181
182 let mut pending_guard = pending.write().await;
183 if let Some(tx) = pending_guard.remove(&request_seq) {
184 if tx.send(message).is_err() {
185 tracing::warn!(request_seq, "Response receiver dropped");
186 }
187 } else {
188 tracing::warn!(request_seq, "No pending request for response");
189 }
190 }
191 "event" => {
192 let event_name = message
193 .get("event")
194 .and_then(serde_json::Value::as_str)
195 .unwrap_or("unknown");
196 tracing::debug!(event = event_name, "DAP event received");
197 event_handler(message);
198 }
199 other => {
200 tracing::debug!(msg_type = other, "Unknown DAP message type");
201 }
202 }
203 }
204 Err(e) => {
205 tracing::warn!(error = %e, "DAP transport read error");
206 break;
207 }
208 }
209 }
210 tracing::debug!("DAP reader task exiting");
211 })
212 }
213}