#![allow(clippy::expect_used)]
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Barrier;
use std::thread;
use polyplug::runtime::Runtime;
use polyplug_abi::types::LogLevel;
use polyplug_abi::{HostApi, StringView};
/// Number of logging threads the stress test spawns; also the party count of
/// the barrier that releases them together.
const THREADS: usize = 8_usize;
/// Number of log records each spawned thread emits through the host ABI.
const LOGS_PER_THREAD: usize = 2_000_usize;
/// Returns `true` when `scope` and `message` name the same thread index.
///
/// `scope` must look like `scope.<idx>` with a non-empty `<idx>`, and
/// `message` must look like `payload.<idx>` optionally followed by further
/// `.`-separated segments. Only the first segment after `payload.` is
/// compared. Any other shape yields `false`.
fn thread_index_agrees(scope: &str, message: &str) -> bool {
    // Guard: the scope must carry the expected prefix and a non-empty index.
    let scope_tag: &str = match scope.strip_prefix("scope.") {
        Some(tag) if !tag.is_empty() => tag,
        _ => return false,
    };
    // `split` always yields at least one segment, so a message with the right
    // prefix always produces an index to compare (possibly the empty string).
    match message.strip_prefix("payload.") {
        Some(tail) => tail.split('.').next() == Some(scope_tag),
        None => false,
    }
}
/// Drives the host `log` entry point from `THREADS` threads at once and checks
/// that the logger sink receives every record, each one well-formed.
///
/// Thread `t` emits `LOGS_PER_THREAD` records at `LogLevel::Info` with scope
/// `scope.{t}` and message `payload.{t}.{seq}`. The sink marks a record as
/// malformed when its level is not `Info` or when scope and message disagree
/// on the thread index, and it counts every invocation.
#[test]
fn concurrent_host_log_funnel_delivers_every_record_intact() {
    let malformed: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
    let delivered: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0_usize));
    // Handles moved into the logger sink; the originals are read at the end.
    let sink_malformed: Arc<AtomicBool> = Arc::clone(&malformed);
    let sink_delivered: Arc<AtomicUsize> = Arc::clone(&delivered);

    let runtime: Arc<Runtime> = Runtime::builder()
        .loader(crate::common::TestNativeLoader::new())
        .logger(move |level: LogLevel, scope: &str, message: &str| {
            let intact: bool = level == LogLevel::Info && thread_index_agrees(scope, message);
            if !intact {
                sink_malformed.store(true, Ordering::Relaxed);
            }
            sink_delivered.fetch_add(1_usize, Ordering::Relaxed);
        })
        .build()
        .expect("runtime build must succeed");

    // Every worker blocks on this barrier so the logging loops start together.
    let start_gate: Arc<Barrier> = Arc::new(Barrier::new(THREADS));
    let mut workers: Vec<thread::JoinHandle<()>> = Vec::with_capacity(THREADS);
    for t in 0_usize..THREADS {
        let worker_rt: Arc<Runtime> = Arc::clone(&runtime);
        let worker_gate: Arc<Barrier> = Arc::clone(&start_gate);
        workers.push(thread::spawn(move || {
            // The raw pointer is obtained on the worker thread itself rather
            // than captured, since raw pointers are not `Send`.
            let host: *const HostApi = worker_rt.host_abi();
            let scope: String = format!("scope.{t}");
            worker_gate.wait();
            for seq in 0_usize..LOGS_PER_THREAD {
                let message: String = format!("payload.{t}.{seq}");
                let message_sv: StringView = StringView {
                    ptr: message.as_bytes().as_ptr(),
                    len: message.len(),
                };
                let scope_sv: StringView = StringView {
                    ptr: scope.as_bytes().as_ptr(),
                    len: scope.len(),
                };
                // SAFETY: `host` came from `worker_rt.host_abi()` and
                // `worker_rt` is owned by this closure, so the runtime is
                // still alive here (assumes the pointer stays valid for the
                // runtime's lifetime — confirm against `host_abi`'s contract).
                // Both views borrow `scope` and `message`, which outlive the
                // call.
                unsafe {
                    ((*host).log)(host, LogLevel::Info as u32, scope_sv, message_sv);
                }
            }
        }));
    }

    workers
        .into_iter()
        .for_each(|worker: thread::JoinHandle<()>| {
            worker.join().expect("logging thread must not panic")
        });

    let saw_malformed: bool = malformed.load(Ordering::Relaxed);
    let total_delivered: usize = delivered.load(Ordering::Relaxed);
    assert!(
        !saw_malformed,
        "a record arrived torn or cross-wired — the logger funnel is not thread-safe"
    );
    assert_eq!(
        total_delivered,
        THREADS * LOGS_PER_THREAD,
        "every host->log record must reach the sink exactly once under contention"
    );
}