// backfill/lib.rs
1//! # Backfill
2//!
3//! A high-performance, PostgreSQL-backed async priority queue system for Rust
4//! applications. Built on top of [Graphile Worker](https://github.com/graphile/worker) for reliability and performance.
5//!
6//! This library provides:
7//! - **Durable job queues** with PostgreSQL backend
8//! - **Priority-based scheduling** with configurable priority levels
9//! - **Parallel execution** by default - jobs run concurrently across all
10//! workers
11//! - **Serial queues** when you need ordering or rate limiting
12//! - **Exponential backoff** with jitter to prevent thundering herds
13//! - **Flexible retry policies** (fast, aggressive, conservative, or custom)
14//! - **Dead letter queue** handling for failed jobs
15//! - **Type-safe job handlers** using Rust's type system
16//! - **Low-latency execution** via PostgreSQL LISTEN/NOTIFY
17//!
18//! ## Quick Start
19//!
20//! ```rust,no_run
21//! use backfill::{BackfillClient, enqueue_critical, enqueue_fast_with_retries, JobSpec, RetryPolicy};
22//! use serde::{Deserialize, Serialize};
23//!
24//! #[derive(Serialize, Deserialize)]
25//! struct SendEmailJob {
26//! recipient: String,
27//! subject: String,
28//! body: String,
29//! }
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//! let database_url = "postgresql://user:password@localhost/mydb";
34//! let client = BackfillClient::new(database_url).await?;
35//!
36//! // Enqueue a critical job with aggressive retry policy (12 attempts)
37//! enqueue_critical(
38//! &client,
39//! "send_email",
40//! &SendEmailJob {
41//! recipient: "user@example.com".to_string(),
42//! subject: "Welcome!".to_string(),
43//! body: "Thanks for signing up.".to_string(),
44//! },
45//! Some("welcome_email_123".to_string()),
46//! ).await?;
47//!
48//! // Or use fast retries for quick turnaround (3 attempts, 100ms-30s)
49//! enqueue_fast_with_retries(
50//! &client,
51//! "send_notification",
52//! &SendEmailJob {
53//! recipient: "admin@example.com".to_string(),
54//! subject: "Notification".to_string(),
55//! body: "Quick notification with fast retries.".to_string(),
56//! },
57//! Some("quick_notification".to_string()),
58//! ).await?;
59//!
60//! Ok(())
61//! }
62//! ```
63
64use chrono::{DateTime, Utc};
65// Lifecycle hooks for plugins - new Plugin API with event registration
66pub use graphile_worker::{
67 // Event types (for hooks.on() registration)
68 AfterJobRun,
69 // Context types (for hook handlers)
70 AfterJobRunContext,
71 BeforeJobRun,
72 BeforeJobRunContext,
73 BeforeJobSchedule,
74 BeforeJobScheduleContext,
75 CronJobScheduled,
76 CronJobScheduledContext,
77 CronTick,
78 CronTickContext,
79 // Plugin trait and registry
80 HookRegistry,
81 // Result types
82 HookResult,
83 // Job type for accessing job data
84 Job,
85 JobComplete,
86 JobCompleteContext,
87 JobFail,
88 JobFailContext,
89 JobFetch,
90 JobFetchContext,
91 JobPermanentlyFail,
92 JobPermanentlyFailContext,
93 JobScheduleResult,
94 JobStart,
95 JobStartContext,
96 Plugin,
97 // Other
98 ShutdownReason,
99 WorkerInit,
100 WorkerInitContext,
101 WorkerShutdown,
102 WorkerShutdownContext,
103 WorkerStart,
104 WorkerStartContext,
105};
106// Re-export commonly used types from graphile_worker
107pub use graphile_worker::{IntoTaskHandlerResult, JobKeyMode, TaskHandler, WorkerContext, WorkerOptions};
108use graphile_worker::{JobSpec as GraphileJobSpec, JobSpecBuilder};
109// Re-export crontab parsing types for advanced usage
110pub use graphile_worker_crontab_parser::{CrontabParseError, parse_crontab};
111use serde::Serialize;
112
113mod client;
114mod dlq_cleanup_plugin;
115mod errors;
116mod priorities;
117mod retries;
118mod worker;
119
120#[cfg(feature = "axum")]
121pub mod admin;
122
123pub mod metrics;
124
125pub use client::cleanup::{
126 DEFAULT_STALE_JOB_LOCK_TIMEOUT, DEFAULT_STALE_LOCK_CLEANUP_INTERVAL, DEFAULT_STALE_QUEUE_LOCK_TIMEOUT,
127};
128pub use client::*;
129pub use dlq_cleanup_plugin::DlqCleanupPlugin;
130pub use errors::{BackfillError, WorkerError};
131pub use priorities::*;
132pub use retries::*;
133pub use worker::*;
134
/// Queue configuration for job execution.
///
/// By default, jobs execute **in parallel** across all workers. Use
/// `Queue::Serial` when you need ordering guarantees or mutual exclusion (e.g.,
/// rate limiting API calls, processing events for a single user in order).
///
/// # Parallel Execution (Default)
///
/// With `Queue::Parallel`, workers can fetch and execute jobs concurrently.
/// Priority controls which jobs get picked first, but multiple jobs can run
/// at the same time.
///
/// # Serial Execution
///
/// With `Queue::Serial("name")`, only one job with that queue name can execute
/// at a time across the entire cluster. Useful for:
/// - Rate limiting external API calls
/// - Processing events for a single entity in order
/// - Mutual exclusion
///
/// # Examples
///
/// ```rust
/// use backfill::{Queue, JobSpec, Priority};
///
/// // Parallel execution (default) - jobs run concurrently
/// let spec = JobSpec {
///     priority: Priority::FAST_DEFAULT,
///     ..Default::default()
/// };
///
/// // Serial execution - one job at a time per user
/// let spec = JobSpec {
///     queue: Queue::serial_for("user", 123),
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub enum Queue {
    /// Jobs execute in parallel (DEFAULT). Priority controls fetch order,
    /// but multiple jobs can run simultaneously.
    #[default]
    Parallel,

    /// Jobs in this queue execute one at a time across all workers.
    /// Use for rate limiting, ordering, or mutual exclusion.
    Serial(String),
}

impl Queue {
    /// Build a serial queue scoped to a single entity, e.g. `"user:123"`.
    ///
    /// Jobs sharing the resulting queue name are processed one at a time,
    /// which keeps work for the same entity (user, order, ...) ordered.
    ///
    /// # Example
    /// ```rust
    /// use backfill::Queue;
    ///
    /// // Process user events serially
    /// let queue = Queue::serial_for("user", 123); // → "user:123"
    ///
    /// // Process order updates serially
    /// let queue = Queue::serial_for("order", "abc-def"); // → "order:abc-def"
    /// ```
    pub fn serial_for(entity: &str, id: impl std::fmt::Display) -> Self {
        let scoped = format!("{}:{}", entity, id);
        Self::Serial(scoped)
    }

    /// Build a serial queue with a fixed, well-known name.
    ///
    /// Handy for rate limiting a shared external resource.
    ///
    /// # Example
    /// ```rust
    /// use backfill::Queue;
    ///
    /// // Rate limit calls to external API
    /// let queue = Queue::serial("external-api");
    /// ```
    pub fn serial(name: impl Into<String>) -> Self {
        Self::Serial(name.into())
    }

    /// The queue name for serial queues, or `None` for parallel execution.
    ///
    /// Used internally when building the GraphileWorker job spec.
    pub fn name(&self) -> Option<&str> {
        if let Self::Serial(name) = self {
            Some(name)
        } else {
            None
        }
    }

    /// String representation for logging/metrics: the serial queue name,
    /// or `"parallel"` when no queue is involved.
    pub fn as_str(&self) -> &str {
        self.name().unwrap_or("parallel")
    }

    /// Returns true if this is a parallel (non-serialized) queue.
    pub fn is_parallel(&self) -> bool {
        self.name().is_none()
    }

    /// Returns true if this is a serial queue.
    pub fn is_serial(&self) -> bool {
        !self.is_parallel()
    }

    /// Get the dead letter queue (serial execution).
    pub fn dead_letter() -> Self {
        Self::serial("dead_letter")
    }
}
256
/// Outcome of an enqueue operation.
///
/// When enqueueing a job, the result can either be:
/// - `Enqueued(Job)`: The job was successfully created or updated
/// - `AlreadyInProgress { job_key }`: A job with this key is currently locked
///   by a worker
///
/// Prefer the accessors (`job()`, `is_enqueued()`, `conflicting_job_key()`, …)
/// over matching directly when you only need one side of the outcome.
#[derive(Debug, Clone)]
pub enum EnqueueOutcome {
    /// Job was successfully enqueued (either created or updated).
    /// The `Job` is boxed — presumably to keep this enum small, since `Job`
    /// comes from `graphile_worker` (TODO confirm the size rationale).
    Enqueued(Box<Job>),
    /// A job with this key is already in progress (locked by a worker).
    /// Contains the job_key that was in conflict.
    AlreadyInProgress { job_key: String },
}
271
272impl EnqueueOutcome {
273 /// Returns the Job if the outcome was Enqueued, None otherwise.
274 pub fn job(&self) -> Option<&Job> {
275 match self {
276 EnqueueOutcome::Enqueued(job) => Some(job),
277 EnqueueOutcome::AlreadyInProgress { .. } => None,
278 }
279 }
280
281 /// Consumes self and returns the Job if Enqueued, None otherwise.
282 pub fn into_job(self) -> Option<Job> {
283 match self {
284 EnqueueOutcome::Enqueued(job) => Some(*job),
285 EnqueueOutcome::AlreadyInProgress { .. } => None,
286 }
287 }
288
289 /// Returns the Job if Enqueued, panics with message otherwise.
290 ///
291 /// # Panics
292 /// Panics if the outcome was AlreadyInProgress.
293 pub fn expect(self, msg: &str) -> Job {
294 match self {
295 EnqueueOutcome::Enqueued(job) => *job,
296 EnqueueOutcome::AlreadyInProgress { job_key } => {
297 panic!("{}: job_key '{}' was already in progress", msg, job_key)
298 }
299 }
300 }
301
302 /// Returns the Job if Enqueued, panics otherwise.
303 ///
304 /// # Panics
305 /// Panics if the outcome was AlreadyInProgress.
306 pub fn unwrap(self) -> Job {
307 self.expect("called `EnqueueOutcome::unwrap()` on `AlreadyInProgress` value")
308 }
309
310 /// Returns true if a job was enqueued.
311 pub fn is_enqueued(&self) -> bool {
312 matches!(self, EnqueueOutcome::Enqueued(_))
313 }
314
315 /// Returns true if the job was already in progress.
316 pub fn is_already_in_progress(&self) -> bool {
317 matches!(self, EnqueueOutcome::AlreadyInProgress { .. })
318 }
319
320 /// Returns the job_key if AlreadyInProgress, None otherwise.
321 pub fn conflicting_job_key(&self) -> Option<&str> {
322 match self {
323 EnqueueOutcome::AlreadyInProgress { job_key } => Some(job_key),
324 EnqueueOutcome::Enqueued(_) => None,
325 }
326 }
327}
328
/// Configuration for job scheduling and execution.
///
/// Construct via struct-update syntax (`JobSpec { priority, ..Default::default() }`)
/// and refine with the `with_*_retries` builder methods. Converted into
/// GraphileWorker's spec type via `From<JobSpec>`.
#[derive(Debug, Clone)]
pub struct JobSpec {
    /// When the job should be executed (default: now)
    pub run_at: Option<DateTime<Utc>>,
    /// Job priority (lower numbers = higher priority)
    pub priority: Priority,
    /// Queue name for the job; `Queue::Parallel` means no queue lock
    pub queue: Queue,
    /// Maximum number of retry attempts (default: 8)
    pub max_attempts: Option<i32>,
    /// Exponential backoff retry policy for failed jobs.
    /// `None` falls back to GraphileWorker's built-in retry handling.
    pub retry_policy: Option<RetryPolicy>,
    /// Unique identifier for job deduplication
    pub job_key: Option<String>,
    /// How to handle duplicate job keys (only consulted when `job_key` is set)
    pub job_key_mode: JobKeyMode,
}
347
348impl Default for JobSpec {
349 fn default() -> Self {
350 Self {
351 run_at: None,
352 priority: Priority::default(),
353 queue: Queue::default(),
354 max_attempts: Some(8),
355 retry_policy: None, // Use GraphileWorker's default retry handling
356 job_key: None,
357 job_key_mode: JobKeyMode::Replace,
358 }
359 }
360}
361
362impl JobSpec {
363 /// Create a JobSpec with exponential backoff retry policy
364 pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
365 self.max_attempts = Some(retry_policy.max_attempts);
366 self.retry_policy = Some(retry_policy);
367 self
368 }
369
370 /// Create a JobSpec optimized for fast retries
371 pub fn with_fast_retries(mut self) -> Self {
372 let policy = RetryPolicy::fast();
373 self.max_attempts = Some(policy.max_attempts);
374 self.retry_policy = Some(policy);
375 self
376 }
377
378 /// Create a JobSpec optimized for aggressive retries
379 pub fn with_aggressive_retries(mut self) -> Self {
380 let policy = RetryPolicy::aggressive();
381 self.max_attempts = Some(policy.max_attempts);
382 self.retry_policy = Some(policy);
383 self
384 }
385
386 /// Create a JobSpec optimized for conservative retries
387 pub fn with_conservative_retries(mut self) -> Self {
388 let policy = RetryPolicy::conservative();
389 self.max_attempts = Some(policy.max_attempts);
390 self.retry_policy = Some(policy);
391 self
392 }
393
394 /// Get the effective retry policy (returns default if none specified)
395 pub fn effective_retry_policy(&self) -> RetryPolicy {
396 self.retry_policy.clone().unwrap_or_default()
397 }
398
399 /// Calculate the next retry time for a failed job
400 pub fn calculate_retry_time(&self, attempt: i32, failed_at: DateTime<Utc>) -> Option<DateTime<Utc>> {
401 let policy = self.effective_retry_policy();
402 if policy.should_retry(attempt) {
403 Some(policy.calculate_retry_time(attempt, failed_at))
404 } else {
405 None // No more retries
406 }
407 }
408}
409
410impl From<JobSpec> for GraphileJobSpec {
411 fn from(spec: JobSpec) -> Self {
412 let mut builder = JobSpecBuilder::new();
413
414 if let Some(run_at) = spec.run_at {
415 builder = builder.run_at(run_at);
416 }
417
418 builder = builder.priority(spec.priority.into());
419
420 // Only set queue_name for serial queues - parallel jobs have no queue
421 if let Some(queue_name) = spec.queue.name() {
422 builder = builder.queue_name(queue_name);
423 }
424
425 if let Some(max_attempts) = spec.max_attempts {
426 // Convert i32 to i16, clamping to avoid overflow
427 let max_attempts_i16 = max_attempts.clamp(0, i16::MAX as i32) as i16;
428 builder = builder.max_attempts(max_attempts_i16);
429 }
430
431 if let Some(job_key) = spec.job_key {
432 builder = builder.job_key(&job_key).job_key_mode(spec.job_key_mode);
433 }
434
435 builder.build()
436 }
437}
438
439/// Enqueue a high-priority job for parallel execution.
440///
441/// Jobs are executed in parallel across all workers, with higher priority
442/// jobs (lower numbers) fetched first.
443pub async fn enqueue_fast<T>(
444 client: &BackfillClient,
445 task_identifier: &str,
446 payload: &T,
447 job_key: Option<String>,
448) -> Result<EnqueueOutcome, BackfillError>
449where
450 T: Serialize,
451{
452 let spec = JobSpec {
453 priority: Priority::FAST_DEFAULT,
454 job_key,
455 ..Default::default()
456 };
457
458 client.enqueue(task_identifier, payload, spec).await
459}
460
461/// Enqueue a normal-priority job for parallel execution.
462///
463/// Jobs are executed in parallel across all workers, with higher priority
464/// jobs fetched first. Bulk priority is lower than fast priority.
465pub async fn enqueue_bulk<T>(
466 client: &BackfillClient,
467 task_identifier: &str,
468 payload: &T,
469 job_key: Option<String>,
470) -> Result<EnqueueOutcome, BackfillError>
471where
472 T: Serialize,
473{
474 let spec = JobSpec {
475 priority: Priority::BULK_DEFAULT,
476 job_key,
477 ..Default::default()
478 };
479
480 client.enqueue(task_identifier, payload, spec).await
481}
482
483/// Enqueue an emergency priority job.
484///
485/// Use sparingly - emergency jobs jump ahead of all other jobs.
486pub async fn enqueue_emergency<T>(
487 client: &BackfillClient,
488 task_identifier: &str,
489 payload: &T,
490 job_key: Option<String>,
491) -> Result<EnqueueOutcome, BackfillError>
492where
493 T: Serialize,
494{
495 let spec = JobSpec {
496 priority: Priority::EMERGENCY,
497 run_at: Some(Utc::now()), // Execute immediately
498 job_key,
499 ..Default::default()
500 };
501
502 client.enqueue(task_identifier, payload, spec).await
503}
504
505/// Enqueue a high-priority job with fast exponential backoff retries.
506///
507/// Best for high-priority jobs that need quick retries (3 attempts, 100ms-30s
508/// delays).
509pub async fn enqueue_fast_with_retries<T>(
510 client: &BackfillClient,
511 task_identifier: &str,
512 payload: &T,
513 job_key: Option<String>,
514) -> Result<EnqueueOutcome, BackfillError>
515where
516 T: Serialize,
517{
518 let spec = JobSpec {
519 priority: Priority::FAST_HIGH,
520 job_key,
521 ..Default::default()
522 }
523 .with_fast_retries();
524
525 client.enqueue(task_identifier, payload, spec).await
526}
527
528/// Enqueue a critical job with aggressive exponential backoff retries.
529///
530/// Best for critical jobs that must succeed (12 attempts, up to 4 hour delays).
531pub async fn enqueue_critical<T>(
532 client: &BackfillClient,
533 task_identifier: &str,
534 payload: &T,
535 job_key: Option<String>,
536) -> Result<EnqueueOutcome, BackfillError>
537where
538 T: Serialize,
539{
540 let spec = JobSpec {
541 priority: Priority::FAST_HIGH,
542 job_key,
543 ..Default::default()
544 }
545 .with_aggressive_retries();
546
547 client.enqueue(task_identifier, payload, spec).await
548}
549
550/// Enqueue a bulk job with conservative exponential backoff retries.
551///
552/// Best for background jobs where consistency matters more than speed
553/// (8 attempts, 1 min - 8 hour delays).
554pub async fn enqueue_bulk_with_retries<T>(
555 client: &BackfillClient,
556 task_identifier: &str,
557 payload: &T,
558 job_key: Option<String>,
559) -> Result<EnqueueOutcome, BackfillError>
560where
561 T: Serialize,
562{
563 let spec = JobSpec {
564 priority: Priority::BULK_DEFAULT,
565 job_key,
566 ..Default::default()
567 }
568 .with_conservative_retries();
569
570 client.enqueue(task_identifier, payload, spec).await
571}
572
573/// Enqueue a job for serial execution within a named queue.
574///
575/// Only one job from this queue will execute at a time across all workers.
576/// Use for rate limiting external APIs or ensuring ordered processing.
577///
578/// # Example
579/// ```rust,no_run
580/// use backfill::{BackfillClient, enqueue_serial, Priority};
581///
582/// # async fn example(client: &BackfillClient) -> Result<(), backfill::BackfillError> {
583/// // Rate limit calls to an external API
584/// enqueue_serial(
585/// client,
586/// "call_external_api",
587/// &serde_json::json!({"url": "https://api.example.com"}),
588/// "external-api",
589/// Priority::BULK_DEFAULT,
590/// None,
591/// ).await?;
592///
593/// // Process events for a specific user in order
594/// enqueue_serial(
595/// client,
596/// "process_user_event",
597/// &serde_json::json!({"event": "login"}),
598/// format!("user:{}", 123),
599/// Priority::FAST_DEFAULT,
600/// Some("user-123-login".to_string()),
601/// ).await?;
602/// # Ok(())
603/// # }
604/// ```
605pub async fn enqueue_serial<T>(
606 client: &BackfillClient,
607 task_identifier: &str,
608 payload: &T,
609 queue_name: impl Into<String>,
610 priority: Priority,
611 job_key: Option<String>,
612) -> Result<EnqueueOutcome, BackfillError>
613where
614 T: Serialize,
615{
616 let spec = JobSpec {
617 queue: Queue::Serial(queue_name.into()),
618 priority,
619 job_key,
620 ..Default::default()
621 };
622
623 client.enqueue(task_identifier, payload, spec).await
624}
625
#[cfg(test)]
mod tests {
    use super::*;

    // =========================================================================
    // Queue Type Tests
    // =========================================================================

    #[test]
    fn queue_parallel_is_default() {
        let queue = Queue::default();
        assert_eq!(queue, Queue::Parallel);
        assert!(queue.is_parallel());
        assert!(!queue.is_serial());
        assert_eq!(queue.name(), None);
        assert_eq!(queue.as_str(), "parallel");
    }

    #[test]
    fn queue_serial_creation() {
        let named = Queue::serial("test");
        assert!(named.is_serial());
        assert!(!named.is_parallel());
        assert_eq!(named.name(), Some("test"));
        assert_eq!(named.as_str(), "test");

        let per_user = Queue::serial_for("user", 123);
        assert_eq!(per_user.name(), Some("user:123"));
        assert_eq!(per_user.as_str(), "user:123");

        assert_eq!(Queue::serial_for("order", "abc-def").name(), Some("order:abc-def"));
        assert_eq!(Queue::dead_letter().name(), Some("dead_letter"));
    }

    #[test]
    fn queue_equality() {
        // Parallel compares equal to itself.
        assert_eq!(Queue::Parallel, Queue::Parallel);

        // Serial queues compare by name, regardless of how they were built.
        assert_eq!(Queue::serial("test"), Queue::serial("test"));
        assert_eq!(Queue::serial("test".to_string()), Queue::Serial("test".to_string()));
        assert_ne!(Queue::serial("a"), Queue::serial("b"));

        // A serial queue literally named "parallel" is still not Parallel.
        assert_ne!(Queue::Parallel, Queue::serial("parallel"));
    }

    // =========================================================================
    // JobSpec Tests
    // =========================================================================

    #[test]
    fn job_spec_defaults() {
        let spec = JobSpec::default();
        assert_eq!(spec.priority, Priority::BULK_DEFAULT);
        assert_eq!(spec.queue, Queue::Parallel);
        assert_eq!(spec.max_attempts, Some(8));
        assert!(spec.run_at.is_none());
        assert!(spec.job_key.is_none());
    }

    #[test]
    fn job_spec_parallel_has_no_queue_name() {
        let spec = JobSpec {
            queue: Queue::Parallel,
            ..JobSpec::default()
        };

        // No queue name means no queue lock — that absence is exactly what
        // lets these jobs run in parallel.
        assert!(spec.queue.name().is_none());
    }

    #[test]
    fn job_spec_serial_has_queue_name() {
        let spec = JobSpec {
            queue: Queue::serial("rate-limit"),
            ..JobSpec::default()
        };

        // The queue name is what creates the cluster-wide queue lock.
        assert_eq!(spec.queue.name(), Some("rate-limit"));
    }

    #[test]
    fn job_spec_conversion_parallel() {
        let spec = JobSpec {
            queue: Queue::Parallel,
            priority: Priority::FAST_HIGH,
            ..JobSpec::default()
        };

        // GraphileJobSpec internals are opaque; successfully building one
        // exercises the conversion path.
        let _converted: GraphileJobSpec = spec.into();

        // And the parallel queue reports no name, as the conversion relies on.
        assert!(Queue::Parallel.name().is_none());
    }

    #[test]
    fn job_spec_conversion_serial() {
        let spec = JobSpec {
            queue: Queue::serial("my-queue"),
            priority: Priority::BULK_DEFAULT,
            ..JobSpec::default()
        };

        // The queue name is present before conversion...
        assert_eq!(spec.queue.name(), Some("my-queue"));

        // ...and the conversion itself completes.
        let _converted: GraphileJobSpec = spec.into();
    }

    // =========================================================================
    // Convenience Function Queue Tests
    // =========================================================================

    #[test]
    fn convenience_functions_use_parallel_queue() {
        // The specs built by the convenience functions never set a queue,
        // so regardless of priority they all stay parallel. We can't call
        // the functions without a client, so we mirror their spec construction.
        let built_with = |priority: Priority| JobSpec {
            priority,
            ..JobSpec::default()
        };

        // enqueue_fast
        assert_eq!(built_with(Priority::FAST_DEFAULT).queue, Queue::Parallel);
        // enqueue_bulk
        assert_eq!(built_with(Priority::BULK_DEFAULT).queue, Queue::Parallel);
        // enqueue_emergency
        assert_eq!(built_with(Priority::EMERGENCY).queue, Queue::Parallel);
    }
}
786}