# Diff for migration 0018
Migration file: `0018_add_kv_store.sql`
## Table Changes
### `kv_store` — NEW
```sql
-- Per-instance key/value store: at most one row per (instance_id, key).
CREATE TABLE IF NOT EXISTS SCHEMA.kv_store (
    instance_id  TEXT   NOT NULL,
    key          TEXT   NOT NULL,
    value        TEXT   NOT NULL,
    -- Execution recorded by the most recent write to this key.
    execution_id BIGINT NOT NULL,
    PRIMARY KEY (instance_id, key)
);
```
## New Indexes
### `idx_kv_store_execution` — NEW
```sql
-- Supports finding an instance's entries by the execution that last wrote them.
CREATE INDEX IF NOT EXISTS idx_kv_store_execution
    ON SCHEMA.kv_store (instance_id, execution_id);
```
## Function Changes
### `fetch_orchestration_item` — output columns and body modified (baseline: 0017)
```diff
@@ -12,7 +12,8 @@
out_history JSONB,
out_messages JSONB,
out_lock_token TEXT,
- out_attempt_count INTEGER
+ out_attempt_count INTEGER,
+ out_kv_snapshot JSONB
) AS $fetch_orch$
DECLARE
v_instance_id TEXT;
@@ -23,6 +24,7 @@
v_current_execution_id BIGINT;
v_history JSONB;
v_messages JSONB;
+ v_kv_snapshot JSONB;
v_lock_acquired INTEGER;
v_max_attempt_count INTEGER;
BEGIN
@@ -177,6 +179,12 @@
END IF;
END IF;
+ -- Load KV snapshot for this instance
+ SELECT COALESCE(jsonb_object_agg(ks.key, ks.value), '{}'::jsonb)
+ INTO v_kv_snapshot
+ FROM SCHEMA.kv_store ks
+ WHERE ks.instance_id = v_instance_id;
+
RETURN QUERY SELECT
v_instance_id,
v_orchestration_name,
@@ -185,7 +193,8 @@
v_history,
v_messages,
v_lock_token,
- v_max_attempt_count;
+ v_max_attempt_count,
+ v_kv_snapshot;
RETURN;
END LOOP;
END;
```
### `ack_orchestration_item` — body modified (baseline: 0016)
```diff
@@ -28,6 +28,11 @@
v_now_ts TIMESTAMPTZ;
v_custom_status_action TEXT;
v_custom_status_value TEXT;
+ v_current_execution_id BIGINT;
+ v_kv_mutations JSONB;
+ v_kv_item JSONB;
+ v_kv_action TEXT;
+ v_i INTEGER;
BEGIN
-- Convert Rust-supplied millisecond timestamp to TIMESTAMPTZ
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
@@ -47,6 +52,7 @@
v_parent_instance_id := p_metadata->>'parent_instance_id';
v_status := p_metadata->>'status';
v_output := p_metadata->>'output';
+ v_current_execution_id := p_execution_id;
-- Step 3: Create or update instance metadata (with explicit timestamps)
IF v_orchestration_name IS NOT NULL AND v_orchestration_version IS NOT NULL THEN
@@ -123,6 +129,27 @@
WHERE instance_id = v_instance_id;
END IF;
+ -- Step 7d: Materialize KV mutations
+ v_kv_mutations := p_metadata->'kv_mutations';
+ IF v_kv_mutations IS NOT NULL AND jsonb_array_length(v_kv_mutations) > 0 THEN
+ FOR v_i IN 0..jsonb_array_length(v_kv_mutations) - 1 LOOP
+ v_kv_item := v_kv_mutations->v_i;
+ v_kv_action := v_kv_item->>'action';
+ IF v_kv_action = 'set' THEN
+ INSERT INTO SCHEMA.kv_store (instance_id, key, value, execution_id)
+ VALUES (v_instance_id, v_kv_item->>'key', v_kv_item->>'value', v_current_execution_id)
+ ON CONFLICT (instance_id, key)
+ DO UPDATE SET value = EXCLUDED.value, execution_id = EXCLUDED.execution_id;
+ ELSIF v_kv_action = 'clear_key' THEN
+ DELETE FROM SCHEMA.kv_store
+ WHERE instance_id = v_instance_id AND key = v_kv_item->>'key';
+ ELSIF v_kv_action = 'clear_all' THEN
+ DELETE FROM SCHEMA.kv_store
+ WHERE instance_id = v_instance_id;
+ END IF;
+ END LOOP;
+ END IF;
+
-- Step 8: Enqueue worker items with session_id and tag support
IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
```
### `delete_instances_atomic` — body modified (baseline: 0010)
```diff
@@ -85,6 +85,9 @@
-- Delete instance locks
DELETE FROM SCHEMA.instance_locks WHERE instance_id = ANY(p_instance_ids);
+ -- Delete KV store entries
+ DELETE FROM SCHEMA.kv_store WHERE instance_id = ANY(p_instance_ids);
+
-- Delete instances
DELETE FROM SCHEMA.instances WHERE instance_id = ANY(p_instance_ids);
GET DIAGNOSTICS v_count = ROW_COUNT;
```
### `prune_executions` — body modified (baseline: 0010)
```diff
@@ -14,6 +14,7 @@
v_events_deleted BIGINT := 0;
v_count BIGINT;
v_exec_ids_to_delete BIGINT[];
+ v_exec_id BIGINT;
BEGIN
-- Get current execution ID (NEVER delete this)
SELECT i.current_execution_id INTO v_current_execution_id
@@ -54,13 +55,19 @@
RETURN;
END IF;
- -- Delete history for pruned executions
- DELETE FROM SCHEMA.history h
- WHERE h.instance_id = p_instance_id
- AND h.execution_id = ANY(v_exec_ids_to_delete);
- GET DIAGNOSTICS v_count = ROW_COUNT;
- v_events_deleted := v_count;
+ FOREACH v_exec_id IN ARRAY v_exec_ids_to_delete LOOP
+ -- Delete history for this execution
+ DELETE FROM SCHEMA.history h
+ WHERE h.instance_id = p_instance_id
+ AND h.execution_id = v_exec_id;
+ GET DIAGNOSTICS v_count = ROW_COUNT;
+ v_events_deleted := v_events_deleted + v_count;
+ -- Delete KV entries whose last-writing execution matches the pruned one
+ DELETE FROM SCHEMA.kv_store
+ WHERE instance_id = p_instance_id AND execution_id = v_exec_id;
+ END LOOP;
+
-- Delete executions
DELETE FROM SCHEMA.executions e
WHERE e.instance_id = p_instance_id
```
### `cleanup_schema` — body modified (baseline: 0016)
```diff
@@ -1,8 +1,8 @@
CREATE OR REPLACE FUNCTION SCHEMA.cleanup_schema()
RETURNS VOID AS $cleanup$
BEGIN
- -- Drop tables first
DROP TABLE IF EXISTS SCHEMA.sessions CASCADE;
+ DROP TABLE IF EXISTS SCHEMA.kv_store CASCADE;
DROP TABLE IF EXISTS SCHEMA.instances CASCADE;
DROP TABLE IF EXISTS SCHEMA.executions CASCADE;
DROP TABLE IF EXISTS SCHEMA.history CASCADE;
@@ -11,7 +11,6 @@
DROP TABLE IF EXISTS SCHEMA.instance_locks CASCADE;
DROP TABLE IF EXISTS SCHEMA._duroxide_migrations CASCADE;
- -- Drop all stored procedures (old + new signatures)
DROP FUNCTION IF EXISTS SCHEMA.cleanup_schema();
DROP FUNCTION IF EXISTS SCHEMA.list_instances();
DROP FUNCTION IF EXISTS SCHEMA.list_executions(TEXT);
```