use std::io;
use std::sync::Arc;
use shm_primitives::PeerId;
#[cfg(windows)]
use shm_primitives_async::create_mmap_control_receiver_server;
use shm_primitives_async::{
Doorbell, DoorbellHandle, MmapControlHandle, MmapControlReceiver, MmapControlSender,
create_mmap_control_pair,
};
use crate::ShmLink;
use crate::mmap_registry::{MmapChannelRx, MmapChannelTx};
use crate::segment::Segment;
/// Wraps a free-form message in an [`io::Error`] of kind [`io::ErrorKind::Other`].
///
/// Accepts anything convertible into a `String`, so both string literals and
/// `format!` results can be passed directly.
fn io_other(msg: impl Into<String>) -> io::Error {
    let text: String = msg.into();
    io::Error::other(text)
}
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(unix)]
fn dup_fd(fd: RawFd) -> io::Result<RawFd> {
let duplicated = unsafe { libc::dup(fd) };
if duplicated < 0 {
Err(io::Error::last_os_error())
} else {
Ok(duplicated)
}
}
/// Host-side factory for peers of one shared-memory [`Segment`].
///
/// [`HostHub::prepare_peer`] reserves a peer slot in the segment and creates
/// the doorbell and mmap-control endpoints for that peer.
pub struct HostHub {
/// Segment in which peer slots are reserved; shared with every `HostPeer`.
segment: Arc<Segment>,
}
impl HostHub {
pub fn new(segment: Arc<Segment>) -> Self {
Self { segment }
}
pub fn segment(&self) -> &Arc<Segment> {
&self.segment
}
pub fn prepare_peer(&self) -> io::Result<PreparedPeer> {
let peer_id = self
.segment
.reserve_peer()
.ok_or_else(|| io_other("no free SHM peer slots"))?;
let (host_doorbell, guest_doorbell) = Doorbell::create_pair()?;
#[cfg(unix)]
shm_primitives_async::clear_cloexec(guest_doorbell.as_raw_fd())?;
let (host_mmap_tx, guest_mmap_rx) = create_mmap_control_pair()?;
#[cfg(unix)]
let (host, guest) = {
shm_primitives_async::clear_cloexec(guest_mmap_rx.as_raw_fd())?;
let host_mmap_rx_fd = dup_fd(host_mmap_tx.as_raw_fd())?;
let guest_mmap_tx_fd = dup_fd(guest_mmap_rx.as_raw_fd())?;
shm_primitives_async::clear_cloexec(guest_mmap_tx_fd)?;
let host = HostPeer {
segment: Arc::clone(&self.segment),
peer_id,
doorbell: host_doorbell,
mmap_tx: host_mmap_tx,
mmap_rx: unsafe { MmapControlHandle::from_raw_fd(host_mmap_rx_fd) },
};
let guest = GuestSpawnTicket {
peer_id,
doorbell: guest_doorbell,
mmap_rx: guest_mmap_rx,
mmap_tx_fd: guest_mmap_tx_fd,
};
(host, guest)
};
#[cfg(windows)]
let (host, guest) = {
let (host_mmap_rx, mmap_tx_pipe) = create_mmap_control_receiver_server()?;
let host = HostPeer {
segment: Arc::clone(&self.segment),
peer_id,
doorbell: host_doorbell,
mmap_tx: host_mmap_tx,
mmap_rx: host_mmap_rx,
};
let guest = GuestSpawnTicket {
peer_id,
doorbell: guest_doorbell,
mmap_rx: guest_mmap_rx,
mmap_tx_pipe,
};
(host, guest)
};
Ok(PreparedPeer { host, guest })
}
}
/// Host half of a prepared peer.
///
/// Built by `HostHub::prepare_peer`; turned into a [`ShmLink`] with
/// `HostPeer::into_link`, or given up with `HostPeer::release_reservation`.
pub struct HostPeer {
/// Segment the peer slot was reserved in.
segment: Arc<Segment>,
/// Identifier of the reserved peer slot.
peer_id: PeerId,
/// Host end of the doorbell pair.
doorbell: Doorbell,
/// Host end used to send mmap-control messages.
mmap_tx: MmapControlSender,
/// Unix: not-yet-converted handle; `into_link` turns it into a receiver.
#[cfg(unix)]
mmap_rx: MmapControlHandle,
/// Windows: receiver created by `create_mmap_control_receiver_server`.
#[cfg(windows)]
mmap_rx: MmapControlReceiver,
}
impl HostPeer {
    /// Identifier of the peer slot reserved for this peer.
    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }

    /// Consumes the peer and assembles the host-side [`ShmLink`].
    ///
    /// # Errors
    /// On Unix, fails if the stored mmap-control handle cannot be converted
    /// into a receiver. The Windows path has no fallible step here.
    pub fn into_link(self) -> io::Result<ShmLink> {
        // Unix stores a handle that still has to become a receiver; Windows
        // already holds the receiver.
        #[cfg(unix)]
        let receiver = MmapControlReceiver::from_handle(self.mmap_rx)?;
        #[cfg(windows)]
        let receiver = self.mmap_rx;

        let link = ShmLink::for_host(
            self.segment,
            self.peer_id,
            self.doorbell,
            MmapChannelTx::Real(self.mmap_tx),
            MmapChannelRx::Real(receiver),
        );
        Ok(link)
    }

    /// Consumes the peer and releases its reserved slot in the segment.
    pub fn release_reservation(self) {
        let reserved = self.peer_id;
        self.segment.release_reserved_peer(reserved);
    }
}
/// Guest half of a prepared peer, produced by `HostHub::prepare_peer`.
///
/// The `*_arg` methods render each endpoint as a `String`.
/// NOTE(review): presumably these strings are passed to the spawned guest
/// process (e.g. on its command line) — confirm against the spawning code.
pub struct GuestSpawnTicket {
/// Identifier of the peer slot reserved for the guest.
pub peer_id: PeerId,
/// Guest end of the doorbell pair.
pub doorbell: DoorbellHandle,
/// Guest end of the mmap-control pair created by `create_mmap_control_pair`.
pub mmap_rx: MmapControlHandle,
/// Unix: raw descriptor duplicated from `mmap_rx`. It is a plain integer,
/// so dropping the ticket does not close it.
#[cfg(unix)]
pub mmap_tx_fd: RawFd,
/// Windows: string returned by `create_mmap_control_receiver_server`; the
/// guest passes it to `MmapControlSender::connect`.
#[cfg(windows)]
pub mmap_tx_pipe: String,
}
impl GuestSpawnTicket {
    /// String form of the guest doorbell handle, as produced by its `to_arg`.
    pub fn doorbell_arg(&self) -> String {
        let handle = &self.doorbell;
        handle.to_arg()
    }

    /// String form of the guest mmap-control receive handle, as produced by
    /// its `to_arg`.
    pub fn mmap_rx_arg(&self) -> String {
        let handle = &self.mmap_rx;
        handle.to_arg()
    }

    /// String form of the guest mmap-control send endpoint: the decimal
    /// descriptor number on Unix, the stored pipe string on Windows.
    pub fn mmap_tx_arg(&self) -> String {
        #[cfg(unix)]
        {
            ToString::to_string(&self.mmap_tx_fd)
        }
        #[cfg(windows)]
        {
            String::clone(&self.mmap_tx_pipe)
        }
    }
}
/// Result of `HostHub::prepare_peer`: the host and guest halves of one peer.
pub struct PreparedPeer {
/// Host half, kept in the host process.
host: HostPeer,
/// Guest half, used to build the guest-side link.
guest: GuestSpawnTicket,
}
impl PreparedPeer {
    /// Splits the prepared peer into its host half and its guest ticket.
    pub fn into_parts(self) -> (HostPeer, GuestSpawnTicket) {
        let Self { host, guest } = self;
        (host, guest)
    }
}
/// Options accepted by `ShmHost::add_peer`.
///
/// Currently a field-less marker; `add_peer` ignores its value.
#[derive(Debug, Clone, Copy, Default)]
pub struct AddPeerOptions;
/// Alias for [`ShmHost`].
/// NOTE(review): presumably kept so callers using the older name keep compiling — confirm.
pub type MultiPeerHostDriver = ShmHost;
/// Thin wrapper around a [`HostHub`] exposing `add_peer`.
pub struct ShmHost {
/// Hub that performs the actual peer preparation.
hub: HostHub,
}
impl ShmHost {
    /// Creates a host whose peers are reserved in `segment`.
    pub fn new(segment: Arc<Segment>) -> Self {
        let hub = HostHub::new(segment);
        Self { hub }
    }

    /// Returns the segment shared by all peers of this host.
    pub fn segment(&self) -> &Arc<Segment> {
        &self.hub.segment
    }

    /// Prepares a new peer; `_options` is currently ignored.
    ///
    /// # Errors
    /// Propagates any error from [`HostHub::prepare_peer`].
    pub fn add_peer(&self, _options: AddPeerOptions) -> io::Result<PreparedPeer> {
        let Self { hub } = self;
        hub.prepare_peer()
    }
}
/// Builds the guest-side [`ShmLink`] from three raw file descriptors.
///
/// When `claim_reserved` is `true`, the peer slot `peer_id` is claimed in
/// `segment` first. If that claim fails the function returns before touching
/// any descriptor.
///
/// After a successful (or skipped) claim, all three descriptors are wrapped
/// in their owning types before any fallible conversion runs, so a failure
/// while building the doorbell or the receiver does not strand the
/// descriptors that had not been wrapped yet.
/// NOTE(review): this relies on the wrapper types closing their descriptor on
/// drop — confirm in `shm_primitives_async`.
///
/// # Safety
/// `doorbell_fd`, `mmap_rx_fd` and `mmap_tx_fd` are handed to the
/// `from_raw_fd` constructors of `DoorbellHandle`, `MmapControlHandle` and
/// `MmapControlSender`; the caller must uphold whatever those constructors
/// require (presumably: each is an open descriptor of the right kind that
/// nothing else owns — confirm against their documentation).
///
/// # Errors
/// Returns an error if the claim fails, or if the doorbell or the
/// mmap-control receiver cannot be built from its handle.
/// NOTE(review): a slot claimed by this call stays claimed when a later step
/// fails; no un-claim operation is visible in this file.
#[cfg(unix)]
pub unsafe fn guest_link_from_raw(
    segment: Arc<Segment>,
    peer_id: PeerId,
    doorbell_fd: RawFd,
    mmap_rx_fd: RawFd,
    mmap_tx_fd: RawFd,
    claim_reserved: bool,
) -> io::Result<ShmLink> {
    if claim_reserved {
        segment.claim_peer(peer_id).map_err(|state| {
            io_other(format!(
                "failed to claim reserved peer {} (state: {state:?})",
                peer_id.get()
            ))
        })?;
    }
    // Give the mmap descriptors an owner *before* the first fallible
    // conversion, so an early return below drops them instead of leaking them.
    // SAFETY: forwarded from this function's own safety contract.
    let mmap_rx_handle = unsafe { MmapControlHandle::from_raw_fd(mmap_rx_fd) };
    // SAFETY: forwarded from this function's own safety contract.
    let mmap_tx = unsafe { MmapControlSender::from_raw_fd(mmap_tx_fd) };
    // SAFETY: forwarded from this function's own safety contract.
    let doorbell = unsafe { Doorbell::from_handle(DoorbellHandle::from_raw_fd(doorbell_fd)) }?;
    let mmap_rx = MmapControlReceiver::from_handle(mmap_rx_handle)?;
    Ok(ShmLink::for_guest(
        segment,
        peer_id,
        doorbell,
        MmapChannelTx::Real(mmap_tx),
        MmapChannelRx::Real(mmap_rx),
    ))
}
#[cfg(unix)]
pub unsafe fn guest_link_from_ticket(
segment: Arc<Segment>,
ticket: GuestSpawnTicket,
claim_reserved: bool,
) -> io::Result<ShmLink> {
let peer_id = ticket.peer_id;
let doorbell_fd = ticket.doorbell.into_raw_fd();
let mmap_rx_fd = ticket.mmap_rx.into_raw_fd();
let mmap_tx_fd = ticket.mmap_tx_fd;
unsafe {
guest_link_from_raw(
segment,
peer_id,
doorbell_fd,
mmap_rx_fd,
mmap_tx_fd,
claim_reserved,
)
}
}
/// Builds the guest-side [`ShmLink`] from pipe names (Windows).
///
/// When `claim_reserved` is `true`, the peer slot is claimed first. Then the
/// doorbell is built from `doorbell_pipe`, the mmap-control receiver from
/// `mmap_rx_pipe`, and the mmap-control sender connects to `mmap_tx_pipe`.
///
/// # Errors
/// Returns an error if the claim fails, if `mmap_rx_pipe` is rejected by
/// `MmapControlHandle::from_arg`, or if any endpoint cannot be built or
/// connected.
#[cfg(windows)]
pub fn guest_link_from_names(
    segment: Arc<Segment>,
    peer_id: PeerId,
    doorbell_pipe: &str,
    mmap_rx_pipe: &str,
    mmap_tx_pipe: &str,
    claim_reserved: bool,
) -> io::Result<ShmLink> {
    if claim_reserved {
        if let Err(state) = segment.claim_peer(peer_id) {
            return Err(io_other(format!(
                "failed to claim reserved peer {} (state: {state:?})",
                peer_id.get()
            )));
        }
    }

    let doorbell_handle = DoorbellHandle::from_pipe_name(String::from(doorbell_pipe));
    let doorbell = Doorbell::from_handle(doorbell_handle)?;

    let rx_handle = match unsafe { MmapControlHandle::from_arg(mmap_rx_pipe) } {
        Ok(handle) => handle,
        Err(e) => return Err(io_other(format!("mmap_rx pipe name: {e}"))),
    };
    let receiver = MmapControlReceiver::from_handle(rx_handle)?;
    let sender = MmapControlSender::connect(mmap_tx_pipe)?;

    let link = ShmLink::for_guest(
        segment,
        peer_id,
        doorbell,
        MmapChannelTx::Real(sender),
        MmapChannelRx::Real(receiver),
    );
    Ok(link)
}
/// Builds the guest-side [`ShmLink`] from a [`GuestSpawnTicket`] (Windows).
///
/// When `claim_reserved` is `true`, the ticket's peer slot is claimed first.
/// The doorbell and mmap-control receiver are built from the ticket's
/// handles, and the sender connects to `ticket.mmap_tx_pipe`.
///
/// # Errors
/// Returns an error if the claim fails or if any endpoint cannot be built or
/// connected.
#[cfg(windows)]
pub fn guest_link_from_ticket_windows(
    segment: Arc<Segment>,
    ticket: GuestSpawnTicket,
    claim_reserved: bool,
) -> io::Result<ShmLink> {
    let peer_id = ticket.peer_id;
    if claim_reserved {
        if let Err(state) = segment.claim_peer(peer_id) {
            return Err(io_other(format!(
                "failed to claim reserved peer {} (state: {state:?})",
                peer_id.get()
            )));
        }
    }

    let doorbell = Doorbell::from_handle(ticket.doorbell)?;
    let receiver = MmapControlReceiver::from_handle(ticket.mmap_rx)?;
    let sender = MmapControlSender::connect(&ticket.mmap_tx_pipe)?;

    let link = ShmLink::for_guest(
        segment,
        peer_id,
        doorbell,
        MmapChannelTx::Real(sender),
        MmapChannelRx::Real(receiver),
    );
    Ok(link)
}
/// Builds a connected host/guest [`ShmLink`] pair inside the current process.
///
/// A fresh [`HostHub`] prepares one peer in `segment`; the host half becomes
/// the host link and the ticket becomes the guest link, claiming the reserved
/// slot. On Windows the host link additionally awaits `accept_doorbell`
/// before the pair is returned.
///
/// # Errors
/// Propagates any failure from preparing the peer or building either link.
pub async fn create_test_link_pair(segment: Arc<Segment>) -> io::Result<(ShmLink, ShmLink)> {
    let hub = HostHub::new(Arc::clone(&segment));
    let (host_side, guest_ticket) = hub.prepare_peer()?.into_parts();
    let host_link = host_side.into_link()?;

    #[cfg(unix)]
    let guest_link = {
        // SAFETY: the ticket comes straight from `prepare_peer` above and is
        // consumed here, so its descriptors are passed on exactly once.
        let built = unsafe { guest_link_from_ticket(segment, guest_ticket, true) };
        built?
    };
    #[cfg(windows)]
    let guest_link = guest_link_from_ticket_windows(segment, guest_ticket, true)?;

    #[cfg(windows)]
    host_link.accept_doorbell().await?;

    Ok((host_link, guest_link))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::segment::SegmentConfig;
    use crate::varslot::SizeClassConfig;
    use shm_primitives::FileCleanup;
    use vox_types::{Link, LinkRx, LinkTx, LinkTxPermit, WriteSlot};

    /// Prepares one peer, builds both links in-process, and sends a 4-byte
    /// payload in each direction.
    #[tokio::test]
    async fn host_hub_builds_host_and_guest_links() {
        // `tmp_dir` must stay bound for the whole test so the backing file's
        // directory is not removed early.
        let tmp_dir = tempfile::tempdir().expect("tempdir");
        let shm_path = tmp_dir.path().join("host-hub.shm");
        let size_classes = [SizeClassConfig {
            slot_size: 4096,
            slot_count: 8,
        }];
        let config = SegmentConfig {
            max_guests: 1,
            bipbuf_capacity: 64 * 1024,
            max_payload_size: 4096,
            inline_threshold: 256,
            heartbeat_interval: 0,
            size_classes: &size_classes,
        };
        let segment = Arc::new(
            Segment::create(&shm_path, config, FileCleanup::Manual).expect("create segment"),
        );

        let hub = HostHub::new(Arc::clone(&segment));
        let (host_peer, ticket) = hub.prepare_peer().expect("prepare peer").into_parts();
        let host_link = host_peer.into_link().expect("build host link");

        #[cfg(unix)]
        let guest_link = unsafe { guest_link_from_ticket(Arc::clone(&segment), ticket, true) }
            .expect("build guest link");
        #[cfg(windows)]
        let guest_link = guest_link_from_ticket_windows(Arc::clone(&segment), ticket, true)
            .expect("build guest link");
        #[cfg(windows)]
        host_link.accept_doorbell().await.expect("accept doorbell");

        let (host_tx, mut host_rx) = host_link.split();
        let (guest_tx, mut guest_rx) = guest_link.split();

        // Host -> guest.
        let host_permit = host_tx.reserve().await.expect("reserve host tx");
        let mut ping_slot = host_permit.alloc(4).expect("alloc host slot");
        ping_slot.as_mut_slice().copy_from_slice(b"ping");
        ping_slot.commit();
        let ping = guest_rx
            .recv()
            .await
            .expect("recv guest")
            .expect("guest payload");
        assert_eq!(ping.as_bytes(), b"ping");

        // Guest -> host.
        let guest_permit = guest_tx.reserve().await.expect("reserve guest tx");
        let mut pong_slot = guest_permit.alloc(4).expect("alloc guest slot");
        pong_slot.as_mut_slice().copy_from_slice(b"pong");
        pong_slot.commit();
        let pong = host_rx
            .recv()
            .await
            .expect("recv host")
            .expect("host payload");
        assert_eq!(pong.as_bytes(), b"pong");
    }

    /// With a single guest slot, releasing the first reservation must let a
    /// second `prepare_peer` succeed.
    #[tokio::test]
    async fn releasing_reservation_allows_reuse() {
        let tmp_dir = tempfile::tempdir().expect("tempdir");
        let shm_path = tmp_dir.path().join("host-hub-reuse.shm");
        let size_classes = [SizeClassConfig {
            slot_size: 4096,
            slot_count: 4,
        }];
        let config = SegmentConfig {
            max_guests: 1,
            bipbuf_capacity: 16 * 1024,
            max_payload_size: 1024,
            inline_threshold: 256,
            heartbeat_interval: 0,
            size_classes: &size_classes,
        };
        let segment = Arc::new(
            Segment::create(&shm_path, config, FileCleanup::Manual).expect("create segment"),
        );

        let hub = HostHub::new(Arc::clone(&segment));
        let (first_host, _first_ticket) = hub.prepare_peer().expect("first prepare").into_parts();
        first_host.release_reservation();
        let _second = hub.prepare_peer().expect("second prepare");
    }
}