Skip to main content

ferro_queue/
migration.rs

1//! `CreateJobsTable` — SeaORM migration creating the `jobs` table (D-04, D-05)
2//! and its three composite indexes (D-06). Portable across SQLite + Postgres:
3//! no backend-specific SQL, only the SchemaManager DDL builder.
4//!
5//! Consumers register it in their own `Migrator`:
6//! ```rust,ignore
7//! impl MigratorTrait for Migrator {
8//!     fn migrations() -> Vec<Box<dyn MigrationTrait>> {
9//!         vec![
10//!             Box::new(ferro_queue::CreateJobsTable),
11//!             // ... your app migrations
12//!         ]
13//!     }
14//! }
15//! ```
16
17use sea_orm_migration::prelude::*;
18
19/// Migration that creates the `jobs` table and its indexes.
20pub struct CreateJobsTable;
21
22impl sea_orm_migration::MigrationName for CreateJobsTable {
23    fn name(&self) -> &str {
24        "m_create_jobs_table"
25    }
26}
27
28#[async_trait::async_trait]
29impl MigrationTrait for CreateJobsTable {
30    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
31        manager
32            .create_table(
33                Table::create()
34                    .table(Jobs::Table)
35                    .if_not_exists()
36                    .col(
37                        ColumnDef::new(Jobs::Id)
38                            .big_integer()
39                            .not_null()
40                            .auto_increment()
41                            .primary_key(),
42                    )
43                    .col(
44                        ColumnDef::new(Jobs::Queue)
45                            .string()
46                            .not_null()
47                            .default("default"),
48                    )
49                    .col(ColumnDef::new(Jobs::JobType).string().not_null())
50                    .col(ColumnDef::new(Jobs::Payload).text().not_null())
51                    .col(
52                        ColumnDef::new(Jobs::Status)
53                            .string()
54                            .not_null()
55                            .default("pending"),
56                    )
57                    .col(
58                        ColumnDef::new(Jobs::Attempts)
59                            .integer()
60                            .not_null()
61                            .default(0),
62                    )
63                    .col(
64                        ColumnDef::new(Jobs::MaxRetries)
65                            .integer()
66                            .not_null()
67                            .default(3),
68                    )
69                    .col(ColumnDef::new(Jobs::IdempotencyKey).string().null())
70                    .col(ColumnDef::new(Jobs::TenantId).big_integer().null())
71                    .col(
72                        ColumnDef::new(Jobs::AvailableAt)
73                            .timestamp_with_time_zone()
74                            .not_null(),
75                    )
76                    .col(
77                        ColumnDef::new(Jobs::ClaimedAt)
78                            .timestamp_with_time_zone()
79                            .null(),
80                    )
81                    .col(ColumnDef::new(Jobs::ClaimedBy).string().null())
82                    .col(ColumnDef::new(Jobs::Error).text().null())
83                    .col(
84                        ColumnDef::new(Jobs::FailedAt)
85                            .timestamp_with_time_zone()
86                            .null(),
87                    )
88                    .col(
89                        ColumnDef::new(Jobs::CreatedAt)
90                            .timestamp_with_time_zone()
91                            .not_null(),
92                    )
93                    .to_owned(),
94            )
95            .await?;
96
97        // idx_jobs_claim: (queue, status, available_at, id) — primary claim SELECT path
98        manager
99            .create_index(
100                Index::create()
101                    .name("idx_jobs_claim")
102                    .table(Jobs::Table)
103                    .col(Jobs::Queue)
104                    .col(Jobs::Status)
105                    .col(Jobs::AvailableAt)
106                    .col(Jobs::Id)
107                    .to_owned(),
108            )
109            .await?;
110
111        // idx_jobs_reaper: (status, claimed_at) — reaper visibility-timeout scan
112        manager
113            .create_index(
114                Index::create()
115                    .name("idx_jobs_reaper")
116                    .table(Jobs::Table)
117                    .col(Jobs::Status)
118                    .col(Jobs::ClaimedAt)
119                    .to_owned(),
120            )
121            .await?;
122
123        // idx_jobs_idempotency: (job_type, idempotency_key) — deduplication on enqueue
124        manager
125            .create_index(
126                Index::create()
127                    .name("idx_jobs_idempotency")
128                    .table(Jobs::Table)
129                    .col(Jobs::JobType)
130                    .col(Jobs::IdempotencyKey)
131                    .to_owned(),
132            )
133            .await?;
134
135        Ok(())
136    }
137
138    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
139        // Dropping the table also drops its indexes on both SQLite and Postgres.
140        manager
141            .drop_table(Table::drop().table(Jobs::Table).to_owned())
142            .await
143    }
144}
145
146#[derive(DeriveIden)]
147pub(crate) enum Jobs {
148    Table,
149    Id,
150    Queue,
151    JobType,
152    Payload,
153    Status,
154    Attempts,
155    MaxRetries,
156    IdempotencyKey,
157    TenantId,
158    AvailableAt,
159    ClaimedAt,
160    ClaimedBy,
161    Error,
162    FailedAt,
163    CreatedAt,
164}
165
166#[cfg(test)]
167mod tests {
168    use sea_orm::{ConnectionTrait, Database, DatabaseBackend, Statement};
169    use sea_orm_migration::MigratorTrait;
170
171    struct TestMigrator;
172
173    #[async_trait::async_trait]
174    impl MigratorTrait for TestMigrator {
175        fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
176            vec![Box::new(super::CreateJobsTable)]
177        }
178    }
179
180    #[tokio::test]
181    async fn migration_creates_jobs_table() {
182        let conn = Database::connect("sqlite::memory:")
183            .await
184            .expect("connect to in-memory sqlite");
185
186        TestMigrator::up(&conn, None)
187            .await
188            .expect("run migration up");
189
190        // Verify the jobs table exists.
191        let table_row = conn
192            .query_one(Statement::from_string(
193                DatabaseBackend::Sqlite,
194                "SELECT name FROM sqlite_master WHERE type='table' AND name='jobs'".to_string(),
195            ))
196            .await
197            .expect("query sqlite_master for table");
198        assert!(table_row.is_some(), "jobs table not created by migration");
199
200        // Verify the claim index exists.
201        let idx_row = conn
202            .query_one(Statement::from_string(
203                DatabaseBackend::Sqlite,
204                "SELECT name FROM sqlite_master WHERE type='index' AND name='idx_jobs_claim'"
205                    .to_string(),
206            ))
207            .await
208            .expect("query sqlite_master for idx_jobs_claim");
209        assert!(idx_row.is_some(), "idx_jobs_claim not created by migration");
210
211        // Verify all three indexes exist.
212        for idx_name in &["idx_jobs_claim", "idx_jobs_reaper", "idx_jobs_idempotency"] {
213            let row = conn
214                .query_one(Statement::from_string(
215                    DatabaseBackend::Sqlite,
216                    format!(
217                        "SELECT name FROM sqlite_master WHERE type='index' AND name='{idx_name}'"
218                    ),
219                ))
220                .await
221                .expect("query sqlite_master for index");
222            assert!(row.is_some(), "index {idx_name} not created by migration");
223        }
224
225        // Verify down() drops the table.
226        TestMigrator::down(&conn, None)
227            .await
228            .expect("run migration down");
229        let table_after_down = conn
230            .query_one(Statement::from_string(
231                DatabaseBackend::Sqlite,
232                "SELECT name FROM sqlite_master WHERE type='table' AND name='jobs'".to_string(),
233            ))
234            .await
235            .expect("query sqlite_master after down");
236        assert!(
237            table_after_down.is_none(),
238            "jobs table should be dropped by down()"
239        );
240    }
241}