use super::Mailbox;
/// A pending retry of thread-message checkpoint recording.
///
/// Tasks are queued by `Mailbox::enqueue_checkpoint_repair` and consumed by
/// `Mailbox::drain_checkpoint_repair_queue`. That method forwards these fields to
/// `record_thread_message_checkpoint_events`.
#[derive(Clone, Debug)]
pub(crate) struct CheckpointRepairTask {
    /// Thread whose committed messages are reloaded and checkpointed.
    pub thread_id: String,
    /// Run that the checkpoint events are recorded for.
    pub run_id: String,
    /// Lower bound of the sequence range handed to the recorder.
    /// Inclusivity is defined by the recorder — confirm there.
    pub first_seq: u64,
    /// Upper bound of the sequence range handed to the recorder.
    pub last_seq: u64,
}
impl Mailbox {
    /// Maximum number of pending checkpoint repair tasks.
    /// Once the queue reaches this size, the oldest task is evicted to make room for a new one.
    const CHECKPOINT_REPAIR_QUEUE_CAP: usize = 1024;

    /// Queues `task` so that a later `drain_checkpoint_repair_queue` call retries it.
    ///
    /// Does nothing when no server event publisher is configured.
    ///
    /// The queue is bounded by `CHECKPOINT_REPAIR_QUEUE_CAP`. When it is full, the oldest
    /// pending task is evicted first. The eviction is counted under the mailbox metric
    /// `checkpoint_repair_queue`/`dropped` and logged as a warning.
    ///
    /// A poisoned lock is recovered instead of causing a panic.
    pub(super) fn enqueue_checkpoint_repair(&self, task: CheckpointRepairTask) {
        if self.server_event_publisher.is_none() {
            return;
        }

        let mut pending = match self.checkpoint_repair_queue.lock() {
            Ok(guard) => guard,
            // Keep using the queue even if another holder panicked.
            Err(poisoned) => poisoned.into_inner(),
        };

        let full = pending.len() >= Self::CHECKPOINT_REPAIR_QUEUE_CAP;
        let evicted = if full { pending.pop_front() } else { None };
        if let Some(evicted) = evicted {
            crate::metrics::inc_mailbox_operation_by("checkpoint_repair_queue", "dropped", 1);
            tracing::warn!(
                thread_id = %evicted.thread_id,
                run_id = %evicted.run_id,
                cap = Self::CHECKPOINT_REPAIR_QUEUE_CAP,
                "checkpoint repair queue full; dropped oldest task (startup repair remains backstop)"
            );
        }

        pending.push_back(task);
    }

    /// Retries every checkpoint repair task queued so far.
    ///
    /// Does nothing when no server event publisher is configured.
    ///
    /// All pending tasks are moved out of the queue while the lock is held. The lock is
    /// released before the first `.await`. Then, for each task:
    /// - the thread's messages are reloaded from `run_store`;
    /// - the messages and the task's sequence range are passed to
    ///   `record_thread_message_checkpoint_events`.
    ///
    /// A warning is logged and the task is re-enqueued when:
    /// - the messages are missing,
    /// - they cannot be loaded, or
    /// - recording fails.
    ///
    /// This call works on a snapshot, so re-enqueued tasks wait for the next call instead of
    /// being retried again here.
    pub(super) async fn drain_checkpoint_repair_queue(&self) {
        if self.server_event_publisher.is_none() {
            return;
        }

        // Take the batch in its own scope so the mutex guard is dropped before any `.await`.
        let batch = {
            let mut pending = match self.checkpoint_repair_queue.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
            pending.drain(..).collect::<Vec<CheckpointRepairTask>>()
        };

        for task in batch {
            let loaded = self.run_store.load_messages(&task.thread_id).await;
            let messages = match loaded {
                Ok(Some(found)) => found,
                Ok(None) => {
                    tracing::warn!(
                        thread_id = %task.thread_id,
                        run_id = %task.run_id,
                        "checkpoint repair retry found no committed messages; re-queued"
                    );
                    self.enqueue_checkpoint_repair(task);
                    continue;
                }
                Err(error) => {
                    tracing::warn!(
                        thread_id = %task.thread_id,
                        run_id = %task.run_id,
                        error = %error,
                        "checkpoint repair retry could not load messages; re-queued"
                    );
                    self.enqueue_checkpoint_repair(task);
                    continue;
                }
            };

            let recorded = self
                .record_thread_message_checkpoint_events(
                    &task.thread_id,
                    &task.run_id,
                    &messages,
                    task.first_seq,
                    task.last_seq,
                )
                .await;
            if let Err(error) = recorded {
                tracing::warn!(
                    thread_id = %task.thread_id,
                    run_id = %task.run_id,
                    error = %error,
                    "checkpoint repair retry failed; re-queued for next sweep"
                );
                self.enqueue_checkpoint_repair(task);
            }
        }
    }
}