Skip to main content

synwire_dap/
transport.rs

1//! Low-level DAP transport managing child process I/O and request correlation.
2//!
3//! Spawns a debug adapter as a child process, wraps its stdin/stdout in a
4//! Content-Length framed codec, and correlates request/response pairs by
5//! DAP sequence number.
6
7use std::collections::HashMap;
8use std::process::Stdio;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicI64, Ordering};
11
12use futures_util::{SinkExt, StreamExt};
13use tokio::process::{Child, ChildStdout, Command};
14use tokio::sync::{Mutex, RwLock, oneshot};
15use tokio_util::codec::{FramedRead, FramedWrite};
16
17use crate::codec::ContentLengthCodec;
18use crate::error::DapError;
19
/// Callback type for DAP events received from the adapter.
///
/// The callback is invoked with the complete JSON event message (not just its
/// `body`). It is shared behind an [`Arc`] and must be `Send + Sync` because
/// it is called from the spawned background reader task.
pub type EventHandler = Arc<dyn Fn(serde_json::Value) + Send + Sync>;
22
/// Map of pending request sequence numbers to their response channels.
///
/// The key is the `seq` of an outgoing request; the value is the one-shot
/// sender used to hand the raw JSON response back to the task awaiting it.
/// Entries are inserted when a request is sent and removed by the reader task
/// when a response with a matching `request_seq` arrives.
type PendingMap = HashMap<i64, oneshot::Sender<serde_json::Value>>;
25
/// Low-level DAP transport managing child process I/O and request correlation.
///
/// The transport spawns a background task that reads framed messages from the
/// adapter's stdout. Responses are correlated to pending requests by their
/// `request_seq` field; events are forwarded to an [`EventHandler`] callback.
pub struct DapTransport {
    /// Content-Length framed writer over the adapter's stdin. The async mutex
    /// is locked for the duration of each request write.
    writer: Arc<Mutex<FramedWrite<tokio::process::ChildStdin, ContentLengthCodec>>>,
    /// In-flight requests awaiting a response, shared with the reader task.
    pending: Arc<RwLock<PendingMap>>,
    /// Callback invoked by the reader task for every message of type `event`.
    event_handler: EventHandler,
    /// Sequence number to assign to the next outgoing request (starts at 1).
    next_seq: AtomicI64,
    /// Handle to the spawned adapter process.
    _child: Arc<Mutex<Child>>,
    /// Join handle of the background reader task started by `spawn`.
    _read_handle: tokio::task::JoinHandle<()>,
}
39
40impl DapTransport {
41    /// Spawn a debug adapter process and set up framed transport.
42    ///
43    /// # Errors
44    ///
45    /// Returns [`DapError::Io`] if the process cannot be spawned, or
46    /// [`DapError::Transport`] if stdin/stdout cannot be captured.
47    pub fn spawn(
48        command: &str,
49        args: &[String],
50        env: &HashMap<String, String>,
51        event_handler: EventHandler,
52    ) -> Result<Self, DapError> {
53        let mut cmd = Command::new(command);
54        let _ = cmd
55            .args(args)
56            .envs(env)
57            .stdin(Stdio::piped())
58            .stdout(Stdio::piped())
59            .stderr(Stdio::null());
60
61        let mut child = cmd.spawn()?;
62
63        let stdin = child
64            .stdin
65            .take()
66            .ok_or_else(|| DapError::Transport("failed to capture adapter stdin".into()))?;
67        let stdout = child
68            .stdout
69            .take()
70            .ok_or_else(|| DapError::Transport("failed to capture adapter stdout".into()))?;
71
72        let writer = Arc::new(Mutex::new(FramedWrite::new(
73            stdin,
74            ContentLengthCodec::new(),
75        )));
76        let reader = FramedRead::new(stdout, ContentLengthCodec::new());
77
78        let pending: Arc<RwLock<PendingMap>> = Arc::new(RwLock::new(HashMap::new()));
79        let child_arc = Arc::new(Mutex::new(child));
80
81        let read_handle =
82            Self::spawn_reader(reader, Arc::clone(&pending), Arc::clone(&event_handler));
83
84        Ok(Self {
85            writer,
86            pending,
87            event_handler,
88            next_seq: AtomicI64::new(1),
89            _child: child_arc,
90            _read_handle: read_handle,
91        })
92    }
93
94    /// Send a DAP request and wait for the correlated response.
95    ///
96    /// # Errors
97    ///
98    /// Returns [`DapError::Transport`] if the write fails, [`DapError::Timeout`]
99    /// if the response channel is dropped, or [`DapError::RequestFailed`] if
100    /// the adapter returns `success: false`.
101    pub async fn send_request(
102        &self,
103        command: &str,
104        arguments: serde_json::Value,
105    ) -> Result<serde_json::Value, DapError> {
106        let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
107
108        let request = serde_json::json!({
109            "seq": seq,
110            "type": "request",
111            "command": command,
112            "arguments": arguments,
113        });
114
115        let (tx, rx) = oneshot::channel();
116
117        {
118            let mut pending = self.pending.write().await;
119            let _ = pending.insert(seq, tx);
120        }
121
122        self.writer
123            .lock()
124            .await
125            .send(request)
126            .await
127            .map_err(|e| DapError::Transport(format!("failed to send request: {e}")))?;
128
129        tracing::debug!(seq, command, "DAP request sent");
130
131        let response = rx.await.map_err(|_| DapError::Timeout)?;
132
133        // Check for error responses.
134        let success = response
135            .get("success")
136            .and_then(serde_json::Value::as_bool)
137            .unwrap_or(false);
138
139        if !success {
140            let message = response
141                .get("message")
142                .and_then(serde_json::Value::as_str)
143                .unwrap_or("unknown error")
144                .to_string();
145            return Err(DapError::RequestFailed {
146                command: command.to_string(),
147                message,
148            });
149        }
150
151        Ok(response)
152    }
153
154    /// Access the event handler (used by plugin to register callbacks).
155    #[must_use]
156    pub fn event_handler(&self) -> &EventHandler {
157        &self.event_handler
158    }
159
160    /// Spawn a background task to read messages from the adapter and dispatch them.
161    fn spawn_reader(
162        mut reader: FramedRead<ChildStdout, ContentLengthCodec>,
163        pending: Arc<RwLock<PendingMap>>,
164        event_handler: EventHandler,
165    ) -> tokio::task::JoinHandle<()> {
166        tokio::spawn(async move {
167            while let Some(result) = reader.next().await {
168                match result {
169                    Ok(message) => {
170                        let msg_type = message
171                            .get("type")
172                            .and_then(serde_json::Value::as_str)
173                            .unwrap_or("");
174
175                        match msg_type {
176                            "response" => {
177                                let request_seq = message
178                                    .get("request_seq")
179                                    .and_then(serde_json::Value::as_i64)
180                                    .unwrap_or(-1);
181
182                                let mut pending_guard = pending.write().await;
183                                if let Some(tx) = pending_guard.remove(&request_seq) {
184                                    if tx.send(message).is_err() {
185                                        tracing::warn!(request_seq, "Response receiver dropped");
186                                    }
187                                } else {
188                                    tracing::warn!(request_seq, "No pending request for response");
189                                }
190                            }
191                            "event" => {
192                                let event_name = message
193                                    .get("event")
194                                    .and_then(serde_json::Value::as_str)
195                                    .unwrap_or("unknown");
196                                tracing::debug!(event = event_name, "DAP event received");
197                                event_handler(message);
198                            }
199                            other => {
200                                tracing::debug!(msg_type = other, "Unknown DAP message type");
201                            }
202                        }
203                    }
204                    Err(e) => {
205                        tracing::warn!(error = %e, "DAP transport read error");
206                        break;
207                    }
208                }
209            }
210            tracing::debug!("DAP reader task exiting");
211        })
212    }
213}