Skip to main content

debugger/dap/
client.rs

1//! DAP client for communicating with debug adapters
2//!
3//! This module handles the communication with DAP adapters like lldb-dap,
4//! including the initialization sequence and request/response handling.
5//!
6//! ## Architecture
7//!
8//! The DapClient uses a background reader task to continuously read from the
9//! adapter's stdout (or TCP socket). This ensures that events (stopped, output,
10//! etc.) are captured immediately rather than only during request/response cycles.
11//!
12//! ```text
13//! [DAP Adapter] --stdout/tcp--> [Reader Task] --events--> [event_tx channel]
14//!                                            --responses-> [response channels]
15//! [DapClient]   --stdin/tcp-->  [DAP Adapter]
16//! ```
17//!
18//! ## Transport Modes
19//!
20//! - **Stdio**: Standard input/output (default, used by lldb-dap, debugpy)
21//! - **TCP**: TCP socket connection (used by Delve)
22
23use std::collections::HashMap;
24use std::path::Path;
25use std::process::Stdio;
26use std::sync::atomic::{AtomicI64, Ordering};
27use std::sync::Arc;
28use std::time::Duration;
29
30use std::pin::Pin;
31use std::task::{Context, Poll};
32
33use serde_json::Value;
34use tokio::io::{AsyncWrite, BufReader, BufWriter};
35use tokio::net::TcpStream;
36use tokio::process::{Child, ChildStdin, ChildStdout, Command};
37use tokio::sync::{mpsc, oneshot, Mutex};
38
39use crate::common::{Error, Result};
40
41use super::codec;
42use super::types::*;
43
44/// Pending response waiters, keyed by request sequence number
45type PendingResponses = Arc<Mutex<HashMap<i64, oneshot::Sender<std::result::Result<ResponseMessage, Error>>>>>;
46
47/// Abstraction over different writer types (stdin or TCP)
48enum DapWriter {
49    Stdio(BufWriter<ChildStdin>),
50    Tcp(BufWriter<tokio::io::WriteHalf<TcpStream>>),
51}
52
53impl AsyncWrite for DapWriter {
54    fn poll_write(
55        self: Pin<&mut Self>,
56        cx: &mut Context<'_>,
57        buf: &[u8],
58    ) -> Poll<std::io::Result<usize>> {
59        match self.get_mut() {
60            DapWriter::Stdio(w) => Pin::new(w).poll_write(cx, buf),
61            DapWriter::Tcp(w) => Pin::new(w).poll_write(cx, buf),
62        }
63    }
64
65    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
66        match self.get_mut() {
67            DapWriter::Stdio(w) => Pin::new(w).poll_flush(cx),
68            DapWriter::Tcp(w) => Pin::new(w).poll_flush(cx),
69        }
70    }
71
72    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
73        match self.get_mut() {
74            DapWriter::Stdio(w) => Pin::new(w).poll_shutdown(cx),
75            DapWriter::Tcp(w) => Pin::new(w).poll_shutdown(cx),
76        }
77    }
78}
79
80/// DAP client for communicating with a debug adapter
81pub struct DapClient {
82    /// Adapter subprocess
83    adapter: Child,
84    /// Buffered writer for adapter communication
85    writer: DapWriter,
86    /// Sequence number for requests
87    seq: AtomicI64,
88    /// Adapter capabilities (populated after initialize)
89    pub capabilities: Capabilities,
90    /// Pending response waiters
91    pending: PendingResponses,
92    /// Channel for events (to session)
93    event_tx: mpsc::UnboundedSender<Event>,
94    /// Receiver for events (given to session)
95    event_rx: Option<mpsc::UnboundedReceiver<Event>>,
96    /// Handle to the background reader task
97    reader_task: Option<tokio::task::JoinHandle<()>>,
98    /// Channel to signal reader task to stop
99    shutdown_tx: Option<mpsc::Sender<()>>,
100}
101
102impl DapClient {
103    /// Spawn a new DAP adapter and create a client
104    pub async fn spawn(adapter_path: &Path, args: &[String]) -> Result<Self> {
105        let mut cmd = Command::new(adapter_path);
106        cmd.args(args)
107            .stdin(Stdio::piped())
108            .stdout(Stdio::piped())
109            .stderr(Stdio::inherit()); // Let adapter errors go to stderr
110
111        let mut adapter = cmd.spawn().map_err(|e| {
112            Error::AdapterStartFailed(format!(
113                "Failed to start {}: {}",
114                adapter_path.display(),
115                e
116            ))
117        })?;
118
119        let stdin = adapter
120            .stdin
121            .take()
122            .ok_or_else(|| Error::AdapterStartFailed("Failed to get adapter stdin".to_string()))?;
123        let stdout = adapter.stdout.take().ok_or_else(|| {
124            Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
125        })?;
126
127        let (event_tx, event_rx) = mpsc::unbounded_channel();
128        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
129        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
130
131        // Spawn background reader task
132        let reader_task = Self::spawn_stdio_reader_task(
133            stdout,
134            event_tx.clone(),
135            pending.clone(),
136            shutdown_rx,
137        );
138
139        Ok(Self {
140            adapter,
141            writer: DapWriter::Stdio(BufWriter::new(stdin)),
142            seq: AtomicI64::new(1),
143            capabilities: Capabilities::default(),
144            pending,
145            event_tx,
146            event_rx: Some(event_rx),
147            reader_task: Some(reader_task),
148            shutdown_tx: Some(shutdown_tx),
149        })
150    }
151
152    /// Spawn a new DAP adapter that uses TCP for communication (e.g., Delve, js-debug)
153    pub async fn spawn_tcp(
154        adapter_path: &Path,
155        args: &[String],
156        spawn_style: &crate::common::config::TcpSpawnStyle,
157    ) -> Result<Self> {
158        use crate::common::parse_listen_address;
159        use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
160
161        let (mut adapter, addr) = match spawn_style {
162            crate::common::config::TcpSpawnStyle::TcpListen => {
163                let mut cmd = Command::new(adapter_path);
164                cmd.args(args)
165                    .arg("--listen=127.0.0.1:0")
166                    .stdin(Stdio::null())
167                    .stdout(Stdio::piped())
168                    .stderr(Stdio::piped());
169
170                let mut adapter = cmd.spawn().map_err(|e| {
171                    Error::AdapterStartFailed(format!(
172                        "Failed to start {}: {}",
173                        adapter_path.display(),
174                        e
175                    ))
176                })?;
177
178                let stdout = adapter.stdout.take().ok_or_else(|| {
179                    let _ = adapter.start_kill();
180                    Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
181                })?;
182
183                let mut stdout_reader = TokioBufReader::new(stdout);
184                let mut line = String::new();
185
186                let addr_result = tokio::time::timeout(Duration::from_secs(10), async {
187                    loop {
188                        line.clear();
189                        let bytes_read = stdout_reader.read_line(&mut line).await.map_err(|e| {
190                            Error::AdapterStartFailed(format!("Failed to read adapter output: {}", e))
191                        })?;
192
193                        if bytes_read == 0 {
194                            return Err(Error::AdapterStartFailed(
195                                "Adapter exited before outputting listen address".to_string(),
196                            ));
197                        }
198
199                        tracing::debug!("Adapter output: {}", line.trim());
200
201                        if let Some(addr) = parse_listen_address(&line) {
202                            return Ok(addr);
203                        }
204                    }
205                })
206                .await;
207
208                let addr = match addr_result {
209                    Ok(Ok(addr)) => addr,
210                    Ok(Err(e)) => {
211                        let _ = adapter.start_kill();
212                        return Err(e);
213                    }
214                    Err(_) => {
215                        let _ = adapter.start_kill();
216                        return Err(Error::AdapterStartFailed(
217                            "Timeout waiting for adapter to start listening".to_string(),
218                        ));
219                    }
220                };
221
222                (adapter, addr)
223            }
224            crate::common::config::TcpSpawnStyle::TcpPortArg => {
225                use std::net::TcpListener as StdTcpListener;
226
227                let listener = StdTcpListener::bind("127.0.0.1:0").map_err(|e| {
228                    Error::AdapterStartFailed(format!("Failed to allocate port: {}", e))
229                })?;
230                let port = listener.local_addr().map_err(|e| {
231                    Error::AdapterStartFailed(format!("Failed to get port: {}", e))
232                })?.port();
233                // Race condition window: port released before adapter binds. Mitigated by immediate spawn + 500ms buffer.
234                // If connection fails, port may have been reallocated by OS.
235                drop(listener);
236
237                let addr = format!("127.0.0.1:{}", port);
238
239                let mut cmd = Command::new(adapter_path);
240                let mut full_args = args.to_vec();
241                full_args.push(port.to_string());
242
243                cmd.args(&full_args)
244                    .stdin(Stdio::null())
245                    .stdout(Stdio::piped())
246                    .stderr(Stdio::piped());
247
248                let adapter = cmd.spawn().map_err(|e| {
249                    Error::AdapterStartFailed(format!(
250                        "Failed to start {}: {}",
251                        adapter_path.display(),
252                        e
253                    ))
254                })?;
255
256                (adapter, addr)
257            }
258        };
259
260        tracing::info!("Connecting to DAP adapter at {}", addr);
261
262        // Retry TCP connection with exponential backoff
263        // Handles adapters that need time to start listening (e.g., js-debug)
264        let stream = {
265            let mut last_error = None;
266            let mut delay = Duration::from_millis(100);
267            let max_delay = Duration::from_millis(1000);
268            let timeout_duration = Duration::from_secs(10);
269            let start = std::time::Instant::now();
270
271            loop {
272                match TcpStream::connect(&addr).await {
273                    Ok(s) => break s,
274                    Err(e) => {
275                        last_error = Some(e);
276                        if start.elapsed() >= timeout_duration {
277                            let _ = adapter.start_kill();
278                            return Err(Error::AdapterStartFailed(format!(
279                                "Failed to connect to adapter at {} after {:?}: {}",
280                                addr, timeout_duration, last_error.unwrap()
281                            )));
282                        }
283                        tokio::time::sleep(delay).await;
284                        delay = std::cmp::min(delay * 2, max_delay);
285                    }
286                }
287            }
288        };
289
290        let (read_half, write_half) = tokio::io::split(stream);
291
292        let (event_tx, event_rx) = mpsc::unbounded_channel();
293        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
294        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
295
296        // Spawn background reader task for TCP
297        let reader_task = Self::spawn_tcp_reader_task(
298            read_half,
299            event_tx.clone(),
300            pending.clone(),
301            shutdown_rx,
302        );
303
304        Ok(Self {
305            adapter,
306            writer: DapWriter::Tcp(BufWriter::new(write_half)),
307            seq: AtomicI64::new(1),
308            capabilities: Capabilities::default(),
309            pending,
310            event_tx,
311            event_rx: Some(event_rx),
312            reader_task: Some(reader_task),
313            shutdown_tx: Some(shutdown_tx),
314        })
315    }
316
317    /// Spawn the background reader task for stdio-based adapters
318    fn spawn_stdio_reader_task(
319        stdout: ChildStdout,
320        event_tx: mpsc::UnboundedSender<Event>,
321        pending: PendingResponses,
322        mut shutdown_rx: mpsc::Receiver<()>,
323    ) -> tokio::task::JoinHandle<()> {
324        tokio::spawn(async move {
325            let mut reader = BufReader::new(stdout);
326
327            loop {
328                tokio::select! {
329                    biased;
330
331                    // Check for shutdown signal
332                    _ = shutdown_rx.recv() => {
333                        tracing::debug!("Reader task received shutdown signal");
334                        break;
335                    }
336
337                    // Read next message
338                    result = codec::read_message(&mut reader) => {
339                        match result {
340                            Ok(json) => {
341                                tracing::trace!("DAP <<< {}", json);
342
343                                if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
344                                    tracing::error!("Error processing DAP message: {}", e);
345                                }
346                            }
347                            Err(e) => {
348                                // Check if this is an expected EOF (adapter exited)
349                                // We check the error message string as a fallback for various error types
350                                let err_str = e.to_string().to_lowercase();
351                                let is_eof = err_str.contains("unexpected eof")
352                                    || err_str.contains("unexpectedeof")
353                                    || err_str.contains("end of file");
354
355                                if is_eof {
356                                    tracing::info!("DAP adapter closed connection");
357                                } else {
358                                    tracing::error!("Error reading from DAP adapter: {}", e);
359                                }
360
361                                // Signal error to any pending requests
362                                let mut pending_guard = pending.lock().await;
363                                for (_, tx) in pending_guard.drain() {
364                                    let _ = tx.send(Err(Error::AdapterCrashed));
365                                }
366
367                                // Send terminated event to notify the session
368                                let _ = event_tx.send(Event::Terminated(None));
369                                break;
370                            }
371                        }
372                    }
373                }
374            }
375
376            tracing::debug!("Reader task exiting");
377        })
378    }
379
380    /// Spawn the background reader task for TCP-based adapters
381    fn spawn_tcp_reader_task(
382        read_half: tokio::io::ReadHalf<TcpStream>,
383        event_tx: mpsc::UnboundedSender<Event>,
384        pending: PendingResponses,
385        mut shutdown_rx: mpsc::Receiver<()>,
386    ) -> tokio::task::JoinHandle<()> {
387        tokio::spawn(async move {
388            let mut reader = BufReader::new(read_half);
389
390            loop {
391                tokio::select! {
392                    biased;
393
394                    // Check for shutdown signal
395                    _ = shutdown_rx.recv() => {
396                        tracing::debug!("TCP reader task received shutdown signal");
397                        break;
398                    }
399
400                    // Read next message
401                    result = codec::read_message(&mut reader) => {
402                        match result {
403                            Ok(json) => {
404                                tracing::trace!("DAP <<< {}", json);
405
406                                if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
407                                    tracing::error!("Error processing DAP message: {}", e);
408                                }
409                            }
410                            Err(e) => {
411                                let err_str = e.to_string().to_lowercase();
412                                let is_eof = err_str.contains("unexpected eof")
413                                    || err_str.contains("unexpectedeof")
414                                    || err_str.contains("end of file")
415                                    || err_str.contains("connection reset");
416
417                                if is_eof {
418                                    tracing::info!("DAP adapter closed TCP connection");
419                                } else {
420                                    tracing::error!("Error reading from DAP adapter (TCP): {}", e);
421                                }
422
423                                // Signal error to any pending requests
424                                let mut pending_guard = pending.lock().await;
425                                for (_, tx) in pending_guard.drain() {
426                                    let _ = tx.send(Err(Error::AdapterCrashed));
427                                }
428
429                                // Send terminated event to notify the session
430                                let _ = event_tx.send(Event::Terminated(None));
431                                break;
432                            }
433                        }
434                    }
435                }
436            }
437
438            tracing::debug!("TCP reader task exiting");
439        })
440    }
441
442    /// Process a single message from the adapter
443    async fn process_message(
444        json: &str,
445        event_tx: &mpsc::UnboundedSender<Event>,
446        pending: &PendingResponses,
447    ) -> Result<()> {
448        let msg: Value = serde_json::from_str(json)
449            .map_err(|e| Error::DapProtocol(format!("Invalid JSON: {}", e)))?;
450
451        let msg_type = msg
452            .get("type")
453            .and_then(|v| v.as_str())
454            .unwrap_or("unknown");
455
456        match msg_type {
457            "response" => {
458                let response: ResponseMessage = serde_json::from_value(msg)?;
459                let seq = response.request_seq;
460
461                let mut pending_guard = pending.lock().await;
462                if let Some(tx) = pending_guard.remove(&seq) {
463                    let _ = tx.send(Ok(response));
464                } else {
465                    tracing::warn!("Received response for unknown request seq {}", seq);
466                }
467            }
468            "event" => {
469                let event_msg: EventMessage = serde_json::from_value(msg)?;
470                let event = Event::from_message(&event_msg);
471                let _ = event_tx.send(event);
472            }
473            _ => {
474                tracing::warn!("Unknown message type: {}", msg_type);
475            }
476        }
477
478        Ok(())
479    }
480
481    /// Take the event receiver (can only be called once)
482    pub fn take_event_receiver(&mut self) -> Option<mpsc::UnboundedReceiver<Event>> {
483        self.event_rx.take()
484    }
485
486    /// Get the next sequence number
487    fn next_seq(&self) -> i64 {
488        self.seq.fetch_add(1, Ordering::SeqCst)
489    }
490
491    /// Send a request and return its sequence number
492    async fn send_request(&mut self, command: &str, arguments: Option<Value>) -> Result<i64> {
493        let seq = self.next_seq();
494
495        // Build request with or without arguments field
496        let request = if let Some(args) = arguments {
497            serde_json::json!({
498                "seq": seq,
499                "type": "request",
500                "command": command,
501                "arguments": args
502            })
503        } else {
504            serde_json::json!({
505                "seq": seq,
506                "type": "request",
507                "command": command
508            })
509        };
510
511        let json = serde_json::to_string(&request)?;
512        tracing::trace!("DAP >>> {}", json);
513
514        codec::write_message(&mut self.writer, &json).await?;
515
516        Ok(seq)
517    }
518
519    /// Send a request and wait for the response with timeout
520    pub async fn request<T: serde::de::DeserializeOwned>(
521        &mut self,
522        command: &str,
523        arguments: Option<Value>,
524    ) -> Result<T> {
525        self.request_with_timeout(command, arguments, Duration::from_secs(30)).await
526    }
527
528    /// Send a request and wait for the response with configurable timeout
529    ///
530    /// Note: We register the pending response handler BEFORE sending the request
531    /// to avoid a race condition where a fast adapter response arrives before
532    /// we've set up the handler.
533    pub async fn request_with_timeout<T: serde::de::DeserializeOwned>(
534        &mut self,
535        command: &str,
536        arguments: Option<Value>,
537        timeout: Duration,
538    ) -> Result<T> {
539        let seq = self.next_seq();
540
541        // Build request with or without arguments field
542        let request = if let Some(ref args) = arguments {
543            serde_json::json!({
544                "seq": seq,
545                "type": "request",
546                "command": command,
547                "arguments": args
548            })
549        } else {
550            serde_json::json!({
551                "seq": seq,
552                "type": "request",
553                "command": command
554            })
555        };
556
557        // IMPORTANT: Register the pending response handler BEFORE sending the request
558        // to avoid race condition where fast adapter responds before we're ready
559        let (tx, rx) = oneshot::channel();
560        {
561            let mut pending_guard = self.pending.lock().await;
562            pending_guard.insert(seq, tx);
563        }
564
565        // Now send the request
566        let json = serde_json::to_string(&request)?;
567        tracing::trace!("DAP >>> {}", json);
568
569        if let Err(e) = codec::write_message(&mut self.writer, &json).await {
570            // Remove the pending handler if send failed
571            let mut pending_guard = self.pending.lock().await;
572            pending_guard.remove(&seq);
573            return Err(e);
574        }
575
576        // Wait for response with timeout
577        let response = tokio::time::timeout(timeout, rx)
578            .await
579            .map_err(|_| {
580                // Clean up pending handler on timeout
581                let pending = self.pending.clone();
582                tokio::spawn(async move {
583                    let mut pending_guard = pending.lock().await;
584                    pending_guard.remove(&seq);
585                });
586                Error::Timeout(timeout.as_secs())
587            })?
588            .map_err(|_| Error::AdapterCrashed)??;
589
590        if response.success {
591            let body = response.body.unwrap_or(Value::Null);
592            serde_json::from_value(body).map_err(|e| {
593                Error::DapProtocol(format!(
594                    "Failed to parse {} response: {}",
595                    command, e
596                ))
597            })
598        } else {
599            Err(Error::dap_request_failed(
600                command,
601                &response.message.unwrap_or_else(|| "Unknown error".to_string()),
602            ))
603        }
604    }
605
606    /// Poll for events - this is now non-blocking since events are already in the channel
607    /// Note: This method is kept for API compatibility but is no longer necessary
608    /// since the background reader task handles all event ingestion
609    pub async fn poll_event(&mut self) -> Result<Option<Event>> {
610        // Events are now handled by the background reader task
611        // and delivered through the event channel.
612        // This method is kept for backward compatibility.
613        Ok(None)
614    }
615
616    /// Initialize the debug adapter
617    pub async fn initialize(&mut self, adapter_id: &str) -> Result<Capabilities> {
618        self.initialize_with_timeout(adapter_id, Duration::from_secs(10)).await
619    }
620
621    /// Initialize the debug adapter with configurable timeout
622    pub async fn initialize_with_timeout(
623        &mut self,
624        adapter_id: &str,
625        timeout: Duration,
626    ) -> Result<Capabilities> {
627        let args = InitializeArguments {
628            adapter_id: adapter_id.to_string(),
629            ..Default::default()
630        };
631
632        let caps: Capabilities = self
633            .request_with_timeout("initialize", Some(serde_json::to_value(&args)?), timeout)
634            .await?;
635
636        self.capabilities = caps.clone();
637        Ok(caps)
638    }
639
640    /// Wait for the initialized event with timeout
641    ///
642    /// This method waits for the initialized event which comes through the event channel.
643    /// It's called before the session takes the event receiver.
644    pub async fn wait_initialized(&mut self) -> Result<()> {
645        self.wait_initialized_with_timeout(Duration::from_secs(30)).await
646    }
647
648    /// Wait for the initialized event with configurable timeout
649    ///
650    /// ## Event Ordering Note
651    ///
652    /// This method consumes events from the channel until it sees `Initialized`.
653    /// Non-Initialized events are re-sent to the channel so they won't be lost.
654    /// This is safe because:
655    /// 1. The session hasn't taken the receiver yet (wait_initialized is called during setup)
656    /// 2. The re-sent events go back to the same unbounded channel
657    /// 3. The background reader task continues adding new events after our re-sent ones
658    ///
659    /// Events will be received in order: [re-sent events] + [new events from reader]
660    pub async fn wait_initialized_with_timeout(&mut self, timeout: Duration) -> Result<()> {
661        // The event receiver is typically taken by the session after initialization,
662        // but wait_initialized is called before that, so we should still have it
663        if let Some(ref mut rx) = self.event_rx {
664            let deadline = tokio::time::Instant::now() + timeout;
665
666            loop {
667                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
668                if remaining.is_zero() {
669                    return Err(Error::Timeout(timeout.as_secs()));
670                }
671
672                match tokio::time::timeout(remaining, rx.recv()).await {
673                    Ok(Some(event)) => {
674                        if matches!(event, Event::Initialized) {
675                            return Ok(());
676                        }
677                        // Re-send other events so they're not lost when session takes the receiver.
678                        // This maintains event ordering: these events arrived before Initialized,
679                        // so they'll be received first when the session starts processing.
680                        let _ = self.event_tx.send(event);
681                    }
682                    Ok(None) => {
683                        return Err(Error::AdapterCrashed);
684                    }
685                    Err(_) => {
686                        return Err(Error::Timeout(timeout.as_secs()));
687                    }
688                }
689            }
690        } else {
691            // Event receiver already taken - this shouldn't happen in normal flow
692            Err(Error::Internal("Event receiver already taken before wait_initialized".to_string()))
693        }
694    }
695
696    /// Launch a program for debugging
697    pub async fn launch(&mut self, args: LaunchArguments) -> Result<()> {
698        self.request::<Value>("launch", Some(serde_json::to_value(&args)?))
699            .await?;
700        Ok(())
701    }
702
703    /// Launch a program for debugging without waiting for response
704    /// 
705    /// Some debuggers (like debugpy) don't respond to launch until after
706    /// configurationDone is sent. This sends the launch request but doesn't
707    /// wait for the response.
708    pub async fn launch_no_wait(&mut self, args: LaunchArguments) -> Result<i64> {
709        self.send_request("launch", Some(serde_json::to_value(&args)?)).await
710    }
711
712    /// Attach to a running process
713    pub async fn attach(&mut self, args: AttachArguments) -> Result<()> {
714        self.request::<Value>("attach", Some(serde_json::to_value(&args)?))
715            .await?;
716        Ok(())
717    }
718
719    /// Signal that configuration is done
720    pub async fn configuration_done(&mut self) -> Result<()> {
721        self.request::<Value>("configurationDone", None).await?;
722        Ok(())
723    }
724
725    /// Set breakpoints for a source file
726    pub async fn set_breakpoints(
727        &mut self,
728        source_path: &Path,
729        breakpoints: Vec<SourceBreakpoint>,
730    ) -> Result<Vec<Breakpoint>> {
731        let args = SetBreakpointsArguments {
732            source: Source {
733                path: Some(source_path.to_string_lossy().into_owned()),
734                ..Default::default()
735            },
736            breakpoints,
737        };
738
739        let response: SetBreakpointsResponseBody = self
740            .request("setBreakpoints", Some(serde_json::to_value(&args)?))
741            .await?;
742
743        Ok(response.breakpoints)
744    }
745
746    /// Set function breakpoints
747    pub async fn set_function_breakpoints(
748        &mut self,
749        breakpoints: Vec<FunctionBreakpoint>,
750    ) -> Result<Vec<Breakpoint>> {
751        let args = SetFunctionBreakpointsArguments { breakpoints };
752
753        let response: SetBreakpointsResponseBody = self
754            .request(
755                "setFunctionBreakpoints",
756                Some(serde_json::to_value(&args)?),
757            )
758            .await?;
759
760        Ok(response.breakpoints)
761    }
762
763    /// Continue execution
764    pub async fn continue_execution(&mut self, thread_id: i64) -> Result<bool> {
765        let args = ContinueArguments {
766            thread_id,
767            single_thread: false,
768        };
769
770        let response: ContinueResponseBody = self
771            .request("continue", Some(serde_json::to_value(&args)?))
772            .await?;
773
774        Ok(response.all_threads_continued)
775    }
776
777    /// Step over (next)
778    pub async fn next(&mut self, thread_id: i64) -> Result<()> {
779        let args = StepArguments {
780            thread_id,
781            granularity: Some("statement".to_string()),
782        };
783
784        self.request::<Value>("next", Some(serde_json::to_value(&args)?))
785            .await?;
786        Ok(())
787    }
788
789    /// Step into
790    pub async fn step_in(&mut self, thread_id: i64) -> Result<()> {
791        let args = StepArguments {
792            thread_id,
793            granularity: Some("statement".to_string()),
794        };
795
796        self.request::<Value>("stepIn", Some(serde_json::to_value(&args)?))
797            .await?;
798        Ok(())
799    }
800
801    /// Step out
802    pub async fn step_out(&mut self, thread_id: i64) -> Result<()> {
803        let args = StepArguments {
804            thread_id,
805            granularity: Some("statement".to_string()),
806        };
807
808        self.request::<Value>("stepOut", Some(serde_json::to_value(&args)?))
809            .await?;
810        Ok(())
811    }
812
813    /// Pause execution
814    pub async fn pause(&mut self, thread_id: i64) -> Result<()> {
815        let args = PauseArguments { thread_id };
816
817        self.request::<Value>("pause", Some(serde_json::to_value(&args)?))
818            .await?;
819        Ok(())
820    }
821
822    /// Get stack trace
823    pub async fn stack_trace(&mut self, thread_id: i64, levels: i64) -> Result<Vec<StackFrame>> {
824        let args = StackTraceArguments {
825            thread_id,
826            start_frame: Some(0),
827            levels: Some(levels),
828        };
829
830        let response: StackTraceResponseBody = self
831            .request("stackTrace", Some(serde_json::to_value(&args)?))
832            .await?;
833
834        Ok(response.stack_frames)
835    }
836
837    /// Get threads
838    pub async fn threads(&mut self) -> Result<Vec<Thread>> {
839        let response: ThreadsResponseBody = self.request("threads", None).await?;
840        Ok(response.threads)
841    }
842
843    /// Get scopes for a frame
844    pub async fn scopes(&mut self, frame_id: i64) -> Result<Vec<Scope>> {
845        let args = ScopesArguments { frame_id };
846
847        let response: ScopesResponseBody = self
848            .request("scopes", Some(serde_json::to_value(&args)?))
849            .await?;
850
851        Ok(response.scopes)
852    }
853
854    /// Get variables
855    pub async fn variables(&mut self, variables_reference: i64) -> Result<Vec<Variable>> {
856        let args = VariablesArguments {
857            variables_reference,
858            start: None,
859            count: None,
860        };
861
862        let response: VariablesResponseBody = self
863            .request("variables", Some(serde_json::to_value(&args)?))
864            .await?;
865
866        Ok(response.variables)
867    }
868
869    /// Evaluate an expression
870    pub async fn evaluate(
871        &mut self,
872        expression: &str,
873        frame_id: Option<i64>,
874        context: &str,
875    ) -> Result<EvaluateResponseBody> {
876        let args = EvaluateArguments {
877            expression: expression.to_string(),
878            frame_id,
879            context: Some(context.to_string()),
880        };
881
882        self.request("evaluate", Some(serde_json::to_value(&args)?))
883            .await
884    }
885
886    /// Disconnect from the debug adapter
887    pub async fn disconnect(&mut self, terminate_debuggee: bool) -> Result<()> {
888        let args = DisconnectArguments {
889            restart: false,
890            terminate_debuggee: Some(terminate_debuggee),
891        };
892
893        // Don't wait for response - adapter might exit immediately
894        let _ = self
895            .send_request("disconnect", Some(serde_json::to_value(&args)?))
896            .await;
897
898        Ok(())
899    }
900
901    /// Terminate the adapter process and clean up resources
902    pub async fn terminate(&mut self) -> Result<()> {
903        // Try graceful disconnect first
904        let _ = self.disconnect(true).await;
905
906        // Signal the reader task to stop
907        if let Some(tx) = self.shutdown_tx.take() {
908            let _ = tx.send(()).await;
909        }
910
911        // Wait a bit for clean shutdown
912        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
913
914        // Wait for reader task to finish
915        if let Some(task) = self.reader_task.take() {
916            // Give it a short timeout
917            let _ = tokio::time::timeout(
918                Duration::from_millis(500),
919                task,
920            ).await;
921        }
922
923        // Force kill if still running
924        let _ = self.adapter.kill().await;
925
926        Ok(())
927    }
928
929    /// Check if the adapter is still running
930    pub fn is_running(&mut self) -> bool {
931        self.adapter.try_wait().ok().flatten().is_none()
932    }
933
934    /// Restart the debug session (for adapters that support it)
935    pub async fn restart(&mut self, no_debug: bool) -> Result<()> {
936        if !self.capabilities.supports_restart_request {
937            return Err(Error::Internal(
938                "Debug adapter does not support restart".to_string(),
939            ));
940        }
941
942        let args = serde_json::json!({
943            "noDebug": no_debug
944        });
945
946        self.request::<Value>("restart", Some(args)).await?;
947        Ok(())
948    }
949}
950
951impl Drop for DapClient {
952    /// Best-effort cleanup on drop.
953    ///
954    /// ## Limitations
955    ///
956    /// Since we can't await in `drop()`, this is necessarily imperfect:
957    /// - `try_send` may fail if the shutdown channel is full (unlikely with capacity 1)
958    /// - `task.abort()` is immediate; the reader may be mid-operation
959    /// - `start_kill()` is non-blocking; the adapter may not exit immediately
960    ///
961    /// For graceful cleanup, prefer calling `terminate()` before dropping.
962    /// This Drop impl exists as a safety net to avoid leaking resources if
963    /// `terminate()` wasn't called.
964    fn drop(&mut self) {
965        // Signal shutdown to reader task (best-effort, can't await)
966        if let Some(tx) = self.shutdown_tx.take() {
967            // Use try_send since we can't await in drop
968            let _ = tx.try_send(());
969        }
970
971        // Abort the reader task if it's still running
972        // Note: This is abrupt but necessary since we can't await graceful shutdown
973        if let Some(task) = self.reader_task.take() {
974            task.abort();
975        }
976
977        // Try to kill the adapter on drop
978        // This is best-effort since we can't await in drop
979        let _ = self.adapter.start_kill();
980    }
981}