#![doc = include_str!("../README.md")]
#![cfg_attr(feature = "document-features", doc = document_features::document_features!())]
#![no_std]
#![cfg_attr(not(test), warn(
missing_debug_implementations,
missing_docs,
//trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
//unused_results
))]
#![cfg_attr(test, deny(
missing_debug_implementations,
missing_docs,
//trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_must_use,
//unused_results
))]
#![cfg_attr(
test,
deny(
bad_style,
dead_code,
improper_ctypes,
non_shorthand_field_patterns,
no_mangle_generic_items,
overflowing_literals,
path_statements,
patterns_in_fns_without_body,
unconditional_recursion,
unused,
unused_allocation,
unused_comparisons,
unused_parens,
while_true
)
)]
extern crate alloc;
#[cfg(feature = "std")]
extern crate std;
#[cfg(feature = "std")]
use alloc::boxed::Box;
#[cfg(feature = "std")]
use alloc::string::ToString;
use alloc::{string::String, vec, vec::Vec};
#[cfg(not(target_pointer_width = "64"))]
use core::sync::atomic::AtomicU32;
#[cfg(target_pointer_width = "64")]
use core::sync::atomic::AtomicU64;
use core::{
cmp::max,
fmt::Debug,
hint,
mem::size_of,
num::NonZeroUsize,
ops::{BitAnd, BitOr, Not},
ptr, slice,
sync::atomic::{AtomicU16, Ordering, fence},
time::Duration,
};
#[cfg(feature = "std")]
use core::{mem::offset_of, net::SocketAddr, ptr::write_unaligned};
#[cfg(all(debug_assertions, feature = "llmp_debug", feature = "std"))]
use std::backtrace::Backtrace;
#[cfg(feature = "std")]
use std::{
env,
io::{ErrorKind, Read, Write},
net::{TcpListener, TcpStream, ToSocketAddrs},
sync::mpsc::channel,
thread,
};
#[cfg(all(unix, feature = "alloc"))]
use exceptional::unix_signals::SignalHandler;
#[cfg(all(unix, feature = "alloc", not(miri)))]
use exceptional::unix_signals::setup_signal_handler;
#[cfg(all(unix, feature = "alloc"))]
use exceptional::unix_signals::{Signal, siginfo_t, ucontext_t};
#[cfg(all(windows, feature = "std"))]
use exceptional::windows_exceptions::{CtrlHandler, setup_ctrl_handler};
#[cfg(feature = "std")]
use libafl_core::IP_LOCALHOST;
use libafl_core::{ClientId, Error, format};
#[cfg(all(unix, feature = "std"))]
#[cfg(not(any(target_os = "solaris", target_os = "illumos")))]
use nix::sys::socket::{self, sockopt::ReusePort};
#[cfg(feature = "std")]
use no_std_time::current_time;
use serde::{Deserialize, Serialize};
use shmem_providers::{ShMem, ShMemDescription, ShMemId, ShMemProvider};
#[cfg(feature = "std")]
use tuple_list::tuple_list;
/// A sender may hold at most this many pages that no receiver has read yet;
/// above this, `prune_old_pages` gives up and panics.
const LLMP_CFG_MAX_PENDING_UNREAD_PAGES: usize = 3;
/// Initial size of a shared map (256 MiB without the `llmp_small_maps` feature).
#[cfg(not(feature = "llmp_small_maps"))]
const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 28;
/// Initial size of a shared map (1 MiB with the `llmp_small_maps` feature).
#[cfg(feature = "llmp_small_maps")]
const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 20;
/// Alignment, in bytes, of each message allocation (see `llmp_align`).
/// NOTE(review): the name carries a historical typo ("ALIGNNMENT").
const LLMP_CFG_ALIGNNMENT: usize = 64;
/// Tag of an allocated-but-empty (or canceled) message slot.
const LLMP_TAG_UNSET: Tag = Tag(0xDEADAF);
/// Tag set by `alloc_next_if_space` until `send*` writes the real tag.
const LLMP_TAG_UNINITIALIZED: Tag = Tag(0xA143AF11);
/// Tag of the end-of-page marker message carrying a `LlmpPayloadSharedMapInfo`.
const LLMP_TAG_END_OF_PAGE: Tag = Tag(0xAF1E0F1);
/// Tag announcing a new shared-map client — consumed outside this chunk.
const LLMP_TAG_NEW_SHM_CLIENT: Tag = Tag(0xC11E471);
/// Tag announcing that a client exited — consumed outside this chunk.
const LLMP_TAG_CLIENT_EXIT: Tag = Tag(0xC11E472);
/// Tag sent by `LlmpSender::send_exiting` right before a sender quits.
const LLMP_TAG_EXITING: Tag = Tag(0x13C5171);
/// Tag sent just before panicking because the receiver could not keep up
/// (see `prune_old_pages`).
const LLMP_SLOW_RECEIVER_PANIC: Tag = Tag(0x70051041);
/// No flags set (plain, initialized message).
pub const LLMP_FLAG_INITIALIZED: Flags = Flags(0x0);
/// The message payload is compressed.
pub const LLMP_FLAG_COMPRESSED: Flags = Flags(0x1);
/// The message was forwarded from another broker (broker-to-broker).
pub const LLMP_FLAG_FROM_B2B: Flags = Flags(0x2);
/// Flag for messages from the "MM" source — not consumed in this chunk;
/// confirm exact semantics at the usage sites.
pub const LLMP_FLAG_FROM_MM: Flags = Flags(0x4);
/// Block time for broker-to-broker connections (currently unused here).
const _LLMP_B2B_BLOCK_TIME: Duration = Duration::from_millis(3_000);
/// Bind address when `llmp_bind_public` is enabled: all interfaces.
#[cfg(feature = "llmp_bind_public")]
const _LLMP_BIND_ADDR: &str = "0.0.0.0";
/// Default bind address: loopback only.
#[cfg(not(feature = "llmp_bind_public"))]
const _LLMP_BIND_ADDR: &str = "127.0.0.1";
/// Sentinel stored in env vars to mean "no value set".
const _NULL_ENV_STR: &str = "_NULL";
/// Magic written to `LlmpPage::magic` once a page has been initialized.
const PAGE_INITIALIZED_MAGIC: u64 = 0x1A1A1A1A1A1A1AF1;
/// Magic written to `LlmpPage::magic` when a page is retired to the cache.
const PAGE_DEINITIALIZED_MAGIC: u64 = 0xDEADC0FEAF1BEEF1;
/// Aligned size of the end-of-page message that links one page to the next.
const EOP_MSG_SIZE: usize =
    llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMapInfo>());
/// Size of the `LlmpPage` header preceding the message area.
const LLMP_PAGE_HEADER_LEN: usize = size_of::<LlmpPage>();
/// Global state flipped by the shutdown signal/ctrl handler.
#[cfg(any(unix, all(windows, feature = "std")))]
static mut LLMP_SIGHANDLER_STATE: LlmpShutdownSignalHandler = LlmpShutdownSignalHandler {
    shutting_down: false,
};
/// The tag of a message, identifying the kind of payload.
/// Some values are reserved for LLMP-internal use (rejected by `send_buf`).
#[repr(transparent)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Tag(pub u32);
impl Debug for Tag {
    /// Formats the tag as `Tag(<upper-hex>)`, e.g. `Tag(DEADAF)`.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "Tag({:X})", self.0)
    }
}
/// Id assigned to a (remote) broker, e.g. carried in each `LlmpMsg` and
/// returned in `TcpResponse::RemoteBrokerAccepted`.
#[repr(transparent)]
#[derive(
    Debug, Default, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize,
)]
pub struct BrokerId(pub u32);
/// Bitflags attached to each message (compression, broker-to-broker
/// forwarding, ...); see the `LLMP_FLAG_*` constants.
#[repr(transparent)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Flags(pub u32);
impl Debug for Flags {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.write_fmt(format_args!("Flags{:x}( ", self.0))?;
if *self & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
f.write_str("COMPRESSED")?;
}
if *self & LLMP_FLAG_FROM_B2B == LLMP_FLAG_FROM_B2B {
f.write_str("FROM_B2B")?;
}
f.write_str(" )")
}
}
impl BitAnd for Flags {
type Output = Self;
fn bitand(self, rhs: Self) -> Self::Output {
Self(self.0 & rhs.0)
}
}
impl BitOr for Flags {
type Output = Self;
fn bitor(self, rhs: Self) -> Self::Output {
Self(self.0 | rhs.0)
}
}
impl Not for Flags {
type Output = Self;
fn not(self) -> Self::Output {
Self(!self.0)
}
}
/// Sequential id of a message within a page; 64-bit here to match the
/// `AtomicU64` `current_msg_id` in the page header.
#[cfg(target_pointer_width = "64")]
#[repr(transparent)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct MessageId(u64);
/// Sequential id of a message within a page; 32-bit to match `AtomicU32`
/// on non-64-bit targets.
#[cfg(not(target_pointer_width = "64"))]
#[repr(transparent)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct MessageId(u32);
/// Requests sent to the broker over TCP when first connecting.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TcpRequest {
    /// A local client introduces itself and shares its shared map.
    LocalClientHello {
        /// Description of the client's shared map.
        shmem_description: ShMemDescription,
    },
    /// A remote broker wants to peer with this broker.
    RemoteBrokerHello {
        /// Hostname of the remote broker.
        hostname: String,
    },
    /// A client announces that it is quitting.
    ClientQuit {
        /// Id of the quitting client.
        client_id: ClientId,
    },
}
#[cfg(feature = "std")]
impl TryFrom<&Vec<u8>> for TcpRequest {
    type Error = Error;
    /// Deserializes a postcard-encoded buffer into a [`TcpRequest`].
    fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
        let request = postcard::from_bytes(bytes)?;
        Ok(request)
    }
}
#[cfg(feature = "std")]
impl TryFrom<Vec<u8>> for TcpRequest {
    type Error = Error;
    /// Owned variant; delegates to the by-reference conversion.
    fn try_from(bytes: Vec<u8>) -> Result<Self, Error> {
        Self::try_from(&bytes)
    }
}
/// A message forwarded between brokers over TCP (broker-to-broker).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TcpRemoteNewMessage {
    /// Id of the client that originally sent the message.
    client_id: ClientId,
    /// The message tag.
    tag: Tag,
    /// The message flags.
    flags: Flags,
    /// The serialized payload.
    payload: Vec<u8>,
}
#[cfg(feature = "std")]
impl TryFrom<&Vec<u8>> for TcpRemoteNewMessage {
    type Error = Error;
    /// Deserializes a postcard-encoded buffer into a [`TcpRemoteNewMessage`].
    fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
        let msg = postcard::from_bytes(bytes)?;
        Ok(msg)
    }
}
#[cfg(feature = "std")]
impl TryFrom<Vec<u8>> for TcpRemoteNewMessage {
    type Error = Error;
    /// Owned variant; delegates to the by-reference conversion.
    fn try_from(bytes: Vec<u8>) -> Result<Self, Error> {
        Self::try_from(&bytes)
    }
}
/// Responses the broker sends back over TCP.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TcpResponse {
    /// Initial hello: the broker shares its map description and hostname.
    BrokerConnectHello {
        /// Description of the broker's shared map.
        broker_shmem_description: ShMemDescription,
        /// Hostname of this broker.
        hostname: String,
    },
    /// A local client was accepted and assigned this id.
    LocalClientAccepted {
        /// The newly assigned client id.
        client_id: ClientId,
    },
    /// A remote broker was accepted and assigned this broker id.
    RemoteBrokerAccepted {
        /// The newly assigned broker id.
        broker_id: BrokerId,
    },
    /// Something went wrong on the broker side.
    Error {
        /// Human-readable description of the error.
        description: String,
    },
}
#[cfg(feature = "std")]
impl TryFrom<&Vec<u8>> for TcpResponse {
    type Error = Error;
    /// Deserializes a postcard-encoded buffer into a [`TcpResponse`].
    fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
        let response = postcard::from_bytes(bytes)?;
        Ok(response)
    }
}
#[cfg(feature = "std")]
impl TryFrom<Vec<u8>> for TcpResponse {
    type Error = Error;
    /// Owned variant; delegates to the by-reference conversion.
    fn try_from(bytes: Vec<u8>) -> Result<Self, Error> {
        Self::try_from(&bytes)
    }
}
/// A listening socket the broker accepts client connections on.
#[cfg(feature = "std")]
#[derive(Debug)]
pub enum Listener {
    /// A TCP listener.
    Tcp(TcpListener),
}
/// One accepted connection, or nothing if `accept` failed.
#[cfg(feature = "std")]
#[derive(Debug)]
pub enum ListenerStream {
    /// An accepted TCP stream plus the peer address.
    Tcp(TcpStream, SocketAddr),
    /// No connection (the accept failed and was ignored).
    Empty(),
}
#[cfg(feature = "std")]
impl Listener {
    /// Blocks for the next incoming connection.
    /// A failed accept is logged and reported as [`ListenerStream::Empty`]
    /// instead of bubbling up.
    fn accept(&self) -> ListenerStream {
        // Single-variant enum, so this pattern is irrefutable.
        let Listener::Tcp(listener) = self;
        match listener.accept() {
            Ok((stream, peer_addr)) => ListenerStream::Tcp(stream, peer_addr),
            Err(err) => {
                log::warn!("Ignoring failed accept: {err:?}");
                ListenerStream::Empty()
            }
        }
    }
}
/// Reinterprets the start of the shared map as a mutable [`LlmpPage`] pointer.
///
/// # Safety
/// The map must be large and suitably aligned for a `LlmpPage` header.
#[inline]
#[expect(clippy::cast_ptr_alignment)]
unsafe fn shmem2page_mut<SHM: ShMem>(afl_shmem: &mut SHM) -> *mut LlmpPage {
    afl_shmem.as_mut_ptr() as *mut LlmpPage
}
/// Reinterprets the start of the shared map as a const [`LlmpPage`] pointer.
///
/// # Safety
/// The map must be large and suitably aligned for a `LlmpPage` header.
#[inline]
#[expect(clippy::cast_ptr_alignment)]
unsafe fn shmem2page<SHM: ShMem>(afl_shmem: &SHM) -> *const LlmpPage {
    afl_shmem.as_ptr() as *const LlmpPage
}
/// Returns `true` iff `msg` points strictly inside the memory region covered
/// by `page` (header address up to `size_total` bytes past it).
///
/// # Safety
/// `page` must be a valid, initialized page pointer.
#[inline]
unsafe fn llmp_msg_in_page(page: *const LlmpPage, msg: *const LlmpMsg) -> bool {
    unsafe {
        let page_start = page as *const u8;
        let msg_ptr = msg as *const u8;
        // Keep the short-circuit: the page size is only read when the first
        // bound already holds.
        msg_ptr > page_start && msg_ptr < page_start.add((*page).size_total)
    }
}
/// Rounds `to_align` up to the next multiple of `LLMP_CFG_ALIGNNMENT`
/// (identity when the alignment is configured as 0).
#[inline]
const fn llmp_align(to_align: usize) -> usize {
    // Alignment disabled: nothing to do.
    if LLMP_CFG_ALIGNNMENT == 0 {
        return to_align;
    }
    match to_align % LLMP_CFG_ALIGNNMENT {
        0 => to_align,
        rem => to_align + (LLMP_CFG_ALIGNNMENT - rem),
    }
}
/// Reads the `{env_name}_OFFSET` env var; returns `None` when it holds the
/// `_NULL` sentinel, otherwise the parsed message offset.
#[cfg(feature = "std")]
#[inline]
fn msg_offset_from_env(env_name: &str) -> Result<Option<u64>, Error> {
    let raw = env::var(format!("{env_name}_OFFSET"))?;
    if raw == _NULL_ENV_STR {
        Ok(None)
    } else {
        Ok(Some(raw.parse()?))
    }
}
/// Binds a TCP listener on `_LLMP_BIND_ADDR:port` and (on most unixes)
/// enables `SO_REUSEPORT` so a respawned broker can rebind quickly.
///
/// # Errors
/// Returns an OS error if the bind (or setting the sockopt) fails.
#[cfg(feature = "std")]
fn tcp_bind(port: u16) -> Result<TcpListener, Error> {
    let listener = TcpListener::bind((_LLMP_BIND_ADDR, port))
        // Fix: the message was a plain literal, so "{port}" was never
        // interpolated; use `format!` to include the actual port.
        .map_err(|err| Error::os_error(err, format!("Failed to bind to port {port}")))?;
    #[cfg(unix)]
    #[cfg(not(any(target_os = "solaris", target_os = "illumos")))]
    socket::setsockopt(&listener, ReusePort, &true)?;
    Ok(listener)
}
/// Serializes `msg` with postcard and writes it to `stream`, prefixed by its
/// length as a big-endian `u32`.
///
/// # Errors
/// Fails if serialization fails, if the encoded message exceeds `u32::MAX`
/// bytes, or on any write error.
#[cfg(feature = "std")]
pub fn send_tcp_msg<T>(stream: &mut TcpStream, msg: &T) -> Result<(), Error>
where
    T: Serialize,
{
    let msg = postcard::to_allocvec(msg)?;
    // The 4-byte length prefix caps the message size at u32::MAX.
    if msg.len() > u32::MAX as usize {
        return Err(Error::illegal_state(format!(
            // Fix: the previous message text was garbled
            // ("Trying to send message a tcp message > u32!").
            "Trying to send a tcp message larger than u32::MAX bytes! (size: {})",
            msg.len()
        )));
    }
    #[cfg(feature = "llmp_debug")]
    log::trace!("LLMP TCP: Sending {} bytes", msg.len());
    let size_bytes = (msg.len() as u32).to_be_bytes();
    stream.write_all(&size_bytes)?;
    stream.write_all(&msg)?;
    #[cfg(feature = "llmp_debug")]
    log::trace!("LLMP TCP: Sending {} bytes finished.", msg.len());
    Ok(())
}
/// Reads one length-prefixed message (big-endian `u32` length, then payload)
/// from `stream`, the counterpart of [`send_tcp_msg`].
///
/// # Errors
/// Returns an error on any read failure — including a short/failed body read,
/// which previously `panic`ked via `expect` despite the `Result` return type.
#[cfg(feature = "std")]
pub fn recv_tcp_msg(stream: &mut TcpStream) -> Result<Vec<u8>, Error> {
    #[cfg(feature = "llmp_debug")]
    log::trace!(
        "LLMP TCP: Waiting for packet... (Timeout: {:?})",
        stream.read_timeout().unwrap_or(None)
    );
    let mut size_bytes = [0_u8; 4];
    stream.read_exact(&mut size_bytes)?;
    let size = u32::from_be_bytes(size_bytes);
    // `u32 -> usize` only fails on <32-bit targets; treat that as a bug.
    let mut bytes = vec![0; size.try_into().unwrap()];
    #[cfg(feature = "llmp_debug")]
    log::trace!("LLMP TCP: Receiving payload of size {size}");
    // Fix: propagate body-read errors instead of panicking with `expect`.
    stream.read_exact(&mut bytes)?;
    Ok(bytes)
}
/// Computes the size of the next page: at least twice the largest allocation
/// seen (plus EOP message and header overhead), never below the initial map
/// size, rounded up to a power of two.
#[inline]
fn next_shmem_size(max_alloc: usize) -> usize {
    max(
        max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN,
        // `- 1` keeps the result at exactly LLMP_CFG_INITIAL_MAP_SIZE (itself a
        // power of two) when `max_alloc` is small.
        LLMP_CFG_INITIAL_MAP_SIZE - 1,
    )
    .next_power_of_two()
}
/// Initializes (or, with `allow_reinit`, re-initializes) the [`LlmpPage`]
/// header at the start of `shmem`, owned by sender `sender_id`.
///
/// # Safety
/// `shmem` must be at least `LLMP_PAGE_HEADER_LEN` bytes; the page is written
/// through raw pointers.
///
/// # Panics
/// If the page is already initialized and `allow_reinit` is `false`, or if
/// the resulting `size_total` would be 0.
unsafe fn llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender_id: ClientId, allow_reinit: bool) {
    unsafe {
        #[cfg(feature = "llmp_debug")]
        log::trace!("llmp_page_init: shmem {:?}", &shmem);
        let map_size = shmem.len();
        let page = shmem2page_mut(shmem);
        #[cfg(feature = "llmp_debug")]
        log::trace!("llmp_page_init: page {:?}", &(*page));
        if !allow_reinit {
            assert!(
                (*page).magic != PAGE_INITIALIZED_MAGIC,
                "Tried to initialize page {page:?} twice (for shmem {shmem:?})"
            );
        }
        (*page).magic = PAGE_INITIALIZED_MAGIC;
        (*page).sender_id = sender_id;
        (*page).current_msg_id.store(0, Ordering::Relaxed);
        (*page).max_alloc_size = 0;
        // The message area is everything after the header.
        (*page).size_total = map_size - LLMP_PAGE_HEADER_LEN;
        (*page).size_used = 0;
        // Reset the first message slot so receivers see an empty page.
        (*(*page).messages.as_mut_ptr()).message_id = MessageId(0);
        (*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
        (*page).receivers_joined_count.store(0, Ordering::Release);
        (*page).receivers_left_count.store(0, Ordering::Relaxed);
        assert!((*page).size_total != 0);
    }
}
/// Returns a pointer to the message following `last_msg`, after checking that
/// an allocation of `alloc_size` starting there still fits inside the map.
///
/// # Safety
/// `last_msg` must point to a valid message inside `map`.
#[inline]
unsafe fn llmp_next_msg_ptr_checked<SHM: ShMem>(
    map: &mut LlmpSharedMap<SHM>,
    last_msg: *const LlmpMsg,
    alloc_size: usize,
) -> Result<*mut LlmpMsg, Error> {
    unsafe {
        let page = map.page_mut();
        let map_size = map.shmem.len();
        // Valid messages start right after the page header...
        let msg_begin_min = (page as *const u8).add(size_of::<LlmpPage>());
        // ...and must leave `alloc_size` bytes before the end of the map.
        let msg_begin_max = (page as *const u8).add(map_size - alloc_size);
        let next = llmp_next_msg_ptr(last_msg);
        let next_ptr = next as *const u8;
        if next_ptr >= msg_begin_min && next_ptr <= msg_begin_max {
            Ok(next)
        } else {
            Err(Error::illegal_state(format!(
                "Inconsistent data on sharedmap, or Bug (next_ptr was {:x}, sharedmap page was {:x})",
                next_ptr as usize, page as usize
            )))
        }
    }
}
/// Returns the (unchecked) pointer to the message after `last_msg`:
/// its header size plus its padded payload length.
///
/// # Safety
/// `last_msg` must be valid; the result may point past the page — use
/// [`llmp_next_msg_ptr_checked`] for a bounds-checked variant.
#[inline]
#[expect(clippy::cast_ptr_alignment)]
unsafe fn llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
    unsafe {
        (last_msg as *mut u8)
            .add(size_of::<LlmpMsg>())
            .add((*last_msg).buf_len_padded as usize) as *mut LlmpMsg
    }
}
/// Serializable state needed to reattach to one side of an LLMP channel:
/// the shared map plus the offset of the last message handled.
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct LlmpDescription {
    /// Description of the backing shared memory.
    shmem: ShMemDescription,
    /// Offset of the last message sent/received on that map, if any.
    last_message_offset: Option<u64>,
}
/// Verdict of a message hook about an incoming message.
#[derive(Debug, Copy, Clone)]
pub enum LlmpMsgHookResult {
    /// The hook handled the message; it is not forwarded.
    Handled,
    /// The message should be forwarded to the clients.
    ForwardToClients,
}
/// A single message on an LLMP page; the payload (`buf`) follows the header
/// directly in shared memory.
#[derive(Debug, Copy, Clone)]
#[repr(C)]
pub struct LlmpMsg {
    /// Tag identifying the payload kind.
    pub tag: Tag,
    /// Id of the client that sent this message.
    pub sender: ClientId,
    /// Id of the broker this message passed through.
    pub broker: BrokerId,
    /// Message flags (compression, b2b forwarding, ...).
    pub flags: Flags,
    /// Sequential id of this message on its page.
    pub message_id: MessageId,
    /// Payload length in bytes.
    pub buf_len: u64,
    /// Payload length including alignment padding.
    pub buf_len_padded: u64,
    /// Start of the payload (zero-sized trailing array).
    pub buf: [u8; 0],
}
impl LlmpMsg {
    /// Returns the payload as a slice, without checking that this message lies
    /// inside a mapped page.
    ///
    /// # Safety
    /// `buf_len` must be correct and the payload memory valid for reads.
    #[must_use]
    pub unsafe fn as_slice_unsafe(&self) -> &[u8] {
        unsafe { slice::from_raw_parts(self.buf.as_ptr(), self.buf_len as usize) }
    }
    /// Mutable variant of [`Self::as_slice_unsafe`].
    ///
    /// # Safety
    /// `buf_len` must be correct and the payload memory valid for writes.
    #[must_use]
    pub unsafe fn as_slice_mut_unsafe(&mut self) -> &mut [u8] {
        unsafe { slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf_len as usize) }
    }
    /// Returns the payload as a slice, after verifying the message lies
    /// within `map`.
    #[inline]
    pub fn try_as_slice<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> Result<&[u8], Error> {
        unsafe {
            if self.in_shmem(map) {
                Ok(self.as_slice_unsafe())
            } else {
                Err(Error::illegal_state(
                    "Current message not in page. The sharedmap get tampered with or we have a BUG.",
                ))
            }
        }
    }
    /// Mutable variant of [`Self::try_as_slice`].
    #[inline]
    pub fn try_as_slice_mut<SHM: ShMem>(
        &mut self,
        map: &mut LlmpSharedMap<SHM>,
    ) -> Result<&mut [u8], Error> {
        unsafe {
            if self.in_shmem(map) {
                Ok(self.as_slice_mut_unsafe())
            } else {
                Err(Error::illegal_state(
                    "Current message not in page. The sharedmap get tampered with or we have a BUG.",
                ))
            }
        }
    }
    /// `true` if this message (header plus padded payload) lies entirely
    /// within the given shared map.
    #[inline]
    pub fn in_shmem<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> bool {
        let map_size = map.shmem.len();
        let buf_ptr = self.buf.as_ptr();
        // Full footprint of this message: padded payload plus its own header.
        let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
        unsafe {
            buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>())
                && buf_ptr.add(len).sub(size_of::<LlmpPage>())
                    <= (map.page_mut() as *const u8).add(map_size)
        }
    }
}
/// Either side of an LLMP connection, decided at runtime
/// (see [`LlmpConnection::on_port`]).
#[derive(Debug)]
pub enum LlmpConnection<HT, SHM, SP> {
    /// We became the broker.
    IsBroker {
        /// The broker instance.
        broker: LlmpBroker<HT, SHM, SP>,
    },
    /// We connected to an existing broker as a client.
    IsClient {
        /// The client instance.
        client: LlmpClient<SHM, SP>,
    },
}
impl<SHM, SP> LlmpConnection<(), SHM, SP>
where
    SHM: ShMem,
    SP: ShMemProvider<ShMem = SHM>,
{
    /// Tries to bind `port`: on success we become the broker (spawning its TCP
    /// listener); if the port is already taken we connect as a client instead.
    #[cfg(feature = "std")]
    pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
        match tcp_bind(port) {
            Ok(listener) => {
                log::info!("We're the broker");
                let mut broker = LlmpBroker::new(shmem_provider, tuple_list!())?;
                let _listener_thread = broker
                    .inner_mut()
                    .launch_listener(Listener::Tcp(listener))?;
                Ok(LlmpConnection::IsBroker { broker })
            }
            // AddrInUse means a broker already owns the port: join as client.
            Err(Error::OsError(e, ..)) if e.kind() == ErrorKind::AddrInUse => {
                log::info!("We're the client (internal port already bound by broker, {e:#?})");
                let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
                let conn = LlmpConnection::IsClient { client };
                Ok(conn)
            }
            Err(e) => {
                log::error!("{e:?}");
                Err(e)
            }
        }
    }
    /// Unconditionally creates a broker listening on `port`.
    #[cfg(feature = "std")]
    pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
        Ok(LlmpConnection::IsBroker {
            broker: LlmpBroker::create_attach_to_tcp(shmem_provider, tuple_list!(), port)?,
        })
    }
    /// Unconditionally connects to the broker on `port` as a client.
    #[cfg(feature = "std")]
    pub fn client_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
        let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
        let conn = LlmpConnection::IsClient { client };
        Ok(conn)
    }
}
impl<MT, SHM, SP> LlmpConnection<MT, SHM, SP>
where
    MT: LlmpHookTuple<SHM, SP>,
    SHM: ShMem,
    SP: ShMemProvider<ShMem = SHM>,
{
    /// Describes this connection for later restoration.
    /// Currently only implemented for the client side.
    pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
        Ok(match self {
            LlmpConnection::IsClient { client } => client.describe()?,
            LlmpConnection::IsBroker { .. } => todo!("Only client can be described atm."),
        })
    }
    /// Recreates a client-side connection from a stored description.
    pub fn existing_client_from_description(
        shmem_provider: SP,
        description: &LlmpClientDescription,
    ) -> Result<LlmpConnection<MT, SHM, SP>, Error> {
        Ok(LlmpConnection::IsClient {
            client: LlmpClient::existing_client_from_description(shmem_provider, description)?,
        })
    }
    /// Sends `buf` with the given `tag`, whichever side we are.
    pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
        match self {
            LlmpConnection::IsBroker { broker } => broker.inner.send_buf(tag, buf),
            LlmpConnection::IsClient { client } => client.send_buf(tag, buf),
        }
    }
    /// Sends `buf` with the given `tag` and `flags`, whichever side we are.
    pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> {
        match self {
            LlmpConnection::IsBroker { broker } => {
                broker.inner.send_buf_with_flags(tag, flags, buf)
            }
            LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf),
        }
    }
}
/// Header of one LLMP page in shared memory, directly followed by the
/// message area (`messages`).
#[derive(Debug)]
#[repr(C)]
pub struct LlmpPage {
    /// `PAGE_INITIALIZED_MAGIC` while live, `PAGE_DEINITIALIZED_MAGIC` once retired.
    pub magic: u64,
    /// Id of the client that sends on this page.
    pub sender_id: ClientId,
    /// Nonzero once a receiver has mapped this page (see `receiver_joined`).
    pub receivers_joined_count: AtomicU16,
    /// Nonzero once a receiver is done with this page (see `receiver_left`).
    pub receivers_left_count: AtomicU16,
    /// Id of the most recently committed message; release-stored by `send`.
    #[cfg(target_pointer_width = "64")]
    pub current_msg_id: AtomicU64,
    /// Id of the most recently committed message; release-stored by `send`.
    #[cfg(not(target_pointer_width = "64"))]
    pub current_msg_id: AtomicU32,
    /// Usable bytes in the message area (map size minus this header).
    pub size_total: usize,
    /// Bytes of the message area currently allocated.
    pub size_used: usize,
    /// Largest single allocation seen, used to size the next page.
    pub max_alloc_size: usize,
    /// Start of the message area (zero-sized trailing array).
    pub messages: [LlmpMsg; 0],
}
impl LlmpPage {
    /// Marks this page as having an attached receiver.
    #[inline]
    fn receiver_joined(&mut self) {
        self.receivers_joined_count.store(1, Ordering::Relaxed);
    }
    /// Marks this page as having a receiver that finished with it.
    #[inline]
    fn receiver_left(&mut self) {
        self.receivers_left_count.store(1, Ordering::Relaxed);
    }
}
/// Payload of an `LLMP_TAG_END_OF_PAGE` message: tells receivers which shared
/// map the sender continues on.
#[derive(Debug, Copy, Clone)]
#[repr(C)]
struct LlmpPayloadSharedMapInfo {
    /// Size of the next shared map, in bytes.
    pub map_size: usize,
    /// Id string of the next shared map.
    pub shm_str: [u8; 20],
}
/// Payload describing an exiting client — presumably paired with
/// `LLMP_TAG_CLIENT_EXIT`; the consumer is outside this chunk, confirm there.
#[derive(Debug, Copy, Clone)]
#[repr(C, align(8))]
struct LlmpClientExitInfo {
    /// Raw id of the exiting client.
    pub client_id: u32,
}
/// The sending side of an LLMP channel: allocates messages on shared pages
/// and commits them for receivers.
#[derive(Debug)]
pub struct LlmpSender<SHM, SP> {
    /// The client id we send messages as.
    id: ClientId,
    /// The last message committed via `send` (null right after a page switch).
    last_msg_sent: *const LlmpMsg,
    /// All pages we wrote to; the last entry is the current page.
    out_shmems: Vec<LlmpSharedMap<SHM>>,
    /// Retired, fully-read pages kept for reuse by `new_or_unused_shmem`.
    unused_shmem_cache: Vec<LlmpSharedMap<SHM>>,
    /// If set, old pages are never pruned or recycled.
    keep_pages_forever: bool,
    /// `true` between a successful alloc and the matching `send`.
    has_unsent_message: bool,
    /// Provider used to allocate new shared maps.
    shmem_provider: SP,
}
impl<SHM, SP> LlmpSender<SHM, SP>
where
    SHM: ShMem,
    SP: ShMemProvider<ShMem = SHM>,
{
    /// Creates a new sender with one fresh page of `LLMP_CFG_INITIAL_MAP_SIZE`
    /// bytes, sending as client `id`.
    /// With `keep_pages_forever`, fully-read pages are never recycled.
    pub fn new(
        mut shmem_provider: SP,
        id: ClientId,
        keep_pages_forever: bool,
    ) -> Result<Self, Error> {
        #[cfg(feature = "llmp_debug")]
        log::info!(
            "PID: {:#?} Initializing LlmpSender {:#?}",
            std::process::id(),
            id
        );
        Ok(Self {
            id,
            last_msg_sent: ptr::null_mut(),
            out_shmems: vec![LlmpSharedMap::new(
                id,
                shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?,
            )],
            keep_pages_forever,
            has_unsent_message: false,
            shmem_provider,
            unused_shmem_cache: vec![],
        })
    }
    /// Reattaches to a sender page whose shmem id, client id, and last-sent
    /// offset were stored in env vars (see [`Self::to_env`]).
    #[cfg(feature = "std")]
    pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
        let msg_sent_offset = msg_offset_from_env(env_name)?;
        let mut ret = Self::on_existing_shmem(
            shmem_provider.clone(),
            shmem_provider.existing_from_env(env_name)?,
            msg_sent_offset,
        )?;
        // The env may also carry our client id; fall back to the default if unset.
        ret.id = Self::client_id_from_env(env_name)?.unwrap_or_default();
        #[cfg(feature = "llmp_debug")]
        log::info!(
            "PID: {:#?} Initializing LlmpSender from on_existing_from_env {:#?}",
            std::process::id(),
            &ret.id
        );
        Ok(ret)
    }
    /// Reattaches to an existing shared map as sender.
    /// `last_msg_sent_offset` is the offset of the last sent message, if any.
    pub fn on_existing_shmem(
        shmem_provider: SP,
        current_out_shmem: SHM,
        last_msg_sent_offset: Option<u64>,
    ) -> Result<Self, Error> {
        let mut out_shmem = LlmpSharedMap::existing(current_out_shmem);
        let last_msg_sent = match last_msg_sent_offset {
            Some(offset) => out_shmem.msg_from_offset(offset)?,
            None => ptr::null_mut(),
        };
        // The page header stores the id of the sender that created it.
        let client_id = unsafe { (*out_shmem.page()).sender_id };
        #[cfg(feature = "llmp_debug")]
        log::info!(
            "PID: {:#?} Initializing LlmpSender from on_existing_shmem {:#?}",
            std::process::id(),
            &client_id
        );
        Ok(Self {
            id: client_id,
            last_msg_sent,
            out_shmems: vec![out_shmem],
            keep_pages_forever: false,
            has_unsent_message: false,
            shmem_provider,
            unused_shmem_cache: vec![],
        })
    }
    /// Moves old pages that a receiver has already joined into the
    /// unused-page cache; panics (after sending `LLMP_SLOW_RECEIVER_PANIC`)
    /// when too many pages pile up unread.
    ///
    /// # Safety
    /// Dereferences the raw page pointers of all our maps.
    unsafe fn prune_old_pages(&mut self) {
        unsafe {
            // Count leading (oldest) maps a receiver has joined; stop at the
            // first not-yet-mapped one. The newest map (split_last) is kept.
            let mut unmap_until_excl = 0;
            for map in self.out_shmems.split_last_mut().unwrap().1 {
                if (*map.page()).receivers_joined_count.load(Ordering::Acquire) == 0 {
                    break;
                }
                unmap_until_excl += 1;
            }
            if unmap_until_excl == 0 && self.out_shmems.len() > LLMP_CFG_MAX_PENDING_UNREAD_PAGES {
                // The receiver is not keeping up at all; bail out loudly.
                self.send_buf(LLMP_SLOW_RECEIVER_PANIC, &[]).unwrap();
                panic!(
                    "The receiver/broker could not process our sent llmp messages in time. Either we're sending too many messages too fast, the broker got stuck, or it crashed. Giving up."
                );
            }
            // NOTE(review): reserving on `out_shmems` while removing from it
            // looks unintended — `unused_shmem_cache` seems the likelier
            // target; confirm.
            self.out_shmems.reserve(unmap_until_excl);
            for _ in 0..unmap_until_excl {
                let mut map = self.out_shmems.remove(0);
                let page = shmem2page_mut(&mut map.shmem);
                assert!(
                    (*page).magic == PAGE_INITIALIZED_MAGIC,
                    "LLMP: Tried to free uninitialized shared map at addr {:#}!",
                    page as usize
                );
                // Mark the page as retired before caching it for reuse.
                (*page).magic = PAGE_DEINITIALIZED_MAGIC;
                #[cfg(feature = "llmp_debug")]
                log::debug!("Moving unused map to cache: {map:?} {:x?}", map.page());
                self.unused_shmem_cache
                    .insert(self.unused_shmem_cache.len(), map);
            }
        }
    }
    /// Returns a cached, fully-read page of at least `next_min_shmem_size`
    /// bytes, or allocates a fresh one from the provider.
    ///
    /// # Safety
    /// Dereferences raw page pointers of the cached maps.
    unsafe fn new_or_unused_shmem(
        &mut self,
        sender_id: ClientId,
        next_min_shmem_size: usize,
    ) -> Result<LlmpSharedMap<SHM>, Error> {
        unsafe {
            // Find a cached map where every receiver that joined has also left.
            let cached_shmem = self
                .unused_shmem_cache
                .iter()
                .position(|cached_shmem| {
                    let page = &(*shmem2page(&cached_shmem.shmem));
                    let receivers_joined_count =
                        page.receivers_joined_count.load(Ordering::Relaxed);
                    debug_assert_ne!(receivers_joined_count, 0);
                    let receivers_left_count = page.receivers_left_count.load(Ordering::Relaxed);
                    debug_assert!(receivers_joined_count >= receivers_left_count);
                    let ret = receivers_joined_count == receivers_left_count;
                    debug_assert_eq!(
                        receivers_joined_count,
                        page.receivers_joined_count.load(Ordering::Relaxed),
                        "Oops, some receiver joined while re-using the page!"
                    );
                    ret
                })
                .map(|e| self.unused_shmem_cache.remove(e));
            match cached_shmem {
                Some(mut cached_shmem) => {
                    if cached_shmem.shmem.len() < next_min_shmem_size {
                        // Too small for the next message: drop and retry; the
                        // recursion ends when the cache has no more candidates.
                        #[cfg(feature = "llmp_debug")]
                        log::info!("Dropping too small shmem {cached_shmem:?}");
                        drop(cached_shmem);
                        self.new_or_unused_shmem(sender_id, next_min_shmem_size)
                    } else {
                        #[cfg(feature = "llmp_debug")]
                        log::info!("Returning cached shmem {cached_shmem:?}");
                        // Re-initialize the recycled page for this sender.
                        llmp_page_init(&mut cached_shmem.shmem, sender_id, false);
                        Ok(cached_shmem)
                    }
                }
                _ => {
                    // Nothing suitable cached: allocate a brand-new map.
                    Ok(LlmpSharedMap::new(
                        sender_id,
                        self.shmem_provider.new_shmem(next_min_shmem_size)?,
                    ))
                }
            }
        }
    }
    /// Handles an end-of-page condition: prunes old pages (unless
    /// `keep_pages_forever`), sets up the next (possibly larger) page, and
    /// sends an `LLMP_TAG_END_OF_PAGE` message pointing receivers at it.
    ///
    /// # Safety
    /// Dereferences raw page pointers of the current and the new map.
    unsafe fn handle_out_eop(&mut self) -> Result<(), Error> {
        unsafe {
            #[cfg(all(feature = "llmp_debug", feature = "std"))]
            {
                #[cfg(debug_assertions)]
                let bt = Backtrace::capture();
                #[cfg(not(debug_assertions))]
                let bt = "<n/a (release)>";
                let shm = self.out_shmems.last().unwrap();
                log::info!(
                    "LLMP_DEBUG: End of page reached for map {} with len {}, sending EOP, bt: {:?}",
                    shm.shmem.id(),
                    shm.shmem.len(),
                    bt
                );
            }
            if !self.keep_pages_forever {
                #[cfg(feature = "llmp_debug")]
                log::debug!("LLMP DEBUG: pruning old pages");
                self.prune_old_pages();
            }
            let old_map = self.out_shmems.last_mut().unwrap().page_mut();
            // Size the next page so the largest allocation seen so far fits.
            let next_min_shmem_size = next_shmem_size((*old_map).max_alloc_size);
            #[cfg(feature = "llmp_debug")]
            log::info!("Next min ShMem Size {next_min_shmem_size}",);
            let mut new_map_shmem =
                self.new_or_unused_shmem((*old_map).sender_id, next_min_shmem_size)?;
            let new_map = new_map_shmem.page_mut();
            #[cfg(feature = "llmp_debug")]
            log::info!("got new map at: {new_map:?}");
            (*new_map).current_msg_id.store(0, Ordering::Release);
            #[cfg(feature = "llmp_debug")]
            log::info!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
            (*new_map).max_alloc_size = (*old_map).max_alloc_size;
            // The EOP message tells receivers where to find the next page.
            let out = self.alloc_eop()?;
            #[expect(clippy::cast_ptr_alignment)]
            let end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
            (*end_of_page_msg).map_size = new_map_shmem.shmem.len();
            (*end_of_page_msg).shm_str = *new_map_shmem.shmem.id().as_array();
            self.send(out, true)?;
            self.out_shmems.push(new_map_shmem);
            // No message sent yet on the fresh page.
            self.last_msg_sent = ptr::null_mut();
            Ok(())
        }
    }
    /// Allocates `buf_len` payload bytes on the current page, rolling over to
    /// a new page (EOP) once if the current one is full.
    pub fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
        if let Some(msg) = unsafe { self.alloc_next_if_space(buf_len) } {
            return Ok(msg);
        }
        // Current page is full: link in a fresh page, then retry once.
        unsafe {
            self.handle_out_eop()?;
        }
        #[cfg(feature = "llmp_debug")]
        log::debug!("Handled out eop");
        match unsafe { self.alloc_next_if_space(buf_len) } {
            Some(msg) => Ok(msg),
            None => Err(Error::unknown(format!(
                "Error allocating {buf_len} bytes in shmap"
            ))),
        }
    }
    /// Cancels a message returned by [`Self::alloc_next`] before it was sent:
    /// the slot is re-tagged unset and its space reclaimed.
    ///
    /// # Safety
    /// `msg` must be a valid, unsent message on the current page.
    pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) {
        unsafe {
            let page = self.out_shmems.last_mut().unwrap().page_mut();
            (*msg).tag = LLMP_TAG_UNSET;
            (*page).size_used -= (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
        }
    }
    /// Shrinks an allocated-but-unsent message to `shrinked_len` payload
    /// bytes, returning the freed space to the page.
    ///
    /// # Safety
    /// `msg` must be the message most recently allocated and not yet sent.
    pub unsafe fn shrink_alloced(
        &mut self,
        msg: *mut LlmpMsg,
        shrinked_len: usize,
    ) -> Result<(), Error> {
        unsafe {
            if msg.is_null() {
                return Err(Error::illegal_argument("Null msg passed to shrink_alloced"));
            } else if !self.has_unsent_message {
                return Err(Error::illegal_state(
                    "Called shrink_alloced, but the msg was not unsent",
                ));
            }
            let old_len_padded = (*msg).buf_len_padded;
            let msg_start = msg as usize;
            // Recompute padding so the next message still starts aligned.
            let buf_len_padded = llmp_align(msg_start + shrinked_len + size_of::<LlmpMsg>())
                - msg_start
                - size_of::<LlmpMsg>();
            if buf_len_padded > old_len_padded.try_into().unwrap() {
                return Err(Error::illegal_argument(format!(
                    "Cannot shrink msg of size {} (paded: {old_len_padded}) to requested larger size of {shrinked_len} (padded: {buf_len_padded})!",
                    (*msg).buf_len
                )));
            }
            (*msg).buf_len = shrinked_len as u64;
            (*msg).buf_len_padded = buf_len_padded as u64;
            let page = self.out_shmems.last_mut().unwrap().page_mut();
            (*page).size_used -= old_len_padded as usize;
            (*page).size_used += buf_len_padded;
            // Mark the slot after the shrunk message as the new frontier.
            (*llmp_next_msg_ptr(msg)).tag = LLMP_TAG_UNSET;
            Ok(())
        }
    }
    /// Allocates, fills, and sends a message with the given `tag` and payload.
    /// LLMP-internal (reserved) tags are rejected.
    pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
        // Reserved tags would confuse the receiver's paging logic.
        if tag == LLMP_TAG_NEW_SHM_CLIENT
            || tag == LLMP_TAG_END_OF_PAGE
            || tag == LLMP_TAG_UNINITIALIZED
            || tag == LLMP_TAG_UNSET
        {
            return Err(Error::unknown(format!(
                "Reserved tag supplied to send_buf ({tag:?})"
            )));
        }
        unsafe {
            let msg = self.alloc_next(buf.len())?;
            (*msg).tag = tag;
            (*msg).flags = LLMP_FLAG_INITIALIZED;
            buf.as_ptr()
                .copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
            self.send(msg, true)
        }
    }
    /// Like [`Self::send_buf`], but also sets the given message `flags`.
    pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
        if tag == LLMP_TAG_NEW_SHM_CLIENT
            || tag == LLMP_TAG_END_OF_PAGE
            || tag == LLMP_TAG_UNINITIALIZED
            || tag == LLMP_TAG_UNSET
        {
            return Err(Error::unknown(format!(
                "Reserved tag supplied to send_buf ({tag:?})"
            )));
        }
        unsafe {
            let msg = self.alloc_next(buf.len())?;
            (*msg).tag = tag;
            (*msg).flags = flags;
            buf.as_ptr()
                .copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
            self.send(msg, true)
        }
    }
    /// Recreates a sender from a [`LlmpDescription`] (see [`Self::describe`]).
    pub fn on_existing_from_description(
        mut shmem_provider: SP,
        description: &LlmpDescription,
    ) -> Result<Self, Error> {
        Self::on_existing_shmem(
            shmem_provider.clone(),
            shmem_provider.shmem_from_description(description.shmem)?,
            description.last_message_offset,
        )
    }
    /// Notifies the receiver that this sender is about to exit.
    pub fn send_exiting(&mut self) -> Result<(), Error> {
        self.send_buf(LLMP_TAG_EXITING, &[])
    }
}
impl<SHM, SP> LlmpSender<SHM, SP>
where
    SHM: ShMem,
{
    /// The client id this sender sends messages as.
    #[must_use]
    pub fn id(&self) -> ClientId {
        self.id
    }
    /// Re-initializes the current page and forgets the last sent message.
    ///
    /// # Safety
    /// Only sound while no receiver is concurrently reading this page.
    pub unsafe fn reset(&mut self) {
        unsafe {
            llmp_page_init(
                &mut self.out_shmems.last_mut().unwrap().shmem,
                self.id,
                true,
            );
            self.last_msg_sent = ptr::null_mut();
        }
    }
    /// Reads our client id from the `{env_name}_CLIENT_ID` env var;
    /// `None` when it holds the `_NULL` sentinel.
    #[cfg(feature = "std")]
    #[inline]
    fn client_id_from_env(env_name: &str) -> Result<Option<ClientId>, Error> {
        let client_id_str = env::var(format!("{env_name}_CLIENT_ID"))?;
        Ok(if client_id_str == _NULL_ENV_STR {
            None
        } else {
            Some(ClientId(client_id_str.parse()?))
        })
    }
    /// Stores `id` into the `{env_name}_CLIENT_ID` env var.
    #[cfg(feature = "std")]
    fn client_id_to_env(env_name: &str, id: ClientId) {
        // SAFETY: mutating the environment is unsafe (Rust 2024); callers must
        // ensure no other thread reads the environment concurrently.
        unsafe { env::set_var(format!("{env_name}_CLIENT_ID"), format!("{}", id.0)) };
    }
    /// Writes this sender's shmem description, client id, and last-message
    /// offset to env vars so a child process can reattach.
    ///
    /// # Safety
    /// Mutates the process environment (see `client_id_to_env`).
    #[cfg(feature = "std")]
    pub unsafe fn to_env(&self, env_name: &str) -> Result<(), Error> {
        unsafe {
            let current_out_shmem = self.out_shmems.last().unwrap();
            current_out_shmem.shmem.write_to_env(env_name)?;
            Self::client_id_to_env(env_name, self.id);
            current_out_shmem.msg_to_env(self.last_msg_sent, env_name)
        }
    }
    /// Spin-waits until at least one receiver has mapped the current page.
    pub fn await_safe_to_unmap_blocking(&self) {
        #[cfg(feature = "std")]
        let mut ctr = 0_u16;
        loop {
            if self.safe_to_unmap() {
                return;
            }
            hint::spin_loop();
            // Log every 65536 spins (u16 wrap) so a stuck wait stays visible.
            #[cfg(feature = "std")]
            {
                ctr = ctr.wrapping_add(1);
                if ctr == 0 {
                    log::info!("Awaiting safe_to_unmap_blocking");
                }
            }
        }
    }
    /// `true` once a receiver has joined the current page, i.e. unmapping on
    /// our side can no longer lose messages.
    pub fn safe_to_unmap(&self) -> bool {
        let current_out_shmem = self.out_shmems.last().unwrap();
        unsafe {
            (*current_out_shmem.page())
                .receivers_joined_count
                .load(Ordering::Relaxed)
                >= 1
        }
    }
    /// Forcibly marks the current page as having a receiver (e.g. for a
    /// child that inherits the mapping).
    ///
    /// # Safety
    /// Writes to the shared page header.
    pub unsafe fn mark_safe_to_unmap(&mut self) {
        unsafe {
            (*self.out_shmems.last_mut().unwrap().page_mut()).receiver_joined();
        }
    }
    /// Allocates the end-of-page message on the current page; its payload
    /// will be an [`LlmpPayloadSharedMapInfo`].
    ///
    /// # Safety
    /// Dereferences raw page/message pointers.
    unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> {
        unsafe {
            let map = self.out_shmems.last_mut().unwrap();
            let page = map.page_mut();
            let last_msg = self.last_msg_sent;
            // Every allocation leaves room for the EOP, so this must hold.
            assert!(
                (*page).size_used + EOP_MSG_SIZE <= (*page).size_total,
                "PROGRAM ABORT : BUG: EOP does not fit in page! page {page:?}, size_current {:?}, size_total {:?}",
                &raw const (*page).size_used,
                &raw const (*page).size_total
            );
            let ret: *mut LlmpMsg = if last_msg.is_null() {
                (*page).messages.as_mut_ptr()
            } else {
                llmp_next_msg_ptr_checked(map, last_msg, EOP_MSG_SIZE)?
            };
            assert!(
                (*ret).tag != LLMP_TAG_UNINITIALIZED,
                "Did not call send() on last message!"
            );
            (*ret).buf_len = size_of::<LlmpPayloadSharedMapInfo>() as u64;
            (*ret).buf_len_padded = (*ret).buf_len;
            (*ret).message_id = if last_msg.is_null() {
                MessageId(1)
            } else {
                MessageId((*last_msg).message_id.0 + 1)
            };
            (*ret).tag = LLMP_TAG_END_OF_PAGE;
            (*page).size_used += EOP_MSG_SIZE;
            Ok(ret)
        }
    }
    /// Allocates `buf_len` payload bytes on the current page, or returns
    /// `None` when the page cannot also keep room for the trailing EOP.
    ///
    /// # Safety
    /// Dereferences raw page/message pointers.
    unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> {
        unsafe {
            let map = self.out_shmems.last_mut().unwrap();
            let page = map.page_mut();
            let last_msg = self.last_msg_sent;
            assert!(
                !self.has_unsent_message,
                "Called alloc without calling send inbetween"
            );
            #[cfg(feature = "llmp_debug")]
            log::info!(
                "Allocating {} bytes on page {:?} / map {:?} (last msg: {:?})",
                buf_len,
                page,
                &map.shmem.id().as_str(),
                last_msg
            );
            let msg_start = (*page).messages.as_mut_ptr() as usize + (*page).size_used;
            // Pad so the message after this one starts aligned.
            let buf_len_padded = llmp_align(msg_start + buf_len + size_of::<LlmpMsg>())
                - msg_start
                - size_of::<LlmpMsg>();
            #[cfg(feature = "llmp_debug")]
            log::trace!(
                "{page:?} {:?} size_used={:x} buf_len_padded={:x} EOP_MSG_SIZE={:x} size_total={}",
                &(*page),
                (*page).size_used,
                buf_len_padded,
                EOP_MSG_SIZE,
                (*page).size_total
            );
            // Track the largest allocation, used to size the next page.
            (*page).max_alloc_size = max(
                (*page).max_alloc_size,
                size_of::<LlmpMsg>() + buf_len_padded,
            );
            // Keep enough room for the EOP message after this allocation.
            if (*page).size_used + size_of::<LlmpMsg>() + buf_len_padded + EOP_MSG_SIZE
                > (*page).size_total
            {
                #[cfg(feature = "llmp_debug")]
                log::info!("LLMP: Page full.");
                return None;
            }
            let ret = msg_start as *mut LlmpMsg;
            (*ret).message_id = if last_msg.is_null() {
                MessageId(1)
            } else if (*page).current_msg_id.load(Ordering::Relaxed) == (*last_msg).message_id.0 {
                MessageId((*last_msg).message_id.0 + 1)
            } else {
                panic!(
                    "BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {:?})",
                    &raw const (*page).current_msg_id,
                    (*last_msg).message_id
                );
            };
            (*ret).buf_len = buf_len as u64;
            (*ret).buf_len_padded = buf_len_padded as u64;
            (*page).size_used += size_of::<LlmpMsg>() + buf_len_padded;
            // Pre-tag the following slot so receivers stop there.
            (*llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
            (*ret).tag = LLMP_TAG_UNINITIALIZED;
            self.has_unsent_message = true;
            Some(ret)
        }
    }
    /// Commits an allocated message: assigns the next message id and
    /// publishes it to receivers with a release store of `current_msg_id`.
    ///
    /// # Safety
    /// `msg` must be the message last allocated on the current page.
    #[inline(never)]
    unsafe fn send(&mut self, msg: *mut LlmpMsg, overwrite_client_id: bool) -> Result<(), Error> {
        unsafe {
            assert!(
                !ptr::addr_eq(self.last_msg_sent, msg),
                "Message sent twice!"
            );
            assert!(
                (*msg).tag != LLMP_TAG_UNSET,
                "No tag set on message with id {:?}",
                (*msg).message_id
            );
            if overwrite_client_id {
                (*msg).sender = self.id;
            }
            let page = self.out_shmems.last_mut().unwrap().page_mut();
            if msg.is_null() || !llmp_msg_in_page(page, msg) {
                return Err(Error::unknown(format!(
                    "Llmp Message {msg:?} is null or not in current page"
                )));
            }
            let mid = (*page).current_msg_id.load(Ordering::Relaxed) + 1;
            (*msg).message_id.0 = mid;
            // Release: all writes to the message happen-before any receiver
            // that acquires this id.
            (*page)
                .current_msg_id
                .store((*msg).message_id.0, Ordering::Release);
            self.last_msg_sent = msg;
            self.has_unsent_message = false;
            log::debug!(
                "[{} - {:#x}] Send message with id {}",
                self.id.0,
                ptr::from_ref::<Self>(self) as u64,
                mid
            );
            Ok(())
        }
    }
    /// Describes this sender (shmem + last message offset) so another process
    /// can reattach via `on_existing_from_description`.
    pub fn describe(&self) -> Result<LlmpDescription, Error> {
        let map = self.out_shmems.last().unwrap();
        let last_message_offset = if self.last_msg_sent.is_null() {
            None
        } else {
            Some(unsafe { map.msg_to_offset(self.last_msg_sent) }?)
        };
        Ok(LlmpDescription {
            shmem: map.shmem.description(),
            last_message_offset,
        })
    }
}
/// Receiving end of an LLMP channel.
#[derive(Debug)]
pub struct LlmpReceiver<SHM, SP> {
    /// Id of this receiver (assigned when registered with a broker).
    id: ClientId,
    /// Pointer to the last message successfully received, or null before the first.
    last_msg_recvd: *const LlmpMsg,
    /// Time the last message was received (used by the broker for timeouts).
    #[cfg(feature = "std")]
    last_msg_time: Duration,
    /// Provider used to map follow-up shared-memory pages (e.g. on end-of-page).
    shmem_provider: SP,
    /// The shared map currently being read.
    current_recv_shmem: LlmpSharedMap<SHM>,
    /// Highest message id observed so far; caches the page's atomic counter.
    highest_msg_id: MessageId,
}
impl<SHM, SP> LlmpReceiver<SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Reattaches to an existing sender map whose description was stored in environment variables.
#[cfg(feature = "std")]
pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
    // Clone first (mirrors the original argument-evaluation order),
    // then map the page and read the stored message offset.
    let provider_copy = shmem_provider.clone();
    let shmem = shmem_provider.existing_from_env(env_name)?;
    let last_msg_offset = msg_offset_from_env(env_name)?;
    Self::on_existing_shmem(provider_copy, shmem, last_msg_offset)
}
/// Creates a receiver on top of an already-initialized shared map,
/// optionally resuming after the message at `last_msg_recvd_offset`.
pub fn on_existing_shmem(
    shmem_provider: SP,
    current_sender_shmem: SHM,
    last_msg_recvd_offset: Option<u64>,
) -> Result<Self, Error> {
    let mut current_recv_shmem = LlmpSharedMap::existing(current_sender_shmem);
    // Resolve the stored offset back into a message pointer, if any.
    let last_msg_recvd = if let Some(offset) = last_msg_recvd_offset {
        current_recv_shmem.msg_from_offset(offset)?
    } else {
        ptr::null_mut()
    };
    Ok(Self {
        id: ClientId(0),
        last_msg_recvd,
        shmem_provider,
        current_recv_shmem,
        highest_msg_id: MessageId(0),
        #[cfg(feature = "std")]
        last_msg_time: current_time(),
    })
}
/// Reads the next message from the current page without blocking.
///
/// Transparently follows `LLMP_TAG_END_OF_PAGE` markers onto the next shared map,
/// and translates `LLMP_TAG_EXITING` into `Error::ShuttingDown`.
///
/// # Safety
/// The current receive map must contain a valid, initialized LLMP page.
#[inline(never)]
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
    unsafe {
        let page = self.current_recv_shmem.page_mut();
        let last_msg = self.last_msg_recvd;
        // Use the cached highest id when it is already ahead of the last-read
        // message, so we can skip the atomic load (`loaded` records whether we
        // actually hit the atomic, to know if an Acquire fence is still needed).
        let (current_msg_id, loaded) =
            if !last_msg.is_null() && self.highest_msg_id > (*last_msg).message_id {
                (self.highest_msg_id, false)
            } else {
                let current_msg_id = (*page).current_msg_id.load(Ordering::Relaxed);
                self.highest_msg_id = MessageId(current_msg_id);
                (MessageId(current_msg_id), true)
            };
        let ret = if current_msg_id.0 == 0 {
            // Nothing was ever sent on this page.
            None
        } else if last_msg.is_null() {
            // First read on this page: fence pairs with the sender's Release store.
            fence(Ordering::Acquire);
            Some((*page).messages.as_mut_ptr())
        } else if (*last_msg).message_id == current_msg_id {
            // No new message since the last read.
            None
        } else {
            if loaded {
                fence(Ordering::Acquire);
            }
            Some(llmp_next_msg_ptr_checked(
                &mut self.current_recv_shmem,
                last_msg,
                size_of::<LlmpMsg>(),
            )?)
        };
        if let Some(msg) = ret {
            // Defensive bounds check against a buggy or malicious sender.
            if !(*msg).in_shmem(&mut self.current_recv_shmem) {
                return Err(Error::illegal_state(
                    "Unexpected message in map (out of map bounds) - buggy client or tampered shared map detected!",
                ));
            }
            log::debug!(
                "[{} - {:#x}] Received message with ID {}...",
                self.id.0,
                ptr::from_ref::<Self>(self) as u64,
                (*msg).message_id.0
            );
            match (*msg).tag {
                LLMP_TAG_UNSET => panic!(
                    "BUG: Read unallocated msg (tag was {:?} - msg header: {:?}",
                    LLMP_TAG_UNSET,
                    &(*msg)
                ),
                LLMP_TAG_EXITING => {
                    // The sender announced shutdown; surface it as an error.
                    assert_eq!((*msg).buf_len, 0);
                    return Err(Error::shutting_down());
                }
                LLMP_TAG_END_OF_PAGE => {
                    log::debug!("Received end of page, allocating next");
                    assert!(
                        (*msg).buf_len >= size_of::<LlmpPayloadSharedMapInfo>() as u64,
                        "Illegal message length for EOP (is {}/{}, expected {})",
                        (*msg).buf_len,
                        (*msg).buf_len_padded,
                        size_of::<LlmpPayloadSharedMapInfo>()
                    );
                    #[expect(clippy::cast_ptr_alignment)]
                    let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
                    // Copy the page info out before the old map may go away.
                    let pageinfo_cpy = *pageinfo;
                    self.last_msg_recvd = ptr::null();
                    self.highest_msg_id = MessageId(0);
                    // Tell the sender we left the old page, then map and join the new one.
                    (*page).receiver_left();
                    self.current_recv_shmem =
                        LlmpSharedMap::existing(self.shmem_provider.shmem_from_id_and_size(
                            ShMemId::from_array(&pageinfo_cpy.shm_str),
                            pageinfo_cpy.map_size,
                        )?);
                    let new_page = self.current_recv_shmem.page_mut();
                    (*new_page).receiver_joined();
                    #[cfg(feature = "llmp_debug")]
                    log::info!(
                        "LLMP_DEBUG: Got a new recv map {} with len {:?}",
                        self.current_recv_shmem.shmem.id(),
                        self.current_recv_shmem.shmem.len()
                    );
                    // Recurse to deliver the first real message from the new page.
                    return self.recv();
                }
                _ => (),
            }
            self.last_msg_recvd = msg;
        }
        Ok(ret)
    }
}
/// Spin-waits until the next message arrives, then returns it.
///
/// # Safety
/// The current receive map must contain a valid, initialized LLMP page.
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> {
    unsafe {
        let mut current_msg_id = MessageId(0);
        let page = self.current_recv_shmem.page_mut();
        let last_msg = self.last_msg_recvd;
        if !last_msg.is_null() {
            // An EOP as the last message means the page switch never happened.
            assert!(
                (*last_msg).tag != LLMP_TAG_END_OF_PAGE || llmp_msg_in_page(page, last_msg),
                "BUG: full page passed to await_message_blocking or reset failed"
            );
            current_msg_id = (*last_msg).message_id;
        }
        // Busy-wait on the page's message counter; `recv` does the Acquire fence.
        loop {
            if (*page).current_msg_id.load(Ordering::Relaxed) != current_msg_id.0 {
                return match self.recv()? {
                    Some(msg) => Ok(msg),
                    None => panic!("BUG: blocking llmp message should never be NULL"),
                };
            }
            hint::spin_loop();
        }
    }
}
/// Non-blocking receive of the next message's buffer, discarding its flags.
#[expect(clippy::type_complexity)]
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(ClientId, Tag, &[u8])>, Error> {
    // Delegate to the flag-aware variant and project the flags away.
    Ok(self
        .recv_buf_with_flags()?
        .map(|(sender, tag, _flags, buf)| (sender, tag, buf)))
}
/// Non-blocking receive of the next message's sender, tag, flags, and buffer.
#[expect(clippy::type_complexity)]
#[inline]
pub fn recv_buf_with_flags(&mut self) -> Result<Option<(ClientId, Tag, Flags, &[u8])>, Error> {
    unsafe {
        // Early-out when no new message is available.
        let Some(msg) = self.recv()? else {
            return Ok(None);
        };
        let sender = (*msg).sender;
        let tag = (*msg).tag;
        let flags = (*msg).flags;
        let buf = (*msg).try_as_slice(&mut self.current_recv_shmem)?;
        Ok(Some((sender, tag, flags, buf)))
    }
}
/// Blocking receive of the next message's sender, tag, flags, and buffer.
#[inline]
pub fn recv_buf_blocking_with_flags(&mut self) -> Result<(ClientId, Tag, Flags, &[u8]), Error> {
    unsafe {
        let msg = self.recv_blocking()?;
        // Copy the header fields out before borrowing the map for the slice.
        let (sender, tag, flags) = ((*msg).sender, (*msg).tag, (*msg).flags);
        let buf = (*msg).try_as_slice(&mut self.current_recv_shmem)?;
        Ok((sender, tag, flags, buf))
    }
}
/// Blocking receive of the next message's sender, tag, and buffer.
#[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(ClientId, Tag, &[u8]), Error> {
    unsafe {
        let msg = self.recv_blocking()?;
        // Copy the header fields out before borrowing the map for the slice.
        let (sender, tag) = ((*msg).sender, (*msg).tag);
        let buf = (*msg).try_as_slice(&mut self.current_recv_shmem)?;
        Ok((sender, tag, buf))
    }
}
/// Reattaches a receiver from a previously captured [`LlmpDescription`].
pub fn on_existing_from_description(
    mut shmem_provider: SP,
    description: &LlmpDescription,
) -> Result<Self, Error> {
    // Clone first (mirrors the original argument-evaluation order).
    let provider_copy = shmem_provider.clone();
    let shmem = shmem_provider.shmem_from_description(description.shmem)?;
    Self::on_existing_shmem(provider_copy, shmem, description.last_message_offset)
}
}
impl<SHM, SP> LlmpReceiver<SHM, SP>
where
SHM: ShMem,
{
/// Stores this receiver's map description and last-message offset in environment
/// variables, so a child process can reattach via `on_existing_from_env`.
///
/// # Safety
/// Calls `env::set_var`, which is not thread-safe on all platforms.
#[cfg(feature = "std")]
pub unsafe fn to_env(&self, env_name: &str) -> Result<(), Error> {
    let current_out_shmem = &self.current_recv_shmem;
    unsafe {
        current_out_shmem.shmem.write_to_env(env_name)?;
    }
    unsafe { current_out_shmem.msg_to_env(self.last_msg_recvd, env_name) }
}
/// Creates an [`LlmpDescription`] from which this receiver can later be restored.
pub fn describe(&self) -> Result<LlmpDescription, Error> {
    let recv_map = &self.current_recv_shmem;
    // Only record an offset once something was actually received.
    let last_message_offset = if self.last_msg_recvd.is_null() {
        None
    } else {
        let offset = unsafe { recv_map.msg_to_offset(self.last_msg_recvd) }?;
        Some(offset)
    };
    Ok(LlmpDescription {
        shmem: recv_map.shmem.description(),
        last_message_offset,
    })
}
}
/// A shared-memory mapping containing a single LLMP page.
#[derive(Debug, Clone)]
pub struct LlmpSharedMap<SHM> {
    /// The raw shared memory this map lives in.
    shmem: SHM,
}
impl<SHM> LlmpSharedMap<SHM>
where
SHM: ShMem,
{
/// Wraps a fresh shared-memory region, initializing the LLMP page header inside it.
pub fn new(sender: ClientId, mut shmem: SHM) -> Self {
    #[cfg(feature = "llmp_debug")]
    log::info!(
        "LLMP_DEBUG: Initializing map on {} with size {}",
        shmem.id(),
        shmem.len()
    );
    // SAFETY: we have exclusive access to this freshly created mapping.
    unsafe { llmp_page_init(&mut shmem, sender, false) };
    Self { shmem }
}
/// Wraps an already-initialized shared map, asserting its magic value is set.
///
/// # Panics
/// Panics if the page was never initialized (wrong magic).
pub fn existing(existing_shmem: SHM) -> Self {
    #[cfg(feature = "llmp_debug")]
    log::info!(
        "LLMP_DEBUG: Using existing map {} with size {}",
        existing_shmem.id(),
        existing_shmem.len(),
    );
    let ret = Self {
        shmem: existing_shmem,
    };
    unsafe {
        // Fixed typo in the panic message ("priviously" -> "previously").
        assert!(
            (*ret.page()).magic == PAGE_INITIALIZED_MAGIC,
            "Map was not previously initialized at {:?}",
            &ret.shmem
        );
        #[cfg(feature = "llmp_debug")]
        log::info!("PAGE: {:?}", &(*ret.page()));
    }
    ret
}
/// Marks the page as having a receiver attached, so the sender may unmap it safely.
pub fn mark_safe_to_unmap(&mut self) {
    unsafe {
        (*self.page_mut()).receiver_joined();
    }
}
/// Returns a mutable raw pointer to the contained [`LlmpPage`].
///
/// # Safety
/// The caller must uphold aliasing rules for the returned raw pointer.
pub unsafe fn page_mut(&mut self) -> *mut LlmpPage {
    unsafe { shmem2page_mut(&mut self.shmem) }
}
/// Returns a const raw pointer to the contained [`LlmpPage`].
///
/// # Safety
/// The caller must uphold aliasing rules for the returned raw pointer.
pub unsafe fn page(&self) -> *const LlmpPage {
    unsafe { shmem2page(&self.shmem) }
}
/// Converts a message pointer inside this map into its byte offset from the
/// start of the page's message area (the inverse of `msg_from_offset`).
///
/// # Errors
/// Returns `IllegalArgument` if `msg` does not lie within this page.
///
/// # Safety
/// `msg` must be a valid pointer (it is only compared/offset, not dereferenced).
#[expect(clippy::cast_sign_loss)]
pub unsafe fn msg_to_offset(&self, msg: *const LlmpMsg) -> Result<u64, Error> {
    unsafe {
        let page = self.page();
        if llmp_msg_in_page(page, msg) {
            Ok((msg as *const u8).offset_from((*page).messages.as_ptr() as *const u8) as u64)
        } else {
            // Fixed: the arguments were swapped relative to the format string,
            // printing the page address as the message and vice versa.
            Err(Error::illegal_argument(format!(
                "Message (0x{:X}) not in page (0x{:X})",
                msg as u64, page as u64
            )))
        }
    }
}
/// Resolves the message offset stored under `{map_env_name}_OFFSET` back into a
/// pointer, or null if no offset was stored.
#[cfg(feature = "std")]
pub fn msg_from_env(&mut self, map_env_name: &str) -> Result<*mut LlmpMsg, Error> {
    if let Some(offset) = msg_offset_from_env(map_env_name)? {
        self.msg_from_offset(offset)
    } else {
        Ok(ptr::null_mut())
    }
}
/// Stores the offset of `msg` (or a null sentinel) under `{map_env_name}_OFFSET`
/// so another process can resume from it.
///
/// # Safety
/// Calls `env::set_var`, which is not thread-safe on all platforms.
#[cfg(feature = "std")]
pub unsafe fn msg_to_env(&self, msg: *const LlmpMsg, map_env_name: &str) -> Result<(), Error> {
    if msg.is_null() {
        // Null is encoded via a dedicated sentinel string.
        unsafe { env::set_var(format!("{map_env_name}_OFFSET"), _NULL_ENV_STR) };
    } else {
        unsafe {
            env::set_var(
                format!("{map_env_name}_OFFSET"),
                format!("{}", self.msg_to_offset(msg)?),
            );
        };
    }
    Ok(())
}
/// Converts a byte offset into the page's message area back into a message pointer
/// (the inverse of `msg_to_offset`), bounds-checking it against the map size.
#[expect(clippy::cast_ptr_alignment)]
pub fn msg_from_offset(&mut self, offset: u64) -> Result<*mut LlmpMsg, Error> {
    let offset = offset as usize;
    let page = unsafe { self.page_mut() };
    // Usable payload area: the mapping minus the page header.
    let page_size = self.shmem.len() - size_of::<LlmpPage>();
    // NOTE(review): the check uses `>`, so `offset == page_size` passes even though
    // the message header would extend past the area — presumably callers only store
    // offsets of valid headers obtained from `msg_to_offset`; confirm.
    if offset > page_size {
        Err(Error::illegal_argument(format!(
            "Msg offset out of bounds (size: {page_size}, requested offset: {offset})"
        )))
    } else {
        unsafe { Ok(((*page).messages.as_mut_ptr() as *mut u8).add(offset) as *mut LlmpMsg) }
    }
}
}
/// Hook-independent state of an LLMP broker: the broadcast sender plus all clients.
#[derive(Debug)]
pub struct LlmpBrokerInner<SHM, SP> {
    /// Broadcast sender every client listens to.
    llmp_out: LlmpSender<SHM, SP>,
    /// One receiver per registered client (listeners included).
    llmp_clients: Vec<LlmpReceiver<SHM, SP>>,
    /// Client ids that belong to listeners rather than regular clients.
    listeners: Vec<ClientId>,
    /// Total number of clients ever registered.
    num_clients_seen: usize,
    /// If set, the broker loops exit cleanly after this many clients came and went.
    pub exit_cleanly_after: Option<NonZeroUsize>,
    /// Client ids queued for removal during the next `broker_once` sweep.
    clients_to_remove: Vec<ClientId>,
    /// Provider used to map client shared-memory pages.
    shmem_provider: SP,
}
/// An LLMP broker: the hook-independent inner state plus a tuple of message hooks.
#[derive(Debug)]
pub struct LlmpBroker<HT, SHM, SP> {
    /// The hook-independent broker state.
    inner: LlmpBrokerInner<SHM, SP>,
    /// Hooks invoked for every incoming client message.
    hooks: HT,
}
/// Common broker interface, object-safe so brokers can be driven via [`Brokers`].
pub trait Broker {
    /// Returns `true` once a shutdown signal has been received.
    fn is_shutting_down(&self) -> bool;
    /// Invoked when the broker loop's timeout elapses without new messages.
    fn on_timeout(&mut self) -> Result<(), Error>;
    /// Polls all clients once; returns `true` if new messages were handled.
    fn broker_once(&mut self) -> Result<bool, Error>;
    /// The configured clean-exit client count, if any.
    fn exit_after(&self) -> Option<NonZeroUsize>;
    /// Sets the number of clients after which the broker exits cleanly.
    fn set_exit_after(&mut self, n_clients: NonZeroUsize);
    /// Returns `true` while any non-listener client is connected.
    fn has_clients(&self) -> bool;
    /// Broadcasts `buf` with `tag` to all clients.
    fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error>;
    /// Total number of clients seen over the broker's lifetime.
    fn num_clients_seen(&self) -> usize;
    /// Number of registered listeners.
    fn nb_listeners(&self) -> usize;
}
/// Trait-object adapter: forwards every [`Broker`] call to the concrete broker.
impl<HT, SHM, SP> Broker for LlmpBroker<HT, SHM, SP>
where
    HT: LlmpHookTuple<SHM, SP>,
    SHM: ShMem,
    SP: ShMemProvider<ShMem = SHM>,
{
    fn is_shutting_down(&self) -> bool {
        self.inner.is_shutting_down()
    }
    fn on_timeout(&mut self) -> Result<(), Error> {
        self.hooks.on_timeout_all()
    }
    fn broker_once(&mut self) -> Result<bool, Error> {
        // Resolves to the *inherent* `LlmpBroker::broker_once` (inherent methods
        // take precedence over trait methods), so this is not infinite recursion.
        self.broker_once()
    }
    fn exit_after(&self) -> Option<NonZeroUsize> {
        self.inner.exit_cleanly_after
    }
    fn set_exit_after(&mut self, n_clients: NonZeroUsize) {
        self.inner.set_exit_cleanly_after(n_clients);
    }
    fn has_clients(&self) -> bool {
        self.inner.has_clients()
    }
    fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
        self.inner.llmp_out.send_buf(tag, buf)
    }
    fn num_clients_seen(&self) -> usize {
        self.inner.num_clients_seen
    }
    fn nb_listeners(&self) -> usize {
        self.inner.listeners.len()
    }
}
/// A set of brokers (as trait objects) that can be driven from a single loop.
#[cfg(feature = "std")]
#[derive(Default)]
pub struct Brokers {
    /// The managed brokers.
    llmp_brokers: Vec<Box<dyn Broker>>,
}
/// Manual `Debug`: the contained trait objects are not `Debug`, so print a fixed
/// label instead (formatted as a `Debug` string, i.e. quoted, like the original).
#[cfg(feature = "std")]
impl Debug for Brokers {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        write!(f, "{:?}", "Brokers")
    }
}
/// Signal/ctrl handler state: a flag flipped when a shutdown signal arrives.
#[cfg(any(unix, all(windows, feature = "std")))]
#[derive(Debug, Clone)]
pub struct LlmpShutdownSignalHandler {
    /// Written (volatile) from the handler; polled by the broker loops.
    shutting_down: bool,
}
/// Unix signal handling: on SIGTERM/SIGINT/SIGQUIT, flag the broker to shut down.
#[cfg(all(unix, feature = "alloc"))]
impl SignalHandler for LlmpShutdownSignalHandler {
    unsafe fn handle(
        &mut self,
        _signal: Signal,
        _info: &mut siginfo_t,
        _context: Option<&mut ucontext_t>,
    ) {
        unsafe {
            // Volatile write so the store is not optimized away; the flag is
            // read back with `ptr::read_volatile` in `is_shutting_down`.
            ptr::write_volatile(&raw mut self.shutting_down, true);
        }
    }
    /// The signals this handler registers for.
    fn signals(&self) -> Vec<Signal> {
        vec![Signal::SigTerm, Signal::SigInterrupt, Signal::SigQuit]
    }
}
/// Windows console-control handling: flag the broker to shut down on any ctrl event.
#[cfg(all(windows, feature = "std"))]
impl CtrlHandler for LlmpShutdownSignalHandler {
    fn handle(&mut self, ctrl_type: u32) -> bool {
        log::info!("LLMP: Received shutdown signal, ctrl_type {ctrl_type:?}");
        unsafe {
            // Volatile write, mirrored by the volatile read in `is_shutting_down`.
            ptr::write_volatile(&raw mut self.shutting_down, true);
        }
        // Report the event as handled.
        true
    }
}
/// A hook the broker runs for every incoming client message.
pub trait LlmpHook<SHM, SP> {
    /// Inspect (and possibly mutate) an incoming message.
    ///
    /// Push into `new_msgs` to have the broker broadcast additional messages.
    /// Return [`LlmpMsgHookResult::Handled`] to stop the message from being
    /// forwarded to clients.
    fn on_new_message(
        &mut self,
        broker_inner: &mut LlmpBrokerInner<SHM, SP>,
        client_id: ClientId,
        msg_tag: &mut Tag,
        msg_flags: &mut Flags,
        msg: &mut [u8],
        new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
    ) -> Result<LlmpMsgHookResult, Error>;
    /// Invoked when the broker loop's timeout elapses; default is a no-op.
    fn on_timeout(&mut self) -> Result<(), Error> {
        Ok(())
    }
}
/// A (possibly empty) tuple of [`LlmpHook`]s, applied in order.
pub trait LlmpHookTuple<SHM, SP> {
    /// Runs every hook in the tuple on the message, in order.
    fn on_new_message_all(
        &mut self,
        inner: &mut LlmpBrokerInner<SHM, SP>,
        client_id: ClientId,
        msg_tag: &mut Tag,
        msg_flags: &mut Flags,
        msg: &mut [u8],
        new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
    ) -> Result<LlmpMsgHookResult, Error>;
    /// Runs every hook's timeout callback.
    fn on_timeout_all(&mut self) -> Result<(), Error>;
}
/// The empty hook tuple: no hooks run, every message is forwarded untouched.
impl<SHM, SP> LlmpHookTuple<SHM, SP> for () {
    fn on_new_message_all(
        &mut self,
        _broker: &mut LlmpBrokerInner<SHM, SP>,
        _id: ClientId,
        _tag: &mut Tag,
        _flags: &mut Flags,
        _payload: &mut [u8],
        _out_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
    ) -> Result<LlmpMsgHookResult, Error> {
        // Nothing to do; keep the message flowing to the clients.
        Ok(LlmpMsgHookResult::ForwardToClients)
    }
    fn on_timeout_all(&mut self) -> Result<(), Error> {
        Ok(())
    }
}
/// Recursive case: run the head hook, then (unless it handled the message) the tail.
impl<Head, Tail, SHM, SP> LlmpHookTuple<SHM, SP> for (Head, Tail)
where
    Head: LlmpHook<SHM, SP>,
    Tail: LlmpHookTuple<SHM, SP>,
{
    fn on_new_message_all(
        &mut self,
        inner: &mut LlmpBrokerInner<SHM, SP>,
        client_id: ClientId,
        msg_tag: &mut Tag,
        msg_flags: &mut Flags,
        msg: &mut [u8],
        new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
    ) -> Result<LlmpMsgHookResult, Error> {
        // The head hook may short-circuit the rest of the chain.
        let head_verdict = self
            .0
            .on_new_message(inner, client_id, msg_tag, msg_flags, msg, new_msgs)?;
        if let LlmpMsgHookResult::Handled = head_verdict {
            return Ok(LlmpMsgHookResult::Handled);
        }
        // Not handled: let the remaining hooks see the message.
        self.1
            .on_new_message_all(inner, client_id, msg_tag, msg_flags, msg, new_msgs)
    }
    fn on_timeout_all(&mut self) -> Result<(), Error> {
        self.0.on_timeout()?;
        self.1.on_timeout_all()
    }
}
impl<SHM, SP> LlmpBroker<(), SHM, SP> {
    /// Consumes a hook-less broker and attaches a hook tuple to it.
    pub fn add_hooks<HT>(self, hooks: HT) -> LlmpBroker<HT, SHM, SP>
    where
        HT: LlmpHookTuple<SHM, SP>,
    {
        // Keep the inner state; only the hook type changes.
        let Self { inner, .. } = self;
        LlmpBroker { inner, hooks }
    }
}
#[cfg(feature = "std")]
impl Brokers {
#[must_use]
pub fn new() -> Self {
Self {
llmp_brokers: Vec::new(),
}
}
/// Adds a broker to this collection; it will be driven by `loop_with_timeouts`.
pub fn add(&mut self, broker: Box<dyn Broker>) {
    self.llmp_brokers.push(broker);
}
/// Installs the process-wide shutdown handlers (Unix signals / Windows ctrl events)
/// that flip the global `LLMP_SIGHANDLER_STATE` flag. Failures are only logged.
#[cfg(any(all(unix, feature = "alloc", not(miri)), all(windows, feature = "std")))]
fn setup_handlers() {
    #[cfg(all(unix, not(miri)))]
    if let Err(e) = unsafe { setup_signal_handler(&raw mut LLMP_SIGHANDLER_STATE) } {
        log::info!("Failed to setup signal handlers: {e}");
    } else {
        log::info!("Successfully setup signal handlers");
    }
    #[cfg(all(windows, feature = "std"))]
    if let Err(e) = unsafe { setup_ctrl_handler(&raw mut LLMP_SIGHANDLER_STATE) } {
        log::info!("Failed to setup control handlers: {e}");
    } else {
        log::info!(
            "{}: Broker successfully setup control handlers",
            std::process::id()
        );
    }
}
/// Drives all managed brokers in one loop, invoking each broker's timeout hook
/// when `timeout` elapses without new messages. A broker is dropped from the set
/// when it shuts down or reaches its clean-exit client count.
#[cfg(feature = "std")]
pub fn loop_with_timeouts(&mut self, timeout: Duration, sleep_time: Option<Duration>) {
    use no_std_time::current_milliseconds;
    #[cfg(any(all(unix, feature = "alloc", not(miri)), all(windows, feature = "std")))]
    Self::setup_handlers();
    let timeout = timeout.as_millis() as u64;
    let mut end_time = current_milliseconds() + timeout;
    loop {
        // `retain_mut` both drives each broker and drops finished ones.
        self.llmp_brokers.retain_mut(|broker| {
            if broker.is_shutting_down() {
                // Tell this broker's clients to exit before dropping it.
                broker.send_buf(LLMP_TAG_EXITING, &[]).expect(
                    "Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.",
                );
                return false;
            }
            if current_milliseconds() > end_time {
                broker
                    .on_timeout()
                    .expect("An error occurred in broker timeout. Exiting.");
                end_time = current_milliseconds() + timeout;
            }
            // Any new message resets the shared timeout deadline.
            if broker
                .broker_once()
                .expect("An error occurred when brokering. Exiting.")
            {
                end_time = current_milliseconds() + timeout;
            }
            if let Some(exit_after_count) = broker.exit_after() {
                // Exit once enough non-listener clients were seen and all left.
                if !broker.has_clients()
                    && (broker.num_clients_seen() - broker.nb_listeners())
                        >= exit_after_count.into()
                {
                    return false;
                }
            }
            true
        });
        if self.llmp_brokers.is_empty() {
            break;
        }
        // NOTE(review): this fn is already gated on `std` (see the cfg above),
        // so the `not(std)` branch below is unreachable dead code.
        #[cfg(feature = "std")]
        if let Some(time) = sleep_time {
            thread::sleep(time);
        }
        #[cfg(not(feature = "std"))]
        if let Some(time) = sleep_time {
            panic!("Cannot sleep on no_std platform (requested {time:?})");
        }
    }
}
}
impl<HT, SHM, SP> LlmpBroker<HT, SHM, SP>
where
HT: LlmpHookTuple<SHM, SP>,
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Creates a new broker with the given hooks, keeping pages mapped forever.
pub fn new(shmem_provider: SP, hooks: HT) -> Result<Self, Error> {
    Self::with_keep_pages(shmem_provider, hooks, true)
}
/// Creates a new broker with the given hooks and page-retention policy.
pub fn with_keep_pages(
    shmem_provider: SP,
    hooks: HT,
    keep_pages_forever: bool,
) -> Result<Self, Error> {
    let inner = LlmpBrokerInner::with_keep_pages(shmem_provider, keep_pages_forever)?;
    Ok(LlmpBroker { inner, hooks })
}
/// Creates a broker listening for TCP clients on `port`.
#[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, hooks: HT, port: u16) -> Result<Self, Error> {
    let inner = LlmpBrokerInner::create_attach_to_tcp(shmem_provider, port)?;
    Ok(LlmpBroker { inner, hooks })
}
/// Creates a broker listening for TCP clients on `port`, with the given
/// page-retention policy.
#[cfg(feature = "std")]
pub fn with_keep_pages_attach_to_tcp(
    shmem_provider: SP,
    hooks: HT,
    port: u16,
    keep_pages_forever: bool,
) -> Result<Self, Error> {
    let inner = LlmpBrokerInner::with_keep_pages_attach_to_tcp(
        shmem_provider,
        port,
        keep_pages_forever,
    )?;
    Ok(LlmpBroker { inner, hooks })
}
/// Shared reference to the hook-independent broker state.
pub fn inner(&self) -> &LlmpBrokerInner<SHM, SP> {
    &self.inner
}
/// Mutable reference to the hook-independent broker state.
pub fn inner_mut(&mut self) -> &mut LlmpBrokerInner<SHM, SP> {
    &mut self.inner
}
/// Broker loop: forwards client messages until a shutdown signal arrives or,
/// if `exit_cleanly_after` is set, until that many (non-listener) clients have
/// connected and all of them have disconnected again.
/// On exit, broadcasts [`LLMP_TAG_EXITING`] to all clients.
pub fn loop_forever(&mut self, sleep_time: Option<Duration>) {
    #[cfg(any(all(unix, feature = "alloc", not(miri)), all(windows, feature = "std")))]
    Self::setup_handlers();
    while !self.inner.is_shutting_down() {
        self.broker_once()
            .expect("An error occurred when brokering. Exiting.");
        if let Some(exit_after_count) = self.inner.exit_cleanly_after {
            // Consistency fix: use `>=` like `loop_with_timeouts` and
            // `Brokers::loop_with_timeouts` do. The previous `>` required one
            // client more than the configured count before exiting.
            if !self.inner.has_clients()
                && (self.inner.num_clients_seen - self.inner.listeners.len())
                    >= exit_after_count.into()
            {
                break;
            }
        }
        #[cfg(feature = "std")]
        if let Some(time) = sleep_time {
            thread::sleep(time);
        }
        #[cfg(not(feature = "std"))]
        if let Some(time) = sleep_time {
            panic!("Cannot sleep on no_std platform (requested {time:?})");
        }
    }
    self.inner
        .llmp_out
        .send_buf(LLMP_TAG_EXITING, &[])
        .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
}
/// Broker loop like `loop_forever`, but runs the hooks' timeout callbacks when
/// `timeout` elapses without any new client messages.
#[cfg(feature = "std")]
pub fn loop_with_timeouts(&mut self, timeout: Duration, sleep_time: Option<Duration>) {
    use no_std_time::current_milliseconds;
    #[cfg(any(all(unix, feature = "alloc", not(miri)), all(windows, feature = "std")))]
    Self::setup_handlers();
    let timeout = timeout.as_millis() as u64;
    let mut end_time = current_milliseconds() + timeout;
    while !self.inner.is_shutting_down() {
        if current_milliseconds() > end_time {
            self.hooks
                .on_timeout_all()
                .expect("An error occurred in broker timeout. Exiting.");
            end_time = current_milliseconds() + timeout;
        }
        // Any new message resets the timeout deadline.
        if self
            .broker_once()
            .expect("An error occurred when brokering. Exiting.")
        {
            end_time = current_milliseconds() + timeout;
        }
        if let Some(exit_after_count) = self.inner.exit_cleanly_after {
            // Exit once enough non-listener clients were seen and all left.
            if !self.inner.has_clients()
                && (self.inner.num_clients_seen - self.inner.listeners.len())
                    >= exit_after_count.into()
            {
                break;
            }
        }
        // NOTE(review): this fn is gated on `std`, so the `not(std)` branch is dead.
        #[cfg(feature = "std")]
        if let Some(time) = sleep_time {
            thread::sleep(time);
        }
        #[cfg(not(feature = "std"))]
        if let Some(time) = sleep_time {
            panic!("Cannot sleep on no_std platform (requested {time:?})");
        }
    }
    self.inner
        .llmp_out
        .send_buf(LLMP_TAG_EXITING, &[])
        .expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
}
/// Polls every client once and dispatches their pending messages.
///
/// Returns `Ok(true)` if *any* client delivered new messages this round.
/// Clients that announced shutdown are removed afterwards.
#[inline]
pub fn broker_once(&mut self) -> Result<bool, Error> {
    let mut new_messages = false;
    for i in 0..self.inner.llmp_clients.len() {
        let client_id = self.inner.llmp_clients[i].id;
        match unsafe { self.handle_new_msgs(client_id) } {
            Ok(has_messages) => {
                // Fix: accumulate instead of overwriting. Previously only the
                // *last* client's status was returned, so activity on earlier
                // clients could be reported as "no new messages" (which also
                // kept `loop_with_timeouts` from resetting its deadline).
                new_messages |= has_messages;
            }
            Err(Error::ShuttingDown) => {
                self.inner.clients_to_remove.push(client_id);
            }
            Err(err) => return Err(err),
        }
    }
    let possible_remove = self.inner.clients_to_remove.len();
    if possible_remove > 0 {
        self.inner.clients_to_remove.sort_unstable();
        self.inner.clients_to_remove.dedup();
        log::trace!("Removing {:#?}", self.inner.clients_to_remove);
        // Iterate in reverse so index-based removal stays valid.
        for idx in (0..self.inner.llmp_clients.len()).rev() {
            let client_id = self.inner.llmp_clients[idx].id;
            if self.inner.clients_to_remove.contains(&client_id) {
                log::info!("Client {client_id:#?} wants to exit. Removing.");
                self.inner.llmp_clients.remove(idx);
            }
        }
    }
    self.inner.clients_to_remove.clear();
    Ok(new_messages)
}
#[inline]
#[expect(clippy::cast_ptr_alignment)]
#[expect(clippy::too_many_lines)]
/// Drains and dispatches all pending messages of one client: handles control
/// tags (slow-receiver panic, client exit, new-client announcement) itself and
/// runs the hook chain / broadcast forwarding for everything else.
///
/// Returns `Ok(true)` if at least one message was processed.
///
/// # Safety
/// Dereferences raw message pointers returned by `recv`; the client's page must
/// be a valid LLMP page.
#[inline]
#[expect(clippy::cast_ptr_alignment)]
#[expect(clippy::too_many_lines)]
unsafe fn handle_new_msgs(&mut self, client_id: ClientId) -> Result<bool, Error> {
    unsafe {
        let mut new_messages = false;
        loop {
            let msg = {
                // Fast path: client ids usually equal their vector index;
                // fall back to binary search (clients are kept sorted by id).
                let pos = if (client_id.0 as usize) < self.inner.llmp_clients.len()
                    && self.inner.llmp_clients[client_id.0 as usize].id == client_id
                {
                    client_id.0 as usize
                } else {
                    self.inner
                        .llmp_clients
                        .binary_search_by_key(&client_id, |x| x.id)
                        .unwrap_or_else(|_| {
                            panic!(
                                "Fatal error, client ID {client_id:?} not found in llmp_clients."
                            )
                        })
                };
                let client = &mut self.inner.llmp_clients[pos];
                match client.recv()? {
                    None => {
                        // Queue drained; record activity time if anything came in.
                        #[cfg(feature = "std")]
                        if new_messages {
                            self.inner.llmp_clients[pos].last_msg_time = current_time();
                        }
                        return Ok(new_messages);
                    }
                    Some(msg) => msg,
                }
            };
            new_messages = true;
            match (*msg).tag {
                LLMP_SLOW_RECEIVER_PANIC => {
                    return Err(Error::unknown(format!(
                        "The broker was too slow to handle messages of client {client_id:?} in time, so it quit. Either the client sent messages too fast, or we (the broker) got stuck!"
                    )));
                }
                LLMP_TAG_CLIENT_EXIT => {
                    let msg_buf_len_padded = (*msg).buf_len_padded;
                    if (*msg).buf_len < size_of::<LlmpClientExitInfo>() as u64 {
                        log::info!(
                            "Ignoring broken CLIENT_EXIT msg due to incorrect size. Expected {} but got {}",
                            msg_buf_len_padded,
                            size_of::<LlmpClientExitInfo>()
                        );
                        // NOTE(review): only the no_std build returns here; with
                        // `std` enabled this falls through and still reads the
                        // (undersized) exit info below — confirm this is intended.
                        #[cfg(not(feature = "std"))]
                        return Err(Error::unknown(format!(
                            "Broken CLIENT_EXIT msg with incorrect size received. Expected {} but got {}",
                            msg_buf_len_padded,
                            size_of::<LlmpClientExitInfo>()
                        )));
                    }
                    let exitinfo =
                        ((*msg).buf.as_mut_ptr() as *mut LlmpClientExitInfo).read_unaligned();
                    let client_id = ClientId(exitinfo.client_id);
                    log::info!(
                        "Client exit message received!, we are removing clients whose client_group_id is {client_id:#?}"
                    );
                    // Actual removal happens in `broker_once` after the sweep.
                    self.inner.clients_to_remove.push(client_id);
                }
                LLMP_TAG_NEW_SHM_CLIENT => {
                    let msg_buf_len_padded = (*msg).buf_len_padded;
                    if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
                        log::info!(
                            "Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}",
                            msg_buf_len_padded,
                            size_of::<LlmpPayloadSharedMapInfo>()
                        );
                        // NOTE(review): same std/no_std asymmetry as CLIENT_EXIT above.
                        #[cfg(not(feature = "std"))]
                        return Err(Error::unknown(format!(
                            "Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}",
                            msg_buf_len_padded,
                            size_of::<LlmpPayloadSharedMapInfo>()
                        )));
                    }
                    // Map the announced page and register it as a new client.
                    let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
                    match self.inner.shmem_provider.shmem_from_id_and_size(
                        ShMemId::from_array(&(*pageinfo).shm_str),
                        (*pageinfo).map_size,
                    ) {
                        Ok(new_shmem) => {
                            let mut new_page = LlmpSharedMap::existing(new_shmem);
                            new_page.mark_safe_to_unmap();
                            // The placeholder id is overwritten by `add_client`.
                            let _new_client = self.inner.add_client(LlmpReceiver {
                                id: ClientId(0),
                                current_recv_shmem: new_page,
                                last_msg_recvd: ptr::null_mut(),
                                shmem_provider: self.inner.shmem_provider.clone(),
                                highest_msg_id: MessageId(0),
                                #[cfg(feature = "std")]
                                last_msg_time: current_time(),
                            });
                        }
                        Err(e) => {
                            log::info!("Error adding client! Ignoring: {e:?}");
                            #[cfg(not(feature = "std"))]
                            return Err(Error::unknown(format!(
                                "Error adding client! PANIC! {e:?}"
                            )));
                        }
                    }
                }
                _ => {
                    // Regular payload message: run the hook chain, then forward.
                    let pos = if (client_id.0 as usize) < self.inner.llmp_clients.len()
                        && self.inner.llmp_clients[client_id.0 as usize].id == client_id
                    {
                        client_id.0 as usize
                    } else {
                        self.inner
                            .llmp_clients
                            .binary_search_by_key(&client_id, |x| x.id)
                            .unwrap_or_else(|_| panic!("Fatal error, client ID {client_id:?} not found in llmp_clients."))
                    };
                    let map = &mut self.inner.llmp_clients[pos].current_recv_shmem;
                    let msg_buf = (*msg).try_as_slice_mut(map)?;
                    let mut new_msgs: Vec<(Tag, Flags, Vec<u8>)> = Vec::new();
                    if let LlmpMsgHookResult::ForwardToClients = self.hooks.on_new_message_all(
                        &mut self.inner,
                        client_id,
                        &mut (*msg).tag,
                        &mut (*msg).flags,
                        msg_buf,
                        &mut new_msgs,
                    )? {
                        self.inner.forward_msg(msg)?;
                    }
                    log::debug!("New msg vector: {}", new_msgs.len());
                    // Broadcast any extra messages the hooks produced.
                    for (new_msg_tag, new_msg_flag, new_msg) in new_msgs {
                        self.inner.llmp_out.send_buf_with_flags(
                            new_msg_tag,
                            new_msg_flag,
                            new_msg.as_ref(),
                        )?;
                    }
                }
            }
        }
    }
}
/// Installs the process-wide shutdown handlers that flip `LLMP_SIGHANDLER_STATE`.
/// Failures are only logged, never fatal.
#[cfg(any(all(unix, feature = "alloc", not(miri)), all(windows, feature = "std")))]
fn setup_handlers() {
    #[cfg(all(unix, not(miri)))]
    if let Err(e) = unsafe { setup_signal_handler(&raw mut LLMP_SIGHANDLER_STATE) } {
        log::info!("Failed to setup signal handlers: {e}");
    } else {
        log::info!("Successfully setup signal handlers");
    }
    #[cfg(all(windows, feature = "std"))]
    if let Err(e) = unsafe { setup_ctrl_handler(&raw mut LLMP_SIGHANDLER_STATE) } {
        log::info!("Failed to setup control handlers: {e}");
    } else {
        log::info!(
            "{}: Broker successfully setup control handlers",
            std::process::id()
        );
    }
}
}
impl<SHM, SP> LlmpBrokerInner<SHM, SP>
where
SHM: ShMem,
SP: ShMemProvider<ShMem = SHM>,
{
/// Creates the broker state, keeping pages mapped forever.
pub fn new(shmem_provider: SP) -> Result<Self, Error> {
    Self::with_keep_pages(shmem_provider, true)
}
/// Creates the broker state with the given page-retention policy, allocating
/// the initial broadcast map.
pub fn with_keep_pages(
    mut shmem_provider: SP,
    keep_pages_forever: bool,
) -> Result<Self, Error> {
    Ok(LlmpBrokerInner {
        // Note the evaluation order: `llmp_out` clones the provider before the
        // `shmem_provider` field below takes ownership of the original.
        llmp_out: LlmpSender {
            id: ClientId(0),
            last_msg_sent: ptr::null_mut(),
            out_shmems: vec![LlmpSharedMap::new(
                ClientId(0),
                shmem_provider.new_shmem(next_shmem_size(0))?,
            )],
            keep_pages_forever,
            has_unsent_message: false,
            shmem_provider: shmem_provider.clone(),
            unused_shmem_cache: vec![],
        },
        llmp_clients: vec![],
        clients_to_remove: Vec::new(),
        listeners: vec![],
        exit_cleanly_after: None,
        num_clients_seen: 0,
        shmem_provider,
    })
}
/// The id that will be assigned to the next registered client.
#[must_use]
#[inline]
pub fn peek_next_client_id(&self) -> ClientId {
    // Ids are dense: the next id equals the number of clients seen so far.
    let next_id = self
        .num_clients_seen
        .try_into()
        .expect("More than u32::MAX clients!");
    ClientId(next_id)
}
/// Creates the broker state listening on `port`, keeping pages mapped forever.
#[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> {
    Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true)
}
/// Creates the broker state and launches a TCP listener thread on `port`.
#[cfg(feature = "std")]
pub fn with_keep_pages_attach_to_tcp(
    shmem_provider: SP,
    port: u16,
    keep_pages_forever: bool,
) -> Result<Self, Error> {
    // Bind first so we fail early, before allocating any broker state;
    // `?` replaces the original `match` that re-wrapped the error unchanged.
    let listener = tcp_bind(port)?;
    let mut broker = LlmpBrokerInner::with_keep_pages(shmem_provider, keep_pages_forever)?;
    let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
    Ok(broker)
}
/// Configures the broker loops to exit cleanly after `n_clients` have come and gone.
pub fn set_exit_cleanly_after(&mut self, n_clients: NonZeroUsize) {
    self.exit_cleanly_after = Some(n_clients);
}
/// Registers a receiver as a new client, assigning it the next sequential id.
pub fn add_client(&mut self, mut client_receiver: LlmpReceiver<SHM, SP>) -> ClientId {
    let id = self.peek_next_client_id();
    // Overwrite whatever placeholder id the receiver was constructed with.
    client_receiver.id = id;
    self.llmp_clients.push(client_receiver);
    self.num_clients_seen += 1;
    id
}
/// Allocates the next message of `buf_len` bytes on the broadcast map.
///
/// # Safety
/// Returns a raw pointer into shared memory; see `LlmpSender::alloc_next`.
unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
    self.llmp_out.alloc_next(buf_len)
}
/// Wraps a client's shared map in a receiver and registers it as a new client.
pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SHM>) -> ClientId {
    // Signal the client that we attached, so it may unmap its copy safely.
    client_page.mark_safe_to_unmap();
    self.add_client(LlmpReceiver {
        // Placeholder id; `add_client` assigns the real one.
        id: ClientId(0),
        current_recv_shmem: client_page,
        last_msg_recvd: ptr::null_mut(),
        shmem_provider: self.shmem_provider.clone(),
        highest_msg_id: MessageId(0),
        #[cfg(feature = "std")]
        last_msg_time: current_time(),
    })
}
/// Connects this broker to a remote broker (broker-to-broker):
/// performs the TCP handshake, then spawns a proxy thread via `b2b_thread_on`
/// and registers its map as a local client.
#[cfg(feature = "std")]
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
where
    A: ToSocketAddrs,
{
    let mut stream = TcpStream::connect(addr)?;
    log::info!("B2B: Connected to {stream:?}");
    // Step 1: the remote broker greets us with its hello.
    match recv_tcp_msg(&mut stream)?.try_into()? {
        TcpResponse::BrokerConnectHello {
            broker_shmem_description: _,
            hostname,
        } => log::info!("B2B: Connected to {hostname}"),
        _ => {
            return Err(Error::illegal_state(
                "Unexpected response from B2B server received.".to_string(),
            ));
        }
    }
    // Step 2: introduce ourselves and wait for the remote to accept.
    let hostname = hostname::get()
        .unwrap_or_else(|_| "<unknown>".into())
        .to_string_lossy()
        .into();
    send_tcp_msg(&mut stream, &TcpRequest::RemoteBrokerHello { hostname })?;
    let broker_id = match recv_tcp_msg(&mut stream)?.try_into()? {
        TcpResponse::RemoteBrokerAccepted { broker_id } => {
            log::info!("B2B: Got Connection Ack, broker_id {broker_id:?}");
            broker_id
        }
        _ => {
            return Err(Error::illegal_state(
                "Unexpected response from B2B server received.".to_string(),
            ));
        }
    };
    log::info!("B2B: We are broker {broker_id:?}");
    // Step 3: hand the stream to the background proxy thread; it reports back
    // the shared map it created, which we attach as a regular client.
    let map_description = Self::b2b_thread_on(
        stream,
        self.peek_next_client_id(),
        &self
            .llmp_out
            .out_shmems
            .first()
            .unwrap()
            .shmem
            .description(),
    )?;
    let new_shmem = LlmpSharedMap::existing(
        self.shmem_provider
            .shmem_from_description(map_description)?,
    );
    {
        self.register_client(new_shmem);
    }
    Ok(())
}
/// Copies a client message onto the broadcast map and sends it to all clients.
///
/// # Safety
/// `msg` must be a valid message on one of the client pages.
unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
    unsafe {
        let out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?;
        // The copy below overwrites `out`'s header with `msg`'s, so remember the
        // padded length `alloc_next` assigned and restore it afterwards.
        let actual_size = (*out).buf_len_padded;
        let complete_size = actual_size as usize + size_of::<LlmpMsg>();
        (msg as *const u8).copy_to_nonoverlapping(out as *mut u8, complete_size);
        (*out).buf_len_padded = actual_size;
        // `overwrite_client_id = false`: keep the original sender's id.
        if let Err(e) = self.llmp_out.send(out, false) {
            panic!("Error sending msg: {e:?}");
        }
        // NOTE(review): `send` already set `last_msg_sent = out`; this
        // assignment looks redundant — confirm.
        self.llmp_out.last_msg_sent = out;
        Ok(())
    }
}
/// Polls the global signal-handler flag (volatile read pairs with the handler's
/// volatile write).
#[inline]
#[cfg(any(unix, all(windows, feature = "std")))]
#[expect(clippy::unused_self)]
fn is_shutting_down(&self) -> bool {
    unsafe { ptr::read_volatile(&raw const (LLMP_SIGHANDLER_STATE.shutting_down)) }
}
/// Fallback for targets without signal/ctrl handlers: never shuts down via signal.
#[inline]
#[cfg(not(any(unix, all(windows, feature = "std"))))]
#[expect(clippy::unused_self)]
fn is_shutting_down(&self) -> bool {
    false
}
/// Returns `true` while any non-listener client is connected.
#[inline]
fn has_clients(&self) -> bool {
    // Listeners are registered as clients too; "real" clients are the surplus.
    self.listeners.len() < self.llmp_clients.len()
}
/// Broadcasts `buf` with `tag` to all clients.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
    self.llmp_out.send_buf(tag, buf)
}
/// Broadcasts `buf` with `tag` and `flags` to all clients.
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
    self.llmp_out.send_buf_with_flags(tag, flags, buf)
}
/// Binds a TCP listener on `port` and spawns the accept thread for new clients.
#[cfg(feature = "std")]
pub fn launch_tcp_listener_on(&mut self, port: u16) -> Result<thread::JoinHandle<()>, Error> {
    let listener = tcp_bind(port)?;
    log::info!("Server listening on port {port}");
    self.launch_listener(Listener::Tcp(listener))
}
/// Broadcasts a `LLMP_TAG_NEW_SHM_CLIENT` message announcing a client's shared map,
/// so the broker picks it up in `handle_new_msgs`.
#[cfg(feature = "std")]
fn announce_new_client(
    sender: &mut LlmpSender<SHM, SP>,
    shmem_description: &ShMemDescription,
) -> Result<(), Error> {
    unsafe {
        let msg = sender
            .alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
            .expect("Could not allocate a new message in shared map.");
        (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
        #[expect(clippy::cast_ptr_alignment)]
        let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
        (*pageinfo).shm_str = *shmem_description.id.as_array();
        (*pageinfo).map_size = shmem_description.size;
        // `true`: stamp this sender's id as the message's sender.
        sender.send(msg, true)
    }
}
/// Broadcasts a `LLMP_TAG_CLIENT_EXIT` message announcing that `client_id` left.
#[cfg(feature = "std")]
fn announce_client_exit(sender: &mut LlmpSender<SHM, SP>, client_id: u32) -> Result<(), Error> {
    unsafe {
        let msg = sender
            .alloc_next(size_of::<LlmpClientExitInfo>())
            .expect("Could not allocate a new message in shared map.");
        (*msg).tag = LLMP_TAG_CLIENT_EXIT;
        // Write into the payload at the field's offset; unaligned because the
        // message buffer carries no alignment guarantee for the struct.
        let client_id_offset = offset_of!(LlmpClientExitInfo, client_id);
        write_unaligned(
            (*msg).buf.as_mut_ptr().add(client_id_offset) as _,
            client_id,
        );
        sender.send(msg, true)
    }
}
/// Spawns the broker-to-broker proxy thread for an accepted B2B connection.
///
/// The thread pumps messages both ways: local broadcast page -> TCP, and
/// TCP -> a fresh local sender map (tagged `LLMP_FLAG_FROM_B2B`). Returns the
/// description of that fresh map so the caller can attach it as a client.
#[cfg(feature = "std")]
#[expect(clippy::too_many_lines)]
fn b2b_thread_on(
    mut stream: TcpStream,
    b2b_client_id: ClientId,
    broker_shmem_description: &ShMemDescription,
) -> Result<ShMemDescription, Error> {
    let broker_shmem_description = *broker_shmem_description;
    // Channel used only to hand the new map's description back to the caller.
    let (send, recv) = channel();
    thread::spawn(move || {
        let shmem_provider_bg = SP::new().unwrap();
        #[cfg(feature = "llmp_debug")]
        log::info!("B2b: Spawned proxy thread");
        // A read timeout keeps the loop alternating between TCP and local polling.
        stream
            .set_read_timeout(Some(_LLMP_B2B_BLOCK_TIME))
            .expect("Failed to set tcp stream timeout");
        let mut new_sender =
            match LlmpSender::new(shmem_provider_bg.clone(), b2b_client_id, false) {
                Ok(new_sender) => new_sender,
                Err(e) => {
                    panic!("B2B: Could not map shared map: {e}");
                }
            };
        // Report our map back to `connect_b2b` before entering the loop.
        send.send(new_sender.out_shmems.first().unwrap().shmem.description())
            .expect("B2B: Error sending map description to channel!");
        let mut local_receiver = LlmpReceiver::on_existing_from_description(
            shmem_provider_bg,
            &LlmpDescription {
                last_message_offset: None,
                shmem: broker_shmem_description,
            },
        )
        .expect("Failed to map local page in broker 2 broker thread!");
        #[cfg(feature = "llmp_debug")]
        log::info!("B2B: Starting proxy loop :)");
        let peer_address = stream.peer_addr().unwrap();
        loop {
            // Phase 1: drain all pending local messages out over TCP.
            loop {
                match local_receiver.recv_buf_with_flags() {
                    Ok(None) => break,
                    Ok(Some((client_id, tag, flags, payload))) => {
                        // Skip messages we injected ourselves to avoid loops.
                        if client_id == b2b_client_id {
                            log::info!(
                                "Ignored message we probably sent earlier (same id), TAG: {tag:?}"
                            );
                            continue;
                        }
                        #[cfg(feature = "llmp_debug")]
                        log::info!(
                            "Fowarding message ({} bytes) via broker2broker connection",
                            payload.len()
                        );
                        if let Err(e) = send_tcp_msg(
                            &mut stream,
                            &TcpRemoteNewMessage {
                                client_id,
                                tag,
                                flags,
                                payload: payload.to_vec(),
                            },
                        ) {
                            log::info!(
                                "Got error {e} while trying to forward a message to broker {peer_address}, exiting thread"
                            );
                            return;
                        }
                    }
                    Err(Error::ShuttingDown) => {
                        log::info!("Local broker is shutting down, exiting thread");
                        return;
                    }
                    Err(e) => panic!("Error reading from local page! {e}"),
                }
            }
            // Phase 2: pull one remote message in (or time out and loop back).
            match recv_tcp_msg(&mut stream) {
                Ok(val) => {
                    let msg: TcpRemoteNewMessage = val.try_into().expect(
                        "Illegal message received from broker 2 broker connection - shutting down.",
                    );
                    #[cfg(feature = "llmp_debug")]
                    log::info!(
                        "Fowarding incoming message ({} bytes) from broker2broker connection",
                        msg.payload.len()
                    );
                    // Mark the message so downstream brokers know it crossed a B2B link.
                    new_sender
                        .send_buf_with_flags(
                            msg.tag,
                            msg.flags | LLMP_FLAG_FROM_B2B,
                            &msg.payload,
                        )
                        .expect("B2B: Error forwarding message. Exiting.");
                }
                Err(e) => {
                    if let Error::OsError(e, ..) = e {
                        if e.kind() == ErrorKind::UnexpectedEof {
                            log::info!(
                                "Broker {peer_address} seems to have disconnected, exiting"
                            );
                            return;
                        }
                    }
                    // Timeouts land here; simply loop back to local polling.
                    #[cfg(feature = "llmp_debug")]
                    log::info!("Received no input, timeout or closed. Looping back up :)");
                }
            }
        }
    });
    // Block until the thread reports the map it created (or died before doing so).
    let ret = recv.recv().map_err(|_| {
        Error::unknown("Error launching background thread for b2b communcation".to_string())
    });
    #[cfg(feature = "llmp_debug")]
    log::info!("B2B: returning from loop. Success: {}", ret.is_ok());
    ret
}
/// Dispatches one parsed [`TcpRequest`] that arrived on a freshly accepted
/// connection.
///
/// * `ClientQuit` announces the exit of the given client on the broker page.
/// * `LocalClientHello` publishes the new client's map and acknowledges it
///   over `stream` with the id we assigned; `current_client_id` is bumped.
/// * `RemoteBrokerHello` accepts a remote broker and hands `stream` over to
///   a b2b proxy thread; on success the proxy's map is announced like a
///   regular client and `current_client_id` is bumped.
#[cfg(feature = "std")]
fn handle_tcp_request(
    mut stream: TcpStream,
    request: &TcpRequest,
    current_client_id: &mut ClientId,
    sender: &mut LlmpSender<SHM, SP>,
    broker_shmem_description: &ShMemDescription,
) {
    match request {
        TcpRequest::ClientQuit { client_id } => {
            // Best-effort: a failed announcement is only logged.
            if let Err(e) = Self::announce_client_exit(sender, client_id.0) {
                log::info!("Error announcing client exit: {e:?}");
            }
        }
        TcpRequest::LocalClientHello { shmem_description } => {
            if let Err(e) = Self::announce_new_client(sender, shmem_description) {
                log::info!("Error forwarding client on map: {e:?}");
            }
            // Acknowledge with the id this client will use from now on.
            let response = TcpResponse::LocalClientAccepted {
                client_id: *current_client_id,
            };
            if let Err(e) = send_tcp_msg(&mut stream, &response) {
                log::info!("An error occurred sending via tcp {e}");
            }
            current_client_id.0 += 1;
        }
        TcpRequest::RemoteBrokerHello { hostname } => {
            log::info!("B2B new client: {hostname}");
            let accepted = TcpResponse::RemoteBrokerAccepted {
                broker_id: BrokerId(current_client_id.0),
            };
            if send_tcp_msg(&mut stream, &accepted).is_err() {
                log::info!("Error accepting broker, ignoring.");
                return;
            }
            // The stream is moved into the proxy thread; it reports back the
            // description of the map the remote broker's traffic will use.
            if let Ok(shmem_description) =
                Self::b2b_thread_on(stream, *current_client_id, broker_shmem_description)
            {
                if Self::announce_new_client(sender, &shmem_description).is_err() {
                    log::info!("B2B: Error announcing client {shmem_description:?}");
                }
                current_client_id.0 += 1;
            }
        }
    }
}
/// Launches a background thread that accepts incoming tcp connections on
/// `listener`, greets each peer with a [`TcpResponse::BrokerConnectHello`],
/// and dispatches the peer's request via [`Self::handle_tcp_request`].
///
/// The listener gets its own shared map (registered as a client of this
/// broker) through which all tcp-originated messages are injected.
///
/// Returns the spawned thread's [`thread::JoinHandle`]; note the thread
/// loops forever and is not expected to return.
///
/// # Errors
/// Returns an [`Error`] if the listener's shared map cannot be allocated.
#[cfg(feature = "std")]
pub fn launch_listener(&mut self, listener: Listener) -> Result<thread::JoinHandle<()>, Error> {
    // The hello we send to every connecting peer contains the description of
    // this broker's broadcast map, so local clients can attach directly.
    let client_out_shmem_mem = &self.llmp_out.out_shmems.first().unwrap().shmem;
    let broker_shmem_description = client_out_shmem_mem.description();
    let hostname = hostname::get()
        .unwrap_or_else(|_| "<unknown>".into())
        .to_string_lossy()
        .into();
    let broker_hello = TcpResponse::BrokerConnectHello {
        broker_shmem_description,
        hostname,
    };
    // Reserve the next client id for the tcp listener itself and give it a
    // dedicated outgoing map, registered like any other client.
    let llmp_tcp_id = self.peek_next_client_id();
    let tcp_out_shmem = LlmpSharedMap::new(
        llmp_tcp_id,
        self.shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?,
    );
    let tcp_out_shmem_description = tcp_out_shmem.shmem.description();
    let listener_id = self.register_client(tcp_out_shmem);
    let ret = thread::spawn(move || {
        // The listener thread needs its own provider; it re-attaches to the
        // map registered above via its description.
        let mut shmem_provider_bg = SP::new().unwrap();
        // Ids handed out to connecting clients start right after our own.
        let mut current_client_id = ClientId(llmp_tcp_id.0 + 1);
        let mut tcp_incoming_sender = LlmpSender {
            id: llmp_tcp_id,
            last_msg_sent: ptr::null_mut(),
            out_shmems: vec![LlmpSharedMap::existing(
                shmem_provider_bg
                    .shmem_from_description(tcp_out_shmem_description)
                    .unwrap(),
            )],
            keep_pages_forever: false,
            has_unsent_message: false,
            shmem_provider: shmem_provider_bg.clone(),
            unused_shmem_cache: vec![],
        };
        loop {
            match listener.accept() {
                ListenerStream::Tcp(mut stream, addr) => {
                    log::info!(
                        "New connection: {:?}/{:?}",
                        addr,
                        stream.peer_addr().unwrap()
                    );
                    // Handshake: hello first, then read and parse the
                    // peer's request. Any failure just drops this peer.
                    match send_tcp_msg(&mut stream, &broker_hello) {
                        Ok(()) => {}
                        Err(e) => {
                            log::error!("Error sending initial hello: {e:?}");
                            continue;
                        }
                    }
                    let buf = match recv_tcp_msg(&mut stream) {
                        Ok(buf) => buf,
                        Err(e) => {
                            log::error!("Error receving from tcp: {e:?}");
                            continue;
                        }
                    };
                    let req = match buf.try_into() {
                        Ok(req) => req,
                        Err(e) => {
                            log::error!("Could not deserialize tcp message: {e:?}");
                            continue;
                        }
                    };
                    Self::handle_tcp_request(
                        stream,
                        &req,
                        &mut current_client_id,
                        &mut tcp_incoming_sender,
                        &broker_shmem_description,
                    );
                }
                // Non-blocking accept returned nothing; just poll again.
                ListenerStream::Empty() => {}
            }
        }
    });
    self.listeners.push(listener_id);
    Ok(ret)
}
}
/// Serializable description of both halves of an [`LlmpClient`], allowing a
/// client to be described and later reconstructed (e.g. in another process).
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct LlmpClientDescription {
    // Description of the client's outgoing (sender) map
    sender: LlmpDescription,
    // Description of the client's incoming (receiver) map
    receiver: LlmpDescription,
}
/// A client of the LLMP protocol: pairs a sender (to push messages toward
/// the broker) with a receiver (to read the broker's broadcast page).
#[derive(Debug)]
pub struct LlmpClient<SHM, SP> {
    // Outgoing half: messages written here get forwarded by the broker
    sender: LlmpSender<SHM, SP>,
    // Incoming half: reads messages broadcast by the broker
    receiver: LlmpReceiver<SHM, SP>,
}
impl<SHM, SP> LlmpClient<SHM, SP>
where
    SHM: ShMem,
    SP: ShMemProvider<ShMem = SHM>,
{
    /// Creates a new client attached to the given broker broadcast map,
    /// allocating a fresh outgoing map for the sender with id `sender_id`.
    ///
    /// # Errors
    /// Returns an [`Error`] if the outgoing shared map cannot be allocated.
    pub fn new(
        mut shmem_provider: SP,
        initial_broker_shmem: LlmpSharedMap<SHM>,
        sender_id: ClientId,
    ) -> Result<Self, Error> {
        Ok(Self {
            sender: LlmpSender {
                id: sender_id,
                last_msg_sent: ptr::null_mut(),
                out_shmems: vec![LlmpSharedMap::new(sender_id, {
                    shmem_provider.new_shmem(LLMP_CFG_INITIAL_MAP_SIZE)?
                })],
                keep_pages_forever: false,
                has_unsent_message: false,
                shmem_provider: shmem_provider.clone(),
                unused_shmem_cache: vec![],
            },
            receiver: LlmpReceiver {
                // Messages on the broker page are read as coming from id 0.
                id: ClientId(0),
                current_recv_shmem: initial_broker_shmem,
                last_msg_recvd: ptr::null_mut(),
                shmem_provider,
                highest_msg_id: MessageId(0),
                #[cfg(feature = "std")]
                last_msg_time: current_time(),
            },
        })
    }
    /// Creates a "point to point" client whose receiver reads back the very
    /// map its own sender writes to (a loopback pair).
    ///
    /// # Errors
    /// Returns an [`Error`] if the sender's map cannot be created or mapped.
    pub fn new_p2p(shmem_provider: SP, sender_id: ClientId) -> Result<Self, Error> {
        let sender = LlmpSender::new(shmem_provider.clone(), sender_id, false)?;
        let receiver = LlmpReceiver::on_existing_shmem(
            shmem_provider,
            sender.out_shmems[0].shmem.clone(),
            None,
        )?;
        Ok(Self { sender, receiver })
    }
    /// Reconstructs a client from already-existing shared maps.
    ///
    /// NOTE(review): the sender is re-attached to `current_broker_shmem` with
    /// `last_msg_recvd_offset`, while `_current_out_shmem` and
    /// `_last_msg_sent_offset` are ignored entirely (hence the underscores).
    /// That looks suspicious — one would expect the sender to use the "out"
    /// map and the "sent" offset. Confirm against callers whether this is
    /// intentional before relying on it.
    ///
    /// # Errors
    /// Returns an [`Error`] if either half fails to attach to its map.
    #[allow(clippy::needless_pass_by_value)]
    pub fn on_existing_shmem(
        shmem_provider: SP,
        _current_out_shmem: SHM,
        _last_msg_sent_offset: Option<u64>,
        current_broker_shmem: SHM,
        last_msg_recvd_offset: Option<u64>,
    ) -> Result<Self, Error> {
        Ok(Self {
            receiver: LlmpReceiver::on_existing_shmem(
                shmem_provider.clone(),
                current_broker_shmem.clone(),
                last_msg_recvd_offset,
            )?,
            sender: LlmpSender::on_existing_shmem(
                shmem_provider,
                current_broker_shmem,
                last_msg_recvd_offset,
            )?,
        })
    }
    /// Reconstructs a client from environment variables previously written by
    /// [`Self::to_env`] (using the `_SENDER`/`_RECEIVER` suffix convention).
    ///
    /// # Errors
    /// Returns an [`Error`] if either env var is missing or the maps cannot
    /// be attached.
    #[cfg(feature = "std")]
    pub fn on_existing_from_env(shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
        Ok(Self {
            sender: LlmpSender::on_existing_from_env(
                shmem_provider.clone(),
                &format!("{env_name}_SENDER"),
            )?,
            receiver: LlmpReceiver::on_existing_from_env(
                shmem_provider,
                &format!("{env_name}_RECEIVER"),
            )?,
        })
    }
    /// Reconstructs a client from a [`LlmpClientDescription`] produced by
    /// [`Self::describe`].
    ///
    /// # Errors
    /// Returns an [`Error`] if either described map cannot be attached.
    pub fn existing_client_from_description(
        shmem_provider: SP,
        description: &LlmpClientDescription,
    ) -> Result<Self, Error> {
        Ok(Self {
            sender: LlmpSender::on_existing_from_description(
                shmem_provider.clone(),
                &description.sender,
            )?,
            receiver: LlmpReceiver::on_existing_from_description(
                shmem_provider,
                &description.receiver,
            )?,
        })
    }
    /// Sends `buf` with the given `tag` via this client's sender.
    ///
    /// # Errors
    /// Returns an [`Error`] if the underlying send fails.
    pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
        self.sender.send_buf(tag, buf)
    }
    /// Sends `buf` with the given `tag` and `flags` via this client's sender.
    ///
    /// # Errors
    /// Returns an [`Error`] if the underlying send fails.
    pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
        self.sender.send_buf_with_flags(tag, flags, buf)
    }
    /// Receives the next pending message, if any, as a raw pointer.
    ///
    /// # Safety
    /// Delegates to [`LlmpReceiver::recv`]; the returned pointer lives in
    /// shared memory, so the caller must not use it beyond the map's
    /// lifetime (exact contract documented on the receiver).
    ///
    /// # Errors
    /// Returns an [`Error`] if receiving fails.
    #[inline]
    pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
        unsafe { self.receiver.recv() }
    }
    /// Blocks until the next message arrives, returning it as a raw pointer.
    ///
    /// # Safety
    /// Delegates to [`LlmpReceiver::recv_blocking`]; same pointer-validity
    /// contract as [`Self::recv`].
    ///
    /// # Errors
    /// Returns an [`Error`] if receiving fails.
    #[inline]
    pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> {
        unsafe { self.receiver.recv_blocking() }
    }
    /// Allocates the next outgoing message with a payload of `buf_len` bytes.
    ///
    /// # Safety
    /// Delegates to [`LlmpSender::alloc_next`]; the returned message must be
    /// filled and passed to `send` per the sender's contract.
    ///
    /// # Errors
    /// Returns an [`Error`] if allocation on the shared map fails.
    #[inline]
    pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
        self.sender.alloc_next(buf_len)
    }
    /// Receives the next pending message as a safe `(sender, tag, payload)`
    /// tuple, or `None` if nothing is pending.
    ///
    /// # Errors
    /// Returns an [`Error`] if receiving fails.
    #[expect(clippy::type_complexity)]
    #[inline]
    pub fn recv_buf(&mut self) -> Result<Option<(ClientId, Tag, &[u8])>, Error> {
        self.receiver.recv_buf()
    }
    /// Blocks until a message arrives, returning it as a safe tuple.
    ///
    /// # Errors
    /// Returns an [`Error`] if receiving fails.
    #[inline]
    pub fn recv_buf_blocking(&mut self) -> Result<(ClientId, Tag, &[u8]), Error> {
        self.receiver.recv_buf_blocking()
    }
    /// Like [`Self::recv_buf`], but also yields the message's [`Flags`].
    ///
    /// # Errors
    /// Returns an [`Error`] if receiving fails.
    #[expect(clippy::type_complexity)]
    pub fn recv_buf_with_flags(&mut self) -> Result<Option<(ClientId, Tag, Flags, &[u8])>, Error> {
        self.receiver.recv_buf_with_flags()
    }
    /// Like [`Self::recv_buf_blocking`], but also yields the message's [`Flags`].
    ///
    /// # Errors
    /// Returns an [`Error`] if receiving fails.
    pub fn recv_buf_blocking_with_flags(&mut self) -> Result<(ClientId, Tag, Flags, &[u8]), Error> {
        self.receiver.recv_buf_blocking_with_flags()
    }
    /// Creates a client attached to the broker map whose description is
    /// stored in the environment variable `env_var`, using the sender id
    /// recorded on that page.
    ///
    /// # Errors
    /// Returns an [`Error`] if the env var or map cannot be read.
    #[cfg(feature = "std")]
    pub fn create_using_env(mut shmem_provider: SP, env_var: &str) -> Result<Self, Error> {
        let map = LlmpSharedMap::existing(shmem_provider.existing_from_env(env_var)?);
        let client_id = unsafe { (*map.page()).sender_id };
        Self::new(shmem_provider, map, client_id)
    }
    /// Connects to a local broker via tcp on `port`, performs the hello
    /// handshake, attaches to the broker's map, and registers this client
    /// (receiving its assigned id over the socket).
    ///
    /// Retries indefinitely (with a 50 ms backoff) while the connection is
    /// refused, so this can be started before the broker is listening.
    ///
    /// # Errors
    /// Returns an [`Error`] on non-refusal connect failures, malformed
    /// handshake messages, or map attachment failures.
    #[cfg(feature = "std")]
    pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result<Self, Error> {
        let mut stream = match TcpStream::connect((IP_LOCALHOST, port)) {
            Ok(stream) => stream,
            Err(e) => {
                match e.kind() {
                    ErrorKind::ConnectionRefused => {
                        // Broker not up yet: keep retrying until it is.
                        loop {
                            if let Ok(stream) = TcpStream::connect((IP_LOCALHOST, port)) {
                                break stream;
                            }
                            log::debug!("Connection Refused. Retrying...");
                            // (cfg is redundant here — the whole fn is std-only — but harmless.)
                            #[cfg(feature = "std")]
                            thread::sleep(Duration::from_millis(50));
                        }
                    }
                    _ => return Err(Error::illegal_state(e.to_string())),
                }
            }
        };
        log::info!("Connected to port {port}");
        // The broker speaks first: its hello carries the broadcast map description.
        let TcpResponse::BrokerConnectHello {
            broker_shmem_description,
            hostname: _,
        } = recv_tcp_msg(&mut stream)?.try_into()?
        else {
            return Err(Error::illegal_state(
                "Received unexpected Broker Hello".to_string(),
            ));
        };
        let map = LlmpSharedMap::existing(
            shmem_provider.shmem_from_description(broker_shmem_description)?,
        );
        // Start with a placeholder id; the broker assigns the real one below.
        let mut ret = Self::new(shmem_provider, map, ClientId(0))?;
        let client_hello_req = TcpRequest::LocalClientHello {
            shmem_description: ret.sender.out_shmems.first().unwrap().shmem.description(),
        };
        send_tcp_msg(&mut stream, &client_hello_req)?;
        let TcpResponse::LocalClientAccepted {
            client_id: client_sender_id,
        } = recv_tcp_msg(&mut stream)?.try_into()?
        else {
            return Err(Error::illegal_state(
                "Unexpected Response from Broker".to_string(),
            ));
        };
        // Adopt the broker-assigned id, both locally and on our out page so
        // the broker attributes our messages correctly.
        ret.sender.id = client_sender_id;
        unsafe {
            (*ret.sender.out_shmems.first_mut().unwrap().page_mut()).sender_id = client_sender_id;
        }
        Ok(ret)
    }
}
impl<SHM, SP> LlmpClient<SHM, SP>
where
    SHM: ShMem,
{
    /// Blocks until the sender's current page may safely be unmapped
    /// (delegates to [`LlmpSender::await_safe_to_unmap_blocking`]).
    pub fn await_safe_to_unmap_blocking(&self) {
        self.sender.await_safe_to_unmap_blocking();
    }
    /// Returns `true` if the sender's current page may safely be unmapped.
    pub fn safe_to_unmap(&self) -> bool {
        self.sender.safe_to_unmap()
    }
    /// Marks the sender's page as safe to unmap.
    ///
    /// # Safety
    /// Delegates to [`LlmpSender::mark_safe_to_unmap`]; the caller must
    /// uphold that sender's contract (see its documentation).
    pub unsafe fn mark_safe_to_unmap(&mut self) {
        unsafe {
            self.sender.mark_safe_to_unmap();
        }
    }
    /// Commits a previously allocated message (always overwriting client ids,
    /// as `send` is called with `true` here).
    ///
    /// # Safety
    /// `msg` must be a message obtained from this client's sender via
    /// `alloc_next` and not yet sent.
    ///
    /// # Errors
    /// Returns an [`Error`] if the underlying send fails.
    pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
        unsafe { self.sender.send(msg, true) }
    }
    /// Writes both halves' map descriptions to environment variables
    /// (`{env_name}_SENDER` / `{env_name}_RECEIVER`) so a child process can
    /// reconstruct this client via `on_existing_from_env`.
    ///
    /// # Safety
    /// Delegates to the sender's/receiver's `to_env`, which mutate the
    /// process environment (not thread-safe on all platforms).
    ///
    /// # Errors
    /// Returns an [`Error`] if either env write fails.
    #[cfg(feature = "std")]
    pub unsafe fn to_env(&self, env_name: &str) -> Result<(), Error> {
        unsafe {
            self.sender.to_env(&format!("{env_name}_SENDER"))?;
            self.receiver.to_env(&format!("{env_name}_RECEIVER"))
        }
    }
    /// Returns a serializable [`LlmpClientDescription`] of this client.
    ///
    /// # Errors
    /// Returns an [`Error`] if either half fails to describe itself.
    pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
        Ok(LlmpClientDescription {
            sender: self.sender.describe()?,
            receiver: self.receiver.describe()?,
        })
    }
}
impl<SHM, SP> LlmpClient<SHM, SP> {
    /// Shared reference to this client's sender half.
    #[must_use]
    pub fn sender(&self) -> &LlmpSender<SHM, SP> {
        &self.sender
    }
    /// Exclusive reference to this client's sender half.
    #[must_use]
    pub fn sender_mut(&mut self) -> &mut LlmpSender<SHM, SP> {
        &mut self.sender
    }
    /// Shared reference to this client's receiver half.
    #[must_use]
    pub fn receiver(&self) -> &LlmpReceiver<SHM, SP> {
        &self.receiver
    }
    /// Exclusive reference to this client's receiver half.
    #[must_use]
    pub fn receiver_mut(&mut self) -> &mut LlmpReceiver<SHM, SP> {
        &mut self.receiver
    }
}
#[cfg(test)]
#[cfg(all(unix, feature = "std", not(target_os = "haiku")))]
mod tests {
    use alloc::vec;
    use core::time::Duration;
    use std::thread::sleep;
    use serial_test::serial;
    use shmem_providers::{ShMemProvider, StdShMemProvider};
    use super::{
        LlmpClient,
        LlmpConnection::{self, IsBroker, IsClient},
        Tag,
    };
    // End-to-end smoke test: broker + client over a real tcp port, plus
    // round-tripping the client through the environment.
    // NOTE(review): the port (1337) is hard-coded and the 100 ms sleep is a
    // timing assumption; `#[serial]` prevents concurrent test collisions,
    // but an already-occupied port would still make this flaky.
    #[test]
    #[serial]
    #[cfg_attr(miri, ignore)]
    fn test_llmp_connection() {
        let shmem_provider = StdShMemProvider::new().unwrap();
        // First on_port call binds the port -> must become the broker.
        let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
            IsClient { client: _ } => panic!("Could not bind to port as broker"),
            IsBroker { broker } => broker,
        };
        // Second call finds the port taken -> must connect as a client.
        let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
            IsBroker { broker: _ } => panic!("Second connect should be a client!"),
            IsClient { client } => client,
        };
        // Give the broker's listener thread time to register the client.
        sleep(Duration::from_millis(100));
        broker.broker_once().unwrap();
        let tag: Tag = Tag(0x1337);
        let arr: [u8; 1] = [1_u8];
        client.send_buf(tag, &arr).unwrap();
        // Persist the client to env vars, then rebuild it from them.
        unsafe {
            client.to_env("_ENV_TEST").unwrap();
        }
        #[cfg(all(feature = "llmp_debug", feature = "std"))]
        log::info!("{:?}", std::env::vars());
        for (key, value) in std::env::vars_os() {
            log::info!("{key:?}: {value:?}");
        }
        client = LlmpClient::on_existing_from_env(shmem_provider, "_ENV_TEST").unwrap();
        // The rebuilt client must still be able to send, and the broker must
        // forward the message back so the client can receive it.
        client.send_buf(tag, &arr).unwrap();
        broker.broker_once().unwrap();
        let (_sender_id, tag2, arr2) = client.recv_buf_blocking().unwrap();
        assert_eq!(tag, tag2);
        assert_eq!(arr[0], arr2[0]);
        // Two registered clients: the tcp listener map and the actual client.
        assert_eq!(broker.inner.llmp_clients.len(), 2);
    }
}