Skip to main content

harn_vm/triggers/
worker_queue.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3use std::time::Duration as StdDuration;
4
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use crate::event_log::{
9    sanitize_topic_component, AnyEventLog, EventLog, LogError, LogEvent, Topic,
10};
11
12use super::{DispatchOutcome, TriggerEvent};
13
/// Topic on which `WorkerQueue::enqueue` records a `queue_seen` event for every queue name it is
/// given; `WorkerQueue::known_queues` reads it back.
pub const WORKER_QUEUE_CATALOG_TOPIC: &str = "worker.queues";
/// Suffix appended to a queue's job topic name to form its claims topic name.
const WORKER_QUEUE_CLAIMS_SUFFIX: &str = ".claims";
/// Suffix appended to a queue's job topic name to form its responses topic name.
const WORKER_QUEUE_RESPONSES_SUFFIX: &str = ".responses";
/// Waiting time (15 minutes, in milliseconds) after which a `Normal` job is ranked like a `High`
/// one when the next job to claim is selected.
const NORMAL_PROMOTION_AGE_MS: i64 = 15 * 60 * 1000;
18
/// Scheduling priority attached to a queued job.
///
/// Serialized in lowercase (`"high"`, `"normal"`, `"low"`); `Normal` is the default.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerQueuePriority {
    /// Always receives the best (lowest) rank when ready jobs are ordered.
    High,
    /// Default priority; ranked between `High` and `Low` until it has waited at least
    /// `NORMAL_PROMOTION_AGE_MS`, after which it is ranked like `High`.
    #[default]
    Normal,
    /// Always receives the worst (highest) rank; it is never promoted by age.
    Low,
}
27
28impl WorkerQueuePriority {
29    pub fn as_str(self) -> &'static str {
30        match self {
31            Self::High => "high",
32            Self::Normal => "normal",
33            Self::Low => "low",
34        }
35    }
36
37    fn effective_rank(self, enqueued_at_ms: i64, now_ms: i64) -> u8 {
38        match self {
39            Self::High => 0,
40            Self::Normal if now_ms.saturating_sub(enqueued_at_ms) >= NORMAL_PROMOTION_AGE_MS => 0,
41            Self::Normal => 1,
42            Self::Low => 2,
43        }
44    }
45}
46
/// A unit of work placed on a worker queue.
///
/// `WorkerQueue::enqueue` serializes this struct as the payload of a `trigger_dispatch` event
/// on the queue's job topic.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJob {
    /// Target queue name; `enqueue` trims it and rejects a blank value.
    pub queue: String,
    /// Copied into the `trigger_id` event header on enqueue.
    pub trigger_id: String,
    /// Copied into the `binding_key` event header on enqueue.
    pub binding_key: String,
    // NOTE(review): presumably the version of the trigger binding — confirm against callers.
    pub binding_version: u32,
    /// The trigger event carried by this job; its id is copied into the `event_id` header.
    pub event: TriggerEvent,
    /// Falls back to `None` when missing from the serialized form.
    // NOTE(review): presumably the id of the event this job replays — confirm against callers.
    #[serde(default)]
    pub replay_of_event_id: Option<String>,
    /// Scheduling priority; falls back to `Normal` when missing from the serialized form.
    #[serde(default)]
    pub priority: WorkerQueuePriority,
}
59
/// Result of a successful `WorkerQueue::enqueue` call.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueEnqueueReceipt {
    /// Trimmed name of the queue the job was appended to.
    pub queue: String,
    /// Event id returned by the event log for the appended `trigger_dispatch` event.
    pub job_event_id: u64,
    /// Name of the queue's responses topic, as produced by `response_topic_name`.
    pub response_topic: String,
}
66
/// Identifies one consumer's claim on one job.
///
/// Passed back to `renew_claim`, `release_claim` and `ack_claim`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueClaimHandle {
    /// Queue the claimed job belongs to.
    pub queue: String,
    /// Event id of the claimed job's `trigger_dispatch` event.
    pub job_event_id: u64,
    /// Unique id of the claim; `claim_next` generates it as a UUID v4 string.
    pub claim_id: String,
    /// Consumer that holds the claim.
    pub consumer_id: String,
    /// Expiry in milliseconds since the Unix epoch (claim or renewal time plus the TTL).
    pub expires_at_ms: i64,
}
75
/// A job together with the claim that `WorkerQueue::claim_next` obtained on it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ClaimedWorkerJob {
    /// Handle identifying the claim that was won.
    pub handle: WorkerQueueClaimHandle,
    /// The job payload that was claimed.
    pub job: WorkerQueueJob,
}
81
/// Payload of a `job_response` event on a queue's responses topic.
///
/// Written by `WorkerQueue::append_response` and read back by `WorkerQueue::queue_state`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueResponseRecord {
    /// Queue the handled job belongs to.
    pub queue: String,
    /// Event id of the job this response refers to.
    pub job_event_id: u64,
    /// Consumer that produced the response.
    pub consumer_id: String,
    // NOTE(review): presumably milliseconds since the Unix epoch, like the other `*_ms`
    // fields in this file — confirm against producers.
    pub handled_at_ms: i64,
    // NOTE(review): presumably set when a dispatch outcome is available — confirm.
    pub outcome: Option<DispatchOutcome>,
    // NOTE(review): presumably set when handling failed — confirm.
    pub error: Option<String>,
}
91
/// Aggregate counters for one queue, as computed by `WorkerQueueState::summary`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerQueueSummary {
    /// Name of the summarized queue.
    pub queue: String,
    /// Jobs that are neither acked, purged nor covered by an active claim.
    pub ready: usize,
    /// Jobs that are neither acked nor purged and have an active claim.
    pub in_flight: usize,
    /// Jobs flagged as acked.
    pub acked: usize,
    /// Jobs flagged as purged.
    pub purged: usize,
    /// Number of response records in the queue state.
    pub responses: usize,
    /// Age in milliseconds of the longest-waiting ready job; `None` when no job is ready.
    pub oldest_unclaimed_age_ms: Option<u64>,
}
102
/// State of a single job as reconstructed by `WorkerQueue::queue_state`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueJobState {
    /// Event id of the job's `trigger_dispatch` event.
    pub job_event_id: u64,
    /// `occurred_at_ms` of the job's `trigger_dispatch` event.
    pub enqueued_at_ms: i64,
    /// The deserialized job payload.
    pub job: WorkerQueueJob,
    /// The claim currently holding the job, if any; `queue_state` clears claims that had
    /// already expired when the state was built.
    pub active_claim: Option<WorkerQueueClaimHandle>,
    /// Set once an accepted `job_acked` record has been replayed for the job.
    pub acked: bool,
    /// Set once a `job_purged` record has been replayed for a job that was not yet acked.
    pub purged: bool,
}
112
113impl WorkerQueueJobState {
114    pub fn is_ready(&self) -> bool {
115        !self.acked && !self.purged && self.active_claim.is_none()
116    }
117}
118
/// Snapshot of one queue as reconstructed by `WorkerQueue::queue_state`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct WorkerQueueState {
    /// Trimmed queue name the snapshot was built for.
    pub queue: String,
    /// Every `job_response` record found on the queue's responses topic.
    pub responses: Vec<WorkerQueueResponseRecord>,
    /// Every job found on the queue's job topic; `queue_state` sorts them by
    /// `(enqueued_at_ms, job_event_id)`.
    pub jobs: Vec<WorkerQueueJobState>,
}
125
126impl WorkerQueueState {
127    pub fn summary(&self, now_ms: i64) -> WorkerQueueSummary {
128        let ready = self.jobs.iter().filter(|job| job.is_ready()).count();
129        let in_flight = self
130            .jobs
131            .iter()
132            .filter(|job| !job.acked && !job.purged && job.active_claim.is_some())
133            .count();
134        let acked = self.jobs.iter().filter(|job| job.acked).count();
135        let purged = self.jobs.iter().filter(|job| job.purged).count();
136        let oldest_unclaimed_age_ms = self
137            .jobs
138            .iter()
139            .filter(|job| job.is_ready())
140            .map(|job| now_ms.saturating_sub(job.enqueued_at_ms).max(0) as u64)
141            .max();
142        WorkerQueueSummary {
143            queue: self.queue.clone(),
144            ready,
145            in_flight,
146            acked,
147            purged,
148            responses: self.responses.len(),
149            oldest_unclaimed_age_ms,
150        }
151    }
152
153    fn next_ready_job(&self, now_ms: i64) -> Option<&WorkerQueueJobState> {
154        self.jobs
155            .iter()
156            .filter(|job| job.is_ready())
157            .min_by_key(|job| {
158                (
159                    job.job.priority.effective_rank(job.enqueued_at_ms, now_ms),
160                    job.enqueued_at_ms,
161                    job.job_event_id,
162                )
163            })
164    }
165
166    fn active_claim_for(&self, job_event_id: u64) -> Option<&WorkerQueueClaimHandle> {
167        self.jobs
168            .iter()
169            .find(|job| job.job_event_id == job_event_id)
170            .and_then(|job| job.active_claim.as_ref())
171    }
172}
173
/// Worker queue whose entire state is stored as events in a shared event log.
///
/// Cloning is cheap: clones share the same `Arc`-wrapped log.
#[derive(Clone)]
pub struct WorkerQueue {
    /// Log that every queue operation appends to and reads from.
    event_log: Arc<AnyEventLog>,
}
178
179impl WorkerQueue {
180    pub fn new(event_log: Arc<AnyEventLog>) -> Self {
181        Self { event_log }
182    }
183
184    pub async fn enqueue(
185        &self,
186        job: &WorkerQueueJob,
187    ) -> Result<WorkerQueueEnqueueReceipt, LogError> {
188        let queue = job.queue.trim();
189        if queue.is_empty() {
190            return Err(LogError::Config(
191                "worker queue name cannot be empty".to_string(),
192            ));
193        }
194        let queue_name = queue.to_string();
195        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
196            .expect("static worker queue catalog topic should always be valid");
197        self.event_log
198            .append(
199                &catalog_topic,
200                LogEvent::new(
201                    "queue_seen",
202                    serde_json::to_value(WorkerQueueCatalogRecord {
203                        queue: queue_name.clone(),
204                    })
205                    .map_err(|error| LogError::Serde(error.to_string()))?,
206                ),
207            )
208            .await?;
209
210        let job_topic = job_topic(&queue_name)?;
211        let mut headers = BTreeMap::new();
212        headers.insert("queue".to_string(), queue_name.clone());
213        headers.insert("trigger_id".to_string(), job.trigger_id.clone());
214        headers.insert("binding_key".to_string(), job.binding_key.clone());
215        headers.insert("event_id".to_string(), job.event.id.0.clone());
216        headers.insert("priority".to_string(), job.priority.as_str().to_string());
217        let job_event_id = self
218            .event_log
219            .append(
220                &job_topic,
221                LogEvent::new(
222                    "trigger_dispatch",
223                    serde_json::to_value(job)
224                        .map_err(|error| LogError::Serde(error.to_string()))?,
225                )
226                .with_headers(headers),
227            )
228            .await?;
229        if let Some(metrics) = crate::active_metrics_registry() {
230            if let Ok(state) = self.queue_state(&queue_name).await {
231                let summary = state.summary(now_ms());
232                metrics.set_worker_queue_depth(
233                    &queue_name,
234                    (summary.ready + summary.in_flight) as u64,
235                );
236            }
237        }
238        Ok(WorkerQueueEnqueueReceipt {
239            queue: queue_name.clone(),
240            job_event_id,
241            response_topic: response_topic_name(&queue_name),
242        })
243    }
244
245    pub async fn known_queues(&self) -> Result<Vec<String>, LogError> {
246        let topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC)
247            .expect("static worker queue catalog topic should always be valid");
248        let events = self.event_log.read_range(&topic, None, usize::MAX).await?;
249        let mut queues = BTreeSet::new();
250        for (_, event) in events {
251            if event.kind != "queue_seen" {
252                continue;
253            }
254            let record: WorkerQueueCatalogRecord = serde_json::from_value(event.payload)
255                .map_err(|error| LogError::Serde(error.to_string()))?;
256            if !record.queue.trim().is_empty() {
257                queues.insert(record.queue);
258            }
259        }
260        Ok(queues.into_iter().collect())
261    }
262
    /// Rebuilds the current state of `queue` by replaying its job, claims and responses topics.
    ///
    /// Jobs come from `trigger_dispatch` events on the job topic. Claim records are then applied
    /// in log order to derive each job's active claim and its acked/purged flags. Finally, claims
    /// that have expired relative to the clock reading taken at the start of the call are
    /// cleared, and the jobs are sorted by `(enqueued_at_ms, job_event_id)`.
    ///
    /// # Errors
    ///
    /// Returns `LogError::Config` when the trimmed queue name is empty, `LogError::Serde` when
    /// any replayed payload of a recognized kind cannot be deserialized, and any error produced
    /// by topic construction or by reading the event log.
    pub async fn queue_state(&self, queue: &str) -> Result<WorkerQueueState, LogError> {
        let queue_name = queue.trim();
        if queue_name.is_empty() {
            return Err(LogError::Config(
                "worker queue name cannot be empty".to_string(),
            ));
        }
        // Captured before the reads; used at the end to drop claims that have expired.
        let now_ms = now_ms();
        let job_events = self
            .event_log
            .read_range(&job_topic(queue_name)?, None, usize::MAX)
            .await?;
        let claim_events = self
            .event_log
            .read_range(&claims_topic(queue_name)?, None, usize::MAX)
            .await?;
        let response_events = self
            .event_log
            .read_range(&responses_topic(queue_name)?, None, usize::MAX)
            .await?;

        // Pass 1: one entry per `trigger_dispatch` event, keyed by its event id.
        let mut jobs = BTreeMap::<u64, WorkerQueueJobStateInternal>::new();
        for (job_event_id, event) in job_events {
            if event.kind != "trigger_dispatch" {
                continue;
            }
            let job: WorkerQueueJob = serde_json::from_value(event.payload)
                .map_err(|error| LogError::Serde(error.to_string()))?;
            jobs.insert(
                job_event_id,
                WorkerQueueJobStateInternal {
                    job_event_id,
                    enqueued_at_ms: event.occurred_at_ms,
                    job,
                    active_claim: None,
                    acked: false,
                    purged: false,
                    seen_claim_ids: BTreeSet::new(),
                },
            );
        }

        // Pass 2: apply claim-topic records in log order. Records that refer to an unknown job
        // are skipped, as are events of an unrecognized kind.
        for (_, event) in claim_events {
            match event.kind.as_str() {
                "job_claimed" => {
                    let claim: WorkerQueueClaimRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&claim.job_event_id) else {
                        continue;
                    };
                    // Finished jobs cannot be claimed again.
                    if job.acked || job.purged {
                        continue;
                    }
                    // The claim id is remembered even when the claim does not become active,
                    // so a later ack that carries it is still accepted.
                    job.seen_claim_ids.insert(claim.claim_id.clone());
                    // A claim takes effect only when the job is unclaimed or the current claim
                    // had already expired at the moment this claim was made.
                    let can_take = job
                        .active_claim
                        .as_ref()
                        .is_none_or(|active| active.expires_at_ms <= claim.claimed_at_ms);
                    if can_take {
                        job.active_claim = Some(WorkerQueueClaimHandle {
                            queue: queue_name.to_string(),
                            job_event_id: claim.job_event_id,
                            claim_id: claim.claim_id,
                            consumer_id: claim.consumer_id,
                            expires_at_ms: claim.expires_at_ms,
                        });
                    }
                }
                "claim_renewed" => {
                    let renewal: WorkerQueueClaimRenewalRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&renewal.job_event_id) else {
                        continue;
                    };
                    // Only the claim that is currently active can be extended.
                    if let Some(active) = job.active_claim.as_mut() {
                        if active.claim_id == renewal.claim_id {
                            active.expires_at_ms = renewal.expires_at_ms;
                        }
                    }
                }
                "job_released" => {
                    let release: WorkerQueueReleaseRecord =
                        serde_json::from_value(event.payload)
                            .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&release.job_event_id) else {
                        continue;
                    };
                    // A release only clears the claim it names; a stale release is a no-op.
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.claim_id == release.claim_id)
                    {
                        job.active_claim = None;
                    }
                }
                "job_acked" => {
                    let ack: WorkerQueueAckRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&ack.job_event_id) else {
                        continue;
                    };
                    // An ack is accepted when it carries no claim id or any claim id that was
                    // ever recorded for this job; it does not have to be the active claim.
                    if ack.claim_id.is_empty() || job.seen_claim_ids.contains(&ack.claim_id) {
                        job.acked = true;
                        job.active_claim = None;
                    }
                }
                "job_purged" => {
                    let purge: WorkerQueuePurgeRecord = serde_json::from_value(event.payload)
                        .map_err(|error| LogError::Serde(error.to_string()))?;
                    let Some(job) = jobs.get_mut(&purge.job_event_id) else {
                        continue;
                    };
                    // A purge never overrides an earlier ack, but it does drop an active claim.
                    if !job.acked {
                        job.purged = true;
                        job.active_claim = None;
                    }
                }
                _ => {}
            }
        }

        // Pass 3: collect the `job_response` records; the first malformed payload aborts.
        let responses = response_events
            .into_iter()
            .filter(|(_, event)| event.kind == "job_response")
            .map(|(_, event)| {
                serde_json::from_value::<WorkerQueueResponseRecord>(event.payload)
                    .map_err(|error| LogError::Serde(error.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;

        let mut queue_state = WorkerQueueState {
            queue: queue_name.to_string(),
            responses,
            jobs: jobs
                .into_values()
                .map(|mut job| {
                    // Claims whose expiry is at or before `now_ms` no longer hold the job.
                    if job
                        .active_claim
                        .as_ref()
                        .is_some_and(|active| active.expires_at_ms <= now_ms)
                    {
                        job.active_claim = None;
                    }
                    WorkerQueueJobState {
                        job_event_id: job.job_event_id,
                        enqueued_at_ms: job.enqueued_at_ms,
                        job: job.job,
                        active_claim: job.active_claim,
                        acked: job.acked,
                        purged: job.purged,
                    }
                })
                .collect(),
        };
        // Order jobs by enqueue time, using the event id to break ties.
        queue_state
            .jobs
            .sort_by_key(|job| (job.enqueued_at_ms, job.job_event_id));
        Ok(queue_state)
    }
423
424    pub async fn queue_summaries(&self) -> Result<Vec<WorkerQueueSummary>, LogError> {
425        let now_ms = now_ms();
426        let mut summaries = Vec::new();
427        for queue in self.known_queues().await? {
428            let state = self.queue_state(&queue).await?;
429            summaries.push(state.summary(now_ms));
430        }
431        summaries.sort_by(|left, right| left.queue.cmp(&right.queue));
432        Ok(summaries)
433    }
434
435    pub async fn claim_next(
436        &self,
437        queue: &str,
438        consumer_id: &str,
439        ttl: StdDuration,
440    ) -> Result<Option<ClaimedWorkerJob>, LogError> {
441        let queue_name = queue.trim();
442        if queue_name.is_empty() {
443            return Err(LogError::Config(
444                "worker queue name cannot be empty".to_string(),
445            ));
446        }
447        if consumer_id.trim().is_empty() {
448            return Err(LogError::InvalidConsumer(
449                "worker queue consumer id cannot be empty".to_string(),
450            ));
451        }
452        for _ in 0..8 {
453            let now_ms = now_ms();
454            let state = self.queue_state(queue_name).await?;
455            let Some(job) = state.next_ready_job(now_ms).cloned() else {
456                return Ok(None);
457            };
458            let claim = WorkerQueueClaimRecord {
459                job_event_id: job.job_event_id,
460                claim_id: Uuid::new_v4().to_string(),
461                consumer_id: consumer_id.to_string(),
462                claimed_at_ms: now_ms,
463                expires_at_ms: expiry_ms(now_ms, ttl),
464            };
465            self.event_log
466                .append(
467                    &claims_topic(queue_name)?,
468                    LogEvent::new(
469                        "job_claimed",
470                        serde_json::to_value(&claim)
471                            .map_err(|error| LogError::Serde(error.to_string()))?,
472                    ),
473                )
474                .await?;
475            let refreshed = self.queue_state(queue_name).await?;
476            if refreshed
477                .active_claim_for(job.job_event_id)
478                .is_some_and(|active| active.claim_id == claim.claim_id)
479            {
480                if let Some(metrics) = crate::active_metrics_registry() {
481                    let summary = refreshed.summary(now_ms);
482                    metrics.record_worker_queue_claim_age(
483                        queue_name,
484                        now_ms.saturating_sub(job.enqueued_at_ms) as f64 / 1000.0,
485                    );
486                    metrics.set_worker_queue_depth(
487                        queue_name,
488                        (summary.ready + summary.in_flight) as u64,
489                    );
490                }
491                return Ok(Some(ClaimedWorkerJob {
492                    handle: WorkerQueueClaimHandle {
493                        queue: queue_name.to_string(),
494                        job_event_id: claim.job_event_id,
495                        claim_id: claim.claim_id,
496                        consumer_id: claim.consumer_id,
497                        expires_at_ms: claim.expires_at_ms,
498                    },
499                    job: job.job,
500                }));
501            }
502        }
503        Ok(None)
504    }
505
506    pub async fn renew_claim(
507        &self,
508        handle: &WorkerQueueClaimHandle,
509        ttl: StdDuration,
510    ) -> Result<bool, LogError> {
511        let now_ms = now_ms();
512        let renewal = WorkerQueueClaimRenewalRecord {
513            job_event_id: handle.job_event_id,
514            claim_id: handle.claim_id.clone(),
515            consumer_id: handle.consumer_id.clone(),
516            renewed_at_ms: now_ms,
517            expires_at_ms: expiry_ms(now_ms, ttl),
518        };
519        self.event_log
520            .append(
521                &claims_topic(&handle.queue)?,
522                LogEvent::new(
523                    "claim_renewed",
524                    serde_json::to_value(&renewal)
525                        .map_err(|error| LogError::Serde(error.to_string()))?,
526                ),
527            )
528            .await?;
529        let refreshed = self.queue_state(&handle.queue).await?;
530        Ok(refreshed
531            .active_claim_for(handle.job_event_id)
532            .is_some_and(|active| active.claim_id == handle.claim_id))
533    }
534
535    pub async fn release_claim(
536        &self,
537        handle: &WorkerQueueClaimHandle,
538        reason: &str,
539    ) -> Result<(), LogError> {
540        let release = WorkerQueueReleaseRecord {
541            job_event_id: handle.job_event_id,
542            claim_id: handle.claim_id.clone(),
543            consumer_id: handle.consumer_id.clone(),
544            released_at_ms: now_ms(),
545            reason: if reason.trim().is_empty() {
546                None
547            } else {
548                Some(reason.to_string())
549            },
550        };
551        self.event_log
552            .append(
553                &claims_topic(&handle.queue)?,
554                LogEvent::new(
555                    "job_released",
556                    serde_json::to_value(&release)
557                        .map_err(|error| LogError::Serde(error.to_string()))?,
558                ),
559            )
560            .await?;
561        Ok(())
562    }
563
564    pub async fn append_response(
565        &self,
566        queue: &str,
567        response: &WorkerQueueResponseRecord,
568    ) -> Result<u64, LogError> {
569        self.event_log
570            .append(
571                &responses_topic(queue)?,
572                LogEvent::new(
573                    "job_response",
574                    serde_json::to_value(response)
575                        .map_err(|error| LogError::Serde(error.to_string()))?,
576                ),
577            )
578            .await
579    }
580
581    pub async fn ack_claim(&self, handle: &WorkerQueueClaimHandle) -> Result<u64, LogError> {
582        self.event_log
583            .append(
584                &claims_topic(&handle.queue)?,
585                LogEvent::new(
586                    "job_acked",
587                    serde_json::to_value(WorkerQueueAckRecord {
588                        job_event_id: handle.job_event_id,
589                        claim_id: handle.claim_id.clone(),
590                        consumer_id: handle.consumer_id.clone(),
591                        acked_at_ms: now_ms(),
592                    })
593                    .map_err(|error| LogError::Serde(error.to_string()))?,
594                ),
595            )
596            .await
597    }
598
599    pub async fn purge_unclaimed(
600        &self,
601        queue: &str,
602        purged_by: &str,
603        reason: Option<&str>,
604    ) -> Result<usize, LogError> {
605        let state = self.queue_state(queue).await?;
606        let ready_jobs: Vec<_> = state
607            .jobs
608            .into_iter()
609            .filter(|job| job.is_ready())
610            .map(|job| job.job_event_id)
611            .collect();
612        for job_event_id in &ready_jobs {
613            self.event_log
614                .append(
615                    &claims_topic(queue)?,
616                    LogEvent::new(
617                        "job_purged",
618                        serde_json::to_value(WorkerQueuePurgeRecord {
619                            job_event_id: *job_event_id,
620                            purged_by: purged_by.to_string(),
621                            purged_at_ms: now_ms(),
622                            reason: reason
623                                .filter(|value| !value.trim().is_empty())
624                                .map(|value| value.to_string()),
625                        })
626                        .map_err(|error| LogError::Serde(error.to_string()))?,
627                    ),
628                )
629                .await?;
630        }
631        Ok(ready_jobs.len())
632    }
633}
634
/// Working copy of a job's state used while `WorkerQueue::queue_state` replays claim records.
///
/// Mirrors `WorkerQueueJobState` and additionally tracks the claim ids seen for the job.
#[derive(Clone, Debug)]
struct WorkerQueueJobStateInternal {
    /// Event id of the job's `trigger_dispatch` event.
    job_event_id: u64,
    /// `occurred_at_ms` of the job's `trigger_dispatch` event.
    enqueued_at_ms: i64,
    /// The deserialized job payload.
    job: WorkerQueueJob,
    /// Claim currently holding the job during the replay, if any.
    active_claim: Option<WorkerQueueClaimHandle>,
    /// Set once an accepted ack has been replayed.
    acked: bool,
    /// Set once a purge has been replayed for a job that was not yet acked.
    purged: bool,
    /// Claim ids of every `job_claimed` record replayed while the job was neither acked nor
    /// purged; an ack must carry one of these ids (or an empty id) to be accepted.
    seen_claim_ids: BTreeSet<String>,
}
645
/// Payload of a `queue_seen` event on the catalog topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueCatalogRecord {
    /// Name of the queue that was seen.
    queue: String,
}
650
/// Payload of a `job_claimed` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRecord {
    /// Event id of the job being claimed.
    job_event_id: u64,
    /// Unique id of this claim.
    claim_id: String,
    /// Consumer making the claim.
    consumer_id: String,
    /// When the claim was made, in milliseconds since the Unix epoch; the replay compares it
    /// with the expiry of any earlier claim on the same job.
    claimed_at_ms: i64,
    /// When the claim expires, in milliseconds since the Unix epoch.
    expires_at_ms: i64,
}
659
/// Payload of a `claim_renewed` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueClaimRenewalRecord {
    /// Event id of the job whose claim is being renewed.
    job_event_id: u64,
    /// Id of the claim being renewed; ignored by the replay unless it is the active claim.
    claim_id: String,
    /// Consumer that holds the claim.
    consumer_id: String,
    /// When the renewal was requested, in milliseconds since the Unix epoch.
    renewed_at_ms: i64,
    /// New expiry of the claim, in milliseconds since the Unix epoch.
    expires_at_ms: i64,
}
668
/// Payload of a `job_released` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueReleaseRecord {
    /// Event id of the job being released.
    job_event_id: u64,
    /// Id of the claim being given up; ignored by the replay unless it is the active claim.
    claim_id: String,
    /// Consumer that held the claim.
    consumer_id: String,
    /// When the release was requested, in milliseconds since the Unix epoch.
    released_at_ms: i64,
    /// Optional explanation; `None` when the caller supplied a blank reason, and the default
    /// when the field is missing from the serialized form.
    #[serde(default)]
    reason: Option<String>,
}
678
/// Payload of a `job_acked` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueueAckRecord {
    /// Event id of the job being acknowledged.
    job_event_id: u64,
    /// Id of the claim under which the job was handled; the replay accepts the ack when this
    /// is empty or matches a claim id previously recorded for the job.
    claim_id: String,
    /// Consumer acknowledging the job.
    consumer_id: String,
    /// When the ack was issued, in milliseconds since the Unix epoch.
    acked_at_ms: i64,
}
686
/// Payload of a `job_purged` event on a queue's claims topic.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
struct WorkerQueuePurgeRecord {
    /// Event id of the job being purged.
    job_event_id: u64,
    /// Identifier of whoever requested the purge, as passed to `purge_unclaimed`.
    purged_by: String,
    /// When the purge record was created, in milliseconds since the Unix epoch.
    purged_at_ms: i64,
    /// Optional explanation; `None` when the caller supplied no reason or a blank one, and
    /// the default when the field is missing from the serialized form.
    #[serde(default)]
    reason: Option<String>,
}
695
696pub fn job_topic_name(queue: &str) -> String {
697    format!("worker.{}", sanitize_topic_component(queue))
698}
699
700pub fn claims_topic_name(queue: &str) -> String {
701    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_CLAIMS_SUFFIX)
702}
703
704pub fn response_topic_name(queue: &str) -> String {
705    format!("{}{}", job_topic_name(queue), WORKER_QUEUE_RESPONSES_SUFFIX)
706}
707
708fn job_topic(queue: &str) -> Result<Topic, LogError> {
709    Topic::new(job_topic_name(queue))
710}
711
712fn claims_topic(queue: &str) -> Result<Topic, LogError> {
713    Topic::new(claims_topic_name(queue))
714}
715
716fn responses_topic(queue: &str) -> Result<Topic, LogError> {
717    Topic::new(response_topic_name(queue))
718}
719
/// Returns the current wall-clock time in milliseconds since the Unix epoch, or 0 when the
/// system clock reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
726
/// Returns `now_ms` advanced by `ttl` in milliseconds, saturating at `i64::MAX`.
fn expiry_ms(now_ms: i64, ttl: StdDuration) -> i64 {
    // Cap the TTL first so the narrowing cast below can never wrap.
    let capped_ttl_ms = std::cmp::min(ttl.as_millis(), i64::MAX as u128);
    now_ms.saturating_add(capped_ttl_ms as i64)
}
730
/// Tests for the worker queue, run against an in-memory event log.
#[cfg(test)]
mod tests {
    use super::*;

    use crate::event_log::{AnyEventLog, MemoryEventLog};
    use crate::triggers::{
        event::{GenericWebhookPayload, KnownProviderPayload},
        ProviderId, ProviderPayload, SignatureStatus, TraceId, TriggerEvent,
    };

    /// Builds a trigger event fixture whose id and dedupe key are both `id`.
    fn test_event(id: &str) -> TriggerEvent {
        TriggerEvent {
            id: crate::triggers::TriggerEventId(id.to_string()),
            provider: ProviderId::from("github"),
            kind: "issues.opened".to_string(),
            trace_id: TraceId("trace-test".to_string()),
            dedupe_key: id.to_string(),
            tenant_id: None,
            headers: BTreeMap::new(),
            batch: None,
            raw_body: None,
            provider_payload: ProviderPayload::Known(KnownProviderPayload::Webhook(
                GenericWebhookPayload {
                    source: Some("worker-queue-test".to_string()),
                    content_type: Some("application/json".to_string()),
                    raw: serde_json::json!({"id": id}),
                },
            )),
            signature_status: SignatureStatus::Verified,
            received_at: time::OffsetDateTime::now_utc(),
            occurred_at: None,
            dedupe_claimed: false,
        }
    }

    /// Builds a job fixture for `queue` with binding key `<trigger_id>@v1` and version 1.
    fn test_job(
        queue: &str,
        trigger_id: &str,
        event_id: &str,
        priority: WorkerQueuePriority,
    ) -> WorkerQueueJob {
        WorkerQueueJob {
            queue: queue.to_string(),
            trigger_id: trigger_id.to_string(),
            binding_key: format!("{trigger_id}@v1"),
            binding_version: 1,
            event: test_event(event_id),
            replay_of_event_id: None,
            priority,
        }
    }

    /// Enqueuing one job registers the queue in the catalog and shows it as ready.
    #[tokio::test(flavor = "current_thread")]
    async fn enqueue_and_summarize_queue() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let summaries = queue.queue_summaries().await.unwrap();
        assert_eq!(summaries.len(), 1);
        assert_eq!(summaries[0].queue, "triage");
        assert_eq!(summaries[0].ready, 1);
        assert_eq!(summaries[0].in_flight, 0);
    }

    /// A claimed job is in flight; after a response and an ack it is counted as acked only.
    #[tokio::test(flavor = "current_thread")]
    async fn claim_and_ack_remove_job_from_ready_pool() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log);
        queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        // While the claim is active the job must have moved from ready to in flight.
        let before_ack = queue.queue_state("triage").await.unwrap();
        assert_eq!(before_ack.summary(now_ms()).ready, 0);
        assert_eq!(before_ack.summary(now_ms()).in_flight, 1);
        queue
            .append_response(
                "triage",
                &WorkerQueueResponseRecord {
                    queue: "triage".to_string(),
                    job_event_id: claimed.handle.job_event_id,
                    consumer_id: "consumer-a".to_string(),
                    handled_at_ms: now_ms(),
                    outcome: Some(DispatchOutcome {
                        trigger_id: "incoming-review-task".to_string(),
                        binding_key: "incoming-review-task@v1".to_string(),
                        event_id: "evt-1".to_string(),
                        attempt_count: 1,
                        status: super::super::DispatchStatus::Succeeded,
                        handler_kind: "local".to_string(),
                        target_uri: "handlers::on_review".to_string(),
                        replay_of_event_id: None,
                        result: Some(serde_json::json!({"ok": true})),
                        error: None,
                    }),
                    error: None,
                },
            )
            .await
            .unwrap();
        queue.ack_claim(&claimed.handle).await.unwrap();
        let after_ack = queue.queue_state("triage").await.unwrap();
        let summary = after_ack.summary(now_ms());
        assert_eq!(summary.ready, 0);
        assert_eq!(summary.in_flight, 0);
        assert_eq!(summary.acked, 1);
        assert_eq!(summary.responses, 1);
    }

    /// A job whose only claim has already expired can be claimed by another consumer.
    #[tokio::test(flavor = "current_thread")]
    async fn expired_claim_allows_reclaim() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());
        let receipt = queue
            .enqueue(&test_job(
                "triage",
                "incoming-review-task",
                "evt-1",
                WorkerQueuePriority::Normal,
            ))
            .await
            .unwrap();
        // Write a claim record directly to the log with an expiry that is already in the past.
        let expired_claim = WorkerQueueClaimRecord {
            job_event_id: receipt.job_event_id,
            claim_id: "expired-claim".to_string(),
            consumer_id: "consumer-a".to_string(),
            claimed_at_ms: now_ms().saturating_sub(2),
            expires_at_ms: now_ms().saturating_sub(1),
        };
        log.append(
            &claims_topic("triage").unwrap(),
            LogEvent::new("job_claimed", serde_json::to_value(&expired_claim).unwrap()),
        )
        .await
        .unwrap();
        let second = queue
            .claim_next("triage", "consumer-b", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.job.event.id.0, "evt-1");
        assert_ne!(second.handle.claim_id, expired_claim.claim_id);
        assert_eq!(second.handle.consumer_id, "consumer-b");
    }

    /// A `Normal` job older than the promotion age ranks like a `High` job, and its earlier
    /// enqueue time then wins the tie-break.
    #[tokio::test(flavor = "current_thread")]
    async fn high_priority_and_aged_normal_are_selected_first() {
        let log = Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)));
        let queue = WorkerQueue::new(log.clone());

        let catalog_topic = Topic::new(WORKER_QUEUE_CATALOG_TOPIC).unwrap();
        log.append(
            &catalog_topic,
            LogEvent::new("queue_seen", serde_json::json!({"queue":"triage"})),
        )
        .await
        .unwrap();

        let topic = job_topic("triage").unwrap();
        let mut old_normal = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-old-normal",
                WorkerQueuePriority::Normal,
            ))
            .unwrap(),
        );
        // Backdate the event past the promotion threshold; jobs are appended to the log
        // directly because `enqueue` offers no way to set the event's timestamp.
        old_normal.occurred_at_ms = now_ms() - NORMAL_PROMOTION_AGE_MS - 1_000;
        log.append(&topic, old_normal).await.unwrap();

        let high = LogEvent::new(
            "trigger_dispatch",
            serde_json::to_value(test_job(
                "triage",
                "incoming-review-task",
                "evt-high",
                WorkerQueuePriority::High,
            ))
            .unwrap(),
        );
        log.append(&topic, high).await.unwrap();

        let claimed = queue
            .claim_next("triage", "consumer-a", StdDuration::from_secs(60))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(claimed.job.event.id.0, "evt-old-normal");
    }
}