Skip to main content

pgqrs/
workers.rs

1//! Worker, producer, and consumer interfaces.
2
3use crate::rate_limit::RateLimitStatus;
4use crate::store::{AnyStore, Store};
5pub use crate::types::{
6    QueueMessage, QueueRecord, RunRecord, StepRecord, WorkerRecord, WorkerStatus, WorkflowRecord,
7};
8use crate::validation::{PayloadValidator, ValidationConfig};
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use serde_json::Value;
12
13/// Common worker operations and lifecycle hooks.
14#[async_trait]
15pub trait Worker: Send + Sync {
16    /// Get the worker record for this instance.
17    fn worker_record(&self) -> &WorkerRecord;
18
19    /// Get the worker id for this instance.
20    fn worker_id(&self) -> i64 {
21        self.worker_record().id
22    }
23
24    async fn status(&self) -> crate::error::Result<WorkerStatus>;
25    async fn suspend(&self) -> crate::error::Result<()>;
26    async fn resume(&self) -> crate::error::Result<()>;
27    async fn shutdown(&self) -> crate::error::Result<()>;
28    async fn heartbeat(&self) -> crate::error::Result<()>;
29    async fn is_healthy(&self, max_age: Duration) -> crate::error::Result<bool>;
30}
31
32/// Admin operations for queues, workers, and stats.
33#[async_trait]
34pub trait Admin: Worker {
35    /// Verify the pgqrs schema is correctly installed.
36    async fn verify(&self) -> crate::error::Result<()>;
37
38    /// Delete a queue.
39    async fn delete_queue(&self, queue_info: &QueueRecord) -> crate::error::Result<()>;
40
41    /// Purge all messages and workers from a queue.
42    async fn purge_queue(&self, name: &str) -> crate::error::Result<()>;
43
44    /// Get IDs of messages in the dead letter queue.
45    async fn dlq(&self) -> crate::error::Result<Vec<i64>>;
46
47    /// Get metrics for a specific queue.
48    async fn queue_metrics(&self, name: &str) -> crate::error::Result<crate::stats::QueueMetrics>;
49
50    /// Get metrics for all queues.
51    async fn all_queues_metrics(&self) -> crate::error::Result<Vec<crate::stats::QueueMetrics>>;
52
53    /// Get system-wide statistics.
54    async fn system_stats(&self) -> crate::error::Result<crate::stats::SystemStats>;
55
56    /// Get worker health statistics.
57    async fn worker_health_stats(
58        &self,
59        heartbeat_timeout: Duration,
60        group_by_queue: bool,
61    ) -> crate::error::Result<Vec<crate::stats::WorkerHealthStats>>;
62
63    /// Get worker statistics for a queue.
64    async fn worker_stats(
65        &self,
66        queue_name: &str,
67    ) -> crate::error::Result<crate::stats::WorkerStats>;
68
69    /// Delete a worker by ID.
70    async fn delete_worker(&self, worker_id: i64) -> crate::error::Result<u64>;
71
72    /// Get messages currently held by a worker.
73    async fn get_worker_messages(&self, worker_id: i64) -> crate::error::Result<Vec<QueueMessage>>;
74
75    /// Reclaim messages that have exceeded their visibility timeout.
76    async fn reclaim_messages(
77        &self,
78        queue_id: i64,
79        older_than: Option<Duration>,
80    ) -> crate::error::Result<u64>;
81
82    /// Purge workers that haven't sent a heartbeat recently.
83    async fn purge_old_workers(&self, older_than: chrono::Duration) -> crate::error::Result<u64>;
84
85    /// Release all messages held by a worker.
86    async fn release_worker_messages(&self, worker_id: i64) -> crate::error::Result<u64>;
87}
88
89/// Producer for enqueueing messages to a queue.
90#[derive(Clone, Debug)]
91pub struct Producer {
92    store: AnyStore,
93    queue_info: QueueRecord,
94    worker_record: WorkerRecord,
95    validator: PayloadValidator,
96    current_time: Option<DateTime<Utc>>,
97}
98
99impl Producer {
100    /// Create a producer bound to a queue and worker record.
101    pub fn new(
102        store: AnyStore,
103        queue_info: QueueRecord,
104        worker_record: WorkerRecord,
105        validation_config: ValidationConfig,
106    ) -> Self {
107        Self {
108            store,
109            queue_info,
110            worker_record,
111            validator: PayloadValidator::new(validation_config),
112            current_time: None,
113        }
114    }
115
116    /// Set the current time used for enqueue timestamps.
117    pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
118        self.current_time = Some(time);
119        self
120    }
121
122    /// Return the current time used for enqueue timestamps.
123    pub fn current_time(&self) -> DateTime<Utc> {
124        self.current_time.unwrap_or_else(Utc::now)
125    }
126
127    /// Return the worker id for this producer.
128    pub fn worker_id(&self) -> i64 {
129        self.worker_record.id
130    }
131
132    /// Return the worker record for this producer.
133    pub fn worker_record(&self) -> &WorkerRecord {
134        &self.worker_record
135    }
136
137    /// Fetch the current worker status.
138    pub async fn status(&self) -> crate::error::Result<WorkerStatus> {
139        self.store.workers().get_status(self.worker_record.id).await
140    }
141
142    /// Suspend this worker.
143    pub async fn suspend(&self) -> crate::error::Result<()> {
144        self.store.workers().suspend(self.worker_record.id).await
145    }
146
147    /// Resume this worker.
148    pub async fn resume(&self) -> crate::error::Result<()> {
149        self.store.workers().resume(self.worker_record.id).await
150    }
151
152    /// Shut down this worker.
153    pub async fn shutdown(&self) -> crate::error::Result<()> {
154        self.store.workers().shutdown(self.worker_record.id).await
155    }
156
157    /// Record a heartbeat for this worker.
158    pub async fn heartbeat(&self) -> crate::error::Result<()> {
159        self.store.workers().heartbeat(self.worker_record.id).await
160    }
161
162    /// Check if the worker heartbeat is within the given age.
163    pub async fn is_healthy(&self, max_age: Duration) -> crate::error::Result<bool> {
164        self.store
165            .workers()
166            .is_healthy(self.worker_record.id, max_age)
167            .await
168    }
169
170    /// Fetch a message by id.
171    pub async fn get_message_by_id(&self, msg_id: i64) -> crate::error::Result<QueueMessage> {
172        self.store.messages().get(msg_id).await
173    }
174
175    /// Enqueue a message immediately.
176    pub async fn enqueue(&self, payload: &Value) -> crate::error::Result<QueueMessage> {
177        self.enqueue_delayed(payload, 0).await
178    }
179
180    /// Enqueue a message with a delay in seconds.
181    pub async fn enqueue_delayed(
182        &self,
183        payload: &Value,
184        delay_seconds: u32,
185    ) -> crate::error::Result<QueueMessage> {
186        self.validator.validate(payload)?;
187
188        let now = self.current_time();
189        let vt = now + chrono::Duration::seconds(i64::from(delay_seconds));
190
191        let new_message = crate::types::NewQueueMessage {
192            queue_id: self.queue_info.id,
193            payload: payload.clone(),
194            read_ct: 0,
195            enqueued_at: now,
196            vt,
197            producer_worker_id: Some(self.worker_record.id),
198            consumer_worker_id: None,
199        };
200
201        self.store.messages().insert(new_message).await
202    }
203
204    /// Enqueue multiple messages immediately.
205    pub async fn batch_enqueue(
206        &self,
207        payloads: &[Value],
208    ) -> crate::error::Result<Vec<QueueMessage>> {
209        self.batch_enqueue_delayed(payloads, 0).await
210    }
211
212    /// Enqueue multiple messages with a delay in seconds.
213    pub async fn batch_enqueue_delayed(
214        &self,
215        payloads: &[Value],
216        delay_seconds: u32,
217    ) -> crate::error::Result<Vec<QueueMessage>> {
218        self.batch_enqueue_at(payloads, self.current_time(), delay_seconds)
219            .await
220    }
221
222    /// Enqueue a message using an explicit time reference.
223    pub async fn enqueue_at(
224        &self,
225        payload: &Value,
226        now: chrono::DateTime<chrono::Utc>,
227        delay_seconds: u32,
228    ) -> crate::error::Result<QueueMessage> {
229        self.validator.validate(payload)?;
230
231        let vt = now + chrono::Duration::seconds(i64::from(delay_seconds));
232
233        let new_message = crate::types::NewQueueMessage {
234            queue_id: self.queue_info.id,
235            payload: payload.clone(),
236            read_ct: 0,
237            enqueued_at: now,
238            vt,
239            producer_worker_id: Some(self.worker_record.id),
240            consumer_worker_id: None,
241        };
242
243        self.store.messages().insert(new_message).await
244    }
245
246    /// Enqueue multiple messages using an explicit time reference.
247    pub async fn batch_enqueue_at(
248        &self,
249        payloads: &[Value],
250        now: chrono::DateTime<chrono::Utc>,
251        delay_seconds: u32,
252    ) -> crate::error::Result<Vec<QueueMessage>> {
253        self.validator.validate_batch(payloads)?;
254
255        let vt = now + chrono::Duration::seconds(i64::from(delay_seconds));
256
257        let ids = self
258            .store
259            .messages()
260            .batch_insert(
261                self.queue_info.id,
262                payloads,
263                crate::types::BatchInsertParams {
264                    read_ct: 0,
265                    enqueued_at: now,
266                    vt,
267                    producer_worker_id: Some(self.worker_record.id),
268                    consumer_worker_id: None,
269                },
270            )
271            .await?;
272
273        self.store.messages().get_by_ids(&ids).await
274    }
275
276    /// Replay an archived DLQ message back into the queue.
277    pub async fn replay_dlq(
278        &self,
279        archived_msg_id: i64,
280    ) -> crate::error::Result<Option<QueueMessage>> {
281        self.store.messages().replay_dlq(archived_msg_id).await
282    }
283
284    /// Return the validation config for this producer.
285    pub fn validation_config(&self) -> &ValidationConfig {
286        self.validator.config()
287    }
288
289    /// Return the current rate limit status, if enabled.
290    pub fn rate_limit_status(&self) -> Option<RateLimitStatus> {
291        self.validator.rate_limit_status()
292    }
293}
294
295/// Consumer for dequeueing and managing messages.
296#[derive(Clone, Debug)]
297pub struct Consumer {
298    store: AnyStore,
299    queue_info: QueueRecord,
300    worker_record: WorkerRecord,
301    current_time: Option<DateTime<Utc>>,
302}
303
304impl Consumer {
305    /// Create a consumer bound to a queue and worker record.
306    pub fn new(store: AnyStore, queue_info: QueueRecord, worker_record: WorkerRecord) -> Self {
307        Self {
308            store,
309            queue_info,
310            worker_record,
311            current_time: None,
312        }
313    }
314
315    /// Set the current time used for dequeue timestamps.
316    pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
317        self.current_time = Some(time);
318        self
319    }
320
321    /// Return the current time used for dequeue timestamps.
322    pub fn current_time(&self) -> DateTime<Utc> {
323        self.current_time.unwrap_or_else(Utc::now)
324    }
325
326    /// Return the worker id for this consumer.
327    pub fn worker_id(&self) -> i64 {
328        self.worker_record.id
329    }
330
331    pub(crate) fn store(&self) -> &AnyStore {
332        &self.store
333    }
334
335    /// Return the worker record for this consumer.
336    pub fn worker_record(&self) -> &WorkerRecord {
337        &self.worker_record
338    }
339
340    /// Fetch the current worker status.
341    pub async fn status(&self) -> crate::error::Result<WorkerStatus> {
342        self.store.workers().get_status(self.worker_record.id).await
343    }
344
345    /// Suspend this worker.
346    pub async fn suspend(&self) -> crate::error::Result<()> {
347        self.store.workers().suspend(self.worker_record.id).await
348    }
349
350    /// Mark this consumer as polling.
351    pub async fn poll(&self) -> crate::error::Result<()> {
352        self.store.workers().poll(self.worker_record.id).await
353    }
354
355    /// Interrupt this consumer's poll loop.
356    pub async fn interrupt(&self) -> crate::error::Result<()> {
357        self.store.workers().interrupt(self.worker_record.id).await
358    }
359
360    /// Resume this worker.
361    pub async fn resume(&self) -> crate::error::Result<()> {
362        self.store.workers().resume(self.worker_record.id).await
363    }
364
365    /// Shut down this worker if no messages are pending.
366    pub async fn shutdown(&self) -> crate::error::Result<()> {
367        let pending = self
368            .store
369            .messages()
370            .count_pending_for_queue_and_worker(self.queue_info.id, self.worker_record.id)
371            .await?;
372
373        if pending > 0 {
374            return Err(crate::error::Error::WorkerHasPendingMessages {
375                count: pending as u64,
376                reason: format!("Consumer has {} pending messages", pending),
377            });
378        }
379        self.store.workers().shutdown(self.worker_record.id).await
380    }
381
382    /// Record a heartbeat for this worker.
383    pub async fn heartbeat(&self) -> crate::error::Result<()> {
384        self.store.workers().heartbeat(self.worker_record.id).await
385    }
386
387    /// Check if the worker heartbeat is within the given age.
388    pub async fn is_healthy(&self, max_age: Duration) -> crate::error::Result<bool> {
389        self.store
390            .workers()
391            .is_healthy(self.worker_record.id, max_age)
392            .await
393    }
394
395    /// Dequeue a single message.
396    pub async fn dequeue(&self) -> crate::error::Result<Vec<QueueMessage>> {
397        self.dequeue_many(1).await
398    }
399
400    /// Dequeue multiple messages.
401    pub async fn dequeue_many(&self, limit: usize) -> crate::error::Result<Vec<QueueMessage>> {
402        self.dequeue_many_with_delay(limit, 30).await
403    }
404
405    /// Dequeue a message with a custom visibility timeout.
406    pub async fn dequeue_delay(&self, vt: u32) -> crate::error::Result<Vec<QueueMessage>> {
407        self.dequeue_many_with_delay(1, vt).await
408    }
409
410    /// Dequeue multiple messages with a visibility timeout.
411    pub async fn dequeue_many_with_delay(
412        &self,
413        limit: usize,
414        vt: u32,
415    ) -> crate::error::Result<Vec<QueueMessage>> {
416        self.dequeue_at(limit, vt, self.current_time()).await
417    }
418
419    /// Dequeue messages using an explicit time reference.
420    pub async fn dequeue_at(
421        &self,
422        limit: usize,
423        vt: u32,
424        now: chrono::DateTime<chrono::Utc>,
425    ) -> crate::error::Result<Vec<QueueMessage>> {
426        self.store
427            .messages()
428            .dequeue_at(
429                self.queue_info.id,
430                limit,
431                vt,
432                self.worker_record.id,
433                now,
434                self.store.config().max_read_ct,
435            )
436            .await
437    }
438
439    /// Extend the visibility timeout for a message.
440    pub async fn extend_vt(&self, message_id: i64, seconds: u32) -> crate::error::Result<bool> {
441        let count = self
442            .store
443            .messages()
444            .extend_visibility(message_id, self.worker_record.id, seconds)
445            .await?;
446        Ok(count > 0)
447    }
448
449    /// Delete a message owned by this consumer.
450    pub async fn delete(&self, message_id: i64) -> crate::error::Result<bool> {
451        let count = self
452            .store
453            .messages()
454            .delete_owned(message_id, self.worker_record.id)
455            .await?;
456        Ok(count > 0)
457    }
458
459    /// Delete multiple messages owned by this consumer.
460    pub async fn delete_many(&self, message_ids: Vec<i64>) -> crate::error::Result<Vec<bool>> {
461        self.store
462            .messages()
463            .delete_many_owned(&message_ids, self.worker_record.id)
464            .await
465    }
466
467    /// Archive a message owned by this consumer.
468    pub async fn archive(&self, msg_id: i64) -> crate::error::Result<Option<QueueMessage>> {
469        self.store
470            .messages()
471            .archive(msg_id, self.worker_record.id)
472            .await
473    }
474
475    /// Archive multiple messages owned by this consumer.
476    pub async fn archive_many(&self, msg_ids: Vec<i64>) -> crate::error::Result<Vec<bool>> {
477        self.store
478            .messages()
479            .archive_many(&msg_ids, self.worker_record.id)
480            .await
481    }
482
483    /// Release messages back to the queue.
484    pub async fn release_messages(&self, message_ids: &[i64]) -> crate::error::Result<u64> {
485        let res = self
486            .store
487            .messages()
488            .release_messages_by_ids(message_ids, self.worker_record.id)
489            .await?;
490        Ok(res.iter().filter(|&&x| x).count() as u64)
491    }
492
493    /// Release a message with a custom visibility time.
494    pub async fn release_with_visibility(
495        &self,
496        message_id: i64,
497        visible_at: chrono::DateTime<chrono::Utc>,
498    ) -> crate::error::Result<bool> {
499        let count = self
500            .store
501            .messages()
502            .release_with_visibility(message_id, self.worker_record.id, visible_at)
503            .await?;
504        Ok(count > 0)
505    }
506}
507
508/// Workflow execution run handle.
509///
510/// Use this to acquire steps and complete or pause a workflow run.
511#[derive(Clone, Debug)]
512pub struct Run {
513    store: AnyStore,
514    record: RunRecord,
515    current_time: Option<DateTime<Utc>>,
516}
517
518impl Run {
519    /// Create a run handle from a run record.
520    pub fn new(store: AnyStore, record: RunRecord) -> Self {
521        Self {
522            store,
523            record,
524            current_time: None,
525        }
526    }
527
528    /// Set the current time used for step acquisition.
529    pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
530        self.current_time = Some(time);
531        self
532    }
533
534    /// Return the current time override, if any.
535    pub fn current_time(&self) -> Option<DateTime<Utc>> {
536        self.current_time
537    }
538
539    /// Return the run id.
540    pub fn id(&self) -> i64 {
541        self.record.id
542    }
543
544    /// Return the run record.
545    pub fn record(&self) -> &RunRecord {
546        &self.record
547    }
548
549    fn with_record(&self, record: RunRecord) -> Self {
550        Self {
551            store: self.store.clone(),
552            record,
553            current_time: self.current_time,
554        }
555    }
556
557    /// Refresh the run record from storage.
558    pub async fn refresh(&self) -> crate::error::Result<Run> {
559        let record = self.store.workflow_runs().get(self.record.id).await?;
560        Ok(self.with_record(record))
561    }
562
563    /// Mark the run as started.
564    pub async fn start(&self) -> crate::error::Result<Run> {
565        let record = self.store.workflow_runs().start_run(self.record.id).await?;
566        Ok(self.with_record(record))
567    }
568
569    /// Complete the run with output.
570    pub async fn complete(&self, output: serde_json::Value) -> crate::error::Result<Run> {
571        let record = self
572            .store
573            .workflow_runs()
574            .complete_run(self.record.id, output)
575            .await?;
576        Ok(self.with_record(record))
577    }
578
579    /// Pause the run until a resume time.
580    pub async fn pause(
581        &self,
582        message: String,
583        resume_after: std::time::Duration,
584    ) -> crate::error::Result<Run> {
585        let record = self
586            .store
587            .workflow_runs()
588            .pause_run(self.record.id, message, resume_after)
589            .await?;
590        Ok(self.with_record(record))
591    }
592
593    /// Fail the run with a structured error payload.
594    pub async fn fail_with_json(&self, error: serde_json::Value) -> crate::error::Result<Run> {
595        let record = self
596            .store
597            .workflow_runs()
598            .fail_run(self.record.id, error)
599            .await?;
600        Ok(self.with_record(record))
601    }
602
603    /// Complete the run with a serializable payload.
604    pub async fn success<T: serde::Serialize + Send + Sync>(
605        &self,
606        output: &T,
607    ) -> crate::error::Result<Run> {
608        let value = serde_json::to_value(output).map_err(crate::error::Error::Serialization)?;
609        self.complete(value).await
610    }
611
612    /// Fail the run with a serializable payload.
613    pub async fn fail<T: serde::Serialize + Send + Sync>(
614        &self,
615        error: &T,
616    ) -> crate::error::Result<Run> {
617        let value = serde_json::to_value(error).map_err(crate::error::Error::Serialization)?;
618        self.fail_with_json(value).await
619    }
620
621    /// Acquire a step for execution or replay.
622    pub async fn acquire_step(
623        &self,
624        step_name: &str,
625        current_time: chrono::DateTime<chrono::Utc>,
626    ) -> crate::error::Result<Step> {
627        let step_name_string = step_name.to_string();
628        let row = self
629            .store
630            .workflow_steps()
631            .execute(
632                crate::store::query::QueryBuilder::new(
633                    self.store.workflow_steps().sql_acquire_step(),
634                )
635                .bind_i64(self.record.id)
636                .bind_string(step_name_string.clone()),
637            )
638            .await
639            .map_err(|e| crate::error::Error::QueryFailed {
640                query: "SQL_ACQUIRE_STEP".into(),
641                source: Box::new(e),
642                context: format!(
643                    "Failed to acquire step {} for run {}",
644                    step_name_string, self.record.id
645                ),
646            })?;
647
648        let mut status = row.status;
649        let retry_count = row.retry_count;
650        let retry_at = row.retry_at;
651
652        if status == crate::types::WorkflowStatus::Error {
653            if let Some(retry_at) = retry_at {
654                if current_time < retry_at {
655                    return Err(crate::error::Error::StepNotReady {
656                        retry_at,
657                        retry_count: retry_count as u32,
658                    });
659                }
660
661                self.store
662                    .workflow_steps()
663                    .execute(
664                        crate::store::query::QueryBuilder::new(
665                            self.store.workflow_steps().sql_clear_retry(),
666                        )
667                        .bind_i64(row.id),
668                    )
669                    .await
670                    .map(|_| ())
671                    .map_err(|e| crate::error::Error::QueryFailed {
672                        query: "SQL_CLEAR_RETRY".into(),
673                        source: Box::new(e),
674                        context: format!("Failed to clear retry_at for step {}", row.id),
675                    })?;
676
677                status = crate::types::WorkflowStatus::Running;
678            } else {
679                let error_val = row.error.unwrap_or_else(|| {
680                    serde_json::json!({
681                        "is_transient": false,
682                        "message": "Unknown error"
683                    })
684                });
685
686                return Err(crate::error::Error::RetriesExhausted {
687                    error: error_val,
688                    attempts: retry_count as u32,
689                });
690            }
691        }
692
693        let record = StepRecord { status, ..row };
694        Ok(Step::new(self.store.clone(), record))
695    }
696
697    /// Complete a step by name.
698    pub async fn complete_step(
699        &self,
700        step_name: &str,
701        output: serde_json::Value,
702    ) -> crate::error::Result<()> {
703        let current_time = self.current_time().unwrap_or_else(chrono::Utc::now);
704        let mut step = self.acquire_step(step_name, current_time).await?;
705        step.complete(output).await
706    }
707
708    /// Fail a step by name with a structured error payload.
709    pub async fn fail_step(
710        &self,
711        step_name: &str,
712        error: serde_json::Value,
713        current_time: chrono::DateTime<chrono::Utc>,
714    ) -> crate::error::Result<()> {
715        let mut step = self.acquire_step(step_name, current_time).await?;
716        step.fail_with_json(error, current_time).await
717    }
718}
719
720/// Workflow step execution handle.
721#[derive(Clone, Debug)]
722pub struct Step {
723    store: AnyStore,
724    record: StepRecord,
725    current_time: Option<DateTime<Utc>>,
726}
727
728impl Step {
729    /// Create a step handle from a step record.
730    pub fn new(store: AnyStore, record: StepRecord) -> Self {
731        Self {
732            store,
733            record,
734            current_time: None,
735        }
736    }
737
738    /// Set the current time used for retry calculations.
739    pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
740        self.current_time = Some(time);
741        self
742    }
743
744    /// Return the step id.
745    pub fn id(&self) -> i64 {
746        self.record.id
747    }
748
749    /// Return the step record.
750    pub fn record(&self) -> &StepRecord {
751        &self.record
752    }
753
754    /// Return the step status.
755    pub fn status(&self) -> crate::types::WorkflowStatus {
756        self.record.status
757    }
758
759    /// Return the step output, if available.
760    pub fn output(&self) -> Option<&serde_json::Value> {
761        self.record.output.as_ref()
762    }
763
764    /// Complete the step with an output payload.
765    pub async fn complete(&mut self, output: serde_json::Value) -> crate::error::Result<()> {
766        let query =
767            crate::store::query::QueryBuilder::new(self.store.workflow_steps().sql_complete_step())
768                .bind_i64(self.record.id)
769                .bind_json(output);
770        self.store.workflow_steps().execute(query).await.map(|_| ())
771    }
772
773    /// Fail the step with a structured error payload.
774    pub async fn fail_with_json(
775        &mut self,
776        error: serde_json::Value,
777        current_time: DateTime<Utc>,
778    ) -> crate::error::Result<()> {
779        let error_record = if error.get("is_transient").is_some() {
780            error
781        } else {
782            serde_json::json!({
783                "is_transient": false,
784                "code": "NON_RETRYABLE",
785                "message": error.to_string(),
786            })
787        };
788
789        let is_transient = error_record
790            .get("is_transient")
791            .and_then(|v| v.as_bool())
792            .unwrap_or(false);
793
794        if is_transient {
795            let policy = crate::StepRetryPolicy::default();
796            if !policy.should_retry(self.record.retry_count as u32) {
797                let query = crate::store::query::QueryBuilder::new(
798                    self.store.workflow_steps().sql_fail_step(),
799                )
800                .bind_i64(self.record.id)
801                .bind_json(error_record)
802                .bind_datetime(None)
803                .bind_i32(self.record.retry_count);
804                return self.store.workflow_steps().execute(query).await.map(|_| ());
805            }
806
807            let delay_seconds =
808                policy.extract_retry_delay(&error_record, self.record.retry_count.max(0) as u32);
809            let delay_i64: i64 = delay_seconds.into();
810
811            let retry_at = current_time + chrono::Duration::seconds(delay_i64);
812            let new_retry_count = self.record.retry_count + 1;
813
814            let query =
815                crate::store::query::QueryBuilder::new(self.store.workflow_steps().sql_fail_step())
816                    .bind_i64(self.record.id)
817                    .bind_json(error_record)
818                    .bind_datetime(Some(retry_at))
819                    .bind_i32(new_retry_count);
820            return self.store.workflow_steps().execute(query).await.map(|_| ());
821        }
822
823        let query =
824            crate::store::query::QueryBuilder::new(self.store.workflow_steps().sql_fail_step())
825                .bind_i64(self.record.id)
826                .bind_json(error_record)
827                .bind_datetime(None)
828                .bind_i32(self.record.retry_count);
829        self.store.workflow_steps().execute(query).await.map(|_| ())
830    }
831
832    /// Complete the step with a serializable payload.
833    pub async fn success<T: serde::Serialize + Send + Sync>(
834        &mut self,
835        output: &T,
836    ) -> crate::error::Result<()> {
837        let value = serde_json::to_value(output).map_err(crate::error::Error::Serialization)?;
838        self.complete(value).await
839    }
840
841    /// Fail the step with a serializable payload.
842    pub async fn fail<T: serde::Serialize + Send + Sync>(
843        &mut self,
844        error: &T,
845    ) -> crate::error::Result<()> {
846        let value = serde_json::to_value(error).map_err(crate::error::Error::Serialization)?;
847        self.fail_with_json(value, chrono::Utc::now()).await
848    }
849}