1use std::collections::HashMap;
24use std::path::Path;
25use std::process::Stdio;
26use std::sync::atomic::{AtomicI64, Ordering};
27use std::sync::Arc;
28use std::time::Duration;
29
30use std::pin::Pin;
31use std::task::{Context, Poll};
32
33use serde_json::Value;
34use tokio::io::{AsyncWrite, BufReader, BufWriter};
35use tokio::net::TcpStream;
36use tokio::process::{Child, ChildStdin, ChildStdout, Command};
37use tokio::sync::{mpsc, oneshot, Mutex};
38
39use crate::common::{Error, Result};
40
41use super::codec;
42use super::types::*;
43
44type PendingResponses = Arc<Mutex<HashMap<i64, oneshot::Sender<std::result::Result<ResponseMessage, Error>>>>>;
46
47enum DapWriter {
49 Stdio(BufWriter<ChildStdin>),
50 Tcp(BufWriter<tokio::io::WriteHalf<TcpStream>>),
51}
52
53impl AsyncWrite for DapWriter {
54 fn poll_write(
55 self: Pin<&mut Self>,
56 cx: &mut Context<'_>,
57 buf: &[u8],
58 ) -> Poll<std::io::Result<usize>> {
59 match self.get_mut() {
60 DapWriter::Stdio(w) => Pin::new(w).poll_write(cx, buf),
61 DapWriter::Tcp(w) => Pin::new(w).poll_write(cx, buf),
62 }
63 }
64
65 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
66 match self.get_mut() {
67 DapWriter::Stdio(w) => Pin::new(w).poll_flush(cx),
68 DapWriter::Tcp(w) => Pin::new(w).poll_flush(cx),
69 }
70 }
71
72 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
73 match self.get_mut() {
74 DapWriter::Stdio(w) => Pin::new(w).poll_shutdown(cx),
75 DapWriter::Tcp(w) => Pin::new(w).poll_shutdown(cx),
76 }
77 }
78}
79
80pub struct DapClient {
82 adapter: Child,
84 writer: DapWriter,
86 seq: AtomicI64,
88 pub capabilities: Capabilities,
90 pending: PendingResponses,
92 event_tx: mpsc::UnboundedSender<Event>,
94 event_rx: Option<mpsc::UnboundedReceiver<Event>>,
96 reader_task: Option<tokio::task::JoinHandle<()>>,
98 shutdown_tx: Option<mpsc::Sender<()>>,
100}
101
102impl DapClient {
103 pub async fn spawn(adapter_path: &Path, args: &[String]) -> Result<Self> {
105 let mut cmd = Command::new(adapter_path);
106 cmd.args(args)
107 .stdin(Stdio::piped())
108 .stdout(Stdio::piped())
109 .stderr(Stdio::inherit()); let mut adapter = cmd.spawn().map_err(|e| {
112 Error::AdapterStartFailed(format!(
113 "Failed to start {}: {}",
114 adapter_path.display(),
115 e
116 ))
117 })?;
118
119 let stdin = adapter
120 .stdin
121 .take()
122 .ok_or_else(|| Error::AdapterStartFailed("Failed to get adapter stdin".to_string()))?;
123 let stdout = adapter.stdout.take().ok_or_else(|| {
124 Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
125 })?;
126
127 let (event_tx, event_rx) = mpsc::unbounded_channel();
128 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
129 let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
130
131 let reader_task = Self::spawn_stdio_reader_task(
133 stdout,
134 event_tx.clone(),
135 pending.clone(),
136 shutdown_rx,
137 );
138
139 Ok(Self {
140 adapter,
141 writer: DapWriter::Stdio(BufWriter::new(stdin)),
142 seq: AtomicI64::new(1),
143 capabilities: Capabilities::default(),
144 pending,
145 event_tx,
146 event_rx: Some(event_rx),
147 reader_task: Some(reader_task),
148 shutdown_tx: Some(shutdown_tx),
149 })
150 }
151
152 pub async fn spawn_tcp(adapter_path: &Path, args: &[String]) -> Result<Self> {
157 use crate::common::parse_listen_address;
158 use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
159
160 let mut cmd = Command::new(adapter_path);
162 cmd.args(args)
163 .arg("--listen=127.0.0.1:0")
164 .stdin(Stdio::null())
165 .stdout(Stdio::piped())
166 .stderr(Stdio::piped());
167
168 let mut adapter = cmd.spawn().map_err(|e| {
169 Error::AdapterStartFailed(format!(
170 "Failed to start {}: {}",
171 adapter_path.display(),
172 e
173 ))
174 })?;
175
176 let stdout = adapter.stdout.take().ok_or_else(|| {
179 let _ = adapter.start_kill();
180 Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
181 })?;
182
183 let mut stdout_reader = TokioBufReader::new(stdout);
184 let mut line = String::new();
185
186 let addr_result = tokio::time::timeout(Duration::from_secs(10), async {
188 loop {
189 line.clear();
190 let bytes_read = stdout_reader.read_line(&mut line).await.map_err(|e| {
191 Error::AdapterStartFailed(format!("Failed to read adapter output: {}", e))
192 })?;
193
194 if bytes_read == 0 {
195 return Err(Error::AdapterStartFailed(
196 "Adapter exited before outputting listen address".to_string(),
197 ));
198 }
199
200 tracing::debug!("Delve output: {}", line.trim());
201
202 if let Some(addr) = parse_listen_address(&line) {
204 return Ok(addr);
205 }
206 }
207 })
208 .await;
209
210 let addr = match addr_result {
212 Ok(Ok(addr)) => addr,
213 Ok(Err(e)) => {
214 let _ = adapter.start_kill();
215 return Err(e);
216 }
217 Err(_) => {
218 let _ = adapter.start_kill();
219 return Err(Error::AdapterStartFailed(
220 "Timeout waiting for Delve to start listening".to_string(),
221 ));
222 }
223 };
224
225 tracing::info!("Connecting to Delve DAP server at {}", addr);
226
227 let stream = match TcpStream::connect(&addr).await {
229 Ok(s) => s,
230 Err(e) => {
231 let _ = adapter.start_kill();
232 return Err(Error::AdapterStartFailed(format!(
233 "Failed to connect to Delve at {}: {}",
234 addr, e
235 )));
236 }
237 };
238
239 let (read_half, write_half) = tokio::io::split(stream);
240
241 let (event_tx, event_rx) = mpsc::unbounded_channel();
242 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
243 let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
244
245 let reader_task = Self::spawn_tcp_reader_task(
247 read_half,
248 event_tx.clone(),
249 pending.clone(),
250 shutdown_rx,
251 );
252
253 Ok(Self {
254 adapter,
255 writer: DapWriter::Tcp(BufWriter::new(write_half)),
256 seq: AtomicI64::new(1),
257 capabilities: Capabilities::default(),
258 pending,
259 event_tx,
260 event_rx: Some(event_rx),
261 reader_task: Some(reader_task),
262 shutdown_tx: Some(shutdown_tx),
263 })
264 }
265
266 fn spawn_stdio_reader_task(
268 stdout: ChildStdout,
269 event_tx: mpsc::UnboundedSender<Event>,
270 pending: PendingResponses,
271 mut shutdown_rx: mpsc::Receiver<()>,
272 ) -> tokio::task::JoinHandle<()> {
273 tokio::spawn(async move {
274 let mut reader = BufReader::new(stdout);
275
276 loop {
277 tokio::select! {
278 biased;
279
280 _ = shutdown_rx.recv() => {
282 tracing::debug!("Reader task received shutdown signal");
283 break;
284 }
285
286 result = codec::read_message(&mut reader) => {
288 match result {
289 Ok(json) => {
290 tracing::trace!("DAP <<< {}", json);
291
292 if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
293 tracing::error!("Error processing DAP message: {}", e);
294 }
295 }
296 Err(e) => {
297 let err_str = e.to_string().to_lowercase();
300 let is_eof = err_str.contains("unexpected eof")
301 || err_str.contains("unexpectedeof")
302 || err_str.contains("end of file");
303
304 if is_eof {
305 tracing::info!("DAP adapter closed connection");
306 } else {
307 tracing::error!("Error reading from DAP adapter: {}", e);
308 }
309
310 let mut pending_guard = pending.lock().await;
312 for (_, tx) in pending_guard.drain() {
313 let _ = tx.send(Err(Error::AdapterCrashed));
314 }
315
316 let _ = event_tx.send(Event::Terminated(None));
318 break;
319 }
320 }
321 }
322 }
323 }
324
325 tracing::debug!("Reader task exiting");
326 })
327 }
328
329 fn spawn_tcp_reader_task(
331 read_half: tokio::io::ReadHalf<TcpStream>,
332 event_tx: mpsc::UnboundedSender<Event>,
333 pending: PendingResponses,
334 mut shutdown_rx: mpsc::Receiver<()>,
335 ) -> tokio::task::JoinHandle<()> {
336 tokio::spawn(async move {
337 let mut reader = BufReader::new(read_half);
338
339 loop {
340 tokio::select! {
341 biased;
342
343 _ = shutdown_rx.recv() => {
345 tracing::debug!("TCP reader task received shutdown signal");
346 break;
347 }
348
349 result = codec::read_message(&mut reader) => {
351 match result {
352 Ok(json) => {
353 tracing::trace!("DAP <<< {}", json);
354
355 if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
356 tracing::error!("Error processing DAP message: {}", e);
357 }
358 }
359 Err(e) => {
360 let err_str = e.to_string().to_lowercase();
361 let is_eof = err_str.contains("unexpected eof")
362 || err_str.contains("unexpectedeof")
363 || err_str.contains("end of file")
364 || err_str.contains("connection reset");
365
366 if is_eof {
367 tracing::info!("DAP adapter closed TCP connection");
368 } else {
369 tracing::error!("Error reading from DAP adapter (TCP): {}", e);
370 }
371
372 let mut pending_guard = pending.lock().await;
374 for (_, tx) in pending_guard.drain() {
375 let _ = tx.send(Err(Error::AdapterCrashed));
376 }
377
378 let _ = event_tx.send(Event::Terminated(None));
380 break;
381 }
382 }
383 }
384 }
385 }
386
387 tracing::debug!("TCP reader task exiting");
388 })
389 }
390
391 async fn process_message(
393 json: &str,
394 event_tx: &mpsc::UnboundedSender<Event>,
395 pending: &PendingResponses,
396 ) -> Result<()> {
397 let msg: Value = serde_json::from_str(json)
398 .map_err(|e| Error::DapProtocol(format!("Invalid JSON: {}", e)))?;
399
400 let msg_type = msg
401 .get("type")
402 .and_then(|v| v.as_str())
403 .unwrap_or("unknown");
404
405 match msg_type {
406 "response" => {
407 let response: ResponseMessage = serde_json::from_value(msg)?;
408 let seq = response.request_seq;
409
410 let mut pending_guard = pending.lock().await;
411 if let Some(tx) = pending_guard.remove(&seq) {
412 let _ = tx.send(Ok(response));
413 } else {
414 tracing::warn!("Received response for unknown request seq {}", seq);
415 }
416 }
417 "event" => {
418 let event_msg: EventMessage = serde_json::from_value(msg)?;
419 let event = Event::from_message(&event_msg);
420 let _ = event_tx.send(event);
421 }
422 _ => {
423 tracing::warn!("Unknown message type: {}", msg_type);
424 }
425 }
426
427 Ok(())
428 }
429
430 pub fn take_event_receiver(&mut self) -> Option<mpsc::UnboundedReceiver<Event>> {
432 self.event_rx.take()
433 }
434
435 fn next_seq(&self) -> i64 {
437 self.seq.fetch_add(1, Ordering::SeqCst)
438 }
439
440 async fn send_request(&mut self, command: &str, arguments: Option<Value>) -> Result<i64> {
442 let seq = self.next_seq();
443
444 let request = if let Some(args) = arguments {
446 serde_json::json!({
447 "seq": seq,
448 "type": "request",
449 "command": command,
450 "arguments": args
451 })
452 } else {
453 serde_json::json!({
454 "seq": seq,
455 "type": "request",
456 "command": command
457 })
458 };
459
460 let json = serde_json::to_string(&request)?;
461 tracing::trace!("DAP >>> {}", json);
462
463 codec::write_message(&mut self.writer, &json).await?;
464
465 Ok(seq)
466 }
467
468 pub async fn request<T: serde::de::DeserializeOwned>(
470 &mut self,
471 command: &str,
472 arguments: Option<Value>,
473 ) -> Result<T> {
474 self.request_with_timeout(command, arguments, Duration::from_secs(30)).await
475 }
476
477 pub async fn request_with_timeout<T: serde::de::DeserializeOwned>(
483 &mut self,
484 command: &str,
485 arguments: Option<Value>,
486 timeout: Duration,
487 ) -> Result<T> {
488 let seq = self.next_seq();
489
490 let request = if let Some(ref args) = arguments {
492 serde_json::json!({
493 "seq": seq,
494 "type": "request",
495 "command": command,
496 "arguments": args
497 })
498 } else {
499 serde_json::json!({
500 "seq": seq,
501 "type": "request",
502 "command": command
503 })
504 };
505
506 let (tx, rx) = oneshot::channel();
509 {
510 let mut pending_guard = self.pending.lock().await;
511 pending_guard.insert(seq, tx);
512 }
513
514 let json = serde_json::to_string(&request)?;
516 tracing::trace!("DAP >>> {}", json);
517
518 if let Err(e) = codec::write_message(&mut self.writer, &json).await {
519 let mut pending_guard = self.pending.lock().await;
521 pending_guard.remove(&seq);
522 return Err(e);
523 }
524
525 let response = tokio::time::timeout(timeout, rx)
527 .await
528 .map_err(|_| {
529 let pending = self.pending.clone();
531 tokio::spawn(async move {
532 let mut pending_guard = pending.lock().await;
533 pending_guard.remove(&seq);
534 });
535 Error::Timeout(timeout.as_secs())
536 })?
537 .map_err(|_| Error::AdapterCrashed)??;
538
539 if response.success {
540 let body = response.body.unwrap_or(Value::Null);
541 serde_json::from_value(body).map_err(|e| {
542 Error::DapProtocol(format!(
543 "Failed to parse {} response: {}",
544 command, e
545 ))
546 })
547 } else {
548 Err(Error::dap_request_failed(
549 command,
550 &response.message.unwrap_or_else(|| "Unknown error".to_string()),
551 ))
552 }
553 }
554
555 pub async fn poll_event(&mut self) -> Result<Option<Event>> {
559 Ok(None)
563 }
564
565 pub async fn initialize(&mut self, adapter_id: &str) -> Result<Capabilities> {
567 self.initialize_with_timeout(adapter_id, Duration::from_secs(10)).await
568 }
569
570 pub async fn initialize_with_timeout(
572 &mut self,
573 adapter_id: &str,
574 timeout: Duration,
575 ) -> Result<Capabilities> {
576 let args = InitializeArguments {
577 adapter_id: adapter_id.to_string(),
578 ..Default::default()
579 };
580
581 let caps: Capabilities = self
582 .request_with_timeout("initialize", Some(serde_json::to_value(&args)?), timeout)
583 .await?;
584
585 self.capabilities = caps.clone();
586 Ok(caps)
587 }
588
589 pub async fn wait_initialized(&mut self) -> Result<()> {
594 self.wait_initialized_with_timeout(Duration::from_secs(30)).await
595 }
596
597 pub async fn wait_initialized_with_timeout(&mut self, timeout: Duration) -> Result<()> {
610 if let Some(ref mut rx) = self.event_rx {
613 let deadline = tokio::time::Instant::now() + timeout;
614
615 loop {
616 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
617 if remaining.is_zero() {
618 return Err(Error::Timeout(timeout.as_secs()));
619 }
620
621 match tokio::time::timeout(remaining, rx.recv()).await {
622 Ok(Some(event)) => {
623 if matches!(event, Event::Initialized) {
624 return Ok(());
625 }
626 let _ = self.event_tx.send(event);
630 }
631 Ok(None) => {
632 return Err(Error::AdapterCrashed);
633 }
634 Err(_) => {
635 return Err(Error::Timeout(timeout.as_secs()));
636 }
637 }
638 }
639 } else {
640 Err(Error::Internal("Event receiver already taken before wait_initialized".to_string()))
642 }
643 }
644
645 pub async fn launch(&mut self, args: LaunchArguments) -> Result<()> {
647 self.request::<Value>("launch", Some(serde_json::to_value(&args)?))
648 .await?;
649 Ok(())
650 }
651
652 pub async fn launch_no_wait(&mut self, args: LaunchArguments) -> Result<i64> {
658 self.send_request("launch", Some(serde_json::to_value(&args)?)).await
659 }
660
661 pub async fn attach(&mut self, args: AttachArguments) -> Result<()> {
663 self.request::<Value>("attach", Some(serde_json::to_value(&args)?))
664 .await?;
665 Ok(())
666 }
667
668 pub async fn configuration_done(&mut self) -> Result<()> {
670 self.request::<Value>("configurationDone", None).await?;
671 Ok(())
672 }
673
674 pub async fn set_breakpoints(
676 &mut self,
677 source_path: &Path,
678 breakpoints: Vec<SourceBreakpoint>,
679 ) -> Result<Vec<Breakpoint>> {
680 let args = SetBreakpointsArguments {
681 source: Source {
682 path: Some(source_path.to_string_lossy().into_owned()),
683 ..Default::default()
684 },
685 breakpoints,
686 };
687
688 let response: SetBreakpointsResponseBody = self
689 .request("setBreakpoints", Some(serde_json::to_value(&args)?))
690 .await?;
691
692 Ok(response.breakpoints)
693 }
694
695 pub async fn set_function_breakpoints(
697 &mut self,
698 breakpoints: Vec<FunctionBreakpoint>,
699 ) -> Result<Vec<Breakpoint>> {
700 let args = SetFunctionBreakpointsArguments { breakpoints };
701
702 let response: SetBreakpointsResponseBody = self
703 .request(
704 "setFunctionBreakpoints",
705 Some(serde_json::to_value(&args)?),
706 )
707 .await?;
708
709 Ok(response.breakpoints)
710 }
711
712 pub async fn continue_execution(&mut self, thread_id: i64) -> Result<bool> {
714 let args = ContinueArguments {
715 thread_id,
716 single_thread: false,
717 };
718
719 let response: ContinueResponseBody = self
720 .request("continue", Some(serde_json::to_value(&args)?))
721 .await?;
722
723 Ok(response.all_threads_continued)
724 }
725
726 pub async fn next(&mut self, thread_id: i64) -> Result<()> {
728 let args = StepArguments {
729 thread_id,
730 granularity: Some("statement".to_string()),
731 };
732
733 self.request::<Value>("next", Some(serde_json::to_value(&args)?))
734 .await?;
735 Ok(())
736 }
737
738 pub async fn step_in(&mut self, thread_id: i64) -> Result<()> {
740 let args = StepArguments {
741 thread_id,
742 granularity: Some("statement".to_string()),
743 };
744
745 self.request::<Value>("stepIn", Some(serde_json::to_value(&args)?))
746 .await?;
747 Ok(())
748 }
749
750 pub async fn step_out(&mut self, thread_id: i64) -> Result<()> {
752 let args = StepArguments {
753 thread_id,
754 granularity: Some("statement".to_string()),
755 };
756
757 self.request::<Value>("stepOut", Some(serde_json::to_value(&args)?))
758 .await?;
759 Ok(())
760 }
761
762 pub async fn pause(&mut self, thread_id: i64) -> Result<()> {
764 let args = PauseArguments { thread_id };
765
766 self.request::<Value>("pause", Some(serde_json::to_value(&args)?))
767 .await?;
768 Ok(())
769 }
770
771 pub async fn stack_trace(&mut self, thread_id: i64, levels: i64) -> Result<Vec<StackFrame>> {
773 let args = StackTraceArguments {
774 thread_id,
775 start_frame: Some(0),
776 levels: Some(levels),
777 };
778
779 let response: StackTraceResponseBody = self
780 .request("stackTrace", Some(serde_json::to_value(&args)?))
781 .await?;
782
783 Ok(response.stack_frames)
784 }
785
786 pub async fn threads(&mut self) -> Result<Vec<Thread>> {
788 let response: ThreadsResponseBody = self.request("threads", None).await?;
789 Ok(response.threads)
790 }
791
792 pub async fn scopes(&mut self, frame_id: i64) -> Result<Vec<Scope>> {
794 let args = ScopesArguments { frame_id };
795
796 let response: ScopesResponseBody = self
797 .request("scopes", Some(serde_json::to_value(&args)?))
798 .await?;
799
800 Ok(response.scopes)
801 }
802
803 pub async fn variables(&mut self, variables_reference: i64) -> Result<Vec<Variable>> {
805 let args = VariablesArguments {
806 variables_reference,
807 start: None,
808 count: None,
809 };
810
811 let response: VariablesResponseBody = self
812 .request("variables", Some(serde_json::to_value(&args)?))
813 .await?;
814
815 Ok(response.variables)
816 }
817
818 pub async fn evaluate(
820 &mut self,
821 expression: &str,
822 frame_id: Option<i64>,
823 context: &str,
824 ) -> Result<EvaluateResponseBody> {
825 let args = EvaluateArguments {
826 expression: expression.to_string(),
827 frame_id,
828 context: Some(context.to_string()),
829 };
830
831 self.request("evaluate", Some(serde_json::to_value(&args)?))
832 .await
833 }
834
835 pub async fn disconnect(&mut self, terminate_debuggee: bool) -> Result<()> {
837 let args = DisconnectArguments {
838 restart: false,
839 terminate_debuggee: Some(terminate_debuggee),
840 };
841
842 let _ = self
844 .send_request("disconnect", Some(serde_json::to_value(&args)?))
845 .await;
846
847 Ok(())
848 }
849
850 pub async fn terminate(&mut self) -> Result<()> {
852 let _ = self.disconnect(true).await;
854
855 if let Some(tx) = self.shutdown_tx.take() {
857 let _ = tx.send(()).await;
858 }
859
860 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
862
863 if let Some(task) = self.reader_task.take() {
865 let _ = tokio::time::timeout(
867 Duration::from_millis(500),
868 task,
869 ).await;
870 }
871
872 let _ = self.adapter.kill().await;
874
875 Ok(())
876 }
877
878 pub fn is_running(&mut self) -> bool {
880 self.adapter.try_wait().ok().flatten().is_none()
881 }
882
883 pub async fn restart(&mut self, no_debug: bool) -> Result<()> {
885 if !self.capabilities.supports_restart_request {
886 return Err(Error::Internal(
887 "Debug adapter does not support restart".to_string(),
888 ));
889 }
890
891 let args = serde_json::json!({
892 "noDebug": no_debug
893 });
894
895 self.request::<Value>("restart", Some(args)).await?;
896 Ok(())
897 }
898}
899
900impl Drop for DapClient {
901 fn drop(&mut self) {
914 if let Some(tx) = self.shutdown_tx.take() {
916 let _ = tx.try_send(());
918 }
919
920 if let Some(task) = self.reader_task.take() {
923 task.abort();
924 }
925
926 let _ = self.adapter.start_kill();
929 }
930}