#![allow(clippy::disallowed_types, clippy::disallowed_methods)]
use backon::{ExponentialBuilder, Retryable};
use log::warn;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use slatedb::admin;
use slatedb::config::{
CompactorOptions, DurabilityLevel, FlushOptions, FlushType, PutOptions, ReadOptions, Settings,
SizeTieredCompactionSchedulerOptions, WriteOptions,
};
use slatedb::fail_parallel::{self, FailPointRegistry};
use slatedb::object_store::memory::InMemory;
use slatedb::object_store::ObjectStore;
use slatedb::size_tiered_compaction::SizeTieredCompactionSchedulerSupplier;
use slatedb::{CompactorBuilder, Db};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;
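
/// Fails the memtable-to-L0 flush so all writes live only in the WAL, then
/// closes and reopens the DB: the WAL must be replayed, and a new write must
/// succeed even while memtable flushes are still paused.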
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_replay_wal_then_write() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let path = "/tmp/test_replay_wal_then_write";
let fp_registry = Arc::new(FailPointRegistry::new());
let settings = Settings {
flush_interval: None,
l0_sst_size_bytes: 64,
..Default::default()
};
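    // "return" makes the memtable-to-L0 flush bail out, so data stays in the WAL only.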
fail_parallel::cfg(fp_registry.clone(), "flush-memtable-to-l0", "return").unwrap();
let db = Db::builder(path, object_store.clone())
.with_settings(settings.clone())
.with_fp_registry(fp_registry.clone())
.build()
.await
.expect("failed to open db");
for i in 0..20 {
let key = format!("key{:04}", i);
let value = format!("value{:04}", i);
db.put_with_options(
key.as_bytes(),
value.as_bytes(),
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.expect("failed to put");
}
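    // Flush the WAL so the buffered writes are durable before closing.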
db.flush_with_options(FlushOptions {
flush_type: FlushType::Wal,
})
.await
.expect("failed to flush WAL");
db.close().await.expect("failed to close db");
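    // "pause" blocks memtable-to-L0 flushes while the DB reopens and replays the WAL.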
fail_parallel::cfg(fp_registry.clone(), "flush-memtable-to-l0", "pause").unwrap();
let db = Db::builder(path, object_store.clone())
.with_settings(settings)
.with_fp_registry(fp_registry.clone())
.build()
.await
.expect("failed to reopen db");
db.put_with_options(
b"new_key",
b"new_value",
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.expect("put after replay should succeed");
fail_parallel::cfg(fp_registry.clone(), "flush-memtable-to-l0", "off").unwrap();
db.close().await.expect("failed to close db");
}
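
/// Concurrent writers each bump a per-writer counter key while readers sample
/// random keys and assert that the durable value for a key never regresses.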
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_concurrent_writers_and_readers() {
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("debug"));
    // try_init() tolerates a subscriber already installed by another test in this process.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
        .with_test_writer()
        .try_init();
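    // Workload parameters, overridable via SLATEDB_TEST_* environment variables.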
let num_writers: usize = std::env::var("SLATEDB_TEST_NUM_WRITERS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(10);
let num_readers: usize = std::env::var("SLATEDB_TEST_NUM_READERS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(2);
let writes_per_task: usize = std::env::var("SLATEDB_TEST_WRITES_PER_TASK")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(100);
let key_length: usize = std::env::var("SLATEDB_TEST_KEY_LENGTH")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(256);
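    // Place the u64 key in the first 8 bytes (little-endian), zero-padded to `length`.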
fn zero_pad_key(key: u64, length: usize) -> Vec<u8> {
let mut padded_key = vec![0; length];
padded_key[0..8].copy_from_slice(&key.to_le_bytes());
padded_key
}
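    // Use the object store configured in the environment when CLOUD_PROVIDER is set;
    // otherwise run entirely in memory.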
let object_store: Arc<dyn ObjectStore> = if let Ok(provider) = std::env::var("CLOUD_PROVIDER") {
log::info!("Using object store from env (CLOUD_PROVIDER={})", provider);
admin::load_object_store_from_env(None).expect("failed to load object store from env")
} else {
Arc::new(InMemory::new()) as Arc<dyn ObjectStore>
};
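    // Poll every 100ms and allow single-source compactions so compaction stays active.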
let compactor_options = CompactorOptions {
poll_interval: Duration::from_millis(100),
scheduler_options: SizeTieredCompactionSchedulerOptions {
min_compaction_sources: 1,
..Default::default()
}
.into(),
..Default::default()
};
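    // Tight intervals and small size thresholds keep flushes and L0 writes frequent;
    // the compactor itself is attached separately via CompactorBuilder below.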
let config = Settings::from_env_with_default(
"SLATEDB_TEST_",
Settings {
flush_interval: Some(Duration::from_millis(100)),
manifest_poll_interval: Duration::from_millis(100),
manifest_update_timeout: Duration::from_secs(300),
max_unflushed_bytes: 16 * 1024,
min_filter_keys: 0,
l0_sst_size_bytes: 4 * 4096,
compactor_options: None,
..Default::default()
},
)
.expect("failed to load db settings from environment");
let supplier = Arc::new(SizeTieredCompactionSchedulerSupplier::new());
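    // Retry forever with a flat 1ms backoff (min delay == max delay).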
let retry_builder = ExponentialBuilder::default()
.without_max_times()
.with_min_delay(Duration::from_millis(1))
.with_max_delay(Duration::from_millis(1));
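    // Open the DB and its compactor under a fresh timestamped path, retrying transient failures.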
let db = Arc::new(
(|| async {
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let db_path = format!("/tmp/test_concurrent_writers_readers_{}", ts);
Db::builder(db_path.clone(), object_store.clone())
.with_settings(config.clone())
.with_compactor_builder(
CompactorBuilder::new(db_path, object_store.clone())
.with_options(compactor_options.clone())
.with_scheduler_supplier(supplier.clone()),
)
.build()
.await
})
.retry(retry_builder)
.notify(|err, dur| {
warn!("retrying error {:?} with sleeping {:?}", err, dur);
})
.await
.expect("Failed to build DB after retries"),
);
let reader_cancellation_token = CancellationToken::new();
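    // Each writer repeatedly overwrites its own key with an increasing counter, then flushes.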
let writer_handles = (0..num_writers)
.map(|writer_id| {
let db = db.clone();
tokio::spawn(async move {
let key = zero_pad_key(writer_id.try_into().unwrap(), key_length);
for i in 1..=writes_per_task {
db.put_with_options(
&key,
i.to_be_bytes().as_ref(),
&PutOptions::default(),
&WriteOptions {
await_durable: false,
},
)
.await
.expect("Failed to write value");
if i % 10 == 0 {
log::info!("wrote values [writer_id={}, write_count={}]", writer_id, i);
}
}
db.flush().await.expect("Failed to flush");
})
})
.collect::<Vec<_>>();
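    // Readers sample random writer keys with a Remote durability filter and
    // verify that the value observed for any key is monotonically non-decreasing.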
let reader_handles = (0..num_readers)
.map(|reader_id| {
let db = db.clone();
let reader_cancellation_token = reader_cancellation_token.clone();
tokio::spawn(async move {
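                // Highest value seen per key; the map is task-local, so the
                // compare-exchange below can never observe interference.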
let mut latest_values = HashMap::<usize, AtomicU64>::new();
let mut iterations = 0;
let mut rng = StdRng::from_os_rng();
while !reader_cancellation_token.is_cancelled() {
let key = rng.random_range(0..num_writers);
if let Some(bytes) = db
.get_with_options(
zero_pad_key(key.try_into().unwrap(), key_length),
&ReadOptions::new().with_durability_filter(DurabilityLevel::Remote),
                    )
                    .await?
{
                        let value_bytes: [u8; 8] =
                            bytes.as_ref().try_into().expect("invalid byte conversion");
let current_value = u64::from_be_bytes(value_bytes);
                        let last_seen_atomic = latest_values
                            .entry(key)
                            .or_insert_with(|| AtomicU64::new(current_value));
let last_seen_value = last_seen_atomic.load(Ordering::SeqCst);
assert!(
current_value >= last_seen_value,
"Value {} is less than last seen value {}",
current_value,
last_seen_value
);
last_seen_atomic
.compare_exchange(
last_seen_value,
current_value,
Ordering::SeqCst,
Ordering::SeqCst,
)
.unwrap();
iterations += 1;
if current_value != last_seen_value {
log::info!(
"progress [reader_id={}, iterations={}, sample_key={}, last_seen_value={}, current_value={}]",
reader_id,
iterations,
key,
last_seen_value,
current_value
);
}
} else {
assert!(
!latest_values.contains_key(&key),
"No key {} in DB, but found in latest_values",
key,
);
}
}
Ok::<(), slatedb::Error>(())
})
})
.collect::<Vec<_>>();
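    // Wait for all writers to finish before cancelling the readers.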
futures::future::try_join_all(writer_handles)
.await
.expect("Writer handles failed");
reader_cancellation_token.cancel();
    for result in futures::future::try_join_all(reader_handles)
        .await
        .expect("Reader handles failed")
    {
        // Surface slatedb errors returned by reader tasks, not just panics.
        result.expect("Reader task failed");
    }
(|| async { db.close().await })
.retry(retry_builder)
.notify(|err, dur| {
warn!("retrying error {:?} with sleeping {:?}", err, dur);
})
.await
.expect("Failed to close DB after retries");
}