1use std::collections::HashMap;
19use std::path::Path;
20use std::process::Stdio;
21use std::sync::atomic::{AtomicI64, Ordering};
22use std::sync::Arc;
23use std::time::Duration;
24
25use serde_json::Value;
26use tokio::io::{BufReader, BufWriter};
27use tokio::process::{Child, ChildStdin, ChildStdout, Command};
28use tokio::sync::{mpsc, oneshot, Mutex};
29
30use crate::common::{Error, Result};
31
32use super::codec;
33use super::types::*;
34
/// Shared table of in-flight requests, keyed by request sequence number.
///
/// Each entry holds the one-shot sender used to hand the matching response
/// (or an error) back to the caller that is awaiting it.
type PendingResponses = Arc<Mutex<HashMap<i64, oneshot::Sender<std::result::Result<ResponseMessage, Error>>>>>;
37
/// Client for a debug adapter that is run as a child process and spoken to
/// over its stdin/stdout pipes.
pub struct DapClient {
    /// The spawned adapter process; killed by `terminate` and on drop.
    adapter: Child,
    /// Buffered writer over the adapter's stdin; outgoing requests go here.
    writer: BufWriter<ChildStdin>,
    /// Source of request sequence numbers; starts at 1 and is bumped by `next_seq`.
    seq: AtomicI64,
    /// Capabilities stored by `initialize_with_timeout`; default-valued before that.
    pub capabilities: Capabilities,
    /// In-flight requests awaiting a response; shared with the reader task.
    pending: PendingResponses,
    /// Sending half of the event channel. A clone is handed to the reader
    /// task, and `wait_initialized_with_timeout` uses it to put back events
    /// it consumed.
    event_tx: mpsc::UnboundedSender<Event>,
    /// Receiving half of the event channel; `None` once `take_event_receiver`
    /// has been called.
    event_rx: Option<mpsc::UnboundedReceiver<Event>>,
    /// Handle of the background task that reads adapter output.
    reader_task: Option<tokio::task::JoinHandle<()>>,
    /// Sender used to tell the reader task to stop.
    shutdown_tx: Option<mpsc::Sender<()>>,
}
59
60impl DapClient {
61 pub async fn spawn(adapter_path: &Path, args: &[String]) -> Result<Self> {
63 let mut cmd = Command::new(adapter_path);
64 cmd.args(args)
65 .stdin(Stdio::piped())
66 .stdout(Stdio::piped())
67 .stderr(Stdio::inherit()); let mut adapter = cmd.spawn().map_err(|e| {
70 Error::AdapterStartFailed(format!(
71 "Failed to start {}: {}",
72 adapter_path.display(),
73 e
74 ))
75 })?;
76
77 let stdin = adapter
78 .stdin
79 .take()
80 .ok_or_else(|| Error::AdapterStartFailed("Failed to get adapter stdin".to_string()))?;
81 let stdout = adapter.stdout.take().ok_or_else(|| {
82 Error::AdapterStartFailed("Failed to get adapter stdout".to_string())
83 })?;
84
85 let (event_tx, event_rx) = mpsc::unbounded_channel();
86 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
87 let pending: PendingResponses = Arc::new(Mutex::new(HashMap::new()));
88
89 let reader_task = Self::spawn_reader_task(
91 stdout,
92 event_tx.clone(),
93 pending.clone(),
94 shutdown_rx,
95 );
96
97 Ok(Self {
98 adapter,
99 writer: BufWriter::new(stdin),
100 seq: AtomicI64::new(1),
101 capabilities: Capabilities::default(),
102 pending,
103 event_tx,
104 event_rx: Some(event_rx),
105 reader_task: Some(reader_task),
106 shutdown_tx: Some(shutdown_tx),
107 })
108 }
109
110 fn spawn_reader_task(
112 stdout: ChildStdout,
113 event_tx: mpsc::UnboundedSender<Event>,
114 pending: PendingResponses,
115 mut shutdown_rx: mpsc::Receiver<()>,
116 ) -> tokio::task::JoinHandle<()> {
117 tokio::spawn(async move {
118 let mut reader = BufReader::new(stdout);
119
120 loop {
121 tokio::select! {
122 biased;
123
124 _ = shutdown_rx.recv() => {
126 tracing::debug!("Reader task received shutdown signal");
127 break;
128 }
129
130 result = codec::read_message(&mut reader) => {
132 match result {
133 Ok(json) => {
134 tracing::trace!("DAP <<< {}", json);
135
136 if let Err(e) = Self::process_message(&json, &event_tx, &pending).await {
137 tracing::error!("Error processing DAP message: {}", e);
138 }
139 }
140 Err(e) => {
141 let err_str = e.to_string().to_lowercase();
144 let is_eof = err_str.contains("unexpected eof")
145 || err_str.contains("unexpectedeof")
146 || err_str.contains("end of file");
147
148 if is_eof {
149 tracing::info!("DAP adapter closed connection");
150 } else {
151 tracing::error!("Error reading from DAP adapter: {}", e);
152 }
153
154 let mut pending_guard = pending.lock().await;
156 for (_, tx) in pending_guard.drain() {
157 let _ = tx.send(Err(Error::AdapterCrashed));
158 }
159
160 let _ = event_tx.send(Event::Terminated(None));
162 break;
163 }
164 }
165 }
166 }
167 }
168
169 tracing::debug!("Reader task exiting");
170 })
171 }
172
173 async fn process_message(
175 json: &str,
176 event_tx: &mpsc::UnboundedSender<Event>,
177 pending: &PendingResponses,
178 ) -> Result<()> {
179 let msg: Value = serde_json::from_str(json)
180 .map_err(|e| Error::DapProtocol(format!("Invalid JSON: {}", e)))?;
181
182 let msg_type = msg
183 .get("type")
184 .and_then(|v| v.as_str())
185 .unwrap_or("unknown");
186
187 match msg_type {
188 "response" => {
189 let response: ResponseMessage = serde_json::from_value(msg)?;
190 let seq = response.request_seq;
191
192 let mut pending_guard = pending.lock().await;
193 if let Some(tx) = pending_guard.remove(&seq) {
194 let _ = tx.send(Ok(response));
195 } else {
196 tracing::warn!("Received response for unknown request seq {}", seq);
197 }
198 }
199 "event" => {
200 let event_msg: EventMessage = serde_json::from_value(msg)?;
201 let event = Event::from_message(&event_msg);
202 let _ = event_tx.send(event);
203 }
204 _ => {
205 tracing::warn!("Unknown message type: {}", msg_type);
206 }
207 }
208
209 Ok(())
210 }
211
212 pub fn take_event_receiver(&mut self) -> Option<mpsc::UnboundedReceiver<Event>> {
214 self.event_rx.take()
215 }
216
/// Returns the current sequence number and advances the counter by one.
fn next_seq(&self) -> i64 {
    // `fetch_add` yields the value from *before* the increment.
    self.seq.fetch_add(1, Ordering::SeqCst)
}
221
222 async fn send_request(&mut self, command: &str, arguments: Option<Value>) -> Result<i64> {
224 let seq = self.next_seq();
225
226 let request = if let Some(args) = arguments {
228 serde_json::json!({
229 "seq": seq,
230 "type": "request",
231 "command": command,
232 "arguments": args
233 })
234 } else {
235 serde_json::json!({
236 "seq": seq,
237 "type": "request",
238 "command": command
239 })
240 };
241
242 let json = serde_json::to_string(&request)?;
243 tracing::trace!("DAP >>> {}", json);
244
245 codec::write_message(&mut self.writer, &json).await?;
246
247 Ok(seq)
248 }
249
250 pub async fn request<T: serde::de::DeserializeOwned>(
252 &mut self,
253 command: &str,
254 arguments: Option<Value>,
255 ) -> Result<T> {
256 self.request_with_timeout(command, arguments, Duration::from_secs(30)).await
257 }
258
259 pub async fn request_with_timeout<T: serde::de::DeserializeOwned>(
265 &mut self,
266 command: &str,
267 arguments: Option<Value>,
268 timeout: Duration,
269 ) -> Result<T> {
270 let seq = self.next_seq();
271
272 let request = if let Some(ref args) = arguments {
274 serde_json::json!({
275 "seq": seq,
276 "type": "request",
277 "command": command,
278 "arguments": args
279 })
280 } else {
281 serde_json::json!({
282 "seq": seq,
283 "type": "request",
284 "command": command
285 })
286 };
287
288 let (tx, rx) = oneshot::channel();
291 {
292 let mut pending_guard = self.pending.lock().await;
293 pending_guard.insert(seq, tx);
294 }
295
296 let json = serde_json::to_string(&request)?;
298 tracing::trace!("DAP >>> {}", json);
299
300 if let Err(e) = codec::write_message(&mut self.writer, &json).await {
301 let mut pending_guard = self.pending.lock().await;
303 pending_guard.remove(&seq);
304 return Err(e);
305 }
306
307 let response = tokio::time::timeout(timeout, rx)
309 .await
310 .map_err(|_| {
311 let pending = self.pending.clone();
313 tokio::spawn(async move {
314 let mut pending_guard = pending.lock().await;
315 pending_guard.remove(&seq);
316 });
317 Error::Timeout(timeout.as_secs())
318 })?
319 .map_err(|_| Error::AdapterCrashed)??;
320
321 if response.success {
322 let body = response.body.unwrap_or(Value::Null);
323 serde_json::from_value(body).map_err(|e| {
324 Error::DapProtocol(format!(
325 "Failed to parse {} response: {}",
326 command, e
327 ))
328 })
329 } else {
330 Err(Error::dap_request_failed(
331 command,
332 &response.message.unwrap_or_else(|| "Unknown error".to_string()),
333 ))
334 }
335 }
336
/// Always resolves to `Ok(None)`; this method never reads the event channel.
///
/// NOTE(review): this looks like a compatibility stub, with events meant to
/// be consumed through the receiver from `take_event_receiver` — confirm
/// before relying on it.
pub async fn poll_event(&mut self) -> Result<Option<Event>> {
    Ok(None)
}
346
347 pub async fn initialize(&mut self, adapter_id: &str) -> Result<Capabilities> {
349 self.initialize_with_timeout(adapter_id, Duration::from_secs(10)).await
350 }
351
352 pub async fn initialize_with_timeout(
354 &mut self,
355 adapter_id: &str,
356 timeout: Duration,
357 ) -> Result<Capabilities> {
358 let args = InitializeArguments {
359 adapter_id: adapter_id.to_string(),
360 ..Default::default()
361 };
362
363 let caps: Capabilities = self
364 .request_with_timeout("initialize", Some(serde_json::to_value(&args)?), timeout)
365 .await?;
366
367 self.capabilities = caps.clone();
368 Ok(caps)
369 }
370
371 pub async fn wait_initialized(&mut self) -> Result<()> {
376 self.wait_initialized_with_timeout(Duration::from_secs(30)).await
377 }
378
379 pub async fn wait_initialized_with_timeout(&mut self, timeout: Duration) -> Result<()> {
392 if let Some(ref mut rx) = self.event_rx {
395 let deadline = tokio::time::Instant::now() + timeout;
396
397 loop {
398 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
399 if remaining.is_zero() {
400 return Err(Error::Timeout(timeout.as_secs()));
401 }
402
403 match tokio::time::timeout(remaining, rx.recv()).await {
404 Ok(Some(event)) => {
405 if matches!(event, Event::Initialized) {
406 return Ok(());
407 }
408 let _ = self.event_tx.send(event);
412 }
413 Ok(None) => {
414 return Err(Error::AdapterCrashed);
415 }
416 Err(_) => {
417 return Err(Error::Timeout(timeout.as_secs()));
418 }
419 }
420 }
421 } else {
422 Err(Error::Internal("Event receiver already taken before wait_initialized".to_string()))
424 }
425 }
426
427 pub async fn launch(&mut self, args: LaunchArguments) -> Result<()> {
429 self.request::<Value>("launch", Some(serde_json::to_value(&args)?))
430 .await?;
431 Ok(())
432 }
433
434 pub async fn launch_no_wait(&mut self, args: LaunchArguments) -> Result<i64> {
440 self.send_request("launch", Some(serde_json::to_value(&args)?)).await
441 }
442
443 pub async fn attach(&mut self, args: AttachArguments) -> Result<()> {
445 self.request::<Value>("attach", Some(serde_json::to_value(&args)?))
446 .await?;
447 Ok(())
448 }
449
450 pub async fn configuration_done(&mut self) -> Result<()> {
452 self.request::<Value>("configurationDone", None).await?;
453 Ok(())
454 }
455
456 pub async fn set_breakpoints(
458 &mut self,
459 source_path: &Path,
460 breakpoints: Vec<SourceBreakpoint>,
461 ) -> Result<Vec<Breakpoint>> {
462 let args = SetBreakpointsArguments {
463 source: Source {
464 path: Some(source_path.to_string_lossy().into_owned()),
465 ..Default::default()
466 },
467 breakpoints,
468 };
469
470 let response: SetBreakpointsResponseBody = self
471 .request("setBreakpoints", Some(serde_json::to_value(&args)?))
472 .await?;
473
474 Ok(response.breakpoints)
475 }
476
477 pub async fn set_function_breakpoints(
479 &mut self,
480 breakpoints: Vec<FunctionBreakpoint>,
481 ) -> Result<Vec<Breakpoint>> {
482 let args = SetFunctionBreakpointsArguments { breakpoints };
483
484 let response: SetBreakpointsResponseBody = self
485 .request(
486 "setFunctionBreakpoints",
487 Some(serde_json::to_value(&args)?),
488 )
489 .await?;
490
491 Ok(response.breakpoints)
492 }
493
494 pub async fn continue_execution(&mut self, thread_id: i64) -> Result<bool> {
496 let args = ContinueArguments {
497 thread_id,
498 single_thread: false,
499 };
500
501 let response: ContinueResponseBody = self
502 .request("continue", Some(serde_json::to_value(&args)?))
503 .await?;
504
505 Ok(response.all_threads_continued)
506 }
507
508 pub async fn next(&mut self, thread_id: i64) -> Result<()> {
510 let args = StepArguments {
511 thread_id,
512 granularity: Some("statement".to_string()),
513 };
514
515 self.request::<Value>("next", Some(serde_json::to_value(&args)?))
516 .await?;
517 Ok(())
518 }
519
520 pub async fn step_in(&mut self, thread_id: i64) -> Result<()> {
522 let args = StepArguments {
523 thread_id,
524 granularity: Some("statement".to_string()),
525 };
526
527 self.request::<Value>("stepIn", Some(serde_json::to_value(&args)?))
528 .await?;
529 Ok(())
530 }
531
532 pub async fn step_out(&mut self, thread_id: i64) -> Result<()> {
534 let args = StepArguments {
535 thread_id,
536 granularity: Some("statement".to_string()),
537 };
538
539 self.request::<Value>("stepOut", Some(serde_json::to_value(&args)?))
540 .await?;
541 Ok(())
542 }
543
544 pub async fn pause(&mut self, thread_id: i64) -> Result<()> {
546 let args = PauseArguments { thread_id };
547
548 self.request::<Value>("pause", Some(serde_json::to_value(&args)?))
549 .await?;
550 Ok(())
551 }
552
553 pub async fn stack_trace(&mut self, thread_id: i64, levels: i64) -> Result<Vec<StackFrame>> {
555 let args = StackTraceArguments {
556 thread_id,
557 start_frame: Some(0),
558 levels: Some(levels),
559 };
560
561 let response: StackTraceResponseBody = self
562 .request("stackTrace", Some(serde_json::to_value(&args)?))
563 .await?;
564
565 Ok(response.stack_frames)
566 }
567
568 pub async fn threads(&mut self) -> Result<Vec<Thread>> {
570 let response: ThreadsResponseBody = self.request("threads", None).await?;
571 Ok(response.threads)
572 }
573
574 pub async fn scopes(&mut self, frame_id: i64) -> Result<Vec<Scope>> {
576 let args = ScopesArguments { frame_id };
577
578 let response: ScopesResponseBody = self
579 .request("scopes", Some(serde_json::to_value(&args)?))
580 .await?;
581
582 Ok(response.scopes)
583 }
584
585 pub async fn variables(&mut self, variables_reference: i64) -> Result<Vec<Variable>> {
587 let args = VariablesArguments {
588 variables_reference,
589 start: None,
590 count: None,
591 };
592
593 let response: VariablesResponseBody = self
594 .request("variables", Some(serde_json::to_value(&args)?))
595 .await?;
596
597 Ok(response.variables)
598 }
599
600 pub async fn evaluate(
602 &mut self,
603 expression: &str,
604 frame_id: Option<i64>,
605 context: &str,
606 ) -> Result<EvaluateResponseBody> {
607 let args = EvaluateArguments {
608 expression: expression.to_string(),
609 frame_id,
610 context: Some(context.to_string()),
611 };
612
613 self.request("evaluate", Some(serde_json::to_value(&args)?))
614 .await
615 }
616
617 pub async fn disconnect(&mut self, terminate_debuggee: bool) -> Result<()> {
619 let args = DisconnectArguments {
620 restart: false,
621 terminate_debuggee: Some(terminate_debuggee),
622 };
623
624 let _ = self
626 .send_request("disconnect", Some(serde_json::to_value(&args)?))
627 .await;
628
629 Ok(())
630 }
631
632 pub async fn terminate(&mut self) -> Result<()> {
634 let _ = self.disconnect(true).await;
636
637 if let Some(tx) = self.shutdown_tx.take() {
639 let _ = tx.send(()).await;
640 }
641
642 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
644
645 if let Some(task) = self.reader_task.take() {
647 let _ = tokio::time::timeout(
649 Duration::from_millis(500),
650 task,
651 ).await;
652 }
653
654 let _ = self.adapter.kill().await;
656
657 Ok(())
658 }
659
660 pub fn is_running(&mut self) -> bool {
662 self.adapter.try_wait().ok().flatten().is_none()
663 }
664
665 pub async fn restart(&mut self, no_debug: bool) -> Result<()> {
667 if !self.capabilities.supports_restart_request {
668 return Err(Error::Internal(
669 "Debug adapter does not support restart".to_string(),
670 ));
671 }
672
673 let args = serde_json::json!({
674 "noDebug": no_debug
675 });
676
677 self.request::<Value>("restart", Some(args)).await?;
678 Ok(())
679 }
680}
681
682impl Drop for DapClient {
683 fn drop(&mut self) {
696 if let Some(tx) = self.shutdown_tx.take() {
698 let _ = tx.try_send(());
700 }
701
702 if let Some(task) = self.reader_task.take() {
705 task.abort();
706 }
707
708 let _ = self.adapter.start_kill();
711 }
712}