#![cfg(feature = "cortex")]
use futures::StreamExt;
use net::adapter::net::channel::ChannelName;
use net::adapter::net::cortex::tasks::{OrderBy, TaskStatus, TasksAdapter, TASKS_CHANNEL};
use net::adapter::net::cortex::{
compute_checksum, compute_checksum_with_meta, EventMeta, WaitForTokenError, EVENT_META_SIZE,
};
use net::adapter::net::redex::Redex;
#[cfg(feature = "redex-disk")]
use net::adapter::net::redex::RedexFileConfig;
use net::adapter::net::redex::WriteToken;
const ORIGIN: u64 = 0xABCD_EF01;
fn now_ns() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos() as u64
}
#[tokio::test]
async fn test_full_task_lifecycle() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let t0 = now_ns();
let _ = tasks.create(1, "write docs", t0).unwrap();
let _ = tasks.create(2, "ship adapter", t0 + 1).unwrap();
let _ = tasks.rename(1, "write better docs", t0 + 2).unwrap();
let seq = tasks.complete(2, t0 + 3).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let state = tasks.state();
let guard = state.read();
assert_eq!(guard.len(), 2);
let t1 = guard.get(1).unwrap();
assert_eq!(t1.title, "write better docs");
assert_eq!(t1.status, TaskStatus::Pending);
assert_eq!(t1.created_ns, t0);
assert_eq!(t1.updated_ns, t0 + 2);
let t2 = guard.get(2).unwrap();
assert_eq!(t2.title, "ship adapter");
assert_eq!(t2.status, TaskStatus::Completed);
assert_eq!(t2.updated_ns, t0 + 3);
assert_eq!(guard.pending().count(), 1);
assert_eq!(guard.completed().count(), 1);
}
#[tokio::test]
async fn test_delete_removes_task() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "temp", 100).unwrap();
let seq = tasks.delete(1).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let state = tasks.state();
let guard = state.read();
assert!(guard.is_empty());
assert!(guard.get(1).is_none());
}
#[tokio::test]
async fn test_rename_on_unknown_id_is_noop() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let seq = tasks.rename(42, "ghost", 100).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let state = tasks.state();
let guard = state.read();
assert!(guard.is_empty());
}
#[tokio::test]
async fn test_complete_on_unknown_id_is_noop() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let seq = tasks.complete(99, 100).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let state = tasks.state();
let guard = state.read();
assert!(guard.is_empty());
}
#[tokio::test]
async fn test_replay_after_close_reconstructs_state() {
let redex = Redex::new();
{
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "a", 100).unwrap();
tasks.create(2, "b", 101).unwrap();
tasks.complete(1, 102).unwrap();
let seq = tasks.rename(2, "b-renamed", 103).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
tasks.close().unwrap();
}
let tasks2 = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks2.wait_for_seq(3).await.unwrap();
let state = tasks2.state();
let guard = state.read();
assert_eq!(guard.len(), 2);
assert_eq!(guard.get(1).unwrap().status, TaskStatus::Completed);
assert_eq!(guard.get(2).unwrap().title, "b-renamed");
assert_eq!(guard.get(2).unwrap().status, TaskStatus::Pending);
}
#[tokio::test]
async fn test_multi_producer_same_file_different_origins() {
let redex = Redex::new();
let a = TasksAdapter::open(&redex, 0x0000_0001).await.unwrap();
let b = TasksAdapter::open(&redex, 0x0000_0002).await.unwrap();
a.create(1, "from-a", 100).unwrap();
let seq = b.create(2, "from-b", 101).unwrap();
a.wait_for_seq(seq).await.unwrap();
b.wait_for_seq(seq).await.unwrap();
let state_a = a.state();
let state_b = b.state();
let ga = state_a.read();
let gb = state_b.read();
assert_eq!(ga.len(), 2);
assert_eq!(gb.len(), 2);
}
#[tokio::test]
async fn test_pending_and_completed_queries() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
for i in 1..=10u64 {
tasks.create(i, format!("task-{}", i), 100 + i).unwrap();
}
for i in (2..=10u64).step_by(2) {
tasks.complete(i, 200 + i).unwrap();
}
let last = tasks.complete(10, 9999).unwrap(); tasks.wait_for_seq(last).await.unwrap();
let state = tasks.state();
let guard = state.read();
assert_eq!(guard.len(), 10);
let mut pending_ids: Vec<_> = guard.pending().map(|t| t.id).collect();
pending_ids.sort();
assert_eq!(pending_ids, vec![1, 3, 5, 7, 9]);
let mut completed_ids: Vec<_> = guard.completed().map(|t| t.id).collect();
completed_ids.sort();
assert_eq!(completed_ids, vec![2, 4, 6, 8, 10]);
}
#[tokio::test]
async fn test_query_through_live_adapter() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
for (id, title, now) in [
(1u64, "alpha", 1000u64),
(2, "beta", 2000),
(3, "gamma", 3000),
(4, "delta", 4000),
(5, "epsilon", 5000),
] {
tasks.create(id, title, now).unwrap();
}
tasks.complete(2, 2500).unwrap();
tasks.complete(4, 4500).unwrap();
let last = tasks.rename(5, "EPSILON", 5500).unwrap();
tasks.wait_for_seq(last).await.unwrap();
let state = tasks.state();
let guard = state.read();
let mut pending_ids: Vec<_> = guard
.query()
.where_status(TaskStatus::Pending)
.collect()
.iter()
.map(|t| t.id)
.collect();
pending_ids.sort();
assert_eq!(pending_ids, vec![1, 3, 5]);
let top = guard
.query()
.where_status(TaskStatus::Completed)
.order_by(OrderBy::UpdatedDesc)
.first()
.unwrap();
assert_eq!(top.id, 4);
let match_title: Vec<_> = guard
.query()
.title_contains("PSI")
.collect()
.iter()
.map(|t| t.id)
.collect();
assert_eq!(match_title, vec![5]);
let mut recent_pending: Vec<_> = guard
.query()
.created_after(2500)
.where_status(TaskStatus::Pending)
.collect()
.iter()
.map(|t| t.id)
.collect();
recent_pending.sort();
assert_eq!(recent_pending, vec![3, 5]);
assert!(!guard.query().title_contains("does-not-exist").exists());
assert!(guard.query().where_status(TaskStatus::Pending).exists());
}
#[tokio::test]
async fn time_filter_cutoff_is_inclusive() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "before", 1000).unwrap();
tasks.create(2, "at-cutoff", 2000).unwrap();
let last = tasks.create(3, "after", 3000).unwrap();
tasks.wait_for_seq(last).await.unwrap();
let state = tasks.state();
let guard = state.read();
let mut after_2000: Vec<u64> = guard
.query()
.created_after(2000)
.collect()
.iter()
.map(|t| t.id)
.collect();
after_2000.sort();
assert_eq!(
after_2000,
vec![2, 3],
"created_after must include the cutoff itself (inclusive)"
);
let mut before_2000: Vec<u64> = guard
.query()
.created_before(2000)
.collect()
.iter()
.map(|t| t.id)
.collect();
before_2000.sort();
assert_eq!(
before_2000,
vec![1, 2],
"created_before must include the cutoff itself (inclusive)"
);
}
#[tokio::test]
async fn test_watch_initial_emission() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "a", 100).unwrap();
tasks.create(2, "b", 200).unwrap();
let seq = tasks.complete(2, 250).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let mut stream = Box::pin(
tasks
.watch()
.where_status(TaskStatus::Pending)
.order_by(OrderBy::IdAsc)
.stream(),
);
let initial = stream.next().await.unwrap();
assert_eq!(initial.len(), 1);
assert_eq!(initial[0].id, 1);
}
#[tokio::test]
async fn test_watch_emits_on_relevant_change() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let mut stream = Box::pin(
tasks
.watch()
.where_status(TaskStatus::Pending)
.order_by(OrderBy::IdAsc)
.stream(),
);
let initial = stream.next().await.unwrap();
assert!(initial.is_empty());
tasks.create(1, "first", 100).unwrap();
let next = stream.next().await.unwrap();
assert_eq!(next.len(), 1);
assert_eq!(next[0].id, 1);
tasks.create(2, "second", 200).unwrap();
let next = stream.next().await.unwrap();
assert_eq!(next.len(), 2);
assert_eq!(next[0].id, 1);
assert_eq!(next[1].id, 2);
tasks.complete(1, 300).unwrap();
let next = stream.next().await.unwrap();
assert_eq!(next.len(), 1);
assert_eq!(next[0].id, 2);
}
#[tokio::test]
async fn test_watch_dedupes_unchanged_results() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "p", 100).unwrap();
tasks.create(2, "c", 200).unwrap();
let seq = tasks.complete(2, 250).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let mut stream = Box::pin(tasks.watch().where_status(TaskStatus::Pending).stream());
let initial = stream.next().await.unwrap();
assert_eq!(initial.len(), 1);
tasks.complete(2, 9999).unwrap();
let seq = tasks.rename(2, "c-renamed", 9999).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
tasks.create(3, "p2", 300).unwrap();
let next = stream.next().await.unwrap();
let ids: Vec<_> = next.iter().map(|t| t.id).collect();
assert!(ids.contains(&1));
assert!(ids.contains(&3));
assert_eq!(ids.len(), 2);
}
#[tokio::test]
async fn test_watch_multiple_subscribers_independent() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let mut pending_stream = Box::pin(tasks.watch().where_status(TaskStatus::Pending).stream());
let mut completed_stream = Box::pin(tasks.watch().where_status(TaskStatus::Completed).stream());
assert!(pending_stream.next().await.unwrap().is_empty());
assert!(completed_stream.next().await.unwrap().is_empty());
tasks.create(1, "x", 100).unwrap();
let p = pending_stream.next().await.unwrap();
assert_eq!(p.len(), 1);
tasks.complete(1, 200).unwrap();
let p = pending_stream.next().await.unwrap();
assert!(p.is_empty());
let c = completed_stream.next().await.unwrap();
assert_eq!(c.len(), 1);
assert_eq!(c[0].id, 1);
}
#[tokio::test]
async fn test_watch_with_limit_and_order() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let mut stream = Box::pin(
tasks
.watch()
.where_status(TaskStatus::Pending)
.order_by(OrderBy::CreatedDesc)
.limit(2)
.stream(),
);
assert!(stream.next().await.unwrap().is_empty());
for id in 1..=5u64 {
tasks.create(id, format!("t-{}", id), 100 * id).unwrap();
}
let mut last: Vec<_> = Vec::new();
for _ in 0..5 {
last = stream.next().await.unwrap();
if last.len() == 2 && last[0].id == 5 && last[1].id == 4 {
break;
}
}
assert_eq!(last.len(), 2);
assert_eq!(last[0].id, 5);
assert_eq!(last[1].id, 4);
}
#[tokio::test]
async fn test_regression_open_from_snapshot_rejects_u64_max_last_seq() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let (state_bytes, _) = tasks.snapshot().unwrap();
tasks.close().unwrap();
let redex2 = Redex::new();
let result =
TasksAdapter::open_from_snapshot(&redex2, ORIGIN, &state_bytes, Some(u64::MAX)).await;
assert!(result.is_err(), "u64::MAX last_seq must be rejected");
let msg = format!("{}", result.unwrap_err());
assert!(
msg.contains("u64::MAX"),
"error should mention u64::MAX overflow; got: {}",
msg
);
}
#[tokio::test]
async fn test_regression_fold_rejects_checksum_mismatch() {
use bytes::Bytes;
use net::adapter::net::cortex::tasks::{TasksFold, TasksState};
use net::adapter::net::cortex::{
CortexAdapter, CortexAdapterConfig, EventEnvelope, FoldErrorPolicy, StartPosition,
};
let redex = Redex::new();
let cfg = CortexAdapterConfig {
start: StartPosition::FromBeginning,
on_fold_error: FoldErrorPolicy::Stop,
..Default::default()
};
let adapter = CortexAdapter::<TasksState>::open(
&redex,
&ChannelName::new(TASKS_CHANNEL).unwrap(),
Default::default(),
cfg,
TasksFold,
TasksState::new(),
)
.unwrap();
let tail = b"any bytes would have matched some xxh3 except this one".to_vec();
let wrong_checksum = compute_checksum(&tail).wrapping_add(1);
let wrong_meta = EventMeta::new(
0x01, 0,
ORIGIN,
0,
wrong_checksum,
);
let seq = adapter
.ingest(EventEnvelope::new(wrong_meta, Bytes::from(tail)))
.unwrap();
adapter.wait_for_seq(seq).await.unwrap();
assert!(
adapter.is_running(),
"fold task must continue after checksum mismatch — \
decode errors are recoverable under Stop policy"
);
assert_eq!(
adapter.fold_errors(),
1,
"the bad event must be counted in fold_errors"
);
let state = adapter.state();
let guard = state.read();
assert!(
guard.get(1).is_none(),
"checksum-mismatched event must NOT have folded into state"
);
}
#[tokio::test]
async fn test_regression_open_from_snapshot_bumps_app_seq_past_replayed_events() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "a", 100).unwrap();
let seq1 = tasks.create(2, "b", 200).unwrap();
tasks.wait_for_seq(seq1).await.unwrap();
let (state_bytes, last_seq) = tasks.snapshot().unwrap();
assert_eq!(last_seq, Some(1), "snapshot must capture seqs 0..=1");
tasks.create(3, "c", 300).unwrap();
let seq3 = tasks.create(4, "d", 400).unwrap();
tasks.wait_for_seq(seq3).await.unwrap();
tasks.close().unwrap();
let restored = TasksAdapter::open_from_snapshot(&redex, ORIGIN, &state_bytes, last_seq)
.await
.unwrap();
let new_seq = restored.create(5, "e", 500).unwrap();
restored.wait_for_seq(new_seq).await.unwrap();
let file = redex
.open_file(
&ChannelName::new(TASKS_CHANNEL).unwrap(),
Default::default(),
)
.unwrap();
let events = file.read_range(new_seq, new_seq + 1);
assert_eq!(events.len(), 1);
let meta = EventMeta::from_bytes(&events[0].payload[..EVENT_META_SIZE]).unwrap();
assert_eq!(
meta.seq_or_ts, 4,
"post-restore ingest must continue past replayed events' seq_or_ts (got {}, expected 4)",
meta.seq_or_ts
);
}
#[tokio::test]
async fn test_regression_snapshot_restore_preserves_app_seq_monotonicity() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "a", 100).unwrap();
tasks.create(2, "b", 200).unwrap();
let seq = tasks.create(3, "c", 300).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let (state_bytes, last_seq) = tasks.snapshot().unwrap();
tasks.close().unwrap();
let redex2 = Redex::new();
let tasks2 = TasksAdapter::open_from_snapshot(&redex2, ORIGIN, &state_bytes, last_seq)
.await
.unwrap();
let new_seq = tasks2.create(4, "d", 400).unwrap();
tasks2.wait_for_seq(new_seq).await.unwrap();
let file = redex2
.open_file(
&ChannelName::new(TASKS_CHANNEL).unwrap(),
Default::default(),
)
.unwrap();
let events = file.read_range(0, 1);
assert_eq!(events.len(), 1, "first post-restore event must be present");
let meta = EventMeta::from_bytes(&events[0].payload[..EVENT_META_SIZE]).unwrap();
assert_eq!(
meta.seq_or_ts, 3,
"post-restore app_seq must continue from pre-snapshot value, not reset to 0"
);
}
#[tokio::test]
async fn test_open_returns_with_state_already_caught_up() {
let redex = Redex::new();
{
let a = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
a.create(1, "first", 100).unwrap();
a.create(2, "second", 200).unwrap();
let seq = a.create(3, "third", 300).unwrap();
a.wait_for_seq(seq).await.unwrap();
a.close().unwrap();
}
let b = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let state = b.state();
let guard = state.read();
assert_eq!(
guard.len(),
3,
"post-open state must be fully caught up — saw {} tasks, expected 3",
guard.len(),
);
}
#[tokio::test]
async fn test_open_on_empty_redex_does_not_block() {
let redex = Redex::new();
let result = tokio::time::timeout(
std::time::Duration::from_secs(2),
TasksAdapter::open(&redex, ORIGIN),
)
.await;
assert!(
matches!(result, Ok(Ok(_))),
"open() on an empty Redex must complete promptly; got {result:?}",
);
}
#[tokio::test]
async fn test_open_from_snapshot_with_empty_replay_tail_keeps_snapshot_app_seq() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "a", 100).unwrap();
tasks.create(2, "b", 200).unwrap();
let seq = tasks.create(3, "c", 300).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let (state_bytes, last_seq) = tasks.snapshot().unwrap();
tasks.close().unwrap();
let redex2 = Redex::new();
let restored = TasksAdapter::open_from_snapshot(&redex2, ORIGIN, &state_bytes, last_seq)
.await
.unwrap();
let new_seq = restored.create(4, "d", 400).unwrap();
restored.wait_for_seq(new_seq).await.unwrap();
let file = redex2
.open_file(
&ChannelName::new(TASKS_CHANNEL).unwrap(),
Default::default(),
)
.unwrap();
let events = file.read_range(new_seq, new_seq + 1);
let meta = EventMeta::from_bytes(&events[0].payload[..EVENT_META_SIZE]).unwrap();
assert_eq!(
meta.seq_or_ts, 3,
"post-restore counter must continue from snapshot's persisted app_seq (got {}, expected 3)",
meta.seq_or_ts,
);
}
#[tokio::test]
async fn test_regression_open_advances_app_seq_past_existing_same_origin_events() {
let redex = Redex::new();
{
let a = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
a.create(1, "first", 100).unwrap();
a.create(2, "second", 200).unwrap();
let seq = a.create(3, "third", 300).unwrap();
a.wait_for_seq(seq).await.unwrap();
a.close().unwrap();
}
let b = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let new_seq = b.create(4, "fourth", 400).unwrap();
b.wait_for_seq(new_seq).await.unwrap();
let file = redex
.open_file(
&ChannelName::new(TASKS_CHANNEL).unwrap(),
Default::default(),
)
.unwrap();
let events = file.read_range(new_seq, new_seq + 1);
assert_eq!(events.len(), 1);
let meta = EventMeta::from_bytes(&events[0].payload[..EVENT_META_SIZE]).unwrap();
assert_eq!(
meta.seq_or_ts, 3,
"first ingest after reopen-on-existing-log must continue past replayed events' \
seq_or_ts (got {}, expected 3)",
meta.seq_or_ts,
);
}
#[tokio::test]
async fn test_regression_open_ignores_other_origins_when_advancing_app_seq() {
let redex = Redex::new();
const ORIGIN_A: u64 = 0x0000_00AA;
const ORIGIN_B: u64 = 0x0000_00BB;
{
let b = TasksAdapter::open(&redex, ORIGIN_B).await.unwrap();
b.create(10, "b1", 100).unwrap();
b.create(11, "b2", 200).unwrap();
let seq = b.create(12, "b3", 300).unwrap();
b.wait_for_seq(seq).await.unwrap();
b.close().unwrap();
}
let a = TasksAdapter::open(&redex, ORIGIN_A).await.unwrap();
let new_seq = a.create(20, "a1", 400).unwrap();
a.wait_for_seq(new_seq).await.unwrap();
let file = redex
.open_file(
&ChannelName::new(TASKS_CHANNEL).unwrap(),
Default::default(),
)
.unwrap();
let events = file.read_range(new_seq, new_seq + 1);
let meta = EventMeta::from_bytes(&events[0].payload[..EVENT_META_SIZE]).unwrap();
assert_eq!(
meta.seq_or_ts, 0,
"origin A's first ingest must not be polluted by origin B's seq_or_ts values \
(got {}, expected 0)",
meta.seq_or_ts,
);
assert_eq!(meta.origin_hash, ORIGIN_A);
}
#[tokio::test]
async fn test_regression_checksum_is_computed_not_zero() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let seq = tasks.create(42, "distinctive title", 12345).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let file = redex
.open_file(
&ChannelName::new(TASKS_CHANNEL).unwrap(),
Default::default(),
)
.unwrap();
let events = file.read_range(0, 1);
assert_eq!(events.len(), 1, "tasks channel should have one event");
let payload = &events[0].payload;
let meta = EventMeta::from_bytes(&payload[..EVENT_META_SIZE]).expect("decode meta");
let tail = &payload[EVENT_META_SIZE..];
assert_ne!(meta.checksum, 0, "checksum must not be hardcoded to 0");
assert_eq!(
meta.checksum,
compute_checksum_with_meta(&meta, tail),
"meta.checksum must match the v2 header-covering hash",
);
}
#[tokio::test]
async fn test_regression_watch_without_order_by_is_stable() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
const N: u64 = 64;
let mut last = 0;
for id in 1..=N {
last = tasks.create(id, format!("t-{}", id), id * 100).unwrap();
}
tasks.wait_for_seq(last).await.unwrap();
let mut stream = Box::pin(tasks.watch().where_status(TaskStatus::Pending).stream());
let initial = stream.next().await.unwrap();
assert_eq!(initial.len(), N as usize);
let ids: Vec<u64> = initial.iter().map(|t| t.id).collect();
let sorted: Vec<u64> = (1..=N).collect();
assert_eq!(ids, sorted, "watcher without order_by must emit IdAsc");
}
#[tokio::test]
async fn test_snapshot_and_restore_skips_replay() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "alpha", 100).unwrap();
tasks.create(2, "beta", 200).unwrap();
tasks.complete(1, 150).unwrap();
let seq = tasks.rename(2, "beta-v2", 250).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let (bytes, last_seq) = tasks.snapshot().unwrap();
assert_eq!(last_seq, Some(3)); tasks.close().unwrap();
let tasks2 = TasksAdapter::open_from_snapshot(&redex, ORIGIN, &bytes, last_seq)
.await
.unwrap();
{
let state = tasks2.state();
let guard = state.read();
assert_eq!(guard.len(), 2);
let t1 = guard.get(1).unwrap();
assert_eq!(t1.status, TaskStatus::Completed);
let t2 = guard.get(2).unwrap();
assert_eq!(t2.title, "beta-v2");
assert_eq!(t2.status, TaskStatus::Pending);
}
let seq = tasks2.create(3, "gamma", 300).unwrap();
assert_eq!(seq, 4);
tasks2.wait_for_seq(seq).await.unwrap();
assert_eq!(tasks2.state().read().len(), 3);
}
#[tokio::test]
async fn test_snapshot_empty_state_has_no_last_seq() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let (bytes, last_seq) = tasks.snapshot().unwrap();
assert_eq!(last_seq, None);
assert!(!bytes.is_empty()); }
#[tokio::test]
async fn test_ingest_after_close_errors() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "a", 100).unwrap();
tasks.close().unwrap();
assert!(tasks.create(2, "b", 101).is_err());
}
#[cfg(feature = "redex-disk")]
#[tokio::test]
async fn test_persistent_tasks_recover_across_processes() {
use std::path::PathBuf;
let mut base: PathBuf = std::env::temp_dir();
base.push(format!(
"cortex_tasks_persist_{}_{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
std::fs::create_dir_all(&base).unwrap();
let cfg = RedexFileConfig::default().with_persistent(true);
{
let redex = Redex::new().with_persistent_dir(&base);
let tasks = TasksAdapter::open_with_config(&redex, ORIGIN, cfg.clone())
.await
.unwrap();
tasks.create(1, "durable", 100).unwrap();
tasks.create(2, "also durable", 101).unwrap();
let seq = tasks.complete(1, 102).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
tasks.close().unwrap();
}
let redex2 = Redex::new().with_persistent_dir(&base);
let tasks2 = TasksAdapter::open_with_config(&redex2, ORIGIN, cfg)
.await
.unwrap();
tasks2.wait_for_seq(2).await.unwrap();
let state = tasks2.state();
let guard = state.read();
assert_eq!(guard.len(), 2);
assert_eq!(guard.get(1).unwrap().status, TaskStatus::Completed);
assert_eq!(guard.get(2).unwrap().status, TaskStatus::Pending);
let _ = std::fs::remove_dir_all(&base);
}
#[tokio::test]
async fn test_snapshot_and_watch_snapshot_reflects_current_state() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
tasks.create(1, "p1", 100).unwrap();
tasks.create(2, "c1", 200).unwrap();
let seq = tasks.complete(2, 250).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let watcher = tasks.watch().where_status(TaskStatus::Pending);
let (snapshot, _stream) = tasks.snapshot_and_watch(watcher);
let ids: Vec<_> = snapshot.iter().map(|t| t.id).collect();
assert_eq!(
ids,
vec![1],
"snapshot must reflect the filter evaluated against current state"
);
}
#[tokio::test]
async fn test_regression_snapshot_and_watch_delivers_post_call_updates() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let seq = tasks.create(1, "seed", 100).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let watcher = tasks.watch();
let (initial, mut stream) = tasks.snapshot_and_watch(watcher);
let initial_ids: Vec<_> = initial.iter().map(|t| t.id).collect();
assert_eq!(initial_ids, vec![1]);
let seq = tasks.create(2, "post", 200).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let observed = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next())
.await
.expect("stream must emit after mutation")
.expect("stream must not end");
let ids: Vec<_> = observed.iter().map(|t| t.id).collect();
assert_eq!(ids, vec![1, 2]);
assert_ne!(observed, initial);
}
#[tokio::test]
async fn test_regression_snapshot_and_watch_forwards_divergent_stream_initial() {
for trial in 0..20 {
let redex = Redex::new();
let tasks = std::sync::Arc::new(TasksAdapter::open(&redex, ORIGIN).await.unwrap());
let seq = tasks.create(1, "seed", 100).unwrap();
tasks.wait_for_seq(seq).await.unwrap();
let tasks_m = tasks.clone();
let mutator = tokio::spawn(async move {
let seq = tasks_m.create(2, "race", 200).unwrap();
tasks_m.wait_for_seq(seq).await.unwrap();
});
let watcher = tasks.watch();
let (initial, mut stream) = tasks.snapshot_and_watch(watcher);
mutator.await.unwrap();
if initial.len() == 2 {
continue;
}
assert_eq!(
initial.len(),
1,
"trial {}: snapshot should be [seed]",
trial
);
let observed = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next())
.await
.unwrap_or_else(|_| {
panic!(
"trial {}: stream must deliver post-snapshot state within timeout",
trial
)
})
.expect("stream must not end");
assert_eq!(
observed.len(),
2,
"trial {}: stream must deliver state with both tasks",
trial
);
}
}
#[tokio::test]
async fn poll_for_token_synchronous_non_blocking_check() {
let redex = Redex::new();
let tasks = TasksAdapter::open(&redex, ORIGIN).await.unwrap();
let seq = tasks.create(1, "ping", now_ns()).unwrap();
let token = WriteToken::new(ORIGIN, seq);
tasks.wait_for_seq(seq).await.unwrap();
tasks
.poll_for_token(token)
.expect("poll must succeed once seq applied");
let future_token = WriteToken::new(ORIGIN, seq + 1_000_000);
let err = tasks.poll_for_token(future_token).unwrap_err();
assert!(matches!(err, WaitForTokenError::Timeout));
let alien_token = WriteToken::new(0xDEAD_BEEF, seq);
let err = tasks.poll_for_token(alien_token).unwrap_err();
assert!(matches!(err, WaitForTokenError::WrongOrigin { .. }));
}