Skip to main content

debugger/dap/
client.rs

1//! DAP client for communicating with debug adapters
2//!
3//! This module handles the communication with DAP adapters like lldb-dap,
4//! including the initialization sequence and request/response handling.
5//!
6//! ## Architecture
7//!
8//! The DapClient uses a background reader task to continuously read from the
9//! adapter's stdout (or TCP socket). This ensures that events (stopped, output,
10//! etc.) are captured immediately rather than only during request/response cycles.
11//!
12//! ```text
13//! [DAP Adapter] --stdout/tcp--> [Reader Task] --events--> [event_tx channel]
14//!                                            --responses-> [response channels]
15//! [DapClient]   --stdin/tcp-->  [DAP Adapter]
16//! ```
17//!
18//! ## Transport Modes
19//!
20//! - **Stdio**: Standard input/output (default, used by lldb-dap, debugpy)
21//! - **TCP**: TCP socket connection (used by Delve)
22
23use std::collections::HashMap;
24use std::path::Path;
25use std::process::Stdio;
26use std::sync::atomic::{AtomicI64, Ordering};
27use std::sync::Arc;
28use std::time::Duration;
29
30use std::pin::Pin;
31use std::task::{Context, Poll};
32
33use serde_json::Value;
34use tokio::io::{AsyncWrite, BufReader, BufWriter};
35use tokio::net::TcpStream;
36use tokio::process::{Child, ChildStdin, ChildStdout, Command};
37use tokio::sync::{mpsc, oneshot, Mutex};
38
39use crate::common::{Error, Result};
40
41use super::codec;
42use super::types::*;
43
44/// Pending response waiters, keyed by request sequence number
45type PendingResponses = Arc<Mutex<HashMap<i64, oneshot::Sender<std::result::Result<ResponseMessage, Error>>>>>;
46
47/// Abstraction over different writer types (stdin or TCP)
48enum DapWriter {
49    Stdio(BufWriter<ChildStdin>),
50    Tcp(BufWriter<tokio::io::WriteHalf<TcpStream>>),
51}
52
53impl AsyncWrite for DapWriter {
54    fn poll_write(
55        self: Pin<&mut Self>,
56        cx: &mut Context<'_>,
57        buf: &[u8],
58    ) -> Poll<std::io::Result<usize>> {
59        match self.get_mut() {
60            DapWriter::Stdio(w) => Pin::new(w).poll_write(cx, buf),
61            DapWriter::Tcp(w) => Pin::new(w).poll_write(cx, buf),
62        }
63    }
64
65    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
66        match self.get_mut() {
67            DapWriter::Stdio(w) => Pin::new(w).poll_flush(cx),
68            DapWriter::Tcp(w) => Pin::new(w).poll_flush(cx),
69        }
70    }
71
72    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
73        match self.get_mut() {
74            DapWriter::Stdio(w) => Pin::new(w).poll_shutdown(cx),
75            DapWriter::Tcp(w) => Pin::new(w).poll_shutdown(cx),
76        }
77    }
78}
79
80/// DAP client for communicating with a debug adapter
81pub struct DapClient {
82    /// Adapter subprocess
83    adapter: Child,
84    /// Buffered writer for adapter communication
85    writer: DapWriter,
86    /// Sequence number for requests
87    seq: AtomicI64,
88    /// Adapter capabilities (populated after initialize)
89    pub capabilities: Capabilities,
90    /// Pending response waiters
91    pending: PendingResponses,
92    /// Channel for events (to session)
93    event_tx: mpsc::UnboundedSender<Event>,
94    /// Receiver for events (given to session)
95    event_rx: Option<mpsc::UnboundedReceiver<Event>>,
96    /// Handle to the background reader task
97    reader_task: Option<tokio::task::JoinHandle<()>>,
98    /// Channel to signal reader task to stop
99    shutdown_tx: Option<mpsc::Sender<()>>,
100}
101
102impl DapClient {
103    /// Spawn a new DAP adapter and create a client
104    pub async fn spawn(adapter_path: &Path, args: &[String]) -> Result<Self> {
105        let mut cmd = Command::new(adapter_path);
106        cmd.args(args)
107            .stdin(Stdio::piped())
108            .stdout(Stdio::piped())
109            .stderr(Stdio::inherit()); // Let adapter errors go to stderr
110
111        let mut adapter = cmd.spawn().map_err(|e| {
112            Error::AdapterStartFailed(format!(
113                "Failed to start {}: {}",
114                adapter_path.display(),
115                e
116            ))
117        })?;
118
119        let stdin = adapter
120            .stdin
121            .take()
122            .ok_or_else(|| Error::AdapterStartFailed("Failed to get adapter stdin".to_string()))?;
123        let stdout = adapter.stdout.take().ok_or_else(|| {
124            Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
125        })?;
126
127        let (event_tx, event_rx) = mpsc::unbounded_channel();
128        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
129        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
130
131        // Spawn background reader task
132        let reader_task = Self::spawn_stdio_reader_task(
133            stdout,
134            event_tx.clone(),
135            pending.clone(),
136            shutdown_rx,
137        );
138
139        Ok(Self {
140            adapter,
141            writer: DapWriter::Stdio(BufWriter::new(stdin)),
142            seq: AtomicI64::new(1),
143            capabilities: Capabilities::default(),
144            pending,
145            event_tx,
146            event_rx: Some(event_rx),
147            reader_task: Some(reader_task),
148            shutdown_tx: Some(shutdown_tx),
149        })
150    }
151
152    /// Spawn a new DAP adapter that uses TCP for communication (e.g., Delve)
153    ///
154    /// This spawns the adapter with a --listen flag, waits for it to output
155    /// the port it's listening on, then connects via TCP.
156    pub async fn spawn_tcp(adapter_path: &Path, args: &[String]) -> Result<Self> {
157        use crate::common::parse_listen_address;
158        use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
159
160        // Build command with --listen=127.0.0.1:0 to get a random available port
161        let mut cmd = Command::new(adapter_path);
162        cmd.args(args)
163            .arg("--listen=127.0.0.1:0")
164            .stdin(Stdio::null())
165            .stdout(Stdio::piped())
166            .stderr(Stdio::piped());
167
168        let mut adapter = cmd.spawn().map_err(|e| {
169            Error::AdapterStartFailed(format!(
170                "Failed to start {}: {}",
171                adapter_path.display(),
172                e
173            ))
174        })?;
175
176        // Read stdout to find the listening address
177        // Delve outputs: "DAP server listening at: 127.0.0.1:PORT"
178        let stdout = adapter.stdout.take().ok_or_else(|| {
179            let _ = adapter.start_kill();
180            Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
181        })?;
182
183        let mut stdout_reader = TokioBufReader::new(stdout);
184        let mut line = String::new();
185
186        // Wait for the "listening at" message with timeout
187        let addr_result = tokio::time::timeout(Duration::from_secs(10), async {
188            loop {
189                line.clear();
190                let bytes_read = stdout_reader.read_line(&mut line).await.map_err(|e| {
191                    Error::AdapterStartFailed(format!("Failed to read adapter output: {}", e))
192                })?;
193
194                if bytes_read == 0 {
195                    return Err(Error::AdapterStartFailed(
196                        "Adapter exited before outputting listen address".to_string(),
197                    ));
198                }
199
200                tracing::debug!("Delve output: {}", line.trim());
201
202                // Look for the listening address in the output
203                if let Some(addr) = parse_listen_address(&line) {
204                    return Ok(addr);
205                }
206            }
207        })
208        .await;
209
210        // Handle timeout or error - cleanup adapter before returning
211        let addr = match addr_result {
212            Ok(Ok(addr)) => addr,
213            Ok(Err(e)) => {
214                let _ = adapter.start_kill();
215                return Err(e);
216            }
217            Err(_) => {
218                let _ = adapter.start_kill();
219                return Err(Error::AdapterStartFailed(
220                    "Timeout waiting for Delve to start listening".to_string(),
221                ));
222            }
223        };
224
225        tracing::info!("Connecting to Delve DAP server at {}", addr);
226
227        // Connect to the TCP port - cleanup adapter on failure
228        let stream = match TcpStream::connect(&addr).await {
229            Ok(s) => s,
230            Err(e) => {
231                let _ = adapter.start_kill();
232                return Err(Error::AdapterStartFailed(format!(
233                    "Failed to connect to Delve at {}: {}",
234                    addr, e
235                )));
236            }
237        };
238
239        let (read_half, write_half) = tokio::io::split(stream);
240
241        let (event_tx, event_rx) = mpsc::unbounded_channel();
242        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
243        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
244
245        // Spawn background reader task for TCP
246        let reader_task = Self::spawn_tcp_reader_task(
247            read_half,
248            event_tx.clone(),
249            pending.clone(),
250            shutdown_rx,
251        );
252
253        Ok(Self {
254            adapter,
255            writer: DapWriter::Tcp(BufWriter::new(write_half)),
256            seq: AtomicI64::new(1),
257            capabilities: Capabilities::default(),
258            pending,
259            event_tx,
260            event_rx: Some(event_rx),
261            reader_task: Some(reader_task),
262            shutdown_tx: Some(shutdown_tx),
263        })
264    }
265
266    /// Spawn the background reader task for stdio-based adapters
267    fn spawn_stdio_reader_task(
268        stdout: ChildStdout,
269        event_tx: mpsc::UnboundedSender<Event>,
270        pending: PendingResponses,
271        mut shutdown_rx: mpsc::Receiver<()>,
272    ) -> tokio::task::JoinHandle<()> {
273        tokio::spawn(async move {
274            let mut reader = BufReader::new(stdout);
275
276            loop {
277                tokio::select! {
278                    biased;
279
280                    // Check for shutdown signal
281                    _ = shutdown_rx.recv() => {
282                        tracing::debug!("Reader task received shutdown signal");
283                        break;
284                    }
285
286                    // Read next message
287                    result = codec::read_message(&mut reader) => {
288                        match result {
289                            Ok(json) => {
290                                tracing::trace!("DAP <<< {}", json);
291
292                                if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
293                                    tracing::error!("Error processing DAP message: {}", e);
294                                }
295                            }
296                            Err(e) => {
297                                // Check if this is an expected EOF (adapter exited)
298                                // We check the error message string as a fallback for various error types
299                                let err_str = e.to_string().to_lowercase();
300                                let is_eof = err_str.contains("unexpected eof")
301                                    || err_str.contains("unexpectedeof")
302                                    || err_str.contains("end of file");
303
304                                if is_eof {
305                                    tracing::info!("DAP adapter closed connection");
306                                } else {
307                                    tracing::error!("Error reading from DAP adapter: {}", e);
308                                }
309
310                                // Signal error to any pending requests
311                                let mut pending_guard = pending.lock().await;
312                                for (_, tx) in pending_guard.drain() {
313                                    let _ = tx.send(Err(Error::AdapterCrashed));
314                                }
315
316                                // Send terminated event to notify the session
317                                let _ = event_tx.send(Event::Terminated(None));
318                                break;
319                            }
320                        }
321                    }
322                }
323            }
324
325            tracing::debug!("Reader task exiting");
326        })
327    }
328
329    /// Spawn the background reader task for TCP-based adapters
330    fn spawn_tcp_reader_task(
331        read_half: tokio::io::ReadHalf<TcpStream>,
332        event_tx: mpsc::UnboundedSender<Event>,
333        pending: PendingResponses,
334        mut shutdown_rx: mpsc::Receiver<()>,
335    ) -> tokio::task::JoinHandle<()> {
336        tokio::spawn(async move {
337            let mut reader = BufReader::new(read_half);
338
339            loop {
340                tokio::select! {
341                    biased;
342
343                    // Check for shutdown signal
344                    _ = shutdown_rx.recv() => {
345                        tracing::debug!("TCP reader task received shutdown signal");
346                        break;
347                    }
348
349                    // Read next message
350                    result = codec::read_message(&mut reader) => {
351                        match result {
352                            Ok(json) => {
353                                tracing::trace!("DAP <<< {}", json);
354
355                                if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
356                                    tracing::error!("Error processing DAP message: {}", e);
357                                }
358                            }
359                            Err(e) => {
360                                let err_str = e.to_string().to_lowercase();
361                                let is_eof = err_str.contains("unexpected eof")
362                                    || err_str.contains("unexpectedeof")
363                                    || err_str.contains("end of file")
364                                    || err_str.contains("connection reset");
365
366                                if is_eof {
367                                    tracing::info!("DAP adapter closed TCP connection");
368                                } else {
369                                    tracing::error!("Error reading from DAP adapter (TCP): {}", e);
370                                }
371
372                                // Signal error to any pending requests
373                                let mut pending_guard = pending.lock().await;
374                                for (_, tx) in pending_guard.drain() {
375                                    let _ = tx.send(Err(Error::AdapterCrashed));
376                                }
377
378                                // Send terminated event to notify the session
379                                let _ = event_tx.send(Event::Terminated(None));
380                                break;
381                            }
382                        }
383                    }
384                }
385            }
386
387            tracing::debug!("TCP reader task exiting");
388        })
389    }
390
391    /// Process a single message from the adapter
392    async fn process_message(
393        json: &str,
394        event_tx: &mpsc::UnboundedSender<Event>,
395        pending: &PendingResponses,
396    ) -> Result<()> {
397        let msg: Value = serde_json::from_str(json)
398            .map_err(|e| Error::DapProtocol(format!("Invalid JSON: {}", e)))?;
399
400        let msg_type = msg
401            .get("type")
402            .and_then(|v| v.as_str())
403            .unwrap_or("unknown");
404
405        match msg_type {
406            "response" => {
407                let response: ResponseMessage = serde_json::from_value(msg)?;
408                let seq = response.request_seq;
409
410                let mut pending_guard = pending.lock().await;
411                if let Some(tx) = pending_guard.remove(&seq) {
412                    let _ = tx.send(Ok(response));
413                } else {
414                    tracing::warn!("Received response for unknown request seq {}", seq);
415                }
416            }
417            "event" => {
418                let event_msg: EventMessage = serde_json::from_value(msg)?;
419                let event = Event::from_message(&event_msg);
420                let _ = event_tx.send(event);
421            }
422            _ => {
423                tracing::warn!("Unknown message type: {}", msg_type);
424            }
425        }
426
427        Ok(())
428    }
429
430    /// Take the event receiver (can only be called once)
431    pub fn take_event_receiver(&mut self) -> Option<mpsc::UnboundedReceiver<Event>> {
432        self.event_rx.take()
433    }
434
435    /// Get the next sequence number
436    fn next_seq(&self) -> i64 {
437        self.seq.fetch_add(1, Ordering::SeqCst)
438    }
439
440    /// Send a request and return its sequence number
441    async fn send_request(&mut self, command: &str, arguments: Option<Value>) -> Result<i64> {
442        let seq = self.next_seq();
443
444        // Build request with or without arguments field
445        let request = if let Some(args) = arguments {
446            serde_json::json!({
447                "seq": seq,
448                "type": "request",
449                "command": command,
450                "arguments": args
451            })
452        } else {
453            serde_json::json!({
454                "seq": seq,
455                "type": "request",
456                "command": command
457            })
458        };
459
460        let json = serde_json::to_string(&request)?;
461        tracing::trace!("DAP >>> {}", json);
462
463        codec::write_message(&mut self.writer, &json).await?;
464
465        Ok(seq)
466    }
467
468    /// Send a request and wait for the response with timeout
469    pub async fn request<T: serde::de::DeserializeOwned>(
470        &mut self,
471        command: &str,
472        arguments: Option<Value>,
473    ) -> Result<T> {
474        self.request_with_timeout(command, arguments, Duration::from_secs(30)).await
475    }
476
477    /// Send a request and wait for the response with configurable timeout
478    ///
479    /// Note: We register the pending response handler BEFORE sending the request
480    /// to avoid a race condition where a fast adapter response arrives before
481    /// we've set up the handler.
482    pub async fn request_with_timeout<T: serde::de::DeserializeOwned>(
483        &mut self,
484        command: &str,
485        arguments: Option<Value>,
486        timeout: Duration,
487    ) -> Result<T> {
488        let seq = self.next_seq();
489
490        // Build request with or without arguments field
491        let request = if let Some(ref args) = arguments {
492            serde_json::json!({
493                "seq": seq,
494                "type": "request",
495                "command": command,
496                "arguments": args
497            })
498        } else {
499            serde_json::json!({
500                "seq": seq,
501                "type": "request",
502                "command": command
503            })
504        };
505
506        // IMPORTANT: Register the pending response handler BEFORE sending the request
507        // to avoid race condition where fast adapter responds before we're ready
508        let (tx, rx) = oneshot::channel();
509        {
510            let mut pending_guard = self.pending.lock().await;
511            pending_guard.insert(seq, tx);
512        }
513
514        // Now send the request
515        let json = serde_json::to_string(&request)?;
516        tracing::trace!("DAP >>> {}", json);
517
518        if let Err(e) = codec::write_message(&mut self.writer, &json).await {
519            // Remove the pending handler if send failed
520            let mut pending_guard = self.pending.lock().await;
521            pending_guard.remove(&seq);
522            return Err(e);
523        }
524
525        // Wait for response with timeout
526        let response = tokio::time::timeout(timeout, rx)
527            .await
528            .map_err(|_| {
529                // Clean up pending handler on timeout
530                let pending = self.pending.clone();
531                tokio::spawn(async move {
532                    let mut pending_guard = pending.lock().await;
533                    pending_guard.remove(&seq);
534                });
535                Error::Timeout(timeout.as_secs())
536            })?
537            .map_err(|_| Error::AdapterCrashed)??;
538
539        if response.success {
540            let body = response.body.unwrap_or(Value::Null);
541            serde_json::from_value(body).map_err(|e| {
542                Error::DapProtocol(format!(
543                    "Failed to parse {} response: {}",
544                    command, e
545                ))
546            })
547        } else {
548            Err(Error::dap_request_failed(
549                command,
550                &response.message.unwrap_or_else(|| "Unknown error".to_string()),
551            ))
552        }
553    }
554
555    /// Poll for events - this is now non-blocking since events are already in the channel
556    /// Note: This method is kept for API compatibility but is no longer necessary
557    /// since the background reader task handles all event ingestion
558    pub async fn poll_event(&mut self) -> Result<Option<Event>> {
559        // Events are now handled by the background reader task
560        // and delivered through the event channel.
561        // This method is kept for backward compatibility.
562        Ok(None)
563    }
564
565    /// Initialize the debug adapter
566    pub async fn initialize(&mut self, adapter_id: &str) -> Result<Capabilities> {
567        self.initialize_with_timeout(adapter_id, Duration::from_secs(10)).await
568    }
569
570    /// Initialize the debug adapter with configurable timeout
571    pub async fn initialize_with_timeout(
572        &mut self,
573        adapter_id: &str,
574        timeout: Duration,
575    ) -> Result<Capabilities> {
576        let args = InitializeArguments {
577            adapter_id: adapter_id.to_string(),
578            ..Default::default()
579        };
580
581        let caps: Capabilities = self
582            .request_with_timeout("initialize", Some(serde_json::to_value(&args)?), timeout)
583            .await?;
584
585        self.capabilities = caps.clone();
586        Ok(caps)
587    }
588
589    /// Wait for the initialized event with timeout
590    ///
591    /// This method waits for the initialized event which comes through the event channel.
592    /// It's called before the session takes the event receiver.
593    pub async fn wait_initialized(&mut self) -> Result<()> {
594        self.wait_initialized_with_timeout(Duration::from_secs(30)).await
595    }
596
597    /// Wait for the initialized event with configurable timeout
598    ///
599    /// ## Event Ordering Note
600    ///
601    /// This method consumes events from the channel until it sees `Initialized`.
602    /// Non-Initialized events are re-sent to the channel so they won't be lost.
603    /// This is safe because:
604    /// 1. The session hasn't taken the receiver yet (wait_initialized is called during setup)
605    /// 2. The re-sent events go back to the same unbounded channel
606    /// 3. The background reader task continues adding new events after our re-sent ones
607    ///
608    /// Events will be received in order: [re-sent events] + [new events from reader]
609    pub async fn wait_initialized_with_timeout(&mut self, timeout: Duration) -> Result<()> {
610        // The event receiver is typically taken by the session after initialization,
611        // but wait_initialized is called before that, so we should still have it
612        if let Some(ref mut rx) = self.event_rx {
613            let deadline = tokio::time::Instant::now() + timeout;
614
615            loop {
616                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
617                if remaining.is_zero() {
618                    return Err(Error::Timeout(timeout.as_secs()));
619                }
620
621                match tokio::time::timeout(remaining, rx.recv()).await {
622                    Ok(Some(event)) => {
623                        if matches!(event, Event::Initialized) {
624                            return Ok(());
625                        }
626                        // Re-send other events so they're not lost when session takes the receiver.
627                        // This maintains event ordering: these events arrived before Initialized,
628                        // so they'll be received first when the session starts processing.
629                        let _ = self.event_tx.send(event);
630                    }
631                    Ok(None) => {
632                        return Err(Error::AdapterCrashed);
633                    }
634                    Err(_) => {
635                        return Err(Error::Timeout(timeout.as_secs()));
636                    }
637                }
638            }
639        } else {
640            // Event receiver already taken - this shouldn't happen in normal flow
641            Err(Error::Internal("Event receiver already taken before wait_initialized".to_string()))
642        }
643    }
644
645    /// Launch a program for debugging
646    pub async fn launch(&mut self, args: LaunchArguments) -> Result<()> {
647        self.request::<Value>("launch", Some(serde_json::to_value(&args)?))
648            .await?;
649        Ok(())
650    }
651
652    /// Launch a program for debugging without waiting for response
653    /// 
654    /// Some debuggers (like debugpy) don't respond to launch until after
655    /// configurationDone is sent. This sends the launch request but doesn't
656    /// wait for the response.
657    pub async fn launch_no_wait(&mut self, args: LaunchArguments) -> Result<i64> {
658        self.send_request("launch", Some(serde_json::to_value(&args)?)).await
659    }
660
661    /// Attach to a running process
662    pub async fn attach(&mut self, args: AttachArguments) -> Result<()> {
663        self.request::<Value>("attach", Some(serde_json::to_value(&args)?))
664            .await?;
665        Ok(())
666    }
667
668    /// Signal that configuration is done
669    pub async fn configuration_done(&mut self) -> Result<()> {
670        self.request::<Value>("configurationDone", None).await?;
671        Ok(())
672    }
673
674    /// Set breakpoints for a source file
675    pub async fn set_breakpoints(
676        &mut self,
677        source_path: &Path,
678        breakpoints: Vec<SourceBreakpoint>,
679    ) -> Result<Vec<Breakpoint>> {
680        let args = SetBreakpointsArguments {
681            source: Source {
682                path: Some(source_path.to_string_lossy().into_owned()),
683                ..Default::default()
684            },
685            breakpoints,
686        };
687
688        let response: SetBreakpointsResponseBody = self
689            .request("setBreakpoints", Some(serde_json::to_value(&args)?))
690            .await?;
691
692        Ok(response.breakpoints)
693    }
694
695    /// Set function breakpoints
696    pub async fn set_function_breakpoints(
697        &mut self,
698        breakpoints: Vec<FunctionBreakpoint>,
699    ) -> Result<Vec<Breakpoint>> {
700        let args = SetFunctionBreakpointsArguments { breakpoints };
701
702        let response: SetBreakpointsResponseBody = self
703            .request(
704                "setFunctionBreakpoints",
705                Some(serde_json::to_value(&args)?),
706            )
707            .await?;
708
709        Ok(response.breakpoints)
710    }
711
712    /// Continue execution
713    pub async fn continue_execution(&mut self, thread_id: i64) -> Result<bool> {
714        let args = ContinueArguments {
715            thread_id,
716            single_thread: false,
717        };
718
719        let response: ContinueResponseBody = self
720            .request("continue", Some(serde_json::to_value(&args)?))
721            .await?;
722
723        Ok(response.all_threads_continued)
724    }
725
726    /// Step over (next)
727    pub async fn next(&mut self, thread_id: i64) -> Result<()> {
728        let args = StepArguments {
729            thread_id,
730            granularity: Some("statement".to_string()),
731        };
732
733        self.request::<Value>("next", Some(serde_json::to_value(&args)?))
734            .await?;
735        Ok(())
736    }
737
738    /// Step into
739    pub async fn step_in(&mut self, thread_id: i64) -> Result<()> {
740        let args = StepArguments {
741            thread_id,
742            granularity: Some("statement".to_string()),
743        };
744
745        self.request::<Value>("stepIn", Some(serde_json::to_value(&args)?))
746            .await?;
747        Ok(())
748    }
749
750    /// Step out
751    pub async fn step_out(&mut self, thread_id: i64) -> Result<()> {
752        let args = StepArguments {
753            thread_id,
754            granularity: Some("statement".to_string()),
755        };
756
757        self.request::<Value>("stepOut", Some(serde_json::to_value(&args)?))
758            .await?;
759        Ok(())
760    }
761
762    /// Pause execution
763    pub async fn pause(&mut self, thread_id: i64) -> Result<()> {
764        let args = PauseArguments { thread_id };
765
766        self.request::<Value>("pause", Some(serde_json::to_value(&args)?))
767            .await?;
768        Ok(())
769    }
770
771    /// Get stack trace
772    pub async fn stack_trace(&mut self, thread_id: i64, levels: i64) -> Result<Vec<StackFrame>> {
773        let args = StackTraceArguments {
774            thread_id,
775            start_frame: Some(0),
776            levels: Some(levels),
777        };
778
779        let response: StackTraceResponseBody = self
780            .request("stackTrace", Some(serde_json::to_value(&args)?))
781            .await?;
782
783        Ok(response.stack_frames)
784    }
785
786    /// Get threads
787    pub async fn threads(&mut self) -> Result<Vec<Thread>> {
788        let response: ThreadsResponseBody = self.request("threads", None).await?;
789        Ok(response.threads)
790    }
791
792    /// Get scopes for a frame
793    pub async fn scopes(&mut self, frame_id: i64) -> Result<Vec<Scope>> {
794        let args = ScopesArguments { frame_id };
795
796        let response: ScopesResponseBody = self
797            .request("scopes", Some(serde_json::to_value(&args)?))
798            .await?;
799
800        Ok(response.scopes)
801    }
802
803    /// Get variables
804    pub async fn variables(&mut self, variables_reference: i64) -> Result<Vec<Variable>> {
805        let args = VariablesArguments {
806            variables_reference,
807            start: None,
808            count: None,
809        };
810
811        let response: VariablesResponseBody = self
812            .request("variables", Some(serde_json::to_value(&args)?))
813            .await?;
814
815        Ok(response.variables)
816    }
817
818    /// Evaluate an expression
819    pub async fn evaluate(
820        &mut self,
821        expression: &str,
822        frame_id: Option<i64>,
823        context: &str,
824    ) -> Result<EvaluateResponseBody> {
825        let args = EvaluateArguments {
826            expression: expression.to_string(),
827            frame_id,
828            context: Some(context.to_string()),
829        };
830
831        self.request("evaluate", Some(serde_json::to_value(&args)?))
832            .await
833    }
834
835    /// Disconnect from the debug adapter
836    pub async fn disconnect(&mut self, terminate_debuggee: bool) -> Result<()> {
837        let args = DisconnectArguments {
838            restart: false,
839            terminate_debuggee: Some(terminate_debuggee),
840        };
841
842        // Don't wait for response - adapter might exit immediately
843        let _ = self
844            .send_request("disconnect", Some(serde_json::to_value(&args)?))
845            .await;
846
847        Ok(())
848    }
849
850    /// Terminate the adapter process and clean up resources
851    pub async fn terminate(&mut self) -> Result<()> {
852        // Try graceful disconnect first
853        let _ = self.disconnect(true).await;
854
855        // Signal the reader task to stop
856        if let Some(tx) = self.shutdown_tx.take() {
857            let _ = tx.send(()).await;
858        }
859
860        // Wait a bit for clean shutdown
861        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
862
863        // Wait for reader task to finish
864        if let Some(task) = self.reader_task.take() {
865            // Give it a short timeout
866            let _ = tokio::time::timeout(
867                Duration::from_millis(500),
868                task,
869            ).await;
870        }
871
872        // Force kill if still running
873        let _ = self.adapter.kill().await;
874
875        Ok(())
876    }
877
878    /// Check if the adapter is still running
879    pub fn is_running(&mut self) -> bool {
880        self.adapter.try_wait().ok().flatten().is_none()
881    }
882
883    /// Restart the debug session (for adapters that support it)
884    pub async fn restart(&mut self, no_debug: bool) -> Result<()> {
885        if !self.capabilities.supports_restart_request {
886            return Err(Error::Internal(
887                "Debug adapter does not support restart".to_string(),
888            ));
889        }
890
891        let args = serde_json::json!({
892            "noDebug": no_debug
893        });
894
895        self.request::<Value>("restart", Some(args)).await?;
896        Ok(())
897    }
898}
899
900impl Drop for DapClient {
901    /// Best-effort cleanup on drop.
902    ///
903    /// ## Limitations
904    ///
905    /// Since we can't await in `drop()`, this is necessarily imperfect:
906    /// - `try_send` may fail if the shutdown channel is full (unlikely with capacity 1)
907    /// - `task.abort()` is immediate; the reader may be mid-operation
908    /// - `start_kill()` is non-blocking; the adapter may not exit immediately
909    ///
910    /// For graceful cleanup, prefer calling `terminate()` before dropping.
911    /// This Drop impl exists as a safety net to avoid leaking resources if
912    /// `terminate()` wasn't called.
913    fn drop(&mut self) {
914        // Signal shutdown to reader task (best-effort, can't await)
915        if let Some(tx) = self.shutdown_tx.take() {
916            // Use try_send since we can't await in drop
917            let _ = tx.try_send(());
918        }
919
920        // Abort the reader task if it's still running
921        // Note: This is abrupt but necessary since we can't await graceful shutdown
922        if let Some(task) = self.reader_task.take() {
923            task.abort();
924        }
925
926        // Try to kill the adapter on drop
927        // This is best-effort since we can't await in drop
928        let _ = self.adapter.start_kill();
929    }
930}