distant_net/manager/client/
channel.rs1use std::io;
2use std::ops::{Deref, DerefMut};
3
4use log::*;
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7use tokio::task::JoinHandle;
8
9use crate::client::{Client, ClientConfig, UntypedClient};
10use crate::common::{ConnectionId, FramedTransport, InmemoryTransport, UntypedRequest};
11use crate::manager::data::{ManagerRequest, ManagerResponse};
12
13pub struct RawChannel {
16 transport: FramedTransport<InmemoryTransport>,
17 task: JoinHandle<()>,
18}
19
20impl RawChannel {
21 pub fn abort(&self) {
22 self.task.abort();
23 }
24
25 pub fn into_client<T, U>(self) -> Client<T, U>
33 where
34 T: Send + Sync + Serialize + 'static,
35 U: Send + Sync + DeserializeOwned + 'static,
36 {
37 Client::spawn_inmemory(
38 self.transport,
39 ClientConfig::default().with_maximum_silence_duration(),
40 )
41 }
42
43 pub fn into_untyped_client(self) -> UntypedClient {
51 UntypedClient::spawn_inmemory(
52 self.transport,
53 ClientConfig::default().with_maximum_silence_duration(),
54 )
55 }
56
57 pub fn as_framed_transport(&self) -> &FramedTransport<InmemoryTransport> {
59 &self.transport
60 }
61
62 pub fn as_mut_framed_transport(&mut self) -> &mut FramedTransport<InmemoryTransport> {
64 &mut self.transport
65 }
66
67 pub fn into_framed_transport(self) -> FramedTransport<InmemoryTransport> {
69 self.transport
70 }
71}
72
73impl Deref for RawChannel {
74 type Target = FramedTransport<InmemoryTransport>;
75
76 fn deref(&self) -> &Self::Target {
77 &self.transport
78 }
79}
80
81impl DerefMut for RawChannel {
82 fn deref_mut(&mut self) -> &mut Self::Target {
83 &mut self.transport
84 }
85}
86
87impl RawChannel {
88 pub(super) async fn spawn(
89 connection_id: ConnectionId,
90 client: &mut Client<ManagerRequest, ManagerResponse>,
91 ) -> io::Result<Self> {
92 let mut mailbox = client
93 .mail(ManagerRequest::OpenChannel { id: connection_id })
94 .await?;
95
96 let channel_id = match mailbox.next().await {
98 Some(response) => match response.payload {
99 ManagerResponse::ChannelOpened { id } => Ok(id),
100 ManagerResponse::Error { description } => {
101 Err(io::Error::new(io::ErrorKind::Other, description))
102 }
103 x => Err(io::Error::new(
104 io::ErrorKind::InvalidData,
105 format!("[Conn {connection_id}] Raw channel open unexpected response: {x:?}"),
106 )),
107 },
108 None => Err(io::Error::new(
109 io::ErrorKind::ConnectionAborted,
110 format!("[Conn {connection_id}] Raw channel mailbox aborted"),
111 )),
112 }?;
113
114 let (mut proxy, transport) = FramedTransport::pair(1);
116
117 let mut manager_channel = client.clone_channel();
118 let task = tokio::spawn(async move {
119 loop {
120 tokio::select! {
121 maybe_response = mailbox.next() => {
122 if maybe_response.is_none() {
123 debug!("[Conn {connection_id} :: Chan {channel_id}] Closing from no more responses");
124 break;
125 }
126
127 match maybe_response.unwrap().payload {
128 ManagerResponse::Channel { response, .. } => {
129 if let Err(x) = proxy.write_frame(response.to_bytes()).await {
130 error!(
131 "[Conn {connection_id} :: Chan {channel_id}] Write response failed: {x}"
132 );
133 }
134 }
135 ManagerResponse::ChannelClosed { .. } => {
136 break;
137 }
138 _ => continue,
139 }
140 }
141 result = proxy.read_frame() => {
142 match result {
143 Ok(Some(frame)) => {
144 let request = match UntypedRequest::from_slice(frame.as_item()) {
145 Ok(x) => x.into_owned(),
146 Err(x) => {
147 error!("[Conn {connection_id} :: Chan {channel_id}] Parse request failed: {x}");
148 continue;
149 }
150 };
151
152 if let Err(x) = manager_channel
155 .fire(ManagerRequest::Channel {
156 id: channel_id,
157 request,
158 })
159 .await
160 {
161 error!("[Conn {connection_id} :: Chan {channel_id}] Forward failed: {x}");
162 }
163 }
164 Ok(None) => {
165 debug!("[Conn {connection_id} :: Chan {channel_id}] Closing from no more requests");
166 break;
167 }
168 Err(x) => {
169 error!("[Conn {connection_id} :: Chan {channel_id}] Read frame failed: {x}");
170 }
171 }
172 }
173 }
174 }
175 });
176
177 Ok(RawChannel { transport, task })
178 }
179}