use freenet_stdlib::prelude::*;
use crate::client_events::ClientId;
use crate::contract::executor::mock_wasm_runtime::MockWasmRuntime;
use crate::contract::executor::{
ContractExecutor, Executor, MAX_SUBSCRIBERS_PER_CONTRACT, MAX_SUBSCRIPTIONS_PER_CLIENT,
SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE,
};
use crate::wasm_runtime::MockStateStorage;
/// Builds the mock-backed executor used by the tests in this file.
///
/// A fresh `MockStateStorage` is passed to `Executor::new_mock_wasm` along
/// with the string `"subscriber_limit_test"`; the three trailing optional
/// arguments are all `None`.
///
/// # Panics
///
/// Panics with the message "create executor" if construction returns an error.
async fn create_executor() -> Executor<MockWasmRuntime, MockStateStorage> {
    let built = Executor::new_mock_wasm(
        "subscriber_limit_test",
        MockStateStorage::new(),
        None,
        None,
        None,
    )
    .await;
    built.expect("create executor")
}
/// Returns a test `ContractContainer` by forwarding `seed` to the mock
/// runtime's `create_test_contract` helper.
fn test_contract(seed: &[u8]) -> ContractContainer {
    use crate::contract::executor::mock_runtime::test::create_test_contract;

    create_test_contract(seed)
}
/// Creates the test contract for `seed`, upserts it into `executor` with a
/// one-byte initial state (`[1]`) and no related contracts, and returns the
/// contract's key.
///
/// # Panics
///
/// Panics with the message "store contract" if the upsert returns an error.
async fn store_contract(
    executor: &mut Executor<MockWasmRuntime, MockStateStorage>,
    seed: &[u8],
) -> ContractKey {
    let container = test_contract(seed);
    let contract_key = container.key();
    let initial_state = either::Either::Left(WrappedState::new(vec![1]));

    let outcome = executor
        .upsert_contract_state(
            contract_key,
            initial_state,
            RelatedContracts::default(),
            Some(container),
        )
        .await;
    outcome.expect("store contract");

    contract_key
}
/// Registers `MAX_SUBSCRIBERS_PER_CONTRACT` distinct clients on one contract,
/// then checks that one more registration is rejected with an error whose
/// text contains "subscriber limit".
#[tokio::test(flavor = "current_thread")]
async fn test_per_contract_subscriber_limit_enforced() {
    let mut executor = create_executor().await;
    let instance_id = *store_contract(&mut executor, b"sub_limit_test").await.id();

    // Fill the contract up to its limit. The receivers are held until the end
    // of the test rather than dropped right after registration.
    let _held_receivers: Vec<_> = (0..MAX_SUBSCRIBERS_PER_CONTRACT)
        .map(|_| {
            let client_id = ClientId::next();
            let (tx, rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
            executor
                .register_contract_notifier(instance_id, client_id, tx, None)
                .expect("registration should succeed within limit");
            rx
        })
        .collect();

    // One subscriber past the limit must be refused.
    let extra_client = ClientId::next();
    let (tx, _rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
    let err_msg = match executor.register_contract_notifier(instance_id, extra_client, tx, None) {
        Ok(_) => panic!("Registration beyond MAX_SUBSCRIBERS_PER_CONTRACT must fail"),
        Err(err) => err.to_string(),
    };

    assert!(
        err_msg.contains("subscriber limit"),
        "Error should mention subscriber limit, got: {err_msg}"
    );
}
/// Subscribes a single client to `MAX_SUBSCRIPTIONS_PER_CLIENT` different
/// contracts, then checks that subscribing it to one more contract is
/// rejected with an error whose text contains "per-client subscription limit".
#[tokio::test(flavor = "current_thread")]
async fn test_per_client_subscription_limit_enforced() {
    let mut executor = create_executor().await;
    let client_id = ClientId::next();

    // Receivers are collected so they outlive the registration loop.
    let mut held_receivers = Vec::new();
    for i in 0..MAX_SUBSCRIPTIONS_PER_CLIENT {
        let seed = format!("client_limit_test_{i}");
        let instance_id = *store_contract(&mut executor, seed.as_bytes()).await.id();

        let (tx, rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
        let registered = executor.register_contract_notifier(instance_id, client_id, tx, None);
        registered.expect("registration should succeed within per-client limit");

        held_receivers.push(rx);
    }

    // The client is now at its limit; a further contract must be refused.
    let extra_key = store_contract(&mut executor, b"client_limit_extra").await;
    let (tx, _rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
    let err_msg = match executor.register_contract_notifier(*extra_key.id(), client_id, tx, None) {
        Ok(_) => panic!("Registration beyond MAX_SUBSCRIPTIONS_PER_CLIENT must fail"),
        Err(err) => err.to_string(),
    };

    assert!(
        err_msg.contains("per-client subscription limit"),
        "Error should mention per-client limit, got: {err_msg}"
    );
}
/// Brings one client up to `MAX_SUBSCRIPTIONS_PER_CLIENT` subscriptions and
/// then checks that a different client can still register on a new contract.
#[tokio::test(flavor = "current_thread")]
async fn test_per_client_limit_does_not_affect_other_clients() {
    let mut executor = create_executor().await;
    let saturated_client = ClientId::next();
    let other_client = ClientId::next();

    let mut held_receivers = Vec::new();

    // Exhaust the first client's allowance, one contract per subscription.
    for i in 0..MAX_SUBSCRIPTIONS_PER_CLIENT {
        let seed = format!("other_client_test_{i}");
        let instance_id = *store_contract(&mut executor, seed.as_bytes()).await.id();

        let (tx, rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
        let registered =
            executor.register_contract_notifier(instance_id, saturated_client, tx, None);
        registered.expect("registration should succeed");

        held_receivers.push(rx);
    }

    // The second client has no subscriptions yet, so it must be accepted.
    let instance_id = *store_contract(&mut executor, b"other_client_contract")
        .await
        .id();
    let (tx, rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
    let registered = executor.register_contract_notifier(instance_id, other_client, tx, None);
    registered.expect("other client should not be affected by first client's limit");
    held_receivers.push(rx);
}
/// Registers ten clients on one contract in reverse creation order and
/// checks that `get_subscription_info` reports that contract's client ids
/// in sorted order.
#[tokio::test(flavor = "current_thread")]
async fn test_sorted_insert_maintains_order() {
    let mut executor = create_executor().await;
    let instance_id = *store_contract(&mut executor, b"sorted_insert_test")
        .await
        .id();

    let clients: Vec<ClientId> = std::iter::repeat_with(ClientId::next).take(10).collect();

    // Register last-created first so the insertion order is the reverse of
    // the order in which the ids were obtained.
    let _held_receivers: Vec<_> = clients
        .iter()
        .rev()
        .copied()
        .map(|client_id| {
            let (tx, rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
            executor
                .register_contract_notifier(instance_id, client_id, tx, None)
                .expect("registration should succeed");
            rx
        })
        .collect();

    // Pull out the client ids reported for this contract, in reported order.
    let subs = executor.get_subscription_info();
    let contract_client_ids: Vec<ClientId> = subs
        .iter()
        .filter_map(|info| (info.instance_id == instance_id).then_some(info.client_id))
        .collect();
    assert_eq!(contract_client_ids.len(), 10, "Should have 10 subscribers");

    let expected_order = {
        let mut ids = contract_client_ids.clone();
        ids.sort();
        ids
    };
    assert_eq!(
        contract_client_ids, expected_order,
        "Client IDs should be in sorted order from the internal storage"
    );
}
/// Registers the same client twice on the same contract (with two different
/// channels) and checks that `get_subscription_info` lists exactly one
/// subscription for that contract.
#[tokio::test(flavor = "current_thread")]
async fn test_reconnection_updates_channel_not_count() {
    let mut executor = create_executor().await;
    let instance_id = *store_contract(&mut executor, b"reconnect_test").await.id();
    let client_id = ClientId::next();

    // Initial registration.
    let (first_tx, _first_rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
    let first = executor.register_contract_notifier(instance_id, client_id, first_tx, None);
    first.expect("first registration should succeed");

    // Same client, same contract, new channel.
    let (second_tx, _second_rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
    let second = executor.register_contract_notifier(instance_id, client_id, second_tx, None);
    second.expect("reconnection should succeed");

    let contract_sub_count = executor
        .get_subscription_info()
        .iter()
        .filter(|info| info.instance_id == instance_id)
        .count();
    assert_eq!(
        contract_sub_count, 1,
        "Reconnection should update channel, not add duplicate"
    );
}
/// Registers a client on one contract, re-registers it five more times on
/// that same contract, and then checks that the client can still subscribe to
/// `MAX_SUBSCRIPTIONS_PER_CLIENT - 1` additional contracts.
///
/// If the repeated registrations were counted against the per-client limit,
/// one of the later registrations would fail and the test would panic.
#[tokio::test(flavor = "current_thread")]
async fn test_reconnection_does_not_inflate_client_count() {
    let mut executor = create_executor().await;
    let client_id = ClientId::next();
    let instance_id = *store_contract(&mut executor, b"reconnect_count_test")
        .await
        .id();

    let (first_tx, _first_rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
    executor
        .register_contract_notifier(instance_id, client_id, first_tx, None)
        .expect("registration should succeed");

    // Five reconnections on the same contract; each receiver is dropped as
    // soon as its registration has been made.
    (0..5).for_each(|_| {
        let (tx, _rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
        executor
            .register_contract_notifier(instance_id, client_id, tx, None)
            .expect("reconnection should succeed");
    });

    // Starting at 1 because the first contract already uses one slot.
    let mut held_receivers = Vec::new();
    for i in 1..MAX_SUBSCRIPTIONS_PER_CLIENT {
        let seed = format!("reconnect_count_extra_{i}");
        let extra_id = *store_contract(&mut executor, seed.as_bytes()).await.id();

        let (tx, rx) = tokio::sync::mpsc::channel(SUBSCRIBER_NOTIFICATION_CHANNEL_SIZE);
        if let Err(e) = executor.register_contract_notifier(extra_id, client_id, tx, None) {
            panic!("Registration {i} should succeed (reconnections didn't inflate count): {e}")
        }
        held_receivers.push(rx);
    }
}