use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use crate::tools::source::PolledMessage;
const RECV_SEQ_WIDTH: usize = 20;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SpoolItem {
pub subscription: String,
pub source: String,
pub message_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub idempotency_key: Option<String>,
pub dedup_key: String,
pub recv_seq: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub ordering_key: Option<String>,
pub downstream: String,
pub sha256: String,
pub attempts: u32,
pub spooled_at_ms: u64,
pub reason: String,
pub message: PolledMessage,
}
impl SpoolItem {
#[allow(clippy::too_many_arguments)]
pub fn new(
subscription: impl Into<String>,
source: impl Into<String>,
message: PolledMessage,
idempotency_key: Option<String>,
recv_seq: u64,
ordering_key: Option<String>,
downstream: impl Into<String>,
reason: impl Into<String>,
spooled_at_ms: u64,
) -> Self {
let message_id = message.id.clone();
let dedup_key = idempotency_key.clone().unwrap_or_else(|| message_id.clone());
let sha256 = sha256_hex(&canonical_payload_bytes(&message));
Self {
subscription: subscription.into(),
source: source.into(),
message_id,
idempotency_key,
dedup_key,
recv_seq,
ordering_key,
downstream: downstream.into(),
sha256,
attempts: 0,
spooled_at_ms,
reason: reason.into(),
message,
}
}
pub fn object_key(&self) -> String {
format!(
"{:0width$}-{}",
self.recv_seq,
sanitize_key(&self.message_id),
width = RECV_SEQ_WIDTH
)
}
pub fn spool_ref(&self) -> String {
spool_ref(&self.subscription, self.recv_seq, &self.message_id)
}
pub fn lane(&self, ordering: super::OrderingMode) -> Option<String> {
match ordering {
super::OrderingMode::PerKey => self.ordering_key.clone(),
super::OrderingMode::Global | super::OrderingMode::None => None,
}
}
pub fn to_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap_or_default()
}
pub fn from_bytes(bytes: &[u8]) -> Result<SpoolItem, crate::error::ToolError> {
serde_json::from_slice(bytes).map_err(|e| {
crate::error::ToolError::Json(format!("spool item decode failed: {e}"))
})
}
}
pub fn spool_ref(subscription: &str, recv_seq: u64, message_id: &str) -> String {
format!("noetl://spool/{subscription}/{recv_seq}/{message_id}")
}
pub fn recv_seq_from_object_key(key: &str) -> Option<u64> {
if key.len() < RECV_SEQ_WIDTH {
return None;
}
key[..RECV_SEQ_WIDTH].parse::<u64>().ok()
}
pub fn sha256_hex(bytes: &[u8]) -> String {
let mut h = Sha256::new();
h.update(bytes);
let digest = h.finalize();
let mut out = String::with_capacity(64);
for b in digest {
out.push_str(&format!("{b:02x}"));
}
out
}
fn canonical_payload_bytes(message: &PolledMessage) -> Vec<u8> {
serde_json::to_vec(&message.data).unwrap_or_else(|_| message.id.as_bytes().to_vec())
}
fn sanitize_key(s: &str) -> String {
let mut out: String = s
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.') {
c
} else {
'_'
}
})
.collect();
if out.len() > 128 {
out.truncate(128);
}
out
}
#[cfg(test)]
mod tests {
use super::*;
fn msg(id: &str, data: serde_json::Value) -> PolledMessage {
PolledMessage {
id: id.to_string(),
data,
headers: serde_json::Map::new(),
attributes: serde_json::Value::Null,
metadata: serde_json::Value::Null,
ack_id: None,
}
}
#[test]
fn dedup_key_prefers_idempotency_then_message_id() {
let i1 = SpoolItem::new("s", "nats", msg("m1", serde_json::json!({})), Some("idem-9".into()), 1, None, "default", "circuit_open", 0);
assert_eq!(i1.dedup_key, "idem-9");
let i2 = SpoolItem::new("s", "nats", msg("m2", serde_json::json!({})), None, 2, None, "default", "circuit_open", 0);
assert_eq!(i2.dedup_key, "m2");
}
#[test]
fn object_keys_sort_in_receive_order() {
let a = SpoolItem::new("s", "nats", msg("z", serde_json::json!(1)), None, 9, None, "d", "circuit_open", 0);
let b = SpoolItem::new("s", "nats", msg("a", serde_json::json!(2)), None, 10, None, "d", "circuit_open", 0);
let c = SpoolItem::new("s", "nats", msg("a", serde_json::json!(3)), None, 100, None, "d", "circuit_open", 0);
let mut keys = vec![b.object_key(), c.object_key(), a.object_key()];
keys.sort();
assert_eq!(keys, vec![a.object_key(), b.object_key(), c.object_key()]);
}
#[test]
fn sha256_is_stable_and_payload_sensitive() {
let a = SpoolItem::new("s", "nats", msg("m", serde_json::json!({"v": 1})), None, 1, None, "d", "circuit_open", 0);
let b = SpoolItem::new("s", "nats", msg("m", serde_json::json!({"v": 1})), None, 2, None, "d", "circuit_open", 0);
let c = SpoolItem::new("s", "nats", msg("m", serde_json::json!({"v": 2})), None, 3, None, "d", "circuit_open", 0);
assert_eq!(a.sha256, b.sha256); assert_ne!(a.sha256, c.sha256); assert_eq!(a.sha256.len(), 64);
}
#[test]
fn spool_ref_format() {
let i = SpoolItem::new("subscriptions/iot", "nats", msg("42", serde_json::json!(1)), None, 7, None, "d", "circuit_open", 0);
assert_eq!(i.spool_ref(), "noetl://spool/subscriptions/iot/7/42");
}
#[test]
fn bytes_round_trip() {
let i = SpoolItem::new("s", "kafka", msg("p:1", serde_json::json!({"x": [1,2,3]})), Some("k".into()), 5, Some("dev-7".into()), "wh", "circuit_open", 123);
let bytes = i.to_bytes();
let back = SpoolItem::from_bytes(&bytes).unwrap();
assert_eq!(i, back);
}
#[test]
fn lane_depends_on_ordering() {
let i = SpoolItem::new("s", "nats", msg("m", serde_json::json!(1)), None, 1, Some("dev-7".into()), "d", "circuit_open", 0);
assert_eq!(i.lane(super::super::OrderingMode::PerKey).as_deref(), Some("dev-7"));
assert_eq!(i.lane(super::super::OrderingMode::Global), None);
assert_eq!(i.lane(super::super::OrderingMode::None), None);
}
#[test]
fn sanitize_key_strips_unsafe_chars() {
let i = SpoolItem::new("s", "nats", msg("a/b c:d", serde_json::json!(1)), None, 1, None, "d", "circuit_open", 0);
assert!(i.object_key().ends_with("a_b_c_d"));
}
#[test]
fn recv_seq_round_trips_through_object_key() {
for seq in [1u64, 7, 42, 1000, u64::MAX] {
let i = SpoolItem::new("s", "nats", msg("m", serde_json::json!(1)), None, seq, None, "d", "circuit_open", 0);
assert_eq!(recv_seq_from_object_key(&i.object_key()), Some(seq));
}
assert_eq!(recv_seq_from_object_key("garbage"), None);
assert_eq!(recv_seq_from_object_key(""), None);
}
}