pub mod backend;
pub mod circuit;
pub mod engine;
pub mod item;
pub mod probe;
pub use backend::{LocalDiskBackend, NatsObjectBackend, SpoolBackend, SpoolMeta};
#[cfg(feature = "gcs")]
pub use backend::GcsBackend;
#[cfg(feature = "s3")]
pub use backend::S3Backend;
pub use circuit::{
CircuitBreaker, CircuitConfig, CircuitDecision, CircuitPhase, CircuitRegistry, CircuitState,
DownstreamSpec, ProbeKind,
};
pub use engine::{Admission, DeadLetter, DrainReport, GcReport, SpoolEngine, SpooledRef};
pub use item::{recv_seq_from_object_key, sha256_hex, spool_ref, SpoolItem};
pub use probe::probe_downstream;
use serde::{Deserialize, Serialize};
use crate::error::ToolError;
/// Spooling mode selected by a subscription's `spool` block.
///
/// Serialized by serde using snake_case variant names. `Off` is the default
/// and is the only mode for which `SpoolSpec::buffers` returns `false`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum SpoolMode {
/// Spooling disabled; `SpoolSpec::validate` skips all backend checks in this mode.
#[default]
Off,
// NOTE(review): presumably "buffer the message, then ack upstream" — confirm against the engine.
BufferAndAck,
// NOTE(review): presumably tied to `SpoolSpec::hybrid_escalate_after_ms` — confirm against the engine.
Hybrid,
}
impl SpoolMode {
pub fn as_str(&self) -> &'static str {
match self {
SpoolMode::Off => "off",
SpoolMode::BufferAndAck => "buffer_and_ack",
SpoolMode::Hybrid => "hybrid",
}
}
}
/// Ordering mode configured under `spool.ordering`.
///
/// Serialized by serde using snake_case variant names; `PerKey` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum OrderingMode {
/// Rejected by `SpoolSpec::validate` when combined with
/// `DrainOnRecovery::Interleave` (only checked when spooling is enabled).
Global,
// NOTE(review): presumably ordered per `SpoolSpec::ordering_key` — confirm against the engine.
#[default]
PerKey,
// NOTE(review): presumably no ordering guarantee — confirm against the engine.
None,
}
impl OrderingMode {
pub fn as_str(&self) -> &'static str {
match self {
OrderingMode::Global => "global",
OrderingMode::PerKey => "per_key",
OrderingMode::None => "none",
}
}
}
/// Policy configured under `spool.retention.on_full`.
///
/// Serialized by serde using snake_case variant names; `StopAcking` is the default.
// NOTE(review): the variant names suggest what happens when the spool hits its
// retention limit, but the behavior is implemented elsewhere — confirm against the engine.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum OnFull {
#[default]
StopAcking,
DropToDlq,
AlertOnly,
}
impl OnFull {
pub fn as_str(&self) -> &'static str {
match self {
OnFull::StopAcking => "stop_acking",
OnFull::DropToDlq => "drop_to_dlq",
OnFull::AlertOnly => "alert_only",
}
}
}
/// Drain strategy configured under `spool.drain.on_recovery`.
///
/// Serialized by serde using snake_case variant names; `OrderedThenLive` is
/// the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum DrainOnRecovery {
// NOTE(review): presumably replays spooled items in order before resuming live traffic — confirm.
#[default]
OrderedThenLive,
/// Rejected by `SpoolSpec::validate` when `ordering` is `OrderingMode::Global`.
Interleave,
}
impl DrainOnRecovery {
pub fn as_str(&self) -> &'static str {
match self {
DrainOnRecovery::OrderedThenLive => "ordered_then_live",
DrainOnRecovery::Interleave => "interleave",
}
}
}
/// Storage backend selected under `spool.backend`.
///
/// Serialized by serde using snake_case variant names; `NatsObject` is the
/// default. When spooling is enabled, `SpoolSpec::validate` enforces the
/// per-backend requirements noted on each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum SpoolBackendKind {
/// Requires a non-empty `bucket`.
#[default]
NatsObject,
/// Requires a non-empty `path`.
LocalDisk,
/// Requires a non-empty `bucket` and a non-empty `credential`.
Gcs,
/// Requires a non-empty `bucket` and a non-empty `credential`.
S3,
}
impl SpoolBackendKind {
pub fn as_str(&self) -> &'static str {
match self {
SpoolBackendKind::NatsObject => "nats_object",
SpoolBackendKind::LocalDisk => "local_disk",
SpoolBackendKind::Gcs => "gcs",
SpoolBackendKind::S3 => "s3",
}
}
}
/// Retention settings parsed from the `spool.retention` mapping.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RetentionConfig {
/// Optional age limit in hours; `None` by default.
// NOTE(review): presumably `None` means no age-based expiry — confirm against the engine.
pub max_age_hours: Option<u64>,
/// Optional byte ceiling; defaults to `Some(1024 * 1024 * 1024)`.
// NOTE(review): presumably `None` means unbounded — confirm against the engine.
pub max_bytes: Option<u64>,
/// Policy selected by `spool.retention.on_full`.
pub on_full: OnFull,
}
impl Default for RetentionConfig {
fn default() -> Self {
Self {
max_age_hours: None,
max_bytes: Some(1024 * 1024 * 1024),
on_full: OnFull::default(),
}
}
}
/// Drain settings parsed from the `spool.drain` mapping.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DrainConfig {
/// Optional rate value; `None` by default.
// NOTE(review): presumably `None` means the drain is not rate limited — confirm against the engine.
pub rate_per_sec: Option<u32>,
/// Replay attempt limit; defaults to 5.
// NOTE(review): what happens once the limit is exceeded is decided elsewhere — confirm.
pub max_replay_attempts: u32,
/// Strategy selected by `spool.drain.on_recovery`.
pub on_recovery: DrainOnRecovery,
}
impl Default for DrainConfig {
fn default() -> Self {
Self {
rate_per_sec: None,
max_replay_attempts: 5,
on_recovery: DrainOnRecovery::default(),
}
}
}
/// Fully parsed `spool` block of a subscription.
///
/// Built by `SpoolSpec::parse` (or `SpoolSpec::off` for the disabled form) and
/// checked by `SpoolSpec::validate`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SpoolSpec {
/// Spooling mode; `SpoolMode::Off` means `buffers()` is `false`.
pub mode: SpoolMode,
/// Storage backend kind.
pub backend: SpoolBackendKind,
/// Bucket name; must be non-empty for `nats_object`, `gcs` and `s3` when spooling is enabled.
pub bucket: Option<String>,
/// Path; must be non-empty for `local_disk` when spooling is enabled.
pub path: Option<String>,
/// Keychain credential alias; must be non-empty for `gcs` and `s3` when spooling is enabled.
pub credential: Option<String>,
/// Circuit configuration, parsed by `CircuitConfig::parse` and checked by its `validate`.
pub circuit: CircuitConfig,
/// Ordering mode.
pub ordering: OrderingMode,
/// Optional ordering key name.
// NOTE(review): presumably only meaningful with `OrderingMode::PerKey` — confirm against the engine.
pub ordering_key: Option<String>,
/// Retention settings.
pub retention: RetentionConfig,
/// Drain settings.
pub drain: DrainConfig,
/// Millisecond value; `SpoolSpec::off` sets it to 30_000.
// NOTE(review): presumably the delay before `SpoolMode::Hybrid` escalates — confirm against the engine.
pub hybrid_escalate_after_ms: u64,
}
impl SpoolSpec {
    /// Builds the disabled spec: mode `Off`, default backend, circuit,
    /// ordering, retention and drain settings, no bucket/path/credential, and
    /// `hybrid_escalate_after_ms` of 30_000.
    ///
    /// `parse` uses this value as the baseline that configured keys override.
    pub fn off() -> Self {
        Self {
            mode: SpoolMode::Off,
            backend: SpoolBackendKind::default(),
            bucket: None,
            path: None,
            credential: None,
            circuit: CircuitConfig::default(),
            ordering: OrderingMode::default(),
            ordering_key: None,
            retention: RetentionConfig::default(),
            drain: DrainConfig::default(),
            hybrid_escalate_after_ms: 30_000,
        }
    }

    /// Returns `true` for every mode except `SpoolMode::Off`.
    pub fn buffers(&self) -> bool {
        !matches!(self.mode, SpoolMode::Off)
    }

    /// Parses a subscription's `spool` block from JSON.
    ///
    /// A missing block or JSON `null` yields `SpoolSpec::off()`. A JSON object
    /// starts from `SpoolSpec::off()` and overrides whichever keys are present;
    /// the result is passed through `validate` before being returned.
    ///
    /// Lenient cases kept from the original behavior:
    /// * `bucket`, `path`, `credential` and `ordering_key` are treated as
    ///   absent when they are not JSON strings.
    /// * `retention` and `drain` are ignored when they are not JSON objects.
    /// * a `retention.max_bytes` key that is present but not an unsigned
    ///   integer clears the default byte ceiling (sets it to `None`).
    ///
    /// # Errors
    ///
    /// Returns `ToolError::Configuration` when the block is neither absent,
    /// `null`, nor an object; when an enum-valued key is not one of its
    /// accepted strings; when `drain.rate_per_sec` or
    /// `drain.max_replay_attempts` does not fit in a `u32`; or when `validate`
    /// rejects the assembled spec. Errors from `CircuitConfig::parse` are
    /// propagated.
    pub fn parse(value: Option<&serde_json::Value>) -> Result<SpoolSpec, ToolError> {
        let obj = match value {
            None | Some(serde_json::Value::Null) => return Ok(SpoolSpec::off()),
            Some(serde_json::Value::Object(o)) => o,
            Some(other) => {
                return Err(ToolError::Configuration(format!(
                    "subscription 'spool' must be a mapping, got {other}"
                )))
            }
        };
        // Optional string keys: anything that is not a JSON string counts as absent.
        let text = |key: &str| obj.get(key).and_then(|v| v.as_str()).map(str::to_string);

        let mut spec = SpoolSpec::off();
        if let Some(m) = obj.get("mode") {
            spec.mode = parse_enum_str(
                m,
                "spool.mode",
                &[
                    ("off", SpoolMode::Off),
                    ("buffer_and_ack", SpoolMode::BufferAndAck),
                    ("hybrid", SpoolMode::Hybrid),
                ],
            )?;
        }
        if let Some(b) = obj.get("backend") {
            spec.backend = parse_enum_str(
                b,
                "spool.backend",
                &[
                    ("nats_object", SpoolBackendKind::NatsObject),
                    ("local_disk", SpoolBackendKind::LocalDisk),
                    ("gcs", SpoolBackendKind::Gcs),
                    ("s3", SpoolBackendKind::S3),
                ],
            )?;
        }
        spec.bucket = text("bucket");
        spec.path = text("path");
        spec.credential = text("credential");
        if let Some(c) = obj.get("circuit") {
            spec.circuit = CircuitConfig::parse(c)?;
        }
        if let Some(o) = obj.get("ordering") {
            spec.ordering = parse_enum_str(
                o,
                "spool.ordering",
                &[
                    ("global", OrderingMode::Global),
                    ("per_key", OrderingMode::PerKey),
                    ("none", OrderingMode::None),
                ],
            )?;
        }
        spec.ordering_key = text("ordering_key");
        if let Some(r) = obj.get("retention").and_then(|v| v.as_object()) {
            spec.retention.max_age_hours = r.get("max_age_hours").and_then(|v| v.as_u64());
            // Only an explicit `max_bytes` key replaces the default ceiling;
            // a present key that is not an unsigned integer clears it.
            if r.contains_key("max_bytes") {
                spec.retention.max_bytes = r.get("max_bytes").and_then(|v| v.as_u64());
            }
            if let Some(f) = r.get("on_full") {
                spec.retention.on_full = parse_enum_str(
                    f,
                    "spool.retention.on_full",
                    &[
                        ("stop_acking", OnFull::StopAcking),
                        ("drop_to_dlq", OnFull::DropToDlq),
                        ("alert_only", OnFull::AlertOnly),
                    ],
                )?;
            }
        }
        if let Some(d) = obj.get("drain").and_then(|v| v.as_object()) {
            // Both drain counters are stored as u32; reject values that do not
            // fit instead of letting an `as` cast wrap them (4294967296 -> 0).
            spec.drain.rate_per_sec = match d.get("rate_per_sec").and_then(|v| v.as_u64()) {
                Some(rate) => Some(Self::checked_u32(rate, "spool.drain.rate_per_sec")?),
                None => None,
            };
            if let Some(m) = d.get("max_replay_attempts").and_then(|v| v.as_u64()) {
                spec.drain.max_replay_attempts =
                    Self::checked_u32(m, "spool.drain.max_replay_attempts")?;
            }
            if let Some(r) = d.get("on_recovery") {
                spec.drain.on_recovery = parse_enum_str(
                    r,
                    "spool.drain.on_recovery",
                    &[
                        ("ordered_then_live", DrainOnRecovery::OrderedThenLive),
                        ("interleave", DrainOnRecovery::Interleave),
                    ],
                )?;
            }
        }
        if let Some(h) = obj.get("hybrid_escalate_after_ms").and_then(|v| v.as_u64()) {
            spec.hybrid_escalate_after_ms = h;
        }
        spec.validate()?;
        Ok(spec)
    }

    /// Checks cross-field requirements of an enabled spec.
    ///
    /// A spec whose mode is `Off` is always valid; none of the checks below run.
    ///
    /// # Errors
    ///
    /// Returns `ToolError::Configuration` when:
    /// * the backend is `nats_object`, `gcs` or `s3` and `bucket` is missing or empty;
    /// * the backend is `local_disk` and `path` is missing or empty;
    /// * the backend is `gcs` or `s3` and `credential` is missing or empty;
    /// * `drain.on_recovery` is `interleave` while `ordering` is `global`.
    ///
    /// Errors from the circuit configuration's own `validate` are propagated.
    pub fn validate(&self) -> Result<(), ToolError> {
        if !self.buffers() {
            return Ok(());
        }
        match self.backend {
            SpoolBackendKind::NatsObject | SpoolBackendKind::Gcs | SpoolBackendKind::S3 => {
                if self.bucket.as_deref().unwrap_or("").is_empty() {
                    return Err(ToolError::Configuration(format!(
                        "spool.backend '{}' requires a non-empty 'bucket'",
                        self.backend.as_str()
                    )));
                }
            }
            SpoolBackendKind::LocalDisk => {
                if self.path.as_deref().unwrap_or("").is_empty() {
                    return Err(ToolError::Configuration(
                        "spool.backend 'local_disk' requires a non-empty 'path'".to_string(),
                    ));
                }
            }
        }
        if matches!(self.backend, SpoolBackendKind::Gcs | SpoolBackendKind::S3)
            && self.credential.as_deref().unwrap_or("").is_empty()
        {
            return Err(ToolError::Configuration(format!(
                "spool.backend '{}' requires a keychain 'credential' alias for the bucket",
                self.backend.as_str()
            )));
        }
        if matches!(self.drain.on_recovery, DrainOnRecovery::Interleave)
            && matches!(self.ordering, OrderingMode::Global)
        {
            return Err(ToolError::Configuration(
                "spool.drain.on_recovery 'interleave' is unsafe with ordering 'global'; \
use 'ordered_then_live' or change ordering to 'per_key'/'none'"
                    .to_string(),
            ));
        }
        self.circuit.validate()?;
        Ok(())
    }

    /// Narrows a parsed `u64` to `u32`, failing with a configuration error
    /// that names `field` when the value is out of range.
    fn checked_u32(value: u64, field: &str) -> Result<u32, ToolError> {
        if value > u64::from(u32::MAX) {
            return Err(ToolError::Configuration(format!(
                "{field} {value} exceeds the maximum of {}",
                u32::MAX
            )));
        }
        // Lossless: the range check above guarantees the value fits.
        Ok(value as u32)
    }
}
/// Resolves a JSON string against a table of accepted wire names.
///
/// The input is ASCII-lowercased before it is compared with each `allowed`
/// name, and the value paired with the first matching name is returned.
///
/// # Errors
///
/// Returns `ToolError::Configuration` when `value` is not a JSON string, or
/// when no entry in `allowed` matches; the latter message lists the accepted
/// names in table order.
pub(crate) fn parse_enum_str<T: Copy>(
    value: &serde_json::Value,
    field: &str,
    allowed: &[(&str, T)],
) -> Result<T, ToolError> {
    let raw = match value.as_str() {
        Some(text) => text,
        None => {
            return Err(ToolError::Configuration(format!(
                "{field} must be a string"
            )))
        }
    };

    let normalized = raw.to_ascii_lowercase();
    let hit = allowed
        .iter()
        .find(|(wire, _)| *wire == normalized.as_str());

    match hit {
        Some((_, resolved)) => Ok(*resolved),
        None => {
            let accepted = allowed
                .iter()
                .map(|(wire, _)| *wire)
                .collect::<Vec<&str>>()
                .join(", ");
            Err(ToolError::Configuration(format!(
                "{field} '{raw}' invalid; valid: {accepted}"
            )))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when `SpoolSpec::parse` refuses the given spool block.
    fn rejected(raw: serde_json::Value) -> bool {
        SpoolSpec::parse(Some(&raw)).is_err()
    }

    // A missing or null block resolves to the disabled spec.
    #[test]
    fn parse_missing_block_is_off() {
        let disabled = SpoolSpec::off();
        assert_eq!(SpoolSpec::parse(None).unwrap(), disabled);

        let null = serde_json::Value::Null;
        assert_eq!(SpoolSpec::parse(Some(&null)).unwrap(), disabled);

        assert!(!disabled.buffers());
    }

    // A fully populated nats_object block round-trips into the expected fields.
    #[test]
    fn parse_buffer_and_ack_nats_object() {
        let raw = serde_json::json!({
            "mode": "buffer_and_ack",
            "backend": "nats_object",
            "bucket": "noetl_spool",
            "ordering": "per_key",
            "ordering_key": "device_id",
            "circuit": {
                "trip_after": 3,
                "probe_after_ms": 5000,
                "downstream": [
                    {"name": "warehouse", "type": "http", "target": "http://wh/health"}
                ]
            },
            "retention": { "max_bytes": 1048576, "on_full": "drop_to_dlq" },
            "drain": { "max_replay_attempts": 2, "on_recovery": "ordered_then_live" }
        });
        let spec = SpoolSpec::parse(Some(&raw)).unwrap();

        assert_eq!(spec.mode, SpoolMode::BufferAndAck);
        assert_eq!(spec.backend, SpoolBackendKind::NatsObject);
        assert_eq!(spec.bucket.as_deref(), Some("noetl_spool"));
        assert_eq!(spec.ordering, OrderingMode::PerKey);
        assert_eq!(spec.ordering_key.as_deref(), Some("device_id"));
        assert_eq!(spec.circuit.trip_after, 3);
        assert_eq!(spec.circuit.probe_after_ms, 5000);
        assert_eq!(spec.circuit.downstream.len(), 1);
        assert_eq!(spec.retention.max_bytes, Some(1048576));
        assert_eq!(spec.retention.on_full, OnFull::DropToDlq);
        assert_eq!(spec.drain.max_replay_attempts, 2);
        assert!(spec.buffers());
    }

    #[test]
    fn parse_rejects_missing_bucket_for_nats_object() {
        assert!(rejected(serde_json::json!({
            "mode": "buffer_and_ack",
            "backend": "nats_object"
        })));
    }

    #[test]
    fn parse_rejects_local_disk_without_path() {
        assert!(rejected(serde_json::json!({
            "mode": "buffer_and_ack",
            "backend": "local_disk"
        })));
    }

    #[test]
    fn parse_rejects_gcs_without_credential() {
        assert!(rejected(serde_json::json!({
            "mode": "buffer_and_ack",
            "backend": "gcs",
            "bucket": "b"
        })));
    }

    #[test]
    fn parse_rejects_interleave_with_global_ordering() {
        assert!(rejected(serde_json::json!({
            "mode": "buffer_and_ack",
            "backend": "local_disk",
            "path": "/tmp/x",
            "ordering": "global",
            "drain": { "on_recovery": "interleave" }
        })));
    }

    #[test]
    fn parse_rejects_bad_enum() {
        assert!(rejected(serde_json::json!({ "mode": "bogus" })));
    }

    // With mode "off" no bucket/path is required, so both parse and validate succeed.
    #[test]
    fn off_mode_skips_backend_validation() {
        let raw = serde_json::json!({ "mode": "off" });
        let spec = SpoolSpec::parse(Some(&raw)).unwrap();
        assert!(spec.validate().is_ok());
    }

    #[test]
    fn default_retention_has_byte_ceiling() {
        let defaults = RetentionConfig::default();
        assert_eq!(defaults.max_bytes, Some(1024 * 1024 * 1024));
    }
}