use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use haematite::{Database, DatabaseConfig, EventStore};
use liminal::durability::bridge::block_on;
use liminal::durability::{
DurabilityError, DurableStore, HaematiteStore, MessageEnvelope as DurableEnvelope, StoredEntry,
};
use liminal::protocol::{CausalContext, MessageEnvelope, SchemaId};
use tempfile::TempDir;
use super::services::{ConnectionServices, LiminalConnectionServices};
use crate::config::types::{ChannelDef, ServerConfig};
fn disk_store() -> Result<(Arc<dyn DurableStore>, TempDir), Box<dyn std::error::Error>> {
let dir = tempfile::tempdir()?;
let database = Database::create(DatabaseConfig {
data_dir: dir.path().join("db"),
shard_count: 4,
sweep_interval: None,
distributed: None,
})?;
let store: Arc<dyn DurableStore> =
Arc::new(HaematiteStore::new(Arc::new(EventStore::new(database))));
Ok((store, dir))
}
const ORDERS_STREAM_KEY: &str = "orders:0";
#[test]
fn shutdown_flush_persists_durable_channel_state_to_store() -> Result<(), Box<dyn std::error::Error>>
{
let (store, _dir) = disk_store()?;
let services =
LiminalConnectionServices::from_config_with_store(&durable_orders_config()?, store)?;
let first = br#"{"order":1}"#.to_vec();
let second = br#"{"order":2}"#.to_vec();
services.publish("orders", &order_envelope(first.clone()), None)?;
services.publish("orders", &order_envelope(second.clone()), None)?;
services.flush_durable_state()?;
let persisted = read_payloads(services.durable_store().as_ref(), ORDERS_STREAM_KEY)?;
assert_eq!(persisted, vec![first, second]);
Ok(())
}
#[test]
fn persisted_durable_state_survives_fresh_services_over_same_store()
-> Result<(), Box<dyn std::error::Error>> {
let (store, _dir) = disk_store()?;
{
let services = LiminalConnectionServices::from_config_with_store(
&durable_orders_config()?,
Arc::clone(&store),
)?;
services.publish("orders", &order_envelope(br#"{"order":7}"#.to_vec()), None)?;
services.flush_durable_state()?;
}
let restarted =
LiminalConnectionServices::from_config_with_store(&durable_orders_config()?, store)?;
let persisted = read_payloads(restarted.durable_store().as_ref(), ORDERS_STREAM_KEY)?;
assert_eq!(persisted, vec![br#"{"order":7}"#.to_vec()]);
Ok(())
}
#[test]
fn duplicate_idempotency_key_delivers_to_subscriber_exactly_once()
-> Result<(), Box<dyn std::error::Error>> {
let (store, _dir) = disk_store()?;
let services =
LiminalConnectionServices::from_config_with_store(&ephemeral_orders_config()?, store)?;
let subscription = services.subscribe_handle_for_test("orders")?;
let payload = br#"{"order":1}"#.to_vec();
let first = services.publish("orders", &order_envelope(payload.clone()), Some("k1"))?;
assert!(
first.delivered,
"first publish of a fresh key with a subscriber must report a genuine delivery"
);
let duplicate = services.publish("orders", &order_envelope(payload.clone()), Some("k1"))?;
assert!(
!duplicate.delivered,
"a duplicate idempotency key must be suppressed (no second delivery)"
);
let other_payload = br#"{"order":2}"#.to_vec();
let other = services.publish("orders", &order_envelope(other_payload.clone()), Some("k2"))?;
assert!(
other.delivered,
"a different idempotency key must be delivered"
);
let mut received = Vec::new();
while let Some(envelope) = subscription.try_next()? {
received.push(envelope.payload);
}
assert_eq!(
received,
vec![payload, other_payload],
"subscriber must receive each distinct key once and never the duplicate"
);
Ok(())
}
#[test]
fn publish_without_subscriber_reports_not_delivered() -> Result<(), Box<dyn std::error::Error>> {
let (store, _dir) = disk_store()?;
let services =
LiminalConnectionServices::from_config_with_store(&ephemeral_orders_config()?, store)?;
let outcome = services.publish("orders", &order_envelope(br#"{"order":9}"#.to_vec()), None)?;
assert!(
!outcome.delivered,
"a publish that reaches no subscriber must report a non-delivery ack"
);
Ok(())
}
#[test]
fn publish_failure_releases_claim_so_reclaim_is_delivered() -> Result<(), Box<dyn std::error::Error>>
{
let (inner, _dir) = disk_store()?;
let failing = Arc::new(FailingAppendStore::new(inner, |stream_key| {
stream_key == ORDERS_STREAM_KEY
}));
let store: Arc<dyn DurableStore> = Arc::clone(&failing) as Arc<dyn DurableStore>;
let services =
LiminalConnectionServices::from_config_with_store(&durable_orders_config()?, store)?;
let subscription = services.subscribe_handle_for_test("orders")?;
let payload = br#"{"order":1}"#.to_vec();
let first = services.publish("orders", &order_envelope(payload.clone()), Some("k1"));
assert!(
first.is_err(),
"publish must surface the injected durable-append failure"
);
failing.clear_failure();
let retry = services.publish("orders", &order_envelope(payload.clone()), Some("k1"))?;
assert!(
retry.delivered,
"after a failed publish releases its claim, the re-publish must be delivered, not suppressed"
);
let mut received = Vec::new();
while let Some(envelope) = subscription.try_next()? {
received.push(envelope.payload);
}
assert_eq!(
received,
vec![payload],
"the retry delivers exactly once; the failed publish delivered nothing"
);
Ok(())
}
#[test]
fn publish_failure_with_failing_release_returns_original_error()
-> Result<(), Box<dyn std::error::Error>> {
let (inner, _dir) = disk_store()?;
let store = Arc::new(FailingAppendStore::fail_after_first_per_stream(inner));
let store: Arc<dyn DurableStore> = store;
let services =
LiminalConnectionServices::from_config_with_store(&durable_orders_config()?, store)?;
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
services.publish(
"orders",
&order_envelope(br#"{"order":1}"#.to_vec()),
Some("k1"),
)
}));
let publish_result = result.map_err(|_| "publish panicked under failing release")?;
let error = publish_result
.err()
.ok_or("publish must surface the original failure")?;
let message = error.to_string();
assert!(
message.contains("liminal publish failed"),
"must return the ORIGINAL publish error, got: {message}"
);
Ok(())
}
fn ephemeral_orders_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
Ok(ServerConfig {
listen_address: "127.0.0.1:0".parse()?,
health_listen_address: "127.0.0.1:0".parse()?,
drain_timeout_ms: 30_000,
channels: vec![ChannelDef {
name: "orders".to_owned(),
schema_ref: "schemas/orders.json".to_owned(),
durable: false,
}],
routing_rules: Vec::new(),
persistence_path: None,
cluster: None,
})
}
fn durable_orders_config() -> Result<ServerConfig, Box<dyn std::error::Error>> {
Ok(ServerConfig {
listen_address: "127.0.0.1:0".parse()?,
health_listen_address: "127.0.0.1:0".parse()?,
drain_timeout_ms: 30_000,
channels: vec![ChannelDef {
name: "orders".to_owned(),
schema_ref: "schemas/orders.json".to_owned(),
durable: true,
}],
routing_rules: Vec::new(),
persistence_path: None,
cluster: None,
})
}
fn order_envelope(payload: Vec<u8>) -> MessageEnvelope {
MessageEnvelope::new(
SchemaId::new([0_u8; SchemaId::WIRE_LEN]),
CausalContext::independent(),
payload,
)
}
fn read_payloads(
store: &dyn DurableStore,
stream_key: &str,
) -> Result<Vec<Vec<u8>>, Box<dyn std::error::Error>> {
let entries: Vec<StoredEntry> = block_on(store.read_from(stream_key, 0, 1024))??;
let mut payloads = Vec::with_capacity(entries.len());
for entry in entries {
payloads.push(DurableEnvelope::deserialize(&entry.payload)?.payload);
}
Ok(payloads)
}
enum FailMode {
Predicate(fn(&str) -> bool),
ChannelAlwaysDedupAfterFirst(Mutex<HashMap<String, u32>>),
}
#[derive(Debug)]
struct FailingAppendStore {
inner: Arc<dyn DurableStore>,
armed: AtomicBool,
mode: FailMode,
}
impl std::fmt::Debug for FailMode {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Predicate(_) => formatter.write_str("Predicate"),
Self::ChannelAlwaysDedupAfterFirst(_) => {
formatter.write_str("ChannelAlwaysDedupAfterFirst")
}
}
}
}
impl FailingAppendStore {
fn new(inner: Arc<dyn DurableStore>, should_fail: fn(&str) -> bool) -> Self {
Self {
inner,
armed: AtomicBool::new(true),
mode: FailMode::Predicate(should_fail),
}
}
fn fail_after_first_per_stream(inner: Arc<dyn DurableStore>) -> Self {
Self {
inner,
armed: AtomicBool::new(true),
mode: FailMode::ChannelAlwaysDedupAfterFirst(Mutex::new(HashMap::new())),
}
}
fn clear_failure(&self) {
self.armed.store(false, Ordering::SeqCst);
}
fn should_fail(&self, stream_key: &str) -> Result<bool, DurabilityError> {
if !self.armed.load(Ordering::SeqCst) {
return Ok(false);
}
match &self.mode {
FailMode::Predicate(predicate) => Ok(predicate(stream_key)),
FailMode::ChannelAlwaysDedupAfterFirst(seen) => {
if stream_key == ORDERS_STREAM_KEY {
return Ok(true);
}
let mut seen = seen
.lock()
.map_err(|_| DurabilityError::ConfigError("test lock poisoned".to_owned()))?;
let count = seen.entry(stream_key.to_owned()).or_insert(0);
let fail = *count > 0;
*count += 1;
drop(seen);
Ok(fail)
}
}
}
}
#[async_trait::async_trait]
impl DurableStore for FailingAppendStore {
async fn append(
&self,
stream_key: &str,
payload: Vec<u8>,
expected_seq: u64,
) -> Result<u64, DurabilityError> {
if self.should_fail(stream_key)? {
return Err(DurabilityError::ConfigError(format!(
"injected append failure for stream '{stream_key}'"
)));
}
self.inner.append(stream_key, payload, expected_seq).await
}
async fn read_from(
&self,
stream_key: &str,
offset: u64,
limit: usize,
) -> Result<Vec<StoredEntry>, DurabilityError> {
self.inner.read_from(stream_key, offset, limit).await
}
async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError> {
self.inner.cas(key, old_value, new_value).await
}
async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError> {
self.inner.read_value(key).await
}
async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError> {
self.inner.scan(prefix).await
}
async fn flush(&self) -> Result<(), DurabilityError> {
self.inner.flush().await
}
}