use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
pub type JobId = String;
pub(crate) fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub struct JobRetryOverride {
pub max_attempts: Option<u32>,
pub backoff: Option<BackoffSpec>,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[non_exhaustive]
pub enum BackoffKind {
Fixed,
Exponential,
#[serde(other)]
Unknown,
}
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub struct BackoffSpec {
pub kind: BackoffKind,
pub delay_ms: u64,
pub max_delay_ms: Option<u64>,
pub multiplier: Option<f64>,
pub jitter_ms: Option<u64>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job<T> {
pub id: JobId,
pub payload: T,
pub created_at_ms: u64,
#[serde(default)]
pub attempt: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub retry: Option<JobRetryOverride>,
#[serde(default, skip)]
pub name: String,
}
impl<T> Job<T> {
pub fn new(payload: T) -> Self {
Self::with_id(ulid::Ulid::new().to_string(), payload)
}
pub fn with_id(id: JobId, payload: T) -> Self {
Self {
id,
payload,
created_at_ms: now_ms(),
attempt: 0,
retry: None,
name: String::new(),
}
}
pub fn with_retry(mut self, retry: JobRetryOverride) -> Self {
self.retry = Some(retry);
self
}
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Nested {
n: i64,
s: String,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
struct Sample {
name: String,
count: u32,
nested: Nested,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct LegacyJob<T> {
id: JobId,
payload: T,
created_at_ms: u64,
#[serde(default)]
attempt: u32,
}
#[test]
fn round_trip_messagepack() {
let job = Job::new(Sample {
name: "hello".into(),
count: 7,
nested: Nested {
n: -42,
s: "world".into(),
},
});
let bytes = rmp_serde::to_vec(&job).expect("encode");
let decoded: Job<Sample> = rmp_serde::from_slice(&bytes).expect("decode");
assert_eq!(job.id, decoded.id);
assert_eq!(job.payload, decoded.payload);
assert_eq!(job.created_at_ms, decoded.created_at_ms);
assert_eq!(decoded.retry, None);
assert_eq!(decoded.name, "");
}
#[test]
fn name_field_is_not_serialized() {
let job_named: Job<u32> = Job::with_id("test".into(), 42).with_name("send-email");
let job_unnamed: Job<u32> = Job::with_id("test".into(), 42);
let bytes_named = rmp_serde::to_vec(&job_named).expect("encode named");
let bytes_unnamed = rmp_serde::to_vec(&job_unnamed).expect("encode unnamed");
assert_eq!(
bytes_named, bytes_unnamed,
"name must be #[serde(skip)] — encoded bytes must match the unnamed shape"
);
let decoded: Job<u32> = rmp_serde::from_slice(&bytes_named).expect("decode");
assert_eq!(
decoded.name, "",
"decoded name defaults to empty; the n stream-entry field is the source of truth"
);
}
#[test]
fn new_populates_id_and_timestamp() {
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let job = Job::new("payload");
ulid::Ulid::from_string(&job.id).expect("id is a valid ULID");
assert!(job.created_at_ms >= now_ms.saturating_sub(1_000));
assert!(job.created_at_ms <= now_ms + 1_000);
assert_eq!(job.retry, None);
}
#[test]
fn round_trip_with_retry_override() {
let job = Job::new(Sample {
name: "x".into(),
count: 1,
nested: Nested {
n: 0,
s: "y".into(),
},
})
.with_retry(JobRetryOverride {
max_attempts: Some(7),
backoff: Some(BackoffSpec {
kind: BackoffKind::Exponential,
delay_ms: 250,
max_delay_ms: Some(10_000),
multiplier: Some(3.0),
jitter_ms: Some(50),
}),
});
let bytes = rmp_serde::to_vec(&job).expect("encode");
let decoded: Job<Sample> = rmp_serde::from_slice(&bytes).expect("decode");
assert_eq!(decoded.retry, job.retry);
}
#[test]
fn forward_compat_legacy_payload_decodes_with_retry_none() {
let legacy = LegacyJob {
id: "legacy-1".to_string(),
payload: Sample {
name: "old".into(),
count: 3,
nested: Nested {
n: 1,
s: "z".into(),
},
},
created_at_ms: 1_700_000_000_000,
attempt: 2,
};
let bytes = rmp_serde::to_vec(&legacy).expect("encode legacy");
let decoded: Job<Sample> = rmp_serde::from_slice(&bytes).expect("decode into new shape");
assert_eq!(decoded.id, "legacy-1");
assert_eq!(decoded.attempt, 2);
assert_eq!(decoded.payload.name, "old");
assert_eq!(decoded.retry, None);
}
#[test]
fn backoff_kind_serializes_as_lowercase_string() {
let exp = rmp_serde::to_vec(&BackoffKind::Exponential).expect("encode exp");
let from_str = rmp_serde::to_vec(&"exponential".to_string()).expect("encode str");
assert_eq!(
exp, from_str,
"BackoffKind::Exponential must encode as msgpack string \"exponential\""
);
let fixed = rmp_serde::to_vec(&BackoffKind::Fixed).expect("encode fixed");
let from_str = rmp_serde::to_vec(&"fixed".to_string()).expect("encode str");
assert_eq!(
fixed, from_str,
"BackoffKind::Fixed must encode as msgpack string \"fixed\""
);
}
#[test]
fn legacy_backoff_kind_string_decodes_into_enum() {
#[derive(Serialize, Deserialize)]
struct LegacyBackoffSpec {
kind: String,
delay_ms: u64,
max_delay_ms: Option<u64>,
multiplier: Option<f64>,
jitter_ms: Option<u64>,
}
for (legacy_kind, want) in [
("exponential", BackoffKind::Exponential),
("fixed", BackoffKind::Fixed),
] {
let legacy = LegacyBackoffSpec {
kind: legacy_kind.to_string(),
delay_ms: 250,
max_delay_ms: Some(10_000),
multiplier: Some(3.0),
jitter_ms: Some(50),
};
let bytes = rmp_serde::to_vec(&legacy).expect("encode legacy");
let decoded: BackoffSpec =
rmp_serde::from_slice(&bytes).expect("decode legacy into new shape");
assert_eq!(decoded.kind, want, "wire-format mismatch for {legacy_kind}");
assert_eq!(decoded.delay_ms, 250);
assert_eq!(decoded.max_delay_ms, Some(10_000));
assert_eq!(decoded.multiplier, Some(3.0));
assert_eq!(decoded.jitter_ms, Some(50));
}
}
#[test]
fn legacy_backoff_kind_unknown_string_decodes_as_unknown_variant() {
#[derive(Serialize, Deserialize)]
struct LegacyBackoffSpec {
kind: String,
delay_ms: u64,
max_delay_ms: Option<u64>,
multiplier: Option<f64>,
jitter_ms: Option<u64>,
}
let legacy = LegacyBackoffSpec {
kind: "linear".to_string(),
delay_ms: 100,
max_delay_ms: None,
multiplier: None,
jitter_ms: None,
};
let bytes = rmp_serde::to_vec(&legacy).expect("encode legacy");
let decoded: BackoffSpec =
rmp_serde::from_slice(&bytes).expect("decode unknown kind into new shape");
assert_eq!(decoded.kind, BackoffKind::Unknown);
assert_eq!(decoded.delay_ms, 100);
}
#[test]
fn backoff_spec_round_trips_with_enum_kind() {
let spec = BackoffSpec {
kind: BackoffKind::Fixed,
delay_ms: 80,
max_delay_ms: Some(1_000),
multiplier: Some(2.0),
jitter_ms: Some(10),
};
let bytes = rmp_serde::to_vec(&spec).expect("encode");
let decoded: BackoffSpec = rmp_serde::from_slice(&bytes).expect("decode");
assert_eq!(decoded, spec);
}
#[test]
fn new_payload_without_override_decodes_in_legacy_consumer() {
let job: Job<Sample> = Job::new(Sample {
name: "new".into(),
count: 1,
nested: Nested { n: 0, s: "".into() },
});
let bytes = rmp_serde::to_vec(&job).expect("encode new");
let decoded: LegacyJob<Sample> =
rmp_serde::from_slice(&bytes).expect("decode into legacy shape");
assert_eq!(decoded.id, job.id);
assert_eq!(decoded.attempt, 0);
}
#[test]
fn legacy_consumer_fails_on_new_payload_with_retry_some() {
#[derive(serde::Deserialize, Debug)]
#[allow(dead_code)]
struct LegacyJob {
id: String,
payload: u32,
created_at_ms: u64,
attempt: u32,
}
let job = Job::with_id("test".into(), 42_u32).with_retry(JobRetryOverride {
max_attempts: Some(7),
backoff: None,
});
let bytes = rmp_serde::to_vec(&job).expect("encode");
let res: std::result::Result<LegacyJob, _> = rmp_serde::from_slice(&bytes);
assert!(
res.is_err(),
"expected legacy 4-field decode to fail on 5-field payload, got {res:?}"
);
let msg = format!("{:?}", res.unwrap_err());
assert!(
msg.contains("incorrect length") || msg.contains("4") || msg.contains("array"),
"unexpected error message: {msg}"
);
}
#[test]
fn legacy_consumer_decodes_new_payload_with_retry_none() {
#[derive(serde::Deserialize, Debug, PartialEq)]
struct LegacyJob {
id: String,
payload: u32,
created_at_ms: u64,
attempt: u32,
}
let job = Job::with_id("test".into(), 42_u32);
assert!(job.retry.is_none());
let bytes = rmp_serde::to_vec(&job).expect("encode");
let decoded: LegacyJob = rmp_serde::from_slice(&bytes).expect("decode");
assert_eq!(decoded.id, "test");
assert_eq!(decoded.payload, 42);
assert_eq!(decoded.attempt, 0);
}
#[test]
fn empty_override_with_no_inner_fields_set_is_inert() {
let opts = JobRetryOverride {
max_attempts: None,
backoff: None,
};
let job: Job<u32> = Job::with_id("x".into(), 0).with_retry(opts);
let bytes = rmp_serde::to_vec(&job).expect("encode");
let back: Job<u32> = rmp_serde::from_slice(&bytes).expect("round-trip");
assert!(back.retry.is_some(), "outer retry must survive encode");
let r = back.retry.unwrap();
assert!(
r.max_attempts.is_none(),
"empty override stays empty after round-trip"
);
assert!(
r.backoff.is_none(),
"empty override stays empty after round-trip"
);
}
}