duroxide-pg 0.1.25

A PostgreSQL-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
# Diff for migration 0015

**Migration file:** `0015_add_custom_status.sql`

Each changed function is shown **in full** with `+`/`-` markers on changed lines.

## Table Changes

### `instances` — Modified
```diff
  instance_id TEXT NOT NULL
  orchestration_name TEXT NOT NULL
  orchestration_version TEXT
  current_execution_id BIGINT NOT NULL DEFAULT 1
  created_at TIMESTAMP WITH TIME ZONE NOT NULL
  updated_at TIMESTAMP WITH TIME ZONE NOT NULL
  parent_instance_id TEXT
+ custom_status TEXT
+ custom_status_version INTEGER NOT NULL DEFAULT 0
```

## Function Changes

### `ack_orchestration_item` — Body Modified

Full function with diff:
```diff
 CREATE OR REPLACE FUNCTION SCHEMA.ack_orchestration_item(p_lock_token text, p_now_ms bigint, p_execution_id bigint, p_history_delta jsonb, p_worker_items jsonb, p_orchestrator_items jsonb, p_metadata jsonb, p_cancelled_activities jsonb DEFAULT '[]'::jsonb)
  RETURNS void
  LANGUAGE plpgsql
 AS $function$
         DECLARE
             v_instance_id TEXT;
             v_orchestration_name TEXT;
             v_orchestration_version TEXT;
             v_parent_instance_id TEXT;
             v_status TEXT;
             v_output TEXT;
             v_completed_at TIMESTAMPTZ;
             v_elem JSONB;
             v_visible_at TIMESTAMPTZ;
             v_fire_at_ms BIGINT;
             v_item_instance_id TEXT;
             v_item_execution_id BIGINT;
             v_item_activity_id BIGINT;
             v_item_session_id TEXT;
             v_now_ts TIMESTAMPTZ;
+            v_custom_status_action TEXT;
+            v_custom_status_value TEXT;
         BEGIN
             -- Convert Rust-supplied millisecond timestamp to TIMESTAMPTZ
             v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
 
             -- Step 1: Validate lock token
             SELECT il.instance_id INTO v_instance_id
             FROM SCHEMA.instance_locks il
             WHERE il.lock_token = p_lock_token AND il.locked_until > p_now_ms;
 
             IF NOT FOUND THEN
                 RAISE EXCEPTION 'Invalid lock token';
             END IF;
 
             -- Step 2: Extract metadata from JSONB
             v_orchestration_name := p_metadata->>'orchestration_name';
             v_orchestration_version := p_metadata->>'orchestration_version';
             v_parent_instance_id := p_metadata->>'parent_instance_id';
             v_status := p_metadata->>'status';
             v_output := p_metadata->>'output';
 
             -- Step 3: Create or update instance metadata (with explicit timestamps)
             IF v_orchestration_name IS NOT NULL AND v_orchestration_version IS NOT NULL THEN
                 INSERT INTO SCHEMA.instances (instance_id, orchestration_name, orchestration_version, current_execution_id, parent_instance_id, created_at, updated_at)
                 VALUES (v_instance_id, v_orchestration_name, v_orchestration_version, p_execution_id, v_parent_instance_id, v_now_ts, v_now_ts)
                 ON CONFLICT (instance_id) DO NOTHING;
 
                 UPDATE SCHEMA.instances i
                 SET orchestration_name = v_orchestration_name,
                     orchestration_version = v_orchestration_version,
                     parent_instance_id = COALESCE(i.parent_instance_id, v_parent_instance_id),
                     updated_at = v_now_ts
                 WHERE i.instance_id = v_instance_id;
             END IF;
 
             -- Step 4: Create execution record (idempotent)
             INSERT INTO SCHEMA.executions (instance_id, execution_id, status, started_at)
             VALUES (v_instance_id, p_execution_id, 'Running', v_now_ts)
             ON CONFLICT (instance_id, execution_id) DO NOTHING;
 
             -- Step 5: Update instance current_execution_id
             UPDATE SCHEMA.instances i
             SET current_execution_id = GREATEST(i.current_execution_id, p_execution_id),
                 updated_at = v_now_ts
             WHERE i.instance_id = v_instance_id;
 
             -- Step 6: Append history_delta (batch insert with explicit timestamps)
             IF p_history_delta IS NOT NULL AND JSONB_ARRAY_LENGTH(p_history_delta) > 0 THEN
                 INSERT INTO SCHEMA.history (instance_id, execution_id, event_id, event_type, event_data, created_at)
                 SELECT
                     v_instance_id,
                     p_execution_id,
                     (elem->>'event_id')::BIGINT,
                     elem->>'event_type',
                     elem->>'event_data',
                     v_now_ts
                 FROM JSONB_ARRAY_ELEMENTS(p_history_delta) AS elem;
             END IF;
 
             -- Step 7: Update execution status if provided
             IF v_status IS NOT NULL THEN
                 v_completed_at := CASE 
                     WHEN v_status IN ('Completed', 'Failed') THEN v_now_ts 
                     ELSE NULL 
                 END;
                 
                 UPDATE SCHEMA.executions e
                 SET status = v_status, output = v_output, completed_at = v_completed_at
                 WHERE e.instance_id = v_instance_id AND e.execution_id = p_execution_id;
             END IF;
 
             -- Step 7b: Store pinned duroxide version if provided in metadata
             IF p_metadata ? 'pinned_duroxide_version' AND p_metadata->'pinned_duroxide_version' IS NOT NULL
                AND p_metadata->>'pinned_duroxide_version' != 'null' THEN
                 UPDATE SCHEMA.executions
                 SET duroxide_version_major = (p_metadata->'pinned_duroxide_version'->>'major')::INTEGER,
                     duroxide_version_minor = (p_metadata->'pinned_duroxide_version'->>'minor')::INTEGER,
                     duroxide_version_patch = (p_metadata->'pinned_duroxide_version'->>'patch')::INTEGER
                 WHERE instance_id = v_instance_id AND execution_id = p_execution_id;
             END IF;
 
+            -- Step 7c: Handle custom_status update on instances table
+            v_custom_status_action := p_metadata->>'custom_status_action';
+            IF v_custom_status_action = 'set' THEN
+                v_custom_status_value := p_metadata->>'custom_status_value';
+                UPDATE SCHEMA.instances
+                SET custom_status = v_custom_status_value,
+                    custom_status_version = custom_status_version + 1
+                WHERE instance_id = v_instance_id;
+            ELSIF v_custom_status_action = 'clear' THEN
+                UPDATE SCHEMA.instances
+                SET custom_status = NULL,
+                    custom_status_version = custom_status_version + 1
+                WHERE instance_id = v_instance_id;
+            END IF;
+
             -- Step 8: Enqueue worker items with session_id support
             IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
                 FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
                     IF v_elem ? 'ActivityExecute' THEN
                         v_item_instance_id := v_elem->'ActivityExecute'->>'instance';
                         v_item_execution_id := (v_elem->'ActivityExecute'->>'execution_id')::BIGINT;
                         v_item_activity_id := (v_elem->'ActivityExecute'->>'id')::BIGINT;
                         v_item_session_id := v_elem->'ActivityExecute'->>'session_id';
                     ELSE
                         v_item_instance_id := NULL;
                         v_item_execution_id := NULL;
                         v_item_activity_id := NULL;
                         v_item_session_id := NULL;
                     END IF;
 
                     INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
                     VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id, v_item_session_id);
                 END LOOP;
             END IF;
 
             -- Step 9: Delete cancelled activities from worker_queue (lock stealing)
             IF p_cancelled_activities IS NOT NULL AND JSONB_ARRAY_LENGTH(p_cancelled_activities) > 0 THEN
                 FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_cancelled_activities) LOOP
                     DELETE FROM SCHEMA.worker_queue
                     WHERE instance_id = v_elem->>'instance'
                       AND execution_id = (v_elem->>'execution_id')::BIGINT
                       AND activity_id = (v_elem->>'activity_id')::BIGINT;
                 END LOOP;
             END IF;
 
             -- Step 10: Enqueue orchestrator items
             IF p_orchestrator_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_orchestrator_items) > 0 THEN
                 FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_orchestrator_items) LOOP
                     IF v_elem ? 'StartOrchestration' THEN
                         v_item_instance_id := v_elem->'StartOrchestration'->>'instance';
                     ELSIF v_elem ? 'ContinueAsNew' THEN
                         v_item_instance_id := v_elem->'ContinueAsNew'->>'instance';
                     ELSIF v_elem ? 'TimerFired' THEN
                         v_item_instance_id := v_elem->'TimerFired'->>'instance';
                         v_fire_at_ms := (v_elem->'TimerFired'->>'fire_at_ms')::BIGINT;
                     ELSIF v_elem ? 'ActivityCompleted' THEN
                         v_item_instance_id := v_elem->'ActivityCompleted'->>'instance';
                     ELSIF v_elem ? 'ActivityFailed' THEN
                         v_item_instance_id := v_elem->'ActivityFailed'->>'instance';
                     ELSIF v_elem ? 'ExternalRaised' THEN
                         v_item_instance_id := v_elem->'ExternalRaised'->>'instance';
                     ELSIF v_elem ? 'CancelInstance' THEN
                         v_item_instance_id := v_elem->'CancelInstance'->>'instance';
                     ELSIF v_elem ? 'SubOrchCompleted' THEN
                         v_item_instance_id := v_elem->'SubOrchCompleted'->>'parent_instance';
                     ELSIF v_elem ? 'SubOrchFailed' THEN
                         v_item_instance_id := v_elem->'SubOrchFailed'->>'parent_instance';
+                    ELSIF v_elem ? 'QueueMessage' THEN
+                        v_item_instance_id := v_elem->'QueueMessage'->>'instance';
                     ELSE
                         v_item_instance_id := v_instance_id;
                     END IF;
 
                     IF v_elem ? 'TimerFired' AND v_fire_at_ms IS NOT NULL AND v_fire_at_ms > 0 THEN
                         v_visible_at := TO_TIMESTAMP(v_fire_at_ms / 1000.0);
                     ELSE
                         v_visible_at := v_now_ts;
                     END IF;
 
                     INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
                     VALUES (v_item_instance_id, v_elem::TEXT, v_visible_at, v_now_ts);
                     
                     v_fire_at_ms := NULL;
                 END LOOP;
             END IF;
 
             -- Step 11: Delete locked messages
             DELETE FROM SCHEMA.orchestrator_queue q WHERE q.lock_token = p_lock_token;
 
             -- Step 12: Remove instance lock
             DELETE FROM SCHEMA.instance_locks il
             WHERE il.instance_id = v_instance_id AND il.lock_token = p_lock_token;
         END;
         $function$
\ No newline at end of file

```

### `ack_worker` — Body Modified

Full function with diff:
```diff
 CREATE OR REPLACE FUNCTION SCHEMA.ack_worker(p_lock_token text, p_instance_id text DEFAULT NULL::text, p_completion_json text DEFAULT NULL::text, p_now_ms bigint DEFAULT NULL::bigint)
  RETURNS void
  LANGUAGE plpgsql
 AS $function$
         DECLARE
             v_rows_affected INTEGER;
             v_now_ts TIMESTAMPTZ;
             v_session_id TEXT;
         BEGIN
             -- Capture session_id before deleting
             SELECT session_id INTO v_session_id
             FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
 
-            -- Delete the worker queue item
-            DELETE FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
+            -- Delete the worker queue item, only if lock is still valid
+            IF p_now_ms IS NOT NULL THEN
+                DELETE FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token AND locked_until > p_now_ms;
+            ELSE
+                DELETE FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
+            END IF;
             GET DIAGNOSTICS v_rows_affected = ROW_COUNT;
 
             IF v_rows_affected = 0 THEN
                 RAISE EXCEPTION 'Worker queue item not found or already processed';
             END IF;
 
             -- Only enqueue completion if provided (NULL means cancelled activity)
             IF p_completion_json IS NOT NULL THEN
                 -- Validate required parameters for completion
                 IF p_instance_id IS NULL THEN
                     RAISE EXCEPTION 'p_instance_id is required when p_completion_json is provided';
                 END IF;
                 IF p_now_ms IS NULL THEN
                     RAISE EXCEPTION 'p_now_ms is required when p_completion_json is provided';
                 END IF;
                 
                 v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
                 INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
                 VALUES (p_instance_id, p_completion_json, v_now_ts, v_now_ts);
             END IF;
 
             -- Piggyback: update session last_activity_at
             IF v_session_id IS NOT NULL AND p_now_ms IS NOT NULL THEN
                 UPDATE SCHEMA.sessions SET last_activity_at = p_now_ms
                 WHERE session_id = v_session_id AND locked_until > p_now_ms;
             END IF;
         END;
         $function$
\ No newline at end of file

```

### `cleanup_schema` — Body Modified

Full function with diff:
```diff
 CREATE OR REPLACE FUNCTION SCHEMA.cleanup_schema()
  RETURNS void
  LANGUAGE plpgsql
 AS $function$
         BEGIN
             -- Drop tables first
             DROP TABLE IF EXISTS SCHEMA.sessions CASCADE;
             DROP TABLE IF EXISTS SCHEMA.instances CASCADE;
             DROP TABLE IF EXISTS SCHEMA.executions CASCADE;
             DROP TABLE IF EXISTS SCHEMA.history CASCADE;
             DROP TABLE IF EXISTS SCHEMA.orchestrator_queue CASCADE;
             DROP TABLE IF EXISTS SCHEMA.worker_queue CASCADE;
             DROP TABLE IF EXISTS SCHEMA.instance_locks CASCADE;
             DROP TABLE IF EXISTS SCHEMA._duroxide_migrations CASCADE;
             
             -- Drop all stored procedures
             DROP FUNCTION IF EXISTS SCHEMA.cleanup_schema();
             DROP FUNCTION IF EXISTS SCHEMA.list_instances();
             DROP FUNCTION IF EXISTS SCHEMA.list_executions(TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.latest_execution_id(TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.list_instances_by_status(TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.get_instance_info(TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.get_execution_info(TEXT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.get_system_metrics();
             DROP FUNCTION IF EXISTS SCHEMA.get_queue_depths(BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT, TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.ack_worker(TEXT, TEXT, TEXT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.renew_work_item_lock(TEXT, BIGINT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT, TEXT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.abandon_work_item(TEXT, BIGINT, BIGINT, BOOLEAN);
             DROP FUNCTION IF EXISTS SCHEMA.enqueue_orchestrator_work(TEXT, TEXT, TIMESTAMPTZ, TEXT, TEXT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.fetch_orchestration_item(BIGINT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.fetch_orchestration_item(BIGINT, BIGINT, BIGINT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.ack_orchestration_item(TEXT, BIGINT, BIGINT, JSONB, JSONB, JSONB, JSONB, JSONB);
             DROP FUNCTION IF EXISTS SCHEMA.abandon_orchestration_item(TEXT, BIGINT, BIGINT, BOOLEAN);
             DROP FUNCTION IF EXISTS SCHEMA.renew_orchestration_item_lock(TEXT, BIGINT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.fetch_history(TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.fetch_history_with_execution(TEXT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.append_history(TEXT, BIGINT, JSONB);
             DROP FUNCTION IF EXISTS SCHEMA.list_children(TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.get_parent_id(TEXT);
             DROP FUNCTION IF EXISTS SCHEMA.delete_instances_atomic(TEXT[], BOOLEAN);
             DROP FUNCTION IF EXISTS SCHEMA.prune_executions(TEXT, INTEGER, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.renew_session_lock(TEXT[], BIGINT, BIGINT, BIGINT);
             DROP FUNCTION IF EXISTS SCHEMA.cleanup_orphaned_sessions(BIGINT);
+            DROP FUNCTION IF EXISTS SCHEMA.get_custom_status(TEXT, BIGINT);
         END;
         $function$
\ No newline at end of file

```

### `get_custom_status` — NEW
```sql
CREATE OR REPLACE FUNCTION SCHEMA.get_custom_status(p_instance_id text, p_last_seen_version bigint)
 RETURNS TABLE(out_custom_status text, out_custom_status_version bigint)
 LANGUAGE plpgsql
AS $function$
        BEGIN
            RETURN QUERY
            SELECT i.custom_status, i.custom_status_version::BIGINT
            FROM SCHEMA.instances i
            WHERE i.instance_id = p_instance_id
              AND i.custom_status_version > p_last_seen_version;
        END;
        $function$
```

---