duroxide-pg 0.1.25

A PostgreSQL-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
# Diff for migration 0018

Migration file: `0018_add_kv_store.sql`

## Table Changes

### `kv_store` — NEW

```sql
-- Per-instance key/value store: one row per (instance_id, key).
-- No foreign key to the instances table is declared here; delete_instances_atomic
-- removes an instance's rows with an explicit DELETE instead.
-- NOTE(review): SCHEMA looks like a placeholder substituted when the migration is applied — confirm.
CREATE TABLE IF NOT EXISTS SCHEMA.kv_store (
    instance_id TEXT NOT NULL,
    key TEXT NOT NULL,
    -- Stored as text; fetch_orchestration_item aggregates key -> value into a JSONB object.
    value TEXT NOT NULL,
    -- Execution that last wrote this key: ack_orchestration_item stores its p_execution_id
    -- on every 'set' (insert or overwrite), and prune_executions deletes rows whose
    -- execution_id belongs to a pruned execution.
    execution_id BIGINT NOT NULL,
    -- One value per key within an instance; also the ON CONFLICT target of the
    -- upsert in ack_orchestration_item.
    PRIMARY KEY (instance_id, key)
);
```

## New Indexes

### `idx_kv_store_execution` — NEW

```sql
-- Covers the (instance_id, execution_id) predicate that prune_executions uses
-- when deleting KV rows last written by a pruned execution.
CREATE INDEX IF NOT EXISTS idx_kv_store_execution ON SCHEMA.kv_store(instance_id, execution_id);
```

## Function Changes

### `fetch_orchestration_item` — body modified (baseline: 0017)

```diff
@@ -12,7 +12,8 @@
             out_history JSONB,
             out_messages JSONB,
             out_lock_token TEXT,
-            out_attempt_count INTEGER
+            out_attempt_count INTEGER,
+            out_kv_snapshot JSONB
         ) AS $fetch_orch$
         DECLARE
             v_instance_id TEXT;
@@ -23,6 +24,7 @@
             v_current_execution_id BIGINT;
             v_history JSONB;
             v_messages JSONB;
+            v_kv_snapshot JSONB;
             v_lock_acquired INTEGER;
             v_max_attempt_count INTEGER;
         BEGIN
@@ -177,6 +179,12 @@
                     END IF;
                 END IF;
 
+                -- Load KV snapshot for this instance
+                SELECT COALESCE(jsonb_object_agg(ks.key, ks.value), '{}'::jsonb)
+                INTO v_kv_snapshot
+                FROM SCHEMA.kv_store ks
+                WHERE ks.instance_id = v_instance_id;
+
                 RETURN QUERY SELECT
                     v_instance_id,
                     v_orchestration_name,
@@ -185,7 +193,8 @@
                     v_history,
                     v_messages,
                     v_lock_token,
-                    v_max_attempt_count;
+                    v_max_attempt_count,
+                    v_kv_snapshot;
                 RETURN;
             END LOOP;
         END;
```

### `ack_orchestration_item` — body modified (baseline: 0016)

```diff
@@ -28,6 +28,11 @@
             v_now_ts TIMESTAMPTZ;
             v_custom_status_action TEXT;
             v_custom_status_value TEXT;
+            v_current_execution_id BIGINT;
+            v_kv_mutations JSONB;
+            v_kv_item JSONB;
+            v_kv_action TEXT;
+            v_i INTEGER;
         BEGIN
             -- Convert Rust-supplied millisecond timestamp to TIMESTAMPTZ
             v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
@@ -47,6 +52,7 @@
             v_parent_instance_id := p_metadata->>'parent_instance_id';
             v_status := p_metadata->>'status';
             v_output := p_metadata->>'output';
+            v_current_execution_id := p_execution_id;
 
             -- Step 3: Create or update instance metadata (with explicit timestamps)
             IF v_orchestration_name IS NOT NULL AND v_orchestration_version IS NOT NULL THEN
@@ -123,6 +129,27 @@
                 WHERE instance_id = v_instance_id;
             END IF;
 
+            -- Step 7d: Materialize KV mutations
+            v_kv_mutations := p_metadata->'kv_mutations';
+            IF v_kv_mutations IS NOT NULL AND jsonb_array_length(v_kv_mutations) > 0 THEN
+                FOR v_i IN 0..jsonb_array_length(v_kv_mutations) - 1 LOOP
+                    v_kv_item := v_kv_mutations->v_i;
+                    v_kv_action := v_kv_item->>'action';
+                    IF v_kv_action = 'set' THEN
+                        INSERT INTO SCHEMA.kv_store (instance_id, key, value, execution_id)
+                        VALUES (v_instance_id, v_kv_item->>'key', v_kv_item->>'value', v_current_execution_id)
+                        ON CONFLICT (instance_id, key)
+                        DO UPDATE SET value = EXCLUDED.value, execution_id = EXCLUDED.execution_id;
+                    ELSIF v_kv_action = 'clear_key' THEN
+                        DELETE FROM SCHEMA.kv_store
+                        WHERE instance_id = v_instance_id AND key = v_kv_item->>'key';
+                    ELSIF v_kv_action = 'clear_all' THEN
+                        DELETE FROM SCHEMA.kv_store
+                        WHERE instance_id = v_instance_id;
+                    END IF;
+                END LOOP;
+            END IF;
+
             -- Step 8: Enqueue worker items with session_id and tag support
             IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
                 FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
```

### `delete_instances_atomic` — body modified (baseline: 0010)

```diff
@@ -85,6 +85,9 @@
             -- Delete instance locks
             DELETE FROM SCHEMA.instance_locks WHERE instance_id = ANY(p_instance_ids);
 
+            -- Delete KV store entries
+            DELETE FROM SCHEMA.kv_store WHERE instance_id = ANY(p_instance_ids);
+
             -- Delete instances
             DELETE FROM SCHEMA.instances WHERE instance_id = ANY(p_instance_ids);
             GET DIAGNOSTICS v_count = ROW_COUNT;
```

### `prune_executions` — body modified (baseline: 0010)

```diff
@@ -14,6 +14,7 @@
             v_events_deleted BIGINT := 0;
             v_count BIGINT;
             v_exec_ids_to_delete BIGINT[];
+            v_exec_id BIGINT;
         BEGIN
             -- Get current execution ID (NEVER delete this)
             SELECT i.current_execution_id INTO v_current_execution_id
@@ -54,13 +55,19 @@
                 RETURN;
             END IF;
 
-            -- Delete history for pruned executions
-            DELETE FROM SCHEMA.history h
-            WHERE h.instance_id = p_instance_id
-              AND h.execution_id = ANY(v_exec_ids_to_delete);
-            GET DIAGNOSTICS v_count = ROW_COUNT;
-            v_events_deleted := v_count;
+            FOREACH v_exec_id IN ARRAY v_exec_ids_to_delete LOOP
+                -- Delete history for this execution
+                DELETE FROM SCHEMA.history h
+                WHERE h.instance_id = p_instance_id
+                  AND h.execution_id = v_exec_id;
+                GET DIAGNOSTICS v_count = ROW_COUNT;
+                v_events_deleted := v_events_deleted + v_count;
 
+                -- Delete KV entries whose last-writing execution matches the pruned one
+                DELETE FROM SCHEMA.kv_store
+                WHERE instance_id = p_instance_id AND execution_id = v_exec_id;
+            END LOOP;
+
             -- Delete executions
             DELETE FROM SCHEMA.executions e
             WHERE e.instance_id = p_instance_id
```

### `cleanup_schema` — body modified (baseline: 0016)

```diff
@@ -1,8 +1,8 @@
 CREATE OR REPLACE FUNCTION SCHEMA.cleanup_schema()
         RETURNS VOID AS $cleanup$
         BEGIN
-            -- Drop tables first
             DROP TABLE IF EXISTS SCHEMA.sessions CASCADE;
+            DROP TABLE IF EXISTS SCHEMA.kv_store CASCADE;
             DROP TABLE IF EXISTS SCHEMA.instances CASCADE;
             DROP TABLE IF EXISTS SCHEMA.executions CASCADE;
             DROP TABLE IF EXISTS SCHEMA.history CASCADE;
@@ -11,7 +11,6 @@
             DROP TABLE IF EXISTS SCHEMA.instance_locks CASCADE;
             DROP TABLE IF EXISTS SCHEMA._duroxide_migrations CASCADE;
 
-            -- Drop all stored procedures (old + new signatures)
             DROP FUNCTION IF EXISTS SCHEMA.cleanup_schema();
             DROP FUNCTION IF EXISTS SCHEMA.list_instances();
             DROP FUNCTION IF EXISTS SCHEMA.list_executions(TEXT);
```