use alloc::{string::String, vec::Vec};
use core::{
cmp::max,
convert::TryFrom,
fmt::Debug,
mem::size_of,
ptr, slice,
sync::atomic::{compiler_fence, Ordering},
time::Duration,
};
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use std::{
convert::TryInto,
env,
io::{Read, Write},
net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs},
sync::mpsc::channel,
thread,
};
#[cfg(all(feature = "llmp_debug", feature = "std"))]
use backtrace::Backtrace;
#[cfg(unix)]
use crate::bolts::os::unix_signals::{setup_signal_handler, siginfo_t, Handler, Signal};
use crate::{
bolts::shmem::{ShMem, ShMemDescription, ShMemId, ShMemProvider},
Error,
};
#[cfg(unix)]
use libc::ucontext_t;
/// Size requested for a sender's first shared map (`1 << 28` bytes = 256 MiB).
/// Also the lower bound used by `new_map_size` when allocating follow-up maps.
#[cfg(not(feature = "llmp_small_maps"))]
const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 28;
/// Size requested for a sender's first shared map when `llmp_small_maps` is enabled
/// (`1 << 20` bytes = 1 MiB).
#[cfg(feature = "llmp_small_maps")]
const LLMP_CFG_INITIAL_MAP_SIZE: usize = 1 << 20;
/// Alignment (in bytes) that `llmp_align` rounds sizes up to. `0` disables alignment.
const LLMP_CFG_ALIGNNMENT: usize = 64;
/// Tag of a message slot that has not been allocated yet.
/// Written to the first slot on page init and to the slot following each allocation.
const LLMP_TAG_UNSET: Tag = 0xDEADAF;
/// Tag of a message that was allocated but whose real tag was not set by the caller yet.
const LLMP_TAG_UNINITIALIZED: Tag = 0xA143AF11;
/// Tag of the final message on a page; its payload is a `LlmpPayloadSharedMapInfo`
/// describing the next map.
const LLMP_TAG_END_OF_PAGE: Tag = 0xAF1E0F1;
/// Reserved tag (rejected by `send_buf`); presumably announces a new shared-memory client
/// to the broker - its handling is not part of this section, TODO confirm.
const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471;
/// Tag sent by the broker when `loop_forever` ends; receivers turn it into `Error::ShuttingDown`.
const LLMP_TAG_EXITING: Tag = 0x13C5171;
/// Default flags value used by `send_buf` (no flag bits set).
pub const LLMP_FLAG_INITIALIZED: Flags = 0x0;
/// Flag bit; by its name it marks a compressed payload (not interpreted in this section).
pub const LLMP_FLAG_COMPRESSED: Flags = 0x1;
/// Flag bit; by its name it marks a message that arrived via broker-to-broker forwarding
/// (not interpreted in this section).
pub const LLMP_FLAG_FROM_B2B: Flags = 0x2;
/// Three-second duration; presumably a timeout for the broker-to-broker connection
/// (its use is outside this section - verify).
const _LLMP_B2B_BLOCK_TIME: Duration = Duration::from_millis(3_000);
/// Address the broker's TCP listener binds to: all interfaces with `llmp_bind_public`.
#[cfg(feature = "llmp_bind_public")]
const _LLMP_BIND_ADDR: &str = "0.0.0.0";
/// Address the broker's TCP listener binds to: loopback only (default).
#[cfg(not(feature = "llmp_bind_public"))]
const _LLMP_BIND_ADDR: &str = "127.0.0.1";
/// Value stored in the `<name>_OFFSET` env variable when there is no last message (null pointer).
const _NULL_ENV_STR: &str = "_NULL";
/// Magic value written to `LlmpPage::magic` once a page has been initialized.
const PAGE_INITIALIZED_MAGIC: u64 = 0x1A1A1A1A1A1A1AF1;
/// Aligned size of an end-of-page message (message header plus `LlmpPayloadSharedMapInfo`).
const EOP_MSG_SIZE: usize =
llmp_align(size_of::<LlmpMsg>() + size_of::<LlmpPayloadSharedMapInfo>());
/// Size of the `LlmpPage` header that precedes the messages of a map.
const LLMP_PAGE_HEADER_LEN: usize = size_of::<LlmpPage>();
/// Process-wide state that the broker's signal handler sets and `is_shutting_down` polls.
#[cfg(unix)]
static mut GLOBAL_SIGHANDLER_STATE: LlmpBrokerSignalHandler = LlmpBrokerSignalHandler {
shutting_down: false,
};
/// Tag identifying the kind of a message.
pub type Tag = u32;
/// Id of a client (sender) attached to a broker.
pub type ClientId = u32;
/// Id of a broker.
pub type BrokerId = u32;
/// Bit flags attached to a message (see the `LLMP_FLAG_*` constants).
pub type Flags = u32;
/// Per-message id, as stored in `LlmpMsg::message_id` and `LlmpPage::current_msg_id`.
pub type MessageId = u64;
/// Requests sent over TCP to a broker (serialized with `postcard`).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TcpRequest {
/// Hello carrying the description of a shared map.
/// Presumably sent by a local client so the broker can attach to its map - verify against
/// the listener code, which is outside this section.
LocalClientHello {
shmem_description: ShMemDescription,
},
/// Hello sent by a broker connecting to another broker (see `connect_b2b`),
/// carrying the connecting broker's hostname.
RemoteBrokerHello {
hostname: String,
},
}
/// Decodes a [`TcpRequest`] from its `postcard`-serialized bytes.
impl TryFrom<&Vec<u8>> for TcpRequest {
    type Error = crate::Error;

    fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
        // Deserialization errors are converted into `crate::Error` by `?`.
        let request = postcard::from_bytes(bytes)?;
        Ok(request)
    }
}
/// A message in serializable form: sender, tag, flags and an owned copy of the payload.
/// By its name this is the form messages take when forwarded over TCP between brokers
/// (the forwarding code is outside this section).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TcpRemoteNewMessage {
// Id of the client the message originates from.
client_id: ClientId,
// Tag of the message.
tag: Tag,
// Flags of the message.
flags: Flags,
// Payload bytes of the message.
payload: Vec<u8>,
}
/// Decodes a [`TcpRemoteNewMessage`] from its `postcard`-serialized bytes.
impl TryFrom<&Vec<u8>> for TcpRemoteNewMessage {
    type Error = crate::Error;

    fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
        // Deserialization errors are converted into `crate::Error` by `?`.
        let message = postcard::from_bytes(bytes)?;
        Ok(message)
    }
}
/// Responses sent over TCP by a broker (serialized with `postcard`).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum TcpResponse {
/// First message a connecting peer receives: the broker's map description and hostname.
BrokerConnectHello {
broker_map_description: ShMemDescription,
hostname: String,
},
/// Acknowledges a local client and carries a client id.
/// Presumably the id assigned to that client - the sending side is outside this section.
LocalClientAccepted {
client_id: ClientId,
},
/// Acknowledges a remote broker and tells it its broker id (see `connect_b2b`).
RemoteBrokerAccepted {
broker_id: BrokerId,
},
/// Reports an error, with a human-readable description.
Error {
description: String,
},
}
/// Decodes a [`TcpResponse`] from its `postcard`-serialized bytes.
impl TryFrom<&Vec<u8>> for TcpResponse {
    type Error = crate::Error;

    fn try_from(bytes: &Vec<u8>) -> Result<Self, Error> {
        // Deserialization errors are converted into `crate::Error` by `?`.
        let response = postcard::from_bytes(bytes)?;
        Ok(response)
    }
}
/// A listener accepting incoming connections; currently only TCP is supported.
#[cfg(feature = "std")]
pub enum Listener {
/// Listener backed by a [`TcpListener`].
Tcp(TcpListener),
}
/// Result of [`Listener::accept`].
#[cfg(feature = "std")]
pub enum ListenerStream {
/// An accepted TCP connection together with the peer's address.
Tcp(TcpStream, SocketAddr),
/// No connection: returned when accepting failed.
Empty(),
}
#[cfg(feature = "std")]
impl Listener {
    /// Accepts the next incoming connection.
    ///
    /// A failed accept is logged via `dbg!` and reported as [`ListenerStream::Empty`]
    /// instead of an error.
    fn accept(&self) -> ListenerStream {
        // `Tcp` is the only variant, so this pattern is irrefutable.
        let Listener::Tcp(inner) = self;
        match inner.accept() {
            Ok((stream, addr)) => ListenerStream::Tcp(stream, addr),
            Err(err) => {
                dbg!("Ignoring failed accept", err);
                ListenerStream::Empty()
            }
        }
    }
}
/// Returns the start of the given shared map, reinterpreted as a mutable [`LlmpPage`] pointer.
///
/// # Safety
/// The returned pointer is only meaningful if the map is large enough to hold a page header.
#[inline]
unsafe fn shmem2page_mut<SHM: ShMem>(afl_shmem: &mut SHM) -> *mut LlmpPage {
    let base = afl_shmem.map_mut().as_mut_ptr();
    base as *mut LlmpPage
}
/// Returns the start of the given shared map, reinterpreted as a const [`LlmpPage`] pointer.
///
/// # Safety
/// The returned pointer is only meaningful if the map is large enough to hold a page header.
#[inline]
unsafe fn shmem2page<SHM: ShMem>(afl_shmem: &SHM) -> *const LlmpPage {
    let base = afl_shmem.map().as_ptr();
    base as *const LlmpPage
}
/// Checks that `msg` lies strictly after the start of `page` and strictly before
/// `page + page.size_total`.
///
/// # Safety
/// `page` must point to a readable [`LlmpPage`] whenever `msg` is located after it.
#[inline]
unsafe fn llmp_msg_in_page(page: *const LlmpPage, msg: *const LlmpMsg) -> bool {
    let page_start = page as *const u8;
    let msg_addr = msg as *const u8;
    // Anything at or before the page start is rejected without touching the page.
    if msg_addr <= page_start {
        return false;
    }
    msg_addr < page_start.add((*page).size_total)
}
/// Rounds `to_align` up to the next multiple of `LLMP_CFG_ALIGNNMENT`.
/// Values that are already aligned are returned unchanged, as is every value if the
/// configured alignment is `0`.
#[inline]
const fn llmp_align(to_align: usize) -> usize {
    // An alignment of 0 disables aligning (and would make the modulo below invalid).
    if LLMP_CFG_ALIGNNMENT == 0 {
        return to_align;
    }
    match to_align % LLMP_CFG_ALIGNNMENT {
        0 => to_align,
        remainder => to_align + LLMP_CFG_ALIGNNMENT - remainder,
    }
}
/// Reads the message offset stored in the env variable `<env_name>_OFFSET`.
///
/// Returns `Ok(None)` if the variable holds the null marker (`_NULL_ENV_STR`), otherwise the
/// parsed offset.
///
/// # Errors
/// Fails if the variable cannot be read or its content is not a valid `u64`.
#[cfg(feature = "std")]
#[inline]
fn msg_offset_from_env(env_name: &str) -> Result<Option<u64>, Error> {
    let key = format!("{}_OFFSET", env_name);
    let raw = env::var(&key)?;
    if raw == _NULL_ENV_STR {
        return Ok(None);
    }
    Ok(Some(raw.parse()?))
}
/// Serializes `msg` with `postcard` and writes it to `stream`, prefixed with its length as a
/// big-endian `u32`.
///
/// # Errors
/// Fails if serialization fails, if the serialized message is longer than `u32::MAX` bytes,
/// or if writing to the stream fails.
#[cfg(feature = "std")]
fn send_tcp_msg<T>(stream: &mut TcpStream, msg: &T) -> Result<(), Error>
where
    T: Serialize,
{
    let payload = postcard::to_allocvec(msg)?;
    let payload_len = payload.len();
    // The length prefix is a u32, so larger messages cannot be framed.
    if payload_len > u32::MAX as usize {
        return Err(Error::IllegalState(format!(
            "Trying to send message a tcp message > u32! (size: {})",
            payload_len
        )));
    }

    #[cfg(feature = "llmp_debug")]
    println!("LLMP TCP: Sending {} bytes", payload_len);

    let len_prefix = (payload_len as u32).to_be_bytes();
    stream.write_all(&len_prefix)?;
    stream.write_all(&payload)?;

    #[cfg(feature = "llmp_debug")]
    println!("LLMP TCP: Sending {} bytes finished.", payload_len);

    Ok(())
}
/// Receives one length-prefixed message from `stream`.
///
/// The wire format matches `send_tcp_msg`: a big-endian `u32` length followed by that many
/// payload bytes. The payload is returned undecoded.
///
/// # Errors
/// Fails if either the length prefix or the payload cannot be read completely
/// (e.g. the peer disconnected or a read timeout expired).
#[cfg(feature = "std")]
fn recv_tcp_msg(stream: &mut TcpStream) -> Result<Vec<u8>, Error> {
    #[cfg(feature = "llmp_debug")]
    println!(
        "LLMP TCP: Waiting for packet... (Timeout: {:?})",
        stream.read_timeout().unwrap_or(None)
    );

    let mut size_bytes = [0u8; 4];
    stream.read_exact(&mut size_bytes)?;
    let size = u32::from_be_bytes(size_bytes);
    let mut bytes = vec![0u8; size as usize];

    #[cfg(feature = "llmp_debug")]
    println!("LLMP TCP: Receiving payload of size {}", size);

    // A peer that goes away mid-message is a recoverable I/O condition, not a bug:
    // propagate the error like for the length prefix instead of panicking.
    stream.read_exact(&mut bytes)?;
    Ok(bytes)
}
/// Computes the size of the next shared map, given the largest allocation seen so far.
///
/// The result is the next power of two of the larger of
/// `2 * max_alloc + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN` and `LLMP_CFG_INITIAL_MAP_SIZE - 1`.
#[inline]
fn new_map_size(max_alloc: usize) -> usize {
    let needed = max_alloc * 2 + EOP_MSG_SIZE + LLMP_PAGE_HEADER_LEN;
    let floor = LLMP_CFG_INITIAL_MAP_SIZE - 1;
    max(needed, floor).next_power_of_two()
}
/// Initializes the [`LlmpPage`] header at the start of `shmem` for the given `sender`.
///
/// Sets the magic value, resets the message counters and size bookkeeping, marks the first
/// message slot as unset and clears the `save_to_unmap` / `sender_dead` flags.
///
/// # Panics
/// Panics if the page already carries the initialized magic and `allow_reinit` is `false`,
/// or if the resulting `size_total` is `0`.
///
/// # Safety
/// `shmem` must be at least large enough to hold the page header and one message header.
unsafe fn _llmp_page_init<SHM: ShMem>(shmem: &mut SHM, sender: u32, allow_reinit: bool) {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("_llmp_page_init: shmem {}", &shmem);
let map_size = shmem.len();
let page = shmem2page_mut(shmem);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("_llmp_page_init: page {}", *page);
// Refuse to clobber a page that is already in use, unless explicitly allowed.
if (*page).magic == PAGE_INITIALIZED_MAGIC && !allow_reinit {
panic!(
"Tried to initialize page {:?} twice (for shmem {:?})",
page, shmem
);
};
(*page).magic = PAGE_INITIALIZED_MAGIC;
(*page).sender = sender;
// Fields that the other side polls are written with volatile writes.
ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), 0);
(*page).max_alloc_size = 0;
// `size_total` counts only the space available for messages, not the header.
(*page).size_total = map_size - LLMP_PAGE_HEADER_LEN;
(*page).size_used = 0;
// Mark the first message slot as not yet allocated.
(*(*page).messages.as_mut_ptr()).message_id = 0;
(*(*page).messages.as_mut_ptr()).tag = LLMP_TAG_UNSET;
ptr::write_volatile(ptr::addr_of_mut!((*page).save_to_unmap), 0);
ptr::write_volatile(ptr::addr_of_mut!((*page).sender_dead), 0);
assert!((*page).size_total != 0);
}
#[inline]
unsafe fn llmp_next_msg_ptr_checked<SHM: ShMem>(
map: &mut LlmpSharedMap<SHM>,
last_msg: *const LlmpMsg,
alloc_size: usize,
) -> Result<*mut LlmpMsg, Error> {
let page = map.page_mut();
let map_size = map.shmem.map().len();
let msg_begin_min = (page as *const u8).add(size_of::<LlmpPage>());
let msg_begin_max = (page as *const u8).add(map_size - alloc_size);
let next = _llmp_next_msg_ptr(last_msg);
let next_ptr = next as *const u8;
if next_ptr >= msg_begin_min && next_ptr <= msg_begin_max {
Ok(next)
} else {
Err(Error::IllegalState(format!(
"Inconsistent data on sharedmap, or Bug (next_ptr was {:x}, sharedmap page was {:x})",
next_ptr as usize, page as usize
)))
}
}
/// Computes the address directly behind `last_msg` (header plus padded payload), without any
/// bounds checks.
///
/// # Safety
/// `last_msg` must point to a readable [`LlmpMsg`] header.
#[inline]
unsafe fn _llmp_next_msg_ptr(last_msg: *const LlmpMsg) -> *mut LlmpMsg {
    let padded_len = (*last_msg).buf_len_padded as usize;
    let start = last_msg as *mut u8;
    start.add(size_of::<LlmpMsg>()).add(padded_len) as *mut LlmpMsg
}
/// Serializable description of a sender or receiver: which shared map it uses and where its
/// last message is located. Used to recreate the sender/receiver via `on_existing_from_description`.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct LlmpDescription {
// Description of the current shared map.
shmem: ShMemDescription,
// Offset of the last sent/received message relative to the page's first message,
// or `None` if there is no such message yet.
last_message_offset: Option<u64>,
}
/// Result returned by the broker's message hook (the `on_new_msg` callback).
/// NOTE(review): the code acting on this value is outside this section; the descriptions
/// below follow the variant names.
#[derive(Copy, Clone, Debug)]
pub enum LlmpMsgHookResult {
/// The hook handled the message itself.
Handled,
/// The message should be forwarded to the clients.
ForwardToClients,
}
/// Header of a message as laid out in the shared map; the payload follows directly behind it.
#[derive(Copy, Clone, Debug)]
#[repr(C, packed)]
pub struct LlmpMsg {
/// Tag identifying the kind of message.
pub tag: Tag,
/// Id of the sender of this message.
pub sender: ClientId,
/// Id of a broker (presumably the one that forwarded the message - not set in this section).
pub broker: BrokerId,
/// Flags of this message (see the `LLMP_FLAG_*` constants).
pub flags: Flags,
/// Id of this message; assigned when the message is sent.
pub message_id: MessageId,
/// Length of the payload in bytes, as requested by the sender.
pub buf_len: u64,
/// Length of the payload including padding; the next message starts behind it.
pub buf_len_padded: u64,
/// Zero-sized marker for the start of the payload.
pub buf: [u8; 0],
}
impl LlmpMsg {
#[must_use]
pub unsafe fn as_slice_unsafe(&self) -> &[u8] {
slice::from_raw_parts(self.buf.as_ptr(), self.buf_len as usize)
}
#[inline]
pub fn as_slice<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> Result<&[u8], Error> {
unsafe {
if self.in_map(map) {
Ok(self.as_slice_unsafe())
} else {
Err(Error::IllegalState("Current message not in page. The sharedmap get tampered with or we have a BUG.".into()))
}
}
}
#[inline]
pub fn in_map<SHM: ShMem>(&self, map: &mut LlmpSharedMap<SHM>) -> bool {
unsafe {
let map_size = map.shmem.map().len();
let buf_ptr = self.buf.as_ptr();
if buf_ptr > (map.page_mut() as *const u8).add(size_of::<LlmpPage>())
&& buf_ptr
<= (map.page_mut() as *const u8).add(map_size - size_of::<LlmpMsg>() as usize)
{
let len = self.buf_len_padded as usize + size_of::<LlmpMsg>();
buf_ptr <= (map.page_mut() as *const u8).add(map_size - len)
} else {
false
}
}
}
}
/// An LLMP connection that is either the broker or a client attached to a broker.
#[derive(Debug)]
pub enum LlmpConnection<SP>
where
SP: ShMemProvider + 'static,
{
/// This side is the broker.
IsBroker {
broker: LlmpBroker<SP>,
},
/// This side is a client.
IsClient {
client: LlmpClient<SP>,
},
}
impl<SP> LlmpConnection<SP>
where
SP: ShMemProvider,
{
/// Creates a connection on the given TCP port.
///
/// If the port can be bound, this process becomes the broker and launches a listener on it.
/// If the port is already in use, this process attaches to the existing broker as a client.
///
/// # Errors
/// Any other bind error is returned as [`Error::File`]; errors while creating the broker,
/// launching the listener or attaching the client are propagated.
#[cfg(feature = "std")]
pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
    let bind_addr = format!("{}:{}", _LLMP_BIND_ADDR, port);
    let listener = match TcpListener::bind(bind_addr) {
        Ok(listener) => listener,
        Err(e) => {
            println!("error: {:?}", e);
            if e.kind() != std::io::ErrorKind::AddrInUse {
                return Err(Error::File(e));
            }
            // Somebody else owns the port already: that is the broker, connect to it.
            dbg!("We're the client", e);
            let client = LlmpClient::create_attach_to_tcp(shmem_provider, port)?;
            return Ok(LlmpConnection::IsClient { client });
        }
    };

    // We own the port, so we are the broker.
    dbg!("We're the broker");
    let mut broker = LlmpBroker::new(shmem_provider)?;
    let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
    Ok(LlmpConnection::IsBroker { broker })
}
/// Describes this connection so it can be recreated later.
///
/// # Panics
/// Only clients can be described; calling this on a broker hits a `todo!`.
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
    if let LlmpConnection::IsClient { client } = self {
        Ok(client.describe()?)
    } else {
        todo!("Only client can be described atm.")
    }
}
/// Recreates a client connection from a description obtained via `describe`.
pub fn existing_client_from_description(
    shmem_provider: SP,
    description: &LlmpClientDescription,
) -> Result<LlmpConnection<SP>, Error> {
    let client = LlmpClient::existing_client_from_description(shmem_provider, description)?;
    Ok(LlmpConnection::IsClient { client })
}
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
match self {
LlmpConnection::IsBroker { broker } => broker.send_buf(tag, buf),
LlmpConnection::IsClient { client } => client.send_buf(tag, buf),
}
}
pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flags) -> Result<(), Error> {
match self {
LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf),
LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf),
}
}
}
/// Header at the start of every shared map; the messages follow directly behind it.
#[derive(Copy, Clone, Debug)]
#[repr(C, packed)]
pub struct LlmpPage {
/// Set to `PAGE_INITIALIZED_MAGIC` once the page has been initialized.
pub magic: u64,
/// Id of the sender that owns this page.
pub sender: u32,
/// Non-zero once the receiving side has marked the page; the sender polls this flag
/// before dropping old pages.
pub save_to_unmap: u16,
/// Cleared on page init. By its name it signals that the sender died
/// (it is not set anywhere in this section).
pub sender_dead: u16,
/// Id of the most recently sent message; `0` while nothing was sent.
pub current_msg_id: u64,
/// Number of bytes available for messages (map size minus the page header).
pub size_total: usize,
/// Number of bytes currently taken up by allocated messages.
pub size_used: usize,
/// Largest message allocation seen so far; used to size the next map.
pub max_alloc_size: usize,
/// Zero-sized marker for the start of the first message.
pub messages: [LlmpMsg; 0],
}
/// Payload of an end-of-page message: tells the receiver which map the sender continues on.
#[derive(Copy, Clone, Debug)]
#[repr(C, packed)]
struct LlmpPayloadSharedMapInfo {
/// Size of the new map in bytes.
pub map_size: usize,
/// Id of the new map, as a fixed-size byte string.
pub shm_str: [u8; 20],
}
/// Sending side of an LLMP channel: allocates messages on its current out map and moves on to
/// a new map when the current one is full.
#[derive(Debug)]
pub struct LlmpSender<SP>
where
SP: ShMemProvider,
{
/// Id of this sender.
pub id: u32,
/// The last message that was sent, or null if nothing was sent on the current map yet.
pub last_msg_sent: *const LlmpMsg,
/// The maps this sender still holds; the last entry is the one currently written to.
pub out_maps: Vec<LlmpSharedMap<SP::Mem>>,
/// If `true`, old maps are never pruned after moving on to a new map.
pub keep_pages_forever: bool,
// Provider used to allocate new maps.
shmem_provider: SP,
}
impl<SP> LlmpSender<SP>
where
SP: ShMemProvider,
{
/// Creates a new sender with a freshly allocated and initialized out map of
/// `LLMP_CFG_INITIAL_MAP_SIZE` bytes.
///
/// # Errors
/// Fails if the shared map cannot be allocated.
pub fn new(mut shmem_provider: SP, id: u32, keep_pages_forever: bool) -> Result<Self, Error> {
    let first_shmem = shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?;
    // NOTE(review): the page is initialized with sender id 0, not with `id` - confirm this
    // is intended.
    let first_map = LlmpSharedMap::new(0, first_shmem);
    Ok(Self {
        id,
        last_msg_sent: ptr::null_mut(),
        out_maps: vec![first_map],
        keep_pages_forever,
        shmem_provider,
    })
}
/// Re-initializes the current out map and forgets the last sent message.
///
/// # Safety
/// Everything previously written to the current map is discarded; the caller must make sure
/// nobody still relies on it.
pub unsafe fn reset(&mut self) {
    let current_map = self.out_maps.last_mut().unwrap();
    _llmp_page_init(&mut current_map.shmem, self.id, true);
    self.last_msg_sent = ptr::null_mut();
}
/// Recreates a sender from the map and message offset stored in the environment under
/// `env_name` (see `to_env`).
///
/// # Errors
/// Fails if the env variables are missing or invalid, or if the map cannot be attached.
#[cfg(feature = "std")]
pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
    let msg_sent_offset = msg_offset_from_env(env_name)?;
    // Clone the provider before it is used to attach to the existing map.
    let sender_provider = shmem_provider.clone();
    let existing_map = shmem_provider.existing_from_env(env_name)?;
    Self::on_existing_map(sender_provider, existing_map, msg_sent_offset)
}
/// Stores the current out map and the offset of the last sent message in the environment
/// under `env_name`, so the sender can be recreated with `on_existing_from_env`.
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
    let map = self.out_maps.last().unwrap();
    map.shmem.write_to_env(env_name)?;
    let last_sent = self.last_msg_sent;
    unsafe { map.msg_to_env(last_sent, env_name) }
}
/// Busy-waits until the current out map has been marked as safe to unmap.
pub fn await_save_to_unmap_blocking(&self) {
    while !self.save_to_unmap() {}
}
/// Returns `true` if the `save_to_unmap` flag of the current out map is set.
pub fn save_to_unmap(&self) -> bool {
    let map = self.out_maps.last().unwrap();
    compiler_fence(Ordering::SeqCst);
    // The flag is written by another process, hence the volatile read.
    let flag = unsafe { ptr::read_volatile(ptr::addr_of!((*map.page()).save_to_unmap)) };
    flag != 0
}
/// Creates a sender that continues on an existing, already initialized map.
///
/// `last_msg_sent_offset` is the offset of the last sent message inside that map, or `None`
/// if nothing was sent yet. The sender gets id `0` and prunes old pages.
///
/// # Errors
/// Fails if the offset is out of bounds for the map.
pub fn on_existing_map(
    shmem_provider: SP,
    current_out_map: SP::Mem,
    last_msg_sent_offset: Option<u64>,
) -> Result<Self, Error> {
    let mut out_map = LlmpSharedMap::existing(current_out_map);
    let last_msg_sent = if let Some(offset) = last_msg_sent_offset {
        out_map.msg_from_offset(offset)?
    } else {
        ptr::null_mut()
    };

    Ok(Self {
        id: 0,
        last_msg_sent,
        out_maps: vec![out_map],
        keep_pages_forever: false,
        shmem_provider,
    })
}
/// Drops the leading run of old out maps that are already marked as safe to unmap.
/// The current (last) map is never dropped, and pruning stops at the first old map that is
/// not marked yet.
///
/// # Safety
/// All held maps must contain an initialized page.
unsafe fn prune_old_pages(&mut self) {
    let (_current, older) = self.out_maps.split_last_mut().unwrap();
    let older_count = older.len();
    // Index of the first old map that must be kept; all maps before it can go.
    let unmap_until_excl = older
        .iter_mut()
        .position(|map| (*map.page_mut()).save_to_unmap == 0)
        .unwrap_or(older_count);
    self.out_maps.drain(0..unmap_until_excl);
}
/// Allocates the end-of-page message on the current out map.
///
/// The message is placed behind the last sent message (or at the start of the page if
/// nothing was sent yet), gets the tag `LLMP_TAG_END_OF_PAGE` and a payload length of
/// `size_of::<LlmpPayloadSharedMapInfo>()`. The caller still has to fill the payload and
/// send the message.
///
/// # Errors
/// Fails if the slot following the last message is not inside the map.
///
/// # Panics
/// Panics if the end-of-page message does not fit into the page (allocations always reserve
/// room for it, so this indicates a bug), or if the previously allocated message was never
/// sent.
///
/// # Safety
/// The current out map must contain an initialized page, and `last_msg_sent` must be null or
/// point to a message on that page.
unsafe fn alloc_eop(&mut self) -> Result<*mut LlmpMsg, Error> {
    let map = self.out_maps.last_mut().unwrap();
    let page = map.page_mut();
    let last_msg = self.last_msg_sent;

    // Copy the packed fields into locals, so the panic below can print their values
    // (instead of the addresses of the fields).
    let size_used = (*page).size_used;
    let size_total = (*page).size_total;
    if size_used + EOP_MSG_SIZE > size_total {
        panic!("PROGRAM ABORT : BUG: EOP does not fit in page! page {:?}, size_current {:?}, size_total {:?}", page,
            size_used, size_total);
    }

    let ret: *mut LlmpMsg = if last_msg.is_null() {
        (*page).messages.as_mut_ptr()
    } else {
        llmp_next_msg_ptr_checked(map, last_msg, EOP_MSG_SIZE)?
    };
    // The slot is still marked as "allocated, not yet sent": a message was left dangling.
    if (*ret).tag == LLMP_TAG_UNINITIALIZED {
        panic!("Did not call send() on last message!");
    }

    (*ret).buf_len = size_of::<LlmpPayloadSharedMapInfo>() as u64;
    // The end-of-page message is the last message on the page, so it needs no padding.
    (*ret).buf_len_padded = (*ret).buf_len;
    (*ret).message_id = if last_msg.is_null() {
        1
    } else {
        (*last_msg).message_id + 1
    };
    (*ret).tag = LLMP_TAG_END_OF_PAGE;
    (*page).size_used += EOP_MSG_SIZE;
    Ok(ret)
}
/// Tries to allocate a message with room for `buf_len` payload bytes on the current out map.
///
/// Returns `None` if the page does not have enough space left for the message plus the
/// end-of-page message. On success the returned message has the tag
/// `LLMP_TAG_UNINITIALIZED`; the caller must set a tag and send it before allocating again.
///
/// # Panics
/// Panics if the previously allocated message was never sent, or if the bookkeeping of the
/// page is inconsistent with the computed message position.
///
/// # Safety
/// The current out map must contain an initialized page, and `last_msg_sent` must be null or
/// point to a message on that page.
unsafe fn alloc_next_if_space(&mut self, buf_len: usize) -> Option<*mut LlmpMsg> {
let buf_len_padded;
let mut complete_msg_size = llmp_align(size_of::<LlmpMsg>() + buf_len);
let map = self.out_maps.last_mut().unwrap();
let page = map.page_mut();
let last_msg = self.last_msg_sent;
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!(
"Allocating {} (>={}) bytes on page {:?} / map {:?} (last msg: {:?})",
complete_msg_size, buf_len, page, &map, last_msg
);
// Remember the largest allocation, it determines the size of the next map.
(*page).max_alloc_size = max((*page).max_alloc_size, complete_msg_size);
let mut ret: *mut LlmpMsg;
// Case 1: first message on this page.
if last_msg.is_null() || (*last_msg).tag == LLMP_TAG_END_OF_PAGE {
ret = (*page).messages.as_mut_ptr();
// The first message does not start at an aligned address; pad it so that the end of the
// message (and with it the start of the next one) is aligned.
let base_addr = ret as usize;
buf_len_padded =
llmp_align(base_addr + complete_msg_size) - base_addr - size_of::<LlmpMsg>();
complete_msg_size = buf_len_padded + size_of::<LlmpMsg>();
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
page,
*page,
(*page).size_used,
complete_msg_size,
EOP_MSG_SIZE,
(*page).size_total
);
// Always keep room for the end-of-page message.
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
return None;
}
(*ret).message_id = if last_msg.is_null() {
1
} else {
(*last_msg).message_id + 1
}
// Case 2: the last allocated message was sent, so the next one goes right behind it.
} else if (*page).current_msg_id == (*last_msg).message_id {
buf_len_padded = complete_msg_size - size_of::<LlmpMsg>();
// Always keep room for the end-of-page message.
if (*page).size_used + complete_msg_size + EOP_MSG_SIZE > (*page).size_total {
return None;
}
ret = match llmp_next_msg_ptr_checked(map, last_msg, complete_msg_size) {
Ok(msg) => msg,
Err(e) => {
// With `std`, the error is logged and treated like "no space"; without, it panics.
#[cfg(feature = "std")]
dbg!("Unexpected error allocing new msg", e);
#[cfg(feature = "std")]
return None;
#[cfg(not(feature = "std"))]
panic!("Unexpected error allocing new msg {:?}", e);
}
};
(*ret).message_id = (*last_msg).message_id + 1
// Case 3: the page's current message id does not match the last message.
} else {
panic!("BUG: The current message never got committed using send! (page->current_msg_id {:?}, last_msg->message_id: {})", ptr::addr_of!((*page).current_msg_id), (*last_msg).message_id);
}
// Sanity check: the new message must start exactly where the used space ends.
if last_msg.is_null() && (*page).size_used != 0
|| ((ret as usize) - (*page).messages.as_mut_ptr() as usize) != (*page).size_used
{
panic!("Allocated new message without calling send() inbetween. ret: {:?}, page: {:?}, complete_msg_size: {:?}, size_used: {:?}, last_msg: {:?}", ret, page,
buf_len_padded, ptr::addr_of!((*page).size_used), last_msg);
}
(*page).size_used += complete_msg_size;
(*ret).buf_len_padded = buf_len_padded as u64;
(*ret).buf_len = buf_len as u64;
// Mark the slot behind the new message as unset, then flag the new message itself as
// allocated but not yet filled.
(*_llmp_next_msg_ptr(ret)).tag = LLMP_TAG_UNSET;
(*ret).tag = LLMP_TAG_UNINITIALIZED;
Some(ret)
}
/// Commits a previously allocated message, making it visible to receivers.
///
/// The message gets the next message id of the page, which is then published through the
/// page's `current_msg_id`.
///
/// # Errors
/// Returns [`Error::Unknown`] if `msg` is null or does not lie on the current out page.
///
/// # Panics
/// Panics if `msg` was already sent, or if no tag was set on it.
///
/// # Safety
/// `msg` must be null or a pointer obtained from an allocation on this sender.
#[inline(never)]
unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
    let page = self.out_maps.last_mut().unwrap().page_mut();
    // Validate the pointer before dereferencing it anywhere below.
    if msg.is_null() || !llmp_msg_in_page(page, msg) {
        return Err(Error::Unknown(format!(
            "Llmp Message {:?} is null or not in current page",
            msg
        )));
    }
    if self.last_msg_sent == msg {
        panic!("Message sent twice!");
    }
    if (*msg).tag == LLMP_TAG_UNSET {
        panic!("No tag set on message with id {}", (*msg).message_id);
    }

    (*msg).message_id = (*page).current_msg_id + 1;
    // The message has to be written completely before its id is published, and the id has
    // to be published before we continue: keep the compiler from reordering around the write.
    compiler_fence(Ordering::SeqCst);
    ptr::write_volatile(ptr::addr_of_mut!((*page).current_msg_id), (*msg).message_id);
    compiler_fence(Ordering::SeqCst);
    self.last_msg_sent = msg;
    Ok(())
}
/// Handles a full out map: allocates a new map, tells the receivers about it with an
/// end-of-page message on the old map, and continues on the new map.
///
/// Unless `keep_pages_forever` is set, old maps that are safe to unmap are pruned afterwards.
///
/// # Errors
/// Fails if the new map cannot be allocated or the end-of-page message cannot be allocated
/// or sent.
///
/// # Safety
/// The current out map must contain an initialized page, and `last_msg_sent` must be null or
/// point to a message on that page.
unsafe fn handle_out_eop(&mut self) -> Result<(), Error> {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
{
#[cfg(debug_assertions)]
let bt = Backtrace::new();
#[cfg(not(debug_assertions))]
let bt = "<n/a (release)>";
let shm = self.out_maps.last().unwrap();
println!(
"LLMP_DEBUG: End of page reached for map {} with len {}, sending EOP, bt: {:?}",
shm.shmem.id().to_string(),
shm.shmem.len(),
bt
);
}
let old_map = self.out_maps.last_mut().unwrap().page_mut();
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("New Map Size {}", new_map_size((*old_map).max_alloc_size));
// The new map is sized according to the largest allocation seen on the old one.
let mut new_map_shmem = LlmpSharedMap::new(
(*old_map).sender,
self.shmem_provider
.new_map(new_map_size((*old_map).max_alloc_size))?,
);
let mut new_map = new_map_shmem.page_mut();
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("got new map at: {:?}", new_map);
// Carry the message id over, so ids keep increasing across pages.
ptr::write_volatile(
ptr::addr_of_mut!((*new_map).current_msg_id),
(*old_map).current_msg_id,
);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("Setting max alloc size: {:?}", (*old_map).max_alloc_size);
(*new_map).max_alloc_size = (*old_map).max_alloc_size;
// Allocate the end-of-page message on the OLD map (the new one is not pushed yet).
let mut out: *mut LlmpMsg = self.alloc_eop()?;
(*out).sender = (*old_map).sender;
// Its payload tells receivers the size and id of the new map.
let mut end_of_page_msg = (*out).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
(*end_of_page_msg).map_size = new_map_shmem.shmem.len();
(*end_of_page_msg).shm_str = *new_map_shmem.shmem.id().as_slice();
self.send(out)?;
// From here on, the new map is the current one and nothing was sent on it yet.
self.out_maps.push(new_map_shmem);
self.last_msg_sent = ptr::null_mut();
if !self.keep_pages_forever {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("pruning");
self.prune_old_pages();
}
Ok(())
}
pub fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
if let Some(msg) = unsafe { self.alloc_next_if_space(buf_len) } {
return Ok(msg);
};
unsafe {
self.handle_out_eop()?;
}
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!("Handled out eop");
match unsafe { self.alloc_next_if_space(buf_len) } {
Some(msg) => Ok(msg),
None => Err(Error::Unknown(format!(
"Error allocating {} bytes in shmap",
buf_len
))),
}
}
/// Cancels a message that was allocated but not sent: marks its slot as unset again and
/// gives its space back to the page.
///
/// # Safety
/// `msg` must be the most recently allocated, not yet sent message of this sender.
pub unsafe fn cancel_send(&mut self, msg: *mut LlmpMsg) {
    let page = self.out_maps.last_mut().unwrap().page_mut();
    (*msg).tag = LLMP_TAG_UNSET;
    let reclaimed = (*msg).buf_len_padded as usize + size_of::<LlmpMsg>();
    (*page).size_used -= reclaimed;
}
/// Allocates a message, copies `buf` into it and sends it with the given `tag` and the
/// default flags (`LLMP_FLAG_INITIALIZED`).
///
/// # Errors
/// Fails if `tag` is one of the reserved tags, or if allocating or sending fails.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
    // Same checks, allocation, copy and send as the flags variant, just with the default flags.
    self.send_buf_with_flags(tag, LLMP_FLAG_INITIALIZED, buf)
}
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
if tag == LLMP_TAG_NEW_SHM_CLIENT
|| tag == LLMP_TAG_END_OF_PAGE
|| tag == LLMP_TAG_UNINITIALIZED
|| tag == LLMP_TAG_UNSET
{
return Err(Error::Unknown(format!(
"Reserved tag supplied to send_buf ({:#X})",
tag
)));
}
unsafe {
let msg = self.alloc_next(buf.len())?;
(*msg).tag = tag;
(*msg).flags = flags;
buf.as_ptr()
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
self.send(msg)
}
}
/// Describes this sender (current out map and offset of the last sent message), so it can
/// be recreated with `on_existing_from_description`.
///
/// # Errors
/// Fails if the last sent message does not lie on the current out map.
pub fn describe(&self) -> Result<LlmpDescription, Error> {
    let map = self.out_maps.last().unwrap();

    let mut last_message_offset = None;
    if !self.last_msg_sent.is_null() {
        let offset = unsafe { map.msg_to_offset(self.last_msg_sent) }?;
        last_message_offset = Some(offset);
    }

    Ok(LlmpDescription {
        shmem: map.shmem.description(),
        last_message_offset,
    })
}
/// Recreates a sender from a description obtained via `describe`.
///
/// # Errors
/// Fails if the described map cannot be attached or the described offset is out of bounds.
pub fn on_existing_from_description(
    mut shmem_provider: SP,
    description: &LlmpDescription,
) -> Result<Self, Error> {
    // Clone the provider before it is used to attach to the described map.
    let sender_provider = shmem_provider.clone();
    let existing_map = shmem_provider.from_description(description.shmem)?;
    Self::on_existing_map(
        sender_provider,
        existing_map,
        description.last_message_offset,
    )
}
}
/// Receiving side of an LLMP channel: reads messages from a sender's map and follows the
/// sender to new maps on end-of-page messages.
#[derive(Debug)]
pub struct LlmpReceiver<SP>
where
SP: ShMemProvider,
{
/// Id of this receiver.
pub id: u32,
/// The last message that was received, or null if nothing was received on the current map yet.
pub last_msg_recvd: *const LlmpMsg,
/// Provider used to attach to the sender's follow-up maps.
pub shmem_provider: SP,
/// The map messages are currently read from.
pub current_recv_map: LlmpSharedMap<SP::Mem>,
}
impl<SP> LlmpReceiver<SP>
where
SP: ShMemProvider,
{
/// Recreates a receiver from the map and message offset stored in the environment under
/// `env_name` (see `to_env`).
///
/// # Errors
/// Fails if the map cannot be attached or the env variables are missing or invalid.
#[cfg(feature = "std")]
pub fn on_existing_from_env(mut shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
    // Clone the provider before it is used to attach to the existing map.
    let receiver_provider = shmem_provider.clone();
    let existing_map = shmem_provider.existing_from_env(env_name)?;
    let last_msg_recvd_offset = msg_offset_from_env(env_name)?;
    Self::on_existing_map(receiver_provider, existing_map, last_msg_recvd_offset)
}
/// Stores the current receive map and the offset of the last received message in the
/// environment under `env_name`, so the receiver can be recreated with `on_existing_from_env`.
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
    let recv_map = &self.current_recv_map;
    recv_map.shmem.write_to_env(env_name)?;
    let last_recvd = self.last_msg_recvd;
    unsafe { recv_map.msg_to_env(last_recvd, env_name) }
}
/// Creates a receiver that reads from an existing, already initialized sender map.
///
/// `last_msg_recvd_offset` is the offset of the last received message inside that map, or
/// `None` if nothing was received yet. The receiver gets id `0`.
///
/// # Errors
/// Fails if the offset is out of bounds for the map.
pub fn on_existing_map(
    shmem_provider: SP,
    current_sender_map: SP::Mem,
    last_msg_recvd_offset: Option<u64>,
) -> Result<Self, Error> {
    let mut current_recv_map = LlmpSharedMap::existing(current_sender_map);
    let last_msg_recvd = if let Some(offset) = last_msg_recvd_offset {
        current_recv_map.msg_from_offset(offset)?
    } else {
        ptr::null_mut()
    };

    Ok(Self {
        id: 0,
        current_recv_map,
        last_msg_recvd,
        shmem_provider,
    })
}
/// Receives the next message, if there is one, without blocking.
///
/// Returns `Ok(None)` if no new message is available. End-of-page messages are handled
/// internally: the receiver attaches to the new map and continues receiving there.
///
/// # Errors
/// Returns [`Error::ShuttingDown`] on an exiting message, and [`Error::IllegalState`] if the
/// next message is not inside the map. Attaching to a new map may fail, too.
///
/// # Panics
/// Panics on an unset message tag and on an end-of-page message with a too short payload.
///
/// # Safety
/// The current receive map must contain an initialized page, and `last_msg_recvd` must be
/// null or point to a message on that page.
#[inline(never)]
unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
compiler_fence(Ordering::SeqCst);
let mut page = self.current_recv_map.page_mut();
let last_msg = self.last_msg_recvd;
// The sender publishes new messages by increasing `current_msg_id`.
let current_msg_id = ptr::read_volatile(ptr::addr_of!((*page).current_msg_id));
let ret = if current_msg_id == 0 {
// Nothing was sent on this page yet.
None
} else if last_msg.is_null() {
// We did not read anything from this page yet: start with its first message.
Some((*page).messages.as_mut_ptr())
} else if (*last_msg).message_id == current_msg_id {
// We already read the most recent message.
None
} else {
// There is at least one more message behind the last one we read.
Some(llmp_next_msg_ptr_checked(
&mut self.current_recv_map,
last_msg,
size_of::<LlmpMsg>(),
)?)
};
if let Some(msg) = ret {
if !(*msg).in_map(&mut self.current_recv_map) {
return Err(Error::IllegalState("Unexpected message in map (out of map bounds) - bugy client or tampered shared map detedted!".into()));
}
match (*msg).tag {
LLMP_TAG_UNSET => panic!("BUG: Read unallocated msg"),
LLMP_TAG_EXITING => {
assert_eq!((*msg).buf_len, 0);
return Err(Error::ShuttingDown);
}
LLMP_TAG_END_OF_PAGE => {
#[cfg(feature = "std")]
println!("Received end of page, allocating next");
if (*msg).buf_len < size_of::<LlmpPayloadSharedMapInfo>() as u64 {
panic!(
"Illegal message length for EOP (is {}/{}, expected {})",
(*msg).buf_len,
(*msg).buf_len_padded,
size_of::<LlmpPayloadSharedMapInfo>()
);
}
let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
// Copy the info out of the old map before that map gets replaced below.
let pageinfo_cpy = *pageinfo;
self.last_msg_recvd = ptr::null();
// We are done with the old page.
ptr::write_volatile(ptr::addr_of_mut!((*page).save_to_unmap), 1);
self.current_recv_map =
LlmpSharedMap::existing(self.shmem_provider.from_id_and_size(
ShMemId::from_slice(&pageinfo_cpy.shm_str),
pageinfo_cpy.map_size,
)?);
page = self.current_recv_map.page_mut();
// The flag is also set on the new page right after attaching to it.
// NOTE(review): presumably this tells the sender that the receiver holds its own
// mapping of the page - confirm.
ptr::write_volatile(ptr::addr_of_mut!((*page).save_to_unmap), 1);
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!(
"LLMP_DEBUG: Got a new recv map {} with len {:?}",
self.current_recv_map.shmem.id().to_string(),
self.current_recv_map.shmem.len()
);
// Continue with the first message on the new page.
return self.recv();
}
_ => (),
}
self.last_msg_recvd = msg;
};
Ok(ret)
}
/// Receives the next message, busy-waiting until one is available.
///
/// # Errors
/// Propagates the errors of `recv`.
///
/// # Panics
/// Panics if `recv` reports no message although the page's message id changed, or if the
/// last received message is an end-of-page message that is not on the current page.
///
/// # Safety
/// The current receive map must contain an initialized page, and `last_msg_recvd` must be
/// null or point to a message on that page.
pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> {
let mut current_msg_id = 0;
let page = self.current_recv_map.page_mut();
let last_msg = self.last_msg_recvd;
if !last_msg.is_null() {
if (*last_msg).tag == LLMP_TAG_END_OF_PAGE && !llmp_msg_in_page(page, last_msg) {
panic!("BUG: full page passed to await_message_blocking or reset failed");
}
current_msg_id = (*last_msg).message_id
}
// Spin until the sender publishes a message id different from the last one we saw.
loop {
compiler_fence(Ordering::SeqCst);
if ptr::read_volatile(ptr::addr_of!((*page).current_msg_id)) != current_msg_id {
return match self.recv()? {
Some(msg) => Ok(msg),
None => panic!("BUG: blocking llmp message should never be NULL"),
};
}
}
}
/// Receives the next message, if any, as `(sender, tag, payload)`, without blocking.
/// The flags of the message are dropped.
#[allow(clippy::type_complexity)]
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
    let received = self.recv_buf_with_flags()?;
    Ok(received.map(|(sender, tag, _flags, buf)| (sender, tag, buf)))
}
/// Receives the next message, if any, as `(sender, tag, flags, payload)`, without blocking.
///
/// # Errors
/// Propagates the errors of receiving, and fails if the message is not inside the current
/// receive map.
#[allow(clippy::type_complexity)]
#[inline]
pub fn recv_buf_with_flags(&mut self) -> Result<Option<(ClientId, Tag, Flags, &[u8])>, Error> {
    unsafe {
        let msg = match self.recv()? {
            Some(msg) => msg,
            None => return Ok(None),
        };
        let sender = (*msg).sender;
        let tag = (*msg).tag;
        let flags = (*msg).flags;
        let payload = (*msg).as_slice(&mut self.current_recv_map)?;
        Ok(Some((sender, tag, flags, payload)))
    }
}
/// Receives the next message as `(sender, tag, payload)`, busy-waiting until one is
/// available.
///
/// # Errors
/// Propagates the errors of receiving, and fails if the message is not inside the current
/// receive map.
#[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(ClientId, Tag, &[u8]), Error> {
    unsafe {
        let msg = self.recv_blocking()?;
        let sender = (*msg).sender;
        let tag = (*msg).tag;
        let payload = (*msg).as_slice(&mut self.current_recv_map)?;
        Ok((sender, tag, payload))
    }
}
/// Describes this receiver (current receive map and offset of the last received message),
/// so it can be recreated with `on_existing_from_description`.
///
/// # Errors
/// Fails if the last received message does not lie on the current receive map.
pub fn describe(&self) -> Result<LlmpDescription, Error> {
    let map = &self.current_recv_map;

    let mut last_message_offset = None;
    if !self.last_msg_recvd.is_null() {
        let offset = unsafe { map.msg_to_offset(self.last_msg_recvd) }?;
        last_message_offset = Some(offset);
    }

    Ok(LlmpDescription {
        shmem: map.shmem.description(),
        last_message_offset,
    })
}
/// Recreates a receiver from a description obtained via `describe`.
///
/// # Errors
/// Fails if the described map cannot be attached or the described offset is out of bounds.
pub fn on_existing_from_description(
    mut shmem_provider: SP,
    description: &LlmpDescription,
) -> Result<Self, Error> {
    // Clone the provider before it is used to attach to the described map.
    let receiver_provider = shmem_provider.clone();
    let existing_map = shmem_provider.from_description(description.shmem)?;
    Self::on_existing_map(
        receiver_provider,
        existing_map,
        description.last_message_offset,
    )
}
}
/// A shared map that starts with an [`LlmpPage`] header, followed by messages.
#[derive(Clone, Debug)]
pub struct LlmpSharedMap<SHM>
where
SHM: ShMem,
{
/// The underlying shared memory.
pub shmem: SHM,
}
impl<SHM> LlmpSharedMap<SHM>
where
SHM: ShMem,
{
pub fn new(sender: ClientId, mut new_map: SHM) -> Self {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
println!(
"LLMP_DEBUG: Initializing map on {} with size {}",
new_map.id().to_string(),
new_map.len()
);
unsafe {
_llmp_page_init(&mut new_map, sender, false);
}
Self { shmem: new_map }
}
/// Wraps a shared map that already contains an initialized page.
///
/// # Panics
/// Panics if the page does not carry the initialized magic.
pub fn existing(existing_map: SHM) -> Self {
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(
"LLMP_DEBUG: Using existing map {} with size {}",
existing_map.id().to_string(),
existing_map.len(),
);
let ret = Self {
shmem: existing_map,
};
unsafe {
// Only maps that were set up by page initialization may be used.
if (*ret.page()).magic != PAGE_INITIALIZED_MAGIC {
panic!("Map was not priviously initialized at {:?}", &ret.shmem);
}
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!("PAGE: {}", *ret.page());
}
ret
}
/// Sets the `save_to_unmap` flag of this map's page.
pub fn mark_save_to_unmap(&mut self) {
    unsafe {
        let page = self.page_mut();
        // The other side polls this flag, hence the volatile write.
        ptr::write_volatile(ptr::addr_of_mut!((*page).save_to_unmap), 1);
    }
}
pub unsafe fn page_mut(&mut self) -> *mut LlmpPage {
shmem2page_mut(&mut self.shmem)
}
pub unsafe fn page(&self) -> *const LlmpPage {
shmem2page(&self.shmem)
}
/// Returns the offset of `msg` relative to the first message of this map's page.
///
/// # Errors
/// Returns [`Error::IllegalArgument`] if `msg` does not lie inside the page.
///
/// # Safety
/// This map must contain an initialized page.
// The in-page check is not enough to rule out a negative offset for pointers into the page
// header, so the cast below is allowed explicitly.
#[allow(clippy::cast_sign_loss)]
pub unsafe fn msg_to_offset(&self, msg: *const LlmpMsg) -> Result<u64, Error> {
    let page = self.page();
    if llmp_msg_in_page(page, msg) {
        Ok((msg as *const u8).offset_from((*page).messages.as_ptr() as *const u8) as u64)
    } else {
        // Arguments in the order the text names them: message first, then page.
        Err(Error::IllegalArgument(format!(
            "Message (0x{:X}) not in page (0x{:X})",
            msg as u64, page as u64
        )))
    }
}
/// Returns the message stored in the environment under `map_env_name` (see `msg_to_env`),
/// or a null pointer if the null marker was stored.
///
/// # Errors
/// Fails if the env variable is missing or invalid, or if the offset is out of bounds.
#[cfg(feature = "std")]
pub fn msg_from_env(&mut self, map_env_name: &str) -> Result<*mut LlmpMsg, Error> {
    if let Some(offset) = msg_offset_from_env(map_env_name)? {
        self.msg_from_offset(offset)
    } else {
        Ok(ptr::null_mut())
    }
}
/// Stores the offset of `msg` in the env variable `<map_env_name>_OFFSET`.
/// A null `msg` is stored as the null marker (`_NULL_ENV_STR`).
///
/// # Errors
/// Fails if `msg` is neither null nor inside this map's page; nothing is stored then.
///
/// # Safety
/// This map must contain an initialized page.
#[cfg(feature = "std")]
pub unsafe fn msg_to_env(&self, msg: *const LlmpMsg, map_env_name: &str) -> Result<(), Error> {
    let value = if msg.is_null() {
        String::from(_NULL_ENV_STR)
    } else {
        format!("{}", self.msg_to_offset(msg)?)
    };
    env::set_var(&format!("{}_OFFSET", map_env_name), value);
    Ok(())
}
pub fn msg_from_offset(&mut self, offset: u64) -> Result<*mut LlmpMsg, Error> {
let offset = offset as usize;
unsafe {
let page = self.page_mut();
let page_size = self.shmem.map().len() - size_of::<LlmpPage>();
if offset > page_size {
Err(Error::IllegalArgument(format!(
"Msg offset out of bounds (size: {}, requested offset: {})",
page_size, offset
)))
} else {
Ok(((*page).messages.as_mut_ptr() as *mut u8).add(offset) as *mut LlmpMsg)
}
}
}
}
/// The broker: receives messages from all registered clients and broadcasts on its own
/// out map.
#[derive(Debug)]
pub struct LlmpBroker<SP>
where
SP: ShMemProvider + 'static,
{
/// Sender for the broker's broadcast map. Its pages are kept forever.
pub llmp_out: LlmpSender<SP>,
/// One receiver per registered client; a client's id is its index in this list.
pub llmp_clients: Vec<LlmpReceiver<SP>>,
// Initialized to `None`; not used in this section.
socket_name: Option<String>,
// Initialized to `false`; not used in this section (shutdown is tracked through
// `GLOBAL_SIGHANDLER_STATE` on unix).
shutting_down: bool,
// Provider handed on (as a clone) to newly registered clients.
shmem_provider: SP,
}
/// State of the broker's signal handler.
#[cfg(unix)]
pub struct LlmpBrokerSignalHandler {
// Set to `true` by the handler once one of the handled signals arrived.
shutting_down: bool,
}
/// Signal handling for the broker: any handled signal requests a shutdown.
#[cfg(unix)]
impl Handler for LlmpBrokerSignalHandler {
// Only records that a shutdown was requested; the flag is polled with a volatile read
// by the broker loop.
fn handle(&mut self, _signal: Signal, _info: siginfo_t, _context: &mut ucontext_t) {
unsafe { ptr::write_volatile(&mut self.shutting_down, true) };
}
// The signals this handler is registered for.
fn signals(&self) -> Vec<Signal> {
vec![Signal::SigTerm, Signal::SigInterrupt, Signal::SigQuit]
}
}
impl<SP> LlmpBroker<SP>
where
SP: ShMemProvider + 'static,
{
/// Creates a new broker with a freshly allocated broadcast map and no clients.
///
/// The broker's sender has id `0` and keeps its pages forever.
///
/// # Errors
/// Fails if the broadcast map cannot be allocated.
pub fn new(mut shmem_provider: SP) -> Result<Self, Error> {
    let broadcast_shmem = shmem_provider.new_map(new_map_size(0))?;
    let llmp_out = LlmpSender {
        id: 0,
        last_msg_sent: ptr::null_mut(),
        out_maps: vec![LlmpSharedMap::new(0, broadcast_shmem)],
        // Old pages are never pruned on the broker's out channel.
        keep_pages_forever: true,
        shmem_provider: shmem_provider.clone(),
    };

    Ok(LlmpBroker {
        llmp_out,
        llmp_clients: vec![],
        socket_name: None,
        shutting_down: false,
        shmem_provider,
    })
}
/// Allocates a message with room for `buf_len` payload bytes on the broker's out channel.
/// Thin wrapper around the sender's `alloc_next`.
unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
self.llmp_out.alloc_next(buf_len)
}
/// Registers a new client, whose messages are read from `client_page`.
///
/// The page is marked as safe to unmap, and the client gets the next free id (its index in
/// the client list).
pub fn register_client(&mut self, mut client_page: LlmpSharedMap<SP::Mem>) {
    client_page.mark_save_to_unmap();

    let receiver = LlmpReceiver {
        id: self.llmp_clients.len() as u32,
        current_recv_map: client_page,
        last_msg_recvd: ptr::null_mut(),
        shmem_provider: self.shmem_provider.clone(),
    };
    self.llmp_clients.push(receiver);
}
/// Connects this broker to another broker listening on `addr` (broker-to-broker).
///
/// Performs the TCP handshake (hello from the remote broker, our hello, accept with our
/// broker id), hands the stream to a broker-to-broker thread and registers the map that
/// thread provides as a new client.
///
/// # Errors
/// Fails on connection or I/O errors, on an unexpected handshake response, and if the
/// broker-to-broker thread or its map cannot be set up.
#[cfg(feature = "std")]
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
where
A: ToSocketAddrs,
{
let mut stream = TcpStream::connect(addr)?;
println!("B2B: Connected to {:?}", stream);
// Step 1: the remote broker greets first.
match (&recv_tcp_msg(&mut stream)?).try_into()? {
TcpResponse::BrokerConnectHello {
broker_map_description: _,
hostname,
} => println!("B2B: Connected to {}", hostname),
_ => {
return Err(Error::IllegalState(
"Unexpected response from B2B server received.".to_string(),
))
}
};
// Step 2: introduce ourselves with our hostname.
let hostname = hostname::get()
.unwrap_or_else(|_| "<unknown>".into())
.to_string_lossy()
.into();
send_tcp_msg(&mut stream, &TcpRequest::RemoteBrokerHello { hostname })?;
// Step 3: the remote broker accepts us and tells us our broker id.
let broker_id = match (&recv_tcp_msg(&mut stream)?).try_into()? {
TcpResponse::RemoteBrokerAccepted { broker_id } => {
println!("B2B: Got Connection Ack, broker_id {}", broker_id);
broker_id
}
_ => {
return Err(Error::IllegalState(
"Unexpected response from B2B server received.".to_string(),
));
}
};
println!("B2B: We are broker {}", broker_id);
// The thread gets the id the new client will have, and the description of our first
// out map.
let map_description = Self::b2b_thread_on(
stream,
&self.shmem_provider,
self.llmp_clients.len() as ClientId,
&self.llmp_out.out_maps.first().unwrap().shmem.description(),
)?;
let new_map =
LlmpSharedMap::existing(self.shmem_provider.from_description(map_description)?);
{
self.register_client(new_map);
}
Ok(())
}
/// Forwards a received message to all clients, by copying it to the broker's out channel.
///
/// # Errors
/// Fails if no message can be allocated on the out channel.
///
/// # Panics
/// Panics if sending the copied message fails.
///
/// # Safety
/// `msg` must point to a valid message.
/// NOTE(review): the copy reads `out`'s padded length from `msg`, which may be more than
/// `msg`'s own padded length when `out` is the first message on its page - confirm this
/// cannot read past the source map.
unsafe fn forward_msg(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
let mut out: *mut LlmpMsg = self.alloc_next((*msg).buf_len_padded as usize)?;
// Remember the padded length of the allocation: the copy below overwrites the header.
let actual_size = (*out).buf_len_padded;
let complete_size = actual_size as usize + size_of::<LlmpMsg>();
// Copy header and payload in one go ...
(msg as *const u8).copy_to_nonoverlapping(out as *mut u8, complete_size);
// ... and restore the padded length that belongs to the new allocation.
(*out).buf_len_padded = actual_size;
if let Err(e) = self.llmp_out.send(out) {
panic!("Error sending msg: {:?}", e)
};
self.llmp_out.last_msg_sent = out;
Ok(())
}
/// Performs a single brokering pass over the registered clients.
///
/// After a compiler fence, `handle_new_msgs` is invoked once per client index. The number of
/// clients is read once before the pass starts, so clients that get added while the pass is
/// running are not visited until the next call.
///
/// # Errors
/// Stops at, and returns, the first error produced by `handle_new_msgs`.
#[inline]
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error>
where
    F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
    compiler_fence(Ordering::SeqCst);
    let client_count = self.llmp_clients.len();
    for client_idx in 0..client_count {
        unsafe { self.handle_new_msgs(client_idx as u32, on_new_msg) }?;
    }
    Ok(())
}
/// Reports whether a shutdown has been requested (unix variant).
///
/// Performs a volatile read of `GLOBAL_SIGHANDLER_STATE.shutting_down`; `self` is not used.
#[inline]
#[cfg(unix)]
#[allow(clippy::unused_self)]
fn is_shutting_down(&self) -> bool {
    unsafe {
        let flag: *const bool = &GLOBAL_SIGHANDLER_STATE.shutting_down;
        ptr::read_volatile(flag)
    }
}
/// Reports whether a shutdown has been requested (non-unix variant).
///
/// This variant always returns `false` and never looks at `self`; the receiver is kept so
/// both platform variants share one signature.
#[inline]
#[cfg(not(unix))]
#[allow(clippy::unused_self)]
fn is_shutting_down(&self) -> bool {
    false
}
/// Runs the broker until `is_shutting_down` reports `true`.
///
/// On unix a signal handler is installed first; a failure to do so is only reported (with
/// `std`), not treated as fatal. Each iteration issues a compiler fence, runs one `once`
/// pass with `on_new_msg`, and then optionally sleeps for `sleep_time`. When the loop ends,
/// an empty `LLMP_TAG_EXITING` message is sent on the outgoing channel.
///
/// # Panics
/// Panics if a brokering pass fails, if the final `LLMP_TAG_EXITING` message cannot be
/// sent, or - without the `std` feature - if `sleep_time` is `Some`.
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
where
    F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
    #[cfg(unix)]
    if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } {
        #[cfg(feature = "std")]
        println!("Failed to setup signal handlers: {}", _e);
    }

    loop {
        if self.is_shutting_down() {
            break;
        }

        compiler_fence(Ordering::SeqCst);
        self.once(on_new_msg)
            .expect("An error occurred when brokering. Exiting.");

        #[cfg(feature = "std")]
        if let Some(pause) = sleep_time {
            thread::sleep(pause);
        }

        #[cfg(not(feature = "std"))]
        if sleep_time.is_some() {
            panic!("Cannot sleep on no_std platform");
        }
    }

    let farewell = self.llmp_out.send_buf(LLMP_TAG_EXITING, &[]);
    farewell.expect("Error when shutting down broker: Could not send LLMP_TAG_EXITING msg.");
}
/// Sends `buf` under `tag` on the broker's outgoing channel.
///
/// Delegates to `send_buf` on `self.llmp_out` and returns its result unchanged.
pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
    let outgoing = &mut self.llmp_out;
    outgoing.send_buf(tag, buf)
}
/// Sends `buf` under `tag` with the given `flags` on the broker's outgoing channel.
///
/// Delegates to `send_buf_with_flags` on `self.llmp_out` and returns its result unchanged.
pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
    let outgoing = &mut self.llmp_out;
    outgoing.send_buf_with_flags(tag, flags, buf)
}
/// Binds a TCP listener on `_LLMP_BIND_ADDR:port` and starts the listener thread for it.
///
/// # Errors
/// Returns an error if binding fails or if `launch_listener` fails.
#[cfg(feature = "std")]
pub fn launch_tcp_listener_on(&mut self, port: u16) -> Result<thread::JoinHandle<()>, Error> {
    let bind_addr = format!("{}:{}", _LLMP_BIND_ADDR, port);
    let tcp_listener = TcpListener::bind(bind_addr)?;
    println!("Server listening on port {}", port);
    self.launch_listener(Listener::Tcp(tcp_listener))
}
/// Announces a new shared-map client by sending an `LLMP_TAG_NEW_SHM_CLIENT` message on `sender`.
///
/// The payload is an `LlmpPayloadSharedMapInfo` filled with the id and size taken from
/// `shmem_description`.
///
/// # Errors
/// Returns whatever error `sender.send` reports.
///
/// # Panics
/// Panics if no message slot can be allocated on `sender`.
#[allow(dead_code)]
fn announce_new_client(
    sender: &mut LlmpSender<SP>,
    shmem_description: &ShMemDescription,
) -> Result<(), Error> {
    let payload_len = size_of::<LlmpPayloadSharedMapInfo>();
    unsafe {
        let announcement = sender
            .alloc_next(payload_len)
            .expect("Could not allocate a new message in shared map.");
        (*announcement).tag = LLMP_TAG_NEW_SHM_CLIENT;

        // The message buffer is interpreted as the shared-map info payload.
        let map_info = (*announcement).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
        (*map_info).shm_str = *shmem_description.id.as_slice();
        (*map_info).map_size = shmem_description.size;

        sender.send(announcement)
    }
}
/// Spawns the broker-to-broker proxy thread for an established `stream`.
///
/// The spawned thread
/// * creates its own `LlmpSender` with id `b2b_client_id` and reports the description of
///   that sender's first map back through a channel,
/// * attaches an `LlmpReceiver` to the local broker map described by `broker_map_description`,
/// * then loops forever: every message read from the local broker map is sent over `stream`
///   (except those whose sender id equals `b2b_client_id`), and every message received from
///   `stream` is sent on the thread's own sender with `LLMP_FLAG_FROM_B2B` added.
///
/// Reads on `stream` use `_LLMP_B2B_BLOCK_TIME` as timeout, so the loop alternates between
/// both directions.
///
/// # Returns
/// The description of the map the proxy thread sends on.
///
/// # Errors
/// Returns `Error::Unknown` if the thread ends before reporting its map description.
///
/// # Panics
/// The spawned thread (not the caller) panics on setup failures, on local read errors, on
/// TCP send errors, on undecodable remote messages, and when forwarding a remote message fails.
#[cfg(feature = "std")]
#[allow(clippy::let_and_return)]
fn b2b_thread_on(
    mut stream: TcpStream,
    shmem_provider: &SP,
    b2b_client_id: ClientId,
    broker_map_description: &ShMemDescription,
) -> Result<ShMemDescription, Error> {
    // Copy what the thread needs so the closure does not borrow from the caller.
    let broker_map_description = *broker_map_description;
    let mut shmem_provider_clone = shmem_provider.clone();

    let (send, recv) = channel();

    thread::spawn(move || {
        shmem_provider_clone.post_fork();

        #[cfg(all(feature = "llmp_debug", feature = "std"))]
        println!("B2b: Spawned proxy thread");

        // A read timeout keeps the remote receive below from blocking the loop indefinitely.
        stream
            .set_read_timeout(Some(_LLMP_B2B_BLOCK_TIME))
            .expect("Failed to set tcp stream timeout");

        let mut new_sender =
            match LlmpSender::new(shmem_provider_clone.clone(), b2b_client_id, false) {
                Ok(new_sender) => new_sender,
                Err(e) => {
                    panic!("B2B: Could not map shared map: {}", e);
                }
            };

        // Tell the spawning side which map this thread sends on.
        send.send(new_sender.out_maps.first().unwrap().shmem.description())
            .expect("B2B: Error sending map description to channel!");

        let mut local_receiver = LlmpReceiver::on_existing_from_description(
            shmem_provider_clone,
            &LlmpDescription {
                last_message_offset: None,
                shmem: broker_map_description,
            },
        )
        .expect("Failed to map local page in broker 2 broker thread!");

        #[cfg(all(feature = "llmp_debug", feature = "std"))]
        dbg!("B2B: Starting proxy loop :)");

        loop {
            // Local -> remote: drain everything currently available on the broker map.
            while let Some((client_id, tag, flags, payload)) = local_receiver
                .recv_buf_with_flags()
                .expect("Error reading from local page!")
            {
                if client_id == b2b_client_id {
                    // Skip messages carrying our own id so they are not sent back.
                    dbg!("Ignored message we probably sent earlier (same id)", tag);
                    continue;
                }

                #[cfg(all(feature = "llmp_debug", feature = "std"))]
                dbg!(
                    "Fowarding message via broker2broker connection",
                    payload.len()
                );
                send_tcp_msg(
                    &mut stream,
                    &TcpRemoteNewMessage {
                        client_id,
                        tag,
                        flags,
                        payload: payload.to_vec(),
                    },
                )
                .expect("Error sending message via broker 2 broker");
            }

            // Remote -> local: at most one message per iteration; any receive error
            // (including the read timeout) just loops back up.
            if let Ok(val) = recv_tcp_msg(&mut stream) {
                let msg: TcpRemoteNewMessage = (&val).try_into().expect(
                    "Illegal message received from broker 2 broker connection - shutting down.",
                );

                #[cfg(all(feature = "llmp_debug", feature = "std"))]
                dbg!(
                    "Fowarding incoming message from broker2broker connection",
                    msg.payload.len()
                );

                new_sender
                    .send_buf_with_flags(msg.tag, msg.flags | LLMP_FLAG_FROM_B2B, &msg.payload)
                    .expect("B2B: Error forwarding message. Exiting.");
            } else {
                #[cfg(all(feature = "llmp_debug", feature = "std"))]
                dbg!("Received no input, timeout or closed. Looping back up :)");
            }
        }
    });

    // Wait for the thread to report its map; a closed channel means it died during setup.
    let ret = recv.recv().map_err(|_| {
        Error::Unknown("Error launching background thread for b2b communcation".to_string())
    });

    #[cfg(all(feature = "llmp_debug", feature = "std"))]
    dbg!("B2B: returning from loop. Success: {}", ret.is_ok());

    ret
}
/// Handles one decoded `request` that arrived on `stream`.
///
/// * `LocalClientHello`: announces the client's map on `sender`, replies with
///   `LocalClientAccepted` carrying `*current_client_id`, then increments the id. Failures of
///   either step are only printed.
/// * `RemoteBrokerHello`: replies with `RemoteBrokerAccepted`; if that succeeds, starts a
///   proxy thread via `b2b_thread_on`, announces the proxy's map on `sender`, and increments
///   the id. If the reply cannot be sent, or the thread cannot be started, the id is left
///   untouched.
#[cfg(feature = "std")]
fn handle_tcp_request(
    mut stream: TcpStream,
    request: &TcpRequest,
    current_client_id: &mut u32,
    sender: &mut LlmpSender<SP>,
    shmem_provider: &SP,
    broker_map_description: &ShMemDescription,
) {
    match request {
        TcpRequest::LocalClientHello { shmem_description } => {
            if let Err(e) = Self::announce_new_client(sender, shmem_description) {
                println!("Error forwarding client on map: {:?}", e);
            }

            let accepted = TcpResponse::LocalClientAccepted {
                client_id: *current_client_id,
            };
            if let Err(e) = send_tcp_msg(&mut stream, &accepted) {
                println!("An error occurred sending via tcp {}", e);
            }

            *current_client_id += 1;
        }
        TcpRequest::RemoteBrokerHello { hostname } => {
            println!("B2B new client: {}", hostname);

            let accepted = TcpResponse::RemoteBrokerAccepted {
                broker_id: *current_client_id,
            };
            if send_tcp_msg(&mut stream, &accepted).is_err() {
                println!("Error accepting broker, ignoring.");
                return;
            }

            // The proxy thread takes ownership of the stream from here on.
            if let Ok(shmem_description) = Self::b2b_thread_on(
                stream,
                shmem_provider,
                *current_client_id,
                broker_map_description,
            ) {
                if Self::announce_new_client(sender, &shmem_description).is_err() {
                    println!("B2B: Error announcing client {:?}", shmem_description);
                }
                *current_client_id += 1;
            }
        }
    }
}
/// Starts a background thread that accepts new connections on `listener`.
///
/// Before spawning, a new map is created and registered as a client of this broker; the
/// thread attaches to that same map as a sender and uses it for the announcements made by
/// `handle_tcp_request`. For each accepted TCP connection the thread sends a
/// `BrokerConnectHello` (this broker's first outgoing map description plus the local
/// hostname), reads one request, and passes it to `handle_tcp_request`. Connections that fail
/// at any of these steps are dropped and the loop continues.
///
/// # Returns
/// The `JoinHandle` of the spawned thread. The thread's loop has no exit condition.
///
/// # Errors
/// Returns an error if the new map cannot be created.
///
/// # Panics
/// Panics if the outgoing channel has no map. The spawned thread panics if it cannot attach
/// to the new map, or if `peer_addr` fails on an accepted stream.
#[cfg(feature = "std")]
pub fn launch_listener(&mut self, listener: Listener) -> Result<thread::JoinHandle<()>, Error> {
    // Description of the map this broker broadcasts on; sent to every connecting peer.
    let broker_map_description = self.llmp_out.out_maps.first().unwrap().shmem.description();

    let local_name = hostname::get().unwrap_or_else(|_| "<unknown>".into());
    let broker_hello = TcpResponse::BrokerConnectHello {
        broker_map_description,
        hostname: local_name.to_string_lossy().into(),
    };

    // The listener thread is itself a client of this broker, sending on its own map.
    let llmp_tcp_id = self.llmp_clients.len() as ClientId;
    let tcp_out_map = LlmpSharedMap::new(
        llmp_tcp_id,
        self.shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?,
    );
    let tcp_out_map_description = tcp_out_map.shmem.description();
    self.register_client(tcp_out_map);

    let mut shmem_provider_clone = self.shmem_provider.clone();

    let listener_thread = thread::spawn(move || {
        shmem_provider_clone.post_fork();

        // Ids handed out to newly connecting peers start right after our own.
        let mut current_client_id = llmp_tcp_id + 1;

        let tcp_out_mem = shmem_provider_clone
            .from_description(tcp_out_map_description)
            .unwrap();
        let mut tcp_incoming_sender = LlmpSender {
            id: llmp_tcp_id,
            last_msg_sent: ptr::null_mut(),
            out_maps: vec![LlmpSharedMap::existing(tcp_out_mem)],
            keep_pages_forever: false,
            shmem_provider: shmem_provider_clone.clone(),
        };

        loop {
            let (mut stream, addr) = match listener.accept() {
                ListenerStream::Tcp(stream, addr) => (stream, addr),
                ListenerStream::Empty() => continue,
            };
            dbg!("New connection", addr, stream.peer_addr().unwrap());

            if let Err(e) = send_tcp_msg(&mut stream, &broker_hello) {
                dbg!("Error sending initial hello: {:?}", e);
                continue;
            }

            let buf = match recv_tcp_msg(&mut stream) {
                Err(e) => {
                    dbg!("Error receving from tcp", e);
                    continue;
                }
                Ok(buf) => buf,
            };

            let req = match (&buf).try_into() {
                Err(e) => {
                    dbg!("Could not deserialize tcp message", e);
                    continue;
                }
                Ok(req) => req,
            };

            Self::handle_tcp_request(
                stream,
                &req,
                &mut current_client_id,
                &mut tcp_incoming_sender,
                &shmem_provider_clone,
                &broker_map_description,
            );
        }
    });

    Ok(listener_thread)
}
/// Drains all pending messages of the client at index `client_id` in `llmp_clients`.
///
/// * `LLMP_TAG_NEW_SHM_CLIENT` messages are consumed by the broker: the announced map is
///   attached and registered as a new client via `register_client`.
/// * Every other message is passed to `on_new_msg`; unless the hook answers
///   `LlmpMsgHookResult::Handled`, the message is forwarded through `forward_msg`.
///
/// Returns `Ok(())` once the client has no further message.
///
/// # Safety
/// Dereferences the raw message pointers returned by the client's `recv`.
/// NOTE(review): relies on those pointers staying valid until the next `recv` on the same
/// client - confirm against `LlmpReceiver::recv`.
///
/// # Errors
/// Propagates errors from `recv`, from reading the message slice, from the hook and from
/// `forward_msg`. Without the `std` feature, a malformed or unattachable client announcement
/// is returned as `Error::Unknown`; with `std` it is only printed and skipped.
///
/// # Panics
/// Panics if `client_id` is not a valid index into `llmp_clients`.
#[inline]
unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error>
where
    F: FnMut(ClientId, Tag, Flags, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
    loop {
        let msg = match self.llmp_clients[client_id as usize].recv()? {
            Some(msg) => msg,
            None => return Ok(()),
        };

        if (*msg).tag == LLMP_TAG_NEW_SHM_CLIENT {
            // Copy the field into a local so the diagnostics below format a plain value
            // rather than borrowing through the raw pointer.
            let received_len = (*msg).buf_len;
            let expected_len = size_of::<LlmpPayloadSharedMapInfo>();

            if received_len < expected_len as u64 {
                #[cfg(feature = "std")]
                println!(
                    "Ignoring broken CLIENT_ADDED msg due to incorrect size. Expected {} but got {}",
                    expected_len, received_len
                );
                #[cfg(not(feature = "std"))]
                return Err(Error::Unknown(format!(
                    "Broken CLIENT_ADDED msg with incorrect size received. Expected {} but got {}",
                    expected_len, received_len
                )));
            } else {
                let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;

                match self.shmem_provider.from_id_and_size(
                    ShMemId::from_slice(&(*pageinfo).shm_str),
                    (*pageinfo).map_size,
                ) {
                    // `register_client` marks the page and assigns the next free id.
                    Ok(new_map) => self.register_client(LlmpSharedMap::existing(new_map)),
                    Err(e) => {
                        #[cfg(feature = "std")]
                        println!("Error adding client! Ignoring: {:?}", e);
                        #[cfg(not(feature = "std"))]
                        return Err(Error::Unknown(format!(
                            "Error adding client! PANIC! {:?}",
                            e
                        )));
                    }
                };
            }
        } else {
            let map = &mut self.llmp_clients[client_id as usize].current_recv_map;
            let msg_buf = (*msg).as_slice(map)?;
            let hook_result = (on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)?;

            if let LlmpMsgHookResult::Handled = hook_result {
                // The hook consumed the message; nothing to forward.
            } else {
                self.forward_msg(msg)?;
            }
        }
    }
}
}
/// Serializable description of an `LlmpClient`: one `LlmpDescription` for each of its two
/// channels. Produced by `LlmpClient::describe` and consumed by
/// `LlmpClient::existing_client_from_description`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct LlmpClientDescription {
/// Description of the client's sender.
sender: LlmpDescription,
/// Description of the client's receiver.
receiver: LlmpDescription,
}
/// An LLMP client: a sender/receiver pair plus the shared-memory provider used to create them.
#[derive(Debug)]
pub struct LlmpClient<SP>
where
SP: ShMemProvider,
{
/// Provider used for the client's shared maps.
shmem_provider: SP,
/// Outgoing channel of this client.
pub sender: LlmpSender<SP>,
/// Incoming channel of this client; the constructors attach it to the broker's map.
pub receiver: LlmpReceiver<SP>,
}
/// Construction, persistence and message passing for [`LlmpClient`].
///
/// Most methods are thin delegates to the client's `sender` or `receiver`.
impl<SP> LlmpClient<SP>
where
    SP: ShMemProvider,
{
    /// Reattaches a client to already existing maps.
    ///
    /// The sender is rebuilt on `current_out_map` with `last_msg_sent_offset`; the receiver
    /// is rebuilt on `current_broker_map` with `last_msg_recvd_offset`.
    ///
    /// # Errors
    /// Returns an error if either side cannot be rebuilt.
    pub fn on_existing_map(
        shmem_provider: SP,
        current_out_map: SP::Mem,
        last_msg_sent_offset: Option<u64>,
        current_broker_map: SP::Mem,
        last_msg_recvd_offset: Option<u64>,
    ) -> Result<Self, Error> {
        Ok(Self {
            receiver: LlmpReceiver::on_existing_map(
                shmem_provider.clone(),
                current_broker_map,
                last_msg_recvd_offset,
            )?,
            // The sender must write to the client's own out map, not to the broker's map.
            sender: LlmpSender::on_existing_map(
                shmem_provider.clone(),
                current_out_map,
                last_msg_sent_offset,
            )?,
            shmem_provider,
        })
    }

    /// Recreates a client from the environment variables written by [`Self::to_env`].
    ///
    /// The sender is read from `{env_name}_SENDER`, the receiver from `{env_name}_RECEIVER`.
    ///
    /// # Errors
    /// Returns an error if either side cannot be restored.
    #[cfg(feature = "std")]
    pub fn on_existing_from_env(shmem_provider: SP, env_name: &str) -> Result<Self, Error> {
        Ok(Self {
            sender: LlmpSender::on_existing_from_env(
                shmem_provider.clone(),
                &format!("{}_SENDER", env_name),
            )?,
            receiver: LlmpReceiver::on_existing_from_env(
                shmem_provider.clone(),
                &format!("{}_RECEIVER", env_name),
            )?,
            shmem_provider,
        })
    }

    /// Stores this client in the environment under `{env_name}_SENDER` and
    /// `{env_name}_RECEIVER`, for later use with [`Self::on_existing_from_env`].
    ///
    /// # Errors
    /// Returns the first error reported by either side's `to_env`.
    #[cfg(feature = "std")]
    pub fn to_env(&self, env_name: &str) -> Result<(), Error> {
        self.sender.to_env(&format!("{}_SENDER", env_name))?;
        self.receiver.to_env(&format!("{}_RECEIVER", env_name))
    }

    /// Builds a [`LlmpClientDescription`] from the descriptions of sender and receiver.
    fn describe(&self) -> Result<LlmpClientDescription, Error> {
        Ok(LlmpClientDescription {
            sender: self.sender.describe()?,
            receiver: self.receiver.describe()?,
        })
    }

    /// Recreates a client from a [`LlmpClientDescription`].
    fn existing_client_from_description(
        shmem_provider: SP,
        description: &LlmpClientDescription,
    ) -> Result<Self, Error> {
        Ok(Self {
            sender: LlmpSender::on_existing_from_description(
                shmem_provider.clone(),
                &description.sender,
            )?,
            receiver: LlmpReceiver::on_existing_from_description(
                shmem_provider.clone(),
                &description.receiver,
            )?,
            shmem_provider,
        })
    }

    /// Delegates to the sender's `await_save_to_unmap_blocking`.
    pub fn await_save_to_unmap_blocking(&self) {
        self.sender.await_save_to_unmap_blocking();
    }

    /// Delegates to the sender's `save_to_unmap` and returns its answer.
    pub fn save_to_unmap(&self) -> bool {
        self.sender.save_to_unmap()
    }

    /// Creates a new client that receives from `initial_broker_map`.
    ///
    /// A fresh out map of `LLMP_CFG_INITIAL_MAP_SIZE` bytes is requested from
    /// `shmem_provider` for the sender. Both sender and receiver start with id `0` and no
    /// message sent or received.
    ///
    /// # Errors
    /// Returns an error if the out map cannot be created.
    pub fn new(
        mut shmem_provider: SP,
        initial_broker_map: LlmpSharedMap<SP::Mem>,
    ) -> Result<Self, Error> {
        Ok(Self {
            sender: LlmpSender {
                id: 0,
                last_msg_sent: ptr::null_mut(),
                out_maps: vec![LlmpSharedMap::new(0, {
                    shmem_provider.new_map(LLMP_CFG_INITIAL_MAP_SIZE)?
                })],
                keep_pages_forever: false,
                shmem_provider: shmem_provider.clone(),
            },
            receiver: LlmpReceiver {
                id: 0,
                current_recv_map: initial_broker_map,
                last_msg_recvd: ptr::null_mut(),
                shmem_provider: shmem_provider.clone(),
            },
            shmem_provider,
        })
    }

    /// Sends a previously allocated message; delegates to the sender.
    ///
    /// # Safety
    /// `msg` is a raw pointer.
    /// NOTE(review): presumably it must come from [`Self::alloc_next`] on this client -
    /// confirm against `LlmpSender::send`.
    pub unsafe fn send(&mut self, msg: *mut LlmpMsg) -> Result<(), Error> {
        self.sender.send(msg)
    }

    /// Sends `buf` under `tag`; delegates to the sender.
    pub fn send_buf(&mut self, tag: Tag, buf: &[u8]) -> Result<(), Error> {
        self.sender.send_buf(tag, buf)
    }

    /// Sends `buf` under `tag` with the given `flags`; delegates to the sender.
    pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flags, buf: &[u8]) -> Result<(), Error> {
        self.sender.send_buf_with_flags(tag, flags, buf)
    }

    /// Sends an `LLMP_TAG_NEW_SHM_CLIENT` message whose payload carries `shm_str` and `shm_id`.
    ///
    /// NOTE(review): despite its name, `shm_id` is written into the payload's `map_size`
    /// field - confirm that callers pass the map size here.
    ///
    /// # Errors
    /// Returns whatever error sending the message reports.
    ///
    /// # Panics
    /// Panics if no message slot can be allocated.
    pub fn send_client_added_msg(
        &mut self,
        shm_str: &[u8; 20],
        shm_id: usize,
    ) -> Result<(), Error> {
        unsafe {
            let msg = self
                .alloc_next(size_of::<LlmpPayloadSharedMapInfo>())
                .expect("Could not allocate a new message in shared map.");
            (*msg).tag = LLMP_TAG_NEW_SHM_CLIENT;
            // The message buffer is interpreted as the shared-map info payload.
            let pageinfo = (*msg).buf.as_mut_ptr() as *mut LlmpPayloadSharedMapInfo;
            (*pageinfo).shm_str = *shm_str;
            (*pageinfo).map_size = shm_id;
            self.send(msg)
        }
    }

    /// Non-blocking receive of the next raw message; delegates to the receiver.
    ///
    /// # Safety
    /// Returns a raw message pointer.
    /// NOTE(review): its validity rules are those of `LlmpReceiver::recv` - confirm there.
    #[inline]
    pub unsafe fn recv(&mut self) -> Result<Option<*mut LlmpMsg>, Error> {
        self.receiver.recv()
    }

    /// Blocking receive of the next raw message; delegates to the receiver.
    ///
    /// # Safety
    /// Returns a raw message pointer.
    /// NOTE(review): its validity rules are those of `LlmpReceiver::recv_blocking` - confirm there.
    #[inline]
    pub unsafe fn recv_blocking(&mut self) -> Result<*mut LlmpMsg, Error> {
        self.receiver.recv_blocking()
    }

    /// Allocates the next outgoing message slot with room for `buf_len` payload bytes;
    /// delegates to the sender.
    ///
    /// # Safety
    /// Returns a raw message pointer.
    /// NOTE(review): its validity rules are those of `LlmpSender::alloc_next` - confirm there.
    #[inline]
    pub unsafe fn alloc_next(&mut self, buf_len: usize) -> Result<*mut LlmpMsg, Error> {
        self.sender.alloc_next(buf_len)
    }

    /// Non-blocking receive of the next message as a tuple ending in `(tag, payload)`;
    /// delegates to the receiver.
    /// NOTE(review): the leading `u32` is presumably the sender's id - confirm.
    #[allow(clippy::type_complexity)]
    #[inline]
    pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
        self.receiver.recv_buf()
    }

    /// Blocking variant of [`Self::recv_buf`]; delegates to the receiver.
    #[inline]
    pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> {
        self.receiver.recv_buf_blocking()
    }

    /// Non-blocking receive that also returns the message flags; delegates to the receiver.
    #[allow(clippy::type_complexity)]
    pub fn recv_buf_with_flags(&mut self) -> Result<Option<(ClientId, Tag, Flags, &[u8])>, Error> {
        self.receiver.recv_buf_with_flags()
    }

    /// Creates a client that receives from the map stored in the environment under `env_var`.
    ///
    /// # Errors
    /// Returns an error if the map cannot be restored or the client cannot be created.
    #[cfg(feature = "std")]
    pub fn create_using_env(mut shmem_provider: SP, env_var: &str) -> Result<Self, Error> {
        let map = LlmpSharedMap::existing(shmem_provider.existing_from_env(env_var)?);
        Self::new(shmem_provider, map)
    }

    /// Creates a client by attaching to the broker listening on `_LLMP_BIND_ADDR:port`.
    ///
    /// Handshake, as performed below: receive `BrokerConnectHello` with the broker's map
    /// description, create the client on that map, send `LocalClientHello` with the
    /// description of the client's own out map, and receive `LocalClientAccepted` with the
    /// assigned client id. The id is then stored in the sender and in its first out page.
    ///
    /// # Errors
    /// Returns an error if connecting, sending or receiving fails, if a response cannot be
    /// decoded, or (`Error::IllegalState`) if the broker answers with an unexpected message.
    #[cfg(feature = "std")]
    pub fn create_attach_to_tcp(mut shmem_provider: SP, port: u16) -> Result<Self, Error> {
        let mut stream = TcpStream::connect(format!("{}:{}", _LLMP_BIND_ADDR, port))?;
        println!("Connected to port {}", port);

        let broker_map_description = if let TcpResponse::BrokerConnectHello {
            broker_map_description,
            hostname: _,
        } = (&recv_tcp_msg(&mut stream)?).try_into()?
        {
            broker_map_description
        } else {
            return Err(Error::IllegalState(
                "Received unexpected Broker Hello".to_string(),
            ));
        };

        let map = LlmpSharedMap::existing(shmem_provider.from_description(broker_map_description)?);
        let mut ret = Self::new(shmem_provider, map)?;

        let client_hello_req = TcpRequest::LocalClientHello {
            shmem_description: ret.sender.out_maps.first().unwrap().shmem.description(),
        };
        send_tcp_msg(&mut stream, &client_hello_req)?;

        let client_id = if let TcpResponse::LocalClientAccepted { client_id } =
            (&recv_tcp_msg(&mut stream)?).try_into()?
        {
            client_id
        } else {
            return Err(Error::IllegalState(
                "Unexpected Response from Broker".to_string(),
            ));
        };

        // Adopt the id assigned by the broker, both locally and in the first out page.
        ret.sender.id = client_id;
        unsafe {
            (*ret.sender.out_maps.first_mut().unwrap().page_mut()).sender = client_id;
        }

        Ok(ret)
    }
}
// Integration-style test for the broker/client connection over TCP and shared maps.
#[cfg(test)]
#[cfg(all(unix, feature = "std"))]
mod tests {
use std::{thread::sleep, time::Duration};
use super::{
LlmpClient,
LlmpConnection::{self, IsBroker, IsClient},
LlmpMsgHookResult::ForwardToClients,
Tag,
};
use crate::bolts::shmem::{ShMemProvider, StdShMemProvider};
/// Connects a broker and a client on port 1337, round-trips the client through the
/// environment, and checks that a message sent by the client arrives back at it.
#[test]
pub fn llmp_connection() {
let shmem_provider = StdShMemProvider::new().unwrap();
// The first `on_port` call is expected to yield the broker.
let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
IsClient { client: _ } => panic!("Could not bind to port as broker"),
IsBroker { broker } => broker,
};
// The second call on the same port is expected to yield a client.
let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
IsBroker { broker: _ } => panic!("Second connect should be a client!"),
IsClient { client } => client,
};
// Short pause before the first brokering pass.
// NOTE(review): presumably gives the listener thread time to announce the client - confirm.
sleep(Duration::from_millis(100));
broker
.once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients))
.unwrap();
let tag: Tag = 0x1337;
let arr: [u8; 1] = [1u8];
client.send_buf(tag, &arr).unwrap();
// Persist the client to the environment and restore it from there.
client.to_env("_ENV_TEST").unwrap();
#[cfg(all(feature = "llmp_debug", feature = "std"))]
dbg!(std::env::vars());
for (key, value) in std::env::vars_os() {
println!("{:?}: {:?}", key, value);
}
client = LlmpClient::on_existing_from_env(shmem_provider, "_ENV_TEST").unwrap();
client.send_buf(tag, &arr).unwrap();
// Second brokering pass; the hook asks the broker to forward every message.
broker
.once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients))
.unwrap();
let (_sender_id, tag2, arr2) = client.recv_buf_blocking().unwrap();
assert_eq!(tag, tag2);
assert_eq!(arr[0], arr2[0]);
// NOTE(review): two registered clients are expected - presumably the TCP listener's own
// client plus the connected client; confirm.
assert_eq!(broker.llmp_clients.len(), 2);
}
}