use std::sync::Arc;
use std::time::Duration;
use tidepool_rpc::cache::CacheStore;
use tidepool_rpc::cnft::{CnftStore, SqliteCnftStore, TreeInfo};
use tidepool_rpc::das::types::{DasAsset, DasContent, DasMetadata};
use tidepool_rpc::sqlite_backend::SqliteBackend;
use tidepool_rpc::sqlite_cache::SqliteCache;
use tidepool_rpc::webhooks::{SqliteWebhookRegistry, WebhookInput, WebhookRegistry};
const WRITERS_PER_STORE: usize = 20;
const WRITES_PER_WRITER: usize = 50;
fn asset(id: &str) -> DasAsset {
DasAsset {
id: id.into(),
interface: "FungibleToken".into(),
content: DasContent {
metadata: DasMetadata {
name: format!("Asset {id}"),
..Default::default()
},
..Default::default()
},
..Default::default()
}
}
fn tree(seed: u16) -> TreeInfo {
let mut bytes = [0u8; 32];
bytes[0] = seed as u8;
bytes[1] = (seed >> 8) as u8;
TreeInfo {
tree: bytes,
depth: 20,
max_buffer_size: 64,
num_minted: 0,
}
}
fn webhook_input(idx: usize) -> WebhookInput {
WebhookInput {
webhook_url: Some(format!("https://example.com/hook/{idx}")),
account_addresses: Some(vec!["ADDR_A".into()]),
transaction_types: vec![],
txn_status: None,
webhook_type: None,
auth_header: None,
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn concurrent_writes_across_three_stores_dont_corrupt() {
let backend = SqliteBackend::open(":memory:").expect("open sqlite");
let cache: Arc<SqliteCache> = Arc::new(SqliteCache::new(&backend));
let cnft: Arc<SqliteCnftStore> = Arc::new(SqliteCnftStore::new(&backend));
let webhooks: Arc<SqliteWebhookRegistry> = Arc::new(
SqliteWebhookRegistry::new(&backend)
.await
.expect("webhook registry"),
);
let mut joins = Vec::new();
for writer in 0..WRITERS_PER_STORE {
let cache = Arc::clone(&cache);
joins.push(tokio::spawn(async move {
for i in 0..WRITES_PER_WRITER {
let id = format!("ASSET_{writer:03}_{i:03}");
cache.put_asset(asset(&id)).await.expect("put_asset");
}
}));
}
for writer in 0..WRITERS_PER_STORE {
let cnft = Arc::clone(&cnft);
joins.push(tokio::spawn(async move {
for i in 0..WRITES_PER_WRITER {
let seed = u16::try_from(writer * WRITES_PER_WRITER + i).unwrap();
cnft.put_tree(tree(seed)).await.expect("put_tree");
}
}));
}
for writer in 0..WRITERS_PER_STORE {
let webhooks = Arc::clone(&webhooks);
joins.push(tokio::spawn(async move {
for i in 0..WRITES_PER_WRITER {
let input = webhook_input(writer * WRITES_PER_WRITER + i);
webhooks.create(input).await.expect("create webhook");
}
}));
}
tokio::time::timeout(Duration::from_secs(15), async {
for j in joins {
j.await.expect("writer task");
}
})
.await
.expect("writers completed within timeout");
let expected = WRITERS_PER_STORE * WRITES_PER_WRITER;
let probe_first = cache
.get_asset("ASSET_000_000")
.await
.expect("get_asset")
.expect("first asset present");
assert_eq!(probe_first.id, "ASSET_000_000");
let last_id = format!(
"ASSET_{:03}_{:03}",
WRITERS_PER_STORE - 1,
WRITES_PER_WRITER - 1
);
let probe_last = cache
.get_asset(&last_id)
.await
.expect("get_asset")
.expect("last asset present");
assert_eq!(probe_last.id, last_id);
let seed_first = tree(0).tree;
let seed_last = tree(u16::try_from(expected - 1).unwrap()).tree;
assert!(cnft
.get_tree(&seed_first)
.await
.expect("get_tree")
.is_some());
assert!(cnft.get_tree(&seed_last).await.expect("get_tree").is_some());
let all = webhooks.list().await.expect("list webhooks");
assert_eq!(
all.len(),
expected,
"webhook count mismatch: expected {expected}, got {}",
all.len()
);
let distinct_urls: std::collections::HashSet<_> =
all.iter().map(|w| w.webhook_url.clone()).collect();
assert_eq!(distinct_urls.len(), expected);
}