distant_net/manager/client/
channel.rs

1use std::io;
2use std::ops::{Deref, DerefMut};
3
4use log::*;
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7use tokio::task::JoinHandle;
8
9use crate::client::{Client, ClientConfig, UntypedClient};
10use crate::common::{ConnectionId, FramedTransport, InmemoryTransport, UntypedRequest};
11use crate::manager::data::{ManagerRequest, ManagerResponse};
12
13/// Represents a raw channel between a manager client and server. Underneath, this routes incoming
14/// and outgoing data from a proxied server to an inmemory transport.
15pub struct RawChannel {
16    transport: FramedTransport<InmemoryTransport>,
17    task: JoinHandle<()>,
18}
19
20impl RawChannel {
21    pub fn abort(&self) {
22        self.task.abort();
23    }
24
25    /// Consumes this channel, returning a typed client wrapping the transport.
26    ///
27    /// ### Note
28    ///
29    /// This does not perform any additional handshakes or authentication. All authentication was
30    /// performed during separate connection and this merely wraps an inmemory transport that maps
31    /// to the primary connection.
32    pub fn into_client<T, U>(self) -> Client<T, U>
33    where
34        T: Send + Sync + Serialize + 'static,
35        U: Send + Sync + DeserializeOwned + 'static,
36    {
37        Client::spawn_inmemory(
38            self.transport,
39            ClientConfig::default().with_maximum_silence_duration(),
40        )
41    }
42
43    /// Consumes this channel, returning an untyped client wrapping the transport.
44    ///
45    /// ### Note
46    ///
47    /// This does not perform any additional handshakes or authentication. All authentication was
48    /// performed during separate connection and this merely wraps an inmemory transport that maps
49    /// to the primary connection.
50    pub fn into_untyped_client(self) -> UntypedClient {
51        UntypedClient::spawn_inmemory(
52            self.transport,
53            ClientConfig::default().with_maximum_silence_duration(),
54        )
55    }
56
57    /// Returns reference to the underlying framed transport.
58    pub fn as_framed_transport(&self) -> &FramedTransport<InmemoryTransport> {
59        &self.transport
60    }
61
62    /// Returns mutable reference to the underlying framed transport.
63    pub fn as_mut_framed_transport(&mut self) -> &mut FramedTransport<InmemoryTransport> {
64        &mut self.transport
65    }
66
67    /// Consumes the channel, returning the underlying framed transport.
68    pub fn into_framed_transport(self) -> FramedTransport<InmemoryTransport> {
69        self.transport
70    }
71}
72
73impl Deref for RawChannel {
74    type Target = FramedTransport<InmemoryTransport>;
75
76    fn deref(&self) -> &Self::Target {
77        &self.transport
78    }
79}
80
81impl DerefMut for RawChannel {
82    fn deref_mut(&mut self) -> &mut Self::Target {
83        &mut self.transport
84    }
85}
86
87impl RawChannel {
88    pub(super) async fn spawn(
89        connection_id: ConnectionId,
90        client: &mut Client<ManagerRequest, ManagerResponse>,
91    ) -> io::Result<Self> {
92        let mut mailbox = client
93            .mail(ManagerRequest::OpenChannel { id: connection_id })
94            .await?;
95
96        // Wait for the first response, which should be channel confirmation
97        let channel_id = match mailbox.next().await {
98            Some(response) => match response.payload {
99                ManagerResponse::ChannelOpened { id } => Ok(id),
100                ManagerResponse::Error { description } => {
101                    Err(io::Error::new(io::ErrorKind::Other, description))
102                }
103                x => Err(io::Error::new(
104                    io::ErrorKind::InvalidData,
105                    format!("[Conn {connection_id}] Raw channel open unexpected response: {x:?}"),
106                )),
107            },
108            None => Err(io::Error::new(
109                io::ErrorKind::ConnectionAborted,
110                format!("[Conn {connection_id}] Raw channel mailbox aborted"),
111            )),
112        }?;
113
114        // Spawn our channel proxy transport
115        let (mut proxy, transport) = FramedTransport::pair(1);
116
117        let mut manager_channel = client.clone_channel();
118        let task = tokio::spawn(async move {
119            loop {
120                tokio::select! {
121                    maybe_response = mailbox.next() => {
122                        if maybe_response.is_none() {
123                            debug!("[Conn {connection_id} :: Chan {channel_id}] Closing from no more responses");
124                            break;
125                        }
126
127                        match maybe_response.unwrap().payload {
128                            ManagerResponse::Channel { response, .. } => {
129                                if let Err(x) = proxy.write_frame(response.to_bytes()).await {
130                                    error!(
131                                        "[Conn {connection_id} :: Chan {channel_id}] Write response failed: {x}"
132                                    );
133                                }
134                            }
135                            ManagerResponse::ChannelClosed { .. } => {
136                                break;
137                            }
138                            _ => continue,
139                        }
140                    }
141                    result = proxy.read_frame() => {
142                        match result {
143                            Ok(Some(frame)) => {
144                                let request = match UntypedRequest::from_slice(frame.as_item()) {
145                                    Ok(x) => x.into_owned(),
146                                    Err(x) => {
147                                        error!("[Conn {connection_id} :: Chan {channel_id}] Parse request failed: {x}");
148                                        continue;
149                                    }
150                                };
151
152                                // NOTE: In this situation, we do not expect a response to this
153                                //       request (even if the server sends something back)
154                                if let Err(x) = manager_channel
155                                    .fire(ManagerRequest::Channel {
156                                        id: channel_id,
157                                        request,
158                                    })
159                                    .await
160                                {
161                                    error!("[Conn {connection_id} :: Chan {channel_id}] Forward failed: {x}");
162                                }
163                            }
164                            Ok(None) => {
165                                debug!("[Conn {connection_id} :: Chan {channel_id}] Closing from no more requests");
166                                break;
167                            }
168                            Err(x) => {
169                                error!("[Conn {connection_id} :: Chan {channel_id}] Read frame failed: {x}");
170                            }
171                        }
172                    }
173                }
174            }
175        });
176
177        Ok(RawChannel { transport, task })
178    }
179}