use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
const UP: &str = r#"
CREATE SCHEMA IF NOT EXISTS durable;
DO $$ BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type t JOIN pg_namespace n ON t.typnamespace = n.oid WHERE t.typname = 'task_status' AND n.nspname = 'durable') THEN
CREATE TYPE durable.task_status AS ENUM ('PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'PAUSED', 'CANCELLED');
END IF;
END $$;
CREATE TABLE IF NOT EXISTS durable.task_queue (
name TEXT PRIMARY KEY,
max_concurrency INT,
rate_limit INT,
rate_limit_window_ms BIGINT DEFAULT 1000,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE TABLE IF NOT EXISTS durable.task (
id UUID PRIMARY KEY,
parent_id UUID REFERENCES durable.task(id) ON DELETE CASCADE,
sequence INT,
name TEXT NOT NULL,
status durable.task_status NOT NULL DEFAULT 'PENDING',
kind VARCHAR(20) NOT NULL DEFAULT 'WORKFLOW',
input JSONB,
output JSONB,
error TEXT,
max_retries INT NOT NULL DEFAULT 3,
retry_count INT NOT NULL DEFAULT 0,
cron TEXT,
next_run_at TIMESTAMPTZ,
queue_name TEXT REFERENCES durable.task_queue(name),
executor_id TEXT,
app_version TEXT,
timeout_ms BIGINT,
deadline_epoch_ms BIGINT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
UNIQUE (parent_id, sequence),
UNIQUE (parent_id, name)
);
CREATE INDEX IF NOT EXISTS "idx-task-status"
ON durable.task(status);
CREATE INDEX IF NOT EXISTS "idx-task-parent_id"
ON durable.task(parent_id);
CREATE INDEX IF NOT EXISTS "idx-task-queue_name-status"
ON durable.task(queue_name, status);
CREATE INDEX IF NOT EXISTS "idx-task-executor_id-status"
ON durable.task(executor_id, status);
CREATE INDEX IF NOT EXISTS "idx-task-next_run_at"
ON durable.task(next_run_at);
CREATE TABLE IF NOT EXISTS durable.executor_heartbeat (
executor_id TEXT PRIMARY KEY,
last_seen TIMESTAMPTZ NOT NULL DEFAULT now()
);
"#;
const DOWN: &str = r#"
DROP TABLE IF EXISTS durable.executor_heartbeat;
DROP TABLE IF EXISTS durable.task CASCADE;
DROP TABLE IF EXISTS durable.task_queue CASCADE;
DROP TYPE IF EXISTS durable.task_status;
DROP SCHEMA IF EXISTS durable CASCADE;
"#;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.get_connection().execute_unprepared(UP).await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.get_connection().execute_unprepared(DOWN).await?;
Ok(())
}
}