use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use slatedb::object_store::memory::InMemory;
use slatedb::{Db, ErrorKind, IsolationLevel, WriteBatch};
use time::OffsetDateTime;
use crate::{
deadline::{Deadline, DeadlineStore, DueNowResult},
envelope::{EventEnvelope, NewEvent},
error::EngineError,
event_store::{AppendResult, AtomicAppend, EventStore, ExpectedVersion},
ids::{DeadlineId, OutboxMessageId, ProcessIdentity, StreamId, TenantId},
inbox::InboxStore,
outbox::{OutboxMessage, OutboxStore, PendingOutbox},
registry::{ProcessRegistry, RegistryKey},
snapshot::{Snapshot, SnapshotStore},
};
/// Builds the storage key of event number `seq` in `stream_id`.
///
/// The sequence is rendered as 16 zero-padded lowercase hex digits, so the
/// keys of one stream sort lexicographically in sequence order.
fn event_key(stream_id: &StreamId, seq: u64) -> String {
    format!("e/{}/{:016x}", stream_id, seq)
}
/// Builds the `sv/` key under which the current version of `stream_id` is stored.
fn sv_key(stream_id: &StreamId) -> String {
    format!("sv/{}", stream_id)
}
/// Builds the `si/` index key that marks `stream_id` as existing.
///
/// Stream listings scan this prefix instead of walking every event key.
fn si_key(stream_id: &StreamId) -> String {
    format!("si/{}", stream_id)
}
/// Builds the `sn/` key for `stream_id`.
///
/// NOTE(review): presumably the snapshot record key — its users are not
/// visible in this part of the file.
fn sn_key(stream_id: &StreamId) -> String {
    format!("sn/{}", stream_id)
}
/// Builds the key holding projection `name`'s checkpoint cursor for `stream_id`.
fn cp_cursor_key(name: &str, stream_id: &StreamId) -> String {
    format!("cp/{}/{}", name, stream_id)
}
/// Returns the exclusive upper bound for scanning every `cp/{name}/…` key.
///
/// `'0'` is the byte that directly follows `'/'`, so `cp/{name}0` sorts just
/// after every key that starts with `cp/{name}/`.
fn cp_prefix_end(name: &str) -> String {
    ["cp/", name, "0"].concat()
}
/// Builds the `om/` key under which the outbox message `id` is stored.
fn om_key(id: &OutboxMessageId) -> String {
    format!("om/{}", id)
}
/// Key of the persisted outbox message counter (8-byte little-endian `u64`).
const OM_COUNT_KEY: &[u8] = b"_count/om";
/// Reads the outbox message counter through `txn`.
///
/// A missing counter reads as `0`. A value that is not exactly eight bytes
/// long is reported as a store error rather than being guessed at.
async fn read_om_count_txn(txn: &slatedb::DbTransaction) -> Result<u64, EngineError> {
    let Some(raw) = txn.get(OM_COUNT_KEY).await.map_err(to_outbox_err)? else {
        return Ok(0);
    };
    match <[u8; 8]>::try_from(&raw[..]) {
        Ok(le) => Ok(u64::from_le_bytes(le)),
        Err(_) => Err(EngineError::store(format!(
            "_count/om corrupt: expected 8-byte little-endian u64, got {} bytes",
            raw.len()
        ))),
    }
}
/// Reads the outbox message counter directly from `db` (outside a transaction).
///
/// A missing counter reads as `0`. A value that is not exactly eight bytes
/// long is reported as a store error.
async fn read_om_count(db: &Db) -> Result<u64, EngineError> {
    let Some(raw) = db.get(OM_COUNT_KEY).await.map_err(to_outbox_err)? else {
        return Ok(0);
    };
    match <[u8; 8]>::try_from(&raw[..]) {
        Ok(le) => Ok(u64::from_le_bytes(le)),
        Err(_) => Err(EngineError::store(format!(
            "_count/om corrupt: expected 8-byte little-endian u64, got {} bytes",
            raw.len()
        ))),
    }
}
/// Builds the outbox time-index key for message `id` scheduled at `ts`.
///
/// The timestamp is encoded as 16 zero-padded hex digits of Unix nanoseconds
/// so keys sort chronologically. Timestamps before the epoch clamp to `0`;
/// timestamps too large for a `u64` saturate to `u64::MAX` so that far-future
/// entries sort last instead of wrapping to the front of the index.
fn ot_key(ts: OffsetDateTime, id: &OutboxMessageId) -> String {
    // `max(0)` handles pre-epoch values; the only remaining conversion failure
    // is "too large", which must map to the largest key, not the smallest.
    let nanos = u64::try_from(ts.unix_timestamp_nanos().max(0)).unwrap_or(u64::MAX);
    format!("ot/{nanos:016x}/{id}")
}
/// Returns the exclusive upper bound for scanning the events of `stream_id`.
///
/// `'0'` is the byte that directly follows `'/'`, so `e/{stream_id}0` sorts
/// just after every `e/{stream_id}/…` key.
fn event_prefix_end(stream_id: &StreamId) -> String {
    format!("e/{}0", stream_id)
}
/// Builds the `dl/` key under which the deadline `id` is stored.
fn dl_key(id: &DeadlineId) -> String {
    format!("dl/{}", id)
}
/// Builds the deadline time-index key for deadline `id` due at `due_at`.
///
/// The due time is encoded as 16 zero-padded hex digits of Unix nanoseconds so
/// keys sort chronologically. Times before the epoch clamp to `0`; times too
/// large for a `u64` saturate to `u64::MAX` so that far-future deadlines sort
/// last instead of wrapping to the front of the index and firing immediately.
fn dt_key(due_at: OffsetDateTime, id: &DeadlineId) -> String {
    // After `max(0)` the conversion can only fail for values above `u64::MAX`.
    let nanos = u64::try_from(due_at.unix_timestamp_nanos().max(0)).unwrap_or(u64::MAX);
    format!("dt/{nanos:016x}/{id}")
}
/// Builds the per-stream deadline index key linking `stream_id` to deadline `id`.
fn ds_key(stream_id: &StreamId, id: &DeadlineId) -> String {
    format!("ds/{}/{}", stream_id.as_str(), id)
}
/// Returns the key prefix shared by all deadline index entries of `stream_id`.
fn ds_stream_prefix(stream_id: &StreamId) -> String {
    let stream = stream_id.as_str();
    format!("ds/{stream}/")
}
/// Builds the process-registry key for `key` within `tenant_id`.
fn pr_key(tenant_id: TenantId, key: &RegistryKey) -> String {
    let registry_key = key.as_str();
    format!("pr/{tenant_id}/{registry_key}")
}
/// Returns the key prefix shared by all registry entries of `tenant_id`.
///
/// Only compiled for tests.
#[cfg(test)]
#[allow(dead_code)]
fn pr_tenant_prefix(tenant_id: TenantId) -> String {
    format!("pr/{}/", tenant_id)
}
/// Builds the correlation-index key for `process_id` under `tag` in `tenant_id`.
fn ci_key(tenant_id: TenantId, tag: &str, process_id: crate::ids::ProcessId) -> String {
    format!("ci/{}/{}/{}", tenant_id, tag, process_id)
}
/// Returns the key prefix shared by all correlation entries of `tag` in `tenant_id`.
fn ci_tag_prefix(tenant_id: TenantId, tag: &str) -> String {
    format!("ci/{}/{}/", tenant_id, tag)
}
/// Validates a correlation tag before it is embedded in a `ci/` key.
///
/// # Errors
///
/// Returns a registry error when the tag contains a NUL character, is empty,
/// contains `'/'`, or is longer than 128 bytes. The rules are checked in that
/// order and the first violated rule decides the message.
fn validate_ci_tag(tag: &str) -> Result<(), EngineError> {
    let violation = if tag.contains('\0') {
        Some("ci tag must not contain NUL bytes ('\\0')")
    } else if tag.is_empty() {
        Some("ci tag must not be empty")
    } else if tag.contains('/') {
        // '/' separates key segments, so it cannot appear inside a tag.
        Some("ci tag must not contain '/' — use '-' or ':' as a separator")
    } else if tag.len() > 128 {
        Some("ci tag exceeds the 128-byte maximum length")
    } else {
        None
    };
    match violation {
        Some(reason) => Err(EngineError::registry(reason)),
        None => Ok(()),
    }
}
/// Builds the `ib/`-prefixed storage key for `key`.
///
/// NOTE(review): presumably the inbox entry key — its users are not visible in
/// this part of the file.
fn ib_key(key: &str) -> String {
    ["ib/", key].concat()
}
/// Builds the `it/` time-index key for `nonce` at timestamp `ts`.
///
/// The timestamp is encoded as 16 zero-padded hex digits of Unix nanoseconds
/// so keys sort chronologically. Timestamps before the epoch clamp to `0`;
/// timestamps too large for a `u64` saturate to `u64::MAX` so they sort last
/// instead of wrapping to the front of the index.
fn it_key(ts: OffsetDateTime, nonce: &str) -> String {
    // After `max(0)` the conversion can only fail for values above `u64::MAX`.
    let nanos = u64::try_from(ts.unix_timestamp_nanos().max(0)).unwrap_or(u64::MAX);
    format!("it/{nanos:016x}/{nonce}")
}
fn slatedb_error_kind_str(e: &slatedb::Error) -> &'static str {
use slatedb::ErrorKind;
match e.kind() {
ErrorKind::Transaction => "transaction conflict",
ErrorKind::Closed(_) => "database closed",
ErrorKind::Unavailable => "storage unavailable",
ErrorKind::Invalid => "invalid storage operation",
ErrorKind::Data => "data integrity error",
ErrorKind::Internal => "internal storage error",
_ => "storage error",
}
}
pub(crate) fn slatedb_error_is_transient(e: &slatedb::Error) -> bool {
use slatedb::ErrorKind;
matches!(e.kind(), ErrorKind::Unavailable | ErrorKind::Closed(_))
}
/// Logs a SlateDB error and converts it into an event-store `EngineError`.
///
/// Transient failures map to the transient variant; the error text is the
/// static kind description, not the full SlateDB message.
#[allow(clippy::needless_pass_by_value)]
fn to_store_err(e: slatedb::Error) -> EngineError {
    tracing::error!(error = %e, "event store error");
    let label = slatedb_error_kind_str(&e);
    if !slatedb_error_is_transient(&e) {
        return EngineError::store(label);
    }
    EngineError::transient_store(label)
}
/// Logs a SlateDB error and converts it into an outbox `EngineError`.
///
/// Transient failures map to the transient variant; the error text is the
/// static kind description, not the full SlateDB message.
#[allow(clippy::needless_pass_by_value)]
fn to_outbox_err(e: slatedb::Error) -> EngineError {
    tracing::error!(error = %e, "outbox store error");
    let label = slatedb_error_kind_str(&e);
    if !slatedb_error_is_transient(&e) {
        return EngineError::outbox(label);
    }
    EngineError::transient_outbox(label)
}
/// Logs a SlateDB error and converts it into a deadline `EngineError`.
///
/// Transient failures map to the transient variant; the error text is the
/// static kind description, not the full SlateDB message.
#[allow(clippy::needless_pass_by_value)]
fn to_deadline_err(e: slatedb::Error) -> EngineError {
    tracing::error!(error = %e, "deadline store error");
    let label = slatedb_error_kind_str(&e);
    if !slatedb_error_is_transient(&e) {
        return EngineError::deadline(label);
    }
    EngineError::transient_deadline(label)
}
/// Logs a SlateDB error and converts it into a registry `EngineError`.
///
/// Transient failures map to the transient variant; the error text is the
/// static kind description, not the full SlateDB message.
#[allow(clippy::needless_pass_by_value)]
fn to_registry_err(e: slatedb::Error) -> EngineError {
    tracing::error!(error = %e, "process registry error");
    let label = slatedb_error_kind_str(&e);
    if !slatedb_error_is_transient(&e) {
        return EngineError::registry(label);
    }
    EngineError::transient_registry(label)
}
/// Logs a SlateDB error and converts it into an inbox `EngineError`.
///
/// Transient failures map to the transient variant; the error text is the
/// static kind description, not the full SlateDB message.
#[allow(clippy::needless_pass_by_value)]
fn to_inbox_err(e: slatedb::Error) -> EngineError {
    tracing::error!(error = %e, "inbox store error");
    let label = slatedb_error_kind_str(&e);
    if !slatedb_error_is_transient(&e) {
        return EngineError::inbox(label);
    }
    EngineError::transient_inbox(label)
}
/// SlateDB-backed store; clones share the same database handle and closed flag.
#[derive(Clone)]
pub struct SlateDbStore {
// Underlying SlateDB handle used for every read, write and transaction.
db: Db,
// Set by `close`; shared between clones so a second close is a no-op.
closed: Arc<AtomicBool>,
// Optional per-stream event quota; `None` disables the quota check.
max_stream_events: Option<u64>,
}
impl SlateDbStore {
/// Opens (or creates) the database at `path` inside `object_store`.
///
/// The returned store has no stream quota configured.
///
/// # Errors
///
/// Returns a store error when SlateDB fails to open the database.
pub async fn open(
    path: impl Into<slatedb::object_store::path::Path>,
    object_store: Arc<dyn slatedb::object_store::ObjectStore>,
) -> Result<Self, EngineError> {
    Db::open(path, object_store)
        .await
        .map(|db| Self {
            db,
            closed: Arc::new(AtomicBool::new(false)),
            max_stream_events: None,
        })
        .map_err(to_store_err)
}
/// Opens an in-memory store under the default label `mako-engine-test`.
pub async fn open_in_memory() -> Result<Self, EngineError> {
    const DEFAULT_LABEL: &str = "mako-engine-test";
    Self::open_in_memory_with_label(DEFAULT_LABEL).await
}
/// Opens a store on a fresh in-memory object store, using `label` as the path.
pub async fn open_in_memory_with_label(label: &str) -> Result<Self, EngineError> {
    Self::open(label, Arc::new(InMemory::new())).await
}
/// Opens a store persisted on the local file system under `dir`.
///
/// The directory is created if missing and canonicalised before being used as
/// the object-store root; the database itself lives at path `db` inside it.
///
/// # Errors
///
/// Returns a store error when the directory cannot be created or resolved,
/// when the local object store cannot be set up, or when opening fails.
pub async fn open_local(dir: impl AsRef<std::path::Path>) -> Result<Self, EngineError> {
    let requested = dir.as_ref();
    if let Err(e) = std::fs::create_dir_all(requested) {
        return Err(EngineError::store(format!("cannot create data-dir: {e}")));
    }
    let canonical = match requested.canonicalize() {
        Ok(resolved) => resolved,
        Err(e) => return Err(EngineError::store(format!("invalid data-dir: {e}"))),
    };
    let fs_store = slatedb::object_store::local::LocalFileSystem::new_with_prefix(&canonical)
        .map_err(|e| EngineError::store(e.to_string()))?;
    Self::open("db", Arc::new(fs_store)).await
}
/// Returns the store with a per-stream quota of `max_events` events.
#[must_use]
pub fn with_max_stream_events(self, max_events: u64) -> Self {
    let mut configured = self;
    configured.max_stream_events = Some(max_events);
    configured
}
/// Rejects an append that would grow `stream_id` beyond the configured quota.
///
/// `current` is the stream's version before the append and `new_events` the
/// number of events about to be written. Without a configured quota every
/// append is accepted.
///
/// # Errors
///
/// Returns `EngineError::StreamQuotaExceeded` when `current + new_events`
/// exceeds the limit.
fn check_quota(
    &self,
    stream_id: &StreamId,
    current: u64,
    new_events: usize,
) -> Result<(), EngineError> {
    let Some(limit) = self.max_stream_events else {
        return Ok(());
    };
    // Saturate instead of wrapping: an overflowing sum must still count as
    // "over the limit" rather than wrap around to a small, accepted value.
    let added = u64::try_from(new_events).unwrap_or(u64::MAX);
    let actual = current.saturating_add(added);
    if actual <= limit {
        return Ok(());
    }
    tracing::error!(
        stream_id = %stream_id,
        limit,
        new_events,
        actual,
        "stream quota exceeded"
    );
    Err(EngineError::StreamQuotaExceeded {
        stream_id: stream_id.clone(),
        limit,
        new_events,
        actual,
    })
}
/// Closes the underlying database.
///
/// Only the first call closes the database; later calls (including calls on
/// clones, which share the flag) log a warning and return `Ok(())`.
pub async fn close(&self) -> Result<(), EngineError> {
    let already_closed = self.closed.swap(true, Ordering::AcqRel);
    if already_closed {
        tracing::warn!("SlateDbStore::close called more than once — this is a no-op");
        Ok(())
    } else {
        self.db.close().await.map_err(to_store_err)
    }
}
pub async fn close_owned(self) -> Result<(), EngineError> {
self.close().await
}
/// Reads the current version of `stream_id` outside any transaction.
///
/// The version is the little-endian `u64` in the first eight bytes of the
/// `sv/` record. A missing record, or one shorter than eight bytes, reads as
/// version `0`.
async fn read_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
    let key = sv_key(stream_id);
    let stored = self.db.get(key.as_bytes()).await.map_err(to_store_err)?;
    let version = stored
        .as_deref()
        .and_then(|raw| raw.get(..8))
        .and_then(|head| <[u8; 8]>::try_from(head).ok())
        .map_or(0, u64::from_le_bytes);
    Ok(version)
}
/// Returns a deadline store that shares this store's database handle.
#[must_use]
pub fn as_deadline_store(&self) -> SlateDbDeadlineStore {
    let db = self.db.clone();
    SlateDbDeadlineStore { db }
}
/// Returns a process registry that shares this store's database handle.
#[must_use]
pub fn as_process_registry(&self) -> SlateDbProcessRegistry {
    let db = self.db.clone();
    SlateDbProcessRegistry { db }
}
/// Returns a snapshot store that shares this store's database handle.
#[must_use]
pub fn as_snapshot_store(&self) -> SlateDbSnapshotStore {
    let db = self.db.clone();
    SlateDbSnapshotStore { db }
}
/// Returns an inbox store that shares this store's database handle.
#[must_use]
pub fn as_inbox_store(&self) -> SlateDbInboxStore {
    let db = self.db.clone();
    SlateDbInboxStore { db }
}
/// Reads the value stored under `suffix` in namespace `ns`.
///
/// Returns `Ok(None)` when the key does not exist.
pub async fn kv_get(
    &self,
    ns: KvNamespace,
    suffix: &str,
) -> Result<Option<Vec<u8>>, EngineError> {
    let key = ns.key(suffix);
    let found = self.db.get(key.as_bytes()).await.map_err(to_store_err)?;
    Ok(found.map(|bytes| bytes.to_vec()))
}
/// Writes `value` under `suffix` in namespace `ns`, replacing any old value.
pub async fn kv_put(
    &self,
    ns: KvNamespace,
    suffix: &str,
    value: &[u8],
) -> Result<(), EngineError> {
    let mut batch = WriteBatch::new();
    batch.put(ns.key(suffix).as_bytes(), value);
    self.db.write(batch).await.map_err(to_store_err)?;
    Ok(())
}
/// Deletes the entry stored under `suffix` in namespace `ns`.
pub async fn kv_delete(&self, ns: KvNamespace, suffix: &str) -> Result<(), EngineError> {
    let mut batch = WriteBatch::new();
    batch.delete(ns.key(suffix).as_bytes());
    self.db.write(batch).await.map_err(to_store_err)?;
    Ok(())
}
/// Returns every entry of namespace `ns` as `(suffix, value)` pairs.
///
/// The namespace prefix is stripped from each key. All entries are collected
/// into memory before returning.
///
/// # Errors
///
/// Returns a store error when the scan fails or a key is not valid UTF-8.
pub async fn kv_scan_prefix(
    &self,
    ns: KvNamespace,
) -> Result<Vec<(String, Vec<u8>)>, EngineError> {
    let prefix = ns.as_str();
    let mut entries = self
        .db
        .scan_prefix(prefix.as_bytes())
        .await
        .map_err(to_store_err)?;
    let mut collected = Vec::new();
    loop {
        let Some(entry) = entries.next().await.map_err(to_store_err)? else {
            break;
        };
        let full_key = match std::str::from_utf8(&entry.key) {
            Ok(text) => text,
            Err(e) => return Err(EngineError::store(e.to_string())),
        };
        let suffix = match full_key.strip_prefix(prefix) {
            Some(rest) => rest,
            None => full_key,
        };
        collected.push((suffix.to_owned(), entry.value.to_vec()));
    }
    Ok(collected)
}
}
/// A static key prefix that scopes the generic key-value helpers.
///
/// The prefix is always non-empty and ends with `'/'`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct KvNamespace(&'static str);

impl KvNamespace {
    /// Creates a namespace from `prefix`.
    ///
    /// # Panics
    ///
    /// Panics when `prefix` is empty or does not end with `'/'`. Used in a
    /// `const` context, the violation becomes a compile-time error.
    #[must_use]
    pub const fn new(prefix: &'static str) -> Self {
        let bytes = prefix.as_bytes();
        let well_formed = !bytes.is_empty() && bytes[bytes.len() - 1] == b'/';
        assert!(
            well_formed,
            "KvNamespace prefix must be non-empty and end with '/'"
        );
        Self(prefix)
    }

    /// Returns the namespace prefix, including the trailing `'/'`.
    #[must_use]
    pub fn as_str(self) -> &'static str {
        let Self(prefix) = self;
        prefix
    }

    /// Returns the full storage key: the prefix followed by `suffix`.
    #[must_use]
    pub fn key(self, suffix: &str) -> String {
        let mut full = String::with_capacity(self.0.len() + suffix.len());
        full.push_str(self.0);
        full.push_str(suffix);
        full
    }
}
/// Checks the caller's optimistic-concurrency expectation against `actual`.
///
/// `NoStream` requires version `0` and `Exact(v)` requires version `v`; every
/// other expectation accepts any version.
///
/// # Errors
///
/// Returns `EngineError::VersionConflict` when the required version differs.
fn check_version(expected: ExpectedVersion, actual: u64) -> Result<(), EngineError> {
    let required = match expected {
        ExpectedVersion::NoStream => 0,
        ExpectedVersion::Exact(v) => v,
        _ => return Ok(()),
    };
    if actual == required {
        Ok(())
    } else {
        Err(EngineError::VersionConflict {
            expected: required,
            actual,
        })
    }
}
/// Reads the current version of `stream_id` through `txn`.
///
/// The version is the little-endian `u64` in the first eight bytes of the
/// `sv/` record. A missing record, or one shorter than eight bytes, reads as
/// version `0`.
async fn read_version_in_txn(
    txn: &slatedb::DbTransaction,
    stream_id: &StreamId,
) -> Result<u64, EngineError> {
    let key = sv_key(stream_id);
    let stored = txn.get(key.as_bytes()).await.map_err(to_store_err)?;
    let version = stored
        .as_deref()
        .and_then(|raw| raw.get(..8))
        .and_then(|head| <[u8; 8]>::try_from(head).ok())
        .map_or(0, u64::from_le_bytes);
    Ok(version)
}
/// Turns `events` into envelopes and stages them, plus stream metadata, in `txn`.
///
/// Events get consecutive sequence numbers starting at `current_version + 1`
/// and all share the timestamp `now`. After the events, the new stream version
/// and the stream's `si/` index marker are staged. Nothing is committed here.
///
/// # Errors
///
/// Returns a store error when an envelope cannot be serialised or a write
/// cannot be staged.
fn stamp_events_txn(
    txn: &slatedb::DbTransaction,
    stream_id: &StreamId,
    current_version: u64,
    events: &[NewEvent],
    now: OffsetDateTime,
) -> Result<Vec<EventEnvelope>, EngineError> {
    let mut stamped = Vec::with_capacity(events.len());
    let mut seq = current_version;
    for pending in events {
        seq += 1;
        let envelope = EventEnvelope::from_new(pending.clone(), stream_id.clone(), seq, now);
        let encoded =
            serde_json::to_vec(&envelope).map_err(|e| EngineError::store(e.to_string()))?;
        txn.put(event_key(stream_id, seq).as_bytes(), encoded.as_slice())
            .map_err(to_store_err)?;
        stamped.push(envelope);
    }
    // `seq` now equals `current_version + events.len()`, i.e. the new version.
    txn.put(sv_key(stream_id).as_bytes(), seq.to_le_bytes().as_slice())
        .map_err(to_store_err)?;
    txn.put(si_key(stream_id).as_bytes(), b"")
        .map_err(to_store_err)?;
    Ok(stamped)
}
/// Stages each outbox message and its time-index entry in `txn`.
///
/// The index timestamp is `deliver_after` when set, otherwise `created_at`.
/// The outbox counter is not touched here; callers maintain it.
fn write_outbox_entries_txn(
    txn: &slatedb::DbTransaction,
    messages: &[OutboxMessage],
) -> Result<(), EngineError> {
    for message in messages {
        let encoded =
            serde_json::to_vec(message).map_err(|e| EngineError::outbox(e.to_string()))?;
        let scheduled_at = message.deliver_after.unwrap_or(message.created_at);
        let index_key = ot_key(scheduled_at, &message.message_id);
        let record_key = om_key(&message.message_id);
        txn.put(record_key.as_bytes(), encoded.as_slice())
            .map_err(to_outbox_err)?;
        txn.put(index_key.as_bytes(), b"").map_err(to_outbox_err)?;
    }
    Ok(())
}
/// Converts a failed commit of an append transaction into an `EngineError`.
///
/// A transaction conflict becomes `VersionConflict`. The reported `actual` is
/// `snapshot_version + 1` — a lower bound, since the real version is not
/// re-read. Every other error goes through the generic store conversion.
fn map_txn_commit_err(
    e: slatedb::Error,
    expected_version: ExpectedVersion,
    snapshot_version: u64,
) -> EngineError {
    if e.kind() != slatedb::ErrorKind::Transaction {
        return to_store_err(e);
    }
    let expected = match expected_version {
        ExpectedVersion::NoStream => 0,
        ExpectedVersion::Exact(v) => v,
        ExpectedVersion::Any => snapshot_version,
    };
    EngineError::VersionConflict {
        expected,
        actual: snapshot_version + 1,
    }
}
impl EventStore for SlateDbStore {
/// Appends `events` to `stream_id` in one serializable transaction.
///
/// The stream version is read inside the transaction, checked against
/// `expected_version` and the stream quota, and then the events are stamped
/// and committed.
///
/// # Errors
///
/// Returns `VersionConflict` when the expectation fails or the commit
/// conflicts, `StreamQuotaExceeded` when the quota would be exceeded, and a
/// store error for other failures.
async fn append(
    &self,
    stream_id: &StreamId,
    expected_version: ExpectedVersion,
    events: &[NewEvent],
) -> Result<AppendResult, EngineError> {
    let txn = self
        .db
        .begin(IsolationLevel::SerializableSnapshot)
        .await
        .map_err(to_store_err)?;
    let base_version = read_version_in_txn(&txn, stream_id).await?;
    check_version(expected_version, base_version)?;
    self.check_quota(stream_id, base_version, events.len())?;
    let stamped_at = OffsetDateTime::now_utc();
    let stamped = stamp_events_txn(&txn, stream_id, base_version, events, stamped_at)?;
    if let Err(e) = txn.commit().await {
        return Err(map_txn_commit_err(e, expected_version, base_version));
    }
    let last_sequence = base_version + events.len() as u64;
    Ok(AppendResult {
        last_sequence,
        events: stamped,
    })
}
/// Loads every event of `stream_id`, in sequence order.
async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
    // Loading "after sequence 0" yields the whole stream.
    const BEFORE_FIRST_EVENT: u64 = 0;
    self.load_from(stream_id, BEFORE_FIRST_EVENT).await
}
/// Loads the events of `stream_id` with a sequence greater than `from_sequence`.
///
/// All matching events are collected into memory, in key (sequence) order.
///
/// # Errors
///
/// Returns a store error when the scan fails or an event cannot be decoded.
async fn load_from(
    &self,
    stream_id: &StreamId,
    from_sequence: u64,
) -> Result<Vec<EventEnvelope>, EngineError> {
    let first_key = event_key(stream_id, from_sequence.saturating_add(1));
    let upper_bound = event_prefix_end(stream_id);
    let mut cursor = self
        .db
        .scan(first_key.as_bytes()..upper_bound.as_bytes())
        .await
        .map_err(to_store_err)?;
    let mut loaded = Vec::new();
    loop {
        let Some(entry) = cursor.next().await.map_err(to_store_err)? else {
            break;
        };
        let envelope = serde_json::from_slice::<EventEnvelope>(&entry.value)
            .map_err(|e| EngineError::store(e.to_string()))?;
        loaded.push(envelope);
    }
    Ok(loaded)
}
/// Folds the events of `stream_id` after `from_sequence` into an accumulator.
///
/// Events are decoded one at a time and passed to `f` in sequence order, so
/// the whole stream is never held in memory. Folding stops at the first error
/// returned by `f`.
///
/// # Errors
///
/// Returns a store error when the scan fails or an event cannot be decoded,
/// and forwards any error returned by `f`.
async fn fold_stream<T, F>(
    &self,
    stream_id: &StreamId,
    from_sequence: u64,
    initial: T,
    mut f: F,
) -> Result<T, EngineError>
where
    T: Send,
    F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
{
    let first_key = event_key(stream_id, from_sequence.saturating_add(1));
    let upper_bound = event_prefix_end(stream_id);
    let mut cursor = self
        .db
        .scan(first_key.as_bytes()..upper_bound.as_bytes())
        .await
        .map_err(to_store_err)?;
    let mut state = initial;
    loop {
        let Some(entry) = cursor.next().await.map_err(to_store_err)? else {
            return Ok(state);
        };
        let envelope: EventEnvelope = serde_json::from_slice(&entry.value)
            .map_err(|e| EngineError::store(e.to_string()))?;
        state = f(state, envelope)?;
    }
}
/// Returns the current version of `stream_id` (`0` when it has no version record).
async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
    let version = self.read_version(stream_id).await?;
    Ok(version)
}
/// Lists all stream ids, optionally restricted to ids starting with `prefix`.
///
/// The ids come from the `si/` index and are returned in key order, with only
/// the `si/` marker stripped (the caller's prefix stays part of each id).
///
/// # Errors
///
/// Returns a store error when the scan fails, a key is not valid UTF-8, or a
/// key does not form a valid stream id.
async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
    let scan_prefix = prefix.map_or_else(|| "si/".to_string(), |p| format!("si/{p}"));
    let mut cursor = self
        .db
        .scan_prefix(scan_prefix.as_bytes())
        .await
        .map_err(to_store_err)?;
    let mut found = Vec::new();
    loop {
        let Some(entry) = cursor.next().await.map_err(to_store_err)? else {
            break;
        };
        let key =
            std::str::from_utf8(&entry.key).map_err(|e| EngineError::store(e.to_string()))?;
        let Some(raw_id) = key.strip_prefix("si/") else {
            continue;
        };
        let stream_id =
            StreamId::try_new(raw_id).map_err(|e| EngineError::store(e.to_string()))?;
        found.push(stream_id);
    }
    Ok(found)
}
/// Lists up to `limit` stream ids from the `si/` index, starting after `cursor`.
///
/// `prefix` restricts the scan to index keys beginning with `si/{prefix}`.
/// A `limit` of zero returns an empty page without touching the database.
///
/// NOTE(review): the returned ids have the whole `si/{prefix}` stripped, so
/// with a prefix they are the remainder after the prefix; `cursor` is likewise
/// re-joined with the prefix to form the start key. `list_streams` strips only
/// `si/`. Confirm this difference is intended.
async fn list_streams_page(
&self,
prefix: Option<&str>,
cursor: Option<&StreamId>,
limit: usize,
) -> Result<Vec<StreamId>, EngineError> {
if limit == 0 {
return Ok(Vec::new());
}
let si_prefix = match prefix {
Some(p) => format!("si/{p}"),
None => "si/".to_string(),
};
// Start key: the prefix itself for the first page, otherwise the cursor's
// index key plus a 0x00 byte so the cursor entry itself is excluded.
let start_key: Vec<u8> = match cursor {
None => si_prefix.as_bytes().to_vec(),
Some(c) => {
let mut key = format!("si/{}{}", prefix.unwrap_or(""), c.as_str()).into_bytes();
key.push(0x00); key
}
};
// Exclusive end key: the prefix with its last byte incremented. Trailing
// 0xFF bytes are dropped first; an empty result means "no upper bound".
let end_key: Option<Vec<u8>> = {
let mut upper = si_prefix.into_bytes();
let mut carried = true;
while carried {
if let Some(last) = upper.last_mut() {
if *last == u8::MAX {
upper.pop();
} else {
*last += 1;
carried = false;
}
} else {
carried = false;
}
}
if upper.is_empty() {
None } else {
Some(upper)
}
};
let mut iter = match end_key {
Some(end) => self
.db
.scan(start_key.as_slice()..end.as_slice())
.await
.map_err(to_store_err)?,
None => self
.db
.scan(start_key.as_slice()..)
.await
.map_err(to_store_err)?,
};
// Rebuilt because `si_prefix` was consumed while computing `end_key`.
let si_prefix_str = match prefix {
Some(p) => format!("si/{p}"),
None => "si/".to_string(),
};
let mut streams = Vec::with_capacity(limit);
while streams.len() < limit {
match iter.next().await.map_err(to_store_err)? {
None => break,
Some(kv) => {
let key = std::str::from_utf8(&kv.key)
.map_err(|e| EngineError::store(e.to_string()))?;
if let Some(stream_id_str) = key.strip_prefix(&si_prefix_str) {
// A key equal to the bare prefix leaves an empty remainder and is skipped.
if !stream_id_str.is_empty() {
streams.push(
StreamId::try_new(stream_id_str)
.map_err(|e| EngineError::store(e.to_string()))?,
);
}
}
}
}
}
Ok(streams)
}
}
/// Builds a concrete outbox message from `pending` and the causing event.
///
/// The causing event is `envelopes[pending.caused_by_event_index]`, with the
/// index clamped to the last envelope. Process, tenant, correlation,
/// conversation and causation ids are copied from that envelope; the message
/// gets a fresh id, `created_at = now` and an attempt count of zero.
///
/// # Panics
///
/// Panics when `envelopes` is empty, because there is no causing event.
fn materialise_outbox(
    pending: &PendingOutbox,
    stream_id: &StreamId,
    envelopes: &[EventEnvelope],
    now: OffsetDateTime,
) -> OutboxMessage {
    let last_index = envelopes.len().saturating_sub(1);
    let cause_index = pending.caused_by_event_index.min(last_index);
    let cause = &envelopes[cause_index];
    OutboxMessage {
        message_id: crate::ids::OutboxMessageId::new(),
        stream_id: stream_id.clone(),
        process_id: cause.process_id,
        tenant_id: cause.tenant_id,
        correlation_id: cause.correlation_id,
        conversation_id: cause.conversation_id,
        causation_event_id: cause.event_id,
        message_type: pending.message_type.clone(),
        recipient: pending.recipient.clone(),
        payload: pending.payload.clone(),
        payload_schema: pending.payload_schema.clone(),
        created_at: now,
        deliver_after: pending.deliver_after,
        attempt_count: 0,
    }
}
impl AtomicAppend for SlateDbStore {
async fn append_with_outbox(
&self,
stream_id: &StreamId,
expected_version: ExpectedVersion,
events: &[NewEvent],
outbox: &[PendingOutbox],
) -> Result<AppendResult, EngineError> {
let txn = self
.db
.begin(IsolationLevel::SerializableSnapshot)
.await
.map_err(to_store_err)?;
let current = read_version_in_txn(&txn, stream_id).await?;
check_version(expected_version, current)?;
self.check_quota(stream_id, current, events.len())?;
let now = OffsetDateTime::now_utc();
let envelopes = stamp_events_txn(&txn, stream_id, current, events, now)?;
if !outbox.is_empty() {
let messages: Vec<OutboxMessage> = outbox
.iter()
.map(|p| materialise_outbox(p, stream_id, &envelopes, now))
.collect();
let current_count = read_om_count_txn(&txn).await?;
let new_count = current_count + messages.len() as u64;
txn.put(OM_COUNT_KEY, new_count.to_le_bytes().as_slice())
.map_err(to_outbox_err)?;
write_outbox_entries_txn(&txn, &messages)?;
}
txn.commit()
.await
.map_err(|e| map_txn_commit_err(e, expected_version, current))?;
Ok(AppendResult {
last_sequence: current + events.len() as u64,
events: envelopes,
})
}
}
impl crate::projection::ProjectionCheckpointStore for SlateDbStore {
/// Loads all per-stream cursors stored for projection `name`.
///
/// Every `cp/{name}/{stream}` entry is read and fed into a fresh checkpoint.
///
/// # Errors
///
/// Returns a store error when the scan fails, a key is not valid UTF-8 or
/// lacks the expected prefix, or a cursor value is not exactly eight bytes.
async fn load_projection_checkpoint(
    &self,
    name: &str,
) -> Result<crate::projection::GlobalProjectionCheckpoint, EngineError> {
    let range_start = format!("cp/{name}/");
    let range_end = cp_prefix_end(name);
    let mut entries = self
        .db
        .scan(range_start.as_bytes()..range_end.as_bytes())
        .await
        .map_err(to_store_err)?;
    let mut checkpoint = crate::projection::GlobalProjectionCheckpoint::new();
    loop {
        let Some(entry) = entries.next().await.map_err(to_store_err)? else {
            break;
        };
        let key_str = match std::str::from_utf8(&entry.key) {
            Ok(text) => text,
            Err(e) => return Err(EngineError::store(format!("checkpoint key utf8: {e}"))),
        };
        let Some(stream_part) = key_str.strip_prefix(range_start.as_str()) else {
            return Err(EngineError::store(format!(
                "unexpected checkpoint key: {key_str}"
            )));
        };
        let Ok(raw_cursor) = <[u8; 8]>::try_from(&entry.value[..]) else {
            return Err(EngineError::store(format!(
                "checkpoint cursor for '{key_str}' is {} bytes, expected 8",
                entry.value.len()
            )));
        };
        let seq = u64::from_le_bytes(raw_cursor);
        checkpoint.advance(&StreamId::new(stream_part), seq);
    }
    Ok(checkpoint)
}
/// Persists every cursor of `checkpoint` for projection `name` in one batch.
///
/// Each cursor is stored as an 8-byte little-endian `u64`. Cursors already in
/// the database but absent from `checkpoint` are left untouched.
async fn save_projection_checkpoint(
    &self,
    name: &str,
    checkpoint: &crate::projection::GlobalProjectionCheckpoint,
) -> Result<(), EngineError> {
    let mut batch = WriteBatch::new();
    for (stream_id, cursor) in &checkpoint.cursors {
        let encoded = cursor.to_le_bytes();
        batch.put(cp_cursor_key(name, stream_id).as_bytes(), encoded.as_slice());
    }
    self.db.write(batch).await.map_err(to_store_err)?;
    Ok(())
}
/// Persists only the cursors of projection `name` that moved forward.
///
/// A cursor from `current` is written when it is strictly greater than the
/// same stream's cursor in `previous`. All writes go out in a single batch,
/// each cursor as an 8-byte little-endian `u64`.
async fn advance_projection_cursors(
    &self,
    name: &str,
    previous: &crate::projection::GlobalProjectionCheckpoint,
    current: &crate::projection::GlobalProjectionCheckpoint,
) -> Result<(), EngineError> {
    let mut batch = WriteBatch::new();
    // Iterate by reference over the new cursors (the original text here was a
    // mis-encoded `&current`).
    for (stream_id, &seq) in &current.cursors {
        if seq > previous.cursor_for(stream_id) {
            let key = cp_cursor_key(name, stream_id);
            batch.put(key.as_bytes(), seq.to_le_bytes().as_slice());
        }
    }
    self.db.write(batch).await.map_err(to_store_err)?;
    Ok(())
}
}
impl OutboxStore for SlateDbStore {
/// Enqueues `messages` and bumps the outbox counter in one transaction.
///
/// An empty slice returns immediately without touching the database.
async fn enqueue(&self, messages: &[OutboxMessage]) -> Result<(), EngineError> {
    if messages.is_empty() {
        return Ok(());
    }
    let txn = self
        .db
        .begin(IsolationLevel::SerializableSnapshot)
        .await
        .map_err(to_outbox_err)?;
    let count_before = read_om_count_txn(&txn).await?;
    let count_after = count_before + messages.len() as u64;
    let encoded_count = count_after.to_le_bytes();
    txn.put(OM_COUNT_KEY, encoded_count.as_slice())
        .map_err(to_outbox_err)?;
    write_outbox_entries_txn(&txn, messages)?;
    txn.commit().await.map_err(to_outbox_err)?;
    Ok(())
}
/// Returns up to `limit` outbox messages whose scheduled time is at or before `now`.
///
/// The `ot/` time index is scanned in chronological order and each entry's
/// message record is fetched individually. Index entries whose record is
/// missing are skipped.
///
/// # Errors
///
/// Returns an outbox error when a scan or read fails, a key is not valid
/// UTF-8, or a message record cannot be decoded.
async fn pending(
    &self,
    limit: usize,
    now: OffsetDateTime,
) -> Result<Vec<OutboxMessage>, EngineError> {
    let now_nanos = u64::try_from(now.unix_timestamp_nanos().max(0)).unwrap_or(0);
    let upper_bound = format!("ot/{now_nanos:016x}/\u{FFFF}");
    let mut index = self
        .db
        .scan(b"ot/".as_slice()..upper_bound.as_bytes())
        .await
        .map_err(to_outbox_err)?;
    let mut due = Vec::new();
    loop {
        let Some(entry) = index.next().await.map_err(to_outbox_err)? else {
            break;
        };
        if due.len() >= limit {
            break;
        }
        let key =
            std::str::from_utf8(&entry.key).map_err(|e| EngineError::outbox(e.to_string()))?;
        // Expected shape after the prefix: 16 hex digits, '/', message id.
        let Some(rest) = key.strip_prefix("ot/").filter(|r| r.len() >= 17) else {
            continue;
        };
        let entry_nanos = match u64::from_str_radix(&rest[..16], 16) {
            Ok(parsed) => parsed,
            Err(_) => u64::MAX,
        };
        if entry_nanos > now_nanos {
            break;
        }
        let record_key = format!("om/{}", &rest[17..]);
        let stored = self
            .db
            .get(record_key.as_bytes())
            .await
            .map_err(to_outbox_err)?;
        if let Some(raw) = stored {
            let message: OutboxMessage =
                serde_json::from_slice(&raw).map_err(|e| EngineError::outbox(e.to_string()))?;
            due.push(message);
        }
    }
    Ok(due)
}
/// Removes the outbox message `id` after successful delivery.
///
/// The message record and its time-index entry are deleted and the counter is
/// decremented (never below zero) in one transaction. Acknowledging an unknown
/// id is a no-op.
async fn acknowledge(&self, id: OutboxMessageId) -> Result<(), EngineError> {
    let record_key = om_key(&id);
    let txn = self
        .db
        .begin(IsolationLevel::SerializableSnapshot)
        .await
        .map_err(to_outbox_err)?;
    let Some(raw) = txn
        .get(record_key.as_bytes())
        .await
        .map_err(to_outbox_err)?
    else {
        return Ok(());
    };
    let message: OutboxMessage =
        serde_json::from_slice(&raw).map_err(|e| EngineError::outbox(e.to_string()))?;
    let scheduled_at = message.deliver_after.unwrap_or(message.created_at);
    let index_key = ot_key(scheduled_at, &id);
    let remaining = read_om_count_txn(&txn).await?.saturating_sub(1);
    txn.delete(record_key.as_bytes()).map_err(to_outbox_err)?;
    txn.delete(index_key.as_bytes()).map_err(to_outbox_err)?;
    txn.put(OM_COUNT_KEY, remaining.to_le_bytes().as_slice())
        .map_err(to_outbox_err)?;
    txn.commit().await.map_err(to_outbox_err)?;
    Ok(())
}
/// Moves the outbox message `id` to `deliver_after` and counts one more attempt.
///
/// The record is rewritten and its time-index entry moved in one transaction.
/// A transaction conflict is retried, up to eight attempts in total.
/// Rescheduling an unknown id is a no-op.
///
/// # Errors
///
/// Returns an outbox error when the record cannot be decoded or encoded, when
/// a non-conflict storage error occurs, or when every attempt conflicts.
async fn reschedule(
    &self,
    id: OutboxMessageId,
    deliver_after: OffsetDateTime,
) -> Result<(), EngineError> {
    const MAX_RESCHEDULE_RETRIES: usize = 8;
    let record_key = om_key(&id);
    let mut attempts_left = MAX_RESCHEDULE_RETRIES;
    while attempts_left > 0 {
        attempts_left -= 1;
        let txn = self
            .db
            .begin(IsolationLevel::SerializableSnapshot)
            .await
            .map_err(to_outbox_err)?;
        let stored = txn
            .get(record_key.as_bytes())
            .await
            .map_err(to_outbox_err)?;
        let Some(raw) = stored else {
            txn.rollback();
            return Ok(());
        };
        let mut message: OutboxMessage =
            serde_json::from_slice(&raw).map_err(|e| EngineError::outbox(e.to_string()))?;
        // The old index key must be derived before the schedule is overwritten.
        let previous_slot = message.deliver_after.unwrap_or(message.created_at);
        let stale_index_key = ot_key(previous_slot, &id);
        message.deliver_after = Some(deliver_after);
        message.attempt_count += 1;
        let fresh_index_key = ot_key(deliver_after, &id);
        let encoded =
            serde_json::to_vec(&message).map_err(|e| EngineError::outbox(e.to_string()))?;
        txn.delete(stale_index_key.as_bytes())
            .map_err(to_outbox_err)?;
        txn.put(record_key.as_bytes(), encoded.as_slice())
            .map_err(to_outbox_err)?;
        txn.put(fresh_index_key.as_bytes(), b"")
            .map_err(to_outbox_err)?;
        let Err(e) = txn.commit().await else {
            return Ok(());
        };
        if e.kind() != ErrorKind::Transaction {
            return Err(to_outbox_err(e));
        }
        // Conflict: another writer touched the message; retry on fresh state.
    }
    Err(EngineError::outbox("reschedule conflict: too many retries"))
}
/// Returns the number of outbox messages according to the persisted counter.
///
/// A count that does not fit in `usize` is reported as `usize::MAX`.
async fn len(&self) -> Result<usize, EngineError> {
    let count = read_om_count(&self.db).await?;
    Ok(match usize::try_from(count) {
        Ok(n) => n,
        Err(_) => usize::MAX,
    })
}
}
/// Key of the persisted deadline counter (8-byte little-endian `u64`).
const DL_COUNT_KEY: &[u8] = b"_count/dl";
/// Reads the deadline counter through `txn`.
///
/// A missing counter reads as `0`. A value that is not exactly eight bytes
/// long is reported as a deadline error.
async fn read_dl_count_txn(txn: &slatedb::DbTransaction) -> Result<u64, EngineError> {
    let Some(raw) = txn.get(DL_COUNT_KEY).await.map_err(to_deadline_err)? else {
        return Ok(0);
    };
    match <[u8; 8]>::try_from(&raw[..]) {
        Ok(le) => Ok(u64::from_le_bytes(le)),
        Err(_) => Err(EngineError::deadline(format!(
            "_count/dl corrupt: expected 8-byte little-endian u64, got {} bytes",
            raw.len()
        ))),
    }
}
/// Key of the persisted process-registry counter (8-byte little-endian `u64`).
const PR_COUNT_KEY: &[u8] = b"_count/pr";
/// Reads the process-registry counter through `txn`.
///
/// A missing counter reads as `0`. A value that is not exactly eight bytes
/// long is reported as a registry error.
async fn read_pr_count_txn(txn: &slatedb::DbTransaction) -> Result<u64, EngineError> {
    let Some(raw) = txn.get(PR_COUNT_KEY).await.map_err(to_registry_err)? else {
        return Ok(0);
    };
    match <[u8; 8]>::try_from(&raw[..]) {
        Ok(le) => Ok(u64::from_le_bytes(le)),
        Err(_) => Err(EngineError::registry(format!(
            "_count/pr corrupt: expected 8-byte little-endian u64, got {} bytes",
            raw.len()
        ))),
    }
}
/// Deadline store backed by a SlateDB handle.
///
/// Uses the `dl/` (records), `dt/` (time index) and `ds/` (per-stream index)
/// key prefixes plus the `_count/dl` counter.
#[derive(Clone)]
pub struct SlateDbDeadlineStore {
// Database handle used for all deadline reads, writes and transactions.
db: Db,
}
impl DeadlineStore for SlateDbDeadlineStore {
/// Registers `deadline`, or replaces an existing deadline with the same id.
///
/// The record, its time-index entry and its per-stream index entry are written
/// in one transaction. The counter is increased only for a new id. When an
/// existing deadline is replaced and its due time or stream changed, the old
/// index entries are deleted in the same transaction, so the deadline cannot
/// surface at its old time or under its old stream.
///
/// # Errors
///
/// Returns a deadline error when serialisation or any storage step fails.
async fn register(&self, deadline: &Deadline) -> Result<(), EngineError> {
    let payload =
        serde_json::to_vec(deadline).map_err(|e| EngineError::deadline(e.to_string()))?;
    let dl_k = dl_key(&deadline.deadline_id());
    let time_key = dt_key(deadline.due_at(), &deadline.deadline_id());
    let stream_key = ds_key(deadline.stream_id(), &deadline.deadline_id());
    let txn = self
        .db
        .begin(IsolationLevel::SerializableSnapshot)
        .await
        .map_err(to_deadline_err)?;
    match txn.get(dl_k.as_bytes()).await.map_err(to_deadline_err)? {
        None => {
            let current_count = read_dl_count_txn(&txn).await?;
            txn.put(DL_COUNT_KEY, (current_count + 1).to_le_bytes().as_slice())
                .map_err(to_deadline_err)?;
        }
        Some(old_bytes) => {
            // Best effort: an undecodable old record cannot tell us which
            // index keys it owned, so it is overwritten as before.
            if let Ok(previous) = serde_json::from_slice::<Deadline>(&old_bytes) {
                let old_time_key = dt_key(previous.due_at(), &deadline.deadline_id());
                if old_time_key != time_key {
                    txn.delete(old_time_key.as_bytes())
                        .map_err(to_deadline_err)?;
                }
                let old_stream_key = ds_key(previous.stream_id(), &deadline.deadline_id());
                if old_stream_key != stream_key {
                    txn.delete(old_stream_key.as_bytes())
                        .map_err(to_deadline_err)?;
                }
            }
        }
    }
    txn.put(dl_k.as_bytes(), payload.as_slice())
        .map_err(to_deadline_err)?;
    txn.put(time_key.as_bytes(), b"").map_err(to_deadline_err)?;
    txn.put(stream_key.as_bytes(), b"")
        .map_err(to_deadline_err)?;
    txn.commit().await.map_err(to_deadline_err)?;
    Ok(())
}
/// Cancels the deadline `id`, removing its record and both index entries.
///
/// The existence check, the counter decrement (never below zero) and the
/// deletions all happen inside one transaction, so a deadline that another
/// caller has already cancelled is not counted twice. Cancelling an unknown
/// id is a no-op.
///
/// # Errors
///
/// Returns a deadline error when the record cannot be decoded or a storage
/// step fails.
async fn cancel(&self, id: DeadlineId) -> Result<(), EngineError> {
    let dl_k = dl_key(&id);
    let txn = self
        .db
        .begin(IsolationLevel::SerializableSnapshot)
        .await
        .map_err(to_deadline_err)?;
    // Read through the transaction, not the bare handle: the check and the
    // decrement then see the same snapshot.
    let Some(bytes) = txn.get(dl_k.as_bytes()).await.map_err(to_deadline_err)? else {
        return Ok(());
    };
    let deadline: Deadline =
        serde_json::from_slice(&bytes).map_err(|e| EngineError::deadline(e.to_string()))?;
    let time_key = dt_key(deadline.due_at(), &id);
    let stream_key = ds_key(deadline.stream_id(), &id);
    let current_count = read_dl_count_txn(&txn).await?;
    txn.put(
        DL_COUNT_KEY,
        current_count.saturating_sub(1).to_le_bytes().as_slice(),
    )
    .map_err(to_deadline_err)?;
    txn.delete(dl_k.as_bytes()).map_err(to_deadline_err)?;
    txn.delete(time_key.as_bytes()).map_err(to_deadline_err)?;
    txn.delete(stream_key.as_bytes()).map_err(to_deadline_err)?;
    txn.commit().await.map_err(to_deadline_err)?;
    Ok(())
}
/// Returns up to `limit` deadlines whose due time is at or before the current time.
///
/// The `dt/` time index is scanned in chronological order and each entry's
/// deadline record is fetched individually; index entries whose record is
/// missing are skipped. `has_more` is `true` when a further index entry was
/// seen after `limit` deadlines had been collected.
///
/// NOTE(review): when the scan ends with exactly `limit` deadlines collected
/// and no further entry, the final computation still reports
/// `has_more = true` (also for `limit == 0` on an empty index). Confirm that
/// this over-reporting is intended.
async fn due_now(&self, limit: usize) -> Result<DueNowResult, EngineError> {
let now_nanos =
u64::try_from(OffsetDateTime::now_utc().unix_timestamp_nanos().max(0)).unwrap_or(0);
// Upper scan bound: sorts after every id stored under the current nanosecond.
let end_key = format!("dt/{now_nanos:016x}/\u{FFFF}");
let mut iter = self
.db
.scan(b"dt/".as_slice()..end_key.as_bytes())
.await
.map_err(to_deadline_err)?;
let mut deadlines = Vec::new();
let mut future_entry_seen = false;
while let Some(kv) = iter.next().await.map_err(to_deadline_err)? {
let key =
std::str::from_utf8(&kv.key).map_err(|e| EngineError::deadline(e.to_string()))?;
// Expected shape after the prefix: 16 hex digits, '/', deadline id.
let rest = match key.strip_prefix("dt/") {
Some(r) if r.len() >= 17 => r,
_ => continue,
};
// An unparsable timestamp is treated as "far future" and ends the scan.
let entry_nanos = u64::from_str_radix(&rest[..16], 16).unwrap_or(u64::MAX);
if entry_nanos > now_nanos {
future_entry_seen = true;
break;
}
// The page is full and one more due entry exists: report it and stop.
if deadlines.len() >= limit {
let has_more = true;
return Ok(DueNowResult {
deadlines,
has_more,
});
}
let id_str = &rest[17..]; let dl_k = format!("dl/{id_str}");
if let Some(bytes) = self
.db
.get(dl_k.as_bytes())
.await
.map_err(to_deadline_err)?
{
let deadline: Deadline = serde_json::from_slice(&bytes)
.map_err(|e| EngineError::deadline(e.to_string()))?;
deadlines.push(deadline);
}
}
let has_more = !future_entry_seen && deadlines.len() == limit;
Ok(DueNowResult {
deadlines,
has_more,
})
}
/// Returns every deadline registered for `stream_id`.
///
/// The per-stream `ds/` index is scanned and each referenced deadline record
/// is fetched individually; index entries whose record is missing are skipped.
///
/// # Errors
///
/// Returns a deadline error when a scan or read fails, a key is not valid
/// UTF-8, or a record cannot be decoded.
async fn for_stream(&self, stream_id: &StreamId) -> Result<Vec<Deadline>, EngineError> {
    let prefix = ds_stream_prefix(stream_id);
    let mut index = self
        .db
        .scan_prefix(prefix.as_bytes())
        .await
        .map_err(to_deadline_err)?;
    let mut found = Vec::new();
    loop {
        let Some(entry) = index.next().await.map_err(to_deadline_err)? else {
            break;
        };
        let key = std::str::from_utf8(&entry.key)
            .map_err(|e| EngineError::deadline(e.to_string()))?;
        let Some(deadline_id) = key
            .strip_prefix(prefix.as_str())
            .filter(|id| !id.is_empty())
        else {
            continue;
        };
        let record_key = format!("dl/{deadline_id}");
        let stored = self
            .db
            .get(record_key.as_bytes())
            .await
            .map_err(to_deadline_err)?;
        if let Some(raw) = stored {
            let deadline: Deadline = serde_json::from_slice(&raw)
                .map_err(|e| EngineError::deadline(e.to_string()))?;
            found.push(deadline);
        }
    }
    Ok(found)
}
/// Returns the number of registered deadlines.
///
/// The persisted counter is used when it is present and exactly eight bytes
/// long. Otherwise the `dl/` records are counted by scanning and the counter
/// is rewritten on a best-effort basis (a failed rewrite is ignored).
async fn len(&self) -> Result<usize, EngineError> {
    let stored = self.db.get(DL_COUNT_KEY).await.map_err(to_deadline_err)?;
    let counter = stored
        .as_deref()
        .and_then(|raw| <[u8; 8]>::try_from(raw).ok())
        .map(u64::from_le_bytes);
    if let Some(count) = counter {
        return Ok(usize::try_from(count).unwrap_or(usize::MAX));
    }
    // Counter missing or malformed: recount from the records themselves.
    let mut records = self.db.scan_prefix(b"dl/").await.map_err(to_deadline_err)?;
    let mut recounted = 0u64;
    while records.next().await.map_err(to_deadline_err)?.is_some() {
        recounted += 1;
    }
    let mut repair = WriteBatch::new();
    repair.put(DL_COUNT_KEY, recounted.to_le_bytes().as_slice());
    let _ = self.db.write(repair).await;
    Ok(usize::try_from(recounted).unwrap_or(usize::MAX))
}
}
/// Process registry backed by a SlateDB handle.
///
/// Uses the `pr/` (registry entries) and `ci/` (correlation index) key
/// prefixes plus the `_count/pr` counter.
#[derive(Clone)]
pub struct SlateDbProcessRegistry {
// Database handle used for all registry reads, writes and transactions.
db: Db,
}
impl ProcessRegistry for SlateDbProcessRegistry {
/// Registers `identity` under `key` for `tenant_id`, replacing any old entry.
///
/// The counter is increased only when the key did not exist before; the check
/// and both writes share one transaction.
async fn register(
    &self,
    tenant_id: TenantId,
    key: &RegistryKey,
    identity: ProcessIdentity,
) -> Result<(), EngineError> {
    let entry_key = pr_key(tenant_id, key);
    let encoded =
        serde_json::to_vec(&identity).map_err(|e| EngineError::registry(e.to_string()))?;
    let txn = self
        .db
        .begin(IsolationLevel::SerializableSnapshot)
        .await
        .map_err(to_registry_err)?;
    let existing = txn
        .get(entry_key.as_bytes())
        .await
        .map_err(to_registry_err)?;
    if existing.is_none() {
        let count_after = read_pr_count_txn(&txn).await? + 1;
        txn.put(PR_COUNT_KEY, count_after.to_le_bytes().as_slice())
            .map_err(to_registry_err)?;
    }
    txn.put(entry_key.as_bytes(), encoded.as_slice())
        .map_err(to_registry_err)?;
    txn.commit().await.map_err(to_registry_err)?;
    Ok(())
}
/// Looks up the identity registered under `key` for `tenant_id`.
///
/// Returns `Ok(None)` when no entry exists.
///
/// # Errors
///
/// Returns a registry error when the read fails or the entry cannot be decoded.
async fn lookup(
    &self,
    tenant_id: TenantId,
    key: &RegistryKey,
) -> Result<Option<ProcessIdentity>, EngineError> {
    let entry_key = pr_key(tenant_id, key);
    let stored = self
        .db
        .get(entry_key.as_bytes())
        .await
        .map_err(to_registry_err)?;
    stored
        .map(|raw| {
            serde_json::from_slice::<ProcessIdentity>(&raw)
                .map_err(|e| EngineError::registry(e.to_string()))
        })
        .transpose()
}
/// Removes the entry registered under `key` for `tenant_id`.
///
/// When the entry exists it is deleted and the counter decremented (never
/// below zero). The transaction is committed either way, so removing an
/// unknown key succeeds without changes.
async fn remove(&self, tenant_id: TenantId, key: &RegistryKey) -> Result<(), EngineError> {
    let entry_key = pr_key(tenant_id, key);
    let txn = self
        .db
        .begin(IsolationLevel::SerializableSnapshot)
        .await
        .map_err(to_registry_err)?;
    let existing = txn
        .get(entry_key.as_bytes())
        .await
        .map_err(to_registry_err)?;
    if existing.is_some() {
        let remaining = read_pr_count_txn(&txn).await?.saturating_sub(1);
        txn.put(PR_COUNT_KEY, remaining.to_le_bytes().as_slice())
            .map_err(to_registry_err)?;
        txn.delete(entry_key.as_bytes()).map_err(to_registry_err)?;
    }
    txn.commit().await.map_err(to_registry_err)?;
    Ok(())
}
async fn len(&self) -> Result<usize, EngineError> {
match self.db.get(PR_COUNT_KEY).await.map_err(to_registry_err)? {
Some(bytes) if bytes.len() == 8 => {
return Ok(
usize::try_from(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
.unwrap_or(usize::MAX),
);
}
_ => {} }
let mut iter = self.db.scan_prefix(b"pr/").await.map_err(to_registry_err)?;
let mut count = 0u64;
while iter.next().await.map_err(to_registry_err)?.is_some() {
count += 1;
}
let mut batch = WriteBatch::new();
batch.put(PR_COUNT_KEY, count.to_le_bytes().as_slice());
let _ = self.db.write(batch).await;
Ok(usize::try_from(count).unwrap_or(usize::MAX))
}
async fn register_correlated(
&self,
tenant_id: TenantId,
tag: &str,
process_id: crate::ids::ProcessId,
identity: ProcessIdentity,
) -> Result<(), EngineError> {
validate_ci_tag(tag)?;
let k = ci_key(tenant_id, tag, process_id);
let v = serde_json::to_vec(&identity).map_err(|e| EngineError::registry(e.to_string()))?;
let mut batch = WriteBatch::new();
batch.put(k.as_bytes(), v.as_slice());
self.db.write(batch).await.map_err(to_registry_err)?;
Ok(())
}
async fn lookup_correlated(
&self,
tenant_id: TenantId,
tag: &str,
) -> Result<Vec<ProcessIdentity>, EngineError> {
validate_ci_tag(tag)?;
let prefix = ci_tag_prefix(tenant_id, tag);
let mut iter = self
.db
.scan_prefix(prefix.as_bytes())
.await
.map_err(to_registry_err)?;
let mut results = Vec::new();
while let Some(entry) = iter.next().await.map_err(to_registry_err)? {
let identity: ProcessIdentity = serde_json::from_slice(&entry.value)
.map_err(|e| EngineError::registry(e.to_string()))?;
results.push(identity);
}
Ok(results)
}
async fn remove_correlated(
&self,
tenant_id: TenantId,
tag: &str,
process_id: crate::ids::ProcessId,
) -> Result<(), EngineError> {
validate_ci_tag(tag)?;
let k = ci_key(tenant_id, tag, process_id);
let mut batch = WriteBatch::new();
batch.delete(k.as_bytes());
self.db.write(batch).await.map_err(to_registry_err)?;
Ok(())
}
}
/// Snapshot store persisted in SlateDB.
///
/// Implements [`SnapshotStore`] using the wrapped database handle.
#[derive(Clone)]
pub struct SlateDbSnapshotStore {
/// SlateDB handle used for snapshot reads and writes.
db: Db,
}
/// Logs a SlateDB failure and maps it to the snapshot flavour of
/// [`EngineError`], choosing the transient constructor when
/// `slatedb_error_is_transient` says so.
fn to_snapshot_err(e: &slatedb::Error) -> EngineError {
    tracing::error!(error = %e, "snapshot store error");
    let transient = slatedb_error_is_transient(e);
    let kind = slatedb_error_kind_str(e);
    match transient {
        true => EngineError::transient_snapshot(kind),
        false => EngineError::snapshot(kind),
    }
}
impl SnapshotStore for SlateDbSnapshotStore {
    /// Serializes `snapshot` as JSON and writes it under the `sn/` key of its
    /// stream, replacing any previously stored snapshot for that stream.
    ///
    /// # Errors
    /// Returns a snapshot error when serialization or the write fails.
    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError> {
        let storage_key = sn_key(&snapshot.stream_id);
        let encoded =
            serde_json::to_vec(snapshot).map_err(|e| EngineError::snapshot(e.to_string()))?;
        let mut batch = WriteBatch::new();
        batch.put(storage_key.as_bytes(), encoded.as_slice());
        self.db
            .write(batch)
            .await
            .map(|_| ())
            .map_err(|e| to_snapshot_err(&e))
    }

    /// Loads the snapshot stored for `stream_id`, or `Ok(None)` if there is none.
    ///
    /// # Errors
    /// Returns a snapshot error when the read fails or the stored bytes do
    /// not deserialize into a [`Snapshot`].
    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        let storage_key = sn_key(stream_id);
        let stored = self
            .db
            .get(storage_key.as_bytes())
            .await
            .map_err(|e| to_snapshot_err(&e))?;
        let Some(bytes) = stored else {
            return Ok(None);
        };
        serde_json::from_slice::<Snapshot>(&bytes)
            .map(Some)
            .map_err(|e| EngineError::snapshot(e.to_string()))
    }
}
/// Inbox (message de-duplication) store persisted in SlateDB.
///
/// Implements [`InboxStore`] and additionally offers `purge_expired`.
#[derive(Clone)]
pub struct SlateDbInboxStore {
/// SlateDB handle used for inbox reads and writes.
db: Db,
}
impl SlateDbInboxStore {
    /// Deletes every inbox entry whose time-index timestamp is strictly
    /// before `before`, returning how many entries were purged.
    ///
    /// Walks the `it/` time index in key order. Each index key is expected to
    /// look like `it/<16 hex digits of unix nanos>…`, and its value holds the
    /// inbox key; both the index entry and the corresponding `ib` entry are
    /// deleted. Deletes are flushed in batches of 1000.
    ///
    /// Index keys that are too short or whose timestamp does not parse are
    /// skipped. The scan stops at the first well-formed entry at or after the
    /// cutoff.
    ///
    /// # Errors
    /// Returns an inbox error when a scan or write fails, or when an index
    /// key or value is not valid UTF-8.
    pub async fn purge_expired(&self, before: OffsetDateTime) -> Result<usize, EngineError> {
        const FLUSH_EVERY: usize = 1000;
        // Pre-epoch cutoffs clamp to 0. After clamping, the conversion can
        // only fail on overflow, so saturate upwards: falling back to 0 would
        // make a far-future cutoff purge nothing at all.
        let cutoff_nanos =
            u64::try_from(before.unix_timestamp_nanos().max(0)).unwrap_or(u64::MAX);
        let mut iter = self.db.scan_prefix(b"it/").await.map_err(to_inbox_err)?;
        let mut purged = 0usize;
        let mut batch = WriteBatch::new();
        while let Some(kv) = iter.next().await.map_err(to_inbox_err)? {
            let key_str =
                std::str::from_utf8(&kv.key).map_err(|e| EngineError::inbox(e.to_string()))?;
            let rest = match key_str.strip_prefix("it/") {
                Some(r) if r.len() >= 17 => r,
                _ => continue,
            };
            // `get(..16)` avoids a panic if byte 16 is not a char boundary.
            // A malformed timestamp is skipped rather than treated as
            // "never expires", which used to abort the purge for all later keys.
            let Some(entry_nanos) = rest
                .get(..16)
                .and_then(|hex| u64::from_str_radix(hex, 16).ok())
            else {
                continue;
            };
            if entry_nanos >= cutoff_nanos {
                // Keys are time-ordered, so nothing further can be expired.
                break;
            }
            let inbox_key =
                std::str::from_utf8(&kv.value).map_err(|e| EngineError::inbox(e.to_string()))?;
            let ib_k = ib_key(inbox_key);
            batch.delete(kv.key.as_ref());
            batch.delete(ib_k.as_bytes());
            purged += 1;
            if purged % FLUSH_EVERY == 0 {
                self.db
                    .write(std::mem::replace(&mut batch, WriteBatch::new()))
                    .await
                    .map_err(to_inbox_err)?;
            }
        }
        if !batch.is_empty() {
            self.db.write(batch).await.map_err(to_inbox_err)?;
        }
        Ok(purged)
    }
}
impl InboxStore for SlateDbInboxStore {
    /// Records `key` as seen and reports whether it was new.
    ///
    /// Returns `Ok(true)` when the key was accepted for the first time and
    /// `Ok(false)` when it was already present. Each attempt runs in a
    /// `SerializableSnapshot` transaction that writes both the `ib` entry and
    /// a time-index entry (current time plus a random UUID nonce) whose value
    /// is the inbox key. Transaction conflicts are retried up to eight times.
    ///
    /// # Errors
    /// Returns an inbox error when `key` exceeds
    /// `crate::inbox::MAX_INBOX_KEY_LEN`, when a storage call fails, or when
    /// every retry ends in a conflict.
    async fn accept(&self, key: &str) -> Result<bool, EngineError> {
        const MAX_ACCEPT_RETRIES: usize = 8;
        let key_len = key.len();
        if key_len > crate::inbox::MAX_INBOX_KEY_LEN {
            return Err(EngineError::inbox(format!(
                "inbox key is {} bytes, exceeds maximum of {}",
                key_len,
                crate::inbox::MAX_INBOX_KEY_LEN,
            )));
        }
        let ib_k = ib_key(key);
        for _ in 0..MAX_ACCEPT_RETRIES {
            let txn = self
                .db
                .begin(IsolationLevel::SerializableSnapshot)
                .await
                .map_err(to_inbox_err)?;
            let already_seen = txn
                .get(ib_k.as_bytes())
                .await
                .map_err(to_inbox_err)?
                .is_some();
            if already_seen {
                txn.rollback();
                return Ok(false);
            }
            let accepted_at = OffsetDateTime::now_utc();
            let nonce = uuid::Uuid::new_v4().to_string();
            let time_key = it_key(accepted_at, &nonce);
            txn.put(ib_k.as_bytes(), b"").map_err(to_inbox_err)?;
            txn.put(time_key.as_bytes(), key.as_bytes())
                .map_err(to_inbox_err)?;
            let Err(e) = txn.commit().await else {
                return Ok(true);
            };
            if e.kind() != ErrorKind::Transaction {
                return Err(to_inbox_err(e));
            }
            // Conflict: fall through and retry with a fresh transaction.
        }
        Err(EngineError::inbox(
            "accept conflict: too many retries (concurrent accept storm)",
        ))
    }
}
/// Capacity of the bounded channel between the dead-letter sink and its worker.
const DL_BUFFER_CAPACITY: usize = 512;
/// One dead-letter entry handed from the sink to the worker.
struct DlEntry {
/// Reason label, as returned by `DeadLetterReason::label`.
label: String,
/// Reason rendered with `to_string()`.
detail: String,
}
/// Cloneable dead-letter sink that forwards rejections over a channel to a
/// [`SlateDbDeadLetterWorker`]; clones share the same sender slot.
#[derive(Clone)]
pub struct SlateDbDeadLetterSink {
/// Shared state holding the channel sender.
inner: std::sync::Arc<DlSinkInner>,
}
/// State shared by all clones of a [`SlateDbDeadLetterSink`].
struct DlSinkInner {
/// Sender half of the dead-letter channel; set to `None` by `signal_shutdown`.
tx: std::sync::Mutex<Option<tokio::sync::mpsc::Sender<DlEntry>>>,
}
impl std::fmt::Debug for SlateDbDeadLetterSink {
    /// Prints only whether the sender slot is still populated; a lock that
    /// cannot be acquired is reported as a closed channel.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let channel_open = match self.inner.tx.lock() {
            Ok(slot) => slot.is_some(),
            Err(_) => false,
        };
        let mut out = f.debug_struct("SlateDbDeadLetterSink");
        out.field("channel_open", &channel_open);
        out.finish_non_exhaustive()
    }
}
impl SlateDbDeadLetterSink {
    /// Drops this sink's channel sender so that no further entries can be
    /// enqueued through it or any of its clones.
    ///
    /// A poisoned mutex is recovered rather than ignored: the guarded value
    /// is just an `Option`, so it cannot be left half-updated, and skipping
    /// the reset would keep the sender alive and the channel open.
    pub fn signal_shutdown(&self) {
        let mut guard = self
            .inner
            .tx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *guard = None;
    }
}
impl crate::dead_letter::DeadLetterSink for SlateDbDeadLetterSink {
fn reject(&self, reason: &crate::dead_letter::DeadLetterReason) {
let label = reason.label();
let detail = reason.to_string();
let metric_label: std::borrow::Cow<'static, str> = match reason {
crate::dead_letter::DeadLetterReason::UnknownPid { pid, .. } => {
format!("unknown_pid:{pid}").into()
}
_ => label.into(),
};
crate::metrics::EngineMetrics::global().dead_letter_recorded(&metric_label);
tracing::warn!(
reason = label,
detail = %detail,
"dead letter: enqueueing for durable DLQ persistence",
);
let guard = self.inner.tx.lock().unwrap();
let Some(tx) = &*guard else {
return;
};
if let Err(_full) = tx.try_send(DlEntry {
label: label.to_owned(),
detail,
}) {
tracing::error!(
reason = label,
capacity = DL_BUFFER_CAPACITY,
"dead letter channel full; entry dropped — check for mass-rejection storm",
);
crate::metrics::EngineMetrics::global().dead_letter_recorded("channel_overflow");
}
}
}
/// Receiving side of the dead-letter channel; `run` drains entries and writes
/// them to SlateDB.
pub struct SlateDbDeadLetterWorker {
/// Receiver paired with the sink's sender.
rx: tokio::sync::mpsc::Receiver<DlEntry>,
/// SlateDB handle the entries are written to.
db: Db,
}
impl std::fmt::Debug for SlateDbDeadLetterWorker {
    /// Prints the type name only; no fields are exposed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SlateDbDeadLetterWorker");
        out.finish_non_exhaustive()
    }
}
impl SlateDbDeadLetterWorker {
    /// Drains the channel until it is closed, persisting each entry.
    ///
    /// Returns the number of entries taken off the channel. Entries whose
    /// persistence failed are still counted; failures are only logged.
    pub async fn run(mut self) -> u64 {
        let mut count = 0u64;
        while let Some(entry) = self.rx.recv().await {
            Self::persist_entry(&self.db, entry).await;
            count += 1;
        }
        count
    }

    /// Writes one entry as a JSON [`DeadLetterRecord`] under a `dr/` key built
    /// from the current time and a fresh UUID.
    ///
    /// Best-effort: serialization and write failures are logged and the
    /// entry is dropped. Previously a serialization failure was swallowed
    /// without any trace.
    async fn persist_entry(db: &Db, entry: DlEntry) {
        let now = OffsetDateTime::now_utc();
        let uuid = uuid::Uuid::new_v4().to_string();
        let key = dr_key(now, &uuid);
        let record = DeadLetterRecord {
            rejected_at: now.to_string(),
            reason_label: entry.label,
            reason_detail: entry.detail,
        };
        let bytes = match serde_json::to_vec(&record) {
            Ok(bytes) => bytes,
            Err(e) => {
                tracing::error!(
                    error = %e,
                    "SlateDbDeadLetterWorker: failed to serialize dead-letter entry; \
                     entry is lost",
                );
                return;
            }
        };
        let mut batch = WriteBatch::new();
        batch.put(key.as_bytes(), &bytes);
        if let Err(e) = db.write(batch).await {
            tracing::error!(
                error = %e,
                "SlateDbDeadLetterWorker: failed to persist dead-letter entry; \
                 entry is lost — check storage health",
            );
        }
    }
}
/// JSON record stored for each persisted dead letter.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct DeadLetterRecord {
/// Time the entry was persisted, rendered with `OffsetDateTime::to_string`.
pub rejected_at: String,
/// Reason label of the rejection.
pub reason_label: String,
/// Full textual description of the rejection reason.
pub reason_detail: String,
}
/// Builds the storage key for a dead-letter record:
/// `dr/<unix nanos as 16 zero-padded hex digits>/<uuid>`.
///
/// Negative (pre-epoch) timestamps are clamped to 0, and a value that does
/// not fit in `u64` also falls back to 0.
fn dr_key(ts: OffsetDateTime, uuid: &str) -> String {
    let raw_nanos = ts.unix_timestamp_nanos();
    let nanos: u64 = if raw_nanos < 0 {
        0
    } else {
        u64::try_from(raw_nanos).unwrap_or(0)
    };
    format!("dr/{nanos:016x}/{uuid}")
}
impl SlateDbStore {
    /// Creates a connected dead-letter sink/worker pair sharing this store's
    /// database handle.
    ///
    /// The two halves are linked by a bounded channel of
    /// `DL_BUFFER_CAPACITY` entries.
    #[must_use]
    pub fn as_dead_letter_sink(&self) -> (SlateDbDeadLetterSink, SlateDbDeadLetterWorker) {
        let (tx, rx) = tokio::sync::mpsc::channel(DL_BUFFER_CAPACITY);
        let inner = std::sync::Arc::new(DlSinkInner {
            tx: std::sync::Mutex::new(Some(tx)),
        });
        (
            SlateDbDeadLetterSink { inner },
            SlateDbDeadLetterWorker {
                rx,
                db: self.db.clone(),
            },
        )
    }

    /// Returns up to `limit` dead-letter records, last-scanned (highest key)
    /// first.
    ///
    /// Scans the whole `dr/` prefix but retains only the most recent `limit`
    /// raw values in a sliding window, so memory use is bounded by `limit`
    /// rather than by the total number of stored records. Only the retained
    /// values are deserialized.
    ///
    /// # Errors
    /// Returns a dead-letter error when the scan fails or a retained value
    /// is not a valid JSON [`DeadLetterRecord`].
    pub async fn list_dead_letters(
        &self,
        limit: usize,
    ) -> Result<Vec<DeadLetterRecord>, EngineError> {
        if limit == 0 {
            return Ok(Vec::new());
        }
        let mut iter = self
            .db
            .scan_prefix(b"dr/")
            .await
            .map_err(|e| EngineError::dead_letter(slatedb_error_kind_str(&e)))?;
        // Not pre-sized: `limit` is caller-controlled and may be huge.
        let mut newest: std::collections::VecDeque<Vec<u8>> = std::collections::VecDeque::new();
        while let Some(kv) = iter
            .next()
            .await
            .map_err(|e| EngineError::dead_letter(slatedb_error_kind_str(&e)))?
        {
            if newest.len() == limit {
                // Window is full: evict the oldest retained value.
                newest.pop_front();
            }
            newest.push_back(kv.value.to_vec());
        }
        let mut records = Vec::with_capacity(newest.len());
        for bytes in newest.into_iter().rev() {
            let record: DeadLetterRecord = serde_json::from_slice(&bytes)
                .map_err(|e| EngineError::dead_letter(e.to_string()))?;
            records.push(record);
        }
        Ok(records)
    }

    /// Returns a partner store that shares this store's database handle.
    #[must_use]
    pub fn as_partner_store(&self) -> SlateDbPartnerStore {
        SlateDbPartnerStore {
            db: self.db.clone(),
        }
    }
}
/// Partner-record store persisted in SlateDB.
///
/// Implements `crate::partner::PartnerStore` using the wrapped handle.
#[derive(Clone)]
pub struct SlateDbPartnerStore {
/// SlateDB handle used for partner reads and writes.
db: Db,
}
/// Storage key of one partner record: `pt/<tenant_id>/<gln>`.
fn pt_key(tenant_id: TenantId, gln: &crate::types::MarktpartnerCode) -> String {
    format!("pt/{}/{}", tenant_id, gln)
}
/// Key prefix shared by all partner records of a tenant: `pt/<tenant_id>/`.
fn pt_tenant_prefix(tenant_id: TenantId) -> String {
    format!("pt/{}/", tenant_id)
}
/// Logs a SlateDB failure and maps it to the partner flavour of
/// [`EngineError`], choosing the transient constructor when
/// `slatedb_error_is_transient` says so.
fn to_partner_err(e: &slatedb::Error) -> EngineError {
    tracing::error!(error = %e, "partner store error");
    let transient = slatedb_error_is_transient(e);
    let kind = slatedb_error_kind_str(e);
    match transient {
        true => EngineError::transient_partner(kind),
        false => EngineError::partner(kind),
    }
}
impl crate::partner::PartnerStore for SlateDbPartnerStore {
    /// Inserts `record`, or merges it into the existing record for the same
    /// tenant and GLN via `merge_from_partin`.
    ///
    /// The read-merge-write cycle runs in a `SerializableSnapshot`
    /// transaction and is retried up to eight times on transaction conflicts.
    ///
    /// # Errors
    /// Returns a partner error when a storage call fails, when the stored or
    /// merged record cannot be (de)serialized, or when all retries conflict.
    async fn upsert(
        &self,
        tenant_id: TenantId,
        record: &crate::partner::PartnerRecord,
    ) -> Result<(), EngineError> {
        const MAX_UPSERT_RETRIES: usize = 8;
        let key = pt_key(tenant_id, &record.gln);
        for _ in 0..MAX_UPSERT_RETRIES {
            let txn = self
                .db
                .begin(IsolationLevel::SerializableSnapshot)
                .await
                .map_err(|e| to_partner_err(&e))?;
            let stored = txn
                .get(key.as_bytes())
                .await
                .map_err(|e| to_partner_err(&e))?;
            let merged = if let Some(bytes) = stored {
                let mut current: crate::partner::PartnerRecord = serde_json::from_slice(&bytes)
                    .map_err(|e| EngineError::partner(e.to_string()))?;
                current.merge_from_partin(record.clone());
                current
            } else {
                record.clone()
            };
            let encoded =
                serde_json::to_vec(&merged).map_err(|e| EngineError::partner(e.to_string()))?;
            txn.put(key.as_bytes(), encoded.as_slice())
                .map_err(|e| to_partner_err(&e))?;
            let Err(e) = txn.commit().await else {
                return Ok(());
            };
            if e.kind() != ErrorKind::Transaction {
                return Err(to_partner_err(&e));
            }
            // Conflict: loop around and redo the merge on fresh data.
        }
        Err(EngineError::partner(
            "partner upsert conflict: too many retries (concurrent update storm)",
        ))
    }

    /// Fetches the partner record for `(tenant_id, gln)`, or `Ok(None)` if
    /// none is stored.
    ///
    /// # Errors
    /// Returns a partner error when the read fails or the stored bytes do
    /// not deserialize.
    async fn get(
        &self,
        tenant_id: TenantId,
        gln: &crate::types::MarktpartnerCode,
    ) -> Result<Option<crate::partner::PartnerRecord>, EngineError> {
        let key = pt_key(tenant_id, gln);
        let stored = self
            .db
            .get(key.as_bytes())
            .await
            .map_err(|e| to_partner_err(&e))?;
        let Some(bytes) = stored else {
            return Ok(None);
        };
        serde_json::from_slice::<crate::partner::PartnerRecord>(&bytes)
            .map(Some)
            .map_err(|e| EngineError::partner(e.to_string()))
    }

    /// Deletes the partner record for `(tenant_id, gln)`; the delete is
    /// issued unconditionally.
    ///
    /// # Errors
    /// Returns a partner error when the write fails.
    async fn remove(
        &self,
        tenant_id: TenantId,
        gln: &crate::types::MarktpartnerCode,
    ) -> Result<(), EngineError> {
        let key = pt_key(tenant_id, gln);
        let mut batch = WriteBatch::new();
        batch.delete(key.as_bytes());
        self.db
            .write(batch)
            .await
            .map(|_| ())
            .map_err(|e| to_partner_err(&e))
    }

    /// Lists every partner record stored for `tenant_id`, in scan order.
    ///
    /// # Errors
    /// Returns a partner error when the scan fails or any stored value does
    /// not deserialize.
    async fn list(
        &self,
        tenant_id: TenantId,
    ) -> Result<Vec<crate::partner::PartnerRecord>, EngineError> {
        let tenant_prefix = pt_tenant_prefix(tenant_id);
        let mut iter = self
            .db
            .scan_prefix(tenant_prefix.as_bytes())
            .await
            .map_err(|e| to_partner_err(&e))?;
        let mut partners = Vec::new();
        while let Some(entry) = iter.next().await.map_err(|e| to_partner_err(&e))? {
            let parsed = serde_json::from_slice::<crate::partner::PartnerRecord>(&entry.value)
                .map_err(|e| EngineError::partner(e.to_string()))?;
            partners.push(parsed);
        }
        Ok(partners)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use time::{Duration, OffsetDateTime};
#[allow(unused_imports)]
use crate::{
deadline::Deadline,
envelope::NewEvent,
event_store::{AtomicAppend, EventStore, ExpectedVersion},
ids::{
ConversationId, CorrelationId, EventId, OutboxMessageId, ProcessId, StreamId, TenantId,
},
inbox::InboxStore,
outbox::{OutboxMessage, OutboxStore, PendingOutbox},
registry::RegistryKey,
version::WorkflowId,
};
/// Pins SlateDB's default isolation level to `Snapshot` and checks that it is
/// a different variant from `SerializableSnapshot`.
#[test]
fn isolation_level_default_is_snapshot_not_ssi() {
    let default_level = IsolationLevel::default();
    assert_eq!(
        default_level,
        IsolationLevel::Snapshot,
        "SlateDB changed IsolationLevel::default() — check whether explicit \
         SerializableSnapshot calls in store_slatedb.rs are still required",
    );
    let serializable = IsolationLevel::SerializableSnapshot;
    assert_ne!(
        serializable,
        IsolationLevel::Snapshot,
        "SerializableSnapshot and Snapshot must not be the same variant",
    );
}
/// Checks that every `transient_*` constructor yields a transient error, that
/// the plain constructors do not, and spot-checks two explicit variants.
#[test]
fn slatedb_error_transient_classification_consistent() {
    use crate::error::EngineError;
    let transient = [
        EngineError::transient_store("storage unavailable"),
        EngineError::transient_outbox("outbox unavailable"),
        EngineError::transient_deadline("deadline unavailable"),
        EngineError::transient_registry("registry unavailable"),
        EngineError::transient_inbox("inbox unavailable"),
        EngineError::transient_snapshot("snapshot unavailable"),
        EngineError::transient_partner("partner unavailable"),
        EngineError::transient_dead_letter("dead-letter unavailable"),
    ];
    for err in &transient {
        assert!(err.is_transient());
    }
    let permanent = [
        EngineError::store("data integrity error"),
        EngineError::outbox("outbox conflict"),
        EngineError::dead_letter("serialization error"),
    ];
    for err in &permanent {
        assert!(!err.is_transient());
    }
    let transport = EngineError::Transport {
        endpoint: "http://test".into(),
        message: "connection refused".into(),
    };
    assert!(transport.is_transient());
    let unknown_partner = EngineError::PartnerUnknown {
        recipient: "0123456789012".into(),
    };
    assert!(!unknown_partner.is_transient());
}
/// Opens an in-memory store labelled `"test"`; panics if opening fails.
async fn make_store() -> SlateDbStore {
    let opened = SlateDbStore::open_in_memory_with_label("test").await;
    opened.unwrap()
}
/// Shorthand for building a [`StreamId`] from a string literal.
fn make_stream(name: &str) -> StreamId {
    let stream_id = StreamId::new(name);
    stream_id
}
/// Builds a test event with fresh ids whose type (`TestEvent<n>`) and payload
/// (`{"n": <n>}`) both carry `seq_hint`.
fn make_event(seq_hint: u64) -> NewEvent {
    let event_type = format!("TestEvent{seq_hint}").into();
    let payload = serde_json::json!({ "n": seq_hint });
    NewEvent {
        correlation_id: CorrelationId::new(),
        causation_id: None,
        conversation_id: ConversationId::new(),
        process_id: ProcessId::new(),
        tenant_id: TenantId::new(),
        workflow_id: WorkflowId::new("test-workflow", "FV2025-10-01"),
        event_type,
        schema_version: 1,
        payload,
    }
}
/// Builds an APERAK outbox message created "now" and due `deliver_in` from now
/// (a negative duration puts the due time in the past).
fn make_outbox_msg(deliver_in: Duration) -> OutboxMessage {
    let created = OffsetDateTime::now_utc();
    let due = created + deliver_in;
    OutboxMessage {
        message_id: OutboxMessageId::new(),
        stream_id: StreamId::new("test/stream"),
        process_id: ProcessId::new(),
        tenant_id: TenantId::new(),
        correlation_id: CorrelationId::new(),
        conversation_id: ConversationId::new(),
        causation_event_id: EventId::new(),
        message_type: "APERAK".into(),
        recipient: "9900000000001".into(),
        payload: serde_json::json!({ "type": "APERAK" }),
        payload_schema: None,
        created_at: created,
        deliver_after: Some(due),
        attempt_count: 0,
    }
}
#[tokio::test]
async fn append_load_round_trip() {
let store = make_store().await;
let stream = make_stream("test/round-trip");
let events = vec![make_event(1), make_event(2), make_event(3)];
let result = store
.append(&stream, ExpectedVersion::NoStream, &events)
.await
.unwrap();
assert_eq!(result.last_sequence, 3);
assert_eq!(result.events.len(), 3);
let loaded = store.load(&stream).await.unwrap();
assert_eq!(loaded.len(), 3);
assert_eq!(loaded[0].sequence_number, 1);
assert_eq!(loaded[1].sequence_number, 2);
assert_eq!(loaded[2].sequence_number, 3);
assert_eq!(loaded[0].event_type.as_ref(), "TestEvent1");
assert_eq!(loaded[2].event_type.as_ref(), "TestEvent3");
}
/// Appending with a stale `Exact(0)` expectation after the stream already has
/// one event must fail with `VersionConflict { expected: 0, actual: 1 }`.
#[tokio::test]
async fn version_conflict_on_concurrent_append() {
    let store = make_store().await;
    let stream = make_stream("test/conflict");
    let seed = [make_event(1)];
    store
        .append(&stream, ExpectedVersion::NoStream, &seed)
        .await
        .unwrap();

    let stale_write = store
        .append(&stream, ExpectedVersion::Exact(0), &[make_event(2)])
        .await;
    let err = stale_write.unwrap_err();
    let is_expected_conflict = matches!(
        err,
        EngineError::VersionConflict {
            expected: 0,
            actual: 1
        }
    );
    assert!(
        is_expected_conflict,
        "expected VersionConflict, got {err:?}",
    );
}
/// Two tasks race to append at `Exact(1)`: exactly one must win, the other
/// must see a `VersionConflict`, and the stream must end at version 2.
#[tokio::test]
async fn concurrent_race_produces_exactly_one_conflict() {
    use std::sync::Arc;
    let store = Arc::new(make_store().await);
    let stream = make_stream("test/race");
    store
        .append(&stream, ExpectedVersion::NoStream, &[make_event(0)])
        .await
        .unwrap();

    let (s1, st1) = (Arc::clone(&store), stream.clone());
    let (s2, st2) = (Arc::clone(&store), stream.clone());
    let t1 = tokio::spawn(async move {
        s1.append(&st1, ExpectedVersion::Exact(1), &[make_event(10)])
            .await
    });
    let t2 = tokio::spawn(async move {
        s2.append(&st2, ExpectedVersion::Exact(1), &[make_event(20)])
            .await
    });
    let r1 = t1.await.expect("task 1 panicked");
    let r2 = t2.await.expect("task 2 panicked");

    let outcomes = [&r1, &r2];
    let successes = outcomes.iter().filter(|r| r.is_ok()).count();
    let conflicts = outcomes
        .iter()
        .filter(|r| matches!(r, Err(EngineError::VersionConflict { .. })))
        .count();
    assert_eq!(
        successes, 1,
        "exactly one task should succeed; r1={r1:?}, r2={r2:?}"
    );
    assert_eq!(
        conflicts, 1,
        "exactly one task should get VersionConflict; r1={r1:?}, r2={r2:?}"
    );
    let final_version = store.stream_version(&stream).await.unwrap();
    assert_eq!(
        final_version, 2,
        "final stream version must be 2 (seed + one winner)",
    );
}
/// The stream version starts at 0 and grows by the number of appended events.
#[tokio::test]
async fn stream_version_tracks_appends() {
    let store = make_store().await;
    let stream = make_stream("test/version");

    let initial = store.stream_version(&stream).await.unwrap();
    assert_eq!(initial, 0);

    store
        .append(&stream, ExpectedVersion::NoStream, &[make_event(1)])
        .await
        .unwrap();
    let after_first = store.stream_version(&stream).await.unwrap();
    assert_eq!(after_first, 1);

    let second_batch = [make_event(2), make_event(3)];
    store
        .append(&stream, ExpectedVersion::Exact(1), &second_batch)
        .await
        .unwrap();
    let after_second = store.stream_version(&stream).await.unwrap();
    assert_eq!(after_second, 3);
}
/// `load_from(stream, 7)` on a ten-event stream returns events 8..=10.
#[tokio::test]
async fn load_from_skips_earlier_events() {
    let store = make_store().await;
    let stream = make_stream("test/load-from");
    let ten_events: Vec<NewEvent> = (1..=10).map(make_event).collect();
    store
        .append(&stream, ExpectedVersion::NoStream, &ten_events)
        .await
        .unwrap();

    let tail = store.load_from(&stream, 7).await.unwrap();
    assert_eq!(tail.len(), 3);
    let first = &tail[0];
    let last = &tail[2];
    assert_eq!(first.sequence_number, 8);
    assert_eq!(last.sequence_number, 10);
}
/// Folding sequence numbers over a five-event stream from 0 sums to 15.
#[tokio::test]
async fn fold_stream_streams_without_vec() {
    let store = make_store().await;
    let stream = make_stream("test/fold");
    let five_events: Vec<NewEvent> = (1..=5).map(make_event).collect();
    store
        .append(&stream, ExpectedVersion::NoStream, &five_events)
        .await
        .unwrap();

    let folded = store
        .fold_stream(&stream, 0, 0u64, |acc, env| Ok(acc + env.sequence_number))
        .await;
    let sum = folded.unwrap();
    assert_eq!(sum, 15);
}
/// Folding from sequence 3 over a five-event stream sums to 9 (events 4 and 5).
#[tokio::test]
async fn fold_stream_from_sequence_filters_correctly() {
    let store = make_store().await;
    let stream = make_stream("test/fold-from");
    let five_events: Vec<NewEvent> = (1..=5).map(make_event).collect();
    store
        .append(&stream, ExpectedVersion::NoStream, &five_events)
        .await
        .unwrap();

    let folded = store
        .fold_stream(&stream, 3, 0u64, |acc, env| Ok(acc + env.sequence_number))
        .await;
    let sum = folded.unwrap();
    assert_eq!(sum, 9);
}
/// `list_streams` honours the optional prefix: two of three streams match
/// `"process/"`, and `None` returns all three.
#[tokio::test]
async fn list_streams_with_prefix_filter() {
    let store = make_store().await;
    for name in ["process/alpha", "process/beta", "other/gamma"] {
        store
            .append(
                &make_stream(name),
                ExpectedVersion::NoStream,
                &[make_event(1)],
            )
            .await
            .unwrap();
    }

    let process_streams = store.list_streams(Some("process/")).await.unwrap();
    assert_eq!(process_streams.len(), 2);
    let contains = |wanted: &str| process_streams.iter().any(|s| s.as_str() == wanted);
    assert!(contains("process/alpha"));
    assert!(contains("process/beta"));

    let all_streams = store.list_streams(None).await.unwrap();
    assert_eq!(all_streams.len(), 3);
}
/// Only messages due at or before the cutoff are returned, ordered by
/// `deliver_after`; the message due in one hour is excluded.
#[tokio::test]
async fn pending_returns_due_messages_in_order() {
    let store = make_store().await;
    let past = make_outbox_msg(Duration::hours(-2));
    let now = make_outbox_msg(Duration::ZERO);
    let future = make_outbox_msg(Duration::hours(1));
    let future_id = future.message_id;
    store.enqueue(&[past, now, future]).await.unwrap();

    let cutoff = OffsetDateTime::now_utc() + Duration::milliseconds(50);
    let pending = store.pending(10, cutoff).await.unwrap();
    assert_eq!(pending.len(), 2, "only past and now-due messages expected");
    let (earlier, later) = (&pending[0], &pending[1]);
    assert!(
        earlier.deliver_after <= later.deliver_after,
        "pending messages must be in chronological order",
    );
    let future_leaked = pending.iter().any(|m| m.message_id == future_id);
    assert!(!future_leaked);
}
/// After `acknowledge`, the message is no longer pending and the outbox
/// length drops to zero.
#[tokio::test]
async fn acknowledge_removes_message_and_time_index() {
    let store = make_store().await;
    let overdue = make_outbox_msg(Duration::hours(-1));
    let id = overdue.message_id;
    store.enqueue(&[overdue]).await.unwrap();

    let before = store.pending(10, OffsetDateTime::now_utc()).await.unwrap();
    assert_eq!(before.len(), 1);

    store.acknowledge(id).await.unwrap();

    let after = store.pending(10, OffsetDateTime::now_utc()).await.unwrap();
    assert_eq!(
        after.len(),
        0,
        "message must not be visible after acknowledge"
    );
    let remaining = store.len().await.unwrap();
    assert_eq!(remaining, 0, "om/ entry must be deleted");
}
/// Rescheduling an overdue message to one hour ahead hides it from `pending`
/// and stores the new due time with `attempt_count` incremented to 1.
#[tokio::test]
async fn reschedule_moves_time_index_and_increments_attempt_count() {
    let store = make_store().await;
    let overdue = make_outbox_msg(Duration::hours(-1));
    let id = overdue.message_id;
    store.enqueue(&[overdue]).await.unwrap();

    let future = OffsetDateTime::now_utc() + Duration::hours(1);
    store.reschedule(id, future).await.unwrap();

    let pending = store.pending(10, OffsetDateTime::now_utc()).await.unwrap();
    assert_eq!(pending.len(), 0, "rescheduled message must not be due yet");

    // Read the raw stored message to inspect the persisted fields.
    let raw_key = om_key(&id);
    let raw = store.db.get(raw_key.as_bytes()).await.unwrap().unwrap();
    let updated: OutboxMessage = serde_json::from_slice(&raw).unwrap();
    assert_eq!(updated.attempt_count, 1);
    assert_eq!(updated.deliver_after, Some(future));
}
/// `append_with_outbox` stores the event and the outbox message together, and
/// the message's causation id points at the appended event.
#[tokio::test]
async fn append_with_outbox_is_atomic() {
    let store = make_store().await;
    let stream = make_stream("test/atomic");
    let new_events = [make_event(1)];
    let outbox_batch = [PendingOutbox::new(
        "APERAK",
        "9900000000001",
        serde_json::json!({ "type": "APERAK" }),
    )];
    store
        .append_with_outbox(
            &stream,
            ExpectedVersion::NoStream,
            &new_events,
            &outbox_batch,
        )
        .await
        .unwrap();

    let events = store.load(&stream).await.unwrap();
    assert_eq!(events.len(), 1);

    let pending = store.pending(10, OffsetDateTime::now_utc()).await.unwrap();
    assert_eq!(pending.len(), 1);
    let message = &pending[0];
    assert_eq!(message.causation_event_id, events[0].event_id);
    assert_eq!(message.message_type.as_ref(), "APERAK");
}
/// A pre-epoch timestamp must produce a key that sorts at or before the key
/// for the Unix epoch with the same message id.
///
/// Plain `#[test]`: `ot_key` is synchronous and nothing here awaits, so no
/// async runtime is needed.
#[test]
fn ot_key_pre_epoch_clamps_to_epoch() {
    let pre_epoch = OffsetDateTime::UNIX_EPOCH - Duration::hours(1);
    let id = OutboxMessageId::new();
    let key = ot_key(pre_epoch, &id);
    let epoch_key = ot_key(OffsetDateTime::UNIX_EPOCH, &id);
    assert!(
        key <= epoch_key,
        "pre-epoch key {key} must sort at or before epoch key {epoch_key}",
    );
}
/// Opens an in-memory store labelled `"deadlines-test"` and returns it
/// together with its deadline-store view.
async fn make_deadline_store() -> (SlateDbStore, SlateDbDeadlineStore) {
    let base = SlateDbStore::open_in_memory_with_label("deadlines-test")
        .await
        .unwrap();
    let deadlines = base.as_deadline_store();
    (base, deadlines)
}
/// Builds a deadline on stream `process/test` with fresh process and tenant
/// ids, labelled `aperak-response-window`, due at `due_at`.
fn make_deadline(due_at: OffsetDateTime) -> Deadline {
    use crate::ids::ProcessId;
    let stream = StreamId::new("process/test");
    let workflow = WorkflowId::new("test-workflow", "FV2025-10-01");
    Deadline::new(
        stream,
        ProcessId::new(),
        TenantId::new(),
        workflow,
        "aperak-response-window",
        due_at,
    )
}
/// Registering a deadline makes the store length 1; cancelling it brings the
/// length back to 0.
#[tokio::test]
async fn deadline_register_and_cancel() {
    let (_, ds) = make_deadline_store().await;
    let tomorrow = OffsetDateTime::now_utc() + Duration::days(1);
    let deadline = make_deadline(tomorrow);
    let id = deadline.deadline_id();

    ds.register(&deadline).await.unwrap();
    let after_register = ds.len().await.unwrap();
    assert_eq!(after_register, 1);

    ds.cancel(id).await.unwrap();
    let after_cancel = ds.len().await.unwrap();
    assert_eq!(after_cancel, 0);
}
/// `due_now` returns the overdue deadline but not the one due in five days.
#[tokio::test]
async fn deadline_due_now_only_returns_overdue() {
    let (_, ds) = make_deadline_store().await;
    let past = make_deadline(OffsetDateTime::now_utc() - Duration::seconds(1));
    let future = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
    for deadline in [&past, &future] {
        ds.register(deadline).await.unwrap();
    }

    let due = ds.due_now(100).await.unwrap();
    assert_eq!(due.deadlines.len(), 1);
    assert!(!due.has_more);
}
/// With five overdue deadlines, a limit of 3 returns three plus
/// `has_more = true`; a limit of 10 returns all five with `has_more = false`.
#[tokio::test]
async fn deadline_due_now_has_more_when_over_limit() {
    let (_, ds) = make_deadline_store().await;
    let past = OffsetDateTime::now_utc() - Duration::seconds(1);
    let mut registered = 0;
    while registered < 5 {
        ds.register(&make_deadline(past)).await.unwrap();
        registered += 1;
    }

    let limited = ds.due_now(3).await.unwrap();
    assert_eq!(limited.deadlines.len(), 3);
    assert!(limited.has_more);

    let unlimited = ds.due_now(10).await.unwrap();
    assert_eq!(unlimited.deadlines.len(), 5);
    assert!(!unlimited.has_more);
}
#[tokio::test]
async fn deadline_for_stream_filters_correctly() {
let (_, ds) = make_deadline_store().await;
let stream1 = StreamId::new("process/aaa");
let stream2 = StreamId::new("process/bbb");
let wf = WorkflowId::new("test-workflow", "FV2025-10-01");
let d1 = Deadline::new(
stream1.clone(),
ProcessId::new(),
TenantId::new(),
wf.clone(),
"label",
OffsetDateTime::now_utc() + Duration::days(1),
);
let d2 = Deadline::new(
stream2.clone(),
ProcessId::new(),
TenantId::new(),
wf,
"label",
OffsetDateTime::now_utc() + Duration::days(1),
);
ds.register(&d1).await.unwrap();
ds.register(&d2).await.unwrap();
let for1 = ds.for_stream(&stream1).await.unwrap();
assert_eq!(for1.len(), 1);
assert_eq!(for1[0].stream_id(), &stream1);
let for2 = ds.for_stream(&stream2).await.unwrap();
assert_eq!(for2.len(), 1);
assert_eq!(for2[0].stream_id(), &stream2);
}
/// Re-registering an already stored deadline must not increase the store
/// length: two distinct deadlines stay at 2 after the first is registered
/// again.
#[tokio::test]
async fn deadline_register_upserts_same_id() {
    let (_, ds) = make_deadline_store().await;
    let d1 = make_deadline(OffsetDateTime::now_utc() + Duration::days(5));
    let d2 = make_deadline(OffsetDateTime::now_utc() + Duration::days(10));
    ds.register(&d1).await.unwrap();
    ds.register(&d2).await.unwrap();
    assert_eq!(ds.len().await.unwrap(), 2);

    ds.register(&d1).await.unwrap();
    assert_eq!(
        ds.len().await.unwrap(),
        2,
        "upsert must not grow the payload count"
    );
}
/// Returns the process-registry view of `base`.
fn make_reg_store(base: &SlateDbStore) -> SlateDbProcessRegistry {
    let registry = base.as_process_registry();
    registry
}
/// Builds a [`ProcessIdentity`] with fresh process and tenant ids for workflow
/// `test` / `FV2024-10-01`.
fn make_identity() -> ProcessIdentity {
    use crate::{ids::ProcessId, version::WorkflowId};
    let process = ProcessId::new();
    let tenant = TenantId::new();
    let workflow = WorkflowId::new("test", "FV2024-10-01");
    ProcessIdentity::new(process, tenant, workflow)
}
/// A registered identity can be looked up again under the same tenant and key.
#[tokio::test]
async fn registry_register_and_lookup() {
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let tenant = TenantId::new();
    let key = RegistryKey::parse("conv:abc").expect("valid key");
    let identity = make_identity();
    let expected_pid = identity.process_id;

    reg.register(tenant, &key, identity.clone()).await.unwrap();

    let lookup_result = reg.lookup(tenant, &key).await.unwrap();
    let found = lookup_result.expect("must be found");
    assert_eq!(found.process_id, expected_pid);
}
/// Looking up a key that was never registered yields `None`.
#[tokio::test]
async fn registry_lookup_returns_none_for_unknown() {
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let tenant = TenantId::new();
    let unknown_key = RegistryKey::parse("nope").expect("valid key");

    let found = reg.lookup(tenant, &unknown_key).await.unwrap();
    assert!(found.is_none());
}
/// After `remove`, a previously registered key no longer resolves.
#[tokio::test]
async fn registry_remove_clears_mapping() {
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let tenant = TenantId::new();
    let key = RegistryKey::parse("k1").expect("valid key");

    reg.register(tenant, &key, make_identity()).await.unwrap();
    reg.remove(tenant, &key).await.unwrap();

    let after_remove = reg.lookup(tenant, &key).await.unwrap();
    assert!(after_remove.is_none());
}
#[tokio::test]
async fn registry_upsert_overwrites_existing() {
let (store, _) = make_deadline_store().await;
let reg = make_reg_store(&store);
let tenant = TenantId::new();
let key = RegistryKey::parse("k1").expect("valid key");
let id1 = make_identity();
let id2 = make_identity();
reg.register(tenant, &key, id1).await.unwrap();
reg.register(tenant, &key, id2.clone()).await.unwrap();
let found = reg.lookup(tenant, &key).await.unwrap().unwrap();
assert_eq!(found.process_id, id2.process_id);
assert_eq!(reg.len().await.unwrap(), 1);
}
/// A key registered for one tenant is not visible to another tenant.
#[tokio::test]
async fn registry_tenant_isolation() {
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let owner = TenantId::new();
    let stranger = TenantId::new();
    let key = RegistryKey::parse("k1").expect("valid key");

    reg.register(owner, &key, make_identity()).await.unwrap();

    let visible_to_owner = reg.contains(owner, &key).await.unwrap();
    assert!(visible_to_owner);
    let visible_to_stranger = reg.contains(stranger, &key).await.unwrap();
    assert!(!visible_to_stranger);
}
/// Two identities registered under the same tag are both returned by
/// `lookup_correlated` (compared order-independently by process id).
#[tokio::test]
async fn correlated_register_and_lookup() {
    const TAG: &str = "DE0001234567890";
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let tenant = TenantId::new();
    let id1 = make_identity();
    let id2 = make_identity();
    for identity in [&id1, &id2] {
        reg.register_correlated(tenant, TAG, identity.process_id, identity.clone())
            .await
            .unwrap();
    }

    let mut results = reg.lookup_correlated(tenant, TAG).await.unwrap();
    assert_eq!(results.len(), 2);
    results.sort_by_key(|id| id.process_id.to_string());
    let mut expected = vec![id1.process_id, id2.process_id];
    expected.sort_by_key(std::string::ToString::to_string);
    let found_pids = results.iter().map(|id| id.process_id).collect::<Vec<_>>();
    assert_eq!(found_pids, expected,);
}
/// Looking up a tag with no registrations returns an empty list.
#[tokio::test]
async fn correlated_lookup_empty_for_unknown_tag() {
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let tenant = TenantId::new();

    let found = reg.lookup_correlated(tenant, "unknown-malo").await.unwrap();
    assert!(found.is_empty());
}
/// Removing one of two correlated entries leaves only the other one.
#[tokio::test]
async fn correlated_remove_single_entry() {
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let tenant = TenantId::new();
    let id1 = make_identity();
    let id2 = make_identity();
    for identity in [&id1, &id2] {
        reg.register_correlated(tenant, "malo-1", identity.process_id, identity.clone())
            .await
            .unwrap();
    }

    reg.remove_correlated(tenant, "malo-1", id1.process_id)
        .await
        .unwrap();

    let remaining = reg.lookup_correlated(tenant, "malo-1").await.unwrap();
    assert_eq!(remaining.len(), 1);
    assert_eq!(remaining[0].process_id, id2.process_id);
}
/// A correlated entry registered for one tenant is invisible to another
/// tenant using the same tag.
#[tokio::test]
async fn correlated_tenant_isolation() {
    let (store, _) = make_deadline_store().await;
    let reg = make_reg_store(&store);
    let owner = TenantId::new();
    let stranger = TenantId::new();
    let identity = make_identity();
    reg.register_correlated(owner, "shared-tag", identity.process_id, identity.clone())
        .await
        .unwrap();

    let for_owner = reg.lookup_correlated(owner, "shared-tag").await.unwrap();
    assert_eq!(for_owner.len(), 1);
    let for_stranger = reg.lookup_correlated(stranger, "shared-tag").await.unwrap();
    assert_eq!(for_stranger.len(), 0);
}
/// Returns the inbox-store view of `base`.
fn make_inbox_store(base: &SlateDbStore) -> SlateDbInboxStore {
    let inbox = base.as_inbox_store();
    inbox
}
/// A key seen for the first time is accepted.
#[tokio::test]
async fn inbox_new_message_accepted() {
    let (store, _) = make_deadline_store().await;
    let inbox = make_inbox_store(&store);
    let accepted = inbox.accept("sender:ref-001").await.unwrap();
    assert!(accepted);
}
/// The second `accept` of the same key reports a duplicate (`false`).
#[tokio::test]
async fn inbox_duplicate_rejected() {
    let (store, _) = make_deadline_store().await;
    let inbox = make_inbox_store(&store);
    let first = inbox.accept("sender:ref-001").await.unwrap();
    assert!(first);
    let second = inbox.accept("sender:ref-001").await.unwrap();
    assert!(!second);
}
/// The same reference from two different senders yields two distinct inbox
/// keys, and both are accepted.
#[tokio::test]
async fn inbox_different_senders_same_ref_are_independent() {
    use crate::inbox::inbox_key;
    let (store, _) = make_deadline_store().await;
    let inbox = make_inbox_store(&store);
    for sender in ["sender-A", "sender-B"] {
        let key = inbox_key(sender, "ref-001").unwrap();
        let accepted = inbox.accept(&key).await.unwrap();
        assert!(accepted);
    }
}
/// After `purge_expired` is called with a timestamp 200 hours in the future,
/// the single stored key is reported as purged and can be accepted again.
#[tokio::test]
async fn inbox_purge_expired_removes_old_entries() {
    let (base, _) = make_deadline_store().await;
    let inbox = make_inbox_store(&base);
    let key = "sender:old-ref";

    // Seed the inbox with one entry; the returned flag is not inspected here.
    inbox.accept(key).await.unwrap();

    // NOTE(review): 200 hours is presumably beyond the inbox retention
    // window, so the entry just written counts as expired - confirm.
    let cutoff = OffsetDateTime::now_utc() + Duration::hours(200);
    let removed = inbox.purge_expired(cutoff).await.unwrap();
    assert_eq!(removed, 1, "one entry should have been purged");

    let reaccepted = inbox.accept(key).await.unwrap();
    assert!(reaccepted, "key should be gone after purge");
}
/// Saving a projection checkpoint and loading it back returns the saved
/// cursors; streams that were never advanced read back as 0.
#[tokio::test]
async fn projection_checkpoint_round_trip() {
    use crate::projection::{GlobalProjectionCheckpoint, ProjectionCheckpointStore};
    let store = make_store().await;

    // Before anything is saved, every cursor reads as 0.
    let initial = store.load_projection_checkpoint("billing").await.unwrap();
    assert_eq!(initial.cursor_for(&StreamId::new("any")), 0);

    let first_stream = StreamId::new("process/t1/p1");
    let second_stream = StreamId::new("process/t1/p2");
    let mut checkpoint = GlobalProjectionCheckpoint::new();
    checkpoint.advance(&first_stream, 42);
    checkpoint.advance(&second_stream, 7);
    store
        .save_projection_checkpoint("billing", &checkpoint)
        .await
        .unwrap();

    let reloaded = store.load_projection_checkpoint("billing").await.unwrap();
    assert_eq!(reloaded.cursor_for(&first_stream), 42);
    assert_eq!(reloaded.cursor_for(&second_stream), 7);
    // A stream that was never part of the checkpoint still reads as 0.
    assert_eq!(reloaded.cursor_for(&StreamId::new("process/t1/p3")), 0);
}
/// Checkpoints saved under different projection names keep separate cursors
/// for the same stream.
#[tokio::test]
async fn projection_checkpoints_are_independent_by_name() {
    use crate::projection::{GlobalProjectionCheckpoint, ProjectionCheckpointStore};
    let store = make_store().await;
    let stream = StreamId::new("s1");

    let mut first = GlobalProjectionCheckpoint::new();
    first.advance(&stream, 10);
    store
        .save_projection_checkpoint("proj-a", &first)
        .await
        .unwrap();

    let mut second = GlobalProjectionCheckpoint::new();
    second.advance(&stream, 99);
    store
        .save_projection_checkpoint("proj-b", &second)
        .await
        .unwrap();

    // Load both before asserting, so each is read after both saves completed.
    let reloaded_first = store.load_projection_checkpoint("proj-a").await.unwrap();
    let reloaded_second = store.load_projection_checkpoint("proj-b").await.unwrap();
    assert_eq!(reloaded_first.cursor_for(&stream), 10);
    assert_eq!(reloaded_second.cursor_for(&stream), 99);
}
/// After `advance_projection_cursors` is given a previous and a current
/// checkpoint that differ in one stream, the reloaded checkpoint keeps the
/// untouched cursor and shows the advanced one.
#[tokio::test]
async fn advance_projection_cursors_only_writes_changed_streams() {
    use crate::projection::{GlobalProjectionCheckpoint, ProjectionCheckpointStore};
    let store = make_store().await;
    let unchanged = StreamId::new("p/a");
    let changed = StreamId::new("p/b");

    // Baseline checkpoint persisted in full.
    let mut before = GlobalProjectionCheckpoint::new();
    before.advance(&unchanged, 10);
    before.advance(&changed, 20);
    store
        .save_projection_checkpoint("adv-test", &before)
        .await
        .unwrap();

    // Only `p/b` moves forward between the two checkpoints.
    let mut after = before.clone();
    after.advance(&changed, 30);
    store
        .advance_projection_cursors("adv-test", &before, &after)
        .await
        .unwrap();

    let reloaded = store.load_projection_checkpoint("adv-test").await.unwrap();
    assert_eq!(reloaded.cursor_for(&unchanged), 10);
    assert_eq!(reloaded.cursor_for(&changed), 30);
}
}