# Diff for migration 0009
**Migration file:** `0009_add_activity_cancellation_support.sql`

This diff was auto-generated by comparing the schema state before and after applying migration 0009.
## Schema Diff
```diff
--- /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.BrjC49PaR2 2026-01-02 21:29:19
+++ /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.9MqhL7xvS5 2026-01-02 21:29:19
@@ -5,7 +5,7 @@
CREATE TABLE instance_locks (instance_id TEXT NOT NULL, lock_token TEXT NOT NULL, locked_until BIGINT NOT NULL, locked_at BIGINT NOT NULL);
CREATE TABLE instances (instance_id TEXT NOT NULL, orchestration_name TEXT NOT NULL, orchestration_version TEXT, current_execution_id BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP);
CREATE TABLE orchestrator_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.orchestrator_queue_id_seq'::regclass), instance_id TEXT NOT NULL, work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0);
-CREATE TABLE worker_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.worker_queue_id_seq'::regclass), work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0);
+CREATE TABLE worker_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.worker_queue_id_seq'::regclass), work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0, instance_id TEXT, execution_id BIGINT, activity_id BIGINT);
-- INDEXES
CREATE INDEX idx_history_lookup ON SCHEMA.history USING btree (instance_id, execution_id, event_id);
@@ -13,6 +13,7 @@
CREATE INDEX idx_orch_instance ON SCHEMA.orchestrator_queue USING btree (instance_id);
CREATE INDEX idx_orch_lock ON SCHEMA.orchestrator_queue USING btree (lock_token);
CREATE INDEX idx_orch_visible ON SCHEMA.orchestrator_queue USING btree (visible_at, lock_token);
+CREATE INDEX idx_worker_activity_lookup ON SCHEMA.worker_queue USING btree (instance_id, execution_id, activity_id);
CREATE INDEX idx_worker_available ON SCHEMA.worker_queue USING btree (lock_token, id);
CREATE INDEX idx_worker_visible ON SCHEMA.worker_queue USING btree (visible_at, lock_token);
@@ -214,7 +215,7 @@
END;
$function$
;
-CREATE OR REPLACE FUNCTION SCHEMA.ack_orchestration_item(p_lock_token text, p_execution_id bigint, p_history_delta jsonb, p_worker_items jsonb, p_orchestrator_items jsonb, p_metadata jsonb)
+CREATE OR REPLACE FUNCTION SCHEMA.ack_orchestration_item(p_lock_token text, p_execution_id bigint, p_history_delta jsonb, p_worker_items jsonb, p_orchestrator_items jsonb, p_metadata jsonb, p_cancelled_activities jsonb DEFAULT '[]'::jsonb)
RETURNS void
LANGUAGE plpgsql
AS $function$
@@ -230,8 +231,14 @@
v_visible_at TIMESTAMPTZ;
v_fire_at_ms BIGINT;
v_item_instance_id TEXT;
+ v_item_execution_id BIGINT;
+ v_item_activity_id BIGINT;
+ v_now_ts TIMESTAMPTZ;
BEGIN
+ -- NOTE: v_now_ms is computed from database time only for lock validation
+ -- All timestamps stored in tables use v_now_ts derived from this
v_now_ms := (EXTRACT(EPOCH FROM NOW()) * 1000)::BIGINT;
+ v_now_ts := TO_TIMESTAMP(v_now_ms / 1000.0);
-- Step 1: Validate lock token
SELECT il.instance_id INTO v_instance_id
@@ -262,7 +269,7 @@
-- Step 4: Create execution record (idempotent)
INSERT INTO SCHEMA.executions (instance_id, execution_id, status, started_at)
- VALUES (v_instance_id, p_execution_id, 'Running', NOW())
+ VALUES (v_instance_id, p_execution_id, 'Running', v_now_ts)
ON CONFLICT (instance_id, execution_id) DO NOTHING;
-- Step 5: Update instance current_execution_id
@@ -285,7 +292,7 @@
-- Step 7: Update execution status if provided
IF v_status IS NOT NULL THEN
v_completed_at := CASE
- WHEN v_status IN ('Completed', 'Failed') THEN NOW()
+ WHEN v_status IN ('Completed', 'Failed') THEN v_now_ts
ELSE NULL
END;
@@ -294,14 +301,37 @@
WHERE e.instance_id = v_instance_id AND e.execution_id = p_execution_id;
END IF;
- -- Step 8: Enqueue worker items (batch)
+ -- Step 8: Delete cancelled activities from worker_queue (lock stealing)
+ -- This removes activities that were scheduled but not yet started/completed
+ IF p_cancelled_activities IS NOT NULL AND JSONB_ARRAY_LENGTH(p_cancelled_activities) > 0 THEN
+ FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_cancelled_activities) LOOP
+ DELETE FROM SCHEMA.worker_queue
+ WHERE instance_id = v_elem->>'instance'
+ AND execution_id = (v_elem->>'execution_id')::BIGINT
+ AND activity_id = (v_elem->>'activity_id')::BIGINT;
+ END LOOP;
+ END IF;
+
+ -- Step 9: Enqueue worker items (batch) - now with activity identification
IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
- INSERT INTO SCHEMA.worker_queue (work_item, created_at)
- SELECT elem::TEXT, NOW()
- FROM JSONB_ARRAY_ELEMENTS(p_worker_items) AS elem;
+ FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
+ -- Extract activity identification from ActivityExecute work item
+ IF v_elem ? 'ActivityExecute' THEN
+ v_item_instance_id := v_elem->'ActivityExecute'->>'instance';
+ v_item_execution_id := (v_elem->'ActivityExecute'->>'execution_id')::BIGINT;
+ v_item_activity_id := (v_elem->'ActivityExecute'->>'id')::BIGINT;
+ ELSE
+ v_item_instance_id := NULL;
+ v_item_execution_id := NULL;
+ v_item_activity_id := NULL;
+ END IF;
+
+ INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id)
+ VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id);
+ END LOOP;
END IF;
- -- Step 9: Enqueue orchestrator items (batch with instance extraction and visible_at handling)
+ -- Step 10: Enqueue orchestrator items (batch with instance extraction and visible_at handling)
IF p_orchestrator_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_orchestrator_items) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_orchestrator_items) LOOP
-- Extract instance_id from work item based on type
@@ -332,20 +362,20 @@
IF v_elem ? 'TimerFired' AND v_fire_at_ms IS NOT NULL AND v_fire_at_ms > 0 THEN
v_visible_at := TO_TIMESTAMP(v_fire_at_ms / 1000.0);
ELSE
- v_visible_at := NOW();
+ v_visible_at := v_now_ts;
END IF;
INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
- VALUES (v_item_instance_id, v_elem::TEXT, v_visible_at, NOW());
+ VALUES (v_item_instance_id, v_elem::TEXT, v_visible_at, v_now_ts);
v_fire_at_ms := NULL; -- Reset for next iteration
END LOOP;
END IF;
- -- Step 10: Delete locked messages
+ -- Step 11: Delete locked messages
DELETE FROM SCHEMA.orchestrator_queue q WHERE q.lock_token = p_lock_token;
- -- Step 11: Remove instance lock
+ -- Step 12: Remove instance lock
DELETE FROM SCHEMA.instance_locks il
WHERE il.instance_id = v_instance_id AND il.lock_token = p_lock_token;
END;
@@ -483,7 +513,7 @@
END;
$function$
;
-CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item text, p_now_ms bigint)
+CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item text, p_now_ms bigint, p_instance_id text DEFAULT NULL::text, p_execution_id bigint DEFAULT NULL::bigint, p_activity_id bigint DEFAULT NULL::bigint)
RETURNS void
LANGUAGE plpgsql
AS $function$
@@ -491,8 +521,8 @@
v_now_ts TIMESTAMPTZ;
BEGIN
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
- INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at)
- VALUES (p_work_item, v_now_ts, v_now_ts);
+ INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id)
+ VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id);
END;
$function$
;
```
---
*Generated on 2026-01-02 14:29:20 UTC*