use crate::contracts::ReclaimGrant;
use crate::types::{TimestampMs, WaitpointToken, WorkerId, WorkerInstanceId};
pub use crate::types::Namespace;
pub use crate::contracts::{
CompositeBody, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget, SignalMatcher,
SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
SuspensionRequester, TimeoutBehavior, WaitpointBinding,
};
use std::collections::BTreeMap;
use std::time::Duration;
/// Identifies which storage backend a value belongs to.
///
/// Every variant has a fixed one-byte wire encoding; see
/// [`BackendTag::wire_byte`] and [`BackendTag::from_wire_byte`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum BackendTag {
    /// Valkey backend, encoded on the wire as `0x01`.
    Valkey,
    /// Postgres backend, encoded on the wire as `0x02`.
    Postgres,
}

impl BackendTag {
    /// Returns the single-byte wire encoding of this tag.
    pub const fn wire_byte(self) -> u8 {
        match self {
            Self::Valkey => 0x01,
            Self::Postgres => 0x02,
        }
    }

    /// Decodes a wire byte produced by [`BackendTag::wire_byte`].
    ///
    /// Returns `None` for any byte that does not map to a known variant.
    pub const fn from_wire_byte(b: u8) -> Option<Self> {
        if b == 0x01 {
            Some(Self::Valkey)
        } else if b == 0x02 {
            Some(Self::Postgres)
        } else {
            None
        }
    }
}
/// Kind discriminator stored in the `kind` field of [`Handle`].
///
/// NOTE(review): the lifecycle meaning of each variant is not visible in this
/// file; the names suggest fresh / resumed / suspended executions — confirm
/// against the backend implementations before relying on that reading.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum HandleKind {
Fresh,
Resumed,
Suspended,
}
/// Owned, immutable byte payload carried inside a [`Handle`].
///
/// The bytes are stored as-is; this type neither inspects nor validates them.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct HandleOpaque(Box<[u8]>);

impl HandleOpaque {
    /// Wraps an already-boxed byte slice without copying it.
    pub fn new(bytes: Box<[u8]>) -> Self {
        HandleOpaque(bytes)
    }

    /// Borrows the wrapped bytes.
    pub fn as_bytes(&self) -> &[u8] {
        let HandleOpaque(payload) = self;
        &payload[..]
    }
}
/// A backend-tagged handle: which backend issued it, what kind it is, and an
/// opaque byte payload.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct Handle {
    /// Backend this handle belongs to.
    pub backend: BackendTag,
    /// Kind discriminator for this handle.
    pub kind: HandleKind,
    /// Opaque payload bytes; not interpreted by this type.
    pub opaque: HandleOpaque,
}

impl Handle {
    /// Assembles a handle from its three parts.
    pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
        Handle {
            opaque,
            kind,
            backend,
        }
    }
}
/// An ordered list of capability tokens.
///
/// Tokens are kept exactly as supplied: no sorting and no de-duplication is
/// performed, so equality is order-sensitive.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct CapabilitySet {
    /// Capability tokens in insertion order.
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Builds a set from anything iterable whose items convert into `String`.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut collected: Vec<String> = Vec::new();
        collected.extend(tokens.into_iter().map(S::into));
        CapabilitySet { tokens: collected }
    }

    /// Returns `true` when the set holds no tokens.
    pub fn is_empty(&self) -> bool {
        self.tokens.is_empty()
    }
}
/// Parameters a worker supplies when claiming work.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ClaimPolicy {
    /// Optional wait bound; `None` means no bound was supplied.
    // NOTE(review): presumably `None` means "do not block" — confirm with the
    // backend claim implementation.
    pub max_wait: Option<Duration>,
    /// Identity of the claiming worker.
    pub worker_id: WorkerId,
    /// Identity of the specific worker instance.
    pub worker_instance_id: WorkerInstanceId,
    /// Lease time-to-live, in milliseconds.
    pub lease_ttl_ms: u32,
}

impl ClaimPolicy {
    /// Creates a policy.
    ///
    /// Note the argument order: `max_wait` comes last even though it is the
    /// first field of the struct.
    pub fn new(
        worker_id: WorkerId,
        worker_instance_id: WorkerInstanceId,
        lease_ttl_ms: u32,
        max_wait: Option<Duration>,
    ) -> Self {
        ClaimPolicy {
            worker_id,
            worker_instance_id,
            lease_ttl_ms,
            max_wait,
        }
    }
}
/// Category of a [`Frame`], stored in its `kind` field.
///
/// NOTE(review): how each kind is routed or rendered is not visible in this
/// file — confirm with the stream consumers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FrameKind {
Stdout,
Stderr,
Event,
Blob,
}
/// Patch format used by [`StreamMode::DurableSummary`] and recorded on
/// [`SummaryDocument`].
///
/// Currently a single variant; the enum is `#[non_exhaustive]` so more can be
/// added later.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PatchKind {
// NOTE(review): presumably RFC 7396 JSON Merge Patch — confirm.
JsonMergePatch,
}
/// Delivery mode attached to a [`Frame`].
///
/// `Durable` is the `Default`. Each mode has a wire string available via
/// `StreamMode::wire_str`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum StreamMode {
/// Default mode.
#[default]
Durable,
/// Summary mode, parameterised by the patch format to apply.
DurableSummary { patch_kind: PatchKind },
/// Best-effort live mode, parameterised by a [`BestEffortLiveConfig`].
BestEffortLive { config: BestEffortLiveConfig },
}
/// Tuning parameters for [`StreamMode::BestEffortLive`].
///
/// Construct with [`BestEffortLiveConfig::with_ttl`] and refine with the
/// `with_*` builder methods.
///
/// # Equality and hashing
///
/// `PartialEq`, `Eq` and `Hash` all treat `ema_alpha` by its IEEE-754 bit
/// pattern (`f64::to_bits`). This keeps the three impls mutually consistent:
/// equality is reflexive even when `ema_alpha` is NaN, and two values that
/// compare equal always hash identically. As a consequence `0.0` and `-0.0`
/// are considered *different* alphas.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct BestEffortLiveConfig {
    /// Time-to-live, in milliseconds.
    pub ttl_ms: u32,
    /// Lower bound for the max-length setting (defaults to 64).
    pub maxlen_floor: u32,
    /// Upper bound for the max-length setting (defaults to 16 384).
    pub maxlen_ceiling: u32,
    /// Smoothing factor (defaults to 0.2).
    // NOTE(review): presumably the alpha of an exponential moving average
    // used to size the stream — confirm with the backend implementation.
    pub ema_alpha: f64,
}

impl BestEffortLiveConfig {
    /// Creates a config with the given TTL and default values for every
    /// other field (`maxlen_floor = 64`, `maxlen_ceiling = 16_384`,
    /// `ema_alpha = 0.2`).
    pub fn with_ttl(ttl_ms: u32) -> Self {
        Self {
            ttl_ms,
            maxlen_floor: 64,
            maxlen_ceiling: 16_384,
            ema_alpha: 0.2,
        }
    }

    /// Returns the config with `maxlen_floor` replaced.
    pub fn with_maxlen_floor(mut self, floor: u32) -> Self {
        self.maxlen_floor = floor;
        self
    }

    /// Returns the config with `maxlen_ceiling` replaced.
    pub fn with_maxlen_ceiling(mut self, ceiling: u32) -> Self {
        self.maxlen_ceiling = ceiling;
        self
    }

    /// Returns the config with `ema_alpha` replaced. The value is stored
    /// as-is; no range validation is performed here.
    pub fn with_ema_alpha(mut self, alpha: f64) -> Self {
        self.ema_alpha = alpha;
        self
    }
}

impl PartialEq for BestEffortLiveConfig {
    fn eq(&self, other: &Self) -> bool {
        // Compare the float by bit pattern so that equality agrees with the
        // `Hash` impl below and stays reflexive for NaN (required by `Eq`).
        self.ttl_ms == other.ttl_ms
            && self.maxlen_floor == other.maxlen_floor
            && self.maxlen_ceiling == other.maxlen_ceiling
            && self.ema_alpha.to_bits() == other.ema_alpha.to_bits()
    }
}

// Sound because `eq` above is a total equivalence relation (bitwise on the
// only floating-point field).
impl Eq for BestEffortLiveConfig {}

impl std::hash::Hash for BestEffortLiveConfig {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.ttl_ms.hash(state);
        self.maxlen_floor.hash(state);
        self.maxlen_ceiling.hash(state);
        // `f64` is not `Hash`; hash the same bit pattern `eq` compares.
        self.ema_alpha.to_bits().hash(state);
    }
}
impl StreamMode {
    /// Returns [`StreamMode::Durable`].
    pub fn durable() -> Self {
        Self::Durable
    }

    /// Returns [`StreamMode::DurableSummary`] using
    /// [`PatchKind::JsonMergePatch`].
    pub fn durable_summary() -> Self {
        let patch_kind = PatchKind::JsonMergePatch;
        Self::DurableSummary { patch_kind }
    }

    /// Returns [`StreamMode::BestEffortLive`] with the given TTL and the
    /// defaults from [`BestEffortLiveConfig::with_ttl`].
    pub fn best_effort_live(ttl_ms: u32) -> Self {
        let config = BestEffortLiveConfig::with_ttl(ttl_ms);
        Self::BestEffortLive { config }
    }

    /// Returns [`StreamMode::BestEffortLive`] wrapping a caller-built config.
    pub fn best_effort_live_with_config(config: BestEffortLiveConfig) -> Self {
        Self::BestEffortLive { config }
    }

    /// Wire string for this mode. Variant payloads do not affect the result.
    pub fn wire_str(&self) -> &'static str {
        match *self {
            Self::Durable => "durable",
            Self::DurableSummary { .. } => "summary",
            Self::BestEffortLive { .. } => "best_effort",
        }
    }
}
/// Visibility filter for tailing; `All` is the `Default`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
#[non_exhaustive]
pub enum TailVisibility {
    /// No filtering. Encoded on the wire as the empty string.
    #[default]
    All,
    /// Encoded on the wire as `"exclude_best_effort"`.
    ExcludeBestEffort,
}

impl TailVisibility {
    /// Wire string for this visibility setting.
    pub fn wire_str(&self) -> &'static str {
        match *self {
            Self::ExcludeBestEffort => "exclude_best_effort",
            Self::All => "",
        }
    }
}
/// Sentinel string `__ff_null__` (11 ASCII bytes, no surrounding whitespace).
///
/// NOTE(review): presumably stands in for an explicit JSON `null` in summary
/// patches — the consumer is not visible in this file; confirm before
/// changing, since the exact bytes are likely matched elsewhere.
pub const SUMMARY_NULL_SENTINEL: &str = "__ff_null__";
/// A versioned summary document together with its bookkeeping timestamps.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct SummaryDocument {
    /// Raw document bytes.
    // NOTE(review): presumably UTF-8 encoded JSON, per the field name — this
    // type does not validate it.
    pub document_json: Vec<u8>,
    /// Document version counter.
    pub version: u64,
    /// Patch format associated with the document.
    pub patch_kind: PatchKind,
    /// Timestamp of the latest update, in milliseconds.
    pub last_updated_ms: u64,
    /// Timestamp of the first applied update, in milliseconds.
    pub first_applied_ms: u64,
}

impl SummaryDocument {
    /// Creates a document from its parts.
    ///
    /// Note that `last_updated_ms` precedes `first_applied_ms` in the
    /// argument list.
    pub fn new(
        document_json: Vec<u8>,
        version: u64,
        patch_kind: PatchKind,
        last_updated_ms: u64,
        first_applied_ms: u64,
    ) -> Self {
        SummaryDocument {
            first_applied_ms,
            last_updated_ms,
            patch_kind,
            version,
            document_json,
        }
    }
}
/// A single stream frame: payload bytes plus metadata.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Frame {
    /// Payload bytes.
    pub bytes: Vec<u8>,
    /// Frame category.
    pub kind: FrameKind,
    /// Optional sequence number.
    pub seq: Option<u64>,
    /// Free-form frame type label; empty when not set.
    pub frame_type: String,
    /// Optional correlation identifier.
    pub correlation_id: Option<String>,
    /// Delivery mode; defaults to [`StreamMode::Durable`].
    pub mode: StreamMode,
}

impl Frame {
    /// Creates a frame with no sequence number, an empty `frame_type`, no
    /// correlation id, and [`StreamMode::Durable`].
    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
        Frame {
            bytes,
            kind,
            seq: None,
            frame_type: String::new(),
            correlation_id: None,
            mode: StreamMode::Durable,
        }
    }

    /// Same as [`Frame::new`] but with `seq` set to `Some(seq)`.
    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
        Frame {
            seq: Some(seq),
            ..Frame::new(bytes, kind)
        }
    }

    /// Returns the frame with its delivery mode replaced.
    pub fn with_mode(self, mode: StreamMode) -> Self {
        Frame { mode, ..self }
    }

    /// Returns the frame with its `frame_type` label replaced.
    pub fn with_frame_type(self, frame_type: impl Into<String>) -> Self {
        Frame {
            frame_type: frame_type.into(),
            ..self
        }
    }

    /// Returns the frame with its correlation id set.
    pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
        Frame {
            correlation_id: Some(correlation_id.into()),
            ..self
        }
    }
}
/// Kind discriminator stored in the `kind` field of [`WaitpointSpec`].
///
/// NOTE(review): how each kind interprets `WaitpointSpec::matcher` is not
/// visible in this file — confirm with the backend implementations.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum WaitpointKind {
SignalName,
CorrelationId,
External,
}
/// Newtype around a [`WaitpointToken`].
///
/// `Debug` is implemented by hand rather than derived; see the impl below.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct WaitpointHmac(WaitpointToken);

impl WaitpointHmac {
    /// Builds the wrapper from anything convertible into a `String`.
    pub fn new(token: impl Into<String>) -> Self {
        let inner = WaitpointToken::from(token.into());
        WaitpointHmac(inner)
    }

    /// Borrows the wrapped token.
    pub fn token(&self) -> &WaitpointToken {
        let WaitpointHmac(inner) = self;
        inner
    }

    /// Borrows the token as a string slice.
    pub fn as_str(&self) -> &str {
        self.token().as_str()
    }
}

impl std::fmt::Debug for WaitpointHmac {
    /// Writes `WaitpointHmac(<inner>)`, where `<inner>` is the wrapped
    /// token's own `Debug` output.
    // NOTE(review): whether the secret is redacted depends entirely on
    // `WaitpointToken`'s `Debug` impl, which is not visible here — confirm
    // before logging values of this type.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("WaitpointHmac({:?})", self.0))
    }
}
/// A waitpoint id paired with its HMAC token.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct PendingWaitpoint {
    /// Identifier of the waitpoint.
    pub waitpoint_id: crate::types::WaitpointId,
    /// Token associated with the waitpoint.
    pub hmac_token: WaitpointHmac,
}

impl PendingWaitpoint {
    /// Pairs a waitpoint id with its token.
    pub fn new(waitpoint_id: crate::types::WaitpointId, hmac_token: WaitpointHmac) -> Self {
        PendingWaitpoint {
            hmac_token,
            waitpoint_id,
        }
    }
}
/// Specification of a waitpoint: its kind, raw matcher bytes, and HMAC token.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct WaitpointSpec {
    /// Kind of waitpoint.
    pub kind: WaitpointKind,
    /// Raw matcher bytes; not interpreted by this type.
    pub matcher: Vec<u8>,
    /// Token associated with the waitpoint.
    pub hmac_token: WaitpointHmac,
}

impl WaitpointSpec {
    /// Assembles a spec from its parts.
    pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
        WaitpointSpec {
            hmac_token,
            matcher,
            kind,
        }
    }
}
/// Human-readable failure message with optional raw detail bytes.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FailureReason {
    /// Failure message.
    pub message: String,
    /// Optional opaque detail payload.
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Creates a reason that carries only a message (`detail` is `None`).
    pub fn new(message: impl Into<String>) -> Self {
        let message: String = message.into();
        FailureReason {
            message,
            detail: None,
        }
    }

    /// Creates a reason that carries both a message and detail bytes.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        FailureReason {
            detail: Some(detail),
            ..FailureReason::new(message)
        }
    }
}
/// Coarse classification of a failure.
///
/// NOTE(review): which classes are retried versus treated as terminal is not
/// decided in this file — confirm with the code that consumes this enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FailureClass {
Transient,
Permanent,
InfraCrash,
Timeout,
Cancelled,
}
/// Usage counters reported for an execution.
///
/// `Default` yields zero counters, no wall time, an empty `custom` map and no
/// dedup key.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct UsageDimensions {
    /// Input token count.
    pub input_tokens: u64,
    /// Output token count.
    pub output_tokens: u64,
    /// Optional wall-clock time, in milliseconds.
    pub wall_ms: Option<u64>,
    /// Additional named counters, ordered by key.
    pub custom: BTreeMap<String, u64>,
    /// Optional de-duplication key.
    pub dedup_key: Option<String>,
}

impl UsageDimensions {
    /// Equivalent to `UsageDimensions::default()`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the value with `input_tokens` replaced.
    pub fn with_input_tokens(self, tokens: u64) -> Self {
        UsageDimensions {
            input_tokens: tokens,
            ..self
        }
    }

    /// Returns the value with `output_tokens` replaced.
    pub fn with_output_tokens(self, tokens: u64) -> Self {
        UsageDimensions {
            output_tokens: tokens,
            ..self
        }
    }

    /// Returns the value with `wall_ms` set to `Some(ms)`.
    pub fn with_wall_ms(self, ms: u64) -> Self {
        UsageDimensions {
            wall_ms: Some(ms),
            ..self
        }
    }

    /// Returns the value with `dedup_key` set.
    pub fn with_dedup_key(self, key: impl Into<String>) -> Self {
        UsageDimensions {
            dedup_key: Some(key.into()),
            ..self
        }
    }
}
/// A [`ReclaimGrant`] bundled with the worker identity and lease TTL.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ReclaimToken {
    /// The wrapped grant.
    pub grant: ReclaimGrant,
    /// Identity of the worker.
    pub worker_id: WorkerId,
    /// Identity of the specific worker instance.
    pub worker_instance_id: WorkerInstanceId,
    /// Lease time-to-live, in milliseconds.
    pub lease_ttl_ms: u32,
}

impl ReclaimToken {
    /// Assembles a token from its parts.
    pub fn new(
        grant: ReclaimGrant,
        worker_id: WorkerId,
        worker_instance_id: WorkerInstanceId,
        lease_ttl_ms: u32,
    ) -> Self {
        ReclaimToken {
            lease_ttl_ms,
            worker_instance_id,
            worker_id,
            grant,
        }
    }
}
/// Result of renewing a lease: new expiry and lease epoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct LeaseRenewal {
    /// Expiry timestamp, in milliseconds.
    pub expires_at_ms: u64,
    /// Lease epoch value.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Creates a renewal record.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        LeaseRenewal {
            lease_epoch,
            expires_at_ms,
        }
    }
}
/// Policy selector for cancelling a flow.
///
/// NOTE(review): the exact set of executions each variant cancels is not
/// visible in this file — confirm with the backend cancel implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowPolicy {
FlowOnly,
CancelAll,
CancelPending,
}
/// Wait behaviour selector for a flow cancel request.
///
/// NOTE(review): what is being waited on is not visible in this file —
/// confirm with the backend cancel implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowWait {
NoWait,
/// Carries the timeout duration.
WaitTimeout(Duration),
WaitIndefinite,
}
/// Completion record for an execution.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct CompletionPayload {
    /// Execution this completion belongs to.
    pub execution_id: crate::types::ExecutionId,
    /// Outcome label.
    pub outcome: String,
    /// Optional payload bytes.
    pub payload_bytes: Option<Vec<u8>>,
    /// Timestamp at which the completion was produced.
    pub produced_at_ms: TimestampMs,
    /// Optional owning flow; `None` unless set via `with_flow_id`.
    pub flow_id: Option<crate::types::FlowId>,
}

impl CompletionPayload {
    /// Creates a payload with `flow_id` left as `None`.
    pub fn new(
        execution_id: crate::types::ExecutionId,
        outcome: impl Into<String>,
        payload_bytes: Option<Vec<u8>>,
        produced_at_ms: TimestampMs,
    ) -> Self {
        let outcome: String = outcome.into();
        CompletionPayload {
            execution_id,
            outcome,
            payload_bytes,
            produced_at_ms,
            flow_id: None,
        }
    }

    /// Returns the payload with `flow_id` set.
    #[must_use]
    pub fn with_flow_id(self, flow_id: crate::types::FlowId) -> Self {
        CompletionPayload {
            flow_id: Some(flow_id),
            ..self
        }
    }
}
/// A signal record: identity, naming/categorisation strings, source
/// information, acceptance timestamp and optional payload bytes.
///
/// Unlike most types in this file this struct is not `#[non_exhaustive]`, so
/// it can be built with a struct literal from other crates.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResumeSignal {
pub signal_id: crate::types::SignalId,
pub signal_name: String,
pub signal_category: String,
pub source_type: String,
pub source_identity: String,
pub correlation_id: String,
/// Timestamp at which the signal was accepted.
pub accepted_at: TimestampMs,
/// Optional raw payload bytes.
pub payload: Option<Vec<u8>>,
}
/// Outcome of reporting a failure: either a retry was scheduled or the
/// failure is terminal.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FailOutcome {
/// A retry was scheduled.
RetryScheduled {
/// Timestamp the retry is delayed until.
delay_until: crate::types::TimestampMs,
},
/// No retry; the failure is terminal.
TerminalFailed,
}
/// Result of appending a frame to a stream.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct AppendFrameOutcome {
    /// Identifier string reported for the append.
    pub stream_id: String,
    /// Frame count reported for the append.
    pub frame_count: u64,
    /// Summary version, when one applies; `None` otherwise.
    pub summary_version: Option<u64>,
}

impl AppendFrameOutcome {
    /// Creates an outcome with `summary_version` left as `None`.
    pub fn new(stream_id: impl Into<String>, frame_count: u64) -> Self {
        let stream_id: String = stream_id.into();
        AppendFrameOutcome {
            stream_id,
            frame_count,
            summary_version: None,
        }
    }

    /// Returns the outcome with `summary_version` set to `Some(version)`.
    #[must_use]
    pub fn with_summary_version(self, version: u64) -> Self {
        AppendFrameOutcome {
            summary_version: Some(version),
            ..self
        }
    }
}
/// Timeout settings for a backend; `Default` leaves every field `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendTimeouts {
/// Optional per-request timeout.
// NOTE(review): presumably `None` defers to the client library's default —
// confirm with the backend connection code.
pub request: Option<Duration>,
}
/// Retry/back-off settings for a backend; `Default` leaves every field `None`.
///
/// NOTE(review): the formula combining these fields, and what `None` falls
/// back to, are not visible in this file — confirm with the backend
/// connection code.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendRetry {
pub exponent_base: Option<u32>,
pub factor: Option<u32>,
pub number_of_retries: Option<u32>,
pub jitter_percent: Option<u32>,
}
/// Connection settings for a Valkey backend.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ValkeyConnection {
    /// Server host.
    pub host: String,
    /// Server port.
    pub port: u16,
    /// TLS flag; `false` by default.
    pub tls: bool,
    /// Cluster flag; `false` by default.
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Creates settings for `host:port` with `tls` and `cluster` both off.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host: String = host.into();
        ValkeyConnection {
            host,
            port,
            tls: false,
            cluster: false,
        }
    }
}
/// Connection settings for a Postgres backend.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct PostgresConnection {
    /// Connection URL.
    pub url: String,
    /// Maximum connection count; defaults to 10.
    pub max_connections: u32,
    /// Minimum connection count; defaults to 0.
    pub min_connections: u32,
    /// Acquire timeout; defaults to 30 seconds.
    pub acquire_timeout: Duration,
}

impl PostgresConnection {
    /// Default for `max_connections`.
    const DEFAULT_MAX_CONNECTIONS: u32 = 10;
    /// Default for `min_connections`.
    const DEFAULT_MIN_CONNECTIONS: u32 = 0;
    /// Default for `acquire_timeout`, in seconds.
    const DEFAULT_ACQUIRE_TIMEOUT_SECS: u64 = 30;

    /// Creates settings for `url` with the default limits documented on the
    /// fields.
    pub fn new(url: impl Into<String>) -> Self {
        PostgresConnection {
            url: url.into(),
            max_connections: Self::DEFAULT_MAX_CONNECTIONS,
            min_connections: Self::DEFAULT_MIN_CONNECTIONS,
            acquire_timeout: Duration::from_secs(Self::DEFAULT_ACQUIRE_TIMEOUT_SECS),
        }
    }
}
/// Backend-specific connection settings, one variant per supported backend.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BackendConnection {
/// Settings for a Valkey backend.
Valkey(ValkeyConnection),
/// Settings for a Postgres backend.
Postgres(PostgresConnection),
}
/// Complete backend configuration: connection, timeouts and retry settings.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendConfig {
    /// Backend-specific connection settings.
    pub connection: BackendConnection,
    /// Timeout settings.
    pub timeouts: BackendTimeouts,
    /// Retry settings.
    pub retry: BackendRetry,
}

impl BackendConfig {
    /// Wraps a connection with default timeouts and retry settings.
    fn with_default_policies(connection: BackendConnection) -> Self {
        BackendConfig {
            connection,
            timeouts: BackendTimeouts::default(),
            retry: BackendRetry::default(),
        }
    }

    /// Valkey configuration for `host:port` with default timeouts and retry
    /// settings.
    pub fn valkey(host: impl Into<String>, port: u16) -> Self {
        let connection = ValkeyConnection::new(host, port);
        Self::with_default_policies(BackendConnection::Valkey(connection))
    }

    /// Postgres configuration for `url` with default timeouts and retry
    /// settings.
    pub fn postgres(url: impl Into<String>) -> Self {
        let connection = PostgresConnection::new(url);
        Self::with_default_policies(BackendConnection::Postgres(connection))
    }
}
/// Optional two-dimensional filter: by namespace and/or by an instance tag.
///
/// A dimension left as `None` places no constraint on candidates.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct ScannerFilter {
    /// Required namespace, if any.
    pub namespace: Option<Namespace>,
    /// Required `(key, value)` instance tag, if any.
    pub instance_tag: Option<(String, String)>,
}

impl ScannerFilter {
    /// Filter with both dimensions unset; equal to `ScannerFilter::default()`.
    pub const NOOP: Self = Self {
        namespace: None,
        instance_tag: None,
    };

    /// Equivalent to `ScannerFilter::default()`.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the filter with the namespace dimension set.
    pub fn with_namespace(self, ns: impl Into<Namespace>) -> Self {
        ScannerFilter {
            namespace: Some(ns.into()),
            ..self
        }
    }

    /// Returns the filter with the instance-tag dimension set.
    pub fn with_instance_tag(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let tag = (key.into(), value.into());
        ScannerFilter {
            instance_tag: Some(tag),
            ..self
        }
    }

    /// Returns `true` when neither dimension is set.
    pub fn is_noop(&self) -> bool {
        matches!(
            (&self.namespace, &self.instance_tag),
            (None, None)
        )
    }

    /// Tests a candidate against the filter.
    ///
    /// * `core_namespace` — the candidate's namespace, if known.
    /// * `tag_value` — the candidate's value for the filtered tag, if known.
    ///   Only the value half of `instance_tag` is compared here; the key is
    ///   not consulted by this method.
    ///
    /// A set dimension matches only when the candidate supplies an equal
    /// value; a missing candidate value never matches a set dimension.
    pub fn matches(
        &self,
        core_namespace: Option<&Namespace>,
        tag_value: Option<&str>,
    ) -> bool {
        let namespace_ok = match (&self.namespace, core_namespace) {
            (None, _) => true,
            (Some(want), Some(have)) => have == want,
            (Some(_), None) => false,
        };
        if !namespace_ok {
            return false;
        }

        match (&self.instance_tag, tag_value) {
            (None, _) => true,
            (Some((_, want_value)), Some(have)) => have == want_value.as_str(),
            (Some(_), None) => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::partition::{Partition, PartitionFamily};
    use crate::types::{ExecutionId, LaneId, SignalId};

    // `BackendTag` is `Copy`, comparable, and prints its variant name.
    #[test]
    fn backend_tag_derives() {
        let original = BackendTag::Valkey;
        let copied = original;
        assert_eq!(original, copied);
        assert_eq!(format!("{original:?}"), "Valkey");
    }

    // Every `HandleKind` variant is `Copy`, comparable and `Debug`.
    #[test]
    fn handle_kind_derives() {
        for kind in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
            let copied = kind;
            assert_eq!(kind, copied);
            let _ = format!("{kind:?}");
        }
    }

    // Bytes handed to `HandleOpaque::new` come back unchanged.
    #[test]
    fn handle_opaque_roundtrips() {
        let raw: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
        let opaque = HandleOpaque::new(raw.clone());
        assert_eq!(opaque.as_bytes(), &[1u8, 2, 3, 4]);
        assert_eq!(opaque, opaque.clone());
        let _ = format!("{opaque:?}");
    }

    // `Handle::new` stores each part in the matching field.
    #[test]
    fn handle_composes() {
        let handle = Handle::new(
            BackendTag::Valkey,
            HandleKind::Fresh,
            HandleOpaque::new(Box::new([0u8; 4])),
        );
        assert_eq!(handle.backend, BackendTag::Valkey);
        assert_eq!(handle.kind, HandleKind::Fresh);
        assert_eq!(handle.clone(), handle);
    }

    // Equality, emptiness and `Debug` for `CapabilitySet`.
    #[test]
    fn capability_set_derives() {
        let first = CapabilitySet::new(["gpu", "cuda"]);
        let second = CapabilitySet::new(["gpu", "cuda"]);
        assert_eq!(first, second);
        assert!(!first.is_empty());
        assert!(CapabilitySet::default().is_empty());
        let _ = format!("{first:?}");
    }

    // `ClaimPolicy::new` maps arguments to fields, with and without a wait.
    #[test]
    fn claim_policy_derives() {
        let policy = ClaimPolicy::new(
            WorkerId::new("w"),
            WorkerInstanceId::new("w-1"),
            30_000,
            Some(Duration::from_millis(500)),
        );
        assert_eq!(policy.max_wait, Some(Duration::from_millis(500)));
        assert_eq!(policy.lease_ttl_ms, 30_000);
        assert_eq!(policy.worker_id.as_str(), "w");
        assert_eq!(policy.worker_instance_id.as_str(), "w-1");
        assert_eq!(policy.clone(), policy);

        let immediate = ClaimPolicy::new(
            WorkerId::new("w"),
            WorkerInstanceId::new("w-1"),
            30_000,
            None,
        );
        assert_eq!(immediate.max_wait, None);
    }

    // A fully-populated `Frame` literal clones, compares and formats.
    #[test]
    fn frame_and_kind_derive() {
        let frame = Frame {
            bytes: b"hello".to_vec(),
            kind: FrameKind::Stdout,
            seq: Some(3),
            frame_type: "delta".to_owned(),
            correlation_id: Some("req-42".to_owned()),
            mode: StreamMode::Durable,
        };
        assert_eq!(frame.clone(), frame);
        assert_eq!(frame.kind, FrameKind::Stdout);
        assert_eq!(frame.frame_type, "delta");
        assert_eq!(frame.correlation_id.as_deref(), Some("req-42"));
        assert_ne!(FrameKind::Stderr, FrameKind::Event);
        let _ = format!("{frame:?}");
    }

    // Builder methods set the extended fields; `new` leaves them empty.
    #[test]
    fn frame_builders_populate_extended_fields() {
        let built = Frame::new(b"payload".to_vec(), FrameKind::Event)
            .with_frame_type("agent_step")
            .with_correlation_id("corr-1");
        assert_eq!(built.frame_type, "agent_step");
        assert_eq!(built.correlation_id.as_deref(), Some("corr-1"));
        assert_eq!(built.seq, None);

        let bare = Frame::new(b"p".to_vec(), FrameKind::Event);
        assert_eq!(bare.frame_type, "");
        assert_eq!(bare.correlation_id, None);
    }

    // `WaitpointSpec` equality and `WaitpointHmac` string access.
    #[test]
    fn waitpoint_spec_derives() {
        let spec = WaitpointSpec {
            kind: WaitpointKind::SignalName,
            matcher: b"approved".to_vec(),
            hmac_token: WaitpointHmac::new("kid1:deadbeef"),
        };
        assert_eq!(spec.clone(), spec);
        assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
        assert_eq!(
            WaitpointHmac::new("a"),
            WaitpointHmac::new(String::from("a"))
        );
    }

    // Both `FailureReason` constructors, plus `FailureClass` inequality.
    #[test]
    fn failure_reason_and_class() {
        let plain = FailureReason::new("boom");
        let detailed = FailureReason::with_detail("boom", b"stack".to_vec());
        assert_eq!(plain.message, "boom");
        assert!(plain.detail.is_none());
        assert_eq!(detailed.detail.as_deref(), Some(&b"stack"[..]));
        assert_eq!(plain.clone(), plain);
        assert_ne!(FailureClass::Transient, FailureClass::Permanent);
    }

    // Struct-literal construction and the zero defaults.
    #[test]
    fn usage_dimensions_default_and_eq() {
        let usage = UsageDimensions {
            input_tokens: 10,
            output_tokens: 20,
            wall_ms: Some(150),
            custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
            dedup_key: Some("k1".into()),
        };
        assert_eq!(usage.clone(), usage);
        assert_eq!(UsageDimensions::default().input_tokens, 0);
        assert_eq!(UsageDimensions::default().dedup_key, None);
    }

    // The full builder chain sets each field and leaves `custom` empty.
    #[test]
    fn usage_dimensions_builder_chain() {
        let usage = UsageDimensions::new()
            .with_input_tokens(10)
            .with_output_tokens(20)
            .with_wall_ms(150)
            .with_dedup_key("k1");
        assert_eq!(usage.input_tokens, 10);
        assert_eq!(usage.output_tokens, 20);
        assert_eq!(usage.wall_ms, Some(150));
        assert_eq!(usage.dedup_key.as_deref(), Some("k1"));
        assert!(usage.custom.is_empty());
        assert_eq!(UsageDimensions::new(), UsageDimensions::default());
    }

    // `ReclaimToken::new` keeps the grant and identity fields intact.
    #[test]
    fn reclaim_token_wraps_grant() {
        let grant = ReclaimGrant {
            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
            partition_key: crate::partition::PartitionKey::from(&Partition {
                family: PartitionFamily::Flow,
                index: 0,
            }),
            grant_key: "gkey".into(),
            expires_at_ms: 123,
            lane_id: LaneId::new("default"),
        };
        let token = ReclaimToken::new(
            grant.clone(),
            WorkerId::new("w"),
            WorkerInstanceId::new("w-1"),
            30_000,
        );
        assert_eq!(token.grant, grant);
        assert_eq!(token.worker_id.as_str(), "w");
        assert_eq!(token.worker_instance_id.as_str(), "w-1");
        assert_eq!(token.lease_ttl_ms, 30_000);
        assert_eq!(token.clone(), token);
    }

    // `LeaseRenewal` can be used after being copied.
    #[test]
    fn lease_renewal_is_copy() {
        let renewal = LeaseRenewal {
            expires_at_ms: 100,
            lease_epoch: 2,
        };
        let copied = renewal;
        assert_eq!(renewal, copied);
    }

    // Distinct cancel variants compare unequal; a value equals itself.
    #[test]
    fn cancel_flow_policy_and_wait() {
        assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
        let wait = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
        assert_eq!(wait, wait);
        assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
    }

    // `CompletionPayload` clones, compares and formats.
    #[test]
    fn completion_payload_derives() {
        let completion = CompletionPayload {
            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
            outcome: "success".into(),
            payload_bytes: Some(b"ok".to_vec()),
            produced_at_ms: TimestampMs::from_millis(1234),
            flow_id: None,
        };
        assert_eq!(completion.clone(), completion);
        let _ = format!("{completion:?}");
    }

    // `ResumeSignal` is constructible by literal and derives the basics.
    #[test]
    fn resume_signal_moved_and_derives() {
        let signal = ResumeSignal {
            signal_id: SignalId::new(),
            signal_name: "approve".into(),
            signal_category: "decision".into(),
            source_type: "user".into(),
            source_identity: "u1".into(),
            correlation_id: "c1".into(),
            accepted_at: TimestampMs::from_millis(10),
            payload: None,
        };
        assert_eq!(signal.clone(), signal);
        let _ = format!("{signal:?}");
    }

    // The two `FailOutcome` variants are distinguishable.
    #[test]
    fn fail_outcome_variants() {
        let retry = FailOutcome::RetryScheduled {
            delay_until: TimestampMs::from_millis(42),
        };
        let terminal = FailOutcome::TerminalFailed;
        assert_ne!(retry, terminal);
        assert_eq!(retry.clone(), retry);
    }

    // The default filter is a no-op and matches everything.
    #[test]
    fn scanner_filter_noop_and_default() {
        let filter = ScannerFilter::default();
        assert!(filter.is_noop());
        assert_eq!(filter, ScannerFilter::NOOP);
        assert!(filter.matches(None, None));
        assert!(filter.matches(Some(&Namespace::new("t1")), Some("v")));
    }

    // Namespace-only filter: equal namespace matches, others do not.
    #[test]
    fn scanner_filter_namespace_match() {
        let filter = ScannerFilter {
            namespace: Some(Namespace::new("tenant-a")),
            instance_tag: None,
        };
        assert!(!filter.is_noop());
        assert!(filter.matches(Some(&Namespace::new("tenant-a")), None));
        assert!(!filter.matches(Some(&Namespace::new("tenant-b")), None));
        assert!(!filter.matches(None, None));
    }

    // Tag-only filter: equal tag value matches, others do not.
    #[test]
    fn scanner_filter_instance_tag_match() {
        let filter = ScannerFilter {
            namespace: None,
            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
        };
        assert!(filter.matches(None, Some("i-1")));
        assert!(!filter.matches(None, Some("i-2")));
        assert!(!filter.matches(None, None));
    }

    // With both dimensions set, both must match.
    #[test]
    fn scanner_filter_both_dimensions() {
        let filter = ScannerFilter {
            namespace: Some(Namespace::new("tenant-a")),
            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
        };
        assert!(filter.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
        assert!(!filter.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
        assert!(!filter.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
        assert!(!filter.matches(None, Some("i-1")));
    }

    // Builder-constructed filters behave like literal-constructed ones.
    #[test]
    fn scanner_filter_builder_construction() {
        let empty = ScannerFilter::new();
        assert!(empty.is_noop());
        assert_eq!(empty, ScannerFilter::NOOP);

        let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
        assert!(!ns_only.is_noop());
        assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));

        let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
        assert!(!tag_only.is_noop());
        assert!(tag_only.matches(None, Some("i-1")));

        let both = ScannerFilter::new()
            .with_namespace(Namespace::new("tenant-a"))
            .with_instance_tag("cairn.instance_id", "i-1");
        assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
        assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
    }

    // Each `StreamMode` constructor yields the expected variant, payload and
    // wire string.
    #[test]
    fn stream_mode_constructors_and_wire_str() {
        assert_eq!(StreamMode::durable().wire_str(), "durable");
        assert_eq!(StreamMode::durable(), StreamMode::Durable);

        let summary = StreamMode::durable_summary();
        assert_eq!(summary.wire_str(), "summary");
        match summary {
            StreamMode::DurableSummary { patch_kind } => {
                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
            }
            _ => panic!("expected DurableSummary"),
        }

        let live = StreamMode::best_effort_live(15_000);
        assert_eq!(live.wire_str(), "best_effort");
        match live {
            StreamMode::BestEffortLive { config } => {
                assert_eq!(config.ttl_ms, 15_000);
                assert_eq!(config.maxlen_floor, 64);
                assert_eq!(config.maxlen_ceiling, 16_384);
                assert!((config.ema_alpha - 0.2).abs() < 1e-9);
            }
            _ => panic!("expected BestEffortLive"),
        }

        let custom_cfg = BestEffortLiveConfig::with_ttl(10_000)
            .with_maxlen_floor(128)
            .with_maxlen_ceiling(8_192)
            .with_ema_alpha(0.3);
        let live_custom = StreamMode::best_effort_live_with_config(custom_cfg);
        match live_custom {
            StreamMode::BestEffortLive { config } => {
                assert_eq!(config.ttl_ms, 10_000);
                assert_eq!(config.maxlen_floor, 128);
                assert_eq!(config.maxlen_ceiling, 8_192);
                assert!((config.ema_alpha - 0.3).abs() < 1e-9);
            }
            _ => panic!("expected BestEffortLive"),
        }

        assert_eq!(StreamMode::default(), StreamMode::Durable);
    }

    // `TailVisibility` default and wire strings.
    #[test]
    fn tail_visibility_default_and_wire() {
        assert_eq!(TailVisibility::default(), TailVisibility::All);
        assert_eq!(TailVisibility::All.wire_str(), "");
        assert_eq!(
            TailVisibility::ExcludeBestEffort.wire_str(),
            "exclude_best_effort"
        );
    }

    // `with_summary_version` sets only the summary version.
    #[test]
    fn append_frame_outcome_summary_version_builder() {
        let base = AppendFrameOutcome::new("1713-0", 3);
        assert_eq!(base.stream_id, "1713-0");
        assert_eq!(base.frame_count, 3);
        assert_eq!(base.summary_version, None);

        let versioned = AppendFrameOutcome::new("1713-0", 3).with_summary_version(7);
        assert_eq!(versioned.summary_version, Some(7));
        assert_eq!(versioned.clone(), versioned);
    }

    // The sentinel's exact bytes are pinned.
    #[test]
    fn summary_null_sentinel_byte_exact() {
        assert_eq!(SUMMARY_NULL_SENTINEL, "__ff_null__");
        assert_eq!(SUMMARY_NULL_SENTINEL.len(), 11);
        assert!(SUMMARY_NULL_SENTINEL.bytes().all(|b| b.is_ascii()));
        assert_eq!(SUMMARY_NULL_SENTINEL.trim(), SUMMARY_NULL_SENTINEL);
    }

    // `SummaryDocument::new` maps positional arguments to the right fields.
    #[test]
    fn summary_document_constructor() {
        let document = SummaryDocument::new(
            br#"{"tokens":3}"#.to_vec(),
            2,
            PatchKind::JsonMergePatch,
            1_700_000_100,
            1_700_000_000,
        );
        assert_eq!(document.version, 2);
        assert_eq!(document.patch_kind, PatchKind::JsonMergePatch);
        assert_eq!(document.first_applied_ms, 1_700_000_000);
        assert_eq!(document.clone(), document);
    }

    // `Frame::new` defaults to durable; `with_mode` overrides it.
    #[test]
    fn frame_builder_sets_mode() {
        let default_mode = Frame::new(b"p".to_vec(), FrameKind::Event);
        assert_eq!(default_mode.mode, StreamMode::Durable);

        let summary_frame = Frame::new(b"p".to_vec(), FrameKind::Event)
            .with_mode(StreamMode::durable_summary());
        match summary_frame.mode {
            StreamMode::DurableSummary { patch_kind } => {
                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
            }
            other => panic!("expected DurableSummary, got {other:?}"),
        }
    }

    // `BackendConfig::valkey` builds a Valkey connection with default
    // timeouts and retry settings.
    #[test]
    fn backend_config_valkey_ctor() {
        let config = BackendConfig::valkey("host.local", 6379);
        let BackendConnection::Valkey(valkey) = &config.connection else {
            panic!("expected Valkey connection, got Postgres");
        };
        assert_eq!(valkey.host, "host.local");
        assert_eq!(valkey.port, 6379);
        assert!(!valkey.tls);
        assert!(!valkey.cluster);
        assert_eq!(config.timeouts, BackendTimeouts::default());
        assert_eq!(config.retry, BackendRetry::default());
        assert_eq!(config.clone(), config);
    }
}