distant_core/client/
lsp.rs

1use std::io::{self, Cursor, Read};
2use std::ops::{Deref, DerefMut};
3use std::path::PathBuf;
4
5use futures::stream::{Stream, StreamExt};
6use tokio::sync::mpsc;
7use tokio::sync::mpsc::error::TryRecvError;
8use tokio::task::JoinHandle;
9
10use crate::client::{
11    DistantChannel, RemoteCommand, RemoteProcess, RemoteStatus, RemoteStderr, RemoteStdin,
12    RemoteStdout,
13};
14use crate::protocol::{Environment, PtySize};
15
16mod msg;
17pub use msg::*;
18
19/// A [`RemoteLspProcess`] builder providing support to configure
20/// before spawning the process on a remote machine
21pub struct RemoteLspCommand {
22    pty: Option<PtySize>,
23    environment: Environment,
24    current_dir: Option<PathBuf>,
25    scheme: Option<String>,
26}
27
28impl Default for RemoteLspCommand {
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl RemoteLspCommand {
35    /// Creates a new set of options for a remote LSP process
36    pub fn new() -> Self {
37        Self {
38            pty: None,
39            environment: Environment::new(),
40            current_dir: None,
41            scheme: None,
42        }
43    }
44
45    /// Configures the process to leverage a PTY with the specified size
46    pub fn pty(&mut self, pty: Option<PtySize>) -> &mut Self {
47        self.pty = pty;
48        self
49    }
50
51    /// Replaces the existing environment variables with the given collection
52    pub fn environment(&mut self, environment: Environment) -> &mut Self {
53        self.environment = environment;
54        self
55    }
56
57    /// Configures the process with an alternative current directory
58    pub fn current_dir(&mut self, current_dir: Option<PathBuf>) -> &mut Self {
59        self.current_dir = current_dir;
60        self
61    }
62
63    /// Configures the process with a specific scheme to convert rather than `distant://`
64    pub fn scheme(&mut self, scheme: Option<String>) -> &mut Self {
65        self.scheme = scheme;
66        self
67    }
68
69    /// Spawns the specified process on the remote machine using the given session, treating
70    /// the process like an LSP server
71    pub async fn spawn(
72        &mut self,
73        channel: DistantChannel,
74        cmd: impl Into<String>,
75    ) -> io::Result<RemoteLspProcess> {
76        let mut command = RemoteCommand::new();
77        command.environment(self.environment.clone());
78        command.current_dir(self.current_dir.clone());
79        command.pty(self.pty);
80
81        let mut inner = command.spawn(channel, cmd).await?;
82        let stdin = inner
83            .stdin
84            .take()
85            .map(|x| RemoteLspStdin::new(x, self.scheme.clone()));
86        let stdout = inner
87            .stdout
88            .take()
89            .map(|x| RemoteLspStdout::new(x, self.scheme.clone()));
90        let stderr = inner
91            .stderr
92            .take()
93            .map(|x| RemoteLspStderr::new(x, self.scheme.clone()));
94
95        Ok(RemoteLspProcess {
96            inner,
97            stdin,
98            stdout,
99            stderr,
100        })
101    }
102}
103
104/// Represents an LSP server process on a remote machine
105#[derive(Debug)]
106pub struct RemoteLspProcess {
107    inner: RemoteProcess,
108    pub stdin: Option<RemoteLspStdin>,
109    pub stdout: Option<RemoteLspStdout>,
110    pub stderr: Option<RemoteLspStderr>,
111}
112
113impl RemoteLspProcess {
114    /// Waits for the process to terminate, returning the success status and an optional exit code
115    pub async fn wait(self) -> io::Result<RemoteStatus> {
116        self.inner.wait().await
117    }
118}
119
120impl Deref for RemoteLspProcess {
121    type Target = RemoteProcess;
122
123    fn deref(&self) -> &Self::Target {
124        &self.inner
125    }
126}
127
128impl DerefMut for RemoteLspProcess {
129    fn deref_mut(&mut self) -> &mut Self::Target {
130        &mut self.inner
131    }
132}
133
134/// A handle to a remote LSP process' standard input (stdin)
135#[derive(Debug)]
136pub struct RemoteLspStdin {
137    inner: RemoteStdin,
138    buf: Option<Vec<u8>>,
139    scheme: Option<String>,
140}
141
142impl RemoteLspStdin {
143    pub fn new(inner: RemoteStdin, scheme: impl Into<Option<String>>) -> Self {
144        Self {
145            inner,
146            buf: None,
147            scheme: scheme.into(),
148        }
149    }
150
151    /// Tries to write data to the stdin of a specific remote process
152    pub fn try_write(&mut self, data: &[u8]) -> io::Result<()> {
153        let queue = self.update_and_read_messages(data)?;
154
155        // Process and then send out each LSP message in our queue
156        for mut data in queue {
157            // Convert distant:// to file://
158            match self.scheme.as_mut() {
159                Some(scheme) => data.mut_content().convert_scheme_to_local(scheme),
160                None => data.mut_content().convert_distant_scheme_to_local(),
161            }
162            data.refresh_content_length();
163            self.inner.try_write_str(data.to_string())?;
164        }
165
166        Ok(())
167    }
168
169    pub fn try_write_str(&mut self, data: &str) -> io::Result<()> {
170        self.try_write(data.as_bytes())
171    }
172
173    /// Writes data to the stdin of a specific remote process
174    pub async fn write(&mut self, data: &[u8]) -> io::Result<()> {
175        let queue = self.update_and_read_messages(data)?;
176
177        // Process and then send out each LSP message in our queue
178        for mut data in queue {
179            // Convert distant:// to file://
180            match self.scheme.as_mut() {
181                Some(scheme) => data.mut_content().convert_scheme_to_local(scheme),
182                None => data.mut_content().convert_distant_scheme_to_local(),
183            }
184            data.refresh_content_length();
185            self.inner.write_str(data.to_string()).await?;
186        }
187
188        Ok(())
189    }
190
191    pub async fn write_str(&mut self, data: &str) -> io::Result<()> {
192        self.write(data.as_bytes()).await
193    }
194
195    fn update_and_read_messages(&mut self, data: &[u8]) -> io::Result<Vec<LspMsg>> {
196        // Create or insert into our buffer
197        match &mut self.buf {
198            Some(buf) => buf.extend(data),
199            None => self.buf = Some(data.to_vec()),
200        }
201
202        // Read LSP messages from our internal buffer
203        let buf = self.buf.take().unwrap();
204        match read_lsp_messages(&buf) {
205            // If we succeed, update buf with our remainder and return messages
206            Ok((remainder, queue)) => {
207                self.buf = remainder;
208                Ok(queue)
209            }
210
211            // Otherwise, if failed, reset buf back to what it was
212            Err(x) => {
213                self.buf = Some(buf);
214                Err(x)
215            }
216        }
217    }
218}
219
220/// A handle to a remote LSP process' standard output (stdout)
221#[derive(Debug)]
222pub struct RemoteLspStdout {
223    read_task: JoinHandle<()>,
224    rx: mpsc::Receiver<io::Result<Vec<u8>>>,
225}
226
227impl RemoteLspStdout {
228    pub fn new(inner: RemoteStdout, scheme: impl Into<Option<String>>) -> Self {
229        let (read_task, rx) = spawn_read_task(
230            Box::pin(futures::stream::unfold(inner, |mut inner| async move {
231                match inner.read().await {
232                    Ok(res) => Some((res, inner)),
233                    Err(_) => None,
234                }
235            })),
236            scheme,
237        );
238
239        Self { read_task, rx }
240    }
241
242    /// Tries to read a complete LSP message over stdout, returning `None` if no complete message
243    /// is available
244    pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
245        match self.rx.try_recv() {
246            Ok(Ok(data)) => Ok(Some(data)),
247            Ok(Err(x)) => Err(x),
248            Err(TryRecvError::Empty) => Ok(None),
249            Err(TryRecvError::Disconnected) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
250        }
251    }
252
253    /// Same as `try_read`, but returns a string
254    pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
255        self.try_read().and_then(|x| match x {
256            Some(data) => String::from_utf8(data)
257                .map(Some)
258                .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
259            None => Ok(None),
260        })
261    }
262
263    /// Reads a complete LSP message over stdout
264    pub async fn read(&mut self) -> io::Result<Vec<u8>> {
265        self.rx
266            .recv()
267            .await
268            .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))?
269    }
270
271    /// Same as `read`, but returns a string
272    pub async fn read_string(&mut self) -> io::Result<String> {
273        self.read().await.and_then(|data| {
274            String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
275        })
276    }
277}
278
279impl Drop for RemoteLspStdout {
280    fn drop(&mut self) {
281        self.read_task.abort();
282        self.rx.close();
283    }
284}
285
286/// A handle to a remote LSP process' stderr
287#[derive(Debug)]
288pub struct RemoteLspStderr {
289    read_task: JoinHandle<()>,
290    rx: mpsc::Receiver<io::Result<Vec<u8>>>,
291}
292
293impl RemoteLspStderr {
294    pub fn new(inner: RemoteStderr, scheme: impl Into<Option<String>>) -> Self {
295        let (read_task, rx) = spawn_read_task(
296            Box::pin(futures::stream::unfold(inner, |mut inner| async move {
297                match inner.read().await {
298                    Ok(res) => Some((res, inner)),
299                    Err(_) => None,
300                }
301            })),
302            scheme,
303        );
304
305        Self { read_task, rx }
306    }
307
308    /// Tries to read a complete LSP message over stderr, returning `None` if no complete message
309    /// is available
310    pub fn try_read(&mut self) -> io::Result<Option<Vec<u8>>> {
311        match self.rx.try_recv() {
312            Ok(Ok(data)) => Ok(Some(data)),
313            Ok(Err(x)) => Err(x),
314            Err(TryRecvError::Empty) => Ok(None),
315            Err(TryRecvError::Disconnected) => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
316        }
317    }
318
319    /// Same as `try_read`, but returns a string
320    pub fn try_read_string(&mut self) -> io::Result<Option<String>> {
321        self.try_read().and_then(|x| match x {
322            Some(data) => String::from_utf8(data)
323                .map(Some)
324                .map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x)),
325            None => Ok(None),
326        })
327    }
328
329    /// Reads a complete LSP message over stderr
330    pub async fn read(&mut self) -> io::Result<Vec<u8>> {
331        self.rx
332            .recv()
333            .await
334            .ok_or_else(|| io::Error::from(io::ErrorKind::BrokenPipe))?
335    }
336
337    /// Same as `read`, but returns a string
338    pub async fn read_string(&mut self) -> io::Result<String> {
339        self.read().await.and_then(|data| {
340            String::from_utf8(data).map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))
341        })
342    }
343}
344
345impl Drop for RemoteLspStderr {
346    fn drop(&mut self) {
347        self.read_task.abort();
348        self.rx.close();
349    }
350}
351
352fn spawn_read_task<S>(
353    mut stream: S,
354    scheme: impl Into<Option<String>>,
355) -> (JoinHandle<()>, mpsc::Receiver<io::Result<Vec<u8>>>)
356where
357    S: Stream<Item = Vec<u8>> + Send + Unpin + 'static,
358{
359    let mut scheme = scheme.into();
360    let (tx, rx) = mpsc::channel::<io::Result<Vec<u8>>>(1);
361    let read_task = tokio::spawn(async move {
362        let mut task_buf: Option<Vec<u8>> = None;
363
364        while let Some(data) = stream.next().await {
365            // Create or insert into our buffer
366            match &mut task_buf {
367                Some(buf) => buf.extend(data),
368                None => task_buf = Some(data),
369            }
370
371            // Read LSP messages from our internal buffer
372            let buf = task_buf.take().unwrap();
373            let (remainder, queue) = match read_lsp_messages(&buf) {
374                Ok(x) => x,
375                Err(x) => {
376                    let _ = tx.send(Err(x)).await;
377                    break;
378                }
379            };
380            task_buf = remainder;
381
382            // Process and then add each LSP message as output
383            if !queue.is_empty() {
384                let mut out = Vec::new();
385                for mut data in queue {
386                    // Convert file:// to distant://
387                    match scheme.as_mut() {
388                        Some(scheme) => data.mut_content().convert_local_scheme_to(scheme),
389                        None => data.mut_content().convert_local_scheme_to_distant(),
390                    }
391                    data.refresh_content_length();
392                    out.extend(data.to_bytes());
393                }
394                if tx.send(Ok(out)).await.is_err() {
395                    break;
396                }
397            }
398        }
399    });
400
401    (read_task, rx)
402}
403
404fn read_lsp_messages(input: &[u8]) -> io::Result<(Option<Vec<u8>>, Vec<LspMsg>)> {
405    let mut queue = Vec::new();
406
407    // Continue to read complete messages from the input until we either fail to parse or we reach
408    // end of input, resetting cursor position back to last successful parse as otherwise the
409    // cursor may have moved partially from lsp successfully reading the start of a message
410    let mut cursor = Cursor::new(input);
411    let mut pos = 0;
412    while let Ok(data) = LspMsg::from_buf_reader(&mut cursor) {
413        queue.push(data);
414        pos = cursor.position();
415    }
416    cursor.set_position(pos);
417
418    // Keep remainder of bytes not processed as LSP message in buffer
419    let remainder = if (cursor.position() as usize) < cursor.get_ref().len() {
420        let mut buf = Vec::new();
421        cursor.read_to_end(&mut buf)?;
422        Some(buf)
423    } else {
424        None
425    };
426
427    Ok((remainder, queue))
428}
429
430#[cfg(test)]
431mod tests {
432    use std::future::Future;
433    use std::time::Duration;
434
435    use distant_net::common::{FramedTransport, InmemoryTransport, Request, Response};
436    use distant_net::Client;
437    use test_log::test;
438
439    use super::*;
440    use crate::protocol;
441
442    /// Timeout used with timeout function
443    const TIMEOUT: Duration = Duration::from_millis(50);
444
445    // Configures an lsp process with a means to send & receive data from outside
446    async fn spawn_lsp_process() -> (FramedTransport<InmemoryTransport>, RemoteLspProcess) {
447        let (mut t1, t2) = FramedTransport::pair(100);
448        let client = Client::spawn_inmemory(t2, Default::default());
449        let spawn_task = tokio::spawn({
450            let channel = client.clone_channel();
451            async move {
452                RemoteLspCommand::new()
453                    .spawn(channel, String::from("cmd arg"))
454                    .await
455            }
456        });
457
458        // Wait until we get the request from the session
459        let req: Request<protocol::Request> = t1.read_frame_as().await.unwrap().unwrap();
460
461        // Send back a response through the session
462        t1.write_frame_for(&Response::new(
463            req.id,
464            protocol::Response::ProcSpawned { id: rand::random() },
465        ))
466        .await
467        .unwrap();
468
469        // Wait for the process to be ready
470        let proc = spawn_task.await.unwrap().unwrap();
471        (t1, proc)
472    }
473
474    fn make_lsp_msg<T>(value: T) -> Vec<u8>
475    where
476        T: serde::Serialize,
477    {
478        let content = serde_json::to_string_pretty(&value).unwrap();
479        format!("Content-Length: {}\r\n\r\n{}", content.len(), content).into_bytes()
480    }
481
482    async fn timeout<F, R>(duration: Duration, f: F) -> io::Result<R>
483    where
484        F: Future<Output = R>,
485    {
486        tokio::select! {
487            res = f => {
488                Ok(res)
489            }
490            _ = tokio::time::sleep(duration) => {
491                Err(io::Error::from(io::ErrorKind::TimedOut))
492            }
493        }
494    }
495
496    #[test(tokio::test)]
497    async fn stdin_write_should_only_send_out_complete_lsp_messages() {
498        let (mut transport, mut proc) = spawn_lsp_process().await;
499
500        proc.stdin
501            .as_mut()
502            .unwrap()
503            .write(&make_lsp_msg(serde_json::json!({
504                "field1": "a",
505                "field2": "b",
506            })))
507            .await
508            .unwrap();
509
510        // Validate that the outgoing req is a complete LSP message
511        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
512        match req.payload {
513            protocol::Request::ProcStdin { data, .. } => {
514                assert_eq!(
515                    data,
516                    make_lsp_msg(serde_json::json!({
517                        "field1": "a",
518                        "field2": "b",
519                    }))
520                );
521            }
522            x => panic!("Unexpected request: {:?}", x),
523        }
524    }
525
526    #[test(tokio::test)]
527    async fn stdin_write_should_support_buffering_output_until_a_complete_lsp_message_is_composed()
528    {
529        let (mut transport, mut proc) = spawn_lsp_process().await;
530
531        let msg = make_lsp_msg(serde_json::json!({
532            "field1": "a",
533            "field2": "b",
534        }));
535        let (msg_a, msg_b) = msg.split_at(msg.len() / 2);
536
537        // Write part of the message that isn't finished
538        proc.stdin.as_mut().unwrap().write(msg_a).await.unwrap();
539
540        // Verify that nothing has been sent out yet
541        // NOTE: Yield to ensure that data would be waiting at the transport if it was sent
542        tokio::task::yield_now().await;
543        let result = timeout(
544            TIMEOUT,
545            transport.read_frame_as::<Request<protocol::Request>>(),
546        )
547        .await;
548        assert!(result.is_err(), "Unexpectedly got data: {:?}", result);
549
550        // Write remainder of message
551        proc.stdin.as_mut().unwrap().write(msg_b).await.unwrap();
552
553        // Validate that the outgoing req is a complete LSP message
554        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
555        match req.payload {
556            protocol::Request::ProcStdin { data, .. } => {
557                assert_eq!(
558                    data,
559                    make_lsp_msg(serde_json::json!({
560                        "field1": "a",
561                        "field2": "b",
562                    }))
563                );
564            }
565            x => panic!("Unexpected request: {:?}", x),
566        }
567    }
568
569    #[test(tokio::test)]
570    async fn stdin_write_should_only_consume_a_complete_lsp_message_even_if_more_is_written() {
571        let (mut transport, mut proc) = spawn_lsp_process().await;
572
573        let msg = make_lsp_msg(serde_json::json!({
574            "field1": "a",
575            "field2": "b",
576        }));
577
578        let extra = "Content-Length: 123";
579
580        // Write a full message plus some extra
581        proc.stdin
582            .as_mut()
583            .unwrap()
584            .write_str(&format!("{}{}", String::from_utf8(msg).unwrap(), extra))
585            .await
586            .unwrap();
587
588        // Validate that the outgoing req is a complete LSP message
589        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
590        match req.payload {
591            protocol::Request::ProcStdin { data, .. } => {
592                assert_eq!(
593                    data,
594                    make_lsp_msg(serde_json::json!({
595                        "field1": "a",
596                        "field2": "b",
597                    }))
598                );
599            }
600            x => panic!("Unexpected request: {:?}", x),
601        }
602
603        // Also validate that the internal buffer still contains the extra
604        assert_eq!(
605            String::from_utf8(proc.stdin.unwrap().buf.unwrap()).unwrap(),
606            extra,
607            "Extra was not retained"
608        );
609    }
610
611    #[test(tokio::test)]
612    async fn stdin_write_should_support_sending_out_multiple_lsp_messages_if_all_received_at_once()
613    {
614        let (mut transport, mut proc) = spawn_lsp_process().await;
615
616        let msg_1 = make_lsp_msg(serde_json::json!({
617            "field1": "a",
618            "field2": "b",
619        }));
620        let msg_2 = make_lsp_msg(serde_json::json!({
621            "field1": "c",
622            "field2": "d",
623        }));
624
625        // Write two full messages at once
626        proc.stdin
627            .as_mut()
628            .unwrap()
629            .write_str(&format!(
630                "{}{}",
631                String::from_utf8(msg_1).unwrap(),
632                String::from_utf8(msg_2).unwrap()
633            ))
634            .await
635            .unwrap();
636
637        // Validate that the first outgoing req is a complete LSP message matching first
638        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
639        match req.payload {
640            protocol::Request::ProcStdin { data, .. } => {
641                assert_eq!(
642                    data,
643                    make_lsp_msg(serde_json::json!({
644                        "field1": "a",
645                        "field2": "b",
646                    }))
647                );
648            }
649            x => panic!("Unexpected request: {:?}", x),
650        }
651
652        // Validate that the second outgoing req is a complete LSP message matching second
653        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
654        match req.payload {
655            protocol::Request::ProcStdin { data, .. } => {
656                assert_eq!(
657                    data,
658                    make_lsp_msg(serde_json::json!({
659                        "field1": "c",
660                        "field2": "d",
661                    }))
662                );
663            }
664            x => panic!("Unexpected request: {:?}", x),
665        }
666    }
667
668    #[test(tokio::test)]
669    async fn stdin_write_should_convert_content_with_distant_scheme_to_file_scheme() {
670        let (mut transport, mut proc) = spawn_lsp_process().await;
671
672        proc.stdin
673            .as_mut()
674            .unwrap()
675            .write(&make_lsp_msg(serde_json::json!({
676                "field1": "distant://some/path",
677                "field2": "file://other/path",
678            })))
679            .await
680            .unwrap();
681
682        // Validate that the outgoing req is a complete LSP message
683        let req: Request<protocol::Request> = transport.read_frame_as().await.unwrap().unwrap();
684        match req.payload {
685            protocol::Request::ProcStdin { data, .. } => {
686                // Verify the contents AND headers are as expected; in this case,
687                // this will also ensure that the Content-Length is adjusted
688                // when the distant scheme was changed to file
689                assert_eq!(
690                    data,
691                    make_lsp_msg(serde_json::json!({
692                        "field1": "file://some/path",
693                        "field2": "file://other/path",
694                    }))
695                );
696            }
697            x => panic!("Unexpected request: {:?}", x),
698        }
699    }
700
701    #[test(tokio::test)]
702    async fn stdout_read_should_yield_lsp_messages_as_strings() {
703        let (mut transport, mut proc) = spawn_lsp_process().await;
704
705        // Send complete LSP message as stdout to process
706        transport
707            .write_frame_for(&Response::new(
708                proc.origin_id().to_string(),
709                protocol::Response::ProcStdout {
710                    id: proc.id(),
711                    data: make_lsp_msg(serde_json::json!({
712                        "field1": "a",
713                        "field2": "b",
714                    })),
715                },
716            ))
717            .await
718            .unwrap();
719
720        // Receive complete message as stdout from process
721        let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
722        assert_eq!(
723            out,
724            make_lsp_msg(serde_json::json!({
725                "field1": "a",
726                "field2": "b",
727            }))
728        );
729    }
730
731    #[test(tokio::test)]
732    async fn stdout_read_should_only_yield_complete_lsp_messages() {
733        let (mut transport, mut proc) = spawn_lsp_process().await;
734
735        let msg = make_lsp_msg(serde_json::json!({
736            "field1": "a",
737            "field2": "b",
738        }));
739        let (msg_a, msg_b) = msg.split_at(msg.len() / 2);
740
741        // Send half of LSP message over stdout
742        transport
743            .write_frame_for(&Response::new(
744                proc.origin_id().to_string(),
745                protocol::Response::ProcStdout {
746                    id: proc.id(),
747                    data: msg_a.to_vec(),
748                },
749            ))
750            .await
751            .unwrap();
752
753        // Verify that remote process has not received a complete message yet
754        // NOTE: Yield to ensure that data would be waiting at the transport if it was sent
755        tokio::task::yield_now().await;
756        let result = timeout(TIMEOUT, proc.stdout.as_mut().unwrap().read()).await;
757        assert!(result.is_err(), "Unexpectedly got data: {:?}", result);
758
759        // Send other half of LSP message over stdout
760        transport
761            .write_frame_for(&Response::new(
762                proc.origin_id().to_string(),
763                protocol::Response::ProcStdout {
764                    id: proc.id(),
765                    data: msg_b.to_vec(),
766                },
767            ))
768            .await
769            .unwrap();
770
771        // Receive complete message as stdout from process
772        let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
773        assert_eq!(
774            out,
775            make_lsp_msg(serde_json::json!({
776                "field1": "a",
777                "field2": "b",
778            }))
779        );
780    }
781
782    #[test(tokio::test)]
783    async fn stdout_read_should_only_consume_a_complete_lsp_message_even_if_more_output_is_available(
784    ) {
785        let (mut transport, mut proc) = spawn_lsp_process().await;
786
787        let msg = make_lsp_msg(serde_json::json!({
788            "field1": "a",
789            "field2": "b",
790        }));
791        let extra = "some extra content";
792
793        // Send complete LSP message as stdout to process
794        transport
795            .write_frame_for(&Response::new(
796                proc.origin_id().to_string(),
797                protocol::Response::ProcStdout {
798                    id: proc.id(),
799                    data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
800                },
801            ))
802            .await
803            .unwrap();
804
805        // Receive complete message as stdout from process
806        let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
807        assert_eq!(
808            out,
809            make_lsp_msg(serde_json::json!({
810                "field1": "a",
811                "field2": "b",
812            }))
813        );
814
815        // Verify nothing else was sent
816        let result = timeout(TIMEOUT, proc.stdout.as_mut().unwrap().read()).await;
817        assert!(
818            result.is_err(),
819            "Unexpected extra content received on stdout"
820        );
821    }
822
823    #[test(tokio::test)]
824    async fn stdout_read_should_support_yielding_multiple_lsp_messages_if_all_received_at_once() {
825        let (mut transport, mut proc) = spawn_lsp_process().await;
826
827        let msg_1 = make_lsp_msg(serde_json::json!({
828            "field1": "a",
829            "field2": "b",
830        }));
831        let msg_2 = make_lsp_msg(serde_json::json!({
832            "field1": "c",
833            "field2": "d",
834        }));
835
836        // Send complete LSP message as stdout to process
837        transport
838            .write_frame_for(&Response::new(
839                proc.origin_id().to_string(),
840                protocol::Response::ProcStdout {
841                    id: proc.id(),
842                    data: format!(
843                        "{}{}",
844                        String::from_utf8(msg_1).unwrap(),
845                        String::from_utf8(msg_2).unwrap()
846                    )
847                    .into_bytes(),
848                },
849            ))
850            .await
851            .unwrap();
852
853        // Should send both messages back together as a single string
854        let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
855        assert_eq!(
856            out,
857            format!(
858                "{}{}",
859                String::from_utf8(make_lsp_msg(serde_json::json!({
860                    "field1": "a",
861                    "field2": "b",
862                })))
863                .unwrap(),
864                String::from_utf8(make_lsp_msg(serde_json::json!({
865                    "field1": "c",
866                    "field2": "d",
867                })))
868                .unwrap()
869            )
870            .into_bytes()
871        );
872    }
873
874    #[test(tokio::test)]
875    async fn stdout_read_should_convert_content_with_file_scheme_to_distant_scheme() {
876        let (mut transport, mut proc) = spawn_lsp_process().await;
877
878        // Send complete LSP message as stdout to process
879        transport
880            .write_frame_for(&Response::new(
881                proc.origin_id().to_string(),
882                protocol::Response::ProcStdout {
883                    id: proc.id(),
884                    data: make_lsp_msg(serde_json::json!({
885                        "field1": "distant://some/path",
886                        "field2": "file://other/path",
887                    })),
888                },
889            ))
890            .await
891            .unwrap();
892
893        // Receive complete message as stdout from process
894        let out = proc.stdout.as_mut().unwrap().read().await.unwrap();
895        assert_eq!(
896            out,
897            make_lsp_msg(serde_json::json!({
898                "field1": "distant://some/path",
899                "field2": "distant://other/path",
900            }))
901        );
902    }
903
904    #[test(tokio::test)]
905    async fn stderr_read_should_yield_lsp_messages_as_strings() {
906        let (mut transport, mut proc) = spawn_lsp_process().await;
907
908        // Send complete LSP message as stderr to process
909        transport
910            .write_frame_for(&Response::new(
911                proc.origin_id().to_string(),
912                protocol::Response::ProcStderr {
913                    id: proc.id(),
914                    data: make_lsp_msg(serde_json::json!({
915                        "field1": "a",
916                        "field2": "b",
917                    })),
918                },
919            ))
920            .await
921            .unwrap();
922
923        // Receive complete message as stderr from process
924        let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
925        assert_eq!(
926            err,
927            make_lsp_msg(serde_json::json!({
928                "field1": "a",
929                "field2": "b",
930            }))
931        );
932    }
933
934    #[test(tokio::test)]
935    async fn stderr_read_should_only_yield_complete_lsp_messages() {
936        let (mut transport, mut proc) = spawn_lsp_process().await;
937
938        let msg = make_lsp_msg(serde_json::json!({
939            "field1": "a",
940            "field2": "b",
941        }));
942        let (msg_a, msg_b) = msg.split_at(msg.len() / 2);
943
944        // Send half of LSP message over stderr
945        transport
946            .write_frame_for(&Response::new(
947                proc.origin_id().to_string(),
948                protocol::Response::ProcStderr {
949                    id: proc.id(),
950                    data: msg_a.to_vec(),
951                },
952            ))
953            .await
954            .unwrap();
955
956        // Verify that remote process has not received a complete message yet
957        // NOTE: Yield to ensure that data would be waiting at the transport if it was sent
958        tokio::task::yield_now().await;
959        let result = timeout(TIMEOUT, proc.stderr.as_mut().unwrap().read()).await;
960        assert!(result.is_err(), "Unexpectedly got data: {:?}", result);
961
962        // Send other half of LSP message over stderr
963        transport
964            .write_frame_for(&Response::new(
965                proc.origin_id().to_string(),
966                protocol::Response::ProcStderr {
967                    id: proc.id(),
968                    data: msg_b.to_vec(),
969                },
970            ))
971            .await
972            .unwrap();
973
974        // Receive complete message as stderr from process
975        let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
976        assert_eq!(
977            err,
978            make_lsp_msg(serde_json::json!({
979                "field1": "a",
980                "field2": "b",
981            }))
982        );
983    }
984
985    #[test(tokio::test)]
986    async fn stderr_read_should_only_consume_a_complete_lsp_message_even_if_more_errput_is_available(
987    ) {
988        let (mut transport, mut proc) = spawn_lsp_process().await;
989
990        let msg = make_lsp_msg(serde_json::json!({
991            "field1": "a",
992            "field2": "b",
993        }));
994        let extra = "some extra content";
995
996        // Send complete LSP message as stderr to process
997        transport
998            .write_frame_for(&Response::new(
999                proc.origin_id().to_string(),
1000                protocol::Response::ProcStderr {
1001                    id: proc.id(),
1002                    data: format!("{}{}", String::from_utf8(msg).unwrap(), extra).into_bytes(),
1003                },
1004            ))
1005            .await
1006            .unwrap();
1007
1008        // Receive complete message as stderr from process
1009        let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
1010        assert_eq!(
1011            err,
1012            make_lsp_msg(serde_json::json!({
1013                "field1": "a",
1014                "field2": "b",
1015            }))
1016        );
1017
1018        // Verify nothing else was sent
1019        let result = timeout(TIMEOUT, proc.stderr.as_mut().unwrap().read()).await;
1020        assert!(
1021            result.is_err(),
1022            "Unexpected extra content received on stderr"
1023        );
1024    }
1025
1026    #[test(tokio::test)]
1027    async fn stderr_read_should_support_yielding_multiple_lsp_messages_if_all_received_at_once() {
1028        let (mut transport, mut proc) = spawn_lsp_process().await;
1029
1030        let msg_1 = make_lsp_msg(serde_json::json!({
1031            "field1": "a",
1032            "field2": "b",
1033        }));
1034        let msg_2 = make_lsp_msg(serde_json::json!({
1035            "field1": "c",
1036            "field2": "d",
1037        }));
1038
1039        // Send complete LSP message as stderr to process
1040        transport
1041            .write_frame_for(&Response::new(
1042                proc.origin_id().to_string(),
1043                protocol::Response::ProcStderr {
1044                    id: proc.id(),
1045                    data: format!(
1046                        "{}{}",
1047                        String::from_utf8(msg_1).unwrap(),
1048                        String::from_utf8(msg_2).unwrap()
1049                    )
1050                    .into_bytes(),
1051                },
1052            ))
1053            .await
1054            .unwrap();
1055
1056        // Should send both messages back together as a single string
1057        let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
1058        assert_eq!(
1059            err,
1060            format!(
1061                "{}{}",
1062                String::from_utf8(make_lsp_msg(serde_json::json!({
1063                    "field1": "a",
1064                    "field2": "b",
1065                })))
1066                .unwrap(),
1067                String::from_utf8(make_lsp_msg(serde_json::json!({
1068                    "field1": "c",
1069                    "field2": "d",
1070                })))
1071                .unwrap()
1072            )
1073            .into_bytes()
1074        );
1075    }
1076
1077    #[test(tokio::test)]
1078    async fn stderr_read_should_convert_content_with_file_scheme_to_distant_scheme() {
1079        let (mut transport, mut proc) = spawn_lsp_process().await;
1080
1081        // Send complete LSP message as stderr to process
1082        transport
1083            .write_frame_for(&Response::new(
1084                proc.origin_id().to_string(),
1085                protocol::Response::ProcStderr {
1086                    id: proc.id(),
1087                    data: make_lsp_msg(serde_json::json!({
1088                        "field1": "distant://some/path",
1089                        "field2": "file://other/path",
1090                    })),
1091                },
1092            ))
1093            .await
1094            .unwrap();
1095
1096        // Receive complete message as stderr from process
1097        let err = proc.stderr.as_mut().unwrap().read().await.unwrap();
1098        assert_eq!(
1099            err,
1100            make_lsp_msg(serde_json::json!({
1101                "field1": "distant://some/path",
1102                "field2": "distant://other/path",
1103            }))
1104        );
1105    }
1106}