// durable-db 0.11.3
//
// Database migrations and schema for the durable workflow engine.
// Documentation
use sea_orm_migration::prelude::*;

/// Migration that creates (and, on rollback, removes) the `durable` schema
/// objects defined by the `UP` and `DOWN` SQL scripts in this file.
///
/// The migration's name is supplied by the `DeriveMigrationName` derive.
#[derive(DeriveMigrationName)]
pub struct Migration;

/// SQL script executed by `Migration::up`.
///
/// In order, the script:
///
/// 1. Creates the `durable` schema if it does not already exist.
/// 2. Creates the `durable.task_status` enum (`PENDING`, `RUNNING`,
///    `COMPLETED`, `FAILED`, `PAUSED`, `CANCELLED`). The `DO $$ ... $$` block
///    checks `pg_type`/`pg_namespace` first so the type is only created when
///    it is missing.
/// 3. Creates `durable.task_queue`, keyed by queue `name`, with optional
///    `max_concurrency` / `rate_limit` columns and a `rate_limit_window_ms`
///    column defaulting to 1000.
/// 4. Creates `durable.task`, which references itself through `parent_id`
///    (`ON DELETE CASCADE`) and references `durable.task_queue(name)` through
///    `queue_name`. `(parent_id, sequence)` and `(parent_id, name)` are each
///    declared `UNIQUE`.
/// 5. Creates indexes on `durable.task` for `status`, `parent_id`,
///    `(queue_name, status)`, `(executor_id, status)` and `next_run_at`.
/// 6. Creates `durable.executor_heartbeat`, keyed by `executor_id`, with a
///    `last_seen` timestamp defaulting to `now()`.
///
/// Every statement is guarded with `IF NOT EXISTS` (or the explicit catalog
/// check for the enum), so objects that already exist are left untouched.
///
/// NOTE(review): `parent_id` is nullable, and under PostgreSQL's default
/// handling of NULLs in unique constraints, rows with a NULL `parent_id` are
/// not restricted by the two `UNIQUE` constraints. Presumably this is intended
/// for top-level tasks; confirm.
const UP: &str = r#"
CREATE SCHEMA IF NOT EXISTS durable;

DO $$ BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_type t JOIN pg_namespace n ON t.typnamespace = n.oid WHERE t.typname = 'task_status' AND n.nspname = 'durable') THEN
        CREATE TYPE durable.task_status AS ENUM ('PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'PAUSED', 'CANCELLED');
    END IF;
END $$;

CREATE TABLE IF NOT EXISTS durable.task_queue (
    name                TEXT PRIMARY KEY,
    max_concurrency     INT,
    rate_limit          INT,
    rate_limit_window_ms BIGINT DEFAULT 1000,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE TABLE IF NOT EXISTS durable.task (
    id                  UUID PRIMARY KEY,
    parent_id           UUID REFERENCES durable.task(id) ON DELETE CASCADE,
    sequence            INT,

    name                TEXT NOT NULL,
    status              durable.task_status NOT NULL DEFAULT 'PENDING',
    kind                VARCHAR(20) NOT NULL DEFAULT 'WORKFLOW',

    input               JSONB,
    output              JSONB,
    error               TEXT,

    max_retries         INT NOT NULL DEFAULT 3,
    retry_count         INT NOT NULL DEFAULT 0,

    cron                TEXT,
    next_run_at         TIMESTAMPTZ,

    queue_name          TEXT REFERENCES durable.task_queue(name),

    executor_id         TEXT,
    app_version         TEXT,

    timeout_ms          BIGINT,
    deadline_epoch_ms   BIGINT,

    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    started_at          TIMESTAMPTZ,
    completed_at        TIMESTAMPTZ,

    UNIQUE (parent_id, sequence),
    UNIQUE (parent_id, name)
);

CREATE INDEX IF NOT EXISTS "idx-task-status"
    ON durable.task(status);
CREATE INDEX IF NOT EXISTS "idx-task-parent_id"
    ON durable.task(parent_id);
CREATE INDEX IF NOT EXISTS "idx-task-queue_name-status"
    ON durable.task(queue_name, status);
CREATE INDEX IF NOT EXISTS "idx-task-executor_id-status"
    ON durable.task(executor_id, status);
CREATE INDEX IF NOT EXISTS "idx-task-next_run_at"
    ON durable.task(next_run_at);

CREATE TABLE IF NOT EXISTS durable.executor_heartbeat (
    executor_id TEXT PRIMARY KEY,
    last_seen   TIMESTAMPTZ NOT NULL DEFAULT now()
);
"#;

/// SQL script executed by `Migration::down`.
///
/// Drops the objects created by `UP`: the `executor_heartbeat`, `task` and
/// `task_queue` tables, then the `task_status` enum (after the `task` table
/// that uses it), and finally the `durable` schema itself. Every statement
/// uses `IF EXISTS`, so missing objects are skipped.
///
/// NOTE(review): the final `DROP SCHEMA ... CASCADE` also removes any other
/// objects that live in the `durable` schema, not just the ones created by
/// `UP` — confirm that no other migration's objects need to survive a
/// rollback of this one.
const DOWN: &str = r#"
DROP TABLE IF EXISTS durable.executor_heartbeat;
DROP TABLE IF EXISTS durable.task CASCADE;
DROP TABLE IF EXISTS durable.task_queue CASCADE;
DROP TYPE IF EXISTS durable.task_status;
DROP SCHEMA IF EXISTS durable CASCADE;
"#;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Applies the migration by sending the `UP` SQL script to the database
    /// through `execute_unprepared`.
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` reported by the connection if the script fails.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();
        // The execution result carries nothing the caller needs; discard it.
        conn.execute_unprepared(UP).await.map(|_| ())
    }

    /// Reverts the migration by sending the `DOWN` SQL script to the database
    /// through `execute_unprepared`.
    ///
    /// # Errors
    ///
    /// Returns the `DbErr` reported by the connection if the script fails.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();
        // The execution result carries nothing the caller needs; discard it.
        conn.execute_unprepared(DOWN).await.map(|_| ())
    }
}