1pub(crate) mod api;
20pub mod codec;
21pub mod error;
22mod executor;
23mod receiver;
24mod sender;
25
26use crate::dap::{
27 api::{CancelErrorResponse, DebugAdapter, DebugAdapterContext, ProgressContext},
28 error::DebugAdapterError,
29 executor::DebugAdapterExecutor,
30 receiver::DebugAdapterReceiver,
31 sender::DebugAdapterSender,
32};
33use debug_adapter_protocol::{
34 events::{Event, ProgressEndEventBody, ProgressStartEventBody},
35 requests::Request,
36 responses::{ErrorResponse, Response, SuccessResponse},
37 ProtocolMessage, ProtocolMessageContent, SequenceNumber,
38};
39use futures::{future::Either, FutureExt, Sink, SinkExt, Stream, TryFutureExt};
40use log::trace;
41use serde_json::Value;
42use std::{
43 collections::{HashMap, HashSet},
44 sync::{Arc, Mutex},
45};
46use tokio::{
47 spawn,
48 sync::mpsc::{self, unbounded_channel, UnboundedSender},
49 try_join,
50};
51use uuid::Uuid;
52
53pub async fn run_adapter<D, I, O, E>(
54 input: I,
55 output: O,
56 adapter_factory: impl FnOnce(
57 UnboundedSender<Either<ProtocolMessage, <D as DebugAdapter>::Message>>,
58 ) -> D,
59) -> Result<
60 (),
61 DebugAdapterError<E, <O as Sink<ProtocolMessage>>::Error, <D as DebugAdapter>::CustomError>,
62>
63where
64 D: DebugAdapter + Send + 'static,
65 I: Stream<Item = Result<ProtocolMessage, E>> + Unpin + Send + 'static,
66 O: Sink<ProtocolMessage> + Unpin + Send + 'static,
67 E: Send + 'static,
68 <O as Sink<ProtocolMessage>>::Error: Send + 'static,
69 <D as DebugAdapter>::CustomError: Send + 'static,
70{
71 let (outbox_sender, outbox_receiver) = unbounded_channel();
72 let outbox = Outbox { outbox_sender };
73 let (inbox_sender, inbox_receiver) = unbounded_channel();
74 let (cancel_sender, cancel_receiver) = unbounded_channel();
75 let adapter = adapter_factory(inbox_sender.clone());
76 let (shutdown_sender, shutdown_receiver) = mpsc::channel(1);
77
78 let cancel_data = Arc::new(Mutex::new(CancelData::new()));
79 let receiver = DebugAdapterReceiver {
80 inbox_sender,
81 outbox: outbox.clone(),
82 cancel_data: cancel_data.clone(),
83 cancel_sender,
84 input,
85 shutdown_receiver,
86 };
87
88 let executor = DebugAdapterExecutor {
89 inbox_receiver,
90 outbox,
91 cancel_data,
92 cancel_receiver,
93 adapter,
94 shutdown_sender,
95 };
96
97 let message_writer = MessageWriter::new(output);
98 let sender = DebugAdapterSender {
99 message_writer,
100 outbox_receiver,
101 };
102
103 let receiver = spawn(receiver.run());
104 let executor = spawn(executor.run());
105 let sender = spawn(sender.run());
106
107 try_join!(
108 receiver
109 .map(Result::unwrap) .map_err(DebugAdapterError::Input),
111 executor
112 .map(Result::unwrap) .map_err(DebugAdapterError::Custom),
114 sender
115 .map(Result::unwrap) .map_err(DebugAdapterError::Output),
117 )?;
118
119 Ok(())
120}
121
122struct CancelData {
123 current_request_id: Option<i32>,
124 cancelled_request_ids: HashSet<i32>,
125 current_progresses: HashMap<String, UnboundedSender<SequenceNumber>>,
126}
127impl CancelData {
128 fn new() -> Self {
129 CancelData {
130 current_request_id: None,
131 cancelled_request_ids: HashSet::new(),
132 current_progresses: HashMap::new(),
133 }
134 }
135}
136
137pub struct DebugAdapterContextImpl {
138 outbox: Outbox,
139 cancel_data: Arc<Mutex<CancelData>>,
140 shutdown: bool,
141}
142impl DebugAdapterContextImpl {
143 fn new(outbox: Outbox, cancel_data: Arc<Mutex<CancelData>>) -> DebugAdapterContextImpl {
144 DebugAdapterContextImpl {
145 outbox,
146 cancel_data,
147 shutdown: false,
148 }
149 }
150}
151impl DebugAdapterContext for &mut DebugAdapterContextImpl {
152 fn fire_event(&mut self, event: impl Into<Event> + Send) {
153 let event = event.into();
154 self.outbox.send(event);
155 }
156
157 fn start_cancellable_progress(
158 &mut self,
159 title: String,
160 message: Option<String>,
161 ) -> ProgressContext {
162 let progress_id = Uuid::new_v4();
163 let (cancel_sender, cancel_receiver) = unbounded_channel();
164 {
165 let mut cancel_data = self.cancel_data.lock().unwrap();
166 cancel_data
167 .current_progresses
168 .insert(progress_id.to_string(), cancel_sender);
169 }
170
171 let event = ProgressStartEventBody::builder()
172 .progress_id(progress_id.to_string())
173 .title(title)
174 .message(message)
175 .cancellable(true)
176 .build();
177 self.fire_event(event);
178
179 let progress_id = progress_id.to_string();
180 let outbox = self.outbox.clone();
181 ProgressContext::new(progress_id, cancel_receiver, outbox)
182 }
183
184 fn end_cancellable_progress(&mut self, progress_id: String, message: Option<String>) {
185 {
186 let mut cancel_data = self.cancel_data.lock().unwrap();
187 cancel_data.current_progresses.remove(&progress_id);
188 }
189 let event = ProgressEndEventBody::builder()
190 .progress_id(progress_id)
191 .message(message)
192 .build();
193 self.fire_event(event);
194 }
195
196 fn shutdown(&mut self) {
197 trace!("Shutting down executor");
198 self.shutdown = true
199 }
200}
201
202#[derive(Clone)]
203struct Outbox {
204 outbox_sender: UnboundedSender<ProtocolMessageContent>,
205}
206impl Outbox {
207 fn send(&self, message: impl Into<ProtocolMessageContent>) {
208 let _ = self.outbox_sender.send(message.into());
209 }
210
211 fn respond(&self, request_id: SequenceNumber, result: Result<SuccessResponse, ErrorResponse>) {
212 let response = Response {
213 request_seq: request_id,
214 result,
215 };
216 self.send(response);
217 }
218
219 fn respond_unknown_progress(&self, request_id: SequenceNumber, progress_id: String) {
220 let response = Err(CancelErrorResponse::builder()
221 .message(format!("Unknown progress id: {}", progress_id))
222 .build()
223 .into());
224 self.respond(request_id, response);
225 }
226}
227
228pub struct MessageWriter<O>
229where
230 O: Sink<ProtocolMessage>,
231{
232 seq: SequenceNumber,
233 output: O,
234}
235
236impl<O> MessageWriter<O>
237where
238 O: Sink<ProtocolMessage> + Unpin,
239{
240 pub fn new(output: O) -> MessageWriter<O> {
241 MessageWriter { seq: 0, output }
242 }
243
244 pub async fn respond(
245 &mut self,
246 request_seq: SequenceNumber,
247 result: Result<SuccessResponse, ErrorResponse>,
248 ) -> Result<(), O::Error> {
249 self.write_msg(ProtocolMessageContent::Response(Response {
250 request_seq,
251 result,
252 }))
253 .await
254 }
255
256 pub async fn write_msg(
257 &mut self,
258 content: impl Into<ProtocolMessageContent>,
259 ) -> Result<(), O::Error> {
260 self.seq += 1;
261 let msg = ProtocolMessage::new(self.seq, content);
262 trace!("Sending message to client: {}", msg);
263 self.output.send(msg).await
264 }
265}
266
267pub fn get_command(request: &Request) -> String {
268 let value = serde_json::to_value(request).unwrap();
269 if let Value::Object(mut object) = value {
270 let command = object.remove("command").unwrap();
271 if let Value::String(command) = command {
272 command
273 } else {
274 panic!("command must be a string");
275 }
276 } else {
277 panic!("value must be an object");
278 }
279}