use serde::{Deserialize, Serialize};
use super::{Message, gen_message_id};
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DeliveryBoundary {
Interrupt,
NextStep,
OnNaturalEnd,
ResumeInput,
#[default]
NewRun,
}
impl DeliveryBoundary {
#[must_use]
pub fn eligible_at(self, current: Self) -> bool {
if self == Self::ResumeInput || current == Self::ResumeInput {
return self == current;
}
self.precedence() <= current.precedence()
}
fn precedence(self) -> u8 {
match self {
Self::Interrupt => 0,
Self::NextStep => 1,
Self::OnNaturalEnd => 2,
Self::ResumeInput => 3,
Self::NewRun => 4,
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DeliveryGranularity {
One,
#[default]
Batch,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeliveryMode {
#[serde(default)]
pub boundary: DeliveryBoundary,
#[serde(default)]
pub granularity: DeliveryGranularity,
#[serde(default, skip_serializing_if = "is_false")]
pub barrier: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub target_run_id: Option<String>,
#[serde(default = "default_true", skip_serializing_if = "is_true")]
pub fallback_to_new_run: bool,
}
impl Default for DeliveryMode {
fn default() -> Self {
Self::new_run(DeliveryGranularity::Batch)
}
}
impl DeliveryMode {
#[must_use]
pub fn interrupt(granularity: DeliveryGranularity) -> Self {
Self {
boundary: DeliveryBoundary::Interrupt,
granularity,
barrier: false,
target_run_id: None,
fallback_to_new_run: true,
}
}
#[must_use]
pub fn next_step(granularity: DeliveryGranularity) -> Self {
Self {
boundary: DeliveryBoundary::NextStep,
granularity,
barrier: false,
target_run_id: None,
fallback_to_new_run: true,
}
}
#[must_use]
pub fn on_natural_end(granularity: DeliveryGranularity) -> Self {
Self {
boundary: DeliveryBoundary::OnNaturalEnd,
granularity,
barrier: false,
target_run_id: None,
fallback_to_new_run: true,
}
}
#[must_use]
pub fn new_run(granularity: DeliveryGranularity) -> Self {
Self {
boundary: DeliveryBoundary::NewRun,
granularity,
barrier: false,
target_run_id: None,
fallback_to_new_run: true,
}
}
#[must_use]
pub fn resume_input(granularity: DeliveryGranularity, run_id: impl Into<String>) -> Self {
Self {
boundary: DeliveryBoundary::ResumeInput,
granularity,
barrier: false,
target_run_id: Some(run_id.into()),
fallback_to_new_run: false,
}
}
#[must_use]
pub fn targeted_to_run(mut self, run_id: impl Into<String>, fallback_to_new_run: bool) -> Self {
self.target_run_id = Some(run_id.into());
self.fallback_to_new_run = fallback_to_new_run;
self
}
}
fn is_false(value: &bool) -> bool {
!*value
}
fn is_true(value: &bool) -> bool {
*value
}
fn default_true() -> bool {
true
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingMessageRecord {
pub pending_id: String,
pub thread_id: String,
pub position: u64,
pub message: Message,
#[serde(default = "default_pending_revision")]
pub revision: u64,
#[serde(default)]
pub delivery_mode: DeliveryMode,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub created_at: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub updated_at: Option<u64>,
}
impl PendingMessageRecord {
pub fn from_message(
thread_id: impl Into<String>,
position: u64,
mut message: Message,
delivery_mode: DeliveryMode,
) -> Self {
let pending_id = message.id.clone().unwrap_or_else(gen_message_id);
if message.id.is_none() {
message.id = Some(pending_id.clone());
}
Self {
pending_id,
thread_id: thread_id.into(),
position,
message,
revision: default_pending_revision(),
delivery_mode,
created_at: None,
updated_at: None,
}
}
}
fn default_pending_revision() -> u64 {
1
}
#[must_use]
pub fn pending_queue_revision(pending: &[PendingMessageRecord]) -> u64 {
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;
let mut hash = FNV_OFFSET;
for record in pending {
for byte in record.pending_id.as_bytes() {
hash ^= u64::from(*byte);
hash = hash.wrapping_mul(FNV_PRIME);
}
for value in [record.position, record.revision] {
for byte in value.to_le_bytes() {
hash ^= u64::from(byte);
hash = hash.wrapping_mul(FNV_PRIME);
}
}
}
hash
}
#[must_use]
pub fn select_pending_for_freeze(
pending: &[PendingMessageRecord],
boundary: DeliveryBoundary,
) -> Vec<usize> {
select_pending_for_freeze_for_run(pending, boundary, None)
}
#[must_use]
pub fn select_pending_for_freeze_for_run(
pending: &[PendingMessageRecord],
boundary: DeliveryBoundary,
current_run_id: Option<&str>,
) -> Vec<usize> {
let mut selected = Vec::new();
let mut skipped_prior = false;
for (index, entry) in pending.iter().enumerate() {
if !eligible_for_freeze(entry, boundary, current_run_id) {
if entry.delivery_mode.barrier {
break;
}
if can_skip_ineligible(entry, boundary) {
skipped_prior = true;
continue;
}
break;
}
if entry.delivery_mode.barrier && skipped_prior {
break;
}
if !selected.is_empty() && entry.delivery_mode.granularity == DeliveryGranularity::One {
break;
}
selected.push(index);
if entry.delivery_mode.barrier
|| entry.delivery_mode.granularity == DeliveryGranularity::One
{
break;
}
}
selected
}
fn eligible_for_freeze(
entry: &PendingMessageRecord,
boundary: DeliveryBoundary,
current_run_id: Option<&str>,
) -> bool {
let mode = &entry.delivery_mode;
if let Some(target_run_id) = mode.target_run_id.as_deref() {
if boundary == DeliveryBoundary::NewRun {
if !mode.fallback_to_new_run {
return false;
}
} else if Some(target_run_id) != current_run_id {
return false;
}
}
mode.boundary.eligible_at(boundary)
}
fn can_skip_ineligible(entry: &PendingMessageRecord, boundary: DeliveryBoundary) -> bool {
if boundary != DeliveryBoundary::NewRun
&& entry.delivery_mode.boundary == DeliveryBoundary::NewRun
{
return true;
}
boundary == DeliveryBoundary::NewRun
&& entry.delivery_mode.boundary != DeliveryBoundary::NewRun
&& !entry.delivery_mode.fallback_to_new_run
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pending_record_without_delivery_mode_migrates_to_new_run_batch() {
let record = PendingMessageRecord::from_message(
"t1",
1,
Message::user("hi").with_id("p1".to_string()),
DeliveryMode::next_step(DeliveryGranularity::One),
);
let mut value = serde_json::to_value(&record).unwrap();
value
.as_object_mut()
.unwrap()
.remove("delivery_mode")
.expect("serialized record carries delivery_mode");
let parsed: PendingMessageRecord = serde_json::from_value(value).unwrap();
assert_eq!(parsed.delivery_mode.boundary, DeliveryBoundary::NewRun);
assert_eq!(parsed.delivery_mode.granularity, DeliveryGranularity::Batch);
assert!(!parsed.delivery_mode.barrier);
assert_eq!(parsed.delivery_mode.target_run_id, None);
assert!(parsed.delivery_mode.fallback_to_new_run);
}
fn pending(
id: &str,
position: u64,
boundary: DeliveryBoundary,
granularity: DeliveryGranularity,
) -> PendingMessageRecord {
PendingMessageRecord::from_message(
"thread-1",
position,
Message::user(id).with_id(id.to_string()),
DeliveryMode {
boundary,
granularity,
barrier: false,
target_run_id: None,
fallback_to_new_run: true,
},
)
}
fn pending_with_barrier(
id: &str,
position: u64,
boundary: DeliveryBoundary,
granularity: DeliveryGranularity,
) -> PendingMessageRecord {
let mut record = pending(id, position, boundary, granularity);
record.delivery_mode.barrier = true;
record
}
#[test]
fn delivery_boundary_fallback_cascades_forward() {
assert!(DeliveryBoundary::Interrupt.eligible_at(DeliveryBoundary::Interrupt));
assert!(DeliveryBoundary::Interrupt.eligible_at(DeliveryBoundary::NextStep));
assert!(DeliveryBoundary::NextStep.eligible_at(DeliveryBoundary::OnNaturalEnd));
assert!(DeliveryBoundary::OnNaturalEnd.eligible_at(DeliveryBoundary::NewRun));
assert!(DeliveryBoundary::NewRun.eligible_at(DeliveryBoundary::NewRun));
assert!(!DeliveryBoundary::NewRun.eligible_at(DeliveryBoundary::OnNaturalEnd));
assert!(!DeliveryBoundary::OnNaturalEnd.eligible_at(DeliveryBoundary::NextStep));
}
#[test]
fn pending_record_uses_message_id_as_pending_id() {
let record = PendingMessageRecord::from_message(
"thread-1",
1,
Message::user("hello").with_id("msg-1".to_string()),
DeliveryMode::new_run(DeliveryGranularity::Batch),
);
assert_eq!(record.pending_id, "msg-1");
assert_eq!(record.message.id.as_deref(), Some("msg-1"));
assert_eq!(record.thread_id, "thread-1");
assert_eq!(record.position, 1);
assert_eq!(record.delivery_mode.boundary, DeliveryBoundary::NewRun);
}
#[test]
fn pending_record_assigns_generated_id_to_message() {
let record = PendingMessageRecord::from_message(
"thread-1",
1,
Message::user("hello"),
DeliveryMode::new_run(DeliveryGranularity::Batch),
);
assert_eq!(
record.message.id.as_deref(),
Some(record.pending_id.as_str())
);
}
#[test]
fn pending_record_missing_delivery_mode_defaults_to_new_run_batch() {
let json = serde_json::json!({
"pending_id": "pending-1",
"thread_id": "thread-1",
"position": 1,
"message": Message::user("hello").with_id("pending-1".to_string())
});
let record: PendingMessageRecord = serde_json::from_value(json).unwrap();
assert_eq!(
record.delivery_mode,
DeliveryMode::new_run(DeliveryGranularity::Batch)
);
}
#[test]
fn freeze_selection_takes_one_when_first_eligible_is_one() {
let pending = vec![
pending("a", 1, DeliveryBoundary::NewRun, DeliveryGranularity::One),
pending("b", 2, DeliveryBoundary::NewRun, DeliveryGranularity::Batch),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::NewRun),
vec![0]
);
}
#[test]
fn freeze_selection_batches_all_eligible_at_boundary() {
let pending = vec![
pending(
"a",
1,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
pending(
"b",
2,
DeliveryBoundary::OnNaturalEnd,
DeliveryGranularity::Batch,
),
pending("c", 3, DeliveryBoundary::NewRun, DeliveryGranularity::Batch),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::OnNaturalEnd),
vec![0, 1]
);
}
#[test]
fn freeze_selection_global_barrier_blocks_skipped_prior_pending() {
let pending = vec![
pending("a", 1, DeliveryBoundary::NewRun, DeliveryGranularity::Batch),
pending_with_barrier(
"b",
2,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
pending(
"c",
3,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::NextStep),
Vec::<usize>::new()
);
}
#[test]
fn freeze_selection_frontier_barrier_blocks_active_lane_skip() {
let pending = vec![
pending_with_barrier(
"queued",
1,
DeliveryBoundary::NewRun,
DeliveryGranularity::Batch,
),
pending(
"live",
2,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::NextStep),
Vec::<usize>::new()
);
}
#[test]
fn freeze_selection_active_lane_skips_queued_new_run() {
let pending = vec![
pending(
"queued",
1,
DeliveryBoundary::NewRun,
DeliveryGranularity::Batch,
),
pending(
"live",
2,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::NextStep),
vec![1]
);
}
#[test]
fn freeze_selection_interrupt_skips_queued_new_run() {
let pending = vec![
pending(
"queued",
1,
DeliveryBoundary::NewRun,
DeliveryGranularity::Batch,
),
pending(
"interrupt",
2,
DeliveryBoundary::Interrupt,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::Interrupt),
vec![1]
);
}
#[test]
fn freeze_selection_preserves_fifo_within_active_lane() {
let pending = vec![
pending(
"natural-end",
1,
DeliveryBoundary::OnNaturalEnd,
DeliveryGranularity::Batch,
),
pending(
"live",
2,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::NextStep),
Vec::<usize>::new()
);
}
#[test]
fn freeze_selection_new_run_skips_targeted_active_message_without_fallback() {
let mut live = pending(
"live",
1,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
);
live.delivery_mode = live.delivery_mode.targeted_to_run("run-a", false);
let pending = vec![
live,
pending(
"queued",
2,
DeliveryBoundary::NewRun,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze_for_run(&pending, DeliveryBoundary::NewRun, Some("run-b")),
vec![1]
);
}
#[test]
fn freeze_selection_active_message_requires_matching_target_run() {
let mut live = pending(
"live",
1,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
);
live.delivery_mode = live.delivery_mode.targeted_to_run("run-a", false);
assert_eq!(
select_pending_for_freeze_for_run(&[live.clone()], DeliveryBoundary::NextStep, None),
Vec::<usize>::new()
);
assert_eq!(
select_pending_for_freeze_for_run(&[live], DeliveryBoundary::NextStep, Some("run-a")),
vec![0]
);
}
#[test]
fn resume_input_only_consumes_matching_resume_boundary() {
let pending = vec![
pending(
"queued",
1,
DeliveryBoundary::NewRun,
DeliveryGranularity::Batch,
),
PendingMessageRecord::from_message(
"thread-1",
2,
Message::user("resume").with_id("resume".to_string()),
DeliveryMode::resume_input(DeliveryGranularity::Batch, "run-r"),
),
];
assert_eq!(
select_pending_for_freeze_for_run(
&pending,
DeliveryBoundary::ResumeInput,
Some("run-r")
),
vec![1]
);
assert_eq!(
select_pending_for_freeze_for_run(&pending, DeliveryBoundary::NewRun, Some("run-r")),
vec![0]
);
}
#[test]
fn freeze_selection_barrier_stops_batch_before_later_messages() {
let pending = vec![
pending(
"a",
1,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
pending_with_barrier(
"barrier",
2,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
pending(
"c",
3,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::NextStep),
vec![0, 1]
);
}
#[test]
fn freeze_selection_batch_stops_before_later_one_message() {
let pending = vec![
pending(
"batch-1",
1,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
pending(
"one",
2,
DeliveryBoundary::NextStep,
DeliveryGranularity::One,
),
pending(
"batch-2",
3,
DeliveryBoundary::NextStep,
DeliveryGranularity::Batch,
),
];
assert_eq!(
select_pending_for_freeze(&pending, DeliveryBoundary::NextStep),
vec![0]
);
}
}