#![allow(clippy::unbuffered_bytes)]
use std::{
io::{self, Read, Write},
path::{Path, PathBuf},
process::{Child, Command, Stdio},
sync::{Arc, Mutex},
time::Duration,
};
use clap::ValueEnum;
use dashmap::{DashMap, DashSet};
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, NodeDiagnosticsConfig, NodeQuery, WebApi},
prelude::*,
};
use serde::{Deserialize, Serialize};
use std::sync::LazyLock;
use tracing::{error, info};
use crate::util::workspace::get_workspace_target_dir;
/// Records the given peer id into the current tracing span's `test_node` field.
pub fn set_peer_id(peer_id: impl Into<String>) {
    let id: String = peer_id.into();
    tracing::Span::current().record("test_node", id);
}
/// Output format used by `TestLogger`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogFormat {
    /// Human-readable, multi-line output.
    Pretty,
    /// Structured JSON output (one event per line).
    Json,
}
/// Builder-style tracing setup for tests, optionally capturing emitted log
/// lines in memory for later inspection.
pub struct TestLogger {
    // Output format (pretty or JSON).
    format: LogFormat,
    // EnvFilter directive string, e.g. "info" or "debug".
    level: String,
    // When true, emitted lines are recorded into `captured_logs`.
    capture: bool,
    // Shared buffer holding captured log lines.
    captured_logs: Arc<Mutex<Vec<String>>>,
    // Keeps the thread-default subscriber installed for the logger's lifetime.
    _guard: Option<tracing::subscriber::DefaultGuard>,
}
impl TestLogger {
pub fn new() -> Self {
Self {
format: LogFormat::Pretty,
level: "info".to_string(),
capture: false,
captured_logs: Arc::new(Mutex::new(Vec::new())),
_guard: None,
}
}
pub fn with_json(mut self) -> Self {
self.format = LogFormat::Json;
self
}
pub fn with_pretty(mut self) -> Self {
self.format = LogFormat::Pretty;
self
}
pub fn with_level(mut self, level: impl Into<String>) -> Self {
self.level = level.into();
self
}
pub fn capture_logs(mut self) -> Self {
self.capture = true;
self
}
pub fn init(mut self) -> Self {
use tracing_subscriber::{
EnvFilter, Layer, fmt, layer::SubscriberExt, util::SubscriberInitExt,
};
let env_filter = EnvFilter::new(&self.level);
let layer: Box<dyn Layer<_> + Send + Sync> = match self.format {
LogFormat::Pretty => {
if self.capture {
let writer = CapturingWriter::new(self.captured_logs.clone());
fmt::layer()
.with_writer(move || writer.clone())
.pretty()
.boxed()
} else {
fmt::layer().with_test_writer().pretty().boxed()
}
}
LogFormat::Json => {
if self.capture {
let writer = CapturingWriter::new(self.captured_logs.clone());
fmt::layer()
.with_writer(move || writer.clone())
.json()
.with_span_list(true)
.flatten_event(true)
.boxed()
} else {
fmt::layer()
.with_test_writer()
.json()
.with_span_list(true)
.flatten_event(true)
.boxed()
}
}
};
let subscriber = tracing_subscriber::registry().with(env_filter).with(layer);
self._guard = Some(subscriber.set_default());
self
}
pub fn contains(&self, message: &str) -> bool {
if !self.capture {
panic!("Cannot inspect logs without calling .capture_logs()");
}
self.captured_logs
.lock()
.unwrap()
.iter()
.any(|log| log.contains(message))
}
pub fn logs(&self) -> Vec<String> {
if !self.capture {
panic!("Cannot get logs without calling .capture_logs()");
}
self.captured_logs.lock().unwrap().clone()
}
pub fn logs_matching(&self, filter: impl Fn(&str) -> bool) -> Vec<String> {
self.logs().into_iter().filter(|log| filter(log)).collect()
}
pub fn log_count(&self) -> usize {
if !self.capture {
panic!("Cannot count logs without calling .capture_logs()");
}
self.captured_logs.lock().unwrap().len()
}
}
impl Default for TestLogger {
    /// Equivalent to `TestLogger::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// An `io::Write` sink that records each complete, non-empty log line into a
/// shared buffer while also echoing the raw bytes to stdout.
#[derive(Clone)]
struct CapturingWriter {
    buffer: Arc<Mutex<Vec<String>>>,
}
impl CapturingWriter {
    /// Wraps the shared line buffer.
    fn new(buffer: Arc<Mutex<Vec<String>>>) -> Self {
        CapturingWriter { buffer }
    }
}
impl Write for CapturingWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Only valid UTF-8 payloads are captured; other bytes just pass through.
        if let Ok(text) = std::str::from_utf8(buf) {
            let mut lines = self.buffer.lock().unwrap();
            lines.extend(
                text.lines()
                    .filter(|line| !line.is_empty())
                    .map(str::to_string),
            );
        }
        // Echo to stdout so logs stay visible during the test run.
        std::io::stdout().write_all(buf)?;
        Ok(buf.len())
    }
    fn flush(&mut self) -> std::io::Result<()> {
        std::io::stdout().flush()
    }
}
/// Sends a contract Put request without blocking-subscribe semantics.
pub async fn make_put(
    client: &mut WebApi,
    state: WrappedState,
    contract: ContractContainer,
    subscribe: bool,
) -> anyhow::Result<()> {
    // Non-blocking variant: blocking_subscribe is always disabled.
    let blocking_subscribe = false;
    make_put_with_blocking(client, state, contract, subscribe, blocking_subscribe).await
}
/// Sends a contract Put request, optionally subscribing (and optionally
/// blocking on the subscription) in the same request.
pub async fn make_put_with_blocking(
    client: &mut WebApi,
    state: WrappedState,
    contract: ContractContainer,
    subscribe: bool,
    blocking_subscribe: bool,
) -> anyhow::Result<()> {
    // `contract` and `state` are owned by this function, so they can be moved
    // into the request directly (the previous `.clone()` calls were redundant).
    client
        .send(ClientRequest::ContractOp(ContractRequest::Put {
            contract,
            state,
            related_contracts: RelatedContracts::default(),
            subscribe,
            blocking_subscribe,
        }))
        .await?;
    Ok(())
}
/// Sends a full-state Update request for the given contract key.
pub async fn make_update(
    client: &mut WebApi,
    key: ContractKey,
    state: WrappedState,
) -> anyhow::Result<()> {
    let request = ClientRequest::ContractOp(ContractRequest::Update {
        key,
        data: UpdateData::State(State::from(state)),
    });
    client.send(request).await?;
    Ok(())
}
/// Sends a Subscribe request for the contract, with no state summary.
pub async fn make_subscribe(client: &mut WebApi, key: ContractKey) -> anyhow::Result<()> {
    let request = ClientRequest::ContractOp(ContractRequest::Subscribe {
        key: *key.id(),
        summary: None,
    });
    client.send(request).await?;
    Ok(())
}
/// Sends a contract Get request without blocking-subscribe semantics.
pub async fn make_get(
    client: &mut WebApi,
    key: ContractKey,
    return_contract_code: bool,
    subscribe: bool,
) -> anyhow::Result<()> {
    // Non-blocking variant: blocking_subscribe is always disabled.
    let blocking_subscribe = false;
    make_get_with_blocking(client, key, return_contract_code, subscribe, blocking_subscribe).await
}
/// Sends a contract Get request, optionally subscribing (and optionally
/// blocking on the subscription) in the same request.
pub async fn make_get_with_blocking(
    client: &mut WebApi,
    key: ContractKey,
    return_contract_code: bool,
    subscribe: bool,
    blocking_subscribe: bool,
) -> anyhow::Result<()> {
    let request = ClientRequest::ContractOp(ContractRequest::Get {
        key: *key.id(),
        return_contract_code,
        subscribe,
        blocking_subscribe,
    });
    client.send(request).await?;
    Ok(())
}
/// Sends a node diagnostics query; the response is read elsewhere by the caller.
pub async fn make_node_diagnostics(
    client: &mut WebApi,
    config: NodeDiagnosticsConfig,
) -> anyhow::Result<()> {
    let query = ClientRequest::NodeQueries(NodeQuery::NodeDiagnostics { config });
    client.send(query).await?;
    Ok(())
}
// Process-wide cache of compiled contract wasm bytes, keyed by contract crate
// name, so each contract is compiled at most once per test run.
static COMPILED_CONTRACT_CACHE: LazyLock<dashmap::DashMap<String, Vec<u8>>> =
    LazyLock::new(dashmap::DashMap::new);
/// Compiles the named contract if it is not already in the cache.
pub fn ensure_contract_compiled(name: &str) -> anyhow::Result<()> {
    if !COMPILED_CONTRACT_CACHE.contains_key(name) {
        // Compile outside of any cache lock; a concurrent duplicate build is
        // harmless because `or_insert` keeps whichever entry lands first.
        let wasm = compile_contract(name)?;
        COMPILED_CONTRACT_CACHE
            .entry(name.to_string())
            .or_insert(wasm);
    }
    Ok(())
}
/// Returns a V1 wasm contract container for the named test contract,
/// compiling (and caching) the wasm on first use.
pub fn load_contract(name: &str, params: Parameters<'static>) -> anyhow::Result<ContractContainer> {
    let wasm_bytes = if let Some(entry) = COMPILED_CONTRACT_CACHE.get(name) {
        entry.value().clone()
    } else {
        let bytes = compile_contract(name)?;
        COMPILED_CONTRACT_CACHE
            .entry(name.to_string())
            .or_insert(bytes.clone());
        bytes
    };
    let wrapped = WrappedContract::new(Arc::new(ContractCode::from(wasm_bytes)), params);
    Ok(ContractContainer::Wasm(ContractWasmAPIVersion::V1(wrapped)))
}
/// Returns a V1 wasm delegate container for the named test delegate.
/// Unlike contracts, delegates are recompiled on every call (no cache).
pub fn load_delegate(name: &str, params: Parameters<'static>) -> anyhow::Result<DelegateContainer> {
    let code = DelegateCode::from(compile_delegate(name)?);
    let delegate = Delegate::from((&code, &params));
    Ok(DelegateContainer::Wasm(DelegateWasmAPIVersion::V1(delegate)))
}
/// Compiles the named test contract crate to wasm and returns the bytes.
///
/// Mirrors `compile_delegate`: fails early with a clear error if the source
/// directory or the compiled artifact is missing, and attaches the path to
/// read errors.
fn compile_contract(name: &str) -> anyhow::Result<Vec<u8>> {
    // Resolve the contract crate directory under the workspace `tests/` dir.
    let contract_path = {
        const CRATE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/");
        let contracts = PathBuf::from(CRATE_DIR);
        contracts.join(name)
    };
    info!("module path: {contract_path:?}");
    if !contract_path.exists() {
        return Err(anyhow::anyhow!(
            "Contract directory does not exist: {contract_path:?}"
        ));
    }
    let target = get_workspace_target_dir();
    info!(
        "trying to compile the test contract, target: {}",
        target.display()
    );
    compile_rust_wasm_lib(
        &BuildToolConfig {
            features: None,
            package_type: PackageType::Contract,
            debug: false,
        },
        &contract_path,
    )?;
    // Cargo writes the artifact with `-` replaced by `_` in the crate name.
    let output_file = target
        .join(WASM_TARGET)
        .join("release")
        .join(name.replace('-', "_"))
        .with_extension("wasm");
    info!("output file: {output_file:?}");
    if !output_file.exists() {
        return Err(anyhow::anyhow!(
            "Compiled WASM file not found at: {output_file:?}"
        ));
    }
    std::fs::read(&output_file)
        .map_err(|e| anyhow::anyhow!("Failed to read output file {output_file:?}: {e}"))
}
/// Compiles the named test delegate crate to wasm and returns the bytes.
pub fn compile_delegate(name: &str) -> anyhow::Result<Vec<u8>> {
    // Resolve the delegate crate directory under the workspace `tests/` dir.
    const CRATE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/");
    let delegate_path = PathBuf::from(CRATE_DIR).join(name);
    info!("delegate path: {delegate_path:?}");
    if !delegate_path.exists() {
        return Err(anyhow::anyhow!(
            "Delegate directory does not exist: {delegate_path:?}"
        ));
    }
    let target = get_workspace_target_dir();
    info!(
        "trying to compile the test delegate, target: {}",
        target.display()
    );
    let build_config = BuildToolConfig {
        features: None,
        package_type: PackageType::Delegate,
        debug: false,
    };
    compile_rust_wasm_lib(&build_config, &delegate_path)?;
    // Cargo writes the artifact with `-` replaced by `_` in the crate name.
    let output_file = target
        .join(WASM_TARGET)
        .join("release")
        .join(name.replace('-', "_"))
        .with_extension("wasm");
    info!("output file: {output_file:?}");
    if !output_file.exists() {
        return Err(anyhow::anyhow!(
            "Compiled WASM file not found at: {output_file:?}"
        ));
    }
    let wasm_data = std::fs::read(&output_file)
        .map_err(|e| anyhow::anyhow!("Failed to read output file {output_file:?}: {e}"))?;
    info!("WASM size: {} bytes", wasm_data.len());
    Ok(wasm_data)
}
const WASM_TARGET: &str = "wasm32-unknown-unknown";
/// Builds the extra cargo arguments implied by the build config: the feature
/// list (always including the package-type feature exactly once) and, for
/// non-debug builds, `--release`.
fn compile_options(cli_config: &BuildToolConfig) -> impl Iterator<Item = String> {
    let pkg_feature = cli_config.package_type.feature();
    // User-supplied duplicates of the package-type feature are filtered out
    // before it is appended, so it appears exactly once.
    let mut feature_names: Vec<&str> = cli_config
        .features
        .iter()
        .flat_map(|csv| csv.split(',').filter(|f| *f != pkg_feature))
        .collect();
    feature_names.push(pkg_feature);
    let mut args = vec!["--features".to_string(), feature_names.join(",")];
    if !cli_config.debug {
        args.push("--release".to_string());
    }
    args.into_iter()
}
/// Runs `cargo build --lib --target wasm32-unknown-unknown` in `work_dir`,
/// streaming cargo's output and failing on a non-zero exit status.
fn compile_rust_wasm_lib(cli_config: &BuildToolConfig, work_dir: &Path) -> anyhow::Result<()> {
    const RUST_TARGET_ARGS: &[&str] = &["build", "--lib", "--target"];
    use std::io::IsTerminal;
    let comp_opts = compile_options(cli_config).collect::<Vec<_>>();
    // Colored cargo output only when both stdout and stderr are terminals.
    let color_args: &[&str] =
        if std::io::stdout().is_terminal() && std::io::stderr().is_terminal() {
            &["--color", "always"]
        } else {
            &[]
        };
    let mut cmd_args: Vec<&str> = RUST_TARGET_ARGS.to_vec();
    cmd_args.push(WASM_TARGET);
    cmd_args.extend_from_slice(color_args);
    cmd_args.extend(comp_opts.iter().map(String::as_str));
    let package_type = cli_config.package_type;
    info!("Compiling {package_type} with rust");
    let mut command = Command::new("cargo");
    // Respect an externally-set CARGO_TARGET_DIR; otherwise share the
    // workspace target dir so build artifacts are reused across tests.
    if std::env::var("CARGO_TARGET_DIR").is_err() {
        command.env("CARGO_TARGET_DIR", get_workspace_target_dir());
    }
    let child = command
        .args(&cmd_args)
        .current_dir(work_dir)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(|e| {
            error!("Error while executing cargo command: {e}");
            anyhow::anyhow!("Error while executing cargo command: {e}")
        })?;
    pipe_std_streams(child)?;
    Ok(())
}
/// Forwards the child's stdout/stderr to ours on background threads while
/// polling for exit; errors on a non-zero exit status.
///
/// Fixes over the previous version: output is copied in 8 KiB chunks instead
/// of one syscall per byte, and the forwarder threads are joined on the
/// success path so no trailing output is lost.
pub(crate) fn pipe_std_streams(mut child: Child) -> anyhow::Result<()> {
    let mut c_stdout = child.stdout.take().expect("Failed to open command stdout");
    let mut c_stderr = child.stderr.take().expect("Failed to open command stderr");
    let write_child_stderr = move || -> anyhow::Result<()> {
        let mut stderr = io::stderr();
        let mut buf = [0u8; 8192];
        loop {
            let n = c_stderr.read(&mut buf)?;
            if n == 0 {
                break;
            }
            stderr.write_all(&buf[..n])?;
        }
        Ok(())
    };
    let write_child_stdout = move || -> anyhow::Result<()> {
        let mut stdout = io::stdout();
        let mut buf = [0u8; 8192];
        loop {
            let n = c_stdout.read(&mut buf)?;
            if n == 0 {
                break;
            }
            stdout.write_all(&buf[..n])?;
        }
        Ok(())
    };
    let stdout_handle = std::thread::spawn(write_child_stdout);
    let stderr_handle = std::thread::spawn(write_child_stderr);
    loop {
        match child.try_wait() {
            Ok(Some(status)) => {
                if !status.success() {
                    anyhow::bail!("exit with status: {status}");
                }
                break;
            }
            Ok(None) => {
                // Still running; poll again shortly.
                std::thread::sleep(Duration::from_millis(500));
            }
            Err(err) => {
                return Err(err.into());
            }
        }
    }
    // Drain the forwarders so all child output is flushed before returning.
    let _ = stdout_handle.join();
    let _ = stderr_handle.join();
    Ok(())
}
// Build configuration for compiling test contracts/delegates to wasm.
// Plain `//` comments (not `///`) are used on purpose: doc comments on clap
// derive fields become --help text and would change CLI output.
#[derive(clap::Parser, Clone, Debug)]
pub struct BuildToolConfig {
    // Comma-separated extra cargo features to enable.
    #[arg(long)]
    pub(crate) features: Option<String>,
    // Whether to build a contract or a delegate.
    #[arg(long, value_enum, default_value_t=PackageType::default())]
    pub(crate) package_type: PackageType,
    // When set, build without --release.
    #[arg(long)]
    pub(crate) debug: bool,
}
// Kind of wasm package being built. Plain `//` comments so clap's derived
// ValueEnum help text is unchanged.
#[derive(Default, Debug, Clone, Copy, ValueEnum)]
pub(crate) enum PackageType {
    // Freenet contract (the default).
    #[default]
    Contract,
    // Freenet delegate.
    Delegate,
}
impl PackageType {
    /// Cargo feature flag that selects this package type when compiling.
    pub fn feature(&self) -> &'static str {
        match self {
            PackageType::Contract => "freenet-main-contract",
            PackageType::Delegate => "freenet-main-delegate",
        }
    }
}
impl std::fmt::Display for PackageType {
    /// Lower-case human-readable name used in log messages.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            PackageType::Contract => "contract",
            PackageType::Delegate => "delegate",
        };
        f.write_str(name)
    }
}
/// Checks whether the contract's code file exists under `dir/contracts/`.
pub async fn verify_contract_exists(dir: &Path, key: ContractKey) -> anyhow::Result<bool> {
    // Contracts are stored by their encoded code hash.
    let contract_path = dir.join("contracts").join(key.encoded_code_hash());
    let exists = tokio::fs::metadata(contract_path).await.is_ok();
    Ok(exists)
}
/// JSON-serializable todo-list state used by the test contracts.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TodoList {
    /// All tasks currently in the list.
    pub tasks: Vec<Task>,
    /// State version counter.
    pub version: u64,
}
/// A single todo entry inside a [`TodoList`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Task {
    /// Unique task id within the list.
    pub id: u64,
    /// Short task title.
    pub title: String,
    /// Free-form description text.
    pub description: String,
    /// Whether the task has been completed.
    pub completed: bool,
    /// Priority value (generators in this file use 1..=5).
    pub priority: u8,
}
/// Delta operations applied to a [`TodoList`].
#[derive(Serialize, Deserialize, Debug)]
pub enum TodoOperation {
    /// Insert a new task.
    Add(Task),
    /// Replace an existing task.
    Update(Task),
    /// Remove the task with the given id.
    Remove(u64),
    /// Mark the task with the given id as completed.
    Complete(u64),
}
/// Serialized empty todo list at version 0 (a freshly initialized state).
pub fn create_empty_todo_list() -> Vec<u8> {
    let empty = TodoList {
        tasks: Vec::new(),
        version: 0,
    };
    serde_json::to_vec(&empty).unwrap_or_default()
}
/// Serialized todo list (version 1) containing a single incomplete,
/// medium-priority task with the given title.
pub fn create_todo_list_with_item(title: &str) -> Vec<u8> {
    let only_task = Task {
        id: 1,
        title: title.to_owned(),
        description: String::new(),
        completed: false,
        priority: 3,
    };
    let list = TodoList {
        tasks: vec![only_task],
        version: 1,
    };
    serde_json::to_vec(&list).unwrap_or_default()
}
pub fn create_large_todo_list() -> Vec<u8> {
const TARGET_SIZE: usize = 1024 * 1024; const APPROX_TASK_SIZE: usize = 200;
let num_tasks = TARGET_SIZE / APPROX_TASK_SIZE;
let tasks: Vec<Task> = (0..num_tasks)
.map(|i| Task {
id: i as u64,
title: format!("Task {} - Large state boundary test", i),
description: format!(
"This is task number {} in a large state test. \
It contains enough text to make the serialized size predictable.",
i
),
completed: i % 2 == 0,
priority: ((i % 5) + 1) as u8,
})
.collect();
let todo_list = TodoList { tasks, version: 1 };
serde_json::to_vec(&todo_list).unwrap_or_default()
}
pub fn create_oversized_todo_list() -> Vec<u8> {
const TARGET_SIZE: usize = 10 * 1024 * 1024; const APPROX_TASK_SIZE: usize = 200;
let num_tasks = TARGET_SIZE / APPROX_TASK_SIZE;
let tasks: Vec<Task> = (0..num_tasks)
.map(|i| Task {
id: i as u64,
title: format!("Oversized task {}", i),
description: "X".repeat(150), completed: false,
priority: 1,
})
.collect();
let todo_list = TodoList { tasks, version: 1 };
serde_json::to_vec(&todo_list).unwrap_or_default()
}
/// Serialized no-op delta: removing the nonexistent id `u64::MAX`.
pub fn create_empty_delta_update() -> Vec<u8> {
    let noop = TodoOperation::Remove(u64::MAX);
    serde_json::to_vec(&noop).unwrap_or_default()
}
/// Smallest valid serialized state: no tasks, version 1.
pub fn create_minimal_state() -> Vec<u8> {
    let minimal = TodoList {
        tasks: Vec::new(),
        version: 1,
    };
    serde_json::to_vec(&minimal).unwrap_or_default()
}
pub fn create_max_tasks_todo_list(max_tasks: usize) -> Vec<u8> {
let tasks: Vec<Task> = (0..max_tasks)
.map(|i| Task {
id: i as u64,
title: format!("Task {}", i),
description: String::new(),
completed: false,
priority: 3,
})
.collect();
let todo_list = TodoList { tasks, version: 1 };
serde_json::to_vec(&todo_list).unwrap_or_default()
}
// Unit tests for the helpers in this module: contract compilation, the
// TestLogger builder/capture behavior, and the TodoList state generators.
#[cfg(test)]
mod test {
    use super::*;
    // Requires a cargo toolchain and the workspace `tests/` contract sources.
    #[test]
    fn test_compile_contract() -> testresult::TestResult {
        let contract = compile_contract("test-contract-integration")?;
        assert!(!contract.is_empty());
        Ok(())
    }
    #[test]
    fn test_logger_basic() {
        let _logger = TestLogger::new().with_pretty().with_level("info").init();
        tracing::info!("Test log message");
        tracing::warn!("Test warning");
    }
    #[test]
    fn test_logger_json() {
        let _logger = TestLogger::new().with_json().with_level("debug").init();
        tracing::info!("JSON formatted message");
        tracing::debug!("Debug message");
    }
    #[test]
    fn test_logger_with_peer_id() {
        let _logger = TestLogger::new().with_level("info").init();
        let _span = tracing::info_span!("test_peer", test_node = "test-peer").entered();
        tracing::info!("Message with peer ID");
    }
    // Captured lines must be inspectable through contains()/log_count().
    #[test]
    fn test_logger_capture() {
        let logger = TestLogger::new().capture_logs().with_level("info").init();
        tracing::info!("Captured message 1");
        tracing::warn!("Captured message 2");
        tracing::error!("Captured message 3");
        assert!(logger.contains("Captured message 1"));
        assert!(logger.contains("Captured message 2"));
        assert!(logger.contains("Captured message 3"));
        assert!(
            logger.log_count() >= 3,
            "Expected at least 3 log entries, got {}",
            logger.log_count()
        );
    }
    #[test]
    fn test_logger_capture_with_json() {
        let logger = TestLogger::new()
            .with_json()
            .capture_logs()
            .with_level("info")
            .init();
        tracing::info!("JSON captured message");
        assert!(logger.contains("JSON captured message"));
    }
    #[tokio::test]
    async fn test_logger_async() {
        let _logger = TestLogger::new().with_json().with_level("debug").init();
        let _span = tracing::info_span!("test_peer", test_node = "async-peer").entered();
        tracing::info!("Async test message");
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        tracing::debug!("After sleep");
    }
    // JSON output should carry span information (with_span_list is enabled).
    #[test]
    fn test_logger_json_with_span_fields() {
        let logger = TestLogger::new()
            .with_json()
            .capture_logs()
            .with_level("info")
            .init();
        let _span = tracing::info_span!("test_peer", test_node = "test-gateway").entered();
        tracing::info!("Message from gateway");
        let logs = logger.logs();
        assert!(!logs.is_empty(), "Should have captured logs");
        assert!(logs.iter().any(|log| log.contains("Message from gateway")));
        let json_str = logs.join("\n");
        assert!(
            json_str.contains("test_peer") || json_str.contains("gateway"),
            "Should contain span information"
        );
    }
    // The ~1 MiB generator should land near its size target and stay parseable.
    #[test]
    fn test_create_large_todo_list() {
        let large_state = create_large_todo_list();
        assert!(
            large_state.len() > 900_000,
            "Large state should be close to 1MB, got {} bytes",
            large_state.len()
        );
        assert!(
            large_state.len() < 1_200_000,
            "Large state shouldn't exceed 1.2MB, got {} bytes",
            large_state.len()
        );
        let parsed: Result<TodoList, _> = serde_json::from_slice(&large_state);
        assert!(parsed.is_ok(), "Large state should be valid JSON");
    }
    #[test]
    fn test_create_oversized_todo_list() {
        let oversized_state = create_oversized_todo_list();
        assert!(
            oversized_state.len() > 10_000_000,
            "Oversized state should exceed 10MB, got {} bytes",
            oversized_state.len()
        );
        let parsed: Result<TodoList, _> = serde_json::from_slice(&oversized_state);
        assert!(parsed.is_ok(), "Oversized state should still be valid JSON");
    }
    #[test]
    fn test_create_minimal_state() {
        let minimal = create_minimal_state();
        assert!(
            minimal.len() < 50,
            "Minimal state should be small (got {} bytes)",
            minimal.len()
        );
        let parsed: TodoList =
            serde_json::from_slice(&minimal).expect("Minimal state should be valid TodoList JSON");
        assert_eq!(parsed.tasks.len(), 0, "Should have no tasks");
        assert_eq!(parsed.version, 1, "Should have version 1");
    }
    #[test]
    fn test_create_empty_delta_update() {
        let delta = create_empty_delta_update();
        let parsed: Result<TodoOperation, _> = serde_json::from_slice(&delta);
        assert!(parsed.is_ok(), "Empty delta should be valid TodoOperation");
    }
    #[test]
    fn test_create_max_tasks_todo_list() {
        const MAX_TASKS: usize = 1000;
        let state = create_max_tasks_todo_list(MAX_TASKS);
        let parsed: TodoList = serde_json::from_slice(&state).unwrap();
        assert_eq!(
            parsed.tasks.len(),
            MAX_TASKS,
            "Should have exactly {} tasks",
            MAX_TASKS
        );
    }
    #[test]
    fn test_empty_todo_list_is_valid() {
        let empty = create_empty_todo_list();
        let parsed: TodoList = serde_json::from_slice(&empty).unwrap();
        assert_eq!(parsed.tasks.len(), 0, "Empty list should have 0 tasks");
        assert_eq!(parsed.version, 0, "Empty list should be version 0");
    }
    #[test]
    fn test_todo_list_with_item_is_valid() {
        let state = create_todo_list_with_item("Test task");
        let parsed: TodoList = serde_json::from_slice(&state).unwrap();
        assert_eq!(parsed.tasks.len(), 1, "Should have 1 task");
        assert_eq!(parsed.tasks[0].title, "Test task");
        assert_eq!(parsed.version, 1);
    }
}
// Ports currently reserved by tests, preventing two tests from picking the same one.
static RESERVED_PORTS: LazyLock<DashSet<u16>> = LazyLock::new(DashSet::new);
// Keeps the reserving UDP socket and TCP listener alive so the OS cannot hand
// the port to anything else until the test takes or releases it.
static RESERVED_SOCKETS: LazyLock<DashMap<u16, (std::net::UdpSocket, std::net::TcpListener)>> =
    LazyLock::new(DashMap::new);
// Each test thread gets a disjoint block of node indexes so derived IPs never
// collide across concurrently running tests.
const NODE_INDEX_BLOCK: usize = 10_000;
thread_local! {
    // Next free node index for this thread, seeded from the thread's RNG index.
    static GLOBAL_NODE_INDEX: std::cell::Cell<usize> = {
        let idx = crate::config::GlobalRng::thread_index();
        std::cell::Cell::new((idx as usize) * NODE_INDEX_BLOCK)
    };
}
/// Rewinds this thread's node counter to the start of its reserved block.
pub fn reset_global_node_index() {
    let base = (crate::config::GlobalRng::thread_index() as usize) * NODE_INDEX_BLOCK;
    GLOBAL_NODE_INDEX.with(|cell| cell.set(base));
}
/// Hands out `node_count` consecutive node indexes for this thread, returning
/// the first index of the block.
pub fn allocate_test_node_block(node_count: usize) -> usize {
    GLOBAL_NODE_INDEX.with(|cell| {
        let first = cell.get();
        cell.set(first + node_count);
        first
    })
}
/// Maps a node index onto a `127.x.y.1` loopback address, keeping both middle
/// octets in 1..=254 (indexes wrap after 254 * 254 nodes).
pub fn test_ip_for_node(node_idx: usize) -> std::net::Ipv4Addr {
    let hi = (node_idx / 254) % 254 + 1;
    let lo = node_idx % 254 + 1;
    std::net::Ipv4Addr::new(127, hi as u8, lo as u8, 1)
}
/// Reserves a port that is free for both UDP and TCP on the given loopback
/// address, keeping both sockets open so the OS holds the port for us.
pub fn reserve_local_port_on_ip(ip: std::net::Ipv4Addr) -> anyhow::Result<u16> {
    const MAX_ATTEMPTS: usize = 128;
    for _ in 0..MAX_ATTEMPTS {
        // Let the OS pick a free ephemeral UDP port on the requested address.
        let udp_socket = std::net::UdpSocket::bind((ip, 0))
            .map_err(|e| anyhow::anyhow!("failed to bind ephemeral UDP port on {ip}: {e}"))?;
        let port = udp_socket
            .local_addr()
            .map_err(|e| anyhow::anyhow!("failed to read ephemeral port address: {e}"))?
            .port();
        // The same port must also be free for TCP; otherwise retry.
        let Ok(tcp_listener) = std::net::TcpListener::bind((ip, port)) else {
            continue;
        };
        // First caller to insert wins; losers retry with a fresh port.
        if RESERVED_PORTS.insert(port) {
            RESERVED_SOCKETS.insert(port, (udp_socket, tcp_listener));
            return Ok(port);
        }
    }
    Err(anyhow::anyhow!(
        "failed to reserve a unique local port on {ip} after {MAX_ATTEMPTS} attempts"
    ))
}
pub fn reserve_local_port() -> anyhow::Result<u16> {
const MAX_ATTEMPTS: usize = 128;
for _ in 0..MAX_ATTEMPTS {
let udp_socket = std::net::UdpSocket::bind(("127.0.0.1", 0))
.map_err(|e| anyhow::anyhow!("failed to bind ephemeral UDP port: {e}"))?;
let port = udp_socket
.local_addr()
.map_err(|e| anyhow::anyhow!("failed to read ephemeral port address: {e}"))?
.port();
let tcp_listener = match std::net::TcpListener::bind(("127.0.0.1", port)) {
Ok(l) => l,
Err(_) => continue, };
if RESERVED_PORTS.insert(port) {
RESERVED_SOCKETS.insert(port, (udp_socket, tcp_listener));
return Ok(port);
}
}
Err(anyhow::anyhow!(
"failed to reserve a unique local port after {MAX_ATTEMPTS} attempts"
))
}
/// Releases a previously reserved port, dropping the sockets that held it.
pub fn release_local_port(port: u16) {
    RESERVED_PORTS.remove(&port);
    RESERVED_SOCKETS.remove(&port);
}
/// Un-reserves the port and hands its TCP listener to the caller; the paired
/// UDP socket is dropped here, freeing the UDP side of the reservation.
pub fn take_reserved_tcp_listener(port: u16) -> Option<std::net::TcpListener> {
    RESERVED_PORTS.remove(&port);
    let (_port, (_udp, tcp)) = RESERVED_SOCKETS.remove(&port)?;
    Some(tcp)
}
use std::collections::HashMap;
/// Connection and filesystem details for a single running test node.
#[derive(Debug)]
pub struct NodeInfo {
    /// Label used to look the node up in a `TestContext`.
    pub label: String,
    /// The node's temp directory (its `_EVENT_LOG` lives here).
    pub temp_dir_path: PathBuf,
    /// WebSocket API port.
    pub ws_port: u16,
    /// UDP network port, when the node has one.
    pub network_port: Option<u16>,
    /// Whether this node acts as a gateway.
    pub is_gateway: bool,
    // Presumably the node's ring location in [0, 1) — TODO confirm with callers.
    pub location: f64,
    /// Loopback IP the node is bound to.
    pub ip: std::net::Ipv4Addr,
    /// Auth-token → origin-contract map shared with the client API server.
    pub origin_contracts: crate::server::client_api::OriginContractMap,
}
impl NodeInfo {
    /// WebSocket URL of the node's contract API (native encoding protocol).
    pub fn ws_url(&self) -> String {
        format!(
            "ws://{}:{}/v1/contract/command?encodingProtocol=native",
            self.ip, self.ws_port
        )
    }
    /// Registers `contract_id` as the origin contract for `token`, attributed
    /// to the first client id.
    pub fn insert_origin_contract(
        &self,
        token: crate::client_events::AuthToken,
        contract_id: freenet_stdlib::prelude::ContractInstanceId,
    ) {
        use crate::client_events::ClientId;
        use crate::server::client_api::OriginContract;
        self.origin_contracts
            .insert(token, OriginContract::new(contract_id, ClientId::FIRST));
    }
    /// Polls the node until it is ready, returning the elapsed startup time.
    ///
    /// Readiness has two phases: first the WebSocket API must answer a
    /// minimal diagnostics query; then (for non-gateway nodes only) the node
    /// must report at least one connected peer. Fails with a phase-specific
    /// error message once `timeout` elapses.
    pub async fn wait_until_ready(
        &self,
        timeout: std::time::Duration,
    ) -> anyhow::Result<std::time::Duration> {
        use freenet_stdlib::client_api::{
            ClientRequest, HostResponse, NodeDiagnosticsConfig, NodeQuery, QueryResponse, WebApi,
        };
        use std::time::Instant;
        use tokio::time::sleep;
        let start = Instant::now();
        let mut attempt = 0;
        // Tracks whether the WebSocket API has answered at least once.
        let mut ws_ready = false;
        let max_backoff = std::time::Duration::from_millis(500);
        loop {
            let elapsed = start.elapsed();
            if elapsed >= timeout {
                // Report which readiness phase we were stuck in.
                let phase = if ws_ready {
                    "WebSocket is up but peer has not joined the network"
                } else {
                    "WebSocket API is not responding"
                };
                return Err(anyhow::anyhow!(
                    "Node '{}' did not become ready within {:?} (ws_port: {}, {})",
                    self.label,
                    timeout,
                    self.ws_port,
                    phase,
                ));
            }
            // A fresh WS connection per attempt, bounded to 2s.
            match tokio::time::timeout(
                std::time::Duration::from_secs(2),
                tokio_tungstenite::connect_async(&self.ws_url()),
            )
            .await
            {
                Ok(Ok((stream, _))) => {
                    let mut client = WebApi::start(stream);
                    if !ws_ready {
                        // Phase 1: probe the API with a minimal diagnostics
                        // query (every option disabled).
                        if client
                            .send(ClientRequest::NodeQueries(NodeQuery::NodeDiagnostics {
                                config: NodeDiagnosticsConfig {
                                    include_node_info: false,
                                    include_network_info: false,
                                    include_subscriptions: false,
                                    contract_keys: vec![],
                                    include_system_metrics: false,
                                    include_detailed_peer_info: false,
                                    include_subscriber_peer_ids: false,
                                },
                            }))
                            .await
                            .is_ok()
                        {
                            if self.is_gateway {
                                // Gateways don't need to join anyone else.
                                tracing::debug!(
                                    "Gateway '{}' ready after {:?} ({} attempts)",
                                    self.label,
                                    elapsed,
                                    attempt + 1
                                );
                                return Ok(elapsed);
                            }
                            ws_ready = true;
                            tracing::debug!(
                                "Node '{}' WebSocket ready after {:?}, waiting for network join...",
                                self.label,
                                elapsed,
                            );
                        }
                    } else {
                        // Phase 2: wait until the node reports at least one
                        // connected peer.
                        if client
                            .send(ClientRequest::NodeQueries(NodeQuery::ConnectedPeers))
                            .await
                            .is_ok()
                        {
                            match tokio::time::timeout(
                                std::time::Duration::from_secs(2),
                                client.recv(),
                            )
                            .await
                            {
                                Ok(Ok(HostResponse::QueryResponse(
                                    QueryResponse::ConnectedPeers { peers },
                                ))) if !peers.is_empty() => {
                                    tracing::debug!(
                                        "Node '{}' joined network after {:?} ({} attempts, {} peers)",
                                        self.label,
                                        elapsed,
                                        attempt + 1,
                                        peers.len(),
                                    );
                                    return Ok(elapsed);
                                }
                                _ => {
                                    // No peers yet (or recv timed out); retry.
                                }
                            }
                        }
                    }
                }
                _ => {
                    // Connection failed or timed out; retry with backoff.
                }
            }
            attempt += 1;
            // Exponential backoff (100ms, 200ms, 400ms, ...) capped at 500ms.
            let backoff = std::time::Duration::from_millis(50 * (1 << attempt.min(3)));
            let backoff = backoff.min(max_backoff);
            sleep(backoff).await;
        }
    }
}
pub type TestResult = anyhow::Result<()>;
/// Handle to a set of running test nodes, addressable by label.
pub struct TestContext {
    // Node info keyed by label.
    nodes: HashMap<String, NodeInfo>,
    // Labels in startup order (HashMap iteration order is unstable).
    node_order: Vec<String>,
    // Optional per-node handles used to flush buffered event logs.
    flush_handles: HashMap<String, crate::tracing::EventFlushHandle>,
}
impl TestContext {
pub fn new(nodes: Vec<NodeInfo>) -> Self {
let node_order: Vec<String> = nodes.iter().map(|n| n.label.clone()).collect();
let nodes_map: HashMap<String, NodeInfo> =
nodes.into_iter().map(|n| (n.label.clone(), n)).collect();
Self {
nodes: nodes_map,
node_order,
flush_handles: HashMap::new(),
}
}
pub fn with_flush_handles(
nodes: Vec<NodeInfo>,
flush_handles: Vec<(String, crate::tracing::EventFlushHandle)>,
) -> Self {
let node_order: Vec<String> = nodes.iter().map(|n| n.label.clone()).collect();
let nodes_map: HashMap<String, NodeInfo> =
nodes.into_iter().map(|n| (n.label.clone(), n)).collect();
let flush_handles_map: HashMap<String, crate::tracing::EventFlushHandle> =
flush_handles.into_iter().collect();
Self {
nodes: nodes_map,
node_order,
flush_handles: flush_handles_map,
}
}
pub fn node(&self, label: &str) -> anyhow::Result<&NodeInfo> {
self.nodes
.get(label)
.ok_or_else(|| anyhow::anyhow!("Node '{}' not found", label))
}
pub fn gateway(&self) -> anyhow::Result<&NodeInfo> {
for label in &self.node_order {
if let Ok(node) = self.node(label) {
if node.is_gateway {
return Ok(node);
}
}
}
Err(anyhow::anyhow!("No gateway nodes found"))
}
pub fn gateways(&self) -> Vec<&NodeInfo> {
self.node_order
.iter()
.filter_map(|label| self.node(label).ok())
.filter(|node| node.is_gateway)
.collect()
}
pub fn peers(&self) -> Vec<&NodeInfo> {
self.node_order
.iter()
.filter_map(|label| self.node(label).ok())
.filter(|node| !node.is_gateway)
.collect()
}
pub fn event_log_path(&self, node_label: &str) -> anyhow::Result<PathBuf> {
let node = self.node(node_label)?;
Ok(node.temp_dir_path.join("_EVENT_LOG"))
}
    /// Node labels in startup order.
    pub fn node_labels(&self) -> &[String] {
        &self.node_order
    }
pub async fn aggregate_events(
&self,
) -> anyhow::Result<crate::tracing::EventLogAggregator<crate::tracing::AOFEventSource>> {
for (label, handle) in &self.flush_handles {
tracing::debug!("Flushing events for node: {}", label);
handle.flush().await;
}
let mut builder = TestAggregatorBuilder::new();
for label in &self.node_order {
let path = self.event_log_path(label)?;
builder = builder.add_node(label, path);
}
builder.build().await
}
    /// Builds a human-readable failure report containing the error, an event
    /// summary aggregated from every node's event log, and — when events
    /// exist — pointers to detailed on-disk reports. Never fails; aggregation
    /// errors are embedded in the returned text instead.
    pub async fn generate_failure_report(&self, error: &anyhow::Error) -> String {
        use std::fmt::Write;
        let mut report = String::new();
        writeln!(&mut report, "\n{}", "=".repeat(80)).unwrap();
        writeln!(&mut report, "TEST FAILURE REPORT").unwrap();
        writeln!(&mut report, "{}", "=".repeat(80)).unwrap();
        writeln!(&mut report, "\nError: {:#}", error).unwrap();
        match self.aggregate_events().await {
            Ok(aggregator) => {
                writeln!(&mut report, "\n{}", "-".repeat(80)).unwrap();
                writeln!(&mut report, "EVENT LOG SUMMARY").unwrap();
                writeln!(&mut report, "{}", "-".repeat(80)).unwrap();
                match aggregator.get_all_events().await {
                    Ok(events) => {
                        writeln!(&mut report, "\nTotal events: {}", events.len()).unwrap();
                        // Per-peer event counts.
                        let mut by_peer: HashMap<String, Vec<_>> = HashMap::new();
                        for event in &events {
                            let peer_str = event.peer_id.to_string();
                            by_peer.entry(peer_str).or_default().push(event);
                        }
                        writeln!(&mut report, "\nEvents by peer:").unwrap();
                        for (peer_id, peer_events) in by_peer.iter() {
                            writeln!(
                                &mut report,
                                " {}: {} events",
                                &peer_id[..8.min(peer_id.len())], peer_events.len()
                            )
                            .unwrap();
                        }
                        // Tail of the timeline: the last 10 events, oldest first.
                        writeln!(&mut report, "\nLast 10 events:").unwrap();
                        let last_events = events.iter().rev().take(10).collect::<Vec<_>>();
                        for (i, event) in last_events.iter().rev().enumerate() {
                            let peer_str = event.peer_id.to_string();
                            writeln!(
                                &mut report,
                                " {}. [{}] {} - {:?}",
                                i + 1,
                                &peer_str[..8.min(peer_str.len())],
                                event.datetime.format("%H:%M:%S%.3f"),
                                event.kind
                            )
                            .unwrap();
                        }
                        if !events.is_empty() {
                            // Write full reports to disk and link them here.
                            match self
                                .generate_detailed_reports("test_failure", &aggregator)
                                .await
                            {
                                Ok(report_dir) => {
                                    writeln!(&mut report, "\n📁 Detailed Reports Generated:")
                                        .unwrap();
                                    writeln!(
                                        &mut report,
                                        " 📄 Full event log: file://{}/events.md",
                                        report_dir.display()
                                    )
                                    .unwrap();
                                    writeln!(
                                        &mut report,
                                        " 📊 Event flow diagram: file://{}/event-flow.mmd",
                                        report_dir.display()
                                    )
                                    .unwrap();
                                    writeln!(
                                        &mut report,
                                        "\n💡 Tip: View diagram at https://mermaid.live or in VS Code"
                                    )
                                    .unwrap();
                                }
                                Err(e) => {
                                    writeln!(
                                        &mut report,
                                        "\n⚠️ Failed to generate detailed reports: {}",
                                        e
                                    )
                                    .unwrap();
                                }
                            }
                        }
                    }
                    Err(e) => {
                        writeln!(&mut report, "\nFailed to get events: {}", e).unwrap();
                    }
                }
            }
            Err(e) => {
                writeln!(&mut report, "\nFailed to aggregate events: {}", e).unwrap();
            }
        }
        writeln!(&mut report, "\n{}", "=".repeat(80)).unwrap();
        report
    }
    /// Writes detailed reports into a timestamped directory under `/tmp`:
    /// `events.md` (the full event log as Markdown) and `event-flow.mmd`
    /// (a Mermaid diagram of the first 50 events). Returns the directory path.
    async fn generate_detailed_reports(
        &self,
        test_name: &str,
        aggregator: &crate::tracing::EventLogAggregator<crate::tracing::AOFEventSource>,
    ) -> anyhow::Result<std::path::PathBuf> {
        use std::fmt::Write as FmtWrite;
        use std::io::Write as IoWrite;
        let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
        let report_dir =
            std::path::PathBuf::from(format!("/tmp/freenet-test-{}-{}", test_name, timestamp));
        std::fs::create_dir_all(&report_dir)?;
        let events = aggregator.get_all_events().await?;
        let mut events_md = String::new();
        writeln!(&mut events_md, "# Detailed Event Log: {}\n", test_name)?;
        writeln!(&mut events_md, "**Generated**: {}", chrono::Utc::now())?;
        writeln!(&mut events_md, "**Total Events**: {}\n", events.len())?;
        if !events.is_empty() {
            writeln!(&mut events_md, "## Events by Timestamp\n")?;
            let start_time = events.first().unwrap().datetime;
            for event in &events {
                // Milliseconds since the first event, for quick scanning.
                let elapsed = (event.datetime - start_time).num_milliseconds();
                let (icon, type_name) = match &event.kind {
                    crate::tracing::EventKind::Connect(..) => ("🔗", "Connect"),
                    crate::tracing::EventKind::Put(..) => ("📤", "Put"),
                    crate::tracing::EventKind::Get(..) => ("📥", "Get"),
                    crate::tracing::EventKind::Route(..) => ("🔀", "Route"),
                    crate::tracing::EventKind::Update(..) => ("🔄", "Update"),
                    crate::tracing::EventKind::Subscribe(..) => ("🔔", "Subscribe"),
                    crate::tracing::EventKind::Transfer(..) => ("📡", "Transfer"),
                    crate::tracing::EventKind::Lifecycle(..) => ("🚀", "Lifecycle"),
                    crate::tracing::EventKind::Disconnected { .. } => ("❌", "Disconnect"),
                    crate::tracing::EventKind::Timeout { .. } => ("⏱️", "Timeout"),
                    crate::tracing::EventKind::Ignored => ("⏭️", "Ignored"),
                    crate::tracing::EventKind::TransportSnapshot(..) => ("📊", "TransportSnapshot"),
                    crate::tracing::EventKind::InterestSync(..) => ("🔃", "InterestSync"),
                    crate::tracing::EventKind::RoutingDecision(..) => ("🎯", "RoutingDecision"),
                    crate::tracing::EventKind::RouterSnapshot(..) => ("📸", "RouterSnapshot"),
                };
                writeln!(
                    &mut events_md,
                    "### {} {} - [{:>6}ms]\n",
                    icon, type_name, elapsed
                )?;
                writeln!(&mut events_md, "- **Peer ID**: `{}`", event.peer_id)?;
                writeln!(&mut events_md, "- **Transaction**: `{}`", event.tx)?;
                writeln!(&mut events_md, "- **Timestamp**: {}", event.datetime)?;
                writeln!(
                    &mut events_md,
                    "\n**Event Details**:\n```rust\n{:#?}\n```\n",
                    event.kind
                )?;
            }
        }
        let events_md_path = report_dir.join("events.md");
        let mut events_file = std::fs::File::create(&events_md_path)?;
        events_file.write_all(events_md.as_bytes())?;
        // Mermaid flow diagram, limited to the first 50 events.
        let mut mermaid = String::from("```mermaid\ngraph TD\n");
        mermaid.push_str(" %% Event Flow Diagram\n");
        let mut prev_id: Option<String> = None;
        for (idx, event) in events.iter().enumerate().take(50) {
            let node_id = format!("N{}", idx);
            let peer_short = &event.peer_id.to_string()[..8.min(event.peer_id.to_string().len())];
            let (icon, type_name) = match &event.kind {
                crate::tracing::EventKind::Connect(..) => ("🔗", "Connect"),
                crate::tracing::EventKind::Put(..) => ("📤", "Put"),
                crate::tracing::EventKind::Get(..) => ("📥", "Get"),
                crate::tracing::EventKind::Route(..) => ("🔀", "Route"),
                crate::tracing::EventKind::Update(..) => ("🔄", "Update"),
                crate::tracing::EventKind::Subscribe(..) => ("🔔", "Subscribe"),
                crate::tracing::EventKind::Transfer(..) => ("📡", "Transfer"),
                crate::tracing::EventKind::Lifecycle(..) => ("🚀", "Lifecycle"),
                crate::tracing::EventKind::Disconnected { .. } => ("❌", "Disconnect"),
                crate::tracing::EventKind::Timeout { .. } => ("⏱️", "Timeout"),
                crate::tracing::EventKind::Ignored => ("⏭️", "Ignored"),
                crate::tracing::EventKind::TransportSnapshot(..) => ("📊", "TransportSnapshot"),
                crate::tracing::EventKind::InterestSync(..) => ("🔃", "InterestSync"),
                crate::tracing::EventKind::RoutingDecision(..) => ("🎯", "RoutingDecision"),
                crate::tracing::EventKind::RouterSnapshot(..) => ("📸", "RouterSnapshot"),
            };
            writeln!(
                &mut mermaid,
                " {}[\"{} {}\\n{}\"]",
                node_id, peer_short, icon, type_name
            )?;
            // Chain each event node to the previous one.
            if let Some(prev) = prev_id {
                writeln!(&mut mermaid, " {} --> {}", prev, node_id)?;
            }
            prev_id = Some(node_id);
        }
        if events.len() > 50 {
            // Summarize the truncated tail as a single dashed node.
            writeln!(
                &mut mermaid,
                " NMore[\"... and {} more events\"]",
                events.len() - 50
            )?;
            if let Some(prev) = prev_id {
                writeln!(&mut mermaid, " {} -.-> NMore", prev)?;
            }
        }
        mermaid.push_str("```\n");
        let mermaid_path = report_dir.join("event-flow.mmd");
        let mut mermaid_file = std::fs::File::create(&mermaid_path)?;
        mermaid_file.write_all(mermaid.as_bytes())?;
        Ok(report_dir)
    }
/// Builds a multi-section, human-readable summary of a successful test run
/// from the aggregated per-node event logs.
///
/// Sections: overall event statistics (total, counts by kind, counts by
/// peer), a chronological event timeline with per-kind icons, and pointers
/// to the detailed report files produced by `generate_detailed_reports`.
///
/// Aggregation or report-generation failures are embedded in the returned
/// text rather than propagated, so this is always safe to call from a
/// test teardown path.
pub async fn generate_success_summary(&self) -> String {
    use std::fmt::Write;
    let mut report = String::new();
    writeln!(&mut report, "\n{}", "=".repeat(80)).unwrap();
    writeln!(&mut report, "TEST SUCCESS SUMMARY").unwrap();
    writeln!(&mut report, "{}", "=".repeat(80)).unwrap();
    match self.aggregate_events().await {
        Ok(aggregator) => match aggregator.get_all_events().await {
            Ok(mut events) => {
                writeln!(&mut report, "\n📊 Event Statistics:").unwrap();
                writeln!(&mut report, " Total events: {}", events.len()).unwrap();
                // Tally events by kind name.
                let mut by_type: HashMap<String, usize> = HashMap::new();
                for event in &events {
                    let type_name = match &event.kind {
                        crate::tracing::EventKind::Connect(..) => "Connect",
                        crate::tracing::EventKind::Put(..) => "Put",
                        crate::tracing::EventKind::Get(..) => "Get",
                        crate::tracing::EventKind::Route(..) => "Route",
                        crate::tracing::EventKind::Update(..) => "Update",
                        crate::tracing::EventKind::Subscribe(..) => "Subscribe",
                        crate::tracing::EventKind::Transfer(..) => "Transfer",
                        crate::tracing::EventKind::Lifecycle(..) => "Lifecycle",
                        crate::tracing::EventKind::Disconnected { .. } => "Disconnect",
                        crate::tracing::EventKind::Timeout { .. } => "Timeout",
                        crate::tracing::EventKind::TransportSnapshot(..) => "TransportSnapshot",
                        crate::tracing::EventKind::InterestSync(..) => "InterestSync",
                        crate::tracing::EventKind::Ignored => "Ignored",
                        crate::tracing::EventKind::RoutingDecision(..) => "RoutingDecision",
                        crate::tracing::EventKind::RouterSnapshot(..) => "RouterSnapshot",
                    };
                    *by_type.entry(type_name.to_string()).or_default() += 1;
                }
                writeln!(&mut report, "\n By type:").unwrap();
                // Sort before printing: HashMap iteration order is
                // randomized per process, which made the report
                // non-reproducible across runs.
                let mut type_counts: Vec<_> = by_type.into_iter().collect();
                type_counts.sort();
                for (event_type, count) in &type_counts {
                    writeln!(&mut report, " {}: {}", event_type, count).unwrap();
                }
                // Count events per peer (the full event refs were never
                // used beyond their count, so store counts directly).
                let mut by_peer: HashMap<String, usize> = HashMap::new();
                for event in &events {
                    *by_peer.entry(event.peer_id.to_string()).or_default() += 1;
                }
                writeln!(&mut report, "\n By peer:").unwrap();
                let mut peer_counts: Vec<_> = by_peer.into_iter().collect();
                peer_counts.sort();
                for (peer_id, count) in &peer_counts {
                    // Truncate by chars, not bytes: peer ids are not
                    // guaranteed ASCII and `&s[..8]` can panic on a
                    // char boundary.
                    let peer_short: String = peer_id.chars().take(8).collect();
                    writeln!(&mut report, " {}: {} events", peer_short, count).unwrap();
                }
                events.sort_by_key(|e| e.datetime);
                writeln!(&mut report, "\n📅 Event Timeline:").unwrap();
                // Empty-event fallback only affects the (unused) start
                // time; the loop below simply does not run.
                let start_time = events
                    .first()
                    .map(|e| e.datetime)
                    .unwrap_or_else(chrono::Utc::now);
                for event in &events {
                    let elapsed = (event.datetime - start_time).num_milliseconds();
                    // Hoist the peer-id rendering (it was previously
                    // computed twice) and truncate by chars.
                    let peer_string = event.peer_id.to_string();
                    let peer_short: String = peer_string.chars().take(8).collect();
                    let icon = match &event.kind {
                        crate::tracing::EventKind::Connect(..) => "🔗",
                        crate::tracing::EventKind::Put(..) => "📤",
                        crate::tracing::EventKind::Get(..) => "📥",
                        crate::tracing::EventKind::Route(..) => "🔀",
                        crate::tracing::EventKind::Update(..) => "🔄",
                        crate::tracing::EventKind::Subscribe(..) => "🔔",
                        crate::tracing::EventKind::Transfer(..) => "📡",
                        crate::tracing::EventKind::Lifecycle(..) => "🚀",
                        crate::tracing::EventKind::Disconnected { .. } => "❌",
                        crate::tracing::EventKind::Timeout { .. } => "⏱️",
                        crate::tracing::EventKind::TransportSnapshot(..) => "📈",
                        crate::tracing::EventKind::InterestSync(..) => "🔃",
                        crate::tracing::EventKind::RoutingDecision(..) => "🎯",
                        crate::tracing::EventKind::RouterSnapshot(..) => "📸",
                        crate::tracing::EventKind::Ignored => "⏭️",
                    };
                    // Debug repr truncated to 60 chars to keep lines short.
                    let detail: String = format!("{:?}", event.kind).chars().take(60).collect();
                    writeln!(
                        &mut report,
                        " [{:>6}ms] {} {} {}",
                        elapsed, peer_short, icon, detail
                    )
                    .unwrap();
                }
                if !events.is_empty() {
                    match self
                        .generate_detailed_reports("test_success", &aggregator)
                        .await
                    {
                        Ok(report_dir) => {
                            writeln!(&mut report, "\n📁 Detailed Reports Generated:").unwrap();
                            writeln!(
                                &mut report,
                                " 📄 Full event log: file://{}/events.md",
                                report_dir.display()
                            )
                            .unwrap();
                            writeln!(
                                &mut report,
                                " 📊 Event flow diagram: file://{}/event-flow.mmd",
                                report_dir.display()
                            )
                            .unwrap();
                            writeln!(
                                &mut report,
                                "\n💡 Tip: View diagram at https://mermaid.live or in VS Code"
                            )
                            .unwrap();
                        }
                        Err(e) => {
                            writeln!(
                                &mut report,
                                "\n⚠️ Failed to generate detailed reports: {}",
                                e
                            )
                            .unwrap();
                        }
                    }
                }
            }
            Err(e) => {
                writeln!(&mut report, "\n❌ Failed to get events: {}", e).unwrap();
            }
        },
        Err(e) => {
            writeln!(&mut report, "\n❌ Failed to aggregate events: {}", e).unwrap();
        }
    }
    writeln!(&mut report, "\n{}", "=".repeat(80)).unwrap();
    report
}
/// Waits until every node in the deployment reports ready.
///
/// All nodes are polled concurrently, so `timeout_per_node` bounds each
/// individual node's wait rather than the whole call. Returns the total
/// wall-clock time elapsed, or an error listing every node that failed
/// to become ready.
pub async fn wait_for_all_nodes_ready(
    &self,
    timeout_per_node: std::time::Duration,
) -> anyhow::Result<std::time::Duration> {
    use std::time::Instant;
    let started_at = Instant::now();
    tracing::info!(
        "Waiting for {} nodes to become ready...",
        self.node_order.len()
    );
    // Launch one readiness wait per node and drive them all concurrently.
    let outcomes = futures::future::join_all(self.node_order.iter().map(|label| async move {
        let node = self.nodes.get(label).unwrap();
        node.wait_until_ready(timeout_per_node)
            .await
            .map(|duration| (label.clone(), duration))
            .map_err(|err| (label.clone(), err))
    }))
    .await;
    // Log successes as we go; collect failures for a combined error.
    let mut failures = Vec::new();
    for outcome in outcomes {
        match outcome {
            Ok((label, duration)) => {
                tracing::debug!("Node '{}' ready in {:?}", label, duration);
            }
            Err(failure) => failures.push(failure),
        }
    }
    if !failures.is_empty() {
        let mut error_msg = format!("{} node(s) failed to become ready:\n", failures.len());
        for (label, err) in failures {
            error_msg.push_str(&format!("  - {}: {}\n", label, err));
        }
        return Err(anyhow::anyhow!(error_msg));
    }
    let total_time = started_at.elapsed();
    tracing::info!("All nodes ready in {:?}", total_time);
    Ok(total_time)
}
}
impl Drop for TestContext {
    /// Returns every port reserved by this context's nodes to the local
    /// port pool so subsequent tests can reuse them.
    fn drop(&mut self) {
        for node in self.nodes.values() {
            // Each node always holds a websocket port and may also hold
            // a network port.
            for port in std::iter::once(node.ws_port).chain(node.network_port) {
                release_local_port(port);
            }
        }
    }
}
/// Helpers for wiring per-node event-log files into a single
/// [`EventLogAggregator`] during tests.
pub mod event_aggregator_utils {
    use crate::tracing::EventLogAggregator;
    use anyhow::Result;
    use std::path::PathBuf;

    /// Associates a node's human-readable label with the path of its
    /// event log file.
    #[derive(Debug, Clone)]
    pub struct NodeLogInfo {
        pub label: String,
        pub event_log_path: PathBuf,
    }

    impl NodeLogInfo {
        /// Creates a descriptor for the given node label and log path.
        pub fn new(label: impl Into<String>, event_log_path: PathBuf) -> Self {
            NodeLogInfo {
                event_log_path,
                label: label.into(),
            }
        }
    }

    /// Incrementally collects node log locations and builds an
    /// aggregator over all of them.
    pub struct TestAggregatorBuilder {
        nodes: Vec<NodeLogInfo>,
    }

    impl TestAggregatorBuilder {
        /// Creates an empty builder with no registered nodes.
        pub fn new() -> Self {
            TestAggregatorBuilder { nodes: Vec::new() }
        }

        /// Registers a single node by label and explicit event-log path.
        pub fn add_node(mut self, label: impl Into<String>, event_log_path: PathBuf) -> Self {
            let info = NodeLogInfo::new(label, event_log_path);
            self.nodes.push(info);
            self
        }

        /// Registers several nodes from `(label, config_dir)` pairs.
        ///
        /// Each config dir is probed for a log file: `event_log` is
        /// preferred, `_EVENT_LOG_LOCAL` is the fallback, and when
        /// neither exists the `event_log` path is recorded anyway (with
        /// a warning) so later errors point at the expected location.
        pub fn add_nodes_from_configs(mut self, configs: Vec<(String, PathBuf)>) -> Self {
            for (label, config_dir) in configs {
                let preferred = config_dir.join("event_log");
                let fallback = config_dir.join("_EVENT_LOG_LOCAL");
                let chosen = if preferred.exists() {
                    preferred
                } else if fallback.exists() {
                    fallback
                } else {
                    tracing::warn!(
                        "No event log found for {} in {:?}, using event_log path",
                        label,
                        config_dir
                    );
                    preferred
                };
                self.nodes.push(NodeLogInfo::new(label, chosen));
            }
            self
        }

        /// Consumes the builder and opens an aggregator over every
        /// registered AOF event-log file.
        pub async fn build(self) -> Result<EventLogAggregator<crate::tracing::AOFEventSource>> {
            let sources = self
                .nodes
                .into_iter()
                .map(|node| (node.event_log_path, Some(node.label)))
                .collect();
            EventLogAggregator::from_aof_files(sources).await
        }
    }

    impl Default for TestAggregatorBuilder {
        fn default() -> Self {
            Self::new()
        }
    }
}
pub use event_aggregator_utils::{NodeLogInfo, TestAggregatorBuilder};