duroxide-pg 0.1.10

A PostgreSQL-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
# Diff for migration 0009

**Migration file:** `0009_add_activity_cancellation_support.sql`

This diff was auto-generated by comparing schema state before and after applying migration 0009.

## Schema Diff

```diff
--- /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.BrjC49PaR2	2026-01-02 21:29:19
+++ /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.9MqhL7xvS5	2026-01-02 21:29:19
@@ -5,7 +5,7 @@
 CREATE TABLE instance_locks (instance_id TEXT NOT NULL, lock_token TEXT NOT NULL, locked_until BIGINT NOT NULL, locked_at BIGINT NOT NULL);
 CREATE TABLE instances (instance_id TEXT NOT NULL, orchestration_name TEXT NOT NULL, orchestration_version TEXT, current_execution_id BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP);
 CREATE TABLE orchestrator_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.orchestrator_queue_id_seq'::regclass), instance_id TEXT NOT NULL, work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0);
-CREATE TABLE worker_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.worker_queue_id_seq'::regclass), work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0);
+CREATE TABLE worker_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.worker_queue_id_seq'::regclass), work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0, instance_id TEXT, execution_id BIGINT, activity_id BIGINT);
 
 -- INDEXES
 CREATE INDEX idx_history_lookup ON SCHEMA.history USING btree (instance_id, execution_id, event_id);
@@ -13,6 +13,7 @@
 CREATE INDEX idx_orch_instance ON SCHEMA.orchestrator_queue USING btree (instance_id);
 CREATE INDEX idx_orch_lock ON SCHEMA.orchestrator_queue USING btree (lock_token);
 CREATE INDEX idx_orch_visible ON SCHEMA.orchestrator_queue USING btree (visible_at, lock_token);
+CREATE INDEX idx_worker_activity_lookup ON SCHEMA.worker_queue USING btree (instance_id, execution_id, activity_id);
 CREATE INDEX idx_worker_available ON SCHEMA.worker_queue USING btree (lock_token, id);
 CREATE INDEX idx_worker_visible ON SCHEMA.worker_queue USING btree (visible_at, lock_token);
 
@@ -214,7 +215,7 @@
         END;
         $function$
 ;
-CREATE OR REPLACE FUNCTION SCHEMA.ack_orchestration_item(p_lock_token text, p_execution_id bigint, p_history_delta jsonb, p_worker_items jsonb, p_orchestrator_items jsonb, p_metadata jsonb)
+CREATE OR REPLACE FUNCTION SCHEMA.ack_orchestration_item(p_lock_token text, p_execution_id bigint, p_history_delta jsonb, p_worker_items jsonb, p_orchestrator_items jsonb, p_metadata jsonb, p_cancelled_activities jsonb DEFAULT '[]'::jsonb)
  RETURNS void
  LANGUAGE plpgsql
 AS $function$
@@ -230,8 +231,14 @@
             v_visible_at TIMESTAMPTZ;
             v_fire_at_ms BIGINT;
             v_item_instance_id TEXT;
+            v_item_execution_id BIGINT;
+            v_item_activity_id BIGINT;
+            v_now_ts TIMESTAMPTZ;
         BEGIN
+            -- NOTE: v_now_ms is computed from database time only for lock validation
+            -- All timestamps stored in tables use v_now_ts derived from this
             v_now_ms := (EXTRACT(EPOCH FROM NOW()) * 1000)::BIGINT;
+            v_now_ts := TO_TIMESTAMP(v_now_ms / 1000.0);
 
             -- Step 1: Validate lock token
             SELECT il.instance_id INTO v_instance_id
@@ -262,7 +269,7 @@
 
             -- Step 4: Create execution record (idempotent)
             INSERT INTO SCHEMA.executions (instance_id, execution_id, status, started_at)
-            VALUES (v_instance_id, p_execution_id, 'Running', NOW())
+            VALUES (v_instance_id, p_execution_id, 'Running', v_now_ts)
             ON CONFLICT (instance_id, execution_id) DO NOTHING;
 
             -- Step 5: Update instance current_execution_id
@@ -285,7 +292,7 @@
             -- Step 7: Update execution status if provided
             IF v_status IS NOT NULL THEN
                 v_completed_at := CASE 
-                    WHEN v_status IN ('Completed', 'Failed') THEN NOW() 
+                    WHEN v_status IN ('Completed', 'Failed') THEN v_now_ts 
                     ELSE NULL 
                 END;
                 
@@ -294,14 +301,37 @@
                 WHERE e.instance_id = v_instance_id AND e.execution_id = p_execution_id;
             END IF;
 
-            -- Step 8: Enqueue worker items (batch)
+            -- Step 8: Delete cancelled activities from worker_queue (lock stealing)
+            -- This removes activities that were scheduled but not yet started/completed
+            IF p_cancelled_activities IS NOT NULL AND JSONB_ARRAY_LENGTH(p_cancelled_activities) > 0 THEN
+                FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_cancelled_activities) LOOP
+                    DELETE FROM SCHEMA.worker_queue
+                    WHERE instance_id = v_elem->>'instance'
+                      AND execution_id = (v_elem->>'execution_id')::BIGINT
+                      AND activity_id = (v_elem->>'activity_id')::BIGINT;
+                END LOOP;
+            END IF;
+
+            -- Step 9: Enqueue worker items (batch) - now with activity identification
             IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
-                INSERT INTO SCHEMA.worker_queue (work_item, created_at)
-                SELECT elem::TEXT, NOW()
-                FROM JSONB_ARRAY_ELEMENTS(p_worker_items) AS elem;
+                FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
+                    -- Extract activity identification from ActivityExecute work item
+                    IF v_elem ? 'ActivityExecute' THEN
+                        v_item_instance_id := v_elem->'ActivityExecute'->>'instance';
+                        v_item_execution_id := (v_elem->'ActivityExecute'->>'execution_id')::BIGINT;
+                        v_item_activity_id := (v_elem->'ActivityExecute'->>'id')::BIGINT;
+                    ELSE
+                        v_item_instance_id := NULL;
+                        v_item_execution_id := NULL;
+                        v_item_activity_id := NULL;
+                    END IF;
+
+                    INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id)
+                    VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id);
+                END LOOP;
             END IF;
 
-            -- Step 9: Enqueue orchestrator items (batch with instance extraction and visible_at handling)
+            -- Step 10: Enqueue orchestrator items (batch with instance extraction and visible_at handling)
             IF p_orchestrator_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_orchestrator_items) > 0 THEN
                 FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_orchestrator_items) LOOP
                     -- Extract instance_id from work item based on type
@@ -332,20 +362,20 @@
                     IF v_elem ? 'TimerFired' AND v_fire_at_ms IS NOT NULL AND v_fire_at_ms > 0 THEN
                         v_visible_at := TO_TIMESTAMP(v_fire_at_ms / 1000.0);
                     ELSE
-                        v_visible_at := NOW();
+                        v_visible_at := v_now_ts;
                     END IF;
 
                     INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
-                    VALUES (v_item_instance_id, v_elem::TEXT, v_visible_at, NOW());
+                    VALUES (v_item_instance_id, v_elem::TEXT, v_visible_at, v_now_ts);
                     
                     v_fire_at_ms := NULL; -- Reset for next iteration
                 END LOOP;
             END IF;
 
-            -- Step 10: Delete locked messages
+            -- Step 11: Delete locked messages
             DELETE FROM SCHEMA.orchestrator_queue q WHERE q.lock_token = p_lock_token;
 
-            -- Step 11: Remove instance lock
+            -- Step 12: Remove instance lock
             DELETE FROM SCHEMA.instance_locks il
             WHERE il.instance_id = v_instance_id AND il.lock_token = p_lock_token;
         END;
@@ -483,7 +513,7 @@
         END;
         $function$
 ;
-CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item text, p_now_ms bigint)
+CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item text, p_now_ms bigint, p_instance_id text DEFAULT NULL::text, p_execution_id bigint DEFAULT NULL::bigint, p_activity_id bigint DEFAULT NULL::bigint)
  RETURNS void
  LANGUAGE plpgsql
 AS $function$
@@ -491,8 +521,8 @@
             v_now_ts TIMESTAMPTZ;
         BEGIN
             v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
-            INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at)
-            VALUES (p_work_item, v_now_ts, v_now_ts);
+            INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id)
+            VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id);
         END;
         $function$
 ;
```

---
*Generated on 2026-01-02 14:29:20 UTC*