1use crate::rate_limit::RateLimitStatus;
4use crate::store::{AnyStore, Store};
5pub use crate::types::{
6 QueueMessage, QueueRecord, RunRecord, StepRecord, WorkerRecord, WorkerStatus, WorkflowRecord,
7};
8use crate::validation::{PayloadValidator, ValidationConfig};
9use async_trait::async_trait;
10use chrono::{DateTime, Duration, Utc};
11use serde_json::Value;
12
13#[async_trait]
15pub trait Worker: Send + Sync {
16 fn worker_record(&self) -> &WorkerRecord;
18
19 fn worker_id(&self) -> i64 {
21 self.worker_record().id
22 }
23
24 async fn status(&self) -> crate::error::Result<WorkerStatus>;
25 async fn suspend(&self) -> crate::error::Result<()>;
26 async fn resume(&self) -> crate::error::Result<()>;
27 async fn shutdown(&self) -> crate::error::Result<()>;
28 async fn heartbeat(&self) -> crate::error::Result<()>;
29 async fn is_healthy(&self, max_age: Duration) -> crate::error::Result<bool>;
30}
31
32#[async_trait]
34pub trait Admin: Worker {
35 async fn verify(&self) -> crate::error::Result<()>;
37
38 async fn delete_queue(&self, queue_info: &QueueRecord) -> crate::error::Result<()>;
40
41 async fn purge_queue(&self, name: &str) -> crate::error::Result<()>;
43
44 async fn dlq(&self) -> crate::error::Result<Vec<i64>>;
46
47 async fn queue_metrics(&self, name: &str) -> crate::error::Result<crate::stats::QueueMetrics>;
49
50 async fn all_queues_metrics(&self) -> crate::error::Result<Vec<crate::stats::QueueMetrics>>;
52
53 async fn system_stats(&self) -> crate::error::Result<crate::stats::SystemStats>;
55
56 async fn worker_health_stats(
58 &self,
59 heartbeat_timeout: Duration,
60 group_by_queue: bool,
61 ) -> crate::error::Result<Vec<crate::stats::WorkerHealthStats>>;
62
63 async fn worker_stats(
65 &self,
66 queue_name: &str,
67 ) -> crate::error::Result<crate::stats::WorkerStats>;
68
69 async fn delete_worker(&self, worker_id: i64) -> crate::error::Result<u64>;
71
72 async fn get_worker_messages(&self, worker_id: i64) -> crate::error::Result<Vec<QueueMessage>>;
74
75 async fn reclaim_messages(
77 &self,
78 queue_id: i64,
79 older_than: Option<Duration>,
80 ) -> crate::error::Result<u64>;
81
82 async fn purge_old_workers(&self, older_than: chrono::Duration) -> crate::error::Result<u64>;
84
85 async fn release_worker_messages(&self, worker_id: i64) -> crate::error::Result<u64>;
87}
88
89#[derive(Clone, Debug)]
91pub struct Producer {
92 store: AnyStore,
93 queue_info: QueueRecord,
94 worker_record: WorkerRecord,
95 validator: PayloadValidator,
96 current_time: Option<DateTime<Utc>>,
97}
98
99impl Producer {
100 pub fn new(
102 store: AnyStore,
103 queue_info: QueueRecord,
104 worker_record: WorkerRecord,
105 validation_config: ValidationConfig,
106 ) -> Self {
107 Self {
108 store,
109 queue_info,
110 worker_record,
111 validator: PayloadValidator::new(validation_config),
112 current_time: None,
113 }
114 }
115
116 pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
118 self.current_time = Some(time);
119 self
120 }
121
122 pub fn current_time(&self) -> DateTime<Utc> {
124 self.current_time.unwrap_or_else(Utc::now)
125 }
126
127 pub fn worker_id(&self) -> i64 {
129 self.worker_record.id
130 }
131
132 pub fn worker_record(&self) -> &WorkerRecord {
134 &self.worker_record
135 }
136
137 pub async fn status(&self) -> crate::error::Result<WorkerStatus> {
139 self.store.workers().get_status(self.worker_record.id).await
140 }
141
142 pub async fn suspend(&self) -> crate::error::Result<()> {
144 self.store.workers().suspend(self.worker_record.id).await
145 }
146
147 pub async fn resume(&self) -> crate::error::Result<()> {
149 self.store.workers().resume(self.worker_record.id).await
150 }
151
152 pub async fn shutdown(&self) -> crate::error::Result<()> {
154 self.store.workers().shutdown(self.worker_record.id).await
155 }
156
157 pub async fn heartbeat(&self) -> crate::error::Result<()> {
159 self.store.workers().heartbeat(self.worker_record.id).await
160 }
161
162 pub async fn is_healthy(&self, max_age: Duration) -> crate::error::Result<bool> {
164 self.store
165 .workers()
166 .is_healthy(self.worker_record.id, max_age)
167 .await
168 }
169
170 pub async fn get_message_by_id(&self, msg_id: i64) -> crate::error::Result<QueueMessage> {
172 self.store.messages().get(msg_id).await
173 }
174
175 pub async fn enqueue(&self, payload: &Value) -> crate::error::Result<QueueMessage> {
177 self.enqueue_delayed(payload, 0).await
178 }
179
180 pub async fn enqueue_delayed(
182 &self,
183 payload: &Value,
184 delay_seconds: u32,
185 ) -> crate::error::Result<QueueMessage> {
186 self.validator.validate(payload)?;
187
188 let now = self.current_time();
189 let vt = now + chrono::Duration::seconds(i64::from(delay_seconds));
190
191 let new_message = crate::types::NewQueueMessage {
192 queue_id: self.queue_info.id,
193 payload: payload.clone(),
194 read_ct: 0,
195 enqueued_at: now,
196 vt,
197 producer_worker_id: Some(self.worker_record.id),
198 consumer_worker_id: None,
199 };
200
201 self.store.messages().insert(new_message).await
202 }
203
204 pub async fn batch_enqueue(
206 &self,
207 payloads: &[Value],
208 ) -> crate::error::Result<Vec<QueueMessage>> {
209 self.batch_enqueue_delayed(payloads, 0).await
210 }
211
212 pub async fn batch_enqueue_delayed(
214 &self,
215 payloads: &[Value],
216 delay_seconds: u32,
217 ) -> crate::error::Result<Vec<QueueMessage>> {
218 self.batch_enqueue_at(payloads, self.current_time(), delay_seconds)
219 .await
220 }
221
222 pub async fn enqueue_at(
224 &self,
225 payload: &Value,
226 now: chrono::DateTime<chrono::Utc>,
227 delay_seconds: u32,
228 ) -> crate::error::Result<QueueMessage> {
229 self.validator.validate(payload)?;
230
231 let vt = now + chrono::Duration::seconds(i64::from(delay_seconds));
232
233 let new_message = crate::types::NewQueueMessage {
234 queue_id: self.queue_info.id,
235 payload: payload.clone(),
236 read_ct: 0,
237 enqueued_at: now,
238 vt,
239 producer_worker_id: Some(self.worker_record.id),
240 consumer_worker_id: None,
241 };
242
243 self.store.messages().insert(new_message).await
244 }
245
246 pub async fn batch_enqueue_at(
248 &self,
249 payloads: &[Value],
250 now: chrono::DateTime<chrono::Utc>,
251 delay_seconds: u32,
252 ) -> crate::error::Result<Vec<QueueMessage>> {
253 self.validator.validate_batch(payloads)?;
254
255 let vt = now + chrono::Duration::seconds(i64::from(delay_seconds));
256
257 let ids = self
258 .store
259 .messages()
260 .batch_insert(
261 self.queue_info.id,
262 payloads,
263 crate::types::BatchInsertParams {
264 read_ct: 0,
265 enqueued_at: now,
266 vt,
267 producer_worker_id: Some(self.worker_record.id),
268 consumer_worker_id: None,
269 },
270 )
271 .await?;
272
273 self.store.messages().get_by_ids(&ids).await
274 }
275
276 pub async fn replay_dlq(
278 &self,
279 archived_msg_id: i64,
280 ) -> crate::error::Result<Option<QueueMessage>> {
281 self.store.messages().replay_dlq(archived_msg_id).await
282 }
283
284 pub fn validation_config(&self) -> &ValidationConfig {
286 self.validator.config()
287 }
288
289 pub fn rate_limit_status(&self) -> Option<RateLimitStatus> {
291 self.validator.rate_limit_status()
292 }
293}
294
295#[derive(Clone, Debug)]
297pub struct Consumer {
298 store: AnyStore,
299 queue_info: QueueRecord,
300 worker_record: WorkerRecord,
301 current_time: Option<DateTime<Utc>>,
302}
303
304impl Consumer {
305 pub fn new(store: AnyStore, queue_info: QueueRecord, worker_record: WorkerRecord) -> Self {
307 Self {
308 store,
309 queue_info,
310 worker_record,
311 current_time: None,
312 }
313 }
314
315 pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
317 self.current_time = Some(time);
318 self
319 }
320
321 pub fn current_time(&self) -> DateTime<Utc> {
323 self.current_time.unwrap_or_else(Utc::now)
324 }
325
326 pub fn worker_id(&self) -> i64 {
328 self.worker_record.id
329 }
330
331 pub(crate) fn store(&self) -> &AnyStore {
332 &self.store
333 }
334
335 pub fn worker_record(&self) -> &WorkerRecord {
337 &self.worker_record
338 }
339
340 pub async fn status(&self) -> crate::error::Result<WorkerStatus> {
342 self.store.workers().get_status(self.worker_record.id).await
343 }
344
345 pub async fn suspend(&self) -> crate::error::Result<()> {
347 self.store.workers().suspend(self.worker_record.id).await
348 }
349
350 pub async fn poll(&self) -> crate::error::Result<()> {
352 self.store.workers().poll(self.worker_record.id).await
353 }
354
355 pub async fn interrupt(&self) -> crate::error::Result<()> {
357 self.store.workers().interrupt(self.worker_record.id).await
358 }
359
360 pub async fn resume(&self) -> crate::error::Result<()> {
362 self.store.workers().resume(self.worker_record.id).await
363 }
364
365 pub async fn shutdown(&self) -> crate::error::Result<()> {
367 let pending = self
368 .store
369 .messages()
370 .count_pending_for_queue_and_worker(self.queue_info.id, self.worker_record.id)
371 .await?;
372
373 if pending > 0 {
374 return Err(crate::error::Error::WorkerHasPendingMessages {
375 count: pending as u64,
376 reason: format!("Consumer has {} pending messages", pending),
377 });
378 }
379 self.store.workers().shutdown(self.worker_record.id).await
380 }
381
382 pub async fn heartbeat(&self) -> crate::error::Result<()> {
384 self.store.workers().heartbeat(self.worker_record.id).await
385 }
386
387 pub async fn is_healthy(&self, max_age: Duration) -> crate::error::Result<bool> {
389 self.store
390 .workers()
391 .is_healthy(self.worker_record.id, max_age)
392 .await
393 }
394
395 pub async fn dequeue(&self) -> crate::error::Result<Vec<QueueMessage>> {
397 self.dequeue_many(1).await
398 }
399
400 pub async fn dequeue_many(&self, limit: usize) -> crate::error::Result<Vec<QueueMessage>> {
402 self.dequeue_many_with_delay(limit, 30).await
403 }
404
405 pub async fn dequeue_delay(&self, vt: u32) -> crate::error::Result<Vec<QueueMessage>> {
407 self.dequeue_many_with_delay(1, vt).await
408 }
409
410 pub async fn dequeue_many_with_delay(
412 &self,
413 limit: usize,
414 vt: u32,
415 ) -> crate::error::Result<Vec<QueueMessage>> {
416 self.dequeue_at(limit, vt, self.current_time()).await
417 }
418
419 pub async fn dequeue_at(
421 &self,
422 limit: usize,
423 vt: u32,
424 now: chrono::DateTime<chrono::Utc>,
425 ) -> crate::error::Result<Vec<QueueMessage>> {
426 self.store
427 .messages()
428 .dequeue_at(
429 self.queue_info.id,
430 limit,
431 vt,
432 self.worker_record.id,
433 now,
434 self.store.config().max_read_ct,
435 )
436 .await
437 }
438
439 pub async fn extend_vt(&self, message_id: i64, seconds: u32) -> crate::error::Result<bool> {
441 let count = self
442 .store
443 .messages()
444 .extend_visibility(message_id, self.worker_record.id, seconds)
445 .await?;
446 Ok(count > 0)
447 }
448
449 pub async fn delete(&self, message_id: i64) -> crate::error::Result<bool> {
451 let count = self
452 .store
453 .messages()
454 .delete_owned(message_id, self.worker_record.id)
455 .await?;
456 Ok(count > 0)
457 }
458
459 pub async fn delete_many(&self, message_ids: Vec<i64>) -> crate::error::Result<Vec<bool>> {
461 self.store
462 .messages()
463 .delete_many_owned(&message_ids, self.worker_record.id)
464 .await
465 }
466
467 pub async fn archive(&self, msg_id: i64) -> crate::error::Result<Option<QueueMessage>> {
469 self.store
470 .messages()
471 .archive(msg_id, self.worker_record.id)
472 .await
473 }
474
475 pub async fn archive_many(&self, msg_ids: Vec<i64>) -> crate::error::Result<Vec<bool>> {
477 self.store
478 .messages()
479 .archive_many(&msg_ids, self.worker_record.id)
480 .await
481 }
482
483 pub async fn release_messages(&self, message_ids: &[i64]) -> crate::error::Result<u64> {
485 let res = self
486 .store
487 .messages()
488 .release_messages_by_ids(message_ids, self.worker_record.id)
489 .await?;
490 Ok(res.iter().filter(|&&x| x).count() as u64)
491 }
492
493 pub async fn release_with_visibility(
495 &self,
496 message_id: i64,
497 visible_at: chrono::DateTime<chrono::Utc>,
498 ) -> crate::error::Result<bool> {
499 let count = self
500 .store
501 .messages()
502 .release_with_visibility(message_id, self.worker_record.id, visible_at)
503 .await?;
504 Ok(count > 0)
505 }
506}
507
508#[derive(Clone, Debug)]
512pub struct Run {
513 store: AnyStore,
514 record: RunRecord,
515 current_time: Option<DateTime<Utc>>,
516}
517
518impl Run {
519 pub fn new(store: AnyStore, record: RunRecord) -> Self {
521 Self {
522 store,
523 record,
524 current_time: None,
525 }
526 }
527
528 pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
530 self.current_time = Some(time);
531 self
532 }
533
534 pub fn current_time(&self) -> Option<DateTime<Utc>> {
536 self.current_time
537 }
538
539 pub fn id(&self) -> i64 {
541 self.record.id
542 }
543
544 pub fn record(&self) -> &RunRecord {
546 &self.record
547 }
548
549 fn with_record(&self, record: RunRecord) -> Self {
550 Self {
551 store: self.store.clone(),
552 record,
553 current_time: self.current_time,
554 }
555 }
556
557 pub async fn refresh(&self) -> crate::error::Result<Run> {
559 let record = self.store.workflow_runs().get(self.record.id).await?;
560 Ok(self.with_record(record))
561 }
562
563 pub async fn start(&self) -> crate::error::Result<Run> {
565 let record = self.store.workflow_runs().start_run(self.record.id).await?;
566 Ok(self.with_record(record))
567 }
568
569 pub async fn complete(&self, output: serde_json::Value) -> crate::error::Result<Run> {
571 let record = self
572 .store
573 .workflow_runs()
574 .complete_run(self.record.id, output)
575 .await?;
576 Ok(self.with_record(record))
577 }
578
579 pub async fn pause(
581 &self,
582 message: String,
583 resume_after: std::time::Duration,
584 ) -> crate::error::Result<Run> {
585 let record = self
586 .store
587 .workflow_runs()
588 .pause_run(self.record.id, message, resume_after)
589 .await?;
590 Ok(self.with_record(record))
591 }
592
593 pub async fn fail_with_json(&self, error: serde_json::Value) -> crate::error::Result<Run> {
595 let record = self
596 .store
597 .workflow_runs()
598 .fail_run(self.record.id, error)
599 .await?;
600 Ok(self.with_record(record))
601 }
602
603 pub async fn success<T: serde::Serialize + Send + Sync>(
605 &self,
606 output: &T,
607 ) -> crate::error::Result<Run> {
608 let value = serde_json::to_value(output).map_err(crate::error::Error::Serialization)?;
609 self.complete(value).await
610 }
611
612 pub async fn fail<T: serde::Serialize + Send + Sync>(
614 &self,
615 error: &T,
616 ) -> crate::error::Result<Run> {
617 let value = serde_json::to_value(error).map_err(crate::error::Error::Serialization)?;
618 self.fail_with_json(value).await
619 }
620
621 pub async fn acquire_step(
623 &self,
624 step_name: &str,
625 current_time: chrono::DateTime<chrono::Utc>,
626 ) -> crate::error::Result<Step> {
627 let step_name_string = step_name.to_string();
628 let row = self
629 .store
630 .workflow_steps()
631 .execute(
632 crate::store::query::QueryBuilder::new(
633 self.store.workflow_steps().sql_acquire_step(),
634 )
635 .bind_i64(self.record.id)
636 .bind_string(step_name_string.clone()),
637 )
638 .await
639 .map_err(|e| crate::error::Error::QueryFailed {
640 query: "SQL_ACQUIRE_STEP".into(),
641 source: Box::new(e),
642 context: format!(
643 "Failed to acquire step {} for run {}",
644 step_name_string, self.record.id
645 ),
646 })?;
647
648 let mut status = row.status;
649 let retry_count = row.retry_count;
650 let retry_at = row.retry_at;
651
652 if status == crate::types::WorkflowStatus::Error {
653 if let Some(retry_at) = retry_at {
654 if current_time < retry_at {
655 return Err(crate::error::Error::StepNotReady {
656 retry_at,
657 retry_count: retry_count as u32,
658 });
659 }
660
661 self.store
662 .workflow_steps()
663 .execute(
664 crate::store::query::QueryBuilder::new(
665 self.store.workflow_steps().sql_clear_retry(),
666 )
667 .bind_i64(row.id),
668 )
669 .await
670 .map(|_| ())
671 .map_err(|e| crate::error::Error::QueryFailed {
672 query: "SQL_CLEAR_RETRY".into(),
673 source: Box::new(e),
674 context: format!("Failed to clear retry_at for step {}", row.id),
675 })?;
676
677 status = crate::types::WorkflowStatus::Running;
678 } else {
679 let error_val = row.error.unwrap_or_else(|| {
680 serde_json::json!({
681 "is_transient": false,
682 "message": "Unknown error"
683 })
684 });
685
686 return Err(crate::error::Error::RetriesExhausted {
687 error: error_val,
688 attempts: retry_count as u32,
689 });
690 }
691 }
692
693 let record = StepRecord { status, ..row };
694 Ok(Step::new(self.store.clone(), record))
695 }
696
697 pub async fn complete_step(
699 &self,
700 step_name: &str,
701 output: serde_json::Value,
702 ) -> crate::error::Result<()> {
703 let current_time = self.current_time().unwrap_or_else(chrono::Utc::now);
704 let mut step = self.acquire_step(step_name, current_time).await?;
705 step.complete(output).await
706 }
707
708 pub async fn fail_step(
710 &self,
711 step_name: &str,
712 error: serde_json::Value,
713 current_time: chrono::DateTime<chrono::Utc>,
714 ) -> crate::error::Result<()> {
715 let mut step = self.acquire_step(step_name, current_time).await?;
716 step.fail_with_json(error, current_time).await
717 }
718}
719
720#[derive(Clone, Debug)]
722pub struct Step {
723 store: AnyStore,
724 record: StepRecord,
725 current_time: Option<DateTime<Utc>>,
726}
727
728impl Step {
729 pub fn new(store: AnyStore, record: StepRecord) -> Self {
731 Self {
732 store,
733 record,
734 current_time: None,
735 }
736 }
737
738 pub fn with_time(mut self, time: DateTime<Utc>) -> Self {
740 self.current_time = Some(time);
741 self
742 }
743
744 pub fn id(&self) -> i64 {
746 self.record.id
747 }
748
749 pub fn record(&self) -> &StepRecord {
751 &self.record
752 }
753
754 pub fn status(&self) -> crate::types::WorkflowStatus {
756 self.record.status
757 }
758
759 pub fn output(&self) -> Option<&serde_json::Value> {
761 self.record.output.as_ref()
762 }
763
764 pub async fn complete(&mut self, output: serde_json::Value) -> crate::error::Result<()> {
766 let query =
767 crate::store::query::QueryBuilder::new(self.store.workflow_steps().sql_complete_step())
768 .bind_i64(self.record.id)
769 .bind_json(output);
770 self.store.workflow_steps().execute(query).await.map(|_| ())
771 }
772
773 pub async fn fail_with_json(
775 &mut self,
776 error: serde_json::Value,
777 current_time: DateTime<Utc>,
778 ) -> crate::error::Result<()> {
779 let error_record = if error.get("is_transient").is_some() {
780 error
781 } else {
782 serde_json::json!({
783 "is_transient": false,
784 "code": "NON_RETRYABLE",
785 "message": error.to_string(),
786 })
787 };
788
789 let is_transient = error_record
790 .get("is_transient")
791 .and_then(|v| v.as_bool())
792 .unwrap_or(false);
793
794 if is_transient {
795 let policy = crate::StepRetryPolicy::default();
796 if !policy.should_retry(self.record.retry_count as u32) {
797 let query = crate::store::query::QueryBuilder::new(
798 self.store.workflow_steps().sql_fail_step(),
799 )
800 .bind_i64(self.record.id)
801 .bind_json(error_record)
802 .bind_datetime(None)
803 .bind_i32(self.record.retry_count);
804 return self.store.workflow_steps().execute(query).await.map(|_| ());
805 }
806
807 let delay_seconds =
808 policy.extract_retry_delay(&error_record, self.record.retry_count.max(0) as u32);
809 let delay_i64: i64 = delay_seconds.into();
810
811 let retry_at = current_time + chrono::Duration::seconds(delay_i64);
812 let new_retry_count = self.record.retry_count + 1;
813
814 let query =
815 crate::store::query::QueryBuilder::new(self.store.workflow_steps().sql_fail_step())
816 .bind_i64(self.record.id)
817 .bind_json(error_record)
818 .bind_datetime(Some(retry_at))
819 .bind_i32(new_retry_count);
820 return self.store.workflow_steps().execute(query).await.map(|_| ());
821 }
822
823 let query =
824 crate::store::query::QueryBuilder::new(self.store.workflow_steps().sql_fail_step())
825 .bind_i64(self.record.id)
826 .bind_json(error_record)
827 .bind_datetime(None)
828 .bind_i32(self.record.retry_count);
829 self.store.workflow_steps().execute(query).await.map(|_| ())
830 }
831
832 pub async fn success<T: serde::Serialize + Send + Sync>(
834 &mut self,
835 output: &T,
836 ) -> crate::error::Result<()> {
837 let value = serde_json::to_value(output).map_err(crate::error::Error::Serialization)?;
838 self.complete(value).await
839 }
840
841 pub async fn fail<T: serde::Serialize + Send + Sync>(
843 &mut self,
844 error: &T,
845 ) -> crate::error::Result<()> {
846 let value = serde_json::to_value(error).map_err(crate::error::Error::Serialization)?;
847 self.fail_with_json(value, chrono::Utc::now()).await
848 }
849}