#![cfg(feature = "cortex")]
use std::collections::HashMap;
use std::sync::Arc;
use bytes::Bytes;
use net::adapter::net::channel::ChannelName;
use net::adapter::net::cortex::{
CortexAdapter, CortexAdapterConfig, EventEnvelope, EventMeta, EVENT_META_SIZE,
};
use net::adapter::net::redex::{Redex, RedexError, RedexEvent, RedexFileConfig, RedexFold};
fn cn(s: &str) -> ChannelName {
ChannelName::new(s).unwrap()
}
#[derive(Default, serde::Serialize, serde::Deserialize)]
struct RecorderState {
seen: Vec<(u8, u64)>,
}
struct RecorderFold;
impl RedexFold<RecorderState> for RecorderFold {
fn apply(&mut self, ev: &RedexEvent, state: &mut RecorderState) -> Result<(), RedexError> {
let meta = EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
.ok_or_else(|| RedexError::Encode("bad EventMeta".into()))?;
state.seen.push((meta.dispatch, meta.seq_or_ts));
Ok(())
}
}
fn mk_env(dispatch: u8, seq_or_ts: u64, tail: &[u8]) -> EventEnvelope {
let meta = EventMeta::new(dispatch, 0, 0xAB, seq_or_ts, 0);
EventEnvelope::new(meta, Bytes::copy_from_slice(tail))
}
#[tokio::test]
async fn test_read_after_write() {
let redex = Redex::new();
let adapter = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/raw"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
for i in 0..50u64 {
let seq = adapter.ingest(mk_env(1, i, b"")).unwrap();
adapter.wait_for_seq(seq).await.unwrap();
}
let state_handle = adapter.state();
let guard = state_handle.read();
assert_eq!(guard.seen.len(), 50);
for (i, (dispatch, app_seq)) in guard.seen.iter().enumerate() {
assert_eq!(*dispatch, 1);
assert_eq!(*app_seq, i as u64);
}
}
#[tokio::test]
async fn test_replay_from_beginning_rebuilds_state() {
let redex = Redex::new();
{
let a = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/replay"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
for i in 0..25u64 {
let seq = a.ingest(mk_env(2, i, b"")).unwrap();
a.wait_for_seq(seq).await.unwrap();
}
a.close().unwrap();
}
let a2 = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/replay"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
a2.wait_for_seq(24).await.unwrap();
assert_eq!(a2.state().read().seen.len(), 25);
}
#[tokio::test]
async fn test_from_seq_checkpoint_resume() {
let redex = Redex::new();
let a = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/checkpoint"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
for i in 0..10u64 {
let seq = a.ingest(mk_env(3, i, b"")).unwrap();
a.wait_for_seq(seq).await.unwrap();
}
a.close().unwrap();
let mut rehydrated = RecorderState::default();
for i in 0..5u64 {
rehydrated.seen.push((3, i));
}
let snapshot_bytes = postcard::to_allocvec(&rehydrated).unwrap();
let a2 = CortexAdapter::<RecorderState>::open_from_snapshot(
&redex,
&cn("cortex/checkpoint"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
&snapshot_bytes,
Some(4),
)
.unwrap();
a2.wait_for_seq(9).await.unwrap();
let state = a2.state();
let guard = state.read();
assert_eq!(guard.seen.len(), 10);
for (i, (_d, app_seq)) in guard.seen.iter().enumerate() {
assert_eq!(*app_seq, i as u64);
}
}
#[tokio::test]
async fn test_live_only_skips_pre_open_events() {
let redex = Redex::new();
{
let a = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/liveonly"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
for i in 0..10u64 {
let seq = a.ingest(mk_env(5, i, b"")).unwrap();
a.wait_for_seq(seq).await.unwrap();
}
a.close().unwrap();
}
let empty_snapshot = postcard::to_allocvec(&RecorderState::default()).unwrap();
let a2 = CortexAdapter::<RecorderState>::open_from_snapshot(
&redex,
&cn("cortex/liveonly"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
&empty_snapshot,
Some(9),
)
.unwrap();
assert_eq!(a2.state().read().seen.len(), 0);
for i in 100..103u64 {
let seq = a2.ingest(mk_env(5, i, b"")).unwrap();
a2.wait_for_seq(seq).await.unwrap();
}
let state = a2.state();
let guard = state.read();
assert_eq!(guard.seen.len(), 3);
assert_eq!(guard.seen[0].1, 100);
assert_eq!(guard.seen[2].1, 102);
}
#[derive(Default)]
struct TaskState {
tasks: HashMap<u64, String>,
}
const DISPATCH_TASK_CREATED: u8 = 0x80;
const DISPATCH_TASK_RENAMED: u8 = 0x81;
const DISPATCH_TASK_DELETED: u8 = 0x82;
struct TaskFold;
impl RedexFold<TaskState> for TaskFold {
fn apply(&mut self, ev: &RedexEvent, state: &mut TaskState) -> Result<(), RedexError> {
let meta = EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
.ok_or_else(|| RedexError::Encode("bad EventMeta".into()))?;
let tail = &ev.payload[EVENT_META_SIZE..];
let id = meta.seq_or_ts;
match meta.dispatch {
DISPATCH_TASK_CREATED | DISPATCH_TASK_RENAMED => {
state
.tasks
.insert(id, String::from_utf8_lossy(tail).into_owned());
}
DISPATCH_TASK_DELETED => {
state.tasks.remove(&id);
}
_ => {}
}
Ok(())
}
}
#[tokio::test]
async fn test_mixed_dispatch_routing() {
let redex = Redex::new();
let adapter = CortexAdapter::<TaskState>::open(
&redex,
&cn("cortex/tasks"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
TaskFold,
TaskState::default(),
)
.unwrap();
adapter
.ingest(mk_env(DISPATCH_TASK_CREATED, 1, b"first"))
.unwrap();
adapter
.ingest(mk_env(DISPATCH_TASK_CREATED, 2, b"second"))
.unwrap();
adapter
.ingest(mk_env(DISPATCH_TASK_RENAMED, 1, b"first-renamed"))
.unwrap();
let seq = adapter
.ingest(mk_env(DISPATCH_TASK_DELETED, 2, b""))
.unwrap();
adapter.wait_for_seq(seq).await.unwrap();
let state = adapter.state();
let guard = state.read();
assert_eq!(guard.tasks.len(), 1);
assert_eq!(
guard.tasks.get(&1).map(|s| s.as_str()),
Some("first-renamed")
);
assert!(!guard.tasks.contains_key(&2));
}
#[tokio::test]
async fn test_ingest_after_close_errors() {
let redex = Redex::new();
let adapter = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/close-ingest"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
let seq = adapter.ingest(mk_env(0, 0, b"")).unwrap();
adapter.wait_for_seq(seq).await.unwrap();
adapter.close().unwrap();
assert_eq!(adapter.state().read().seen.len(), 1);
assert!(adapter.ingest(mk_env(0, 1, b"")).is_err());
}
#[tokio::test]
async fn test_burst_ordering_preserved() {
let redex = Redex::new();
let adapter = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/burst"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
let mut last_seq = 0;
for i in 0..500u64 {
last_seq = adapter.ingest(mk_env(7, i, b"")).unwrap();
}
adapter.wait_for_seq(last_seq).await.unwrap();
let state = adapter.state();
let guard = state.read();
assert_eq!(guard.seen.len(), 500);
for (i, (_d, app_seq)) in guard.seen.iter().enumerate() {
assert_eq!(
*app_seq, i as u64,
"event {} arrived out of order or missing",
i
);
}
}
#[tokio::test]
async fn test_regression_folded_through_seq_sentinel_handles_u64_range() {
let redex = Redex::new();
let adapter = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/sentinel"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
assert!(
adapter.folded_through_seq().is_none(),
"fresh adapter must report the 'nothing folded yet' sentinel as None"
);
let seq = adapter.ingest(mk_env(1, 0, b"")).unwrap();
adapter.wait_for_seq(seq).await.unwrap();
assert_eq!(
adapter.folded_through_seq(),
Some(seq),
"after ingest + wait_for_seq, folded_through_seq must be the applied seq"
);
}
#[tokio::test]
async fn test_state_handle_is_shared_arc() {
let redex = Redex::new();
let adapter = CortexAdapter::<RecorderState>::open(
&redex,
&cn("cortex/shared"),
RedexFileConfig::default(),
CortexAdapterConfig::default(),
RecorderFold,
RecorderState::default(),
)
.unwrap();
let handle_a: Arc<_> = adapter.state();
let handle_b: Arc<_> = adapter.state();
assert!(Arc::ptr_eq(&handle_a, &handle_b));
let seq = adapter.ingest(mk_env(0, 42, b"")).unwrap();
adapter.wait_for_seq(seq).await.unwrap();
assert_eq!(handle_a.read().seen.len(), 1);
assert_eq!(handle_b.read().seen.len(), 1);
}