use super::AcceptStopHandle;
use crate::async_rt;
use crate::endpoint::Endpoint;
use crate::error::ZmqError;
use crate::task_handle::TaskHandle;
use crate::ZmqResult;
use flume as ac;
use futures::channel::oneshot;
use futures::{select, FutureExt};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Currently bound inproc listeners, keyed by endpoint name.
///
/// Each value is the sending half of the hand-off channel that feeds the
/// listener's accept task. When this lock and `PENDING` are both needed,
/// this one is taken first.
static REGISTRY: Lazy<Mutex<HashMap<String, ac::Sender<InprocPeer>>>> =
    Lazy::new(Default::default);
/// Server-side peer halves created by `connect` calls that arrived before any
/// listener was bound to the name, keyed by endpoint name.
///
/// The whole list for a name is taken out when a listener binds to it. This
/// lock is only taken while the `REGISTRY` lock is already held.
static PENDING: Lazy<Mutex<HashMap<String, Vec<InprocPeer>>>> =
    Lazy::new(Default::default);
/// Payload exchanged between the two sides of an inproc connection through
/// the `send_inbound` / `recv_inbound` oneshot channels of [`InprocPeer`].
pub(crate) struct InprocChannelInfo {
/// Inbound sender handle of the side that produced this value.
/// NOTE(review): presumably the remote side uses it to deliver messages — confirm.
pub(crate) tx: crate::engine::InprocInboundTx,
/// Shared runtime notifier.
/// NOTE(review): presumably used to wake the owner of `tx` — confirm.
pub(crate) notify: Arc<crate::async_rt::notify::RuntimeNotify>,
/// Socket type reported by the side that produced this value.
pub(crate) socket_type: crate::SocketType,
/// Optional routing identity reported by the side that produced this value.
pub(crate) routing_id: Option<crate::PeerIdentity>,
}
/// One half of an inproc connection.
///
/// Halves are created in pairs by `peer_pair`, which cross-wires the oneshot
/// channels: what one half sends on `send_inbound` / `send_key` arrives on
/// the other half's `recv_inbound` / `recv_key`.
pub(crate) struct InprocPeer {
/// The inproc endpoint this connection belongs to.
pub(crate) endpoint: Endpoint,
/// Sends this side's channel info to the opposite half.
pub(crate) send_inbound: oneshot::Sender<InprocChannelInfo>,
/// Receives the opposite half's channel info.
pub(crate) recv_inbound: oneshot::Receiver<InprocChannelInfo>,
/// Sends a `PeerKey` to the opposite half.
pub(crate) send_key: oneshot::Sender<crate::engine::registry::PeerKey>,
/// Receives the `PeerKey` sent by the opposite half.
pub(crate) recv_key: oneshot::Receiver<crate::engine::registry::PeerKey>,
}
/// Builds the two cross-wired halves of an inproc connection for `name`.
///
/// Returns `(client, server)`. Each half's senders are connected to the
/// other half's receivers, and both halves carry the same
/// `Endpoint::Inproc(name)`.
fn peer_pair(name: &str) -> (InprocPeer, InprocPeer) {
    // Client -> server direction.
    let (client_inbound_tx, server_inbound_rx) = oneshot::channel();
    let (client_key_tx, server_key_rx) = oneshot::channel();

    // Server -> client direction.
    let (server_inbound_tx, client_inbound_rx) = oneshot::channel();
    let (server_key_tx, client_key_rx) = oneshot::channel();

    let endpoint = Endpoint::Inproc(name.to_string());

    (
        InprocPeer {
            endpoint: endpoint.clone(),
            send_inbound: client_inbound_tx,
            recv_inbound: client_inbound_rx,
            send_key: client_key_tx,
            recv_key: client_key_rx,
        },
        InprocPeer {
            endpoint,
            send_inbound: server_inbound_tx,
            recv_inbound: server_inbound_rx,
            send_key: server_key_tx,
            recv_key: server_key_rx,
        },
    )
}
/// Connects to the inproc endpoint `name` and returns the client half.
///
/// If a listener is bound, the server half is handed to its accept task.
/// Otherwise the server half is queued in `PENDING` for a later bind and the
/// client half is returned immediately.
///
/// # Errors
///
/// Returns `ZmqError::Socket` when a listener was registered but its hand-off
/// channel was closed before the server half could be delivered.
pub(crate) async fn connect(name: &str) -> ZmqResult<InprocPeer> {
    let (client, server) = peer_pair(name);

    // The registry lock covers both the lookup and the `PENDING` push so a
    // concurrent bind cannot slip in between them. The guard is dropped at
    // the end of this block, before the `.await` below.
    let listener = {
        let registry = REGISTRY.lock().unwrap();
        let found = registry.get(name).cloned();
        match found {
            Some(tx) => tx,
            None => {
                let mut pending = PENDING.lock().unwrap();
                pending.entry(name.to_string()).or_default().push(server);
                return Ok(client);
            }
        }
    };

    listener
        .send_async(server)
        .await
        .map_err(|_e| ZmqError::Socket("inproc: accept task closed before connect".into()))?;
    Ok(client)
}
/// Removes the named listener from `REGISTRY` when dropped, making the
/// endpoint name available for binding again.
struct InprocRegistryGuard(String);

impl Drop for InprocRegistryGuard {
    fn drop(&mut self) {
        // Never panic in `drop`: a poisoned lock would abort the process if
        // we are already unwinding, and would otherwise leave the name bound
        // forever. Removing a key is safe on a poisoned map, so recover the
        // guard from the poison error instead of unwrapping.
        REGISTRY
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .remove(&self.0);
    }
}
/// Binds the inproc endpoint `name` and spawns its accept task.
///
/// The accept task first invokes `cback` for every peer that connected
/// before the bind (taken from `PENDING`), then for every peer arriving on
/// the hand-off channel. Each callback future is spawned as its own task.
/// The task ends when the stop signal fires (or its sender is dropped) or
/// when every sender of the hand-off channel is gone.
///
/// Returns the bound endpoint and a stop handle. The handle carries a guard
/// that removes the name from `REGISTRY` when it is dropped.
///
/// # Errors
///
/// Returns `ZmqError::AddressInUse` if a listener is already registered
/// under `name`.
pub(crate) async fn begin_accept<T>(
    name: String,
    cback: impl Fn(ZmqResult<InprocPeer>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, AcceptStopHandle)>
where
    T: std::future::Future<Output = ()> + Send + 'static,
{
    let (handoff_tx, handoff_rx) = ac::bounded::<InprocPeer>(8);
    let (stop_tx, stop_rx) = futures::channel::oneshot::channel::<()>();

    // Register the listener and take the backlog of early connectors under
    // one registry lock, so a concurrent `connect` sees either the listener
    // or the pending queue, never neither.
    let pending_peers: Vec<InprocPeer> = {
        let mut registry = REGISTRY.lock().unwrap();
        if registry.contains_key(&name) {
            return Err(ZmqError::AddressInUse(Endpoint::Inproc(name)));
        }
        registry.insert(name.clone(), handoff_tx);
        PENDING.lock().unwrap().remove(&name).unwrap_or_default()
    };

    // Create the guard right after registration so the registry entry is
    // released on every later exit path.
    let guard: Box<dyn std::any::Any + Send + Sync> =
        Box::new(InprocRegistryGuard(name.clone()));
    let endpoint = Endpoint::Inproc(name);

    let task_handle = async_rt::task::spawn(async move {
        // Dispatch the backlog directly instead of pushing it through the
        // bounded hand-off channel. Sending before this task was running
        // would block forever once the backlog exceeded the channel capacity.
        for peer in pending_peers {
            async_rt::task::spawn(cback(Ok(peer)));
        }

        let mut stop_rx = stop_rx.fuse();
        loop {
            select! {
                incoming = handoff_rx.recv_async().fuse() => {
                    match incoming {
                        Ok(peer) => {
                            async_rt::task::spawn(cback(Ok(peer)));
                        }
                        // Every sender is gone: nothing more can arrive.
                        Err(_) => break,
                    }
                }
                _ = stop_rx => break,
            }
        }
        Ok(())
    });

    Ok((
        endpoint,
        AcceptStopHandle::with_guard(TaskHandle::new(stop_tx, task_handle), guard),
    ))
}