# Diff for migration 0015
**Migration file:** `0015_add_custom_status.sql`
Each changed function is shown **in full** with `+`/`-` markers on changed lines.
## Table Changes
### `instances` — Modified
```diff
instance_id TEXT NOT NULL
orchestration_name TEXT NOT NULL
orchestration_version TEXT
current_execution_id BIGINT NOT NULL DEFAULT 1
created_at TIMESTAMP WITH TIME ZONE NOT NULL
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
parent_instance_id TEXT
+ custom_status TEXT
+ custom_status_version INTEGER NOT NULL DEFAULT 0
```
## Function Changes
### `ack_orchestration_item` — Body Modified
Full function with diff:
```diff
CREATE OR REPLACE FUNCTION SCHEMA.ack_orchestration_item(p_lock_token text, p_now_ms bigint, p_execution_id bigint, p_history_delta jsonb, p_worker_items jsonb, p_orchestrator_items jsonb, p_metadata jsonb, p_cancelled_activities jsonb DEFAULT '[]'::jsonb)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_instance_id TEXT;
v_orchestration_name TEXT;
v_orchestration_version TEXT;
v_parent_instance_id TEXT;
v_status TEXT;
v_output TEXT;
v_completed_at TIMESTAMPTZ;
v_elem JSONB;
v_visible_at TIMESTAMPTZ;
v_fire_at_ms BIGINT;
v_item_instance_id TEXT;
v_item_execution_id BIGINT;
v_item_activity_id BIGINT;
v_item_session_id TEXT;
v_now_ts TIMESTAMPTZ;
+ v_custom_status_action TEXT;
+ v_custom_status_value TEXT;
BEGIN
-- Convert Rust-supplied millisecond timestamp to TIMESTAMPTZ
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
-- Step 1: Validate lock token
SELECT il.instance_id INTO v_instance_id
FROM SCHEMA.instance_locks il
WHERE il.lock_token = p_lock_token AND il.locked_until > p_now_ms;
IF NOT FOUND THEN
RAISE EXCEPTION 'Invalid lock token';
END IF;
-- Step 2: Extract metadata from JSONB
v_orchestration_name := p_metadata->>'orchestration_name';
v_orchestration_version := p_metadata->>'orchestration_version';
v_parent_instance_id := p_metadata->>'parent_instance_id';
v_status := p_metadata->>'status';
v_output := p_metadata->>'output';
-- Step 3: Create or update instance metadata (with explicit timestamps)
IF v_orchestration_name IS NOT NULL AND v_orchestration_version IS NOT NULL THEN
INSERT INTO SCHEMA.instances (instance_id, orchestration_name, orchestration_version, current_execution_id, parent_instance_id, created_at, updated_at)
VALUES (v_instance_id, v_orchestration_name, v_orchestration_version, p_execution_id, v_parent_instance_id, v_now_ts, v_now_ts)
ON CONFLICT (instance_id) DO NOTHING;
UPDATE SCHEMA.instances i
SET orchestration_name = v_orchestration_name,
orchestration_version = v_orchestration_version,
parent_instance_id = COALESCE(i.parent_instance_id, v_parent_instance_id),
updated_at = v_now_ts
WHERE i.instance_id = v_instance_id;
END IF;
-- Step 4: Create execution record (idempotent)
INSERT INTO SCHEMA.executions (instance_id, execution_id, status, started_at)
VALUES (v_instance_id, p_execution_id, 'Running', v_now_ts)
ON CONFLICT (instance_id, execution_id) DO NOTHING;
-- Step 5: Update instance current_execution_id
UPDATE SCHEMA.instances i
SET current_execution_id = GREATEST(i.current_execution_id, p_execution_id),
updated_at = v_now_ts
WHERE i.instance_id = v_instance_id;
-- Step 6: Append history_delta (batch insert with explicit timestamps)
IF p_history_delta IS NOT NULL AND JSONB_ARRAY_LENGTH(p_history_delta) > 0 THEN
INSERT INTO SCHEMA.history (instance_id, execution_id, event_id, event_type, event_data, created_at)
SELECT
v_instance_id,
p_execution_id,
(elem->>'event_id')::BIGINT,
elem->>'event_type',
elem->>'event_data',
v_now_ts
FROM JSONB_ARRAY_ELEMENTS(p_history_delta) AS elem;
END IF;
-- Step 7: Update execution status if provided
IF v_status IS NOT NULL THEN
v_completed_at := CASE
WHEN v_status IN ('Completed', 'Failed') THEN v_now_ts
ELSE NULL
END;
UPDATE SCHEMA.executions e
SET status = v_status, output = v_output, completed_at = v_completed_at
WHERE e.instance_id = v_instance_id AND e.execution_id = p_execution_id;
END IF;
-- Step 7b: Store pinned duroxide version if provided in metadata
IF p_metadata ? 'pinned_duroxide_version' AND p_metadata->'pinned_duroxide_version' IS NOT NULL
AND p_metadata->>'pinned_duroxide_version' != 'null' THEN
UPDATE SCHEMA.executions
SET duroxide_version_major = (p_metadata->'pinned_duroxide_version'->>'major')::INTEGER,
duroxide_version_minor = (p_metadata->'pinned_duroxide_version'->>'minor')::INTEGER,
duroxide_version_patch = (p_metadata->'pinned_duroxide_version'->>'patch')::INTEGER
WHERE instance_id = v_instance_id AND execution_id = p_execution_id;
END IF;
+ -- Step 7c: Handle custom_status update on instances table
+ v_custom_status_action := p_metadata->>'custom_status_action';
+ IF v_custom_status_action = 'set' THEN
+ v_custom_status_value := p_metadata->>'custom_status_value';
+ UPDATE SCHEMA.instances
+ SET custom_status = v_custom_status_value,
+ custom_status_version = custom_status_version + 1
+ WHERE instance_id = v_instance_id;
+ ELSIF v_custom_status_action = 'clear' THEN
+ UPDATE SCHEMA.instances
+ SET custom_status = NULL,
+ custom_status_version = custom_status_version + 1
+ WHERE instance_id = v_instance_id;
+ END IF;
+
-- Step 8: Enqueue worker items with session_id support
IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
IF v_elem ? 'ActivityExecute' THEN
v_item_instance_id := v_elem->'ActivityExecute'->>'instance';
v_item_execution_id := (v_elem->'ActivityExecute'->>'execution_id')::BIGINT;
v_item_activity_id := (v_elem->'ActivityExecute'->>'id')::BIGINT;
v_item_session_id := v_elem->'ActivityExecute'->>'session_id';
ELSE
v_item_instance_id := NULL;
v_item_execution_id := NULL;
v_item_activity_id := NULL;
v_item_session_id := NULL;
END IF;
INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id, v_item_session_id);
END LOOP;
END IF;
-- Step 9: Delete cancelled activities from worker_queue (lock stealing)
IF p_cancelled_activities IS NOT NULL AND JSONB_ARRAY_LENGTH(p_cancelled_activities) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_cancelled_activities) LOOP
DELETE FROM SCHEMA.worker_queue
WHERE instance_id = v_elem->>'instance'
AND execution_id = (v_elem->>'execution_id')::BIGINT
AND activity_id = (v_elem->>'activity_id')::BIGINT;
END LOOP;
END IF;
-- Step 10: Enqueue orchestrator items
IF p_orchestrator_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_orchestrator_items) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_orchestrator_items) LOOP
IF v_elem ? 'StartOrchestration' THEN
v_item_instance_id := v_elem->'StartOrchestration'->>'instance';
ELSIF v_elem ? 'ContinueAsNew' THEN
v_item_instance_id := v_elem->'ContinueAsNew'->>'instance';
ELSIF v_elem ? 'TimerFired' THEN
v_item_instance_id := v_elem->'TimerFired'->>'instance';
v_fire_at_ms := (v_elem->'TimerFired'->>'fire_at_ms')::BIGINT;
ELSIF v_elem ? 'ActivityCompleted' THEN
v_item_instance_id := v_elem->'ActivityCompleted'->>'instance';
ELSIF v_elem ? 'ActivityFailed' THEN
v_item_instance_id := v_elem->'ActivityFailed'->>'instance';
ELSIF v_elem ? 'ExternalRaised' THEN
v_item_instance_id := v_elem->'ExternalRaised'->>'instance';
ELSIF v_elem ? 'CancelInstance' THEN
v_item_instance_id := v_elem->'CancelInstance'->>'instance';
ELSIF v_elem ? 'SubOrchCompleted' THEN
v_item_instance_id := v_elem->'SubOrchCompleted'->>'parent_instance';
ELSIF v_elem ? 'SubOrchFailed' THEN
v_item_instance_id := v_elem->'SubOrchFailed'->>'parent_instance';
+ ELSIF v_elem ? 'QueueMessage' THEN
+ v_item_instance_id := v_elem->'QueueMessage'->>'instance';
ELSE
v_item_instance_id := v_instance_id;
END IF;
IF v_elem ? 'TimerFired' AND v_fire_at_ms IS NOT NULL AND v_fire_at_ms > 0 THEN
v_visible_at := TO_TIMESTAMP(v_fire_at_ms / 1000.0);
ELSE
v_visible_at := v_now_ts;
END IF;
INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
VALUES (v_item_instance_id, v_elem::TEXT, v_visible_at, v_now_ts);
v_fire_at_ms := NULL;
END LOOP;
END IF;
-- Step 11: Delete locked messages
DELETE FROM SCHEMA.orchestrator_queue q WHERE q.lock_token = p_lock_token;
-- Step 12: Remove instance lock
DELETE FROM SCHEMA.instance_locks il
WHERE il.instance_id = v_instance_id AND il.lock_token = p_lock_token;
END;
$function$
\ No newline at end of file
```
### `ack_worker` — Body Modified
Full function with diff:
```diff
CREATE OR REPLACE FUNCTION SCHEMA.ack_worker(p_lock_token text, p_instance_id text DEFAULT NULL::text, p_completion_json text DEFAULT NULL::text, p_now_ms bigint DEFAULT NULL::bigint)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_rows_affected INTEGER;
v_now_ts TIMESTAMPTZ;
v_session_id TEXT;
BEGIN
-- Capture session_id before deleting
SELECT session_id INTO v_session_id
FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
- -- Delete the worker queue item
- DELETE FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
+ -- Delete the worker queue item, only if lock is still valid
+ IF p_now_ms IS NOT NULL THEN
+ DELETE FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token AND locked_until > p_now_ms;
+ ELSE
+ DELETE FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
+ END IF;
GET DIAGNOSTICS v_rows_affected = ROW_COUNT;
IF v_rows_affected = 0 THEN
RAISE EXCEPTION 'Worker queue item not found or already processed';
END IF;
-- Only enqueue completion if provided (NULL means cancelled activity)
IF p_completion_json IS NOT NULL THEN
-- Validate required parameters for completion
IF p_instance_id IS NULL THEN
RAISE EXCEPTION 'p_instance_id is required when p_completion_json is provided';
END IF;
IF p_now_ms IS NULL THEN
RAISE EXCEPTION 'p_now_ms is required when p_completion_json is provided';
END IF;
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
VALUES (p_instance_id, p_completion_json, v_now_ts, v_now_ts);
END IF;
-- Piggyback: update session last_activity_at
IF v_session_id IS NOT NULL AND p_now_ms IS NOT NULL THEN
UPDATE SCHEMA.sessions SET last_activity_at = p_now_ms
WHERE session_id = v_session_id AND locked_until > p_now_ms;
END IF;
END;
$function$
\ No newline at end of file
```
### `cleanup_schema` — Body Modified
Full function with diff:
```diff
CREATE OR REPLACE FUNCTION SCHEMA.cleanup_schema()
RETURNS void
LANGUAGE plpgsql
AS $function$
BEGIN
-- Drop tables first
DROP TABLE IF EXISTS SCHEMA.sessions CASCADE;
DROP TABLE IF EXISTS SCHEMA.instances CASCADE;
DROP TABLE IF EXISTS SCHEMA.executions CASCADE;
DROP TABLE IF EXISTS SCHEMA.history CASCADE;
DROP TABLE IF EXISTS SCHEMA.orchestrator_queue CASCADE;
DROP TABLE IF EXISTS SCHEMA.worker_queue CASCADE;
DROP TABLE IF EXISTS SCHEMA.instance_locks CASCADE;
DROP TABLE IF EXISTS SCHEMA._duroxide_migrations CASCADE;
-- Drop all stored procedures
DROP FUNCTION IF EXISTS SCHEMA.cleanup_schema();
DROP FUNCTION IF EXISTS SCHEMA.list_instances();
DROP FUNCTION IF EXISTS SCHEMA.list_executions(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.latest_execution_id(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.list_instances_by_status(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.get_instance_info(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.get_execution_info(TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.get_system_metrics();
DROP FUNCTION IF EXISTS SCHEMA.get_queue_depths(BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT, TEXT);
DROP FUNCTION IF EXISTS SCHEMA.ack_worker(TEXT, TEXT, TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.renew_work_item_lock(TEXT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT, TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.abandon_work_item(TEXT, BIGINT, BIGINT, BOOLEAN);
DROP FUNCTION IF EXISTS SCHEMA.enqueue_orchestrator_work(TEXT, TEXT, TIMESTAMPTZ, TEXT, TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_orchestration_item(BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_orchestration_item(BIGINT, BIGINT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.ack_orchestration_item(TEXT, BIGINT, BIGINT, JSONB, JSONB, JSONB, JSONB, JSONB);
DROP FUNCTION IF EXISTS SCHEMA.abandon_orchestration_item(TEXT, BIGINT, BIGINT, BOOLEAN);
DROP FUNCTION IF EXISTS SCHEMA.renew_orchestration_item_lock(TEXT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_history(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_history_with_execution(TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.append_history(TEXT, BIGINT, JSONB);
DROP FUNCTION IF EXISTS SCHEMA.list_children(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.get_parent_id(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.delete_instances_atomic(TEXT[], BOOLEAN);
DROP FUNCTION IF EXISTS SCHEMA.prune_executions(TEXT, INTEGER, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.renew_session_lock(TEXT[], BIGINT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.cleanup_orphaned_sessions(BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.get_custom_status(TEXT, BIGINT);
END;
$function$
\ No newline at end of file
```
### `get_custom_status` — NEW
```sql
CREATE OR REPLACE FUNCTION SCHEMA.get_custom_status(p_instance_id text, p_last_seen_version bigint)
RETURNS TABLE(out_custom_status text, out_custom_status_version bigint)
LANGUAGE plpgsql
AS $function$
BEGIN
RETURN QUERY
SELECT i.custom_status, i.custom_status_version::BIGINT
FROM SCHEMA.instances i
WHERE i.instance_id = p_instance_id
AND i.custom_status_version > p_last_seen_version;
END;
$function$
```
---