# Diff for migration 0020
Migration file: `0020_add_kv_delta.sql`
## Table Changes
### `kv_delta` — new table
```diff
@@
+ CREATE TABLE IF NOT EXISTS SCHEMA.kv_delta (
+ instance_id TEXT NOT NULL,
+ key TEXT NOT NULL,
+ value TEXT,
+ last_updated_at_ms BIGINT NOT NULL,
+ PRIMARY KEY (instance_id, key)
+ );
```
### `kv_store` — preserved for backward compatibility
```diff
@@
kv_store(instance_id, key, value, execution_id, last_updated_at_ms)
```
- `execution_id` is intentionally retained in PostgreSQL for backward compatibility.
- `idx_kv_store_execution` is intentionally retained.
## Function Changes
### `ack_orchestration_item` — KV writes redirected to `kv_delta`, merged on terminal turns
```diff
@@
- IF v_kv_action = 'set' THEN
- INSERT INTO SCHEMA.kv_store (instance_id, key, value, execution_id, last_updated_at_ms)
- VALUES (v_instance_id, v_kv_item->>'key', v_kv_item->>'value', v_current_execution_id, (v_kv_item->>'last_updated_at_ms')::BIGINT)
- ON CONFLICT (instance_id, key)
- DO UPDATE SET value = EXCLUDED.value, execution_id = EXCLUDED.execution_id, last_updated_at_ms = EXCLUDED.last_updated_at_ms;
- ELSIF v_kv_action = 'clear_key' THEN
- DELETE FROM SCHEMA.kv_store
- WHERE instance_id = v_instance_id AND key = v_kv_item->>'key';
- ELSIF v_kv_action = 'clear_all' THEN
- DELETE FROM SCHEMA.kv_store
- WHERE instance_id = v_instance_id;
+ IF v_kv_action = 'set' THEN
+ INSERT INTO SCHEMA.kv_delta (instance_id, key, value, last_updated_at_ms)
+ VALUES (v_instance_id, v_kv_item->>'key', v_kv_item->>'value', (v_kv_item->>'last_updated_at_ms')::BIGINT)
+ ON CONFLICT (instance_id, key)
+ DO UPDATE SET value = EXCLUDED.value, last_updated_at_ms = EXCLUDED.last_updated_at_ms;
+ ELSIF v_kv_action = 'clear_key' THEN
+ INSERT INTO SCHEMA.kv_delta (instance_id, key, value, last_updated_at_ms)
+ VALUES (v_instance_id, v_kv_item->>'key', NULL, p_now_ms)
+ ON CONFLICT (instance_id, key)
+ DO UPDATE SET value = NULL, last_updated_at_ms = EXCLUDED.last_updated_at_ms;
+ ELSIF v_kv_action = 'clear_all' THEN
+ UPDATE SCHEMA.kv_delta
+ SET value = NULL,
+ last_updated_at_ms = p_now_ms
+ WHERE instance_id = v_instance_id;
+
+ INSERT INTO SCHEMA.kv_delta (instance_id, key, value, last_updated_at_ms)
+ SELECT ks.instance_id, ks.key, NULL, p_now_ms
+ FROM SCHEMA.kv_store ks
+ WHERE ks.instance_id = v_instance_id
+ ON CONFLICT (instance_id, key) DO NOTHING;
END IF;
END LOOP;
END IF;
+
+ IF v_is_terminal THEN
+ INSERT INTO SCHEMA.kv_store (instance_id, key, value, execution_id, last_updated_at_ms)
+ SELECT kd.instance_id, kd.key, kd.value, p_execution_id, kd.last_updated_at_ms
+ FROM SCHEMA.kv_delta kd
+ WHERE kd.instance_id = v_instance_id AND kd.value IS NOT NULL
+ ON CONFLICT (instance_id, key)
+ DO UPDATE SET value = EXCLUDED.value,
+ execution_id = EXCLUDED.execution_id,
+ last_updated_at_ms = EXCLUDED.last_updated_at_ms;
+
+ DELETE FROM SCHEMA.kv_store ks
+ WHERE ks.instance_id = v_instance_id
+ AND ks.key IN (
+ SELECT kd.key
+ FROM SCHEMA.kv_delta kd
+ WHERE kd.instance_id = v_instance_id
+ AND kd.value IS NULL
+ );
+
+ DELETE FROM SCHEMA.kv_delta kd
+ WHERE kd.instance_id = v_instance_id;
+ END IF;
```
### `delete_instances_atomic` — also removes `kv_delta`
```diff
@@
DELETE FROM SCHEMA.instance_locks WHERE instance_id = ANY(p_instance_ids);
+ DELETE FROM SCHEMA.kv_delta WHERE instance_id = ANY(p_instance_ids);
DELETE FROM SCHEMA.kv_store WHERE instance_id = ANY(p_instance_ids);
```
### `cleanup_schema` — drops `kv_delta` and new KV read helpers
```diff
@@
DROP TABLE IF EXISTS SCHEMA.sessions CASCADE;
+ DROP TABLE IF EXISTS SCHEMA.kv_delta CASCADE;
DROP TABLE IF EXISTS SCHEMA.kv_store CASCADE;
@@
DROP FUNCTION IF EXISTS SCHEMA.cleanup_orphaned_sessions(BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.get_custom_status(TEXT, BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.get_kv_value(TEXT, TEXT);
+ DROP FUNCTION IF EXISTS SCHEMA.get_kv_all_values(TEXT);
```
### `get_kv_value` — delta-first KV lookup stored procedure
```diff
@@
+ CREATE OR REPLACE FUNCTION SCHEMA.get_kv_value(p_instance TEXT, p_key TEXT)
+ RETURNS TABLE(out_value TEXT, out_found BOOLEAN)
```
- Checks `kv_delta` first.
- Returns `out_found = FALSE` for tombstones (`value IS NULL`) and for keys absent from both tables.
- Falls back to `kv_store` when there is no delta row.
### `get_kv_all_values` — merged KV snapshot stored procedure
```diff
@@
+ CREATE OR REPLACE FUNCTION SCHEMA.get_kv_all_values(p_instance TEXT)
+ RETURNS TABLE(out_key TEXT, out_value TEXT)
```
- Merges `kv_store` with `kv_delta` using a `FULL OUTER JOIN`.
- Delta rows override store rows.
- Tombstones (`kv_delta.value IS NULL`) remove keys from the returned snapshot.
## No-op by design
- `fetch_orchestration_item` is unchanged and continues to seed snapshots from `kv_store` only.
- No PostgreSQL migration drops `kv_store.execution_id`.
- No PostgreSQL migration drops `idx_kv_store_execution`.