use crate::mux::demux::{Demuxer, MultiReceiver, ReceiverDemuxer};
use crate::mux::error::MuxError;
use crate::mux::protocol::{ClientId, MultiMessage};
use crate::mux::sender::{MultiSender, SubChannelSender};
use crate::mux::subchannel_endpoint::{BytesSubReceiver, BytesSubSender, SubReceiver, SubSender};
use crate::mux::subchannel_lifecycle;
use ipc_channel::ipc::{self, IpcOneShotServer, IpcReceiver};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::sync::{Arc, Mutex};
use tracing::instrument;
use uuid::Uuid;
/// A multiplexed channel from which typed and byte sub-channels are created.
///
/// Holds the sender/receiver pair produced by `multi_channel`; every
/// sub-channel created through [`Channel::sub_channel`] or
/// [`Channel::bytes_sub_channel`] is built on this same pair.
pub struct Channel {
/// Mutex-guarded sender; each sub-channel sender is given a clone of this `Arc`.
multi_sender: Arc<Mutex<MultiSender>>,
/// Receiver to which each sub-channel receiver is attached.
multi_receiver: Arc<MultiReceiver>,
}
impl Channel {
    /// Creates a channel backed by a freshly constructed multi-sender /
    /// multi-receiver pair.
    ///
    /// # Errors
    ///
    /// Returns a [`MuxError`] (converted from the underlying I/O error) when
    /// `multi_channel` fails.
    #[instrument(level = "debug", err(level = "debug"))]
    pub fn new() -> Result<Channel, MuxError> {
        let (multi_sender, multi_receiver) = multi_channel()?;
        Ok(Self {
            multi_sender,
            multi_receiver,
        })
    }

    /// Opens a new typed sub-channel on this channel and returns its two ends.
    ///
    /// A sub-channel sender is created on the shared multi-sender, a receiver
    /// proxy is registered for its id, and a receiver for that same id is
    /// attached to the shared multi-receiver.
    ///
    /// # Panics
    ///
    /// Panics if the multi-sender mutex is poisoned.
    #[instrument(level = "debug", skip(self))]
    pub fn sub_channel<T>(&self) -> (SubSender<T>, SubReceiver<T>)
    where
        T: for<'de> Deserialize<'de> + Serialize,
    {
        let sender = SubChannelSender::new(Arc::clone(&self.multi_sender));
        let id = sender.sub_channel_id();

        // The guard is a temporary, so the lock is released at the end of this statement.
        self.multi_sender
            .lock()
            .unwrap()
            .insert_sub_receiver_proxy(id, subchannel_lifecycle::SubReceiverProxy::new());

        let receiver = MultiReceiver::attach(&self.multi_receiver, id);

        let sub_sender = SubSender::from_sender(sender);
        let sub_receiver = SubReceiver::from_receiver(receiver);
        (sub_sender, sub_receiver)
    }

    /// Opens a new byte-oriented sub-channel on this channel and returns its
    /// two ends.
    ///
    /// Performs the same steps as [`Channel::sub_channel`], but wraps the ends
    /// in [`BytesSubSender`] / [`BytesSubReceiver`].
    ///
    /// # Panics
    ///
    /// Panics if the multi-sender mutex is poisoned.
    #[instrument(level = "debug", skip(self))]
    pub fn bytes_sub_channel(&self) -> (BytesSubSender, BytesSubReceiver) {
        let sender = SubChannelSender::new(Arc::clone(&self.multi_sender));
        let id = sender.sub_channel_id();

        // The guard is a temporary, so the lock is released at the end of this statement.
        self.multi_sender
            .lock()
            .unwrap()
            .insert_sub_receiver_proxy(id, subchannel_lifecycle::SubReceiverProxy::new());

        let receiver = MultiReceiver::attach(&self.multi_receiver, id);

        let bytes_sender = BytesSubSender::from_sender(sender);
        let bytes_receiver = BytesSubReceiver::from_receiver(receiver);
        (bytes_sender, bytes_receiver)
    }
}
/// A one-shot server that accepts a single sub-channel carrying messages of type `T`.
///
/// Created with `SubOneShotServer::new`, which also returns the server's
/// name, and consumed by `SubOneShotServer::accept`.
pub struct SubOneShotServer<T> {
/// The underlying one-shot server that yields the multi-receiver on accept.
one_shot_multi_server: OneShotMultiServer,
/// Name returned when the underlying server was created; `accept` compares
/// it with the name received alongside the sub-channel.
name: String,
/// Carries the message type `T` without storing a value of it.
phantom: PhantomData<T>,
}
/// Debug output shows only the server's name; the remaining fields are
/// elided, which the trailing `..` in the output signals.
impl<T> fmt::Debug for SubOneShotServer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("SubOneShotServer");
        builder.field("name", &self.name);
        builder.finish_non_exhaustive()
    }
}
impl<T> SubOneShotServer<T>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    /// Creates a one-shot server together with the name of the underlying
    /// one-shot server.
    ///
    /// The same name is stored in the server so that [`Self::accept`] can
    /// check it against the name received with the sub-channel.
    ///
    /// # Errors
    ///
    /// Returns a [`MuxError`] (converted from the underlying I/O error) when
    /// the one-shot server cannot be created.
    #[instrument(level = "debug", ret, err(level = "debug"))]
    pub fn new() -> Result<(SubOneShotServer<T>, String), MuxError> {
        let (one_shot_multi_server, name) = OneShotMultiServer::new()?;
        Ok((
            SubOneShotServer {
                one_shot_multi_server,
                name: name.clone(),
                phantom: PhantomData,
            },
            name,
        ))
    }

    /// Consumes the server, accepts the incoming sub-channel and returns its
    /// receiver together with the first message received on it.
    ///
    /// # Errors
    ///
    /// Returns a [`MuxError`] when:
    /// - accepting on the underlying one-shot server fails,
    /// - receiving the sub-channel announcement fails (reported as
    ///   [`MuxError::InternalError`] instead of panicking),
    /// - the announced name differs from this server's name, or
    /// - receiving the first message on the sub-channel fails.
    #[instrument(level = "debug", err(level = "debug"))]
    pub fn accept(self) -> Result<(SubReceiver<T>, T), MuxError> {
        let multi_receiver = self.one_shot_multi_server.accept()?;

        // A failure here depends on the peer and the transport, so it is
        // surfaced to the caller as an error rather than a panic.
        let (subchannel_id, name) = MultiReceiver::receive_sub_channel(&multi_receiver)
            .map_err(|e| {
                MuxError::InternalError(format!("receive sub channel failed: {e:?}"))
            })?;

        if name != self.name {
            return Err(MuxError::InternalError(format!(
                "unexpected sub channel name {name}"
            )));
        }

        let sub_receiver = MultiReceiver::attach(&multi_receiver, subchannel_id);
        let msg: T = sub_receiver.recv()?;
        Ok((SubReceiver::from_receiver(sub_receiver), msg))
    }
}
/// Builds a connected multi-sender / multi-receiver pair.
///
/// Two IPC channels are created. The receiving end of the first goes to the
/// multi-receiver's demuxer and its sending end to the multi-sender. The
/// sending end of the second goes to the demuxer and its receiving end to the
/// multi-sender. Both sides are given the same freshly created [`ClientId`].
///
/// # Errors
///
/// Returns the I/O error if either IPC channel cannot be created.
#[instrument(level = "debug", ret, err(level = "debug"))]
fn multi_channel() -> Result<(Arc<Mutex<MultiSender>>, Arc<MultiReceiver>), io::Error> {
    let (message_tx, message_rx) = ipc::channel()?;
    let (response_tx, response_rx) = ipc::channel()?;
    let client_id = ClientId::new();

    // Receiving side: a demuxer holding the response sender, wrapped around
    // the message receiver.
    let receiver_id = Uuid::new_v4();
    let demuxer = Arc::new(Mutex::new(Demuxer::with_sender(client_id, response_tx)));
    let receiver_demuxer = ReceiverDemuxer::new(message_rx, demuxer);
    let receiver = Arc::new(MultiReceiver::new(receiver_id, receiver_demuxer));

    // Sending side: the message sender plus the response receiver.
    let sender = MultiSender::new(client_id, Arc::new(message_tx), response_rx);

    Ok((Arc::new(Mutex::new(sender)), receiver))
}
/// Wrapper around an `IpcOneShotServer` whose accepted connection is turned
/// into a `MultiReceiver`.
struct OneShotMultiServer {
/// The underlying one-shot server, carrying `MultiMessage` values.
multi_server: IpcOneShotServer<MultiMessage>,
}
impl OneShotMultiServer {
    /// Creates the underlying `IpcOneShotServer` and returns the wrapper
    /// together with the server's name.
    ///
    /// # Errors
    ///
    /// Returns the I/O error if the one-shot server cannot be created.
    #[instrument(level = "debug", err(level = "debug"))]
    fn new() -> Result<(OneShotMultiServer, String), io::Error> {
        let (multi_server, server_name) = IpcOneShotServer::new()?;
        let server = Self { multi_server };
        Ok((server, server_name))
    }

    /// Consumes the server, accepts the connection and returns a
    /// multi-receiver built on the accepted IPC receiver.
    ///
    /// The message delivered with the accept is handed to the new receiver
    /// through `handle_initial_message` before the receiver is returned.
    ///
    /// # Errors
    ///
    /// Returns a [`MuxError`] if accepting fails or if handling the initial
    /// message fails.
    #[instrument(level = "debug", skip(self), ret, err(level = "debug"))]
    fn accept(self) -> Result<Arc<MultiReceiver>, MuxError> {
        let accepted: (IpcReceiver<MultiMessage>, MultiMessage) = self.multi_server.accept()?;
        let (ipc_receiver, first_message) = accepted;

        let receiver_id = Uuid::new_v4();
        let demuxer = Arc::new(Mutex::new(Demuxer::empty()));
        let receiver = Arc::new(MultiReceiver::new(
            receiver_id,
            ReceiverDemuxer::new(ipc_receiver, demuxer),
        ));

        receiver.handle_initial_message(first_message)?;
        Ok(receiver)
    }
}