use {
super::common::*,
crate::{
Ident,
database::{
chunk::RecordWriter,
PartitionKey,
},
scheduler::lanes::*,
},
macro_rules_attribute::apply,
std::{
sync::{
Arc,
atomic::{
AtomicBool,
Ordering,
},
},
time::Duration,
},
};
fn partition_key() -> Ident {
TestPartition::KEY
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_task_writes_single_record() {
let (scheduler, _conn) = test_scheduler();
let task_executed = Arc::new(AtomicBool::new(false));
let task_exec = task_executed.clone();
let sort_key = "record_1".to_string();
let test_value = "hello world".to_string();
let test_count = 42;
scheduler.queue(
move |_ctx| {
let sk = sort_key.clone();
let val = test_value.clone();
let cnt = test_count;
let exec = task_exec.clone();
async move {
exec.store(true, Ordering::SeqCst);
let mut builder = RecordWriter::new(Ident::new("task_1"));
builder.insert::<TestPartition, _>(sk, TestRecordData::new(val, cnt));
Some(builder)
}
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| task_executed.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Task did not execute"
);
let verify_executed = Arc::new(AtomicBool::new(false));
let verify_success = Arc::new(AtomicBool::new(false));
let verify_exec = verify_executed.clone();
let verify_ok = verify_success.clone();
scheduler.queue(
move |mut ctx| {
let exec = verify_exec.clone();
let success = verify_ok.clone();
async move {
let query_client = ctx.query_client();
exec.store(true, Ordering::SeqCst);
let results = query_client
.get_record(partition_key(), "record_1".to_string())
.await;
if !results.is_empty()
&& results.len() == 1
&& let Some(TestRecord::Test(record)) = results.get(&results.records()[0])
&& record.value == "hello world"
&& record.count == 42
{
success.store(true, Ordering::SeqCst);
}
None
}
},
DEFAULT_LANE,
);
assert!(
wait_for(
|| verify_success.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Verify task did not find expected record"
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_task_writes_multiple_records() {
let (scheduler, _conn) = test_scheduler();
let task_executed = Arc::new(AtomicBool::new(false));
let task_exec = task_executed.clone();
let num_records = 5;
scheduler.queue(
move |_ctx| {
let exec = task_exec.clone();
async move {
exec.store(true, Ordering::SeqCst);
let mut builder =
RecordWriter::new(Ident::new("task_multi"));
for i in 0..num_records {
builder.insert::<TestPartition, _>(
format!("record_{}", i),
TestRecordData::new(format!("value_{}", i), i),
);
}
Some(builder)
}
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| task_executed.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Task did not execute"
);
let verify_executed = Arc::new(AtomicBool::new(false));
let verify_success = Arc::new(AtomicBool::new(false));
let verify_exec = verify_executed.clone();
let verify_ok = verify_success.clone();
scheduler.queue(
move |mut ctx| {
let exec = verify_exec.clone();
let success = verify_ok.clone();
async move {
let query_client = ctx.query_client();
exec.store(true, Ordering::SeqCst);
let mut all_correct = true;
for i in 0..num_records {
let results = query_client
.get_record(partition_key(), format!("record_{}", i))
.await;
if results.is_empty() {
all_correct = false;
break;
}
if let Some(TestRecord::Test(record)) = results.get(&results.records()[0]) {
if record.value != format!("value_{}", i) || record.count != i {
all_correct = false;
break;
}
} else {
all_correct = false;
break;
}
}
if all_correct {
success.store(true, Ordering::SeqCst);
}
None
}
},
DEFAULT_LANE,
);
assert!(
wait_for(
|| verify_success.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Verify task did not find all expected records"
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_parent_and_child_write_different_records() {
let (scheduler, _conn) = test_scheduler();
let parent_executed = Arc::new(AtomicBool::new(false));
let child_executed = Arc::new(AtomicBool::new(false));
let parent_exec = parent_executed.clone();
let child_exec = child_executed.clone();
scheduler.queue(
move |ctx| {
let exec = parent_exec.clone();
let child_exec_inner = child_exec.clone();
ctx.spawn_task(
move |_child_ctx| {
async move {
child_exec_inner.store(true, Ordering::SeqCst);
let mut builder =
RecordWriter::new(Ident::new("child_task"));
builder.insert::<TestPartition, _>(
"child_record".to_string(),
TestRecordData::new("child_value".to_string(), 100),
);
Some(builder)
}
},
DEFAULT_LANE,
);
async move {
exec.store(true, Ordering::SeqCst);
let mut builder =
RecordWriter::new(Ident::new("parent_task"));
builder.insert::<TestPartition, _>(
"parent_record".to_string(),
TestRecordData::new("parent_value".to_string(), 200),
);
Some(builder)
}
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| parent_executed.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Parent task did not execute"
);
assert!(
wait_for(
|| child_executed.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Child task did not execute"
);
let verify_executed = Arc::new(AtomicBool::new(false));
let verify_success = Arc::new(AtomicBool::new(false));
let verify_exec = verify_executed.clone();
let verify_ok = verify_success.clone();
scheduler.queue(
move |mut ctx| {
let exec = verify_exec.clone();
let success = verify_ok.clone();
async move {
let query_client = ctx.query_client();
exec.store(true, Ordering::SeqCst);
let parent_results = query_client
.get_record(
partition_key(),
"parent_record".to_string(),
)
.await;
let child_results = query_client
.get_record(partition_key(), "child_record".to_string())
.await;
let mut both_correct = false;
if !parent_results.is_empty()
&& !child_results.is_empty()
&& let (Some(TestRecord::Test(parent)), Some(TestRecord::Test(child))) = (
parent_results.get(&parent_results.records()[0]),
child_results.get(&child_results.records()[0]),
)
&& parent.value == "parent_value"
&& parent.count == 200
&& child.value == "child_value"
&& child.count == 100
{
both_correct = true;
}
if both_correct {
success.store(true, Ordering::SeqCst);
}
None
}
},
DEFAULT_LANE,
);
assert!(
wait_for(
|| verify_success.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Verify task did not find expected records"
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_query_within_task() {
use crate::database::query::QueryClient;
let (scheduler, _conn) = test_scheduler();
let first_task_executed = Arc::new(AtomicBool::new(false));
let second_task_found_record = Arc::new(AtomicBool::new(false));
let first_exec = first_task_executed.clone();
let second_found = second_task_found_record.clone();
scheduler.queue(
move |_ctx| {
let exec = first_exec.clone();
async move {
exec.store(true, Ordering::SeqCst);
let mut builder =
RecordWriter::new(Ident::new("first_task"));
builder.insert::<TestPartition, _>(
"shared_record".to_string(),
TestRecordData::new("shared_value".to_string(), 777),
);
Some(builder)
}
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| first_task_executed.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"First task did not execute"
);
let scheduler_clone = scheduler.clone();
scheduler.queue(
move |_ctx| {
let found = second_found.clone();
let db = scheduler_clone.db.clone();
async move {
let mut query_client = QueryClient::new(db);
let results = query_client
.get_record(
partition_key(),
"shared_record".to_string(),
)
.await;
if !results.records.is_empty() {
let record_meta = &results.records[0];
if let Some(TestRecord::Test(record)) = results.get(record_meta)
&& record.value == "shared_value"
&& record.count == 777
{
found.store(true, Ordering::SeqCst);
}
}
None
}
},
DEFAULT_LANE,
);
assert!(
wait_for(
|| second_task_found_record.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Second task did not find the record written by first task"
);
}
#[apply(smol_macros::test!)]
#[test_log::test]
async fn test_snapshot_isolation() {
let (scheduler, _conn) = test_scheduler();
let first_executed = Arc::new(AtomicBool::new(false));
let second_executed = Arc::new(AtomicBool::new(false));
let first_exec = first_executed.clone();
let second_exec = second_executed.clone();
scheduler.queue(
move |_ctx| {
let exec = first_exec.clone();
async move {
exec.store(true, Ordering::SeqCst);
let mut builder =
RecordWriter::new(Ident::new("initial_task"));
builder.insert::<TestPartition, _>(
"initial_record".to_string(),
TestRecordData::new("initial_value".to_string(), 111),
);
Some(builder)
}
},
DEFAULT_LANE,
);
scheduler.spawn_workers();
assert!(
wait_for(
|| first_executed.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"First task did not execute"
);
scheduler.queue(
move |_ctx| {
let exec = second_exec.clone();
async move {
exec.store(true, Ordering::SeqCst);
let mut builder =
RecordWriter::new(Ident::new("snapshot_task"));
builder.insert::<TestPartition, _>(
"new_record".to_string(),
TestRecordData::new("new_value".to_string(), 999),
);
Some(builder)
}
},
DEFAULT_LANE,
);
assert!(
wait_for(
|| second_executed.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Second task did not execute"
);
let verify_success = Arc::new(AtomicBool::new(false));
let verify_ok = verify_success.clone();
scheduler.queue(
move |mut ctx| {
let success = verify_ok.clone();
async move {
let query_client = ctx.query_client();
let initial_results = query_client
.get_record(partition_key(), "initial_record".to_string())
.await;
let new_results = query_client
.get_record(partition_key(), "new_record".to_string())
.await;
if !initial_results.is_empty() && !new_results.is_empty() {
success.store(true, Ordering::SeqCst);
}
None
}
},
DEFAULT_LANE,
);
assert!(
wait_for(
|| verify_success.load(Ordering::SeqCst),
Duration::from_secs(5)
)
.await,
"Should have both records committed to database"
);
}