duroxide-pg 0.1.25

A PostgreSQL-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
# Diff for migration 0010

**Migration file:** `0010_add_deletion_and_pruning_support.sql`

This diff was auto-generated by comparing schema state before and after applying migration 0010.

## Schema Diff

```diff
--- /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.yrvaEJVUPt	2026-01-05 11:28:09
+++ /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.NvKogMq3Te	2026-01-05 11:28:09
@@ -3,13 +3,14 @@
 CREATE TABLE executions (instance_id TEXT NOT NULL, execution_id BIGINT NOT NULL, status TEXT NOT NULL DEFAULT 'Running'::text, output TEXT, started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, completed_at TIMESTAMP WITH TIME ZONE);
 CREATE TABLE history (instance_id TEXT NOT NULL, execution_id BIGINT NOT NULL, event_id BIGINT NOT NULL, event_type TEXT NOT NULL, event_data TEXT NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP);
 CREATE TABLE instance_locks (instance_id TEXT NOT NULL, lock_token TEXT NOT NULL, locked_until BIGINT NOT NULL, locked_at BIGINT NOT NULL);
-CREATE TABLE instances (instance_id TEXT NOT NULL, orchestration_name TEXT NOT NULL, orchestration_version TEXT, current_execution_id BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP);
+CREATE TABLE instances (instance_id TEXT NOT NULL, orchestration_name TEXT NOT NULL, orchestration_version TEXT, current_execution_id BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, parent_instance_id TEXT);
 CREATE TABLE orchestrator_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.orchestrator_queue_id_seq'::regclass), instance_id TEXT NOT NULL, work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0);
 CREATE TABLE worker_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.worker_queue_id_seq'::regclass), work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0, instance_id TEXT, execution_id BIGINT, activity_id BIGINT);
 
 -- INDEXES
 CREATE INDEX idx_history_lookup ON SCHEMA.history USING btree (instance_id, execution_id, event_id);
 CREATE INDEX idx_instance_locks_locked_until ON SCHEMA.instance_locks USING btree (locked_until);
+CREATE INDEX idx_instances_parent ON SCHEMA.instances USING btree (parent_instance_id);
 CREATE INDEX idx_orch_instance ON SCHEMA.orchestrator_queue USING btree (instance_id);
 CREATE INDEX idx_orch_lock ON SCHEMA.orchestrator_queue USING btree (lock_token);
 CREATE INDEX idx_orch_visible ON SCHEMA.orchestrator_queue USING btree (visible_at, lock_token);
@@ -224,6 +225,7 @@
             v_now_ms BIGINT;
             v_orchestration_name TEXT;
             v_orchestration_version TEXT;
+            v_parent_instance_id TEXT;
             v_status TEXT;
             v_output TEXT;
             v_completed_at TIMESTAMPTZ;
@@ -249,21 +251,24 @@
                 RAISE EXCEPTION 'Invalid lock token';
             END IF;
 
-            -- Step 2: Extract metadata from JSONB
+            -- Step 2: Extract metadata from JSONB (including parent_instance_id)
             v_orchestration_name := p_metadata->>'orchestration_name';
             v_orchestration_version := p_metadata->>'orchestration_version';
+            v_parent_instance_id := p_metadata->>'parent_instance_id';
             v_status := p_metadata->>'status';
             v_output := p_metadata->>'output';
 
-            -- Step 3: Create or update instance metadata
+            -- Step 3: Create or update instance metadata (including parent_instance_id)
             IF v_orchestration_name IS NOT NULL AND v_orchestration_version IS NOT NULL THEN
-                INSERT INTO SCHEMA.instances (instance_id, orchestration_name, orchestration_version, current_execution_id)
-                VALUES (v_instance_id, v_orchestration_name, v_orchestration_version, p_execution_id)
+                INSERT INTO SCHEMA.instances (instance_id, orchestration_name, orchestration_version, current_execution_id, parent_instance_id)
+                VALUES (v_instance_id, v_orchestration_name, v_orchestration_version, p_execution_id, v_parent_instance_id)
                 ON CONFLICT (instance_id) DO NOTHING;
 
+                -- Update existing instance (but only set parent_instance_id if it was NULL)
                 UPDATE SCHEMA.instances i
                 SET orchestration_name = v_orchestration_name,
-                    orchestration_version = v_orchestration_version
+                    orchestration_version = v_orchestration_version,
+                    parent_instance_id = COALESCE(i.parent_instance_id, v_parent_instance_id)
                 WHERE i.instance_id = v_instance_id;
             END IF;
 
@@ -455,6 +460,93 @@
             DROP TABLE IF EXISTS SCHEMA.worker_queue CASCADE;
             DROP TABLE IF EXISTS SCHEMA.instance_locks CASCADE;
             DROP TABLE IF EXISTS SCHEMA._duroxide_migrations CASCADE;
+        END;
+        $function$
+;
+CREATE OR REPLACE FUNCTION SCHEMA.delete_instances_atomic(p_instance_ids text[], p_force boolean)
+ RETURNS TABLE(instances_deleted bigint, executions_deleted bigint, events_deleted bigint, queue_messages_deleted bigint)
+ LANGUAGE plpgsql
+AS $function$
+        -- Deletes the listed instances together with their history, executions,
+        -- queued messages and instance locks, and reports how many rows were removed.
+        --
+        -- Parameters:
+        --   p_instance_ids  instance ids to delete. NULL or an empty array is a
+        --                   no-op that returns a single all-zero row.
+        --   p_force         TRUE skips the "current execution is Running" check.
+        --                   NULL is treated as FALSE so the check cannot be
+        --                   bypassed by an unset flag.
+        -- Returns: exactly one row of counters. queue_messages_deleted is the sum
+        --   of rows removed from orchestrator_queue and worker_queue; rows removed
+        --   from instance_locks are not counted.
+        -- Raises:
+        --   - when not forced and a listed instance's current execution has
+        --     status 'Running';
+        --   - when an instance outside the list has its parent inside the list.
+        DECLARE
+            v_instance_id TEXT;
+            v_orphan_id TEXT;
+            v_instances_deleted BIGINT := 0;
+            v_executions_deleted BIGINT := 0;
+            v_events_deleted BIGINT := 0;
+            v_queue_deleted BIGINT := 0;
+            v_count BIGINT;
+        BEGIN
+            -- Check for empty input (array_length returns NULL for an empty array)
+            IF p_instance_ids IS NULL OR array_length(p_instance_ids, 1) IS NULL THEN
+                instances_deleted := 0;
+                executions_deleted := 0;
+                events_deleted := 0;
+                queue_messages_deleted := 0;
+                RETURN NEXT;
+                RETURN;
+            END IF;
+
+            -- Step 1: If not force, reject the request when any listed instance's
+            -- current execution is still 'Running' (single query, no loop).
+            -- COALESCE is required: with a NULL p_force, "NOT p_force" is NULL,
+            -- which IF treats as false and would silently skip this check.
+            IF NOT COALESCE(p_force, FALSE) THEN
+                SELECT i.instance_id INTO v_instance_id
+                FROM SCHEMA.instances i
+                JOIN SCHEMA.executions e ON i.instance_id = e.instance_id
+                  AND i.current_execution_id = e.execution_id
+                WHERE i.instance_id = ANY(p_instance_ids)
+                  AND e.status = 'Running'
+                LIMIT 1;
+
+                IF v_instance_id IS NOT NULL THEN
+                    RAISE EXCEPTION 'Instance % is Running. Use force=true to delete.', v_instance_id;
+                END IF;
+            END IF;
+
+            -- Step 2: Lock parent rows to prevent concurrent child creation
+            -- This is intended to close the race where a child could be inserted
+            -- between our orphan check and the actual delete.
+            -- NOTE(review): FOR UPDATE only blocks writers that touch these rows;
+            -- confirm that child creation actually takes a lock on the parent row.
+            PERFORM 1 FROM SCHEMA.instances
+            WHERE instance_id = ANY(p_instance_ids)
+            FOR UPDATE;
+
+            -- Step 3: Check for orphans (children not in our delete list)
+            -- Runs after the parent rows above have been locked.
+            SELECT i.instance_id INTO v_orphan_id
+            FROM SCHEMA.instances i
+            WHERE i.parent_instance_id = ANY(p_instance_ids)
+              AND NOT (i.instance_id = ANY(p_instance_ids))
+            LIMIT 1;
+
+            IF v_orphan_id IS NOT NULL THEN
+                RAISE EXCEPTION 'Orphan detected: instance % has parent in delete list but is not included', v_orphan_id;
+            END IF;
+
+            -- Step 4: Delete from all tables, dependent rows first, instances last
+            -- Delete history events
+            DELETE FROM SCHEMA.history WHERE instance_id = ANY(p_instance_ids);
+            GET DIAGNOSTICS v_count = ROW_COUNT;
+            v_events_deleted := v_count;
+
+            -- Delete executions
+            DELETE FROM SCHEMA.executions WHERE instance_id = ANY(p_instance_ids);
+            GET DIAGNOSTICS v_count = ROW_COUNT;
+            v_executions_deleted := v_count;
+
+            -- Delete orchestrator queue messages
+            DELETE FROM SCHEMA.orchestrator_queue WHERE instance_id = ANY(p_instance_ids);
+            GET DIAGNOSTICS v_count = ROW_COUNT;
+            v_queue_deleted := v_count;
+
+            -- Delete worker queue messages (added to the same queue counter)
+            DELETE FROM SCHEMA.worker_queue WHERE instance_id = ANY(p_instance_ids);
+            GET DIAGNOSTICS v_count = ROW_COUNT;
+            v_queue_deleted := v_queue_deleted + v_count;
+
+            -- Delete instance locks (not reported in the result row)
+            DELETE FROM SCHEMA.instance_locks WHERE instance_id = ANY(p_instance_ids);
+
+            -- Delete instances
+            DELETE FROM SCHEMA.instances WHERE instance_id = ANY(p_instance_ids);
+            GET DIAGNOSTICS v_count = ROW_COUNT;
+            v_instances_deleted := v_count;
+
+            instances_deleted := v_instances_deleted;
+            executions_deleted := v_executions_deleted;
+            events_deleted := v_events_deleted;
+            queue_messages_deleted := v_queue_deleted;
+            RETURN NEXT;
         END;
         $function$
 ;
@@ -753,7 +845,7 @@
         $function$
 ;
 CREATE OR REPLACE FUNCTION SCHEMA.get_instance_info(p_instance_id text)
- RETURNS TABLE(instance_id text, orchestration_name text, orchestration_version text, current_execution_id bigint, created_at timestamp with time zone, updated_at timestamp with time zone, status text, output text)
+ RETURNS TABLE(instance_id text, orchestration_name text, orchestration_version text, current_execution_id bigint, created_at timestamp with time zone, updated_at timestamp with time zone, status text, output text, parent_instance_id text)
  LANGUAGE plpgsql
 AS $function$
         BEGIN
@@ -761,7 +853,7 @@
             SELECT i.instance_id, i.orchestration_name, 
                    COALESCE(i.orchestration_version, 'unknown') as orchestration_version,
                    i.current_execution_id, i.created_at, i.updated_at,
-                   e.status, e.output
+                   e.status, e.output, i.parent_instance_id
             FROM SCHEMA.instances i
             LEFT JOIN SCHEMA.executions e ON i.instance_id = e.instance_id 
               AND i.current_execution_id = e.execution_id
@@ -769,6 +861,26 @@
         END;
         $function$
 ;
+CREATE OR REPLACE FUNCTION SCHEMA.get_parent_id(p_instance_id text)
+ RETURNS text
+ LANGUAGE plpgsql
+AS $function$
+        -- Looks up the parent_instance_id stored on the instance row for
+        -- p_instance_id.
+        -- Returns: the stored parent id, or NULL when the row exists but holds
+        --   no parent.
+        -- Raises: 'Instance not found: <id>' when no instance row matches.
+        DECLARE
+            v_parent TEXT;
+        BEGIN
+            SELECT inst.parent_instance_id
+              INTO v_parent
+              FROM SCHEMA.instances AS inst
+             WHERE inst.instance_id = p_instance_id;
+
+            -- FOUND tells "row matched" apart from "row matched with a NULL
+            -- parent"; the fetched value alone cannot distinguish the two.
+            IF FOUND THEN
+                RETURN v_parent;
+            END IF;
+
+            RAISE EXCEPTION 'Instance not found: %', p_instance_id;
+        END;
+        $function$
+;
 CREATE OR REPLACE FUNCTION SCHEMA.get_queue_depths(p_now_ms bigint)
  RETURNS TABLE(orchestrator_queue bigint, worker_queue bigint)
  LANGUAGE plpgsql
@@ -828,6 +940,19 @@
         END;
         $function$
 ;
+CREATE OR REPLACE FUNCTION SCHEMA.list_children(p_instance_id text)
+ RETURNS TABLE(child_instance_id text)
+ LANGUAGE plpgsql
+AS $function$
+        -- Returns the instance_id of every instance whose parent_instance_id
+        -- equals p_instance_id, ordered by ascending created_at.
+        -- Produces zero rows when no instance references p_instance_id.
+        BEGIN
+            RETURN QUERY
+                SELECT child.instance_id
+                  FROM SCHEMA.instances AS child
+                 WHERE child.parent_instance_id = p_instance_id
+                 ORDER BY child.created_at;
+        END;
+        $function$
+;
 CREATE OR REPLACE FUNCTION SCHEMA.list_executions(p_instance_id text)
  RETURNS TABLE(execution_id bigint)
  LANGUAGE plpgsql
@@ -868,6 +993,77 @@
         END;
         $function$
 ;
+CREATE OR REPLACE FUNCTION SCHEMA.prune_executions(p_instance_id text, p_keep_last integer DEFAULT NULL::integer, p_completed_before_ms bigint DEFAULT NULL::bigint)
+ RETURNS TABLE(instances_processed bigint, executions_deleted bigint, events_deleted bigint)
+ LANGUAGE plpgsql
+AS $function$
+        -- Removes old executions of one instance, along with their history rows.
+        --
+        -- Parameters:
+        --   p_instance_id          instance whose executions are pruned.
+        --   p_keep_last            when not NULL, the N highest execution_ids of
+        --                          the instance are exempt from deletion.
+        --   p_completed_before_ms  when not NULL, only executions whose
+        --                          completed_at is earlier than this value are
+        --                          eligible; it is converted with
+        --                          TO_TIMESTAMP(ms / 1000.0).
+        -- Both filters must hold when both are given. Regardless of the filters,
+        -- the instance's current execution and any execution with status
+        -- 'Running' are never deleted.
+        -- Returns: one row; instances_processed is always 1, the other columns
+        --   are the number of executions and history rows deleted.
+        -- Raises: 'Instance <id> not found' when no instance row matches.
+        DECLARE
+            v_live_execution_id BIGINT;
+            v_doomed_ids BIGINT[];
+        BEGIN
+            -- The current execution is looked up first; it is excluded below.
+            SELECT inst.current_execution_id INTO v_live_execution_id
+            FROM SCHEMA.instances AS inst
+            WHERE inst.instance_id = p_instance_id;
+
+            IF NOT FOUND THEN
+                RAISE EXCEPTION 'Instance % not found', p_instance_id;
+            END IF;
+
+            -- Result row for the "nothing to prune" case; the delete section
+            -- below overwrites the two counters when it runs.
+            instances_processed := 1;
+            executions_deleted := 0;
+            events_deleted := 0;
+
+            -- Collect the execution ids that pass every filter.
+            -- keep_last counts the newest N executions INCLUDING the current one.
+            -- NOTE(review): NULL, 0 and 1 behave alike only if the current
+            -- execution always has the highest execution_id - confirm.
+            SELECT array_agg(ex.execution_id) INTO v_doomed_ids
+            FROM SCHEMA.executions AS ex
+            WHERE ex.instance_id = p_instance_id
+              AND ex.execution_id <> v_live_execution_id   -- never the current one
+              AND ex.status <> 'Running'                   -- never a running one
+              AND (p_completed_before_ms IS NULL
+                   OR ex.completed_at < TO_TIMESTAMP(p_completed_before_ms / 1000.0))
+              AND (p_keep_last IS NULL
+                   OR ex.execution_id NOT IN (
+                       SELECT newest.execution_id
+                       FROM SCHEMA.executions AS newest
+                       WHERE newest.instance_id = p_instance_id
+                       ORDER BY newest.execution_id DESC
+                       LIMIT p_keep_last
+                   ));
+
+            -- array_agg yields NULL when no row qualifies, so both checks guard
+            -- the delete section.
+            IF v_doomed_ids IS NOT NULL AND array_length(v_doomed_ids, 1) IS NOT NULL THEN
+                -- History rows go first, then the executions themselves.
+                DELETE FROM SCHEMA.history AS hist
+                WHERE hist.instance_id = p_instance_id
+                  AND hist.execution_id = ANY(v_doomed_ids);
+                GET DIAGNOSTICS events_deleted = ROW_COUNT;
+
+                DELETE FROM SCHEMA.executions AS ex
+                WHERE ex.instance_id = p_instance_id
+                  AND ex.execution_id = ANY(v_doomed_ids);
+                GET DIAGNOSTICS executions_deleted = ROW_COUNT;
+            END IF;
+
+            RETURN NEXT;
+        END;
+        $function$
+;
 CREATE OR REPLACE FUNCTION SCHEMA.renew_orchestration_item_lock(p_lock_token text, p_now_ms bigint, p_extend_secs bigint)
  RETURNS void
  LANGUAGE plpgsql
```

---
*Generated on 2026-01-05 04:28:09 UTC*