# Diff for migration 0010
**Migration file:** `0010_add_deletion_and_pruning_support.sql`
This diff was auto-generated by comparing schema state before and after applying migration 0010.
## Schema Diff
```diff
--- /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.yrvaEJVUPt 2026-01-05 11:28:09
+++ /var/folders/70/8cgb9l_d3pgff2w_d5bqb2jm0000gn/T/tmp.NvKogMq3Te 2026-01-05 11:28:09
@@ -3,13 +3,14 @@
CREATE TABLE executions (instance_id TEXT NOT NULL, execution_id BIGINT NOT NULL, status TEXT NOT NULL DEFAULT 'Running'::text, output TEXT, started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, completed_at TIMESTAMP WITH TIME ZONE);
CREATE TABLE history (instance_id TEXT NOT NULL, execution_id BIGINT NOT NULL, event_id BIGINT NOT NULL, event_type TEXT NOT NULL, event_data TEXT NOT NULL, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP);
CREATE TABLE instance_locks (instance_id TEXT NOT NULL, lock_token TEXT NOT NULL, locked_until BIGINT NOT NULL, locked_at BIGINT NOT NULL);
-CREATE TABLE instances (instance_id TEXT NOT NULL, orchestration_name TEXT NOT NULL, orchestration_version TEXT, current_execution_id BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP);
+CREATE TABLE instances (instance_id TEXT NOT NULL, orchestration_name TEXT NOT NULL, orchestration_version TEXT, current_execution_id BIGINT NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, parent_instance_id TEXT);
CREATE TABLE orchestrator_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.orchestrator_queue_id_seq'::regclass), instance_id TEXT NOT NULL, work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0);
CREATE TABLE worker_queue (id BIGINT NOT NULL DEFAULT nextval('SCHEMA.worker_queue_id_seq'::regclass), work_item TEXT NOT NULL, visible_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, lock_token TEXT, locked_until BIGINT, created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, attempt_count INTEGER NOT NULL DEFAULT 0, instance_id TEXT, execution_id BIGINT, activity_id BIGINT);
-- INDEXES
CREATE INDEX idx_history_lookup ON SCHEMA.history USING btree (instance_id, execution_id, event_id);
CREATE INDEX idx_instance_locks_locked_until ON SCHEMA.instance_locks USING btree (locked_until);
+CREATE INDEX idx_instances_parent ON SCHEMA.instances USING btree (parent_instance_id);
CREATE INDEX idx_orch_instance ON SCHEMA.orchestrator_queue USING btree (instance_id);
CREATE INDEX idx_orch_lock ON SCHEMA.orchestrator_queue USING btree (lock_token);
CREATE INDEX idx_orch_visible ON SCHEMA.orchestrator_queue USING btree (visible_at, lock_token);
@@ -224,6 +225,7 @@
v_now_ms BIGINT;
v_orchestration_name TEXT;
v_orchestration_version TEXT;
+ v_parent_instance_id TEXT;
v_status TEXT;
v_output TEXT;
v_completed_at TIMESTAMPTZ;
@@ -249,21 +251,24 @@
RAISE EXCEPTION 'Invalid lock token';
END IF;
- -- Step 2: Extract metadata from JSONB
+ -- Step 2: Extract metadata from JSONB (including parent_instance_id)
v_orchestration_name := p_metadata->>'orchestration_name';
v_orchestration_version := p_metadata->>'orchestration_version';
+ v_parent_instance_id := p_metadata->>'parent_instance_id';
v_status := p_metadata->>'status';
v_output := p_metadata->>'output';
- -- Step 3: Create or update instance metadata
+ -- Step 3: Create or update instance metadata (including parent_instance_id)
IF v_orchestration_name IS NOT NULL AND v_orchestration_version IS NOT NULL THEN
- INSERT INTO SCHEMA.instances (instance_id, orchestration_name, orchestration_version, current_execution_id)
- VALUES (v_instance_id, v_orchestration_name, v_orchestration_version, p_execution_id)
+ INSERT INTO SCHEMA.instances (instance_id, orchestration_name, orchestration_version, current_execution_id, parent_instance_id)
+ VALUES (v_instance_id, v_orchestration_name, v_orchestration_version, p_execution_id, v_parent_instance_id)
ON CONFLICT (instance_id) DO NOTHING;
+ -- Update existing instance (but only set parent_instance_id if it was NULL)
UPDATE SCHEMA.instances i
SET orchestration_name = v_orchestration_name,
- orchestration_version = v_orchestration_version
+ orchestration_version = v_orchestration_version,
+ parent_instance_id = COALESCE(i.parent_instance_id, v_parent_instance_id)
WHERE i.instance_id = v_instance_id;
END IF;
@@ -455,6 +460,93 @@
DROP TABLE IF EXISTS SCHEMA.worker_queue CASCADE;
DROP TABLE IF EXISTS SCHEMA.instance_locks CASCADE;
DROP TABLE IF EXISTS SCHEMA._duroxide_migrations CASCADE;
+ END;
+ $function$
+;
+CREATE OR REPLACE FUNCTION SCHEMA.delete_instances_atomic(p_instance_ids text[], p_force boolean)
+ RETURNS TABLE(instances_deleted bigint, executions_deleted bigint, events_deleted bigint, queue_messages_deleted bigint)
+ LANGUAGE plpgsql
+AS $function$
+ DECLARE
+ v_instance_id TEXT;
+ v_orphan_id TEXT;
+ v_instances_deleted BIGINT := 0;
+ v_executions_deleted BIGINT := 0;
+ v_events_deleted BIGINT := 0;
+ v_queue_deleted BIGINT := 0;
+ v_count BIGINT;
+ BEGIN
+ -- Check for empty input
+ IF p_instance_ids IS NULL OR array_length(p_instance_ids, 1) IS NULL THEN
+ instances_deleted := 0;
+ executions_deleted := 0;
+ events_deleted := 0;
+ queue_messages_deleted := 0;
+ RETURN NEXT;
+ RETURN;
+ END IF;
+
-- Step 1: If not force, check all instances are terminal (single query, no loop)
IF NOT p_force THEN
SELECT i.instance_id INTO v_instance_id
FROM SCHEMA.instances i
JOIN SCHEMA.executions e ON i.instance_id = e.instance_id
AND i.current_execution_id = e.execution_id
WHERE i.instance_id = ANY(p_instance_ids)
AND e.status = 'Running'
LIMIT 1;
IF v_instance_id IS NOT NULL THEN
RAISE EXCEPTION 'Instance % is Running. Use force=true to delete.', v_instance_id;
END IF;
+ END IF;
+
+ -- Step 2: Lock parent rows to prevent concurrent child creation
+ -- This prevents the race condition where a child could be inserted
+ -- between our orphan check and the actual delete.
+ PERFORM 1 FROM SCHEMA.instances
+ WHERE instance_id = ANY(p_instance_ids)
+ FOR UPDATE;
+
+ -- Step 3: Check for orphans (children not in our delete list)
+ -- Now safe because we hold locks on all parent rows
+ SELECT i.instance_id INTO v_orphan_id
+ FROM SCHEMA.instances i
+ WHERE i.parent_instance_id = ANY(p_instance_ids)
+ AND NOT (i.instance_id = ANY(p_instance_ids))
+ LIMIT 1;
+
+ IF v_orphan_id IS NOT NULL THEN
+ RAISE EXCEPTION 'Orphan detected: instance % has parent in delete list but is not included', v_orphan_id;
+ END IF;
+
+ -- Step 4: Delete from all tables
+ -- Delete history events
+ DELETE FROM SCHEMA.history WHERE instance_id = ANY(p_instance_ids);
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_events_deleted := v_count;
+
+ -- Delete executions
+ DELETE FROM SCHEMA.executions WHERE instance_id = ANY(p_instance_ids);
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_executions_deleted := v_count;
+
+ -- Delete orchestrator queue messages
+ DELETE FROM SCHEMA.orchestrator_queue WHERE instance_id = ANY(p_instance_ids);
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_queue_deleted := v_count;
+
+ -- Delete worker queue messages
+ DELETE FROM SCHEMA.worker_queue WHERE instance_id = ANY(p_instance_ids);
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_queue_deleted := v_queue_deleted + v_count;
+
+ -- Delete instance locks
+ DELETE FROM SCHEMA.instance_locks WHERE instance_id = ANY(p_instance_ids);
+
+ -- Delete instances
+ DELETE FROM SCHEMA.instances WHERE instance_id = ANY(p_instance_ids);
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_instances_deleted := v_count;
+
+ instances_deleted := v_instances_deleted;
+ executions_deleted := v_executions_deleted;
+ events_deleted := v_events_deleted;
+ queue_messages_deleted := v_queue_deleted;
+ RETURN NEXT;
END;
$function$
;
@@ -753,7 +845,7 @@
$function$
;
CREATE OR REPLACE FUNCTION SCHEMA.get_instance_info(p_instance_id text)
- RETURNS TABLE(instance_id text, orchestration_name text, orchestration_version text, current_execution_id bigint, created_at timestamp with time zone, updated_at timestamp with time zone, status text, output text)
+ RETURNS TABLE(instance_id text, orchestration_name text, orchestration_version text, current_execution_id bigint, created_at timestamp with time zone, updated_at timestamp with time zone, status text, output text, parent_instance_id text)
LANGUAGE plpgsql
AS $function$
BEGIN
@@ -761,7 +853,7 @@
SELECT i.instance_id, i.orchestration_name,
COALESCE(i.orchestration_version, 'unknown') as orchestration_version,
i.current_execution_id, i.created_at, i.updated_at,
- e.status, e.output
+ e.status, e.output, i.parent_instance_id
FROM SCHEMA.instances i
LEFT JOIN SCHEMA.executions e ON i.instance_id = e.instance_id
AND i.current_execution_id = e.execution_id
@@ -769,6 +861,26 @@
END;
$function$
;
+CREATE OR REPLACE FUNCTION SCHEMA.get_parent_id(p_instance_id text)
+ RETURNS text
+ LANGUAGE plpgsql
+AS $function$
+ DECLARE
+ v_parent_id TEXT;
+ BEGIN
+ SELECT i.parent_instance_id
+ INTO v_parent_id
+ FROM SCHEMA.instances i
+ WHERE i.instance_id = p_instance_id;
+
+ IF NOT FOUND THEN
+ RAISE EXCEPTION 'Instance not found: %', p_instance_id;
+ END IF;
+
+ RETURN v_parent_id;
+ END;
+ $function$
+;
CREATE OR REPLACE FUNCTION SCHEMA.get_queue_depths(p_now_ms bigint)
RETURNS TABLE(orchestrator_queue bigint, worker_queue bigint)
LANGUAGE plpgsql
@@ -828,6 +940,19 @@
END;
$function$
;
+CREATE OR REPLACE FUNCTION SCHEMA.list_children(p_instance_id text)
+ RETURNS TABLE(child_instance_id text)
+ LANGUAGE plpgsql
+AS $function$
+ BEGIN
+ RETURN QUERY
+ SELECT i.instance_id
+ FROM SCHEMA.instances i
+ WHERE i.parent_instance_id = p_instance_id
+ ORDER BY i.created_at;
+ END;
+ $function$
+;
CREATE OR REPLACE FUNCTION SCHEMA.list_executions(p_instance_id text)
RETURNS TABLE(execution_id bigint)
LANGUAGE plpgsql
@@ -868,6 +993,77 @@
END;
$function$
;
+CREATE OR REPLACE FUNCTION SCHEMA.prune_executions(p_instance_id text, p_keep_last integer DEFAULT NULL::integer, p_completed_before_ms bigint DEFAULT NULL::bigint)
+ RETURNS TABLE(instances_processed bigint, executions_deleted bigint, events_deleted bigint)
+ LANGUAGE plpgsql
+AS $function$
+ DECLARE
+ v_current_execution_id BIGINT;
+ v_executions_deleted BIGINT := 0;
+ v_events_deleted BIGINT := 0;
+ v_count BIGINT;
+ v_exec_ids_to_delete BIGINT[];
+ BEGIN
+ -- Get current execution ID (NEVER delete this)
+ SELECT i.current_execution_id INTO v_current_execution_id
+ FROM SCHEMA.instances i
+ WHERE i.instance_id = p_instance_id;
+
+ IF NOT FOUND THEN
+ -- Instance doesn't exist - raise error
+ RAISE EXCEPTION 'Instance % not found', p_instance_id;
+ END IF;
+
+ -- Build list of executions to delete
+ -- CRITICAL: keep_last semantics - select top N executions INCLUDING current
+ -- None, Some(0), Some(1) are all equivalent (only current remains)
+ SELECT array_agg(e.execution_id) INTO v_exec_ids_to_delete
+ FROM SCHEMA.executions e
+ WHERE e.instance_id = p_instance_id
+ AND e.execution_id != v_current_execution_id -- Never delete current
+ AND e.status != 'Running' -- Never delete running
+ -- Apply completed_before filter if provided
+ AND (p_completed_before_ms IS NULL
+ OR e.completed_at < TO_TIMESTAMP(p_completed_before_ms / 1000.0))
+ -- Apply keep_last filter: exclude top N by execution_id (including current)
+ AND (p_keep_last IS NULL
+ OR e.execution_id NOT IN (
+ SELECT e2.execution_id
+ FROM SCHEMA.executions e2
+ WHERE e2.instance_id = p_instance_id
+ ORDER BY e2.execution_id DESC
+ LIMIT p_keep_last
+ ));
+
+ IF v_exec_ids_to_delete IS NULL OR array_length(v_exec_ids_to_delete, 1) IS NULL THEN
+ instances_processed := 1;
+ executions_deleted := 0;
+ events_deleted := 0;
+ RETURN NEXT;
+ RETURN;
+ END IF;
+
+ -- Delete history for pruned executions
+ DELETE FROM SCHEMA.history h
+ WHERE h.instance_id = p_instance_id
+ AND h.execution_id = ANY(v_exec_ids_to_delete);
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_events_deleted := v_count;
+
+ -- Delete executions
+ DELETE FROM SCHEMA.executions e
+ WHERE e.instance_id = p_instance_id
+ AND e.execution_id = ANY(v_exec_ids_to_delete);
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_executions_deleted := v_count;
+
+ instances_processed := 1;
+ executions_deleted := v_executions_deleted;
+ events_deleted := v_events_deleted;
+ RETURN NEXT;
+ END;
+ $function$
+;
CREATE OR REPLACE FUNCTION SCHEMA.renew_orchestration_item_lock(p_lock_token text, p_now_ms bigint, p_extend_secs bigint)
RETURNS void
LANGUAGE plpgsql
```
---
*Generated on 2026-01-05 04:28:09 UTC*