1use sea_orm_migration::prelude::*;
18
/// Migration that creates the `jobs` table and its indexes.
///
/// Stateless unit type; the schema work lives in its `MigrationTrait` impl.
#[derive(Debug, Clone, Copy, Default)]
pub struct CreateJobsTable;
21
22impl sea_orm_migration::MigrationName for CreateJobsTable {
23 fn name(&self) -> &str {
24 "m_create_jobs_table"
25 }
26}
27
28#[async_trait::async_trait]
29impl MigrationTrait for CreateJobsTable {
30 async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
31 manager
32 .create_table(
33 Table::create()
34 .table(Jobs::Table)
35 .if_not_exists()
36 .col(
37 ColumnDef::new(Jobs::Id)
38 .big_integer()
39 .not_null()
40 .auto_increment()
41 .primary_key(),
42 )
43 .col(
44 ColumnDef::new(Jobs::Queue)
45 .string()
46 .not_null()
47 .default("default"),
48 )
49 .col(ColumnDef::new(Jobs::JobType).string().not_null())
50 .col(ColumnDef::new(Jobs::Payload).text().not_null())
51 .col(
52 ColumnDef::new(Jobs::Status)
53 .string()
54 .not_null()
55 .default("pending"),
56 )
57 .col(
58 ColumnDef::new(Jobs::Attempts)
59 .integer()
60 .not_null()
61 .default(0),
62 )
63 .col(
64 ColumnDef::new(Jobs::MaxRetries)
65 .integer()
66 .not_null()
67 .default(3),
68 )
69 .col(ColumnDef::new(Jobs::IdempotencyKey).string().null())
70 .col(ColumnDef::new(Jobs::TenantId).big_integer().null())
71 .col(
72 ColumnDef::new(Jobs::AvailableAt)
73 .timestamp_with_time_zone()
74 .not_null(),
75 )
76 .col(
77 ColumnDef::new(Jobs::ClaimedAt)
78 .timestamp_with_time_zone()
79 .null(),
80 )
81 .col(ColumnDef::new(Jobs::ClaimedBy).string().null())
82 .col(ColumnDef::new(Jobs::Error).text().null())
83 .col(
84 ColumnDef::new(Jobs::FailedAt)
85 .timestamp_with_time_zone()
86 .null(),
87 )
88 .col(
89 ColumnDef::new(Jobs::CreatedAt)
90 .timestamp_with_time_zone()
91 .not_null(),
92 )
93 .to_owned(),
94 )
95 .await?;
96
97 manager
99 .create_index(
100 Index::create()
101 .name("idx_jobs_claim")
102 .table(Jobs::Table)
103 .col(Jobs::Queue)
104 .col(Jobs::Status)
105 .col(Jobs::AvailableAt)
106 .col(Jobs::Id)
107 .to_owned(),
108 )
109 .await?;
110
111 manager
113 .create_index(
114 Index::create()
115 .name("idx_jobs_reaper")
116 .table(Jobs::Table)
117 .col(Jobs::Status)
118 .col(Jobs::ClaimedAt)
119 .to_owned(),
120 )
121 .await?;
122
123 manager
125 .create_index(
126 Index::create()
127 .name("idx_jobs_idempotency")
128 .table(Jobs::Table)
129 .col(Jobs::JobType)
130 .col(Jobs::IdempotencyKey)
131 .to_owned(),
132 )
133 .await?;
134
135 Ok(())
136 }
137
138 async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
139 manager
141 .drop_table(Table::drop().table(Jobs::Table).to_owned())
142 .await
143 }
144}
145
146#[derive(DeriveIden)]
147pub(crate) enum Jobs {
148 Table,
149 Id,
150 Queue,
151 JobType,
152 Payload,
153 Status,
154 Attempts,
155 MaxRetries,
156 IdempotencyKey,
157 TenantId,
158 AvailableAt,
159 ClaimedAt,
160 ClaimedBy,
161 Error,
162 FailedAt,
163 CreatedAt,
164}
165
#[cfg(test)]
mod tests {
    use sea_orm::{ConnectionTrait, Database, DatabaseBackend, Statement};
    use sea_orm_migration::MigratorTrait;

    /// Indexes that `up()` is expected to create on the `jobs` table.
    const EXPECTED_INDEXES: [&str; 3] =
        ["idx_jobs_claim", "idx_jobs_reaper", "idx_jobs_idempotency"];

    /// Migrator that runs only the migration under test.
    struct TestMigrator;

    #[async_trait::async_trait]
    impl MigratorTrait for TestMigrator {
        fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
            vec![Box::new(super::CreateJobsTable)]
        }
    }

    /// Reports whether `sqlite_master` lists an object of type `kind` (`"table"` or
    /// `"index"`) called `name`.
    ///
    /// `kind` and `name` are interpolated directly into the SQL text, so only pass
    /// trusted literals from this test module.
    async fn sqlite_object_exists<C>(conn: &C, kind: &str, name: &str) -> bool
    where
        C: ConnectionTrait,
    {
        let sql = format!("SELECT name FROM sqlite_master WHERE type='{kind}' AND name='{name}'");
        conn.query_one(Statement::from_string(DatabaseBackend::Sqlite, sql))
            .await
            .expect("query sqlite_master")
            .is_some()
    }

    /// Runs the migration up and down against in-memory SQLite, checking that the table
    /// and every expected index appear after `up()` and are gone after `down()`.
    #[tokio::test]
    async fn migration_creates_jobs_table() {
        let conn = Database::connect("sqlite::memory:")
            .await
            .expect("connect to in-memory sqlite");

        TestMigrator::up(&conn, None)
            .await
            .expect("run migration up");

        let table_created = sqlite_object_exists(&conn, "table", "jobs").await;
        assert!(table_created, "jobs table not created by migration");

        for idx_name in EXPECTED_INDEXES {
            let index_created = sqlite_object_exists(&conn, "index", idx_name).await;
            assert!(index_created, "index {idx_name} not created by migration");
        }

        TestMigrator::down(&conn, None)
            .await
            .expect("run migration down");

        let table_remains = sqlite_object_exists(&conn, "table", "jobs").await;
        assert!(!table_remains, "jobs table should be dropped by down()");

        // down() never drops the indexes explicitly; verify none is left behind.
        for idx_name in EXPECTED_INDEXES {
            let index_remains = sqlite_object_exists(&conn, "index", idx_name).await;
            assert!(
                !index_remains,
                "index {idx_name} should be removed together with the jobs table"
            );
        }
    }
}