use clasp_client::Clasp;
use clasp_core::{SecurityMode, Value};
use clasp_router::{Router, RouterConfig};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Notify;
use tokio::time::timeout;
/// Ten-second timeout constant exported for users of these test helpers.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Ten-millisecond polling interval passed to `wait_for` by `wait_for_count`
/// and `wait_for_flag`.
pub const DEFAULT_CHECK_INTERVAL: Duration = Duration::from_millis(10);
/// Asks the OS for a free TCP port on the loopback interface.
///
/// A listener is bound to `127.0.0.1:0` and the port it was assigned is
/// returned. The listener is dropped when this function returns, so the port
/// is released again before the caller gets a chance to bind it.
///
/// # Panics
///
/// Panics if binding the listener or reading its local address fails.
pub async fn find_available_port() -> u16 {
    let bound = tokio::net::TcpListener::bind("127.0.0.1:0").await;
    bound
        .and_then(|listener| listener.local_addr())
        .unwrap()
        .port()
}
/// Asks the OS for a free UDP port on the loopback interface.
///
/// A socket is bound to `127.0.0.1:0` and the port it was assigned is
/// returned. The socket is dropped when this function returns, so the port is
/// released again before the caller gets a chance to bind it.
///
/// # Panics
///
/// Panics if binding the socket or reading its local address fails.
pub fn find_available_udp_port() -> u16 {
    std::net::UdpSocket::bind("127.0.0.1:0")
        .and_then(|socket| socket.local_addr())
        .unwrap()
        .port()
}
/// Polls an asynchronous condition until it holds or `max_wait` has elapsed.
///
/// `check` is awaited immediately, then again after every pause of at most
/// `interval`. The condition is always evaluated at least once (even when
/// `max_wait` is zero) and one last time when the deadline is reached, so a
/// condition that became true during the final pause is still observed.
///
/// Returns `true` as soon as `check` yields `true`, and `false` if the
/// deadline passes without that happening.
pub async fn wait_for<F, Fut>(check: F, interval: Duration, max_wait: Duration) -> bool
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let start = Instant::now();
    loop {
        if check().await {
            return true;
        }
        let elapsed = start.elapsed();
        if elapsed >= max_wait {
            return false;
        }
        // Cap the pause at the remaining budget so the final check happens at
        // the deadline rather than up to one full `interval` after it.
        tokio::time::sleep(interval.min(max_wait - elapsed)).await;
    }
}
/// Polls `counter` every `DEFAULT_CHECK_INTERVAL` until it is at least
/// `target` or `max_wait` has elapsed.
///
/// Returns `true` if the target was reached, `false` on timeout.
pub async fn wait_for_count(counter: &AtomicU32, target: u32, max_wait: Duration) -> bool {
    let target_reached = move || async move { counter.load(Ordering::SeqCst) >= target };
    wait_for(target_reached, DEFAULT_CHECK_INTERVAL, max_wait).await
}
/// Polls `flag` every `DEFAULT_CHECK_INTERVAL` until it reads `true` or
/// `max_wait` has elapsed.
///
/// Returns `true` if the flag was observed set, `false` on timeout.
pub async fn wait_for_flag(flag: &AtomicBool, max_wait: Duration) -> bool {
    let flag_is_set = move || async move { flag.load(Ordering::SeqCst) };
    wait_for(flag_is_set, DEFAULT_CHECK_INTERVAL, max_wait).await
}
/// Waits for `notify` to be signalled, giving up after `max_wait`.
///
/// Returns `true` if the notification arrived before the timeout expired and
/// `false` otherwise.
pub async fn wait_with_notify(notify: &Notify, max_wait: Duration) -> bool {
    let outcome = timeout(max_wait, notify.notified()).await;
    outcome.is_ok()
}
/// Handle to a router task spawned for a test on a loopback port.
///
/// Dropping the handle aborts the spawned task (see the `Drop` impl).
pub struct TestRouter {
/// Port the router was asked to serve WebSocket connections on.
port: u16,
/// Join handle of the spawned serving task; `None` once `stop` has taken it.
handle: Option<tokio::task::JoinHandle<()>>,
/// Shared flag whose current value is reported by `is_ready`.
ready: Arc<AtomicBool>,
}
impl TestRouter {
    /// Starts a router with the default test configuration.
    ///
    /// The configuration uses `SecurityMode::Open`, enables the `param`,
    /// `event` and `stream` features, turns rate limiting off and uses
    /// `RouterStateConfig::unlimited()`. See [`TestRouter::start_with_config`]
    /// for the startup procedure.
    pub async fn start() -> Self {
        Self::start_with_config(RouterConfig {
            name: "Test Router".to_string(),
            max_sessions: 100,
            session_timeout: 60,
            features: vec![
                "param".to_string(),
                "event".to_string(),
                "stream".to_string(),
            ],
            security_mode: SecurityMode::Open,
            max_subscriptions_per_session: 1000,
            gesture_coalescing: true,
            gesture_coalesce_interval_ms: 16,
            max_messages_per_second: 0,
            rate_limiting_enabled: false,
            state_config: clasp_router::RouterStateConfig::unlimited(),
        })
        .await
    }

    /// Starts a router with `config` on a free loopback port.
    ///
    /// The router is served from a spawned task. This function then probes
    /// the port with plain TCP connections every 10 ms for up to 5 seconds.
    /// The outcome of that probe is what [`TestRouter::is_ready`] reports: the
    /// flag is set only once a connection was actually accepted, so a router
    /// that failed to bind is returned with `is_ready() == false` instead of
    /// claiming readiness.
    pub async fn start_with_config(config: RouterConfig) -> Self {
        let port = find_available_port().await;
        let addr = format!("127.0.0.1:{}", port);
        let ready = Arc::new(AtomicBool::new(false));
        let router = Router::new(config);

        let handle = tokio::spawn(async move {
            // Any serve error is deliberately ignored here; startup failure
            // surfaces through the connectivity probe below.
            let _ = router.serve_websocket(&addr).await;
        });

        let listening = wait_for(
            || async move {
                tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
                    .await
                    .is_ok()
            },
            Duration::from_millis(10),
            Duration::from_secs(5),
        )
        .await;
        ready.store(listening, Ordering::SeqCst);

        Self {
            port,
            handle: Some(handle),
            ready,
        }
    }

    /// Returns the `ws://` URL of the router on the loopback interface.
    pub fn url(&self) -> String {
        format!("ws://127.0.0.1:{}", self.port)
    }

    /// Returns the port the router was started on.
    pub fn port(&self) -> u16 {
        self.port
    }

    /// Returns whether the router accepted a TCP connection during startup.
    pub fn is_ready(&self) -> bool {
        self.ready.load(Ordering::SeqCst)
    }

    /// Connects a client to this router using `Clasp::connect_to`.
    pub async fn connect_client(&self) -> Result<Clasp, clasp_client::ClientError> {
        Clasp::connect_to(&self.url()).await
    }

    /// Connects a client to this router through the `Clasp` builder, passing
    /// `name` to the builder's `name` setter.
    pub async fn connect_client_named(
        &self,
        name: &str,
    ) -> Result<Clasp, clasp_client::ClientError> {
        Clasp::builder(&self.url()).name(name).connect().await
    }

    /// Aborts the serving task if it has not been stopped already.
    ///
    /// Calling this more than once is harmless: the join handle is taken on
    /// the first call, so later calls find nothing to abort.
    pub fn stop(&mut self) {
        if let Some(handle) = self.handle.take() {
            handle.abort();
        }
    }
}
impl Drop for TestRouter {
fn drop(&mut self) {
self.stop();
}
}
/// Checks that `actual` is within `epsilon` of `expected`.
///
/// Values that compare exactly equal always pass, including when `epsilon` is
/// zero or when both values are the same infinity (where the difference would
/// otherwise be NaN). Apart from that the comparison is strict:
/// `|actual - expected| < epsilon`. A NaN on either side never passes.
///
/// # Errors
///
/// Returns `Err` with a message of the form
/// `"<msg>: expected <expected> +/- <epsilon>, got <actual>"` when the values
/// are not close enough.
pub fn assert_approx_eq(actual: f64, expected: f64, epsilon: f64, msg: &str) -> Result<(), String> {
    // The equality short-circuit covers epsilon == 0 and equal infinities,
    // both of which the difference-based test alone would reject.
    if actual == expected || (actual - expected).abs() < epsilon {
        Ok(())
    } else {
        Err(format!(
            "{}: expected {} +/- {}, got {}",
            msg, expected, epsilon, actual
        ))
    }
}
/// Turns a boolean condition into a `Result`.
///
/// Returns `Ok(())` when `condition` holds, otherwise `Err` carrying `msg`.
pub fn assert_that(condition: bool, msg: &str) -> Result<(), String> {
    if !condition {
        return Err(msg.to_string());
    }
    Ok(())
}
/// Unwraps an `Option` into a `Result`.
///
/// Returns the contained value for `Some`, otherwise `Err` carrying `msg`.
pub fn assert_some<T>(opt: Option<T>, msg: &str) -> Result<T, String> {
    match opt {
        Some(value) => Ok(value),
        None => Err(msg.to_string()),
    }
}
/// Passes an `Ok` value through and stringifies an error.
///
/// On `Err(e)` the returned message is `"<msg>: <e formatted with {:?}>"`.
pub fn assert_ok<T, E: std::fmt::Debug>(result: Result<T, E>, msg: &str) -> Result<T, String> {
    match result {
        Ok(value) => Ok(value),
        Err(e) => Err(format!("{}: {:?}", msg, e)),
    }
}
/// Checks that `result` is an error.
///
/// Returns `Ok(())` for any `Err`. For `Ok(v)` it returns an error message of
/// the form `"<msg>: expected error, got Ok(<v formatted with {:?}>)"`.
pub fn assert_err<T: std::fmt::Debug, E>(result: Result<T, E>, msg: &str) -> Result<(), String> {
    if let Ok(v) = result {
        return Err(format!("{}: expected error, got Ok({:?})", msg, v));
    }
    Ok(())
}
/// Collects `(address, value)` pairs delivered to its callbacks.
///
/// Every field is behind an `Arc`, so clones share the same storage, counter
/// and notifier.
#[derive(Clone)]
pub struct ValueCollector {
/// Received `(address, value)` pairs in arrival order.
values: Arc<parking_lot::Mutex<Vec<(String, Value)>>>,
/// Signalled via `notify_waiters` each time a value is recorded.
notify: Arc<Notify>,
/// Number of values recorded since creation or the last `clear`.
count: Arc<AtomicU32>,
}
impl ValueCollector {
    /// Creates an empty collector with a zero count.
    pub fn new() -> Self {
        Self {
            values: Arc::new(parking_lot::Mutex::new(Vec::new())),
            notify: Arc::new(Notify::new()),
            count: Arc::new(AtomicU32::new(0)),
        }
    }

    /// Stores one received value and signals any waiters.
    ///
    /// The counter is bumped while the values lock is still held so that
    /// `clear` (which also takes the lock) can never run between the push and
    /// the increment and leave `count` out of step with `values`.
    fn record(&self, address: String, value: Value) {
        {
            let mut guard = self.values.lock();
            guard.push((address, value));
            self.count.fetch_add(1, Ordering::SeqCst);
        }
        self.notify.notify_waiters();
    }

    /// Returns a callback taking `(value, address)` with an owned address.
    ///
    /// Each invocation appends the pair to the collector, increments the
    /// count and notifies waiters.
    pub fn callback(&self) -> impl Fn(Value, String) + Send + 'static {
        let collector = self.clone();
        move |value, address| collector.record(address, value)
    }

    /// Returns a callback taking `(value, address)` with a borrowed address.
    ///
    /// Behaves like [`ValueCollector::callback`], copying the address into an
    /// owned `String` before storing it.
    pub fn callback_ref(&self) -> impl Fn(Value, &str) + Send + Sync + 'static {
        let collector = self.clone();
        move |value, address| collector.record(address.to_string(), value)
    }

    /// Returns the number of values recorded since creation or the last
    /// `clear`.
    pub fn count(&self) -> u32 {
        self.count.load(Ordering::SeqCst)
    }

    /// Waits until at least `n` values have been recorded or `max_wait` has
    /// elapsed; returns `true` if the count was reached.
    pub async fn wait_for_count(&self, n: u32, max_wait: Duration) -> bool {
        wait_for_count(&self.count, n, max_wait).await
    }

    /// Returns a copy of all recorded `(address, value)` pairs in arrival
    /// order.
    pub fn values(&self) -> Vec<(String, Value)> {
        self.values.lock().clone()
    }

    /// Returns whether any recorded value was delivered for `addr`.
    pub fn has_address(&self, addr: &str) -> bool {
        self.values.lock().iter().any(|(a, _)| a == addr)
    }

    /// Returns copies of the values recorded for `addr`, in arrival order.
    pub fn values_for(&self, addr: &str) -> Vec<Value> {
        self.values
            .lock()
            .iter()
            .filter(|(a, _)| a == addr)
            .map(|(_, v)| v.clone())
            .collect()
    }

    /// Returns a copy of the most recently recorded pair, if any.
    pub fn last_value(&self) -> Option<(String, Value)> {
        self.values.lock().last().cloned()
    }

    /// Discards all recorded values and resets the count to zero.
    ///
    /// Both steps happen under the values lock, so a callback running
    /// concurrently is either fully discarded or fully counted afterwards.
    pub fn clear(&self) {
        let mut guard = self.values.lock();
        guard.clear();
        self.count.store(0, Ordering::SeqCst);
    }
}
impl Default for ValueCollector {
fn default() -> Self {
Self::new()
}
}
/// Helpers for tests that exercise compiled WASM lenses; only built with the
/// `lens` feature.
#[cfg(feature = "lens")]
pub mod lens_helpers {
    use clasp_lens::LensHost;
    use serde_json::Value;

    /// Loads the compiled WASM module for the lens called `name`.
    ///
    /// The file is looked up at
    /// `<root>/lenses/<name>/target/wasm32-unknown-unknown/release/lens_<name>.wasm`,
    /// where `<root>` is two directories above `CARGO_MANIFEST_DIR` and every
    /// `-` in `name` is replaced by `_` in the file name.
    ///
    /// # Panics
    ///
    /// Panics if the manifest directory has no grandparent, if the file cannot
    /// be read, or if `LensHost::new` rejects its contents.
    pub fn load_test_lens(name: &str) -> LensHost {
        let crate_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
        let workspace_root = crate_dir
            .parent()
            .and_then(std::path::Path::parent)
            .unwrap();

        let wasm_name = format!("lens_{}", name.replace('-', "_"));
        let mut wasm_path = workspace_root.join("lenses");
        wasm_path.push(name);
        wasm_path.push("target/wasm32-unknown-unknown/release");
        wasm_path.push(format!("{}.wasm", wasm_name));

        let bytes = match std::fs::read(&wasm_path) {
            Ok(contents) => contents,
            Err(e) => panic!(
                "Failed to load test lens '{}' at {}: {}",
                name,
                wasm_path.display(),
                e
            ),
        };

        match LensHost::new(&bytes) {
            Ok(host) => host,
            Err(e) => panic!("Failed to create LensHost for '{}': {}", name, e),
        }
    }

    /// Runs `input` through `host` and asserts the output equals `expected`.
    ///
    /// # Panics
    ///
    /// Panics if the transform returns an error or if its output differs from
    /// `expected`; the failure message includes input, expected and actual
    /// values.
    pub fn assert_lens_transform(host: &LensHost, input: &Value, expected: &Value) {
        let actual = match host.transform(input) {
            Ok(output) => output,
            Err(e) => panic!("Transform failed on input {}: {}", input, e),
        };
        assert_eq!(
            &actual, expected,
            "Lens transform mismatch.\n Input: {}\n Expected: {}\n Actual: {}",
            input, expected, actual
        );
    }
}