1use std::io;
2
3use distant_auth::msg::{Authentication, AuthenticationResponse};
4use distant_auth::AuthHandler;
5use log::*;
6
7use crate::client::Client;
8use crate::common::{ConnectionId, Destination, Map, Request};
9use crate::manager::data::{
10 ConnectionInfo, ConnectionList, ManagerRequest, ManagerResponse, SemVer,
11};
12
13mod channel;
14pub use channel::*;
15
/// Client that sends [`ManagerRequest`]s and receives [`ManagerResponse`]s.
///
/// The inherent methods in this file wrap the individual manager requests and
/// translate their responses into `io::Result` values.
pub type ManagerClient = Client<ManagerRequest, ManagerResponse>;
18
19impl ManagerClient {
20 pub async fn launch(
27 &mut self,
28 destination: impl Into<Destination>,
29 options: impl Into<Map>,
30 mut handler: impl AuthHandler + Send,
31 ) -> io::Result<Destination> {
32 let destination = Box::new(destination.into());
33 let options = options.into();
34 trace!("launch({}, {})", destination, options);
35
36 let mut mailbox = self
37 .mail(ManagerRequest::Launch {
38 destination: destination.clone(),
39 options,
40 })
41 .await?;
42
43 while let Some(res) = mailbox.next().await {
46 match res.payload {
47 ManagerResponse::Authenticate { id, msg } => match msg {
48 Authentication::Initialization(x) => {
49 if log::log_enabled!(Level::Debug) {
50 debug!(
51 "Initializing authentication, supporting {}",
52 x.methods
53 .iter()
54 .map(ToOwned::to_owned)
55 .collect::<Vec<_>>()
56 .join(",")
57 );
58 }
59 let msg = AuthenticationResponse::Initialization(
60 handler.on_initialization(x).await?,
61 );
62 self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
63 .await?;
64 }
65 Authentication::StartMethod(x) => {
66 debug!("Starting authentication method {}", x.method);
67 }
68 Authentication::Challenge(x) => {
69 if log::log_enabled!(Level::Debug) {
70 for question in x.questions.iter() {
71 debug!(
72 "Received challenge question [{}]: {}",
73 question.label, question.text
74 );
75 }
76 }
77 let msg = AuthenticationResponse::Challenge(handler.on_challenge(x).await?);
78 self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
79 .await?;
80 }
81 Authentication::Verification(x) => {
82 debug!("Received verification request {}: {}", x.kind, x.text);
83 let msg =
84 AuthenticationResponse::Verification(handler.on_verification(x).await?);
85 self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
86 .await?;
87 }
88 Authentication::Info(x) => {
89 info!("{}", x.text);
90 }
91 Authentication::Error(x) => {
92 error!("{}", x.text);
93 if x.is_fatal() {
94 return Err(x.into_io_permission_denied());
95 }
96 }
97 Authentication::Finished => {
98 debug!("Finished authentication for {destination}");
99 }
100 },
101 ManagerResponse::Launched { destination } => return Ok(destination),
102 ManagerResponse::Error { description } => {
103 return Err(io::Error::new(io::ErrorKind::Other, description))
104 }
105 x => {
106 return Err(io::Error::new(
107 io::ErrorKind::InvalidData,
108 format!("Got unexpected response: {x:?}"),
109 ))
110 }
111 }
112 }
113
114 Err(io::Error::new(
115 io::ErrorKind::UnexpectedEof,
116 "Missing connection confirmation",
117 ))
118 }
119
120 pub async fn connect(
126 &mut self,
127 destination: impl Into<Destination>,
128 options: impl Into<Map>,
129 mut handler: impl AuthHandler + Send,
130 ) -> io::Result<ConnectionId> {
131 let destination = Box::new(destination.into());
132 let options = options.into();
133 trace!("connect({}, {})", destination, options);
134
135 let mut mailbox = self
136 .mail(ManagerRequest::Connect {
137 destination: destination.clone(),
138 options,
139 })
140 .await?;
141
142 while let Some(res) = mailbox.next().await {
145 match res.payload {
146 ManagerResponse::Authenticate { id, msg } => match msg {
147 Authentication::Initialization(x) => {
148 if log::log_enabled!(Level::Debug) {
149 debug!(
150 "Initializing authentication, supporting {}",
151 x.methods
152 .iter()
153 .map(ToOwned::to_owned)
154 .collect::<Vec<_>>()
155 .join(",")
156 );
157 }
158 let msg = AuthenticationResponse::Initialization(
159 handler.on_initialization(x).await?,
160 );
161 self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
162 .await?;
163 }
164 Authentication::StartMethod(x) => {
165 debug!("Starting authentication method {}", x.method);
166 }
167 Authentication::Challenge(x) => {
168 if log::log_enabled!(Level::Debug) {
169 for question in x.questions.iter() {
170 debug!(
171 "Received challenge question [{}]: {}",
172 question.label, question.text
173 );
174 }
175 }
176 let msg = AuthenticationResponse::Challenge(handler.on_challenge(x).await?);
177 self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
178 .await?;
179 }
180 Authentication::Verification(x) => {
181 debug!("Received verification request {}: {}", x.kind, x.text);
182 let msg =
183 AuthenticationResponse::Verification(handler.on_verification(x).await?);
184 self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
185 .await?;
186 }
187 Authentication::Info(x) => {
188 info!("{}", x.text);
189 }
190 Authentication::Error(x) => {
191 error!("{}", x.text);
192 if x.is_fatal() {
193 return Err(x.into_io_permission_denied());
194 }
195 }
196 Authentication::Finished => {
197 debug!("Finished authentication for {destination}");
198 }
199 },
200 ManagerResponse::Connected { id } => return Ok(id),
201 ManagerResponse::Error { description } => {
202 return Err(io::Error::new(io::ErrorKind::Other, description))
203 }
204 x => {
205 return Err(io::Error::new(
206 io::ErrorKind::InvalidData,
207 format!("Got unexpected response: {x:?}"),
208 ))
209 }
210 }
211 }
212
213 Err(io::Error::new(
214 io::ErrorKind::UnexpectedEof,
215 "Missing connection confirmation",
216 ))
217 }
218
219 pub async fn open_raw_channel(
227 &mut self,
228 connection_id: ConnectionId,
229 ) -> io::Result<RawChannel> {
230 trace!("open_raw_channel({})", connection_id);
231 RawChannel::spawn(connection_id, self).await
232 }
233
234 pub async fn version(&mut self) -> io::Result<SemVer> {
236 trace!("version()");
237 let res = self.send(ManagerRequest::Version).await?;
238 match res.payload {
239 ManagerResponse::Version { version } => Ok(version),
240 ManagerResponse::Error { description } => {
241 Err(io::Error::new(io::ErrorKind::Other, description))
242 }
243 x => Err(io::Error::new(
244 io::ErrorKind::InvalidData,
245 format!("Got unexpected response: {x:?}"),
246 )),
247 }
248 }
249
250 pub async fn info(&mut self, id: ConnectionId) -> io::Result<ConnectionInfo> {
252 trace!("info({})", id);
253 let res = self.send(ManagerRequest::Info { id }).await?;
254 match res.payload {
255 ManagerResponse::Info(info) => Ok(info),
256 ManagerResponse::Error { description } => {
257 Err(io::Error::new(io::ErrorKind::Other, description))
258 }
259 x => Err(io::Error::new(
260 io::ErrorKind::InvalidData,
261 format!("Got unexpected response: {x:?}"),
262 )),
263 }
264 }
265
266 pub async fn kill(&mut self, id: ConnectionId) -> io::Result<()> {
268 trace!("kill({})", id);
269 let res = self.send(ManagerRequest::Kill { id }).await?;
270 match res.payload {
271 ManagerResponse::Killed => Ok(()),
272 ManagerResponse::Error { description } => {
273 Err(io::Error::new(io::ErrorKind::Other, description))
274 }
275 x => Err(io::Error::new(
276 io::ErrorKind::InvalidData,
277 format!("Got unexpected response: {x:?}"),
278 )),
279 }
280 }
281
282 pub async fn list(&mut self) -> io::Result<ConnectionList> {
284 trace!("list()");
285 let res = self.send(ManagerRequest::List).await?;
286 match res.payload {
287 ManagerResponse::List(list) => Ok(list),
288 ManagerResponse::Error { description } => {
289 Err(io::Error::new(io::ErrorKind::Other, description))
290 }
291 x => Err(io::Error::new(
292 io::ErrorKind::InvalidData,
293 format!("Got unexpected response: {x:?}"),
294 )),
295 }
296 }
297}
298
#[cfg(test)]
mod tests {
    use distant_auth::DummyAuthHandler;

    use super::*;
    use crate::client::UntypedClient;
    use crate::common::{Connection, InmemoryTransport, Request, Response};

    /// Builds a typed manager client together with the opposite end of its
    /// in-memory connection, which the tests use to play the manager.
    fn setup() -> (ManagerClient, Connection<InmemoryTransport>) {
        let (client, server) = Connection::pair(100);
        let client = UntypedClient::spawn(client, Default::default()).into_typed_client();
        (client, server)
    }

    /// Error used as the source of every simulated manager failure.
    fn test_error() -> io::Error {
        io::Error::new(io::ErrorKind::Interrupted, "test error")
    }

    /// Manager response form of [`test_error`].
    fn test_error_response() -> ManagerResponse {
        ManagerResponse::from(test_error())
    }

    /// Destination shared by the tests.
    fn test_destination() -> Destination {
        "scheme://host".parse::<Destination>().unwrap()
    }

    /// Options shared by the tests.
    fn test_options() -> Map {
        "key=value".parse::<Map>().unwrap()
    }

    /// Spawns a task that reads exactly one request from `transport` and
    /// answers it with `response`, reusing the request's id.
    fn reply_once(mut transport: Connection<InmemoryTransport>, response: ManagerResponse) {
        tokio::spawn(async move {
            let request = transport
                .read_frame_as::<Request<ManagerRequest>>()
                .await
                .unwrap()
                .unwrap();

            transport
                .write_frame_for(&Response::new(request.id, response))
                .await
                .unwrap();
        });
    }

    #[tokio::test]
    async fn launch_should_report_error_if_receives_error_response() {
        let (mut client, transport) = setup();
        reply_once(transport, test_error_response());

        let err = client
            .launch(test_destination(), test_options(), DummyAuthHandler)
            .await
            .unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(err.to_string(), test_error().to_string());
    }

    #[tokio::test]
    async fn launch_should_report_error_if_receives_unexpected_response() {
        let (mut client, transport) = setup();
        reply_once(transport, ManagerResponse::Killed);

        let err = client
            .launch(test_destination(), test_options(), DummyAuthHandler)
            .await
            .unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[tokio::test]
    async fn launch_should_return_destination_from_successful_response() {
        let (mut client, transport) = setup();
        reply_once(
            transport,
            ManagerResponse::Launched {
                destination: test_destination(),
            },
        );

        let destination = client
            .launch(test_destination(), test_options(), DummyAuthHandler)
            .await
            .unwrap();
        assert_eq!(destination, test_destination());
    }

    #[tokio::test]
    async fn connect_should_report_error_if_receives_error_response() {
        let (mut client, transport) = setup();
        reply_once(transport, test_error_response());

        let err = client
            .connect(test_destination(), test_options(), DummyAuthHandler)
            .await
            .unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(err.to_string(), test_error().to_string());
    }

    #[tokio::test]
    async fn connect_should_report_error_if_receives_unexpected_response() {
        let (mut client, transport) = setup();
        reply_once(transport, ManagerResponse::Killed);

        let err = client
            .connect(test_destination(), test_options(), DummyAuthHandler)
            .await
            .unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[tokio::test]
    async fn connect_should_return_id_from_successful_response() {
        let (mut client, transport) = setup();

        let expected_id = 999;
        reply_once(transport, ManagerResponse::Connected { id: expected_id });

        let id = client
            .connect(test_destination(), test_options(), DummyAuthHandler)
            .await
            .unwrap();
        assert_eq!(id, expected_id);
    }

    #[tokio::test]
    async fn version_should_report_error_if_receives_error_response() {
        let (mut client, transport) = setup();
        reply_once(transport, test_error_response());

        let err = client.version().await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(err.to_string(), test_error().to_string());
    }

    #[tokio::test]
    async fn version_should_report_error_if_receives_unexpected_response() {
        let (mut client, transport) = setup();
        reply_once(transport, ManagerResponse::Killed);

        let err = client.version().await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[tokio::test]
    async fn info_should_report_error_if_receives_error_response() {
        let (mut client, transport) = setup();
        reply_once(transport, test_error_response());

        let err = client.info(123).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(err.to_string(), test_error().to_string());
    }

    #[tokio::test]
    async fn info_should_report_error_if_receives_unexpected_response() {
        let (mut client, transport) = setup();
        reply_once(transport, ManagerResponse::Killed);

        let err = client.info(123).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[tokio::test]
    async fn info_should_return_connection_info_from_successful_response() {
        let (mut client, transport) = setup();
        reply_once(
            transport,
            ManagerResponse::Info(ConnectionInfo {
                id: 123,
                destination: test_destination(),
                options: test_options(),
            }),
        );

        let info = client.info(123).await.unwrap();
        assert_eq!(info.id, 123);
        assert_eq!(info.destination, test_destination());
        assert_eq!(info.options, test_options());
    }

    #[tokio::test]
    async fn list_should_report_error_if_receives_error_response() {
        let (mut client, transport) = setup();
        reply_once(transport, test_error_response());

        let err = client.list().await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(err.to_string(), test_error().to_string());
    }

    #[tokio::test]
    async fn list_should_report_error_if_receives_unexpected_response() {
        let (mut client, transport) = setup();
        reply_once(transport, ManagerResponse::Killed);

        let err = client.list().await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[tokio::test]
    async fn list_should_return_connection_list_from_successful_response() {
        let (mut client, transport) = setup();

        let mut expected = ConnectionList::new();
        expected.insert(123, test_destination());
        reply_once(transport, ManagerResponse::List(expected));

        let list = client.list().await.unwrap();
        assert_eq!(list.len(), 1);
        assert_eq!(
            list.get(&123).expect("Connection list missing item"),
            &test_destination()
        );
    }

    #[tokio::test]
    async fn kill_should_report_error_if_receives_error_response() {
        let (mut client, transport) = setup();
        reply_once(transport, test_error_response());

        let err = client.kill(123).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::Other);
        assert_eq!(err.to_string(), test_error().to_string());
    }

    #[tokio::test]
    async fn kill_should_report_error_if_receives_unexpected_response() {
        let (mut client, transport) = setup();
        reply_once(transport, ManagerResponse::Connected { id: 0 });

        let err = client.kill(123).await.unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[tokio::test]
    async fn kill_should_return_success_from_successful_response() {
        let (mut client, transport) = setup();
        reply_once(transport, ManagerResponse::Killed);

        client.kill(123).await.unwrap();
    }
}