1use std::io::{self, Cursor, Read};
2use std::ops::{Deref, DerefMut};
3use std::path::PathBuf;
4
5use futures::stream::{Stream, StreamExt};
6use tokio::sync::mpsc;
7use tokio::sync::mpsc::error::TryRecvError;
8use tokio::task::JoinHandle;
9
10use crate::client::{
11 DistantChannel, RemoteCommand, RemoteProcess, RemoteStatus, RemoteStderr, RemoteStdin,
12 RemoteStdout,
13};
14use crate::protocol::{Environment, PtySize};
15
16mod msg;
17pub use msg::*;
18
/// Builder holding the settings used by [`RemoteLspCommand::spawn`] to start a
/// [`RemoteLspProcess`].
pub struct RemoteLspCommand {
    /// PTY size handed to the underlying `RemoteCommand` when spawning.
    pty: Option<PtySize>,
    /// Environment handed to the underlying `RemoteCommand` when spawning.
    environment: Environment,
    /// Working directory handed to the underlying `RemoteCommand` when spawning.
    current_dir: Option<PathBuf>,
    /// Scheme passed to the stdin/stdout/stderr wrappers; when `None` they use
    /// the distant-scheme conversions instead.
    scheme: Option<String>,
}
27
28impl Default for RemoteLspCommand {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl RemoteLspCommand {
35 pub fn new() -> Self {
37 Self {
38 pty: None,
39 environment: Environment::new(),
40 current_dir: None,
41 scheme: None,
42 }
43 }
44
45 pub fn pty(&mut self, pty: Option<PtySize>) -> &mut Self {
47 self.pty = pty;
48 self
49 }
50
51 pub fn environment(&mut self, environment: Environment) -> &mut Self {
53 self.environment = environment;
54 self
55 }
56
57 pub fn current_dir(&mut self, current_dir: Option<PathBuf>) -> &mut Self {
59 self.current_dir = current_dir;
60 self
61 }
62
63 pub fn scheme(&mut self, scheme: Option<String>) -> &mut Self {
65 self.scheme = scheme;
66 self
67 }
68
69 pub async fn spawn(
72 &mut self,
73 channel: DistantChannel,
74 cmd: impl Into<String>,
75 ) -> io::Result<RemoteLspProcess> {
76 let mut command = RemoteCommand::new();
77 command.environment(self.environment.clone());
78 command.current_dir(self.current_dir.clone());
79 command.pty(self.pty);
80
81 let mut inner = command.spawn(channel, cmd).await?;
82 let stdin = inner
83 .stdin
84 .take()
85 .map(|x| RemoteLspStdin::new(x, self.scheme.clone()));
86 let stdout = inner
87 .stdout
88 .take()
89 .map(|x| RemoteLspStdout::new(x, self.scheme.clone()));
90 let stderr = inner
91 .stderr
92 .take()
93 .map(|x| RemoteLspStderr::new(x, self.scheme.clone()));
94
95 Ok(RemoteLspProcess {
96 inner,
97 stdin,
98 stdout,
99 stderr,
100 })
101 }
102}
103
/// A spawned remote process whose standard streams are wrapped in LSP-aware
/// readers and writers.
#[derive(Debug)]
pub struct RemoteLspProcess {
    /// The wrapped process; reachable through `Deref`/`DerefMut`.
    inner: RemoteProcess,
    /// LSP-aware stdin, present only if the spawned process exposed stdin.
    pub stdin: Option<RemoteLspStdin>,
    /// LSP-aware stdout, present only if the spawned process exposed stdout.
    pub stdout: Option<RemoteLspStdout>,
    /// LSP-aware stderr, present only if the spawned process exposed stderr.
    pub stderr: Option<RemoteLspStderr>,
}
112
impl RemoteLspProcess {
    /// Consumes this value and waits on the wrapped process, returning
    /// whatever its `wait` yields.
    pub async fn wait(self) -> io::Result<RemoteStatus> {
        self.inner.wait().await
    }
}
119
/// Lets methods of the wrapped [`RemoteProcess`] be called directly on a
/// [`RemoteLspProcess`].
impl Deref for RemoteLspProcess {
    type Target = RemoteProcess;

    /// Borrows the wrapped process.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
127
/// Mutable counterpart of the `Deref` implementation.
impl DerefMut for RemoteLspProcess {
    /// Mutably borrows the wrapped process.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
133
/// Stdin wrapper that only forwards complete LSP messages to the wrapped
/// [`RemoteStdin`], buffering anything that does not yet parse.
#[derive(Debug)]
pub struct RemoteLspStdin {
    /// The wrapped stdin that receives the rewritten messages.
    inner: RemoteStdin,
    /// Bytes written so far that have not yet formed a complete message.
    buf: Option<Vec<u8>>,
    /// Scheme to convert to the local one; `None` selects the distant-scheme
    /// conversion.
    scheme: Option<String>,
}
141
142impl RemoteLspStdin {
143 pub fn new(inner: RemoteStdin, scheme: impl Into<Option<String>>) -> Self {
144 Self {
145 inner,
146 buf: None,
147 scheme: scheme.into(),
148 }
149 }
150
151 pub fn try_write(&mut self, data: &[u8]) -> io::Result<()> {
153 let queue = self.update_and_read_messages(data)?;
154
155 for mut data in queue {
157 match self.scheme.as_mut() {
159 Some(scheme) => data.mut_content().convert_scheme_to_local(scheme),
160 None => data.mut_content().convert_distant_scheme_to_local(),
161 }
162 data.refresh_content_length();
163 self.inner.try_write_str(data.to_string())?;
164 }
165
166 Ok(())
167 }
168
169 pub fn try_write_str(&mut self, data: &str) -> io::Result<()> {
170 self.try_write(data.as_bytes())
171 }
172
173 pub async fn write(&mut self, data: &[u8]) -> io::Result<()> {
175 let queue = self.update_and_read_messages(data)?;
176
177 for mut data in queue {
179 match self.scheme.as_mut() {
181 Some(scheme) => data.mut_content().convert_scheme_to_local(scheme),
182 None => data.mut_content().convert_distant_scheme_to_local(),
183 }
184 data.refresh_content_length();
185 self.inner.write_str(data.to_string()).await?;
186 }
187
188 Ok(())
189 }
190
191 pub async fn write_str(&mut self, data: &str) -> io::Result<()> {
192 self.write(data.as_bytes()).await
193 }
194
195 fn update_and_read_messages(&mut self, data: &[u8]) -> io::Result<Vec<LspMsg>> {
196 match &mut self.buf {
198 Some(buf) => buf.extend(data),
199 None => self.buf = Some(data.to_vec()),
200 }
201
202 let buf = self.buf.take().unwrap();
204 match read_lsp_messages(&buf) {
205 Ok((remainder, queue)) => {
207 self.buf = remainder;
208 Ok(queue)
209 }
210
211 Err(x) => {
213 self.buf = Some(buf);
214 Err(x)
215 }
216 }
217 }
218}
219
/// Stdout wrapper that yields only complete LSP messages, produced by a
/// background task and delivered over a channel.
#[derive(Debug)]
pub struct RemoteLspStdout {
    /// Handle of the task started by `spawn_read_task`; aborted on drop.
    read_task: JoinHandle<()>,
    /// Receives the serialized messages (or an error) from the read task.
    rx: mpsc::Receiver<io::Result<Vec<u8>>>,
}
226
227impl RemoteLspStdout {
228 pub fn new(inner: RemoteStdout, scheme: impl Into<Option<String>>) -> Self {
229 let (read_task, rx) = spawn_read_task(
230 Box::pin(futures::stream::unfold(inner, |mut inner| async move {
231 match inner.read().await {
232 Ok(res) => Some((res, inner)),
233 Err(_) => None,
234 }
235 })),
236 scheme,
237 );
238
239 Self { read_task, rx }
240 }
241
242 pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
245 match self.rx.try_recv() {
246 Ok(Ok(data)) => Ok(Some(data)),
247 Ok(Err(x)) => Err(x),
248 Err(TryRecvError::Empty) => Ok(None),
249 Err(TryRecvError::Disconnected) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
250 }
251 }
252
253 pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
255 self.try_read().and_then(|x| match x {
256 Some(data) => String::from_utf8(data)
257 .map(Some)
258 .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
259 None => Ok(None),
260 })
261 }
262
263 pub async fn read(&mut self) -> io::Result<Vec<u8>> {
265 self.rx
266 .recv()
267 .await
268 .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))?
269 }
270
271 pub async fn read_string(&mut self) -> io::Result<String> {
273 self.read().await.and_then(|data| {
274 String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
275 })
276 }
277}
278
impl Drop for RemoteLspStdout {
    /// Aborts the background read task, then closes the receiving half of the
    /// channel.
    fn drop(&mut self) {
        self.read_task.abort();
        self.rx.close();
    }
}
285
/// Stderr wrapper that yields only complete LSP messages, produced by a
/// background task and delivered over a channel.
#[derive(Debug)]
pub struct RemoteLspStderr {
    /// Handle of the task started by `spawn_read_task`; aborted on drop.
    read_task: JoinHandle<()>,
    /// Receives the serialized messages (or an error) from the read task.
    rx: mpsc::Receiver<io::Result<Vec<u8>>>,
}
292
293impl RemoteLspStderr {
294 pub fn new(inner: RemoteStderr, scheme: impl Into<Option<String>>) -> Self {
295 let (read_task, rx) = spawn_read_task(
296 Box::pin(futures::stream::unfold(inner, |mut inner| async move {
297 match inner.read().await {
298 Ok(res) => Some((res, inner)),
299 Err(_) => None,
300 }
301 })),
302 scheme,
303 );
304
305 Self { read_task, rx }
306 }
307
308 pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
311 match self.rx.try_recv() {
312 Ok(Ok(data)) => Ok(Some(data)),
313 Ok(Err(x)) => Err(x),
314 Err(TryRecvError::Empty) => Ok(None),
315 Err(TryRecvError::Disconnected) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
316 }
317 }
318
319 pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
321 self.try_read().and_then(|x| match x {
322 Some(data) => String::from_utf8(data)
323 .map(Some)
324 .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
325 None => Ok(None),
326 })
327 }
328
329 pub async fn read(&mut self) -> io::Result<Vec<u8>> {
331 self.rx
332 .recv()
333 .await
334 .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))?
335 }
336
337 pub async fn read_string(&mut self) -> io::Result<String> {
339 self.read().await.and_then(|data| {
340 String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
341 })
342 }
343}
344
impl Drop for RemoteLspStderr {
    /// Aborts the background read task, then closes the receiving half of the
    /// channel.
    fn drop(&mut self) {
        self.read_task.abort();
        self.rx.close();
    }
}
351
352fn spawn_read_task<S>(
353 mut stream: S,
354 scheme: impl Into<Option<String>>,
355) -> (JoinHandle<()>, mpsc::Receiver<io::Result<Vec<u8>>>)
356where
357 S: Stream<Item = Vec<u8>> + Send + Unpin + 'static,
358{
359 let mut scheme = scheme.into();
360 let (tx, rx) = mpsc::channel::<io::Result<Vec<u8>>>(1);
361 let read_task = tokio::spawn(async move {
362 let mut task_buf: Option<Vec<u8>> = None;
363
364 while let Some(data) = stream.next().await {
365 match &mut task_buf {
367 Some(buf) => buf.extend(data),
368 None => task_buf = Some(data),
369 }
370
371 let buf = task_buf.take().unwrap();
373 let (remainder, queue) = match read_lsp_messages(&buf) {
374 Ok(x) => x,
375 Err(x) => {
376 let _ = tx.send(Err(x)).await;
377 break;
378 }
379 };
380 task_buf = remainder;
381
382 if !queue.is_empty() {
384 let mut out = Vec::new();
385 for mut data in queue {
386 match scheme.as_mut() {
388 Some(scheme) => data.mut_content().convert_local_scheme_to(scheme),
389 None => data.mut_content().convert_local_scheme_to_distant(),
390 }
391 data.refresh_content_length();
392 out.extend(data.to_bytes());
393 }
394 if tx.send(Ok(out)).await.is_err() {
395 break;
396 }
397 }
398 }
399 });
400
401 (read_task, rx)
402}
403
404fn read_lsp_messages(input: &[u8]) -> io::Result<(Option<Vec<u8>>, Vec<LspMsg>)> {
405 let mut queue = Vec::new();
406
407 let mut cursor = Cursor::new(input);
411 let mut pos = 0;
412 while let Ok(data) = LspMsg::from_buf_reader(&mut cursor) {
413 queue.push(data);
414 pos = cursor.position();
415 }
416 cursor.set_position(pos);
417
418 let remainder = if (cursor.position() as usize) < cursor.get_ref().len() {
420 let mut buf = Vec::new();
421 cursor.read_to_end(&mut buf)?;
422 Some(buf)
423 } else {
424 None
425 };
426
427 Ok((remainder, queue))
428}
429
430#[cfg(test)]
431mod tests {
432 use std::future::Future;
433 use std::time::Duration;
434
435 use distant_net::common::{FramedTransport, InmemoryTransport, Request, Response};
436 use distant_net::Client;
437 use test_log::test;
438
439 use super::*;
440 use crate::protocol;
441
442 const TIMEOUT: Duration = Duration::from_millis(50);
444
445 async fn spawn_lsp_process() -> (FramedTransport<InmemoryTransport>, RemoteLspProcess) {
447 let (mut t1, t2) = FramedTransport::pair(100);
448 let client = Client::spawn_inmemory(t2, Default::default());
449 let spawn_task = tokio::spawn({
450 let channel = client.clone_channel();
451 async move {
452 RemoteLspCommand::new()
453 .spawn(channel, String::from("cmd arg"))
454 .await
455 }
456 });
457
458 let req: Request<protocol::Request> = t1.read_frame_as().await.unwrap().unwrap();
460
461 t1.write_frame_for(&Response::new(
463 req.id,
464 protocol::Response::ProcSpawned { id: rand::random() },
465 ))
466 .await
467 .unwrap();
468
469 let proc = spawn_task.await.unwrap().unwrap();
471 (t1, proc)
472 }
473
474 fn make_lsp_msg<T>(value: T) -> Vec<u8>
475 where
476 T: serde::Serialize,
477 {
478 let content = serde_json::to_string_pretty(&value).unwrap();
479 format!("Content-Length: {}\r\n\r\n{}", content.len(), content).into_bytes()
480 }
481
482 async fn timeout<F, R>(duration: Duration, f: F) -> io::Result<R>
483 where
484 F: Future<Output = R>,
485 {
486 tokio::select! {
487 res = f => {
488 Ok(res)
489 }
490 _ = tokio::time::sleep(duration) => {
491 Err(io::Error::from(io::ErrorKind::TimedOut))
492 }
493 }
494 }
495
496 #[test(tokio::test)]
497 async fn stdin_write_should_only_send_out_complete_lsp_messages() {
498 let (mut transport, mut proc) = spawn_lsp_process().await;
499
500 proc.stdin
501 .as_mut()
502 .unwrap()
503 .write(&make_lsp_msg(serde_json::json!({
504 "field1": "a",
505 "field2": "b",
506 })))
507 .await
508 .unwrap();
509
510 let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
512 match req.payload {
513 protocol::Request::ProcStdin { data, .. } => {
514 assert_eq!(
515 data,
516 make_lsp_msg(serde_json::json!({
517 "field1": "a",
518 "field2": "b",
519 }))
520 );
521 }
522 x => panic!("Unexpected request: {:?}", x),
523 }
524 }
525
526 #[test(tokio::test)]
527 async fn stdin_write_should_support_buffering_output_until_a_complete_lsp_message_is_composed()
528 {
529 let (mut transport, mut proc) = spawn_lsp_process().await;
530
531 let msg = make_lsp_msg(serde_json::json!({
532 "field1": "a",
533 "field2": "b",
534 }));
535 let (msg_a, msg_b) = msg.split_at(msg.len() / 2);
536
537 proc.stdin.as_mut().unwrap().write(msg_a).await.unwrap();
539
540 tokio::task::yield_now().await;
543 let result = timeout(
544 TIMEOUT,
545 transport.read_frame_as::<Request<protocol::Request>>(),
546 )
547 .await;
548 assert!(result.is_err(), "Unexpectedly got data: {:?}", result);
549
550 proc.stdin.as_mut().unwrap().write(msg_b).await.unwrap();
552
553 let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
555 match req.payload {
556 protocol::Request::ProcStdin { data, .. } => {
557 assert_eq!(
558 data,
559 make_lsp_msg(serde_json::json!({
560 "field1": "a",
561 "field2": "b",
562 }))
563 );
564 }
565 x => panic!("Unexpected request: {:?}", x),
566 }
567 }
568
569 #[test(tokio::test)]
570 async fn stdin_write_should_only_consume_a_complete_lsp_message_even_if_more_is_written() {
571 let (mut transport, mut proc) = spawn_lsp_process().await;
572
573 let msg = make_lsp_msg(serde_json::json!({
574 "field1": "a",
575 "field2": "b",
576 }));
577
578 let extra = "Content-Length: 123";
579
580 proc.stdin
582 .as_mut()
583 .unwrap()
584 .write_str(&format!("{}{}", String::from_utf8(msg).unwrap(), extra))
585 .await
586 .unwrap();
587
588 let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
590 match req.payload {
591 protocol::Request::ProcStdin { data, .. } => {
592 assert_eq!(
593 data,
594 make_lsp_msg(serde_json::json!({
595 "field1": "a",
596 "field2": "b",
597 }))
598 );
599 }
600 x => panic!("Unexpected request: {:?}", x),
601 }
602
603 assert_eq!(
605 String::from_utf8(proc.stdin.unwrap().buf.unwrap()).unwrap(),
606 extra,
607 "Extra was not retained"
608 );
609 }
610
611 #[test(tokio::test)]
612 async fn stdin_write_should_support_sending_out_multiple_lsp_messages_if_all_received_at_once()
613 {
614 let (mut transport, mut proc) = spawn_lsp_process().await;
615
616 let msg_1 = make_lsp_msg(serde_json::json!({
617 "field1": "a",
618 "field2": "b",
619 }));
620 let msg_2 = make_lsp_msg(serde_json::json!({
621 "field1": "c",
622 "field2": "d",
623 }));
624
625 proc.stdin
627 .as_mut()
628 .unwrap()
629 .write_str(&format!(
630 "{}{}",
631 String::from_utf8(msg_1).unwrap(),
632 String::from_utf8(msg_2).unwrap()
633 ))
634 .await
635 .unwrap();
636
637 let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
639 match req.payload {
640 protocol::Request::ProcStdin { data, .. } => {
641 assert_eq!(
642 data,
643 make_lsp_msg(serde_json::json!({
644 "field1": "a",
645 "field2": "b",
646 }))
647 );
648 }
649 x => panic!("Unexpected request: {:?}", x),
650 }
651
652 let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
654 match req.payload {
655 protocol::Request::ProcStdin { data, .. } => {
656 assert_eq!(
657 data,
658 make_lsp_msg(serde_json::json!({
659 "field1": "c",
660 "field2": "d",
661 }))
662 );
663 }
664 x => panic!("Unexpected request: {:?}", x),
665 }
666 }
667
668 #[test(tokio::test)]
669 async fn stdin_write_should_convert_content_with_distant_scheme_to_file_scheme() {
670 let (mut transport, mut proc) = spawn_lsp_process().await;
671
672 proc.stdin
673 .as_mut()
674 .unwrap()
675 .write(&make_lsp_msg(serde_json::json!({
676 "field1": "distant://some/path",
677 "field2": "file://other/path",
678 })))
679 .await
680 .unwrap();
681
682 let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
684 match req.payload {
685 protocol::Request::ProcStdin { data, .. } => {
686 assert_eq!(
690 data,
691 make_lsp_msg(serde_json::json!({
692 "field1": "file://some/path",
693 "field2": "file://other/path",
694 }))
695 );
696 }
697 x => panic!("Unexpected request: {:?}", x),
698 }
699 }
700
701 #[test(tokio::test)]
702 async fn stdout_read_should_yield_lsp_messages_as_strings() {
703 let (mut transport, mut proc) = spawn_lsp_process().await;
704
705 transport
707 .write_frame_for(&Response::new(
708 proc.origin_id().to_string(),
709 protocol::Response::ProcStdout {
710 id: proc.id(),
711 data: make_lsp_msg(serde_json::json!({
712 "field1": "a",
713 "field2": "b",
714 })),
715 },
716 ))
717 .await
718 .unwrap();
719
720 let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
722 assert_eq!(
723 out,
724 make_lsp_msg(serde_json::json!({
725 "field1": "a",
726 "field2": "b",
727 }))
728 );
729 }
730
731 #[test(tokio::test)]
732 async fn stdout_read_should_only_yield_complete_lsp_messages() {
733 let (mut transport, mut proc) = spawn_lsp_process().await;
734
735 let msg = make_lsp_msg(serde_json::json!({
736 "field1": "a",
737 "field2": "b",
738 }));
739 let (msg_a, msg_b) = msg.split_at(msg.len() / 2);
740
741 transport
743 .write_frame_for(&Response::new(
744 proc.origin_id().to_string(),
745 protocol::Response::ProcStdout {
746 id: proc.id(),
747 data: msg_a.to_vec(),
748 },
749 ))
750 .await
751 .unwrap();
752
753 tokio::task::yield_now().await;
756 let result = timeout(TIMEOUT, proc.stdout.as_mut().unwrap().read()).await;
757 assert!(result.is_err(), "Unexpectedly got data: {:?}", result);
758
759 transport
761 .write_frame_for(&Response::new(
762 proc.origin_id().to_string(),
763 protocol::Response::ProcStdout {
764 id: proc.id(),
765 data: msg_b.to_vec(),
766 },
767 ))
768 .await
769 .unwrap();
770
771 let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
773 assert_eq!(
774 out,
775 make_lsp_msg(serde_json::json!({
776 "field1": "a",
777 "field2": "b",
778 }))
779 );
780 }
781
782 #[test(tokio::test)]
783 async fn stdout_read_should_only_consume_a_complete_lsp_message_even_if_more_output_is_available(
784 ) {
785 let (mut transport, mut proc) = spawn_lsp_process().await;
786
787 let msg = make_lsp_msg(serde_json::json!({
788 "field1": "a",
789 "field2": "b",
790 }));
791 let extra = "some extra content";
792
793 transport
795 .write_frame_for(&Response::new(
796 proc.origin_id().to_string(),
797 protocol::Response::ProcStdout {
798 id: proc.id(),
799 data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
800 },
801 ))
802 .await
803 .unwrap();
804
805 let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
807 assert_eq!(
808 out,
809 make_lsp_msg(serde_json::json!({
810 "field1": "a",
811 "field2": "b",
812 }))
813 );
814
815 let result = timeout(TIMEOUT, proc.stdout.as_mut().unwrap().read()).await;
817 assert!(
818 result.is_err(),
819 "Unexpected extra content received on stdout"
820 );
821 }
822
823 #[test(tokio::test)]
824 async fn stdout_read_should_support_yielding_multiple_lsp_messages_if_all_received_at_once() {
825 let (mut transport, mut proc) = spawn_lsp_process().await;
826
827 let msg_1 = make_lsp_msg(serde_json::json!({
828 "field1": "a",
829 "field2": "b",
830 }));
831 let msg_2 = make_lsp_msg(serde_json::json!({
832 "field1": "c",
833 "field2": "d",
834 }));
835
836 transport
838 .write_frame_for(&Response::new(
839 proc.origin_id().to_string(),
840 protocol::Response::ProcStdout {
841 id: proc.id(),
842 data: format!(
843 "{}{}",
844 String::from_utf8(msg_1).unwrap(),
845 String::from_utf8(msg_2).unwrap()
846 )
847 .into_bytes(),
848 },
849 ))
850 .await
851 .unwrap();
852
853 let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
855 assert_eq!(
856 out,
857 format!(
858 "{}{}",
859 String::from_utf8(make_lsp_msg(serde_json::json!({
860 "field1": "a",
861 "field2": "b",
862 })))
863 .unwrap(),
864 String::from_utf8(make_lsp_msg(serde_json::json!({
865 "field1": "c",
866 "field2": "d",
867 })))
868 .unwrap()
869 )
870 .into_bytes()
871 );
872 }
873
874 #[test(tokio::test)]
875 async fn stdout_read_should_convert_content_with_file_scheme_to_distant_scheme() {
876 let (mut transport, mut proc) = spawn_lsp_process().await;
877
878 transport
880 .write_frame_for(&Response::new(
881 proc.origin_id().to_string(),
882 protocol::Response::ProcStdout {
883 id: proc.id(),
884 data: make_lsp_msg(serde_json::json!({
885 "field1": "distant://some/path",
886 "field2": "file://other/path",
887 })),
888 },
889 ))
890 .await
891 .unwrap();
892
893 let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
895 assert_eq!(
896 out,
897 make_lsp_msg(serde_json::json!({
898 "field1": "distant://some/path",
899 "field2": "distant://other/path",
900 }))
901 );
902 }
903
904 #[test(tokio::test)]
905 async fn stderr_read_should_yield_lsp_messages_as_strings() {
906 let (mut transport, mut proc) = spawn_lsp_process().await;
907
908 transport
910 .write_frame_for(&Response::new(
911 proc.origin_id().to_string(),
912 protocol::Response::ProcStderr {
913 id: proc.id(),
914 data: make_lsp_msg(serde_json::json!({
915 "field1": "a",
916 "field2": "b",
917 })),
918 },
919 ))
920 .await
921 .unwrap();
922
923 let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
925 assert_eq!(
926 err,
927 make_lsp_msg(serde_json::json!({
928 "field1": "a",
929 "field2": "b",
930 }))
931 );
932 }
933
934 #[test(tokio::test)]
935 async fn stderr_read_should_only_yield_complete_lsp_messages() {
936 let (mut transport, mut proc) = spawn_lsp_process().await;
937
938 let msg = make_lsp_msg(serde_json::json!({
939 "field1": "a",
940 "field2": "b",
941 }));
942 let (msg_a, msg_b) = msg.split_at(msg.len() / 2);
943
944 transport
946 .write_frame_for(&Response::new(
947 proc.origin_id().to_string(),
948 protocol::Response::ProcStderr {
949 id: proc.id(),
950 data: msg_a.to_vec(),
951 },
952 ))
953 .await
954 .unwrap();
955
956 tokio::task::yield_now().await;
959 let result = timeout(TIMEOUT, proc.stderr.as_mut().unwrap().read()).await;
960 assert!(result.is_err(), "Unexpectedly got data: {:?}", result);
961
962 transport
964 .write_frame_for(&Response::new(
965 proc.origin_id().to_string(),
966 protocol::Response::ProcStderr {
967 id: proc.id(),
968 data: msg_b.to_vec(),
969 },
970 ))
971 .await
972 .unwrap();
973
974 let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
976 assert_eq!(
977 err,
978 make_lsp_msg(serde_json::json!({
979 "field1": "a",
980 "field2": "b",
981 }))
982 );
983 }
984
985 #[test(tokio::test)]
986 async fn stderr_read_should_only_consume_a_complete_lsp_message_even_if_more_errput_is_available(
987 ) {
988 let (mut transport, mut proc) = spawn_lsp_process().await;
989
990 let msg = make_lsp_msg(serde_json::json!({
991 "field1": "a",
992 "field2": "b",
993 }));
994 let extra = "some extra content";
995
996 transport
998 .write_frame_for(&Response::new(
999 proc.origin_id().to_string(),
1000 protocol::Response::ProcStderr {
1001 id: proc.id(),
1002 data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
1003 },
1004 ))
1005 .await
1006 .unwrap();
1007
1008 let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
1010 assert_eq!(
1011 err,
1012 make_lsp_msg(serde_json::json!({
1013 "field1": "a",
1014 "field2": "b",
1015 }))
1016 );
1017
1018 let result = timeout(TIMEOUT, proc.stderr.as_mut().unwrap().read()).await;
1020 assert!(
1021 result.is_err(),
1022 "Unexpected extra content received on stderr"
1023 );
1024 }
1025
1026 #[test(tokio::test)]
1027 async fn stderr_read_should_support_yielding_multiple_lsp_messages_if_all_received_at_once() {
1028 let (mut transport, mut proc) = spawn_lsp_process().await;
1029
1030 let msg_1 = make_lsp_msg(serde_json::json!({
1031 "field1": "a",
1032 "field2": "b",
1033 }));
1034 let msg_2 = make_lsp_msg(serde_json::json!({
1035 "field1": "c",
1036 "field2": "d",
1037 }));
1038
1039 transport
1041 .write_frame_for(&Response::new(
1042 proc.origin_id().to_string(),
1043 protocol::Response::ProcStderr {
1044 id: proc.id(),
1045 data: format!(
1046 "{}{}",
1047 String::from_utf8(msg_1).unwrap(),
1048 String::from_utf8(msg_2).unwrap()
1049 )
1050 .into_bytes(),
1051 },
1052 ))
1053 .await
1054 .unwrap();
1055
1056 let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
1058 assert_eq!(
1059 err,
1060 format!(
1061 "{}{}",
1062 String::from_utf8(make_lsp_msg(serde_json::json!({
1063 "field1": "a",
1064 "field2": "b",
1065 })))
1066 .unwrap(),
1067 String::from_utf8(make_lsp_msg(serde_json::json!({
1068 "field1": "c",
1069 "field2": "d",
1070 })))
1071 .unwrap()
1072 )
1073 .into_bytes()
1074 );
1075 }
1076
1077 #[test(tokio::test)]
1078 async fn stderr_read_should_convert_content_with_file_scheme_to_distant_scheme() {
1079 let (mut transport, mut proc) = spawn_lsp_process().await;
1080
1081 transport
1083 .write_frame_for(&Response::new(
1084 proc.origin_id().to_string(),
1085 protocol::Response::ProcStderr {
1086 id: proc.id(),
1087 data: make_lsp_msg(serde_json::json!({
1088 "field1": "distant://some/path",
1089 "field2": "file://other/path",
1090 })),
1091 },
1092 ))
1093 .await
1094 .unwrap();
1095
1096 let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
1098 assert_eq!(
1099 err,
1100 make_lsp_msg(serde_json::json!({
1101 "field1": "distant://some/path",
1102 "field2": "distant://other/path",
1103 }))
1104 );
1105 }
1106}