distant_net/manager/
client.rs

1use std::io;
2
3use distant_auth::msg::{Authentication, AuthenticationResponse};
4use distant_auth::AuthHandler;
5use log::*;
6
7use crate::client::Client;
8use crate::common::{ConnectionId, Destination, Map, Request};
9use crate::manager::data::{
10    ConnectionInfo, ConnectionList, ManagerRequest, ManagerResponse, SemVer,
11};
12
13mod channel;
14pub use channel::*;
15
16/// Represents a client that can connect to a remote server manager.
17pub type ManagerClient = Client<ManagerRequest, ManagerResponse>;
18
19impl ManagerClient {
20    /// Request that the manager launches a new server at the given `destination` with `options`
21    /// being passed for destination-specific details, returning the new `destination` of the
22    /// spawned server.
23    ///
24    ///  The provided `handler` will be used for any authentication requirements when connecting to
25    ///  the remote machine to spawn the server.
26    pub async fn launch(
27        &mut self,
28        destination: impl Into<Destination>,
29        options: impl Into<Map>,
30        mut handler: impl AuthHandler + Send,
31    ) -> io::Result<Destination> {
32        let destination = Box::new(destination.into());
33        let options = options.into();
34        trace!("launch({}, {})", destination, options);
35
36        let mut mailbox = self
37            .mail(ManagerRequest::Launch {
38                destination: destination.clone(),
39                options,
40            })
41            .await?;
42
43        // Continue to process authentication challenges and other details until we are either
44        // launched or fail
45        while let Some(res) = mailbox.next().await {
46            match res.payload {
47                ManagerResponse::Authenticate { id, msg } => match msg {
48                    Authentication::Initialization(x) => {
49                        if log::log_enabled!(Level::Debug) {
50                            debug!(
51                                "Initializing authentication, supporting {}",
52                                x.methods
53                                    .iter()
54                                    .map(ToOwned::to_owned)
55                                    .collect::<Vec<_>>()
56                                    .join(",")
57                            );
58                        }
59                        let msg = AuthenticationResponse::Initialization(
60                            handler.on_initialization(x).await?,
61                        );
62                        self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
63                            .await?;
64                    }
65                    Authentication::StartMethod(x) => {
66                        debug!("Starting authentication method {}", x.method);
67                    }
68                    Authentication::Challenge(x) => {
69                        if log::log_enabled!(Level::Debug) {
70                            for question in x.questions.iter() {
71                                debug!(
72                                    "Received challenge question [{}]: {}",
73                                    question.label, question.text
74                                );
75                            }
76                        }
77                        let msg = AuthenticationResponse::Challenge(handler.on_challenge(x).await?);
78                        self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
79                            .await?;
80                    }
81                    Authentication::Verification(x) => {
82                        debug!("Received verification request {}: {}", x.kind, x.text);
83                        let msg =
84                            AuthenticationResponse::Verification(handler.on_verification(x).await?);
85                        self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
86                            .await?;
87                    }
88                    Authentication::Info(x) => {
89                        info!("{}", x.text);
90                    }
91                    Authentication::Error(x) => {
92                        error!("{}", x.text);
93                        if x.is_fatal() {
94                            return Err(x.into_io_permission_denied());
95                        }
96                    }
97                    Authentication::Finished => {
98                        debug!("Finished authentication for {destination}");
99                    }
100                },
101                ManagerResponse::Launched { destination } => return Ok(destination),
102                ManagerResponse::Error { description } => {
103                    return Err(io::Error::new(io::ErrorKind::Other, description))
104                }
105                x => {
106                    return Err(io::Error::new(
107                        io::ErrorKind::InvalidData,
108                        format!("Got unexpected response: {x:?}"),
109                    ))
110                }
111            }
112        }
113
114        Err(io::Error::new(
115            io::ErrorKind::UnexpectedEof,
116            "Missing connection confirmation",
117        ))
118    }
119
120    /// Request that the manager establishes a new connection at the given `destination`
121    /// with `options` being passed for destination-specific details.
122    ///
123    /// The provided `handler` will be used for any authentication requirements when connecting to
124    /// the server.
125    pub async fn connect(
126        &mut self,
127        destination: impl Into<Destination>,
128        options: impl Into<Map>,
129        mut handler: impl AuthHandler + Send,
130    ) -> io::Result<ConnectionId> {
131        let destination = Box::new(destination.into());
132        let options = options.into();
133        trace!("connect({}, {})", destination, options);
134
135        let mut mailbox = self
136            .mail(ManagerRequest::Connect {
137                destination: destination.clone(),
138                options,
139            })
140            .await?;
141
142        // Continue to process authentication challenges and other details until we are either
143        // connected or fail
144        while let Some(res) = mailbox.next().await {
145            match res.payload {
146                ManagerResponse::Authenticate { id, msg } => match msg {
147                    Authentication::Initialization(x) => {
148                        if log::log_enabled!(Level::Debug) {
149                            debug!(
150                                "Initializing authentication, supporting {}",
151                                x.methods
152                                    .iter()
153                                    .map(ToOwned::to_owned)
154                                    .collect::<Vec<_>>()
155                                    .join(",")
156                            );
157                        }
158                        let msg = AuthenticationResponse::Initialization(
159                            handler.on_initialization(x).await?,
160                        );
161                        self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
162                            .await?;
163                    }
164                    Authentication::StartMethod(x) => {
165                        debug!("Starting authentication method {}", x.method);
166                    }
167                    Authentication::Challenge(x) => {
168                        if log::log_enabled!(Level::Debug) {
169                            for question in x.questions.iter() {
170                                debug!(
171                                    "Received challenge question [{}]: {}",
172                                    question.label, question.text
173                                );
174                            }
175                        }
176                        let msg = AuthenticationResponse::Challenge(handler.on_challenge(x).await?);
177                        self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
178                            .await?;
179                    }
180                    Authentication::Verification(x) => {
181                        debug!("Received verification request {}: {}", x.kind, x.text);
182                        let msg =
183                            AuthenticationResponse::Verification(handler.on_verification(x).await?);
184                        self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
185                            .await?;
186                    }
187                    Authentication::Info(x) => {
188                        info!("{}", x.text);
189                    }
190                    Authentication::Error(x) => {
191                        error!("{}", x.text);
192                        if x.is_fatal() {
193                            return Err(x.into_io_permission_denied());
194                        }
195                    }
196                    Authentication::Finished => {
197                        debug!("Finished authentication for {destination}");
198                    }
199                },
200                ManagerResponse::Connected { id } => return Ok(id),
201                ManagerResponse::Error { description } => {
202                    return Err(io::Error::new(io::ErrorKind::Other, description))
203                }
204                x => {
205                    return Err(io::Error::new(
206                        io::ErrorKind::InvalidData,
207                        format!("Got unexpected response: {x:?}"),
208                    ))
209                }
210            }
211        }
212
213        Err(io::Error::new(
214            io::ErrorKind::UnexpectedEof,
215            "Missing connection confirmation",
216        ))
217    }
218
219    /// Establishes a channel with the server represented by the `connection_id`,
220    /// returning a [`RawChannel`] acting as the connection.
221    ///
222    /// ### Note
223    ///
224    /// Multiple calls to open a channel against the same connection will result in establishing a
225    /// duplicate channel to the same server, so take care when using this method.
226    pub async fn open_raw_channel(
227        &mut self,
228        connection_id: ConnectionId,
229    ) -> io::Result<RawChannel> {
230        trace!("open_raw_channel({})", connection_id);
231        RawChannel::spawn(connection_id, self).await
232    }
233
234    /// Retrieves the version of the  manager.
235    pub async fn version(&mut self) -> io::Result<SemVer> {
236        trace!("version()");
237        let res = self.send(ManagerRequest::Version).await?;
238        match res.payload {
239            ManagerResponse::Version { version } => Ok(version),
240            ManagerResponse::Error { description } => {
241                Err(io::Error::new(io::ErrorKind::Other, description))
242            }
243            x => Err(io::Error::new(
244                io::ErrorKind::InvalidData,
245                format!("Got unexpected response: {x:?}"),
246            )),
247        }
248    }
249
250    /// Retrieves information about a specific connection
251    pub async fn info(&mut self, id: ConnectionId) -> io::Result<ConnectionInfo> {
252        trace!("info({})", id);
253        let res = self.send(ManagerRequest::Info { id }).await?;
254        match res.payload {
255            ManagerResponse::Info(info) => Ok(info),
256            ManagerResponse::Error { description } => {
257                Err(io::Error::new(io::ErrorKind::Other, description))
258            }
259            x => Err(io::Error::new(
260                io::ErrorKind::InvalidData,
261                format!("Got unexpected response: {x:?}"),
262            )),
263        }
264    }
265
266    /// Kills the specified connection
267    pub async fn kill(&mut self, id: ConnectionId) -> io::Result<()> {
268        trace!("kill({})", id);
269        let res = self.send(ManagerRequest::Kill { id }).await?;
270        match res.payload {
271            ManagerResponse::Killed => Ok(()),
272            ManagerResponse::Error { description } => {
273                Err(io::Error::new(io::ErrorKind::Other, description))
274            }
275            x => Err(io::Error::new(
276                io::ErrorKind::InvalidData,
277                format!("Got unexpected response: {x:?}"),
278            )),
279        }
280    }
281
282    /// Retrieves a list of active connections
283    pub async fn list(&mut self) -> io::Result<ConnectionList> {
284        trace!("list()");
285        let res = self.send(ManagerRequest::List).await?;
286        match res.payload {
287            ManagerResponse::List(list) => Ok(list),
288            ManagerResponse::Error { description } => {
289                Err(io::Error::new(io::ErrorKind::Other, description))
290            }
291            x => Err(io::Error::new(
292                io::ErrorKind::InvalidData,
293                format!("Got unexpected response: {x:?}"),
294            )),
295        }
296    }
297}
298
299#[cfg(test)]
300mod tests {
301    use distant_auth::DummyAuthHandler;
302
303    use super::*;
304    use crate::client::UntypedClient;
305    use crate::common::{Connection, InmemoryTransport, Request, Response};
306
307    fn setup() -> (ManagerClient, Connection<InmemoryTransport>) {
308        let (client, server) = Connection::pair(100);
309        let client = UntypedClient::spawn(client, Default::default()).into_typed_client();
310        (client, server)
311    }
312
313    #[inline]
314    fn test_error() -> io::Error {
315        io::Error::new(io::ErrorKind::Interrupted, "test error")
316    }
317
318    #[inline]
319    fn test_error_response() -> ManagerResponse {
320        ManagerResponse::from(test_error())
321    }
322
323    #[tokio::test]
324    async fn connect_should_report_error_if_receives_error_response() {
325        let (mut client, mut transport) = setup();
326
327        tokio::spawn(async move {
328            let request = transport
329                .read_frame_as::<Request<ManagerRequest>>()
330                .await
331                .unwrap()
332                .unwrap();
333
334            transport
335                .write_frame_for(&Response::new(request.id, test_error_response()))
336                .await
337                .unwrap();
338        });
339
340        let err = client
341            .connect(
342                "scheme://host".parse::<Destination>().unwrap(),
343                "key=value".parse::<Map>().unwrap(),
344                DummyAuthHandler,
345            )
346            .await
347            .unwrap_err();
348        assert_eq!(err.kind(), io::ErrorKind::Other);
349        assert_eq!(err.to_string(), test_error().to_string());
350    }
351
352    #[tokio::test]
353    async fn connect_should_report_error_if_receives_unexpected_response() {
354        let (mut client, mut transport) = setup();
355
356        tokio::spawn(async move {
357            let request = transport
358                .read_frame_as::<Request<ManagerRequest>>()
359                .await
360                .unwrap()
361                .unwrap();
362
363            transport
364                .write_frame_for(&Response::new(request.id, ManagerResponse::Killed))
365                .await
366                .unwrap();
367        });
368
369        let err = client
370            .connect(
371                "scheme://host".parse::<Destination>().unwrap(),
372                "key=value".parse::<Map>().unwrap(),
373                DummyAuthHandler,
374            )
375            .await
376            .unwrap_err();
377        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
378    }
379
380    #[tokio::test]
381    async fn connect_should_return_id_from_successful_response() {
382        let (mut client, mut transport) = setup();
383
384        let expected_id = 999;
385        tokio::spawn(async move {
386            let request = transport
387                .read_frame_as::<Request<ManagerRequest>>()
388                .await
389                .unwrap()
390                .unwrap();
391
392            transport
393                .write_frame_for(&Response::new(
394                    request.id,
395                    ManagerResponse::Connected { id: expected_id },
396                ))
397                .await
398                .unwrap();
399        });
400
401        let id = client
402            .connect(
403                "scheme://host".parse::<Destination>().unwrap(),
404                "key=value".parse::<Map>().unwrap(),
405                DummyAuthHandler,
406            )
407            .await
408            .unwrap();
409        assert_eq!(id, expected_id);
410    }
411
412    #[tokio::test]
413    async fn info_should_report_error_if_receives_error_response() {
414        let (mut client, mut transport) = setup();
415
416        tokio::spawn(async move {
417            let request = transport
418                .read_frame_as::<Request<ManagerRequest>>()
419                .await
420                .unwrap()
421                .unwrap();
422
423            transport
424                .write_frame_for(&Response::new(request.id, test_error_response()))
425                .await
426                .unwrap();
427        });
428
429        let err = client.info(123).await.unwrap_err();
430        assert_eq!(err.kind(), io::ErrorKind::Other);
431        assert_eq!(err.to_string(), test_error().to_string());
432    }
433
434    #[tokio::test]
435    async fn info_should_report_error_if_receives_unexpected_response() {
436        let (mut client, mut transport) = setup();
437
438        tokio::spawn(async move {
439            let request = transport
440                .read_frame_as::<Request<ManagerRequest>>()
441                .await
442                .unwrap()
443                .unwrap();
444
445            transport
446                .write_frame_for(&Response::new(request.id, ManagerResponse::Killed))
447                .await
448                .unwrap();
449        });
450
451        let err = client.info(123).await.unwrap_err();
452        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
453    }
454
455    #[tokio::test]
456    async fn info_should_return_connection_info_from_successful_response() {
457        let (mut client, mut transport) = setup();
458
459        tokio::spawn(async move {
460            let request = transport
461                .read_frame_as::<Request<ManagerRequest>>()
462                .await
463                .unwrap()
464                .unwrap();
465
466            let info = ConnectionInfo {
467                id: 123,
468                destination: "scheme://host".parse::<Destination>().unwrap(),
469                options: "key=value".parse::<Map>().unwrap(),
470            };
471
472            transport
473                .write_frame_for(&Response::new(request.id, ManagerResponse::Info(info)))
474                .await
475                .unwrap();
476        });
477
478        let info = client.info(123).await.unwrap();
479        assert_eq!(info.id, 123);
480        assert_eq!(
481            info.destination,
482            "scheme://host".parse::<Destination>().unwrap()
483        );
484        assert_eq!(info.options, "key=value".parse::<Map>().unwrap());
485    }
486
487    #[tokio::test]
488    async fn list_should_report_error_if_receives_error_response() {
489        let (mut client, mut transport) = setup();
490
491        tokio::spawn(async move {
492            let request = transport
493                .read_frame_as::<Request<ManagerRequest>>()
494                .await
495                .unwrap()
496                .unwrap();
497
498            transport
499                .write_frame_for(&Response::new(request.id, test_error_response()))
500                .await
501                .unwrap();
502        });
503
504        let err = client.list().await.unwrap_err();
505        assert_eq!(err.kind(), io::ErrorKind::Other);
506        assert_eq!(err.to_string(), test_error().to_string());
507    }
508
509    #[tokio::test]
510    async fn list_should_report_error_if_receives_unexpected_response() {
511        let (mut client, mut transport) = setup();
512
513        tokio::spawn(async move {
514            let request = transport
515                .read_frame_as::<Request<ManagerRequest>>()
516                .await
517                .unwrap()
518                .unwrap();
519
520            transport
521                .write_frame_for(&Response::new(request.id, ManagerResponse::Killed))
522                .await
523                .unwrap();
524        });
525
526        let err = client.list().await.unwrap_err();
527        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
528    }
529
530    #[tokio::test]
531    async fn list_should_return_connection_list_from_successful_response() {
532        let (mut client, mut transport) = setup();
533
534        tokio::spawn(async move {
535            let request = transport
536                .read_frame_as::<Request<ManagerRequest>>()
537                .await
538                .unwrap()
539                .unwrap();
540
541            let mut list = ConnectionList::new();
542            list.insert(123, "scheme://host".parse::<Destination>().unwrap());
543
544            transport
545                .write_frame_for(&Response::new(request.id, ManagerResponse::List(list)))
546                .await
547                .unwrap();
548        });
549
550        let list = client.list().await.unwrap();
551        assert_eq!(list.len(), 1);
552        assert_eq!(
553            list.get(&123).expect("Connection list missing item"),
554            &"scheme://host".parse::<Destination>().unwrap()
555        );
556    }
557
558    #[tokio::test]
559    async fn kill_should_report_error_if_receives_error_response() {
560        let (mut client, mut transport) = setup();
561
562        tokio::spawn(async move {
563            let request = transport
564                .read_frame_as::<Request<ManagerRequest>>()
565                .await
566                .unwrap()
567                .unwrap();
568
569            transport
570                .write_frame_for(&Response::new(request.id, test_error_response()))
571                .await
572                .unwrap();
573        });
574
575        let err = client.kill(123).await.unwrap_err();
576        assert_eq!(err.kind(), io::ErrorKind::Other);
577        assert_eq!(err.to_string(), test_error().to_string());
578    }
579
580    #[tokio::test]
581    async fn kill_should_report_error_if_receives_unexpected_response() {
582        let (mut client, mut transport) = setup();
583
584        tokio::spawn(async move {
585            let request = transport
586                .read_frame_as::<Request<ManagerRequest>>()
587                .await
588                .unwrap()
589                .unwrap();
590
591            transport
592                .write_frame_for(&Response::new(
593                    request.id,
594                    ManagerResponse::Connected { id: 0 },
595                ))
596                .await
597                .unwrap();
598        });
599
600        let err = client.kill(123).await.unwrap_err();
601        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
602    }
603
604    #[tokio::test]
605    async fn kill_should_return_success_from_successful_response() {
606        let (mut client, mut transport) = setup();
607
608        tokio::spawn(async move {
609            let request = transport
610                .read_frame_as::<Request<ManagerRequest>>()
611                .await
612                .unwrap()
613                .unwrap();
614
615            transport
616                .write_frame_for(&Response::new(request.id, ManagerResponse::Killed))
617                .await
618                .unwrap();
619        });
620
621        client.kill(123).await.unwrap();
622    }
623}