Skip to main content

backfill/
lib.rs

1//! # Backfill
2//!
3//! A high-performance, PostgreSQL-backed async priority queue system for Rust
4//! applications. Built on top of [Graphile Worker](https://github.com/graphile/worker) for reliability and performance.
5//!
6//! This library provides:
7//! - **Durable job queues** with PostgreSQL backend
8//! - **Priority-based scheduling** with configurable priority levels
9//! - **Parallel execution** by default - jobs run concurrently across all
10//!   workers
11//! - **Serial queues** when you need ordering or rate limiting
12//! - **Exponential backoff** with jitter to prevent thundering herds
13//! - **Flexible retry policies** (fast, aggressive, conservative, or custom)
14//! - **Dead letter queue** handling for failed jobs
15//! - **Type-safe job handlers** using Rust's type system
16//! - **Low-latency execution** via PostgreSQL LISTEN/NOTIFY
17//!
18//! ## Quick Start
19//!
20//! ```rust,no_run
21//! use backfill::{BackfillClient, enqueue_critical, enqueue_fast_with_retries, JobSpec, RetryPolicy};
22//! use serde::{Deserialize, Serialize};
23//!
24//! #[derive(Serialize, Deserialize)]
25//! struct SendEmailJob {
26//!     recipient: String,
27//!     subject: String,
28//!     body: String,
29//! }
30//!
31//! #[tokio::main]
32//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
33//!     let database_url = "postgresql://user:password@localhost/mydb";
34//!     let client = BackfillClient::new(database_url).await?;
35//!
36//!     // Enqueue a critical job with aggressive retry policy (12 attempts)
37//!     enqueue_critical(
38//!         &client,
39//!         "send_email",
40//!         &SendEmailJob {
41//!             recipient: "user@example.com".to_string(),
42//!             subject: "Welcome!".to_string(),
43//!             body: "Thanks for signing up.".to_string(),
44//!         },
45//!         Some("welcome_email_123".to_string()),
46//!     ).await?;
47//!
48//!     // Or use fast retries for quick turnaround (3 attempts, 100ms-30s)
49//!     enqueue_fast_with_retries(
50//!         &client,
51//!         "send_notification",
52//!         &SendEmailJob {
53//!             recipient: "admin@example.com".to_string(),
54//!             subject: "Notification".to_string(),
55//!             body: "Quick notification with fast retries.".to_string(),
56//!         },
57//!         Some("quick_notification".to_string()),
58//!     ).await?;
59//!
60//!     Ok(())
61//! }
62//! ```
63
64use chrono::{DateTime, Utc};
65// Lifecycle hooks for plugins - new Plugin API with event registration
66pub use graphile_worker::{
67    // Event types (for hooks.on() registration)
68    AfterJobRun,
69    // Context types (for hook handlers)
70    AfterJobRunContext,
71    BeforeJobRun,
72    BeforeJobRunContext,
73    BeforeJobSchedule,
74    BeforeJobScheduleContext,
75    CronJobScheduled,
76    CronJobScheduledContext,
77    CronTick,
78    CronTickContext,
79    // Plugin trait and registry
80    HookRegistry,
81    // Result types
82    HookResult,
83    // Job type for accessing job data
84    Job,
85    JobComplete,
86    JobCompleteContext,
87    JobFail,
88    JobFailContext,
89    JobFetch,
90    JobFetchContext,
91    JobPermanentlyFail,
92    JobPermanentlyFailContext,
93    JobScheduleResult,
94    JobStart,
95    JobStartContext,
96    Plugin,
97    // Other
98    ShutdownReason,
99    WorkerInit,
100    WorkerInitContext,
101    WorkerShutdown,
102    WorkerShutdownContext,
103    WorkerStart,
104    WorkerStartContext,
105};
106// Re-export commonly used types from graphile_worker
107pub use graphile_worker::{IntoTaskHandlerResult, JobKeyMode, TaskHandler, WorkerContext, WorkerOptions};
108use graphile_worker::{JobSpec as GraphileJobSpec, JobSpecBuilder};
109// Re-export crontab parsing types for advanced usage
110pub use graphile_worker_crontab_parser::{CrontabParseError, parse_crontab};
111use serde::Serialize;
112
113mod client;
114mod dlq_cleanup_plugin;
115mod errors;
116mod priorities;
117mod retries;
118mod worker;
119
120#[cfg(feature = "axum")]
121pub mod admin;
122
123pub mod metrics;
124
125pub use client::cleanup::{
126    DEFAULT_STALE_JOB_LOCK_TIMEOUT, DEFAULT_STALE_LOCK_CLEANUP_INTERVAL, DEFAULT_STALE_QUEUE_LOCK_TIMEOUT,
127};
128pub use client::*;
129pub use dlq_cleanup_plugin::DlqCleanupPlugin;
130pub use errors::{BackfillError, WorkerError};
131pub use priorities::*;
132pub use retries::*;
133pub use worker::*;
134
135/// Queue configuration for job execution.
136///
137/// By default, jobs execute **in parallel** across all workers. Use
138/// `Queue::Serial` when you need ordering guarantees or mutual exclusion (e.g.,
139/// rate limiting API calls, processing events for a single user in order).
140///
141/// # Parallel Execution (Default)
142///
143/// With `Queue::Parallel`, workers can fetch and execute jobs concurrently.
144/// Priority controls which jobs get picked first, but multiple jobs can run
145/// at the same time.
146///
147/// # Serial Execution
148///
149/// With `Queue::Serial("name")`, only one job with that queue name can execute
150/// at a time across the entire cluster. This is useful for:
151/// - Rate limiting external API calls
152/// - Processing events for a single entity in order
153/// - Mutual exclusion
154///
155/// # Examples
156///
157/// ```rust
158/// use backfill::{Queue, JobSpec, Priority};
159///
160/// // Parallel execution (default) - jobs run concurrently
161/// let spec = JobSpec {
162///     priority: Priority::FAST_DEFAULT,
163///     ..Default::default()
164/// };
165///
166/// // Serial execution - one job at a time per user
167/// let spec = JobSpec {
168///     queue: Queue::serial_for("user", 123),
169///     ..Default::default()
170/// };
171/// ```
172#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
173pub enum Queue {
174    /// Jobs execute in parallel (DEFAULT). Priority controls fetch order,
175    /// but multiple jobs can run simultaneously.
176    #[default]
177    Parallel,
178
179    /// Jobs in this queue execute one at a time across all workers.
180    /// Use for rate limiting, ordering, or mutual exclusion.
181    Serial(String),
182}
183
184impl Queue {
185    /// Create a serial queue for a specific entity.
186    ///
187    /// This is useful for ensuring jobs related to the same entity
188    /// (user, order, etc.) are processed one at a time.
189    ///
190    /// # Example
191    /// ```rust
192    /// use backfill::Queue;
193    ///
194    /// // Process user events serially
195    /// let queue = Queue::serial_for("user", 123);  // → "user:123"
196    ///
197    /// // Process order updates serially
198    /// let queue = Queue::serial_for("order", "abc-def");  // → "order:abc-def"
199    /// ```
200    pub fn serial_for(entity: &str, id: impl std::fmt::Display) -> Self {
201        Queue::Serial(format!("{}:{}", entity, id))
202    }
203
204    /// Create a named serial queue.
205    ///
206    /// Use this when you need a fixed queue name for rate limiting
207    /// or other serialization purposes.
208    ///
209    /// # Example
210    /// ```rust
211    /// use backfill::Queue;
212    ///
213    /// // Rate limit calls to external API
214    /// let queue = Queue::serial("external-api");
215    /// ```
216    pub fn serial(name: impl Into<String>) -> Self {
217        Queue::Serial(name.into())
218    }
219
220    /// Returns the queue name for serial queues, or None for parallel.
221    ///
222    /// This is used internally when building the GraphileWorker job spec.
223    pub fn name(&self) -> Option<&str> {
224        match self {
225            Queue::Parallel => None,
226            Queue::Serial(name) => Some(name),
227        }
228    }
229
230    /// Returns a string representation for logging/metrics.
231    ///
232    /// Returns "parallel" for parallel queues, or the queue name for serial
233    /// queues.
234    pub fn as_str(&self) -> &str {
235        match self {
236            Queue::Parallel => "parallel",
237            Queue::Serial(name) => name,
238        }
239    }
240
241    /// Returns true if this is a parallel (non-serialized) queue.
242    pub fn is_parallel(&self) -> bool {
243        matches!(self, Queue::Parallel)
244    }
245
246    /// Returns true if this is a serial queue.
247    pub fn is_serial(&self) -> bool {
248        matches!(self, Queue::Serial(_))
249    }
250
251    /// Get the dead letter queue (serial execution).
252    pub fn dead_letter() -> Self {
253        Queue::Serial("dead_letter".to_string())
254    }
255}
256
257/// Outcome of an enqueue operation.
258///
259/// When enqueueing a job, the result can either be:
260/// - `Enqueued(Job)`: The job was successfully created or updated
261/// - `AlreadyInProgress { job_key }`: A job with this key is currently locked
262///   by a worker
263#[derive(Debug, Clone)]
264pub enum EnqueueOutcome {
265    /// Job was successfully enqueued (either created or updated)
266    Enqueued(Box<Job>),
267    /// A job with this key is already in progress (locked by a worker).
268    /// Contains the job_key that was in conflict.
269    AlreadyInProgress { job_key: String },
270}
271
272impl EnqueueOutcome {
273    /// Returns the Job if the outcome was Enqueued, None otherwise.
274    pub fn job(&self) -> Option<&Job> {
275        match self {
276            EnqueueOutcome::Enqueued(job) => Some(job),
277            EnqueueOutcome::AlreadyInProgress { .. } => None,
278        }
279    }
280
281    /// Consumes self and returns the Job if Enqueued, None otherwise.
282    pub fn into_job(self) -> Option<Job> {
283        match self {
284            EnqueueOutcome::Enqueued(job) => Some(*job),
285            EnqueueOutcome::AlreadyInProgress { .. } => None,
286        }
287    }
288
289    /// Returns the Job if Enqueued, panics with message otherwise.
290    ///
291    /// # Panics
292    /// Panics if the outcome was AlreadyInProgress.
293    pub fn expect(self, msg: &str) -> Job {
294        match self {
295            EnqueueOutcome::Enqueued(job) => *job,
296            EnqueueOutcome::AlreadyInProgress { job_key } => {
297                panic!("{}: job_key '{}' was already in progress", msg, job_key)
298            }
299        }
300    }
301
302    /// Returns the Job if Enqueued, panics otherwise.
303    ///
304    /// # Panics
305    /// Panics if the outcome was AlreadyInProgress.
306    pub fn unwrap(self) -> Job {
307        self.expect("called `EnqueueOutcome::unwrap()` on `AlreadyInProgress` value")
308    }
309
310    /// Returns true if a job was enqueued.
311    pub fn is_enqueued(&self) -> bool {
312        matches!(self, EnqueueOutcome::Enqueued(_))
313    }
314
315    /// Returns true if the job was already in progress.
316    pub fn is_already_in_progress(&self) -> bool {
317        matches!(self, EnqueueOutcome::AlreadyInProgress { .. })
318    }
319
320    /// Returns the job_key if AlreadyInProgress, None otherwise.
321    pub fn conflicting_job_key(&self) -> Option<&str> {
322        match self {
323            EnqueueOutcome::AlreadyInProgress { job_key } => Some(job_key),
324            EnqueueOutcome::Enqueued(_) => None,
325        }
326    }
327}
328
329/// Configuration for job scheduling and execution.
330#[derive(Debug, Clone)]
331pub struct JobSpec {
332    /// When the job should be executed (default: now)
333    pub run_at: Option<DateTime<Utc>>,
334    /// Job priority (lower numbers = higher priority)
335    pub priority: Priority,
336    /// Queue name for the job
337    pub queue: Queue,
338    /// Maximum number of retry attempts (default: 8)
339    pub max_attempts: Option<i32>,
340    /// Exponential backoff retry policy for failed jobs
341    pub retry_policy: Option<RetryPolicy>,
342    /// Unique identifier for job deduplication
343    pub job_key: Option<String>,
344    /// How to handle duplicate job keys
345    pub job_key_mode: JobKeyMode,
346}
347
348impl Default for JobSpec {
349    fn default() -> Self {
350        Self {
351            run_at: None,
352            priority: Priority::default(),
353            queue: Queue::default(),
354            max_attempts: Some(8),
355            retry_policy: None, // Use GraphileWorker's default retry handling
356            job_key: None,
357            job_key_mode: JobKeyMode::Replace,
358        }
359    }
360}
361
362impl JobSpec {
363    /// Create a JobSpec with exponential backoff retry policy
364    pub fn with_retry_policy(mut self, retry_policy: RetryPolicy) -> Self {
365        self.max_attempts = Some(retry_policy.max_attempts);
366        self.retry_policy = Some(retry_policy);
367        self
368    }
369
370    /// Create a JobSpec optimized for fast retries
371    pub fn with_fast_retries(mut self) -> Self {
372        let policy = RetryPolicy::fast();
373        self.max_attempts = Some(policy.max_attempts);
374        self.retry_policy = Some(policy);
375        self
376    }
377
378    /// Create a JobSpec optimized for aggressive retries
379    pub fn with_aggressive_retries(mut self) -> Self {
380        let policy = RetryPolicy::aggressive();
381        self.max_attempts = Some(policy.max_attempts);
382        self.retry_policy = Some(policy);
383        self
384    }
385
386    /// Create a JobSpec optimized for conservative retries
387    pub fn with_conservative_retries(mut self) -> Self {
388        let policy = RetryPolicy::conservative();
389        self.max_attempts = Some(policy.max_attempts);
390        self.retry_policy = Some(policy);
391        self
392    }
393
394    /// Get the effective retry policy (returns default if none specified)
395    pub fn effective_retry_policy(&self) -> RetryPolicy {
396        self.retry_policy.clone().unwrap_or_default()
397    }
398
399    /// Calculate the next retry time for a failed job
400    pub fn calculate_retry_time(&self, attempt: i32, failed_at: DateTime<Utc>) -> Option<DateTime<Utc>> {
401        let policy = self.effective_retry_policy();
402        if policy.should_retry(attempt) {
403            Some(policy.calculate_retry_time(attempt, failed_at))
404        } else {
405            None // No more retries
406        }
407    }
408}
409
410impl From<JobSpec> for GraphileJobSpec {
411    fn from(spec: JobSpec) -> Self {
412        let mut builder = JobSpecBuilder::new();
413
414        if let Some(run_at) = spec.run_at {
415            builder = builder.run_at(run_at);
416        }
417
418        builder = builder.priority(spec.priority.into());
419
420        // Only set queue_name for serial queues - parallel jobs have no queue
421        if let Some(queue_name) = spec.queue.name() {
422            builder = builder.queue_name(queue_name);
423        }
424
425        if let Some(max_attempts) = spec.max_attempts {
426            // Convert i32 to i16, clamping to avoid overflow
427            let max_attempts_i16 = max_attempts.clamp(0, i16::MAX as i32) as i16;
428            builder = builder.max_attempts(max_attempts_i16);
429        }
430
431        if let Some(job_key) = spec.job_key {
432            builder = builder.job_key(&job_key).job_key_mode(spec.job_key_mode);
433        }
434
435        builder.build()
436    }
437}
438
439/// Enqueue a high-priority job for parallel execution.
440///
441/// Jobs are executed in parallel across all workers, with higher priority
442/// jobs (lower numbers) fetched first.
443pub async fn enqueue_fast<T>(
444    client: &BackfillClient,
445    task_identifier: &str,
446    payload: &T,
447    job_key: Option<String>,
448) -> Result<EnqueueOutcome, BackfillError>
449where
450    T: Serialize,
451{
452    let spec = JobSpec {
453        priority: Priority::FAST_DEFAULT,
454        job_key,
455        ..Default::default()
456    };
457
458    client.enqueue(task_identifier, payload, spec).await
459}
460
461/// Enqueue a normal-priority job for parallel execution.
462///
463/// Jobs are executed in parallel across all workers, with higher priority
464/// jobs fetched first. Bulk priority is lower than fast priority.
465pub async fn enqueue_bulk<T>(
466    client: &BackfillClient,
467    task_identifier: &str,
468    payload: &T,
469    job_key: Option<String>,
470) -> Result<EnqueueOutcome, BackfillError>
471where
472    T: Serialize,
473{
474    let spec = JobSpec {
475        priority: Priority::BULK_DEFAULT,
476        job_key,
477        ..Default::default()
478    };
479
480    client.enqueue(task_identifier, payload, spec).await
481}
482
483/// Enqueue an emergency priority job.
484///
485/// Use sparingly - emergency jobs jump ahead of all other jobs.
486pub async fn enqueue_emergency<T>(
487    client: &BackfillClient,
488    task_identifier: &str,
489    payload: &T,
490    job_key: Option<String>,
491) -> Result<EnqueueOutcome, BackfillError>
492where
493    T: Serialize,
494{
495    let spec = JobSpec {
496        priority: Priority::EMERGENCY,
497        run_at: Some(Utc::now()), // Execute immediately
498        job_key,
499        ..Default::default()
500    };
501
502    client.enqueue(task_identifier, payload, spec).await
503}
504
505/// Enqueue a high-priority job with fast exponential backoff retries.
506///
507/// Best for high-priority jobs that need quick retries (3 attempts, 100ms-30s
508/// delays).
509pub async fn enqueue_fast_with_retries<T>(
510    client: &BackfillClient,
511    task_identifier: &str,
512    payload: &T,
513    job_key: Option<String>,
514) -> Result<EnqueueOutcome, BackfillError>
515where
516    T: Serialize,
517{
518    let spec = JobSpec {
519        priority: Priority::FAST_HIGH,
520        job_key,
521        ..Default::default()
522    }
523    .with_fast_retries();
524
525    client.enqueue(task_identifier, payload, spec).await
526}
527
528/// Enqueue a critical job with aggressive exponential backoff retries.
529///
530/// Best for critical jobs that must succeed (12 attempts, up to 4 hour delays).
531pub async fn enqueue_critical<T>(
532    client: &BackfillClient,
533    task_identifier: &str,
534    payload: &T,
535    job_key: Option<String>,
536) -> Result<EnqueueOutcome, BackfillError>
537where
538    T: Serialize,
539{
540    let spec = JobSpec {
541        priority: Priority::FAST_HIGH,
542        job_key,
543        ..Default::default()
544    }
545    .with_aggressive_retries();
546
547    client.enqueue(task_identifier, payload, spec).await
548}
549
550/// Enqueue a bulk job with conservative exponential backoff retries.
551///
552/// Best for background jobs where consistency matters more than speed
553/// (8 attempts, 1 min - 8 hour delays).
554pub async fn enqueue_bulk_with_retries<T>(
555    client: &BackfillClient,
556    task_identifier: &str,
557    payload: &T,
558    job_key: Option<String>,
559) -> Result<EnqueueOutcome, BackfillError>
560where
561    T: Serialize,
562{
563    let spec = JobSpec {
564        priority: Priority::BULK_DEFAULT,
565        job_key,
566        ..Default::default()
567    }
568    .with_conservative_retries();
569
570    client.enqueue(task_identifier, payload, spec).await
571}
572
573/// Enqueue a job for serial execution within a named queue.
574///
575/// Only one job from this queue will execute at a time across all workers.
576/// Use for rate limiting external APIs or ensuring ordered processing.
577///
578/// # Example
579/// ```rust,no_run
580/// use backfill::{BackfillClient, enqueue_serial, Priority};
581///
582/// # async fn example(client: &BackfillClient) -> Result<(), backfill::BackfillError> {
583/// // Rate limit calls to an external API
584/// enqueue_serial(
585///     client,
586///     "call_external_api",
587///     &serde_json::json!({"url": "https://api.example.com"}),
588///     "external-api",
589///     Priority::BULK_DEFAULT,
590///     None,
591/// ).await?;
592///
593/// // Process events for a specific user in order
594/// enqueue_serial(
595///     client,
596///     "process_user_event",
597///     &serde_json::json!({"event": "login"}),
598///     format!("user:{}", 123),
599///     Priority::FAST_DEFAULT,
600///     Some("user-123-login".to_string()),
601/// ).await?;
602/// # Ok(())
603/// # }
604/// ```
605pub async fn enqueue_serial<T>(
606    client: &BackfillClient,
607    task_identifier: &str,
608    payload: &T,
609    queue_name: impl Into<String>,
610    priority: Priority,
611    job_key: Option<String>,
612) -> Result<EnqueueOutcome, BackfillError>
613where
614    T: Serialize,
615{
616    let spec = JobSpec {
617        queue: Queue::Serial(queue_name.into()),
618        priority,
619        job_key,
620        ..Default::default()
621    };
622
623    client.enqueue(task_identifier, payload, spec).await
624}
625
626#[cfg(test)]
627mod tests {
628    use super::*;
629
630    // =========================================================================
631    // Queue Type Tests
632    // =========================================================================
633
634    #[test]
635    fn queue_parallel_is_default() {
636        assert_eq!(Queue::default(), Queue::Parallel);
637        assert!(Queue::Parallel.is_parallel());
638        assert!(!Queue::Parallel.is_serial());
639        assert_eq!(Queue::Parallel.name(), None);
640        assert_eq!(Queue::Parallel.as_str(), "parallel");
641    }
642
643    #[test]
644    fn queue_serial_creation() {
645        let queue = Queue::serial("test");
646        assert!(queue.is_serial());
647        assert!(!queue.is_parallel());
648        assert_eq!(queue.name(), Some("test"));
649        assert_eq!(queue.as_str(), "test");
650
651        let queue = Queue::serial_for("user", 123);
652        assert_eq!(queue.name(), Some("user:123"));
653        assert_eq!(queue.as_str(), "user:123");
654
655        let queue = Queue::serial_for("order", "abc-def");
656        assert_eq!(queue.name(), Some("order:abc-def"));
657
658        let queue = Queue::dead_letter();
659        assert_eq!(queue.name(), Some("dead_letter"));
660    }
661
662    #[test]
663    fn queue_equality() {
664        // Parallel queues are equal
665        assert_eq!(Queue::Parallel, Queue::Parallel);
666
667        // Serial queues with same name are equal
668        assert_eq!(Queue::serial("test"), Queue::serial("test"));
669        assert_eq!(Queue::serial("test".to_string()), Queue::Serial("test".to_string()));
670
671        // Serial queues with different names are not equal
672        assert_ne!(Queue::serial("a"), Queue::serial("b"));
673
674        // Parallel and Serial are not equal
675        assert_ne!(Queue::Parallel, Queue::serial("parallel"));
676    }
677
678    // =========================================================================
679    // JobSpec Tests
680    // =========================================================================
681
682    #[test]
683    fn job_spec_defaults() {
684        let spec = JobSpec::default();
685        assert_eq!(spec.priority, Priority::BULK_DEFAULT);
686        assert_eq!(spec.queue, Queue::Parallel);
687        assert_eq!(spec.max_attempts, Some(8));
688        assert!(spec.run_at.is_none());
689        assert!(spec.job_key.is_none());
690    }
691
692    #[test]
693    fn job_spec_parallel_has_no_queue_name() {
694        let spec = JobSpec {
695            queue: Queue::Parallel,
696            ..Default::default()
697        };
698
699        // The key behavior: parallel jobs should not have a queue name
700        // This is what causes them to run in parallel (no queue lock)
701        assert!(spec.queue.name().is_none());
702    }
703
704    #[test]
705    fn job_spec_serial_has_queue_name() {
706        let spec = JobSpec {
707            queue: Queue::serial("rate-limit"),
708            ..Default::default()
709        };
710
711        // Serial jobs must have a queue name - this creates the queue lock
712        assert_eq!(spec.queue.name(), Some("rate-limit"));
713    }
714
715    #[test]
716    fn job_spec_conversion_parallel() {
717        // Verify that parallel JobSpec converts without queue_name
718        let spec = JobSpec {
719            queue: Queue::Parallel,
720            priority: Priority::FAST_HIGH,
721            ..Default::default()
722        };
723
724        // Convert to GraphileJobSpec
725        let graphile_spec: GraphileJobSpec = spec.into();
726
727        // We can't directly inspect GraphileJobSpec internals, but we can verify
728        // that our Queue::Parallel correctly reports no queue name
729        assert!(Queue::Parallel.name().is_none());
730
731        // The graphile_spec is built - if it compiled, the conversion worked
732        let _ = graphile_spec;
733    }
734
735    #[test]
736    fn job_spec_conversion_serial() {
737        // Verify that serial JobSpec converts with queue_name
738        let spec = JobSpec {
739            queue: Queue::serial("my-queue"),
740            priority: Priority::BULK_DEFAULT,
741            ..Default::default()
742        };
743
744        // Verify queue name is set before conversion
745        assert_eq!(spec.queue.name(), Some("my-queue"));
746
747        // Convert to GraphileJobSpec
748        let graphile_spec: GraphileJobSpec = spec.into();
749
750        // The graphile_spec is built with queue_name - if it compiled, the
751        // conversion worked
752        let _ = graphile_spec;
753    }
754
755    // =========================================================================
756    // Convenience Function Queue Tests
757    // =========================================================================
758
759    #[test]
760    fn convenience_functions_use_parallel_queue() {
761        // All convenience function specs should use Queue::Parallel
762        // We can't call them without a client, but we can verify the JobSpec
763        // construction
764
765        // enqueue_fast uses parallel
766        let spec = JobSpec {
767            priority: Priority::FAST_DEFAULT,
768            ..Default::default()
769        };
770        assert_eq!(spec.queue, Queue::Parallel);
771
772        // enqueue_bulk uses parallel
773        let spec = JobSpec {
774            priority: Priority::BULK_DEFAULT,
775            ..Default::default()
776        };
777        assert_eq!(spec.queue, Queue::Parallel);
778
779        // enqueue_emergency uses parallel
780        let spec = JobSpec {
781            priority: Priority::EMERGENCY,
782            ..Default::default()
783        };
784        assert_eq!(spec.queue, Queue::Parallel);
785    }
786}