# Diff for migration 0016
**Migration file:** `0016_add_activity_tags.sql`
Each changed function is shown **in full** with `+`/`-` markers on changed lines.
## Table Changes
### `worker_queue` — Modified
```diff
id BIGSERIAL PRIMARY KEY
work_item TEXT NOT NULL
visible_at TIMESTAMP WITH TIME ZONE NOT NULL
created_at TIMESTAMP WITH TIME ZONE NOT NULL
lock_token TEXT
locked_until BIGINT
attempt_count INTEGER DEFAULT 0
instance_id TEXT
execution_id BIGINT
activity_id BIGINT
session_id TEXT
+ tag TEXT
```
### New Index
```sql
CREATE INDEX idx_worker_queue_tag ON worker_queue(tag);
```
## Function Changes
### `enqueue_worker_work` — Signature Modified
```diff
-CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item TEXT, p_now_ms BIGINT, p_instance_id TEXT DEFAULT NULL, p_execution_id BIGINT DEFAULT NULL, p_activity_id BIGINT DEFAULT NULL, p_session_id TEXT DEFAULT NULL)
+CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item TEXT, p_now_ms BIGINT, p_instance_id TEXT DEFAULT NULL, p_execution_id BIGINT DEFAULT NULL, p_activity_id BIGINT DEFAULT NULL, p_session_id TEXT DEFAULT NULL, p_tag TEXT DEFAULT NULL)
RETURNS VOID
LANGUAGE plpgsql
AS $function$
DECLARE
v_now_ts TIMESTAMPTZ;
BEGIN
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
- INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
- VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id, p_session_id);
+ INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id, tag)
+ VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id, p_session_id, p_tag);
END;
$function$
```
### `fetch_work_item` — Signature and Body Modified
```diff
-CREATE OR REPLACE FUNCTION SCHEMA.fetch_work_item(p_now_ms BIGINT, p_lock_timeout_ms BIGINT, p_owner_id TEXT DEFAULT NULL, p_session_lock_timeout_ms BIGINT DEFAULT NULL)
+CREATE OR REPLACE FUNCTION SCHEMA.fetch_work_item(p_now_ms BIGINT, p_lock_timeout_ms BIGINT, p_owner_id TEXT DEFAULT NULL, p_session_lock_timeout_ms BIGINT DEFAULT NULL, p_tag_filter TEXT[] DEFAULT NULL, p_tag_mode TEXT DEFAULT 'default_only')
RETURNS TABLE(out_work_item TEXT, out_lock_token TEXT, out_attempt_count INTEGER)
LANGUAGE plpgsql
AS $function$
...
+ -- none mode: return immediately with no results
+ IF p_tag_mode = 'none' THEN
+ RETURN;
+ END IF;
...
-- Select candidate row WITH TAG FILTERING
-- Session-aware and non-session paths both include:
+ AND (
+ CASE p_tag_mode
+ WHEN 'default_only' THEN q.tag IS NULL
+ WHEN 'tags' THEN q.tag = ANY(p_tag_filter)
+ WHEN 'default_and' THEN (q.tag IS NULL OR q.tag = ANY(p_tag_filter))
+ WHEN 'any' THEN TRUE
+ ELSE FALSE
+ END
+ )
...
$function$
```
### `ack_orchestration_item` — Body Modified (Step 8: worker item enqueue)
New local variable and tag extraction added:
```diff
DECLARE
...
v_item_session_id TEXT;
+ v_item_tag TEXT;
v_now_ts TIMESTAMPTZ;
...
BEGIN
...
-- Step 8: Enqueue worker items with session_id and tag support
IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
IF v_elem ? 'ActivityExecute' THEN
v_item_instance_id := v_elem->'ActivityExecute'->>'instance';
v_item_execution_id := (v_elem->'ActivityExecute'->>'execution_id')::BIGINT;
v_item_activity_id := (v_elem->'ActivityExecute'->>'id')::BIGINT;
v_item_session_id := v_elem->'ActivityExecute'->>'session_id';
+ v_item_tag := v_elem->'ActivityExecute'->>'tag';
ELSE
v_item_instance_id := NULL;
v_item_execution_id := NULL;
v_item_activity_id := NULL;
v_item_session_id := NULL;
+ v_item_tag := NULL;
END IF;
- INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
- VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id, v_item_session_id);
+ INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id, tag)
+ VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id, v_item_session_id, v_item_tag);
END LOOP;
END IF;
...
```
### `cleanup_schema` — Body Modified
```diff
-- Drop all stored procedures (old + new signatures)
...
DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT, TEXT);
+ DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT, TEXT, TEXT);
...
DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT, TEXT, BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT, TEXT, BIGINT, TEXT[], TEXT);
...
```
---