use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use connection_utils::{Disconnected, Connected};
use cs_trace::child;
use serde::{Serialize, Deserialize};
use tokio::io::{AsyncRead, AsyncWrite, duplex};
use tokio::sync::Mutex;
use tokio::sync::mpsc::channel;
use tokio::try_join;
use anyhow::Result;
pub use super::rpc::RpcChannelsServiceClient;
use super::MultiplexedConnection;
use super::connected::MultiplexedConnectionConnected;
/// Serializable message wrapper with two variants, each carrying a `String`
/// payload.
///
/// NOTE(review): presumably the variant identifies which side of the
/// multiplexed RPC link the payload belongs to (client vs. server) — confirm
/// against the code that produces and consumes these messages.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum RpcMessage {
/// Payload associated with the client side (exact direction not visible here).
Client(String),
/// Payload associated with the server side (exact direction not visible here).
Server(String),
}
#[async_trait]
impl<TAsyncDuplex: AsyncRead + AsyncWrite + Send + 'static> Disconnected
for MultiplexedConnection<TAsyncDuplex>
{
async fn connect(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
let stream = self.stream
.take()
.expect("Cannot find stream.");
let (on_data_channel_sink, on_data_channel_source) = channel(10);
let (on_rpc_channel_sink, on_rpc_channel_source) = channel(10);
let (client_duplex_source, client_duplex_sink) = duplex(4096);
let (server_duplex_source, server_duplex_sink) = duplex(4096);
let trace = &self.trace;
let client_trace = child!(trace, "client");
let server_trace = child!(trace, "server");
let channels = Arc::new(Mutex::new(HashMap::new()));
let server_channels = Arc::clone(&channels);
let (rpc_client, _) = try_join!(
async move {
MultiplexedConnectionConnected::connect_client(
&client_trace,
client_duplex_source,
).await
},
async move {
MultiplexedConnectionConnected::connect_server(
&server_trace,
server_channels,
server_duplex_source,
on_data_channel_sink,
).await
},
)?;
return Ok(
Box::new(
MultiplexedConnectionConnected::new(
&self.trace,
channels,
Arc::new(rpc_client),
Some(on_rpc_channel_source),
on_data_channel_source,
on_rpc_channel_sink,
stream,
client_duplex_sink,
server_duplex_sink,
),
),
);
}
async fn listen(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
return self.connect().await;
}
}