duroxide-pg 0.1.25

A PostgreSQL-based provider implementation for Duroxide, a durable task orchestration framework
Documentation
# Diff for migration 0016

**Migration file:** `0016_add_activity_tags.sql`

Each changed function is shown **in full** with `+`/`-` markers on changed lines.

## Table Changes

### `worker_queue` — Modified
```diff
  id BIGSERIAL PRIMARY KEY
  work_item TEXT NOT NULL
  visible_at TIMESTAMP WITH TIME ZONE NOT NULL
  created_at TIMESTAMP WITH TIME ZONE NOT NULL
  lock_token TEXT
  locked_until BIGINT
  attempt_count INTEGER DEFAULT 0
  instance_id TEXT
  execution_id BIGINT
  activity_id BIGINT
  session_id TEXT
+ tag TEXT
```

### New Index
```sql
CREATE INDEX idx_worker_queue_tag ON worker_queue(tag);
```

## Function Changes

### `enqueue_worker_work` — Signature Modified

```diff
-CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item TEXT, p_now_ms BIGINT, p_instance_id TEXT DEFAULT NULL, p_execution_id BIGINT DEFAULT NULL, p_activity_id BIGINT DEFAULT NULL, p_session_id TEXT DEFAULT NULL)
+CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item TEXT, p_now_ms BIGINT, p_instance_id TEXT DEFAULT NULL, p_execution_id BIGINT DEFAULT NULL, p_activity_id BIGINT DEFAULT NULL, p_session_id TEXT DEFAULT NULL, p_tag TEXT DEFAULT NULL)
  RETURNS VOID
  LANGUAGE plpgsql
 AS $function$
         DECLARE
             v_now_ts TIMESTAMPTZ;
         BEGIN
             v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
-            INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
-            VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id, p_session_id);
+            INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id, tag)
+            VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id, p_session_id, p_tag);
         END;
         $function$
```

### `fetch_work_item` — Signature and Body Modified

```diff
-CREATE OR REPLACE FUNCTION SCHEMA.fetch_work_item(p_now_ms BIGINT, p_lock_timeout_ms BIGINT, p_owner_id TEXT DEFAULT NULL, p_session_lock_timeout_ms BIGINT DEFAULT NULL)
+CREATE OR REPLACE FUNCTION SCHEMA.fetch_work_item(p_now_ms BIGINT, p_lock_timeout_ms BIGINT, p_owner_id TEXT DEFAULT NULL, p_session_lock_timeout_ms BIGINT DEFAULT NULL, p_tag_filter TEXT[] DEFAULT NULL, p_tag_mode TEXT DEFAULT 'default_only')
  RETURNS TABLE(out_work_item TEXT, out_lock_token TEXT, out_attempt_count INTEGER)
  LANGUAGE plpgsql
 AS $function$
 ...
+            -- none mode: return immediately with no results
+            IF p_tag_mode = 'none' THEN
+                RETURN;
+            END IF;
 ...
             -- Select candidate row WITH TAG FILTERING
             -- Session-aware and non-session paths both include:
+            AND (
+                CASE p_tag_mode
+                    WHEN 'default_only' THEN q.tag IS NULL
+                    WHEN 'tags' THEN q.tag = ANY(p_tag_filter)
+                    WHEN 'default_and' THEN (q.tag IS NULL OR q.tag = ANY(p_tag_filter))
+                    WHEN 'any' THEN TRUE
+                    ELSE FALSE
+                END
+            )
 ...
         $function$
```

### `ack_orchestration_item` — Body Modified (Step 8: worker item enqueue)

New local variable and tag extraction added:

```diff
  DECLARE
      ...
      v_item_session_id TEXT;
+     v_item_tag TEXT;
      v_now_ts TIMESTAMPTZ;
      ...
  BEGIN
      ...
      -- Step 8: Enqueue worker items with session_id and tag support
      IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
          FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
              IF v_elem ? 'ActivityExecute' THEN
                  v_item_instance_id := v_elem->'ActivityExecute'->>'instance';
                  v_item_execution_id := (v_elem->'ActivityExecute'->>'execution_id')::BIGINT;
                  v_item_activity_id := (v_elem->'ActivityExecute'->>'id')::BIGINT;
                  v_item_session_id := v_elem->'ActivityExecute'->>'session_id';
+                 v_item_tag := v_elem->'ActivityExecute'->>'tag';
              ELSE
                  v_item_instance_id := NULL;
                  v_item_execution_id := NULL;
                  v_item_activity_id := NULL;
                  v_item_session_id := NULL;
+                 v_item_tag := NULL;
              END IF;

-             INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
-             VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id, v_item_session_id);
+             INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id, tag)
+             VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id, v_item_session_id, v_item_tag);
          END LOOP;
      END IF;
      ...
```

### `cleanup_schema` — Body Modified

```diff
  -- Drop all stored procedures (old + new signatures)
  ...
  DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT, TEXT);
+ DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT, TEXT, TEXT);
  ...
  DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT, TEXT, BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT, TEXT, BIGINT, TEXT[], TEXT);
  ...
```

---