debugger/dap/
client.rs

1//! DAP client for communicating with debug adapters
2//!
3//! This module handles the communication with DAP adapters like lldb-dap,
4//! including the initialization sequence and request/response handling.
5//!
6//! ## Architecture
7//!
8//! The DapClient uses a background reader task to continuously read from the
9//! adapter's stdout. This ensures that events (stopped, output, etc.) are
10//! captured immediately rather than only during request/response cycles.
11//!
12//! ```text
13//! [DAP Adapter] --stdout--> [Reader Task] --events--> [event_tx channel]
14//!                                        --responses-> [response channels]
15//! [DapClient]   --stdin-->  [DAP Adapter]
16//! ```
17
18use std::collections::HashMap;
19use std::path::Path;
20use std::process::Stdio;
21use std::sync::atomic::{AtomicI64, Ordering};
22use std::sync::Arc;
23use std::time::Duration;
24
25use serde_json::Value;
26use tokio::io::{BufReader, BufWriter};
27use tokio::process::{Child, ChildStdin, ChildStdout, Command};
28use tokio::sync::{mpsc, oneshot, Mutex};
29
30use crate::common::{Error, Result};
31
32use super::codec;
33use super::types::*;
34
/// Registry of in-flight requests awaiting a response.
///
/// Keys are the `seq` of the request that was sent; the reader task looks a
/// waiter up by the `request_seq` of an incoming response and completes its
/// oneshot channel with either the response or an error. The map is shared
/// between the client and the background reader task, hence `Arc<Mutex<..>>`.
type PendingResponses = Arc<Mutex<HashMap<i64, oneshot::Sender<std::result::Result<ResponseMessage, Error>>>>>;
37
/// DAP client for communicating with a debug adapter.
///
/// Owns the adapter child process, the write half of its stdio transport and
/// the handles needed to stop the background reader task. Requests are written
/// through `writer`; responses and events come back via the reader task, which
/// routes them to `pending` and `event_tx` respectively.
pub struct DapClient {
    /// Adapter subprocess (killed in `terminate()` and, best-effort, on drop)
    adapter: Child,
    /// Buffered writer for adapter stdin
    writer: BufWriter<ChildStdin>,
    /// Sequence number for requests; starts at 1 and is incremented per request
    seq: AtomicI64,
    /// Adapter capabilities (populated after initialize)
    pub capabilities: Capabilities,
    /// Pending response waiters, shared with the reader task
    pending: PendingResponses,
    /// Sender side of the event channel; a clone is held by the reader task
    event_tx: mpsc::UnboundedSender<Event>,
    /// Receiver for events; `None` once `take_event_receiver()` has handed it out
    event_rx: Option<mpsc::UnboundedReceiver<Event>>,
    /// Handle to the background reader task; `None` after terminate/drop took it
    reader_task: Option<tokio::task::JoinHandle<()>>,
    /// Channel to signal reader task to stop; `None` after the signal was sent
    shutdown_tx: Option<mpsc::Sender<()>>,
}
59
60impl DapClient {
61    /// Spawn a new DAP adapter and create a client
62    pub async fn spawn(adapter_path: &Path, args: &[String]) -> Result<Self> {
63        let mut cmd = Command::new(adapter_path);
64        cmd.args(args)
65            .stdin(Stdio::piped())
66            .stdout(Stdio::piped())
67            .stderr(Stdio::inherit()); // Let adapter errors go to stderr
68
69        let mut adapter = cmd.spawn().map_err(|e| {
70            Error::AdapterStartFailed(format!(
71                "Failed to start {}: {}",
72                adapter_path.display(),
73                e
74            ))
75        })?;
76
77        let stdin = adapter
78            .stdin
79            .take()
80            .ok_or_else(|| Error::AdapterStartFailed("Failed to get adapter stdin".to_string()))?;
81        let stdout = adapter.stdout.take().ok_or_else(|| {
82            Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
83        })?;
84
85        let (event_tx, event_rx) = mpsc::unbounded_channel();
86        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
87        let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
88
89        // Spawn background reader task
90        let reader_task = Self::spawn_reader_task(
91            stdout,
92            event_tx.clone(),
93            pending.clone(),
94            shutdown_rx,
95        );
96
97        Ok(Self {
98            adapter,
99            writer: BufWriter::new(stdin),
100            seq: AtomicI64::new(1),
101            capabilities: Capabilities::default(),
102            pending,
103            event_tx,
104            event_rx: Some(event_rx),
105            reader_task: Some(reader_task),
106            shutdown_tx: Some(shutdown_tx),
107        })
108    }
109
110    /// Spawn the background reader task that continuously reads from adapter stdout
111    fn spawn_reader_task(
112        stdout: ChildStdout,
113        event_tx: mpsc::UnboundedSender<Event>,
114        pending: PendingResponses,
115        mut shutdown_rx: mpsc::Receiver<()>,
116    ) -> tokio::task::JoinHandle<()> {
117        tokio::spawn(async move {
118            let mut reader = BufReader::new(stdout);
119
120            loop {
121                tokio::select! {
122                    biased;
123
124                    // Check for shutdown signal
125                    _ = shutdown_rx.recv() => {
126                        tracing::debug!("Reader task received shutdown signal");
127                        break;
128                    }
129
130                    // Read next message
131                    result = codec::read_message(&mut reader) => {
132                        match result {
133                            Ok(json) => {
134                                tracing::trace!("DAP <<< {}", json);
135
136                                if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
137                                    tracing::error!("Error processing DAP message: {}", e);
138                                }
139                            }
140                            Err(e) => {
141                                // Check if this is an expected EOF (adapter exited)
142                                // We check the error message string as a fallback for various error types
143                                let err_str = e.to_string().to_lowercase();
144                                let is_eof = err_str.contains("unexpected eof")
145                                    || err_str.contains("unexpectedeof")
146                                    || err_str.contains("end of file");
147
148                                if is_eof {
149                                    tracing::info!("DAP adapter closed connection");
150                                } else {
151                                    tracing::error!("Error reading from DAP adapter: {}", e);
152                                }
153
154                                // Signal error to any pending requests
155                                let mut pending_guard = pending.lock().await;
156                                for (_, tx) in pending_guard.drain() {
157                                    let _ = tx.send(Err(Error::AdapterCrashed));
158                                }
159
160                                // Send terminated event to notify the session
161                                let _ = event_tx.send(Event::Terminated(None));
162                                break;
163                            }
164                        }
165                    }
166                }
167            }
168
169            tracing::debug!("Reader task exiting");
170        })
171    }
172
173    /// Process a single message from the adapter
174    async fn process_message(
175        json: &str,
176        event_tx: &mpsc::UnboundedSender<Event>,
177        pending: &PendingResponses,
178    ) -> Result<()> {
179        let msg: Value = serde_json::from_str(json)
180            .map_err(|e| Error::DapProtocol(format!("Invalid JSON: {}", e)))?;
181
182        let msg_type = msg
183            .get("type")
184            .and_then(|v| v.as_str())
185            .unwrap_or("unknown");
186
187        match msg_type {
188            "response" => {
189                let response: ResponseMessage = serde_json::from_value(msg)?;
190                let seq = response.request_seq;
191
192                let mut pending_guard = pending.lock().await;
193                if let Some(tx) = pending_guard.remove(&seq) {
194                    let _ = tx.send(Ok(response));
195                } else {
196                    tracing::warn!("Received response for unknown request seq {}", seq);
197                }
198            }
199            "event" => {
200                let event_msg: EventMessage = serde_json::from_value(msg)?;
201                let event = Event::from_message(&event_msg);
202                let _ = event_tx.send(event);
203            }
204            _ => {
205                tracing::warn!("Unknown message type: {}", msg_type);
206            }
207        }
208
209        Ok(())
210    }
211
    /// Hand out the receiving end of the event channel.
    ///
    /// Returns `Some(receiver)` on the first call and `None` on every later
    /// call, because the receiver is moved out of the client.
    pub fn take_event_receiver(&mut self) -> Option<mpsc::UnboundedReceiver<Event>> {
        self.event_rx.take()
    }
216
    /// Allocate the sequence number for the next outgoing request.
    ///
    /// Returns the current counter value and increments the counter by one.
    fn next_seq(&self) -> i64 {
        self.seq.fetch_add(1, Ordering::SeqCst)
    }
221
222    /// Send a request and return its sequence number
223    async fn send_request(&mut self, command: &str, arguments: Option<Value>) -> Result<i64> {
224        let seq = self.next_seq();
225
226        // Build request with or without arguments field
227        let request = if let Some(args) = arguments {
228            serde_json::json!({
229                "seq": seq,
230                "type": "request",
231                "command": command,
232                "arguments": args
233            })
234        } else {
235            serde_json::json!({
236                "seq": seq,
237                "type": "request",
238                "command": command
239            })
240        };
241
242        let json = serde_json::to_string(&request)?;
243        tracing::trace!("DAP >>> {}", json);
244
245        codec::write_message(&mut self.writer, &json).await?;
246
247        Ok(seq)
248    }
249
250    /// Send a request and wait for the response with timeout
251    pub async fn request<T: serde::de::DeserializeOwned>(
252        &mut self,
253        command: &str,
254        arguments: Option<Value>,
255    ) -> Result<T> {
256        self.request_with_timeout(command, arguments, Duration::from_secs(30)).await
257    }
258
259    /// Send a request and wait for the response with configurable timeout
260    ///
261    /// Note: We register the pending response handler BEFORE sending the request
262    /// to avoid a race condition where a fast adapter response arrives before
263    /// we've set up the handler.
264    pub async fn request_with_timeout<T: serde::de::DeserializeOwned>(
265        &mut self,
266        command: &str,
267        arguments: Option<Value>,
268        timeout: Duration,
269    ) -> Result<T> {
270        let seq = self.next_seq();
271
272        // Build request with or without arguments field
273        let request = if let Some(ref args) = arguments {
274            serde_json::json!({
275                "seq": seq,
276                "type": "request",
277                "command": command,
278                "arguments": args
279            })
280        } else {
281            serde_json::json!({
282                "seq": seq,
283                "type": "request",
284                "command": command
285            })
286        };
287
288        // IMPORTANT: Register the pending response handler BEFORE sending the request
289        // to avoid race condition where fast adapter responds before we're ready
290        let (tx, rx) = oneshot::channel();
291        {
292            let mut pending_guard = self.pending.lock().await;
293            pending_guard.insert(seq, tx);
294        }
295
296        // Now send the request
297        let json = serde_json::to_string(&request)?;
298        tracing::trace!("DAP >>> {}", json);
299
300        if let Err(e) = codec::write_message(&mut self.writer, &json).await {
301            // Remove the pending handler if send failed
302            let mut pending_guard = self.pending.lock().await;
303            pending_guard.remove(&seq);
304            return Err(e);
305        }
306
307        // Wait for response with timeout
308        let response = tokio::time::timeout(timeout, rx)
309            .await
310            .map_err(|_| {
311                // Clean up pending handler on timeout
312                let pending = self.pending.clone();
313                tokio::spawn(async move {
314                    let mut pending_guard = pending.lock().await;
315                    pending_guard.remove(&seq);
316                });
317                Error::Timeout(timeout.as_secs())
318            })?
319            .map_err(|_| Error::AdapterCrashed)??;
320
321        if response.success {
322            let body = response.body.unwrap_or(Value::Null);
323            serde_json::from_value(body).map_err(|e| {
324                Error::DapProtocol(format!(
325                    "Failed to parse {} response: {}",
326                    command, e
327                ))
328            })
329        } else {
330            Err(Error::dap_request_failed(
331                command,
332                &response.message.unwrap_or_else(|| "Unknown error".to_string()),
333            ))
334        }
335    }
336
    /// Compatibility stub: always returns `Ok(None)` without waiting.
    ///
    /// Events are ingested by the background reader task and delivered through
    /// the event channel (see `take_event_receiver`), so there is nothing left
    /// for this method to poll. It is kept only so existing callers compile.
    pub async fn poll_event(&mut self) -> Result<Option<Event>> {
        // Intentionally does not touch the event channel: the receiver belongs
        // to whoever took it via `take_event_receiver`.
        Ok(None)
    }
346
347    /// Initialize the debug adapter
348    pub async fn initialize(&mut self, adapter_id: &str) -> Result<Capabilities> {
349        self.initialize_with_timeout(adapter_id, Duration::from_secs(10)).await
350    }
351
352    /// Initialize the debug adapter with configurable timeout
353    pub async fn initialize_with_timeout(
354        &mut self,
355        adapter_id: &str,
356        timeout: Duration,
357    ) -> Result<Capabilities> {
358        let args = InitializeArguments {
359            adapter_id: adapter_id.to_string(),
360            ..Default::default()
361        };
362
363        let caps: Capabilities = self
364            .request_with_timeout("initialize", Some(serde_json::to_value(&args)?), timeout)
365            .await?;
366
367        self.capabilities = caps.clone();
368        Ok(caps)
369    }
370
371    /// Wait for the initialized event with timeout
372    ///
373    /// This method waits for the initialized event which comes through the event channel.
374    /// It's called before the session takes the event receiver.
375    pub async fn wait_initialized(&mut self) -> Result<()> {
376        self.wait_initialized_with_timeout(Duration::from_secs(30)).await
377    }
378
379    /// Wait for the initialized event with configurable timeout
380    ///
381    /// ## Event Ordering Note
382    ///
383    /// This method consumes events from the channel until it sees `Initialized`.
384    /// Non-Initialized events are re-sent to the channel so they won't be lost.
385    /// This is safe because:
386    /// 1. The session hasn't taken the receiver yet (wait_initialized is called during setup)
387    /// 2. The re-sent events go back to the same unbounded channel
388    /// 3. The background reader task continues adding new events after our re-sent ones
389    ///
390    /// Events will be received in order: [re-sent events] + [new events from reader]
391    pub async fn wait_initialized_with_timeout(&mut self, timeout: Duration) -> Result<()> {
392        // The event receiver is typically taken by the session after initialization,
393        // but wait_initialized is called before that, so we should still have it
394        if let Some(ref mut rx) = self.event_rx {
395            let deadline = tokio::time::Instant::now() + timeout;
396
397            loop {
398                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
399                if remaining.is_zero() {
400                    return Err(Error::Timeout(timeout.as_secs()));
401                }
402
403                match tokio::time::timeout(remaining, rx.recv()).await {
404                    Ok(Some(event)) => {
405                        if matches!(event, Event::Initialized) {
406                            return Ok(());
407                        }
408                        // Re-send other events so they're not lost when session takes the receiver.
409                        // This maintains event ordering: these events arrived before Initialized,
410                        // so they'll be received first when the session starts processing.
411                        let _ = self.event_tx.send(event);
412                    }
413                    Ok(None) => {
414                        return Err(Error::AdapterCrashed);
415                    }
416                    Err(_) => {
417                        return Err(Error::Timeout(timeout.as_secs()));
418                    }
419                }
420            }
421        } else {
422            // Event receiver already taken - this shouldn't happen in normal flow
423            Err(Error::Internal("Event receiver already taken before wait_initialized".to_string()))
424        }
425    }
426
427    /// Launch a program for debugging
428    pub async fn launch(&mut self, args: LaunchArguments) -> Result<()> {
429        self.request::<Value>("launch", Some(serde_json::to_value(&args)?))
430            .await?;
431        Ok(())
432    }
433
434    /// Launch a program for debugging without waiting for response
435    /// 
436    /// Some debuggers (like debugpy) don't respond to launch until after
437    /// configurationDone is sent. This sends the launch request but doesn't
438    /// wait for the response.
439    pub async fn launch_no_wait(&mut self, args: LaunchArguments) -> Result<i64> {
440        self.send_request("launch", Some(serde_json::to_value(&args)?)).await
441    }
442
443    /// Attach to a running process
444    pub async fn attach(&mut self, args: AttachArguments) -> Result<()> {
445        self.request::<Value>("attach", Some(serde_json::to_value(&args)?))
446            .await?;
447        Ok(())
448    }
449
450    /// Signal that configuration is done
451    pub async fn configuration_done(&mut self) -> Result<()> {
452        self.request::<Value>("configurationDone", None).await?;
453        Ok(())
454    }
455
456    /// Set breakpoints for a source file
457    pub async fn set_breakpoints(
458        &mut self,
459        source_path: &Path,
460        breakpoints: Vec<SourceBreakpoint>,
461    ) -> Result<Vec<Breakpoint>> {
462        let args = SetBreakpointsArguments {
463            source: Source {
464                path: Some(source_path.to_string_lossy().into_owned()),
465                ..Default::default()
466            },
467            breakpoints,
468        };
469
470        let response: SetBreakpointsResponseBody = self
471            .request("setBreakpoints", Some(serde_json::to_value(&args)?))
472            .await?;
473
474        Ok(response.breakpoints)
475    }
476
477    /// Set function breakpoints
478    pub async fn set_function_breakpoints(
479        &mut self,
480        breakpoints: Vec<FunctionBreakpoint>,
481    ) -> Result<Vec<Breakpoint>> {
482        let args = SetFunctionBreakpointsArguments { breakpoints };
483
484        let response: SetBreakpointsResponseBody = self
485            .request(
486                "setFunctionBreakpoints",
487                Some(serde_json::to_value(&args)?),
488            )
489            .await?;
490
491        Ok(response.breakpoints)
492    }
493
494    /// Continue execution
495    pub async fn continue_execution(&mut self, thread_id: i64) -> Result<bool> {
496        let args = ContinueArguments {
497            thread_id,
498            single_thread: false,
499        };
500
501        let response: ContinueResponseBody = self
502            .request("continue", Some(serde_json::to_value(&args)?))
503            .await?;
504
505        Ok(response.all_threads_continued)
506    }
507
508    /// Step over (next)
509    pub async fn next(&mut self, thread_id: i64) -> Result<()> {
510        let args = StepArguments {
511            thread_id,
512            granularity: Some("statement".to_string()),
513        };
514
515        self.request::<Value>("next", Some(serde_json::to_value(&args)?))
516            .await?;
517        Ok(())
518    }
519
520    /// Step into
521    pub async fn step_in(&mut self, thread_id: i64) -> Result<()> {
522        let args = StepArguments {
523            thread_id,
524            granularity: Some("statement".to_string()),
525        };
526
527        self.request::<Value>("stepIn", Some(serde_json::to_value(&args)?))
528            .await?;
529        Ok(())
530    }
531
532    /// Step out
533    pub async fn step_out(&mut self, thread_id: i64) -> Result<()> {
534        let args = StepArguments {
535            thread_id,
536            granularity: Some("statement".to_string()),
537        };
538
539        self.request::<Value>("stepOut", Some(serde_json::to_value(&args)?))
540            .await?;
541        Ok(())
542    }
543
544    /// Pause execution
545    pub async fn pause(&mut self, thread_id: i64) -> Result<()> {
546        let args = PauseArguments { thread_id };
547
548        self.request::<Value>("pause", Some(serde_json::to_value(&args)?))
549            .await?;
550        Ok(())
551    }
552
553    /// Get stack trace
554    pub async fn stack_trace(&mut self, thread_id: i64, levels: i64) -> Result<Vec<StackFrame>> {
555        let args = StackTraceArguments {
556            thread_id,
557            start_frame: Some(0),
558            levels: Some(levels),
559        };
560
561        let response: StackTraceResponseBody = self
562            .request("stackTrace", Some(serde_json::to_value(&args)?))
563            .await?;
564
565        Ok(response.stack_frames)
566    }
567
568    /// Get threads
569    pub async fn threads(&mut self) -> Result<Vec<Thread>> {
570        let response: ThreadsResponseBody = self.request("threads", None).await?;
571        Ok(response.threads)
572    }
573
574    /// Get scopes for a frame
575    pub async fn scopes(&mut self, frame_id: i64) -> Result<Vec<Scope>> {
576        let args = ScopesArguments { frame_id };
577
578        let response: ScopesResponseBody = self
579            .request("scopes", Some(serde_json::to_value(&args)?))
580            .await?;
581
582        Ok(response.scopes)
583    }
584
585    /// Get variables
586    pub async fn variables(&mut self, variables_reference: i64) -> Result<Vec<Variable>> {
587        let args = VariablesArguments {
588            variables_reference,
589            start: None,
590            count: None,
591        };
592
593        let response: VariablesResponseBody = self
594            .request("variables", Some(serde_json::to_value(&args)?))
595            .await?;
596
597        Ok(response.variables)
598    }
599
600    /// Evaluate an expression
601    pub async fn evaluate(
602        &mut self,
603        expression: &str,
604        frame_id: Option<i64>,
605        context: &str,
606    ) -> Result<EvaluateResponseBody> {
607        let args = EvaluateArguments {
608            expression: expression.to_string(),
609            frame_id,
610            context: Some(context.to_string()),
611        };
612
613        self.request("evaluate", Some(serde_json::to_value(&args)?))
614            .await
615    }
616
617    /// Disconnect from the debug adapter
618    pub async fn disconnect(&mut self, terminate_debuggee: bool) -> Result<()> {
619        let args = DisconnectArguments {
620            restart: false,
621            terminate_debuggee: Some(terminate_debuggee),
622        };
623
624        // Don't wait for response - adapter might exit immediately
625        let _ = self
626            .send_request("disconnect", Some(serde_json::to_value(&args)?))
627            .await;
628
629        Ok(())
630    }
631
632    /// Terminate the adapter process and clean up resources
633    pub async fn terminate(&mut self) -> Result<()> {
634        // Try graceful disconnect first
635        let _ = self.disconnect(true).await;
636
637        // Signal the reader task to stop
638        if let Some(tx) = self.shutdown_tx.take() {
639            let _ = tx.send(()).await;
640        }
641
642        // Wait a bit for clean shutdown
643        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
644
645        // Wait for reader task to finish
646        if let Some(task) = self.reader_task.take() {
647            // Give it a short timeout
648            let _ = tokio::time::timeout(
649                Duration::from_millis(500),
650                task,
651            ).await;
652        }
653
654        // Force kill if still running
655        let _ = self.adapter.kill().await;
656
657        Ok(())
658    }
659
660    /// Check if the adapter is still running
661    pub fn is_running(&mut self) -> bool {
662        self.adapter.try_wait().ok().flatten().is_none()
663    }
664
665    /// Restart the debug session (for adapters that support it)
666    pub async fn restart(&mut self, no_debug: bool) -> Result<()> {
667        if !self.capabilities.supports_restart_request {
668            return Err(Error::Internal(
669                "Debug adapter does not support restart".to_string(),
670            ));
671        }
672
673        let args = serde_json::json!({
674            "noDebug": no_debug
675        });
676
677        self.request::<Value>("restart", Some(args)).await?;
678        Ok(())
679    }
680}
681
682impl Drop for DapClient {
683    /// Best-effort cleanup on drop.
684    ///
685    /// ## Limitations
686    ///
687    /// Since we can't await in `drop()`, this is necessarily imperfect:
688    /// - `try_send` may fail if the shutdown channel is full (unlikely with capacity 1)
689    /// - `task.abort()` is immediate; the reader may be mid-operation
690    /// - `start_kill()` is non-blocking; the adapter may not exit immediately
691    ///
692    /// For graceful cleanup, prefer calling `terminate()` before dropping.
693    /// This Drop impl exists as a safety net to avoid leaking resources if
694    /// `terminate()` wasn't called.
695    fn drop(&mut self) {
696        // Signal shutdown to reader task (best-effort, can't await)
697        if let Some(tx) = self.shutdown_tx.take() {
698            // Use try_send since we can't await in drop
699            let _ = tx.try_send(());
700        }
701
702        // Abort the reader task if it's still running
703        // Note: This is abrupt but necessary since we can't await graceful shutdown
704        if let Some(task) = self.reader_task.take() {
705            task.abort();
706        }
707
708        // Try to kill the adapter on drop
709        // This is best-effort since we can't await in drop
710        let _ = self.adapter.start_kill();
711    }
712}