# Diff for migration 0014
**Migration file:** `0014_add_session_support.sql`
Each changed function is shown **in full** with `+`/`-` markers on changed lines.
## Table Changes
### `sessions` — NEW
```sql
session_id TEXT NOT NULL
worker_id TEXT NOT NULL
locked_until BIGINT NOT NULL
last_activity_at BIGINT NOT NULL
```
### `worker_queue` — Modified
```diff
id BIGINT NOT NULL DEFAULT nextval('fdiff_a_0014_62577.worker_queue_id_seq'::regclass)
work_item TEXT NOT NULL
visible_at TIMESTAMP WITH TIME ZONE NOT NULL
lock_token TEXT
locked_until BIGINT
created_at TIMESTAMP WITH TIME ZONE NOT NULL
attempt_count INTEGER NOT NULL DEFAULT 0
instance_id TEXT
execution_id BIGINT
activity_id BIGINT
+ session_id TEXT
```
## New Indexes
### `idx_worker_queue_session_id`
```sql
CREATE INDEX idx_worker_queue_session_id ON SCHEMA.worker_queue USING btree (session_id)
```
## Function Changes
### `ack_orchestration_item` — Body Modified
Full function with diff:
```diff
CREATE OR REPLACE FUNCTION SCHEMA.ack_orchestration_item(p_lock_token text, p_now_ms bigint, p_execution_id bigint, p_history_delta jsonb, p_worker_items jsonb, p_orchestrator_items jsonb, p_metadata jsonb, p_cancelled_activities jsonb DEFAULT '[]'::jsonb)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_instance_id TEXT;
v_orchestration_name TEXT;
v_orchestration_version TEXT;
v_parent_instance_id TEXT;
v_status TEXT;
v_output TEXT;
v_completed_at TIMESTAMPTZ;
v_elem JSONB;
v_visible_at TIMESTAMPTZ;
v_fire_at_ms BIGINT;
v_item_instance_id TEXT;
v_item_execution_id BIGINT;
v_item_activity_id BIGINT;
+ v_item_session_id TEXT;
v_now_ts TIMESTAMPTZ;
BEGIN
-- Convert Rust-supplied millisecond timestamp to TIMESTAMPTZ
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
-- Step 1: Validate lock token
SELECT il.instance_id INTO v_instance_id
FROM SCHEMA.instance_locks il
WHERE il.lock_token = p_lock_token AND il.locked_until > p_now_ms;
IF NOT FOUND THEN
RAISE EXCEPTION 'Invalid lock token';
END IF;
-- Step 2: Extract metadata from JSONB
v_orchestration_name := p_metadata->>'orchestration_name';
v_orchestration_version := p_metadata->>'orchestration_version';
v_parent_instance_id := p_metadata->>'parent_instance_id';
v_status := p_metadata->>'status';
v_output := p_metadata->>'output';
-- Step 3: Create or update instance metadata (with explicit timestamps)
IF v_orchestration_name IS NOT NULL AND v_orchestration_version IS NOT NULL THEN
INSERT INTO SCHEMA.instances (instance_id, orchestration_name, orchestration_version, current_execution_id, parent_instance_id, created_at, updated_at)
VALUES (v_instance_id, v_orchestration_name, v_orchestration_version, p_execution_id, v_parent_instance_id, v_now_ts, v_now_ts)
ON CONFLICT (instance_id) DO NOTHING;
UPDATE SCHEMA.instances i
SET orchestration_name = v_orchestration_name,
orchestration_version = v_orchestration_version,
parent_instance_id = COALESCE(i.parent_instance_id, v_parent_instance_id),
updated_at = v_now_ts
WHERE i.instance_id = v_instance_id;
END IF;
-- Step 4: Create execution record (idempotent)
INSERT INTO SCHEMA.executions (instance_id, execution_id, status, started_at)
VALUES (v_instance_id, p_execution_id, 'Running', v_now_ts)
ON CONFLICT (instance_id, execution_id) DO NOTHING;
-- Step 5: Update instance current_execution_id
UPDATE SCHEMA.instances i
SET current_execution_id = GREATEST(i.current_execution_id, p_execution_id),
updated_at = v_now_ts
WHERE i.instance_id = v_instance_id;
-- Step 6: Append history_delta (batch insert with explicit timestamps)
IF p_history_delta IS NOT NULL AND JSONB_ARRAY_LENGTH(p_history_delta) > 0 THEN
INSERT INTO SCHEMA.history (instance_id, execution_id, event_id, event_type, event_data, created_at)
SELECT
v_instance_id,
p_execution_id,
(elem->>'event_id')::BIGINT,
elem->>'event_type',
elem->>'event_data',
v_now_ts
FROM JSONB_ARRAY_ELEMENTS(p_history_delta) AS elem;
END IF;
-- Step 7: Update execution status if provided
IF v_status IS NOT NULL THEN
v_completed_at := CASE
WHEN v_status IN ('Completed', 'Failed') THEN v_now_ts
ELSE NULL
END;
UPDATE SCHEMA.executions e
SET status = v_status, output = v_output, completed_at = v_completed_at
WHERE e.instance_id = v_instance_id AND e.execution_id = p_execution_id;
END IF;
-- Step 7b: Store pinned duroxide version if provided in metadata
IF p_metadata ? 'pinned_duroxide_version' AND p_metadata->'pinned_duroxide_version' IS NOT NULL
AND p_metadata->>'pinned_duroxide_version' != 'null' THEN
UPDATE SCHEMA.executions
SET duroxide_version_major = (p_metadata->'pinned_duroxide_version'->>'major')::INTEGER,
duroxide_version_minor = (p_metadata->'pinned_duroxide_version'->>'minor')::INTEGER,
duroxide_version_patch = (p_metadata->'pinned_duroxide_version'->>'patch')::INTEGER
WHERE instance_id = v_instance_id AND execution_id = p_execution_id;
END IF;
- -- Step 8: Enqueue worker items FIRST - now with activity identification
+ -- Step 8: Enqueue worker items with session_id support
IF p_worker_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_worker_items) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_worker_items) LOOP
IF v_elem ? 'ActivityExecute' THEN
v_item_instance_id := v_elem->'ActivityExecute'->>'instance';
v_item_execution_id := (v_elem->'ActivityExecute'->>'execution_id')::BIGINT;
v_item_activity_id := (v_elem->'ActivityExecute'->>'id')::BIGINT;
+ v_item_session_id := v_elem->'ActivityExecute'->>'session_id';
ELSE
v_item_instance_id := NULL;
v_item_execution_id := NULL;
v_item_activity_id := NULL;
+ v_item_session_id := NULL;
END IF;
- INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id)
- VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id);
+ INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
+ VALUES (v_elem::TEXT, v_now_ts, v_now_ts, v_item_instance_id, v_item_execution_id, v_item_activity_id, v_item_session_id);
END LOOP;
END IF;
-- Step 9: Delete cancelled activities from worker_queue (lock stealing)
IF p_cancelled_activities IS NOT NULL AND JSONB_ARRAY_LENGTH(p_cancelled_activities) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_cancelled_activities) LOOP
DELETE FROM SCHEMA.worker_queue
WHERE instance_id = v_elem->>'instance'
AND execution_id = (v_elem->>'execution_id')::BIGINT
AND activity_id = (v_elem->>'activity_id')::BIGINT;
END LOOP;
END IF;
-- Step 10: Enqueue orchestrator items
IF p_orchestrator_items IS NOT NULL AND JSONB_ARRAY_LENGTH(p_orchestrator_items) > 0 THEN
FOR v_elem IN SELECT value FROM JSONB_ARRAY_ELEMENTS(p_orchestrator_items) LOOP
IF v_elem ? 'StartOrchestration' THEN
v_item_instance_id := v_elem->'StartOrchestration'->>'instance';
ELSIF v_elem ? 'ContinueAsNew' THEN
v_item_instance_id := v_elem->'ContinueAsNew'->>'instance';
ELSIF v_elem ? 'TimerFired' THEN
v_item_instance_id := v_elem->'TimerFired'->>'instance';
v_fire_at_ms := (v_elem->'TimerFired'->>'fire_at_ms')::BIGINT;
ELSIF v_elem ? 'ActivityCompleted' THEN
v_item_instance_id := v_elem->'ActivityCompleted'->>'instance';
ELSIF v_elem ? 'ActivityFailed' THEN
v_item_instance_id := v_elem->'ActivityFailed'->>'instance';
ELSIF v_elem ? 'ExternalRaised' THEN
v_item_instance_id := v_elem->'ExternalRaised'->>'instance';
ELSIF v_elem ? 'CancelInstance' THEN
v_item_instance_id := v_elem->'CancelInstance'->>'instance';
ELSIF v_elem ? 'SubOrchCompleted' THEN
v_item_instance_id := v_elem->'SubOrchCompleted'->>'parent_instance';
ELSIF v_elem ? 'SubOrchFailed' THEN
v_item_instance_id := v_elem->'SubOrchFailed'->>'parent_instance';
ELSE
v_item_instance_id := v_instance_id;
END IF;
IF v_elem ? 'TimerFired' AND v_fire_at_ms IS NOT NULL AND v_fire_at_ms > 0 THEN
v_visible_at := TO_TIMESTAMP(v_fire_at_ms / 1000.0);
ELSE
v_visible_at := v_now_ts;
END IF;
INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
VALUES (v_item_instance_id, v_elem::TEXT, v_visible_at, v_now_ts);
v_fire_at_ms := NULL;
END LOOP;
END IF;
-- Step 11: Delete locked messages
DELETE FROM SCHEMA.orchestrator_queue q WHERE q.lock_token = p_lock_token;
-- Step 12: Remove instance lock
DELETE FROM SCHEMA.instance_locks il
WHERE il.instance_id = v_instance_id AND il.lock_token = p_lock_token;
END;
$function$
\ No newline at end of file
```
### `ack_worker` — Body Modified
Full function with diff:
```diff
CREATE OR REPLACE FUNCTION SCHEMA.ack_worker(p_lock_token text, p_instance_id text DEFAULT NULL::text, p_completion_json text DEFAULT NULL::text, p_now_ms bigint DEFAULT NULL::bigint)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_rows_affected INTEGER;
v_now_ts TIMESTAMPTZ;
+ v_session_id TEXT;
BEGIN
+ -- Capture session_id before deleting
+ SELECT session_id INTO v_session_id
+ FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
+
-- Delete the worker queue item
DELETE FROM SCHEMA.worker_queue WHERE lock_token = p_lock_token;
GET DIAGNOSTICS v_rows_affected = ROW_COUNT;
IF v_rows_affected = 0 THEN
RAISE EXCEPTION 'Worker queue item not found or already processed';
END IF;
-- Only enqueue completion if provided (NULL means cancelled activity)
IF p_completion_json IS NOT NULL THEN
-- Validate required parameters for completion
IF p_instance_id IS NULL THEN
RAISE EXCEPTION 'p_instance_id is required when p_completion_json is provided';
END IF;
IF p_now_ms IS NULL THEN
RAISE EXCEPTION 'p_now_ms is required when p_completion_json is provided';
END IF;
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
INSERT INTO SCHEMA.orchestrator_queue (instance_id, work_item, visible_at, created_at)
VALUES (p_instance_id, p_completion_json, v_now_ts, v_now_ts);
END IF;
+
+ -- Piggyback: update session last_activity_at
+ IF v_session_id IS NOT NULL AND p_now_ms IS NOT NULL THEN
+ UPDATE SCHEMA.sessions SET last_activity_at = p_now_ms
+ WHERE session_id = v_session_id AND locked_until > p_now_ms;
+ END IF;
END;
$function$
\ No newline at end of file
```
### `cleanup_orphaned_sessions` — NEW
```sql
CREATE OR REPLACE FUNCTION SCHEMA.cleanup_orphaned_sessions(p_now_ms bigint)
RETURNS bigint
LANGUAGE plpgsql
AS $function$
DECLARE
v_count BIGINT;
BEGIN
DELETE FROM SCHEMA.sessions
WHERE locked_until < p_now_ms
AND NOT EXISTS (SELECT 1 FROM SCHEMA.worker_queue WHERE worker_queue.session_id = sessions.session_id);
GET DIAGNOSTICS v_count = ROW_COUNT;
RETURN v_count;
END;
$function$
```
### `cleanup_schema` — Body Modified
Full function with diff:
```diff
CREATE OR REPLACE FUNCTION SCHEMA.cleanup_schema()
RETURNS void
LANGUAGE plpgsql
AS $function$
BEGIN
-- Drop tables first
+ DROP TABLE IF EXISTS SCHEMA.sessions CASCADE;
DROP TABLE IF EXISTS SCHEMA.instances CASCADE;
DROP TABLE IF EXISTS SCHEMA.executions CASCADE;
DROP TABLE IF EXISTS SCHEMA.history CASCADE;
DROP TABLE IF EXISTS SCHEMA.orchestrator_queue CASCADE;
DROP TABLE IF EXISTS SCHEMA.worker_queue CASCADE;
DROP TABLE IF EXISTS SCHEMA.instance_locks CASCADE;
DROP TABLE IF EXISTS SCHEMA._duroxide_migrations CASCADE;
- -- Drop all stored procedures (required for public schema cleanup)
- -- Base procedures from 0002_create_stored_procedures.sql
+ -- Drop all stored procedures
DROP FUNCTION IF EXISTS SCHEMA.cleanup_schema();
DROP FUNCTION IF EXISTS SCHEMA.list_instances();
DROP FUNCTION IF EXISTS SCHEMA.list_executions(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.latest_execution_id(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.list_instances_by_status(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.get_instance_info(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.get_execution_info(TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.get_system_metrics();
DROP FUNCTION IF EXISTS SCHEMA.get_queue_depths(BIGINT);
- DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.enqueue_worker_work(TEXT, BIGINT, TEXT, BIGINT, BIGINT, TEXT);
DROP FUNCTION IF EXISTS SCHEMA.ack_worker(TEXT, TEXT, TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.renew_work_item_lock(TEXT, BIGINT, BIGINT);
- DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.fetch_work_item(BIGINT, BIGINT, TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.abandon_work_item(TEXT, BIGINT, BIGINT, BOOLEAN);
DROP FUNCTION IF EXISTS SCHEMA.enqueue_orchestrator_work(TEXT, TEXT, TIMESTAMPTZ, TEXT, TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_orchestration_item(BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_orchestration_item(BIGINT, BIGINT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.ack_orchestration_item(TEXT, BIGINT, BIGINT, JSONB, JSONB, JSONB, JSONB, JSONB);
DROP FUNCTION IF EXISTS SCHEMA.abandon_orchestration_item(TEXT, BIGINT, BIGINT, BOOLEAN);
DROP FUNCTION IF EXISTS SCHEMA.renew_orchestration_item_lock(TEXT, BIGINT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_history(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.fetch_history_with_execution(TEXT, BIGINT);
DROP FUNCTION IF EXISTS SCHEMA.append_history(TEXT, BIGINT, JSONB);
- -- Deletion/pruning procedures from 0010_add_deletion_and_pruning_support.sql
DROP FUNCTION IF EXISTS SCHEMA.list_children(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.get_parent_id(TEXT);
DROP FUNCTION IF EXISTS SCHEMA.delete_instances_atomic(TEXT[], BOOLEAN);
DROP FUNCTION IF EXISTS SCHEMA.prune_executions(TEXT, INTEGER, BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.renew_session_lock(TEXT[], BIGINT, BIGINT, BIGINT);
+ DROP FUNCTION IF EXISTS SCHEMA.cleanup_orphaned_sessions(BIGINT);
END;
$function$
\ No newline at end of file
```
### `enqueue_worker_work` — Signature Changed + Body Modified
```diff
- enqueue_worker_work(p_work_item text, p_now_ms bigint, p_instance_id text, p_execution_id bigint, p_activity_id bigint)
+ enqueue_worker_work(p_work_item text, p_now_ms bigint, p_instance_id text, p_execution_id bigint, p_activity_id bigint, p_session_id text)
```
Full function with diff:
```diff
-CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item text, p_now_ms bigint, p_instance_id text DEFAULT NULL::text, p_execution_id bigint DEFAULT NULL::bigint, p_activity_id bigint DEFAULT NULL::bigint)
+CREATE OR REPLACE FUNCTION SCHEMA.enqueue_worker_work(p_work_item text, p_now_ms bigint, p_instance_id text DEFAULT NULL::text, p_execution_id bigint DEFAULT NULL::bigint, p_activity_id bigint DEFAULT NULL::bigint, p_session_id text DEFAULT NULL::text)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_now_ts TIMESTAMPTZ;
BEGIN
v_now_ts := TO_TIMESTAMP(p_now_ms / 1000.0);
- INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id)
- VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id);
+ INSERT INTO SCHEMA.worker_queue (work_item, visible_at, created_at, instance_id, execution_id, activity_id, session_id)
+ VALUES (p_work_item, v_now_ts, v_now_ts, p_instance_id, p_execution_id, p_activity_id, p_session_id);
END;
$function$
\ No newline at end of file
```
### `fetch_work_item` — Signature Changed + Body Modified
```diff
- fetch_work_item(p_now_ms bigint, p_lock_timeout_ms bigint)
+ fetch_work_item(p_now_ms bigint, p_lock_timeout_ms bigint, p_owner_id text, p_session_lock_timeout_ms bigint)
```
Full function with diff:
```diff
-CREATE OR REPLACE FUNCTION SCHEMA.fetch_work_item(p_now_ms bigint, p_lock_timeout_ms bigint)
+CREATE OR REPLACE FUNCTION SCHEMA.fetch_work_item(p_now_ms bigint, p_lock_timeout_ms bigint, p_owner_id text DEFAULT NULL::text, p_session_lock_timeout_ms bigint DEFAULT NULL::bigint)
RETURNS TABLE(out_work_item text, out_lock_token text, out_attempt_count integer)
LANGUAGE plpgsql
AS $function$
DECLARE
v_id BIGINT;
+ v_session_id TEXT;
+ v_session_locked_until BIGINT;
BEGIN
- -- Item is available if:
- -- 1. visible_at <= now (not delayed)
- -- 2. AND (lock_token IS NULL OR locked_until <= now) (not locked or lock expired)
- SELECT q.id INTO v_id
- FROM SCHEMA.worker_queue q
- WHERE q.visible_at <= TO_TIMESTAMP(p_now_ms / 1000.0)
- AND (q.lock_token IS NULL OR q.locked_until <= p_now_ms)
- ORDER BY q.id
- LIMIT 1
- FOR UPDATE OF q SKIP LOCKED;
+ IF p_owner_id IS NOT NULL THEN
+ -- Session-aware fetch: find eligible items considering session routing
+ -- Eligible items are:
+ -- 1. Non-session items (q.session_id IS NULL)
+ -- 2. Items for sessions owned by this worker (s.worker_id = p_owner_id AND s.locked_until > p_now_ms)
+ -- 3. Items for claimable sessions (no active session row, or expired lock)
+ SELECT q.id, q.session_id INTO v_id, v_session_id
+ FROM SCHEMA.worker_queue q
+ LEFT JOIN SCHEMA.sessions s ON s.session_id = q.session_id AND s.locked_until > p_now_ms
+ WHERE q.visible_at <= TO_TIMESTAMP(p_now_ms / 1000.0)
+ AND (q.lock_token IS NULL OR q.locked_until <= p_now_ms)
+ AND (
+ q.session_id IS NULL
+ OR s.worker_id = p_owner_id
+ OR s.session_id IS NULL
+ )
+ ORDER BY q.id
+ LIMIT 1
+ FOR UPDATE OF q SKIP LOCKED;
+ ELSE
+ -- Non-session fetch: only non-session items
+ SELECT q.id, q.session_id INTO v_id, v_session_id
+ FROM SCHEMA.worker_queue q
+ WHERE q.visible_at <= TO_TIMESTAMP(p_now_ms / 1000.0)
+ AND (q.lock_token IS NULL OR q.locked_until <= p_now_ms)
+ AND q.session_id IS NULL
+ ORDER BY q.id
+ LIMIT 1
+ FOR UPDATE OF q SKIP LOCKED;
+ END IF;
IF NOT FOUND THEN
RETURN;
END IF;
out_lock_token := 'lock_' || gen_random_uuid()::TEXT;
-- Increment attempt_count and lock the item
UPDATE SCHEMA.worker_queue
SET lock_token = out_lock_token,
locked_until = p_now_ms + p_lock_timeout_ms,
attempt_count = attempt_count + 1
WHERE id = v_id;
SELECT work_item, attempt_count
INTO out_work_item, out_attempt_count
FROM SCHEMA.worker_queue
WHERE id = v_id;
+ -- If session-bound, upsert the sessions row
+ IF v_session_id IS NOT NULL AND p_owner_id IS NOT NULL THEN
+ v_session_locked_until := p_now_ms + COALESCE(p_session_lock_timeout_ms, p_lock_timeout_ms);
+
+ INSERT INTO SCHEMA.sessions (session_id, worker_id, locked_until, last_activity_at)
+ VALUES (v_session_id, p_owner_id, v_session_locked_until, p_now_ms)
+ ON CONFLICT (session_id) DO UPDATE
+ SET worker_id = p_owner_id,
+ locked_until = v_session_locked_until,
+ last_activity_at = p_now_ms
+ WHERE SCHEMA.sessions.locked_until <= p_now_ms OR SCHEMA.sessions.worker_id = p_owner_id;
+
+ -- If upsert affected 0 rows, another worker owns this session.
+ -- Roll back: clear lock so item can be retried.
+ IF NOT FOUND THEN
+ UPDATE SCHEMA.worker_queue
+ SET lock_token = NULL,
+ locked_until = NULL,
+ attempt_count = attempt_count - 1
+ WHERE id = v_id;
+ RETURN;
+ END IF;
+ END IF;
+
RETURN NEXT;
END;
$function$
\ No newline at end of file
```
### `renew_session_lock` — NEW
```sql
CREATE OR REPLACE FUNCTION SCHEMA.renew_session_lock(p_owner_ids text[], p_now_ms bigint, p_extend_ms bigint, p_idle_timeout_ms bigint)
RETURNS bigint
LANGUAGE plpgsql
AS $function$
DECLARE
v_count BIGINT;
BEGIN
UPDATE SCHEMA.sessions SET locked_until = p_now_ms + p_extend_ms
WHERE worker_id = ANY(p_owner_ids)
AND locked_until > p_now_ms
AND last_activity_at > (p_now_ms - p_idle_timeout_ms);
GET DIAGNOSTICS v_count = ROW_COUNT;
RETURN v_count;
END;
$function$
```
### `renew_work_item_lock` — Body Modified
Full function with diff:
```diff
CREATE OR REPLACE FUNCTION SCHEMA.renew_work_item_lock(p_lock_token text, p_now_ms bigint, p_extend_secs bigint)
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
v_locked_until BIGINT;
v_rows_affected INTEGER;
+ v_session_id TEXT;
BEGIN
-- Calculate new locked_until timestamp
v_locked_until := p_now_ms + (p_extend_secs * 1000);
+
+ -- Read session_id before updating
+ SELECT session_id INTO v_session_id
+ FROM SCHEMA.worker_queue
+ WHERE lock_token = p_lock_token;
-- Update lock timeout only if lock is still valid
- -- Use p_now_ms (from application) for consistent time reference
UPDATE SCHEMA.worker_queue
SET locked_until = v_locked_until
WHERE lock_token = p_lock_token
AND locked_until > p_now_ms;
GET DIAGNOSTICS v_rows_affected = ROW_COUNT;
IF v_rows_affected = 0 THEN
RAISE EXCEPTION 'Lock token invalid, expired, or already acked';
END IF;
+
+ -- Piggyback: update session last_activity_at
+ IF v_session_id IS NOT NULL THEN
+ UPDATE SCHEMA.sessions SET last_activity_at = p_now_ms
+ WHERE session_id = v_session_id AND locked_until > p_now_ms;
+ END IF;
END;
$function$
\ No newline at end of file
```
---