use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use futures_util::{SinkExt, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::mpsc::Sender;
use tokio_tungstenite::tungstenite::Message;
use crate::coord::EnuConverter;
use crate::net::{ClientMsg, ServerMsg, ZoneEvent, decode, encode, encode_snapshot};
use crate::octree::OctreeNode;
use crate::store::{ZoneDiff, ZoneStore};
use crate::zone::ZoneEntry;
pub type SharedOctree = Arc<RwLock<OctreeNode>>;
pub type SharedStore = Arc<RwLock<ZoneStore>>;
#[derive(Debug, Clone, Copy)]
pub struct HysteresisConfig {
pub enter_ticks: u8,
pub exit_ticks: u8,
}
impl Default for HysteresisConfig {
fn default() -> Self {
Self { enter_ticks: 2, exit_ticks: 2 }
}
}
fn step_hysteresis(
current: &[u32],
active_zones: &mut Vec<u32>,
tick_count: &mut HashMap<u32, i16>,
config: &HysteresisConfig,
) -> Vec<(u32, ZoneEvent)> {
let mut events = Vec::new();
let enter = config.enter_ticks as i16;
let exit = config.exit_ticks as i16;
let current_set: HashSet<u32> = current.iter().copied().collect();
for &id in current {
if active_zones.contains(&id) {
tick_count.insert(id, 0);
} else {
let c = tick_count.entry(id).or_insert(0);
*c += 1;
if *c >= enter {
events.push((id, ZoneEvent::Enter));
active_zones.push(id);
*c = 0;
}
}
}
let mut to_exit = Vec::new();
for &id in active_zones.iter() {
if !current_set.contains(&id) {
let c = tick_count.entry(id).or_insert(0);
*c -= 1;
if *c <= -exit {
events.push((id, ZoneEvent::Exit));
to_exit.push(id);
}
}
}
for id in to_exit {
active_zones.retain(|&z| z != id);
tick_count.remove(&id);
}
tick_count.retain(|id, _| active_zones.contains(id) || current_set.contains(id));
events
}
pub struct ZoneServer {
pub store: SharedStore,
pub octree: SharedOctree,
pub conv: Arc<EnuConverter>,
pub hysteresis: HysteresisConfig,
subscribers: RwLock<HashMap<u32, Sender<Vec<u8>>>>,
seq: AtomicU32,
next_entity: AtomicU32,
}
impl ZoneServer {
pub fn new(store: SharedStore, octree: SharedOctree, conv: Arc<EnuConverter>) -> Self {
Self::new_with_hysteresis(store, octree, conv, HysteresisConfig::default())
}
pub fn new_with_hysteresis(
store: SharedStore,
octree: SharedOctree,
conv: Arc<EnuConverter>,
config: HysteresisConfig,
) -> Self {
Self {
store,
octree,
conv,
hysteresis: config,
subscribers: RwLock::new(HashMap::new()),
seq: AtomicU32::new(0),
next_entity: AtomicU32::new(1),
}
}
fn next_seq(&self) -> u16 {
self.seq.fetch_add(1, Ordering::Relaxed) as u16
}
fn alloc_entity(&self) -> u32 {
self.next_entity.fetch_add(1, Ordering::Relaxed)
}
pub fn broadcast(&self, diffs: Vec<ZoneDiff>) {
let msg = encode(&ServerMsg::ZoneBatch { seq: self.next_seq(), diffs });
for tx in self.subscribers.read().unwrap().values() {
let _ = tx.try_send(msg.clone());
}
}
pub fn add_zone_broadcast(&self, entry: ZoneEntry) {
let diff = ZoneDiff::Add(entry.clone());
self.store.write().unwrap().add_zone(entry.id, &entry.zone, &self.conv);
self.broadcast(vec![diff]);
}
pub fn remove_zone_broadcast(&self, id: u32) {
self.store.write().unwrap().remove(id);
self.broadcast(vec![ZoneDiff::Remove { id }]);
}
pub async fn listen(self: Arc<Self>, addr: &str) {
let listener = TcpListener::bind(addr).await.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let srv = self.clone();
tokio::spawn(async move { srv.handle_connection(stream).await });
}
}
}
async fn handle_connection(self: Arc<Self>, stream: tokio::net::TcpStream) {
let ws = match tokio_tungstenite::accept_async(stream).await {
Ok(ws) => ws,
Err(_) => return,
};
let (mut ws_tx, mut ws_rx) = ws.split();
let entity_id = self.alloc_entity();
let (out_tx, mut out_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(256);
self.subscribers.write().unwrap().insert(entity_id, out_tx);
let subs = self.subscribers.read().unwrap().get(&entity_id).unwrap().clone();
let pump = tokio::spawn(async move {
while let Some(bytes) = out_rx.recv().await {
if ws_tx.send(Message::Binary(bytes.into())).await.is_err() {
break;
}
}
});
let mut last_pos: [f64; 3] = [0.0, 0.0, 0.0];
let mut last_ts: u32 = 0;
let mut _last_vel: [f64; 3] = [0.0, 0.0, 0.0];
let mut active_zones: Vec<u32> = vec![];
let mut tick_count: HashMap<u32, i16> = HashMap::new();
while let Some(Ok(msg)) = ws_rx.next().await {
let bytes = match msg {
Message::Binary(b) => b,
Message::Close(_) => break,
_ => continue,
};
let Some(client_msg) = decode::<ClientMsg>(&bytes) else {
continue;
};
let (enu, ts) = match client_msg {
ClientMsg::FullPos { pos, ts_ms, .. } => {
let e = [pos[0] as f64, pos[1] as f64, pos[2] as f64];
let dt = (ts_ms.wrapping_sub(last_ts)) as f64 / 1000.0;
if dt > 0.0 {
_last_vel = std::array::from_fn(|i| (e[i] - last_pos[i]) / dt);
}
last_ts = ts_ms;
last_pos = e;
(e, ts_ms)
}
ClientMsg::DeltaPos { dx, dy, dz, dt_ms, .. } => {
let e = [
last_pos[0] + dx as f64 / 1000.0,
last_pos[1] + dy as f64 / 1000.0,
last_pos[2] + dz as f64 / 1000.0,
];
let ts = last_ts.wrapping_add(dt_ms as u32);
last_pos = e;
last_ts = ts;
(e, ts)
}
ClientMsg::Stationary { duration_ms, .. } => {
let ts = last_ts.wrapping_add(duration_ms as u32);
last_ts = ts;
(last_pos, ts)
}
ClientMsg::RequestSnapshot => {
let entries: Vec<ZoneEntry> = vec![];
let snap_bytes = encode_snapshot(&entries);
let _ = subs.try_send(snap_bytes);
continue;
}
ClientMsg::Ping { seq } => {
let pong = encode(&ServerMsg::Pong {
seq,
server_ts_ms: epoch_ms() as u32,
});
let _ = subs.try_send(pong);
continue;
}
ClientMsg::Ack { .. } => continue,
};
let current = self.store.read().unwrap().query_enu(enu);
let events =
step_hysteresis(¤t, &mut active_zones, &mut tick_count, &self.hysteresis);
for (zone_id, event) in events {
let _ = subs.try_send(encode(&ServerMsg::EntityEvent {
entity_id,
event,
zone_id,
ts_ms: ts,
}));
}
}
self.subscribers.write().unwrap().remove(&entity_id);
pump.abort();
}
}
fn epoch_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
pub struct AtomicPos {
xy: std::sync::atomic::AtomicU64,
z: std::sync::atomic::AtomicU64,
}
impl AtomicPos {
pub fn new(x: f32, y: f32, z: f32) -> Self {
let xy = ((x.to_bits() as u64) << 32) | y.to_bits() as u64;
let z_val = (z.to_bits() as u64) << 32;
Self {
xy: std::sync::atomic::AtomicU64::new(xy),
z: std::sync::atomic::AtomicU64::new(z_val),
}
}
pub fn store(&self, x: f32, y: f32, z: f32) {
let xy = ((x.to_bits() as u64) << 32) | y.to_bits() as u64;
self.xy.store(xy, Ordering::Release);
self.z.store((z.to_bits() as u64) << 32, Ordering::Release);
}
pub fn load(&self) -> [f32; 3] {
let xy = self.xy.load(Ordering::Acquire);
let z = self.z.load(Ordering::Acquire);
[
f32::from_bits((xy >> 32) as u32),
f32::from_bits(xy as u32),
f32::from_bits((z >> 32) as u32),
]
}
pub fn load_f64(&self) -> [f64; 3] {
let p = self.load();
[p[0] as f64, p[1] as f64, p[2] as f64]
}
}
impl Default for AtomicPos {
fn default() -> Self {
Self::new(0.0, 0.0, 0.0)
}
}
const DEFAULT_SHARDS: usize = 16;
pub struct ShardedZoneServer {
shards: Vec<Arc<ZoneServer>>,
}
impl ShardedZoneServer {
pub fn new(
num_shards: usize,
conv: Arc<EnuConverter>,
world_half: f64,
) -> Self {
let shards = (0..num_shards)
.map(|_| {
let store = Arc::new(RwLock::new(ZoneStore::from_entries(&[], &*conv)));
let octree = Arc::new(RwLock::new(OctreeNode::new([0.0; 3], world_half)));
Arc::new(ZoneServer::new(store, octree, conv.clone()))
})
.collect();
Self { shards }
}
pub fn new_default(conv: Arc<EnuConverter>, world_half: f64) -> Self {
Self::new(DEFAULT_SHARDS, conv, world_half)
}
pub fn shard_for(&self, entity_id: u32) -> &Arc<ZoneServer> {
&self.shards[entity_id as usize % self.shards.len()]
}
pub fn broadcast_all(&self, diffs: Vec<ZoneDiff>) {
for shard in &self.shards {
shard.broadcast(diffs.clone());
}
}
pub fn add_zone_all(&self, entry: ZoneEntry) {
for shard in &self.shards {
shard.store.write().unwrap().add_zone(entry.id, &entry.zone, &shard.conv);
}
self.broadcast_all(vec![ZoneDiff::Add(entry)]);
}
pub fn remove_zone_all(&self, id: u32) {
for shard in &self.shards {
shard.store.write().unwrap().remove(id);
}
self.broadcast_all(vec![ZoneDiff::Remove { id }]);
}
pub fn shard_count(&self) -> usize {
self.shards.len()
}
}
#[cfg(all(feature = "io_uring", target_os = "linux"))]
pub async fn run_io_uring(addr: &str, server: Arc<ZoneServer>) -> std::io::Result<()> {
let socket_addr: std::net::SocketAddr = addr
.parse()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
let listener = tokio_uring::net::TcpListener::bind(socket_addr)?;
loop {
let (stream, _peer) = listener.accept().await?;
let srv = server.clone();
tokio_uring::spawn(async move {
let _ = handle_io_uring_connection(stream, srv).await;
});
}
}
#[cfg(all(feature = "io_uring", target_os = "linux"))]
async fn write_frame_io_uring(
stream: &tokio_uring::net::TcpStream,
payload: &[u8],
) -> std::io::Result<()> {
let mut frame = (payload.len() as u32).to_le_bytes().to_vec();
frame.extend_from_slice(payload);
let (result, _) = stream.write_all(frame).await;
result
}
#[cfg(all(feature = "io_uring", target_os = "linux"))]
async fn handle_io_uring_connection(
stream: tokio_uring::net::TcpStream,
server: Arc<ZoneServer>,
) -> std::io::Result<()> {
use crate::net::{ClientMsg, ServerMsg, decode, encode};
let entity_id = server.alloc_entity();
let mut active_zones: Vec<u32> = Vec::new();
let mut tick_counts: HashMap<u32, i16> = HashMap::new();
let mut last_pos: [f64; 3] = [0.0; 3];
let mut last_ts: u32 = 0;
let mut frame_buf: Vec<u8> = Vec::with_capacity(4096);
let mut read_buf = vec![0u8; 4096];
loop {
let (result, buf) = stream.read(read_buf).await;
let n = result?;
if n == 0 {
break;
}
frame_buf.extend_from_slice(&buf[..n]);
read_buf = buf;
while frame_buf.len() >= 4 {
let len = u32::from_le_bytes([
frame_buf[0], frame_buf[1], frame_buf[2], frame_buf[3],
]) as usize;
if frame_buf.len() < 4 + len {
break;
}
let payload = frame_buf[4..4 + len].to_vec();
frame_buf.drain(..4 + len);
let Some(client_msg) = decode::<ClientMsg>(&payload) else {
continue;
};
let (enu, ts) = match client_msg {
ClientMsg::FullPos { pos, ts_ms, .. } => {
let e = [pos[0] as f64, pos[1] as f64, pos[2] as f64];
last_ts = ts_ms;
last_pos = e;
(e, ts_ms)
}
ClientMsg::DeltaPos { dx, dy, dz, dt_ms, .. } => {
let e = [
last_pos[0] + dx as f64 / 1000.0,
last_pos[1] + dy as f64 / 1000.0,
last_pos[2] + dz as f64 / 1000.0,
];
let ts = last_ts.wrapping_add(dt_ms as u32);
last_pos = e;
last_ts = ts;
(e, ts)
}
ClientMsg::Stationary { duration_ms, .. } => {
let ts = last_ts.wrapping_add(duration_ms as u32);
last_ts = ts;
(last_pos, ts)
}
ClientMsg::RequestSnapshot => continue,
ClientMsg::Ping { seq } => {
let pong = encode(&ServerMsg::Pong {
seq,
server_ts_ms: epoch_ms() as u32,
});
write_frame_io_uring(&stream, &pong).await?;
continue;
}
ClientMsg::Ack { .. } => continue,
};
let current = server.store.read().unwrap().query_enu(enu);
let events = step_hysteresis(
¤t,
&mut active_zones,
&mut tick_counts,
&server.hysteresis,
);
for (zone_id, event) in events {
let response = encode(&ServerMsg::EntityEvent {
entity_id,
event,
zone_id,
ts_ms: ts,
});
write_frame_io_uring(&stream, &response).await?;
}
}
}
Ok(())
}
#[cfg(not(all(feature = "io_uring", target_os = "linux")))]
pub async fn run_io_uring(_addr: &str, _server: Arc<ZoneServer>) -> std::io::Result<()> {
Err(std::io::Error::new(
std::io::ErrorKind::Unsupported,
"io_uring requires Linux and the 'io_uring' feature",
))
}
#[cfg(feature = "server")]
pub async fn start_health_endpoint(port: u16) -> std::io::Result<()> {
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
let listener = TcpListener::bind(("0.0.0.0", port)).await?;
loop {
if let Ok((mut stream, _)) = listener.accept().await {
tokio::spawn(async move {
let response = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok";
let _ = stream.write_all(response).await;
});
}
}
}
#[cfg(feature = "prometheus")]
pub async fn start_metrics_endpoint(port: u16) -> std::io::Result<()> {
use metrics::describe_counter;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
describe_counter!("plozone_queries_total", "total zone queries");
describe_counter!("plozone_events_total", "total zone enter/exit events");
let handle = PrometheusBuilder::new()
.install_recorder()
.map_err(|e| std::io::Error::other(e.to_string()))?;
let listener = TcpListener::bind(("0.0.0.0", port)).await?;
loop {
if let Ok((mut stream, _)) = listener.accept().await {
let metrics_text = handle.render();
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}",
metrics_text.len(),
metrics_text
);
tokio::spawn(async move {
let _ = stream.write_all(response.as_bytes()).await;
});
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::zone::{Zone, ZoneEntry};
#[test]
fn broadcast_sends_to_all_subscribers() {
let conv = Arc::new(EnuConverter::new(0.0, 0.0, 0.0));
let store = Arc::new(RwLock::new(ZoneStore::from_entries(&[], &*conv)));
let octree = Arc::new(RwLock::new(OctreeNode::new([0.0; 3], 50.0)));
let srv = Arc::new(ZoneServer::new(store, octree, conv));
let (tx1, mut rx1) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
let (tx2, mut rx2) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
srv.subscribers.write().unwrap().insert(1, tx1);
srv.subscribers.write().unwrap().insert(2, tx2);
srv.broadcast(vec![ZoneDiff::Remove { id: 99 }]);
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let b1 = rx1.try_recv().unwrap();
let b2 = rx2.try_recv().unwrap();
let msg1: ServerMsg = decode(&b1).unwrap();
let msg2: ServerMsg = decode(&b2).unwrap();
assert_eq!(msg1, msg2);
});
}
#[test]
fn add_zone_broadcast_pushes_to_clients() {
let conv = Arc::new(EnuConverter::new(0.0, 0.0, 0.0));
let store = Arc::new(RwLock::new(ZoneStore::from_entries(&[], &*conv)));
let octree = Arc::new(RwLock::new(OctreeNode::new([0.0; 3], 50.0)));
let srv = Arc::new(ZoneServer::new(store.clone(), octree, conv));
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(16);
srv.subscribers.write().unwrap().insert(1, tx);
srv.add_zone_broadcast(ZoneEntry::new(
7,
Zone::Cylinder { center: [0.0, 0.0], radius_m: 10.0, z_min: 0.0, z_max: 5.0 },
));
assert!(store.read().unwrap().ids().contains(&7));
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
let bytes = rx.try_recv().unwrap();
let msg: ServerMsg = decode(&bytes).unwrap();
match msg {
ServerMsg::ZoneBatch { diffs, .. } => {
assert_eq!(diffs.len(), 1);
assert!(matches!(diffs[0], ZoneDiff::Add(ref e) if e.id == 7));
}
other => panic!("expected ZoneBatch, got {other:?}"),
}
});
}
#[test]
fn atomic_pos_round_trip() {
let pos = AtomicPos::new(1.5, -2.3, 100.0);
let loaded = pos.load();
assert!((loaded[0] - 1.5).abs() < 1e-6);
assert!((loaded[1] - (-2.3)).abs() < 1e-6);
assert!((loaded[2] - 100.0).abs() < 1e-6);
}
#[test]
fn atomic_pos_store_updates() {
let pos = AtomicPos::new(0.0, 0.0, 0.0);
pos.store(5.0, 6.0, 7.0);
assert_eq!(pos.load_f64(), [5.0, 6.0, 7.0]);
}
#[test]
fn sharded_server_distributes_entities() {
let conv = Arc::new(EnuConverter::new(0.0, 0.0, 0.0));
let sharded = ShardedZoneServer::new(4, conv, 50.0);
assert_eq!(sharded.shard_count(), 4);
let s0 = sharded.shard_for(0);
let s4 = sharded.shard_for(4);
assert!(std::ptr::eq(&**s0, &**s4), "0 and 4 should map to same shard");
}
#[test]
fn hysteresis_suppresses_single_tick_jitter() {
let config = HysteresisConfig::default(); let mut active: Vec<u32> = vec![];
let mut ticks: HashMap<u32, i16> = HashMap::new();
let ev = step_hysteresis(&[1], &mut active, &mut ticks, &config);
assert!(ev.is_empty(), "single tick must not fire Enter");
assert!(active.is_empty(), "zone must not be active after one tick");
let ev = step_hysteresis(&[], &mut active, &mut ticks, &config);
assert!(ev.is_empty(), "leaving before enter_ticks fires nothing");
assert!(active.is_empty());
}
#[test]
fn hysteresis_enter_requires_consecutive_ticks() {
let config = HysteresisConfig::default(); let mut active: Vec<u32> = vec![];
let mut ticks: HashMap<u32, i16> = HashMap::new();
assert!(step_hysteresis(&[1], &mut active, &mut ticks, &config).is_empty());
assert!(step_hysteresis(&[], &mut active, &mut ticks, &config).is_empty());
assert!(!ticks.contains_key(&1), "broken enter streak must clear the counter");
assert!(step_hysteresis(&[1], &mut active, &mut ticks, &config).is_empty());
assert!(active.is_empty(), "non-consecutive presence must not fire Enter");
let ev = step_hysteresis(&[1], &mut active, &mut ticks, &config);
assert_eq!(ev, vec![(1, ZoneEvent::Enter)]);
assert_eq!(active, vec![1]);
}
#[test]
fn hysteresis_fires_after_sustained_presence() {
let config = HysteresisConfig::default(); let mut active: Vec<u32> = vec![];
let mut ticks: HashMap<u32, i16> = HashMap::new();
assert!(step_hysteresis(&[1], &mut active, &mut ticks, &config).is_empty());
let ev = step_hysteresis(&[1], &mut active, &mut ticks, &config);
assert_eq!(ev, vec![(1, ZoneEvent::Enter)]);
assert_eq!(active, vec![1]);
assert!(step_hysteresis(&[1], &mut active, &mut ticks, &config).is_empty());
assert!(step_hysteresis(&[], &mut active, &mut ticks, &config).is_empty());
assert_eq!(active, vec![1], "still active after one tick outside");
let ev = step_hysteresis(&[], &mut active, &mut ticks, &config);
assert_eq!(ev, vec![(1, ZoneEvent::Exit)]);
assert!(active.is_empty(), "zone removed from active set after Exit");
assert!(!ticks.contains_key(&1), "exited zone purged from tick map");
}
#[test]
fn sharded_server_add_zone_propagates() {
let conv = Arc::new(EnuConverter::new(0.0, 0.0, 0.0));
let sharded = ShardedZoneServer::new(4, conv, 50.0);
sharded.add_zone_all(ZoneEntry::new(
1,
Zone::Cylinder { center: [0.0, 0.0], radius_m: 10.0, z_min: 0.0, z_max: 5.0 },
));
for shard in &sharded.shards {
assert!(shard.store.read().unwrap().ids().contains(&1));
}
}
#[cfg(all(feature = "io_uring", target_os = "linux"))]
#[test]
fn io_uring_server_binds_and_accepts() {
use crate::net::{ClientMsg, ServerMsg, decode, encode};
use std::io::{Read, Write};
use std::sync::mpsc::channel;
let conv = Arc::new(EnuConverter::new(0.0, 0.0, 0.0));
let store = Arc::new(RwLock::new(ZoneStore::from_entries(&[], &*conv)));
let octree = Arc::new(RwLock::new(OctreeNode::new([0.0; 3], 50.0)));
let server = Arc::new(ZoneServer::new(store, octree, conv));
let (addr_tx, addr_rx) = channel();
let srv = server.clone();
let server_thread = std::thread::spawn(move || {
tokio_uring::start(async move {
let listener =
tokio_uring::net::TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (stream, _) = listener.accept().await.unwrap();
handle_io_uring_connection(stream, srv).await.unwrap();
});
});
let bound_addr = addr_rx.recv().unwrap();
let mut client = std::net::TcpStream::connect(bound_addr).unwrap();
let ping = encode(&ClientMsg::Ping { seq: 42 });
let mut frame = (ping.len() as u32).to_le_bytes().to_vec();
frame.extend_from_slice(&ping);
client.write_all(&frame).unwrap();
let mut len_buf = [0u8; 4];
client.read_exact(&mut len_buf).unwrap();
let resp_len = u32::from_le_bytes(len_buf) as usize;
let mut resp_buf = vec![0u8; resp_len];
client.read_exact(&mut resp_buf).unwrap();
let pong: ServerMsg = decode(&resp_buf).unwrap();
assert!(matches!(pong, ServerMsg::Pong { seq: 42, .. }));
drop(client);
server_thread.join().unwrap();
}
#[cfg(all(feature = "io_uring", target_os = "linux"))]
#[test]
fn io_uring_write_frame_helper() {
use std::io::Read;
use std::sync::mpsc::channel;
let (addr_tx, addr_rx) = channel();
let payload = b"hello io_uring".to_vec();
let payload2 = payload.clone();
std::thread::spawn(move || {
tokio_uring::start(async move {
let listener =
tokio_uring::net::TcpListener::bind("127.0.0.1:0".parse().unwrap()).unwrap();
addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (stream, _) = listener.accept().await.unwrap();
write_frame_io_uring(&stream, &payload2).await.unwrap();
});
});
let bound_addr = addr_rx.recv().unwrap();
let mut client = std::net::TcpStream::connect(bound_addr).unwrap();
let mut len_buf = [0u8; 4];
client.read_exact(&mut len_buf).unwrap();
let len = u32::from_le_bytes(len_buf) as usize;
assert_eq!(len, payload.len());
let mut data = vec![0u8; len];
client.read_exact(&mut data).unwrap();
assert_eq!(data, payload);
}
#[cfg(not(all(feature = "io_uring", target_os = "linux")))]
#[test]
fn io_uring_stub_returns_unsupported() {
let rt = tokio::runtime::Runtime::new().unwrap();
let result = rt.block_on(async {
run_io_uring("127.0.0.1:0", Arc::new(ZoneServer::new(
Arc::new(RwLock::new(ZoneStore::from_entries(&[], &EnuConverter::new(0.0, 0.0, 0.0)))),
Arc::new(RwLock::new(OctreeNode::new([0.0; 3], 50.0))),
Arc::new(EnuConverter::new(0.0, 0.0, 0.0)),
))).await
});
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::Unsupported);
}
}