use crate::contracts::ReclaimGrant;
use crate::types::{TimestampMs, WaitpointToken};
pub use crate::types::Namespace;
use std::collections::BTreeMap;
use std::time::Duration;
/// Tag naming a backend.
///
/// `Valkey` is the only variant declared here. The enum is
/// `#[non_exhaustive]`, so matches written in other crates need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BackendTag {
    /// The Valkey backend.
    Valkey,
}
/// Kind discriminator stored in a `Handle`.
///
/// This file only declares the variants; what each one implies for a handle's
/// lifecycle is decided by the code that produces and consumes handles.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HandleKind {
    /// The `Fresh` kind.
    Fresh,
    /// The `Resumed` kind.
    Resumed,
    /// The `Suspended` kind.
    Suspended,
}
/// Owned byte blob that this type stores but never interprets.
///
/// Only construction and read-only access are offered here.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HandleOpaque(Box<[u8]>);

impl HandleOpaque {
    /// Wraps `bytes` as-is, without copying or inspecting them.
    pub fn new(bytes: Box<[u8]>) -> Self {
        HandleOpaque(bytes)
    }

    /// Borrows the wrapped bytes as a slice.
    pub fn as_bytes(&self) -> &[u8] {
        let HandleOpaque(inner) = self;
        inner
    }
}
/// Handle made of a backend tag, a kind, and an opaque byte payload.
///
/// All three parts are public fields, and the derived equality and hashing
/// cover all of them.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Handle {
    /// Backend tag stored with this handle.
    pub backend: BackendTag,
    /// Kind recorded for this handle.
    pub kind: HandleKind,
    /// Opaque bytes carried by the handle; not interpreted by this type.
    pub opaque: HandleOpaque,
}

impl Handle {
    /// Assembles a handle from its three parts, storing each argument unchanged.
    pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
        Handle {
            backend,
            kind,
            opaque,
        }
    }
}
/// List of capability tokens.
///
/// Tokens are kept in a `Vec` exactly as supplied: nothing is sorted or
/// de-duplicated, so the derived equality is sensitive to order and repeats.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CapabilitySet {
    /// The tokens, in the order they were provided.
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Builds a set from anything iterable whose items convert into `String`.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let tokens: Vec<String> = tokens.into_iter().map(|token| token.into()).collect();
        Self { tokens }
    }

    /// Returns `true` when the set holds no tokens.
    pub fn is_empty(&self) -> bool {
        let Self { tokens } = self;
        tokens.is_empty()
    }
}
/// Policy value carrying an optional maximum wait.
///
/// `Default` and [`ClaimPolicy::immediate`] both produce `max_wait: None`.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClaimPolicy {
    /// Optional maximum wait; `None` when built by `immediate()` or `Default`.
    pub max_wait: Option<Duration>,
}

impl ClaimPolicy {
    /// Policy with no `max_wait` set.
    pub fn immediate() -> Self {
        let max_wait = None;
        Self { max_wait }
    }

    /// Policy whose `max_wait` is `Some(max_wait)`.
    pub fn with_max_wait(max_wait: Duration) -> Self {
        let max_wait = Some(max_wait);
        Self { max_wait }
    }
}
/// Kind discriminator stored in a `Frame`.
///
/// `#[non_exhaustive]`: matches in other crates need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FrameKind {
    /// The `Stdout` kind.
    Stdout,
    /// The `Stderr` kind.
    Stderr,
    /// The `Event` kind.
    Event,
    /// The `Blob` kind.
    Blob,
}
/// A frame: a byte payload plus its kind and optional metadata.
///
/// The struct is `#[non_exhaustive]`, so other crates build it with
/// [`Frame::new`] / [`Frame::with_seq`] and then chain the `with_*` builders.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct Frame {
    /// Raw payload bytes.
    pub bytes: Vec<u8>,
    /// Kind of this frame.
    pub kind: FrameKind,
    /// Optional sequence number; `None` unless set via [`Frame::with_seq`].
    pub seq: Option<u64>,
    /// Free-form frame type label; the empty string when not set.
    pub frame_type: String,
    /// Optional correlation id; `None` unless set via
    /// [`Frame::with_correlation_id`].
    pub correlation_id: Option<String>,
}

impl Frame {
    /// Creates a frame with no sequence number, an empty `frame_type`, and no
    /// correlation id.
    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
        Self {
            bytes,
            kind,
            seq: None,
            frame_type: String::new(),
            correlation_id: None,
        }
    }

    /// Same as [`Frame::new`] but with `seq` set to `Some(seq)`.
    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
        // Delegate so the defaults for the remaining fields live in one place.
        Self {
            seq: Some(seq),
            ..Self::new(bytes, kind)
        }
    }

    /// Returns the frame with `frame_type` replaced.
    ///
    /// The builder consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_frame_type(mut self, frame_type: impl Into<String>) -> Self {
        self.frame_type = frame_type.into();
        self
    }

    /// Returns the frame with `correlation_id` set to `Some(..)`.
    ///
    /// The builder consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
        self.correlation_id = Some(correlation_id.into());
        self
    }
}
/// Kind discriminator stored in a `WaitpointSpec`.
///
/// `#[non_exhaustive]`: matches in other crates need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WaitpointKind {
    /// The `SignalName` kind.
    SignalName,
    /// The `CorrelationId` kind.
    CorrelationId,
    /// The `External` kind.
    External,
}
/// Newtype around a [`WaitpointToken`].
///
/// `Debug` is implemented by hand rather than derived.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct WaitpointHmac(WaitpointToken);

impl WaitpointHmac {
    /// Converts `token` into a `String` and wraps it as a `WaitpointToken`.
    pub fn new(token: impl Into<String>) -> Self {
        let raw: String = token.into();
        WaitpointHmac(WaitpointToken::from(raw))
    }

    /// Borrows the wrapped token.
    pub fn token(&self) -> &WaitpointToken {
        let WaitpointHmac(inner) = self;
        inner
    }

    /// Borrows the token text as a string slice.
    pub fn as_str(&self) -> &str {
        self.token().as_str()
    }
}

impl std::fmt::Debug for WaitpointHmac {
    // NOTE(review): the output embeds `WaitpointToken`'s own `Debug` rendering.
    // Whether the token text is redacted depends on that impl, which is not
    // visible from this file — confirm before logging these values.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("WaitpointHmac({:?})", self.0))
    }
}
/// Pairs a waitpoint id with its HMAC token.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingWaitpoint {
    /// Id of the waitpoint.
    pub waitpoint_id: crate::types::WaitpointId,
    /// HMAC token stored alongside the id.
    pub hmac_token: WaitpointHmac,
}

impl PendingWaitpoint {
    /// Stores both arguments unchanged.
    pub fn new(waitpoint_id: crate::types::WaitpointId, hmac_token: WaitpointHmac) -> Self {
        PendingWaitpoint {
            waitpoint_id,
            hmac_token,
        }
    }
}
/// Specification of a waitpoint: a kind, matcher bytes, and an HMAC token.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WaitpointSpec {
    /// Kind of waitpoint.
    pub kind: WaitpointKind,
    /// Matcher bytes; this type stores them without interpreting them.
    pub matcher: Vec<u8>,
    /// HMAC token stored with the spec.
    pub hmac_token: WaitpointHmac,
}

impl WaitpointSpec {
    /// Stores all three arguments unchanged.
    pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
        WaitpointSpec {
            kind,
            matcher,
            hmac_token,
        }
    }
}
/// Failure description: a message plus optional detail bytes.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailureReason {
    /// Message text.
    pub message: String,
    /// Optional detail payload; `None` when built with [`FailureReason::new`].
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Reason carrying only a message; `detail` is `None`.
    pub fn new(message: impl Into<String>) -> Self {
        let message: String = message.into();
        Self {
            message,
            detail: None,
        }
    }

    /// Reason carrying a message and `Some(detail)`.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        let message: String = message.into();
        let detail = Some(detail);
        Self { message, detail }
    }
}
/// Classification of a failure.
///
/// `#[non_exhaustive]`: matches in other crates need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailureClass {
    /// The `Transient` class.
    Transient,
    /// The `Permanent` class.
    Permanent,
    /// The `InfraCrash` class.
    InfraCrash,
    /// The `Timeout` class.
    Timeout,
    /// The `Cancelled` class.
    Cancelled,
}
/// Usage counters reported for a unit of work.
///
/// Every field defaults to zero / `None` / empty. The struct is
/// `#[non_exhaustive]`, so other crates start from [`UsageDimensions::new`]
/// (or `Default`) and chain the `with_*` builders. There is no builder for
/// `custom`; it is populated through the public field.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct UsageDimensions {
    /// Input token count.
    pub input_tokens: u64,
    /// Output token count.
    pub output_tokens: u64,
    /// Optional duration in milliseconds (presumably wall-clock, per the
    /// field name — confirm with the consumer).
    pub wall_ms: Option<u64>,
    /// Additional named counters, keyed by name.
    pub custom: BTreeMap<String, u64>,
    /// Optional de-duplication key; how it is applied is up to the consumer.
    pub dedup_key: Option<String>,
}

impl UsageDimensions {
    /// All-default value; identical to `UsageDimensions::default()`.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the value with `input_tokens` replaced.
    ///
    /// Consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_input_tokens(mut self, tokens: u64) -> Self {
        self.input_tokens = tokens;
        self
    }

    /// Returns the value with `output_tokens` replaced.
    ///
    /// Consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_output_tokens(mut self, tokens: u64) -> Self {
        self.output_tokens = tokens;
        self
    }

    /// Returns the value with `wall_ms` set to `Some(ms)`.
    ///
    /// Consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_wall_ms(mut self, ms: u64) -> Self {
        self.wall_ms = Some(ms);
        self
    }

    /// Returns the value with `dedup_key` set to `Some(key)`.
    ///
    /// Consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
        self.dedup_key = Some(key.into());
        self
    }
}
/// Wrapper holding a single [`ReclaimGrant`].
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReclaimToken {
    /// The wrapped grant.
    pub grant: ReclaimGrant,
}

impl ReclaimToken {
    /// Wraps `grant` unchanged.
    pub fn new(grant: ReclaimGrant) -> Self {
        ReclaimToken { grant }
    }
}
/// Result of a lease renewal: a new expiry and the lease epoch.
///
/// Both fields are plain `u64`s, which makes the type `Copy`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaseRenewal {
    /// Expiry value; milliseconds per the field name.
    pub expires_at_ms: u64,
    /// Lease epoch value.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Stores both values unchanged.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        LeaseRenewal {
            expires_at_ms,
            lease_epoch,
        }
    }
}
/// Policy selector for cancelling a flow.
///
/// This file only declares the variants; what each one cancels is decided by
/// the code that interprets the policy.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancelFlowPolicy {
    /// The `FlowOnly` policy.
    FlowOnly,
    /// The `CancelAll` policy.
    CancelAll,
    /// The `CancelPending` policy.
    CancelPending,
}
/// Wait mode that accompanies a flow cancellation.
///
/// `WaitTimeout` carries a `Duration`; the other variants carry no data.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancelFlowWait {
    /// The `NoWait` mode.
    NoWait,
    /// Wait mode carrying a `Duration`.
    WaitTimeout(Duration),
    /// The `WaitIndefinite` mode.
    WaitIndefinite,
}
/// Payload describing a completed execution.
///
/// The struct is `#[non_exhaustive]`: other crates build it with
/// [`CompletionPayload::new`] and may attach a flow id afterwards through
/// [`CompletionPayload::with_flow_id`].
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompletionPayload {
    /// Id of the execution this payload refers to.
    pub execution_id: crate::types::ExecutionId,
    /// Outcome label, stored as free-form text.
    pub outcome: String,
    /// Optional payload bytes.
    pub payload_bytes: Option<Vec<u8>>,
    /// Timestamp recorded as `produced_at_ms`.
    pub produced_at_ms: TimestampMs,
    /// Optional flow id; `None` unless set via `with_flow_id`.
    pub flow_id: Option<crate::types::FlowId>,
}

impl CompletionPayload {
    /// Builds a payload with `flow_id` left as `None`.
    pub fn new(
        execution_id: crate::types::ExecutionId,
        outcome: impl Into<String>,
        payload_bytes: Option<Vec<u8>>,
        produced_at_ms: TimestampMs,
    ) -> Self {
        let outcome: String = outcome.into();
        CompletionPayload {
            execution_id,
            outcome,
            payload_bytes,
            produced_at_ms,
            flow_id: None,
        }
    }

    /// Returns the payload with `flow_id` set to `Some(flow_id)`.
    #[must_use]
    pub fn with_flow_id(self, flow_id: crate::types::FlowId) -> Self {
        Self {
            flow_id: Some(flow_id),
            ..self
        }
    }
}
/// Record of a signal, held as plain data.
///
/// All fields are public and the type has no constructor; it is built with a
/// struct literal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResumeSignal {
    /// Id of the signal.
    pub signal_id: crate::types::SignalId,
    /// Signal name.
    pub signal_name: String,
    /// Signal category label.
    pub signal_category: String,
    /// Source type label.
    pub source_type: String,
    /// Source identity label.
    pub source_identity: String,
    /// Correlation id; a plain `String` here, not an `Option`.
    pub correlation_id: String,
    /// Timestamp recorded as `accepted_at`.
    pub accepted_at: TimestampMs,
    /// Optional payload bytes.
    pub payload: Option<Vec<u8>>,
}
/// Outcome of reporting a failure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailOutcome {
    /// A retry was scheduled; `delay_until` carries the associated timestamp.
    RetryScheduled {
        /// Timestamp attached to the scheduled retry.
        delay_until: crate::types::TimestampMs,
    },
    /// The `TerminalFailed` outcome; carries no data.
    TerminalFailed,
}
/// Result of appending a frame: a stream id and a frame count.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendFrameOutcome {
    /// Id of the stream.
    pub stream_id: String,
    /// Frame count reported with the append.
    pub frame_count: u64,
}
/// Timeout settings for a backend.
///
/// `Default` leaves every timeout unset (`None`).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BackendTimeouts {
    /// Optional request timeout.
    pub request: Option<Duration>,
}
/// Retry tuning knobs for a backend.
///
/// Every field is optional and `Default` leaves them all `None`. How an unset
/// knob is treated is up to the code that consumes this value.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BackendRetry {
    /// Optional exponent base.
    pub exponent_base: Option<u32>,
    /// Optional factor.
    pub factor: Option<u32>,
    /// Optional number of retries.
    pub number_of_retries: Option<u32>,
    /// Optional jitter, as a percentage per the field name.
    pub jitter_percent: Option<u32>,
}
/// Connection parameters for a Valkey endpoint.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValkeyConnection {
    /// Host name or address.
    pub host: String,
    /// Port number.
    pub port: u16,
    /// TLS flag; `false` when built with [`ValkeyConnection::new`].
    pub tls: bool,
    /// Cluster flag; `false` when built with [`ValkeyConnection::new`].
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Connection to `host:port` with `tls` and `cluster` both off.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host: String = host.into();
        ValkeyConnection {
            host,
            port,
            tls: false,
            cluster: false,
        }
    }
}
/// Connection description, one variant per backend.
///
/// `Valkey` is the only variant declared here; the enum is `#[non_exhaustive]`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackendConnection {
    /// Valkey connection parameters.
    Valkey(ValkeyConnection),
}
/// Backend configuration: a connection plus timeout and retry settings.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackendConfig {
    /// Which backend to connect to, and how.
    pub connection: BackendConnection,
    /// Timeout settings.
    pub timeouts: BackendTimeouts,
    /// Retry settings.
    pub retry: BackendRetry,
}

impl BackendConfig {
    /// Configuration for a Valkey endpoint at `host:port`.
    ///
    /// The connection comes from `ValkeyConnection::new`; timeouts and retry
    /// settings are left at their `Default` values.
    pub fn valkey(host: impl Into<String>, port: u16) -> Self {
        let endpoint = ValkeyConnection::new(host, port);
        BackendConfig {
            connection: BackendConnection::Valkey(endpoint),
            timeouts: BackendTimeouts::default(),
            retry: BackendRetry::default(),
        }
    }
}
/// Filter with two optional dimensions: a namespace and an instance tag.
///
/// A dimension that is `None` places no constraint. A filter with both
/// dimensions unset accepts everything (see [`ScannerFilter::NOOP`]).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct ScannerFilter {
    /// Required namespace, if any.
    pub namespace: Option<Namespace>,
    /// Required instance tag as a `(key, value)` pair, if any.
    pub instance_tag: Option<(String, String)>,
}

impl ScannerFilter {
    /// Filter with no constraints; equal to `ScannerFilter::default()`.
    pub const NOOP: Self = Self {
        namespace: None,
        instance_tag: None,
    };

    /// Creates an unconstrained filter (same as `Default`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the filter with the namespace constraint set.
    ///
    /// Consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_namespace(mut self, ns: impl Into<Namespace>) -> Self {
        self.namespace = Some(ns.into());
        self
    }

    /// Returns the filter with the instance-tag constraint set to `(key, value)`.
    ///
    /// Consumes `self`; dropping the result discards the change.
    #[must_use]
    pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.instance_tag = Some((key.into(), value.into()));
        self
    }

    /// Returns `true` when neither dimension is constrained.
    pub fn is_noop(&self) -> bool {
        self.namespace.is_none() && self.instance_tag.is_none()
    }

    /// Tests a candidate against the filter.
    ///
    /// * `core_namespace` — the candidate's namespace, if it has one.
    /// * `tag_value` — the candidate's tag value, if it has one. Only the
    ///   value half of `instance_tag` is compared here; the key is not
    ///   consulted, so the caller is expected to have looked the value up
    ///   under the filter's key.
    ///
    /// A constrained dimension fails when the candidate's value is `None` or
    /// differs from the wanted one. Returns `true` only if every constrained
    /// dimension passes.
    pub fn matches(
        &self,
        core_namespace: Option<&Namespace>,
        tag_value: Option<&str>,
    ) -> bool {
        let namespace_ok = match (&self.namespace, core_namespace) {
            (None, _) => true,
            (Some(want), Some(have)) => have == want,
            (Some(_), None) => false,
        };
        let tag_ok = match (&self.instance_tag, tag_value) {
            (None, _) => true,
            (Some((_, want_value)), Some(have)) => have == want_value.as_str(),
            (Some(_), None) => false,
        };
        namespace_ok && tag_ok
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the value types above. Most of them check that the
    // derived traits behave and that constructors/builders populate the fields
    // they claim to. Struct literals work here because `#[non_exhaustive]` has
    // no effect inside the defining crate; the public constructors are
    // exercised as well since they are the only path open to other crates.
    use super::*;
    use crate::partition::{Partition, PartitionFamily};
    use crate::types::{ExecutionId, LaneId, SignalId};

    #[test]
    fn backend_tag_derives() {
        let a = BackendTag::Valkey;
        let b = a;
        assert_eq!(a, b);
        assert_eq!(format!("{a:?}"), "Valkey");
    }

    #[test]
    fn handle_kind_derives() {
        for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
            let c = k;
            assert_eq!(k, c);
            let _ = format!("{k:?}");
        }
    }

    #[test]
    fn handle_opaque_roundtrips() {
        // The box is handed over directly; it is not needed afterwards.
        let o = HandleOpaque::new(Box::new([1u8, 2, 3, 4]));
        assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
        assert_eq!(o, o.clone());
        let _ = format!("{o:?}");
    }

    #[test]
    fn handle_composes() {
        let h = Handle::new(
            BackendTag::Valkey,
            HandleKind::Fresh,
            HandleOpaque::new(Box::new([0u8; 4])),
        );
        assert_eq!(h.backend, BackendTag::Valkey);
        assert_eq!(h.kind, HandleKind::Fresh);
        assert_eq!(h.clone(), h);
    }

    #[test]
    fn capability_set_derives() {
        let c1 = CapabilitySet::new(["gpu", "cuda"]);
        let c2 = CapabilitySet::new(["gpu", "cuda"]);
        assert_eq!(c1, c2);
        assert!(!c1.is_empty());
        assert!(CapabilitySet::default().is_empty());
        let _ = format!("{c1:?}");
    }

    #[test]
    fn claim_policy_derives() {
        let p = ClaimPolicy::with_max_wait(Duration::from_millis(500));
        assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
        assert_eq!(p.clone(), p);
        assert_eq!(ClaimPolicy::immediate().max_wait, None);
        assert_eq!(ClaimPolicy::immediate(), ClaimPolicy::default());
    }

    #[test]
    fn frame_and_kind_derive() {
        let f = Frame {
            bytes: b"hello".to_vec(),
            kind: FrameKind::Stdout,
            seq: Some(3),
            frame_type: "delta".to_owned(),
            correlation_id: Some("req-42".to_owned()),
        };
        assert_eq!(f.clone(), f);
        assert_eq!(f.kind, FrameKind::Stdout);
        assert_eq!(f.frame_type, "delta");
        assert_eq!(f.correlation_id.as_deref(), Some("req-42"));
        assert_ne!(FrameKind::Stderr, FrameKind::Event);
        let _ = format!("{f:?}");
    }

    #[test]
    fn frame_builders_populate_extended_fields() {
        let f = Frame::new(b"payload".to_vec(), FrameKind::Event)
            .with_frame_type("agent_step")
            .with_correlation_id("corr-1");
        assert_eq!(f.frame_type, "agent_step");
        assert_eq!(f.correlation_id.as_deref(), Some("corr-1"));
        assert_eq!(f.seq, None);

        let bare = Frame::new(b"p".to_vec(), FrameKind::Event);
        assert_eq!(bare.frame_type, "");
        assert_eq!(bare.correlation_id, None);

        // `with_seq` sets only the sequence number on top of `new`'s defaults.
        let sequenced = Frame::with_seq(b"p".to_vec(), FrameKind::Blob, 7);
        assert_eq!(sequenced.bytes, b"p".to_vec());
        assert_eq!(sequenced.kind, FrameKind::Blob);
        assert_eq!(sequenced.seq, Some(7));
        assert_eq!(sequenced.frame_type, "");
        assert_eq!(sequenced.correlation_id, None);
    }

    #[test]
    fn waitpoint_spec_derives() {
        let spec = WaitpointSpec {
            kind: WaitpointKind::SignalName,
            matcher: b"approved".to_vec(),
            hmac_token: WaitpointHmac::new("kid1:deadbeef"),
        };
        assert_eq!(spec.clone(), spec);
        assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
        assert_eq!(
            WaitpointHmac::new("a"),
            WaitpointHmac::new(String::from("a"))
        );
        // The public constructor must agree with the struct literal.
        assert_eq!(
            WaitpointSpec::new(
                WaitpointKind::SignalName,
                b"approved".to_vec(),
                WaitpointHmac::new("kid1:deadbeef"),
            ),
            spec
        );
    }

    #[test]
    fn failure_reason_and_class() {
        let r1 = FailureReason::new("boom");
        let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
        assert_eq!(r1.message, "boom");
        assert!(r1.detail.is_none());
        assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
        assert_eq!(r1.clone(), r1);
        assert_ne!(FailureClass::Transient, FailureClass::Permanent);
    }

    #[test]
    fn usage_dimensions_default_and_eq() {
        let u = UsageDimensions {
            input_tokens: 10,
            output_tokens: 20,
            wall_ms: Some(150),
            custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
            dedup_key: Some("k1".into()),
        };
        assert_eq!(u.clone(), u);
        assert_eq!(UsageDimensions::default().input_tokens, 0);
        assert_eq!(UsageDimensions::default().dedup_key, None);
    }

    #[test]
    fn usage_dimensions_builder_chain() {
        let u = UsageDimensions::new()
            .with_input_tokens(10)
            .with_output_tokens(20)
            .with_wall_ms(150)
            .with_dedup_key("k1");
        assert_eq!(u.input_tokens, 10);
        assert_eq!(u.output_tokens, 20);
        assert_eq!(u.wall_ms, Some(150));
        assert_eq!(u.dedup_key.as_deref(), Some("k1"));
        assert!(u.custom.is_empty());
        assert_eq!(UsageDimensions::new(), UsageDimensions::default());
    }

    #[test]
    fn reclaim_token_wraps_grant() {
        let grant = ReclaimGrant {
            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
            partition_key: crate::partition::PartitionKey::from(&Partition {
                family: PartitionFamily::Flow,
                index: 0,
            }),
            grant_key: "gkey".into(),
            expires_at_ms: 123,
            lane_id: LaneId::new("default"),
        };
        let t = ReclaimToken::new(grant.clone());
        assert_eq!(t.grant, grant);
        assert_eq!(t.clone(), t);
    }

    #[test]
    fn lease_renewal_is_copy() {
        let r = LeaseRenewal {
            expires_at_ms: 100,
            lease_epoch: 2,
        };
        // `r` stays usable after the assignment only because the type is `Copy`.
        let s = r;
        assert_eq!(r, s);
        assert_eq!(LeaseRenewal::new(100, 2), r);
    }

    #[test]
    fn cancel_flow_policy_and_wait() {
        assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
        let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
        assert_eq!(w, w);
        assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
    }

    #[test]
    fn completion_payload_derives() {
        let c = CompletionPayload {
            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
            outcome: "success".into(),
            payload_bytes: Some(b"ok".to_vec()),
            produced_at_ms: TimestampMs::from_millis(1234),
            flow_id: None,
        };
        assert_eq!(c.clone(), c);
        let _ = format!("{c:?}");

        // The public constructor leaves `flow_id` unset and otherwise matches
        // the literal above.
        let via_ctor = CompletionPayload::new(
            c.execution_id.clone(),
            "success",
            Some(b"ok".to_vec()),
            TimestampMs::from_millis(1234),
        );
        assert!(via_ctor.flow_id.is_none());
        assert_eq!(via_ctor, c);
    }

    #[test]
    fn resume_signal_moved_and_derives() {
        let s = ResumeSignal {
            signal_id: SignalId::new(),
            signal_name: "approve".into(),
            signal_category: "decision".into(),
            source_type: "user".into(),
            source_identity: "u1".into(),
            correlation_id: "c1".into(),
            accepted_at: TimestampMs::from_millis(10),
            payload: None,
        };
        assert_eq!(s.clone(), s);
        let _ = format!("{s:?}");
    }

    #[test]
    fn fail_outcome_variants() {
        let retry = FailOutcome::RetryScheduled {
            delay_until: TimestampMs::from_millis(42),
        };
        let terminal = FailOutcome::TerminalFailed;
        assert_ne!(retry, terminal);
        assert_eq!(retry.clone(), retry);
    }

    #[test]
    fn scanner_filter_noop_and_default() {
        let f = ScannerFilter::default();
        assert!(f.is_noop());
        assert_eq!(f, ScannerFilter::NOOP);
        assert!(f.matches(None, None));
        assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
    }

    #[test]
    fn scanner_filter_namespace_match() {
        let f = ScannerFilter {
            namespace: Some(Namespace::new("tenant-a")),
            instance_tag: None,
        };
        assert!(!f.is_noop());
        assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
        assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
        assert!(!f.matches(None, None));
    }

    #[test]
    fn scanner_filter_instance_tag_match() {
        let f = ScannerFilter {
            namespace: None,
            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
        };
        assert!(f.matches(None, Some("i-1")));
        assert!(!f.matches(None, Some("i-2")));
        assert!(!f.matches(None, None));
    }

    #[test]
    fn scanner_filter_both_dimensions() {
        let f = ScannerFilter {
            namespace: Some(Namespace::new("tenant-a")),
            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
        };
        assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
        assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
        assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
        assert!(!f.matches(None, Some("i-1")));
    }

    #[test]
    fn scanner_filter_builder_construction() {
        let empty = ScannerFilter::new();
        assert!(empty.is_noop());
        assert_eq!(empty, ScannerFilter::NOOP);

        let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
        assert!(!ns_only.is_noop());
        assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));

        let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
        assert!(!tag_only.is_noop());
        assert!(tag_only.matches(None, Some("i-1")));

        let both = ScannerFilter::new()
            .with_namespace(Namespace::new("tenant-a"))
            .with_instance_tag("cairn.instance_id", "i-1");
        assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
        assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
    }

    #[test]
    fn backend_config_valkey_ctor() {
        let c = BackendConfig::valkey("host.local", 6379);
        // Irrefutable inside the defining crate: `Valkey` is the only variant.
        let BackendConnection::Valkey(v) = &c.connection;
        assert_eq!(v.host, "host.local");
        assert_eq!(v.port, 6379);
        assert!(!v.tls);
        assert!(!v.cluster);
        assert_eq!(c.timeouts, BackendTimeouts::default());
        assert_eq!(c.retry, BackendRetry::default());
        assert_eq!(c.clone(), c);
    }
}