LIBRARY_SOURCE

Constant LIBRARY_SOURCE

Source
pub const LIBRARY_SOURCE: &str = "#!lua name=flowfabric\n\n-- source: lua/helpers.lua\n-- FlowFabric shared library-local helpers\n-- These are local functions available to all registered functions in the\n-- flowfabric library. They are NOT independently FCALL-able.\n-- Reference: RFC-010 \u{a7}4.8, RFC-004 \u{a7}Waitpoint Security (HMAC tokens)\n\n---------------------------------------------------------------------------\n-- Capability CSV bounds (RFC-009 \u{a7}7.5)\n---------------------------------------------------------------------------\n-- Shared ceiling for BOTH the worker-side CSV (ff_issue_claim_grant ARGV[9])\n-- AND the execution-side CSV (exec_core.required_capabilities). Defense in\n-- depth against runaway field sizes: a 10k-token list turns into a multi-MB\n-- HSET value and a per-candidate O(N) atomic scan that blocks the shard.\n--\n-- Inclusivity: these are MAXIMUM accepted values. `#csv == CAPS_MAX_BYTES`\n-- and `n == CAPS_MAX_TOKENS` are accepted; one more rejects. Rust-side\n-- ingress (ff-sdk::FlowFabricWorker::connect, ff-scheduler::Scheduler::\n-- claim_for_worker, ff-core::policy::RoutingRequirements deserialization\n-- via lua/execution.lua) enforces the same ceilings so the Lua check is a\n-- defense-in-depth backstop, not the primary validator.\nlocal CAPS_MAX_BYTES  = 4096\nlocal CAPS_MAX_TOKENS = 256\n\n---------------------------------------------------------------------------\n-- Hex / binary helpers (for HMAC-SHA1 token derivation)\n---------------------------------------------------------------------------\n\n-- Convert a hex string to a binary (byte) string. Accepts mixed case.\n-- Returns nil on ANY malformed input: non-string, odd length, OR any\n-- non-hex char (including whitespace, unicode, control chars). 
Callers\n-- treat nil as invalid_secret.\n--\n-- Rust side (ServerConfig) already validates the env secret as even-length\n-- 0-9a-fA-F, but an operator writing directly to Valkey (or a torn write\n-- during rotation) could bypass that validator. We refuse the conversion\n-- here instead of silently coercing bad pairs to 0 bytes (which would\n-- produce a bogus but valid-looking MAC).\nlocal function hex_to_bytes(hex)\n  if type(hex) ~= \"string\" or #hex % 2 ~= 0 then\n    return nil\n  end\n  local out = {}\n  for i = 1, #hex - 1, 2 do\n    local byte = tonumber(hex:sub(i, i + 1), 16)\n    if not byte then\n      return nil\n    end\n    out[#out + 1] = string.char(byte)\n  end\n  return table.concat(out)\nend\n\n-- XOR two equal-length byte strings. Used for HMAC key-pad construction.\nlocal function xor_bytes(a, b)\n  local out = {}\n  for i = 1, #a do\n    out[i] = string.char(bit.bxor(a:byte(i), b:byte(i)))\n  end\n  return table.concat(out)\nend\n\n-- HMAC-SHA1(key_hex, message) \u{2192} lowercase hex digest (40 chars), or nil on\n-- malformed key_hex (odd-length / non-string). Callers must treat nil as\n-- an invalid-secret error \u{2014} never pass it to HSET / concat / return.\n-- Reference: RFC 2104. SHA1 block size = 64 bytes.\nlocal function hmac_sha1_hex(key_hex, message)\n  local key = hex_to_bytes(key_hex)\n  if not key then\n    return nil\n  end\n  local block_size = 64\n  if #key > block_size then\n    -- Reduce oversized key via SHA1 (per RFC 2104). sha1hex output is 40\n    -- lowercase hex chars, so the inner hex_to_bytes cannot fail.\n    key = hex_to_bytes(redis.sha1hex(key))\n  end\n  if #key < block_size then\n    key = key .. string.rep(\"\\0\", block_size - #key)\n  end\n  local ipad = string.rep(string.char(0x36), block_size)\n  local opad = string.rep(string.char(0x5c), block_size)\n  local inner = redis.sha1hex(xor_bytes(key, ipad) .. message)\n  return redis.sha1hex(xor_bytes(key, opad) .. 
hex_to_bytes(inner))\nend\n\n-- Constant-time string equality. Returns true iff strings are equal in\n-- both length and content. Uses XOR-accumulation to avoid early-exit\n-- timing leaks on byte mismatches during HMAC token validation.\n-- Reference: Remote timing attacks on authentication (e.g., CVE-2011-3389 class).\n--\n-- Safety note on the length check: a length-mismatch early return reveals\n-- whether the presented string matches the expected length, which is a\n-- timing side channel IF attacker-controlled length is used to probe the\n-- expected length. In this codebase the caller normalizes to a fixed shape\n-- BEFORE reaching here \u{2014} validate_waitpoint_token already requires\n-- #presented == 40 (SHA1 hex digest length) at the parsing boundary, so\n-- any input reaching constant_time_eq has a length already known to be 40\n-- by the attacker. The only length variation here is on `expected`, which\n-- is server-computed and constant. Hence this early return does not leak\n-- secret-dependent timing.\nlocal function constant_time_eq(a, b)\n  if type(a) ~= \"string\" or type(b) ~= \"string\" then\n    return false\n  end\n  if #a ~= #b then\n    return false\n  end\n  local acc = 0\n  for i = 1, #a do\n    acc = bit.bor(acc, bit.bxor(a:byte(i), b:byte(i)))\n  end\n  return acc == 0\nend\n\n---------------------------------------------------------------------------\n-- Waitpoint HMAC tokens (RFC-004 \u{a7}Waitpoint Security)\n---------------------------------------------------------------------------\n--\n-- Token format: \"kid:40hex\"  \u{2014} kid identifies which key signed the token,\n-- enabling zero-downtime rotation. ANY kid present in the secrets hash\n-- with a future `expires_at:<kid>` (or the current kid, which has no\n-- expiry) accepts tokens. 
This supports rapid rotation: rotating A\u{2192}B\u{2192}C\n-- within a grace window keeps A\'s secret validatable as long as\n-- expires_at:A is still future.\n--\n-- HMAC input binds (waitpoint_id | waitpoint_key | created_at_ms) with a\n-- pipe delimiter so field-boundary confusion cannot produce collisions\n-- across waitpoints.\n--\n-- Secret storage: per-partition replicated hash at\n--   ff:sec:{p:N}:waitpoint_hmac\n-- Fields:\n--   current_kid               \u{2014} the kid minting new tokens (no expiry)\n--   secret:<kid>              \u{2014} hex-encoded HMAC key for each kid ever installed\n--   expires_at:<kid>          \u{2014} unix ms; accept tokens under <kid> iff exp > now_ms\n--                               INVARIANT: expires_at:<current_kid> is NEVER written\n--   previous_kid              \u{2014} observability/audit only: the kid immediately\n--                               preceding current_kid (NOT the only acceptable one)\n--   previous_expires_at       \u{2014} observability/audit only: matches\n--                               expires_at:<previous_kid>\n--\n-- Replication is required for Valkey cluster mode (all FCALL KEYS must\n-- hash to the same slot); rotation fans out across partitions.\n---------------------------------------------------------------------------\n\n-- Read the hmac_secrets hash. 
Returns a table with:\n--   current_kid, current_secret \u{2014} the minting kid (nil if not initialized)\n--   kid_secrets = { [kid] = { secret = <hex>, expires_at = <ms or nil> } }\n--     includes current_kid (expires_at = nil \u{2192} no expiry)\n--     includes every secret:<kid> present in the hash\n--   previous_kid, previous_secret, previous_expires_at \u{2014} kept for back-compat\n--     (audit log / observability); validate path does NOT depend on them.\n-- Returns nil if the hash is absent.\nlocal function load_waitpoint_secrets(secrets_key)\n  local raw = redis.call(\"HGETALL\", secrets_key)\n  if #raw == 0 then\n    return nil\n  end\n  local t = {}\n  for i = 1, #raw, 2 do\n    t[raw[i]] = raw[i + 1]\n  end\n  local out = {\n    current_kid = t.current_kid,\n    previous_kid = t.previous_kid,\n    previous_expires_at = t.previous_expires_at,\n    kid_secrets = {},\n  }\n  if out.current_kid then\n    out.current_secret = t[\"secret:\" .. out.current_kid]\n  end\n  if out.previous_kid then\n    out.previous_secret = t[\"secret:\" .. out.previous_kid]\n  end\n  -- Multi-kid scan: every secret:<kid> becomes a validation candidate.\n  -- current_kid has no expiry entry (intentional \u{2014} it\'s always valid).\n  -- Other kids are accepted iff expires_at:<kid> is set AND > now_ms; the\n  -- expiry check runs in validate_waitpoint_token so we simply carry the\n  -- raw expires_at string here.\n  for k, v in pairs(t) do\n    if k:sub(1, 7) == \"secret:\" then\n      local kid = k:sub(8)\n      if kid ~= \"\" then\n        out.kid_secrets[kid] = {\n          secret = v,\n          expires_at = t[\"expires_at:\" .. kid],\n        }\n      end\n    end\n  end\n  return out\nend\n\n-- Build the HMAC input string. Pipe delimiter prevents concatenation\n-- collisions across distinct (waitpoint_id, waitpoint_key) pairs.\nlocal function waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)\n  return waitpoint_id .. \"|\" .. waitpoint_key .. \"|\" .. 
tostring(created_at_ms)\nend\n\n-- Mint a waitpoint token using the current kid.\n-- Returns (token, kid) on success or (nil, error_code) on failure.\n-- Defense-in-depth: returns a typed error for missing secrets_key / missing\n-- secrets hash so external callers that construct FCALL KEYS by hand cannot\n-- produce the \"arguments must be strings or integers\" Lua panic via nil.\nlocal function mint_waitpoint_token(secrets_key, waitpoint_id, waitpoint_key, created_at_ms)\n  if type(secrets_key) ~= \"string\" or secrets_key == \"\" then\n    return nil, \"invalid_keys_missing_hmac\"\n  end\n  local secrets = load_waitpoint_secrets(secrets_key)\n  if not secrets or not secrets.current_kid or not secrets.current_secret then\n    return nil, \"hmac_secret_not_initialized\"\n  end\n  local input = waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)\n  local digest = hmac_sha1_hex(secrets.current_secret, input)\n  if not digest then\n    return nil, \"invalid_secret\"\n  end\n  return secrets.current_kid .. \":\" .. digest, secrets.current_kid\nend\n\n-- Validate a waitpoint token against the (waitpoint_id, waitpoint_key,\n-- created_at_ms) that were bound at mint time. 
Accepts tokens signed with\n-- current_kid, or previous_kid if previous_expires_at has not passed.\n-- Returns nil on success or an error code string on failure.\nlocal function validate_waitpoint_token(\n  secrets_key, token, waitpoint_id, waitpoint_key, created_at_ms, now_ms\n)\n  if type(secrets_key) ~= \"string\" or secrets_key == \"\" then\n    return \"invalid_keys_missing_hmac\"\n  end\n  if type(token) ~= \"string\" or token == \"\" then\n    return \"missing_token\"\n  end\n  local sep = token:find(\":\", 1, true)\n  if not sep or sep < 2 or sep >= #token then\n    return \"invalid_token\"\n  end\n  local kid = token:sub(1, sep - 1)\n  local presented = token:sub(sep + 1)\n  if #presented ~= 40 then\n    -- SHA1 hex digest is always 40 chars.\n    return \"invalid_token\"\n  end\n\n  local secrets = load_waitpoint_secrets(secrets_key)\n  if not secrets or not secrets.current_kid then\n    return \"hmac_secret_not_initialized\"\n  end\n\n  -- Multi-kid validation. ANY secret:<kid> present in the hash is a\n  -- candidate IF:\n  --   - kid == current_kid (no expiry, always valid), OR\n  --   - expires_at:<kid> is a positive integer AND > now_ms.\n  --\n  -- Rationale: rapid rotation (A\u{2192}B\u{2192}C inside a grace window) must keep\n  -- in-flight A-signed tokens valid. The previous 2-slot model\n  -- (current + previous) evicted A as soon as B became previous, even\n  -- though expires_at:A was still future. 
RFC-004 \u{a7}Waitpoint Security\n  -- promises grace duration, not \"grace until next rotation\".\n  --\n  -- Fail-CLOSED on malformed expires_at: a corrupted/non-numeric value\n  -- means \"no affirmative unexpired proof\" \u{2014} reject.\n  local secret = nil\n  local expiry_state = nil  -- \"known_kid_expired\" | \"unknown_kid\"\n  if kid == secrets.current_kid then\n    secret = secrets.current_secret\n  else\n    local entry = secrets.kid_secrets and secrets.kid_secrets[kid]\n    if entry then\n      local exp = tonumber(entry.expires_at)\n      if not exp or exp <= 0 or exp < now_ms then\n        -- secret:<kid> is present but its grace has elapsed (or was\n        -- never recorded). Distinguishable from unknown_kid so the\n        -- caller can log the more-actionable \"token_expired\".\n        expiry_state = \"known_kid_expired\"\n      else\n        secret = entry.secret\n      end\n    else\n      expiry_state = \"unknown_kid\"\n    end\n  end\n\n  if not secret then\n    if expiry_state == \"known_kid_expired\" then\n      return \"token_expired\"\n    end\n    return \"invalid_token\"\n  end\n\n  local input = waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)\n  local expected = hmac_sha1_hex(secret, input)\n  if not expected then\n    return \"invalid_secret\"\n  end\n  if not constant_time_eq(expected, presented) then\n    return \"invalid_token\"\n  end\n  return nil\nend\n\n---------------------------------------------------------------------------\n-- Time\n---------------------------------------------------------------------------\n\n-- Returns the Valkey server time as milliseconds. Always prefer this over\n-- a caller-supplied now_ms for fields that are used in retention windows,\n-- eligibility scoring, lease expiry, or any cross-execution causal\n-- comparison. 
Client-supplied timestamps are trivially skewable and\n-- produce observability drift when compared against fields written by\n-- other Lua functions (which already use redis.call(\"TIME\")).\nlocal function server_time_ms()\n  local t = redis.call(\"TIME\")\n  return tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\nend\n\n---------------------------------------------------------------------------\n-- Return wrappers (\u{a7}4.9)\n---------------------------------------------------------------------------\n\nlocal function ok(...)\n  return {1, \"OK\", ...}\nend\n\nlocal function err(...)\n  return {0, ...}\nend\n\n-- Require a numeric value from ARGV. Returns the number on success or\n-- an err() tuple on failure. Callers must check: if type(n) == \"table\"\n-- then return n end  (the table IS the err tuple).\nlocal function require_number(val, name)\n  local n = tonumber(val)\n  if n == nil then\n    return err(\"invalid_input\", name .. \" must be a number, got: \" .. tostring(val))\n  end\n  return n\nend\n\nlocal function ok_already_satisfied(...)\n  return {1, \"ALREADY_SATISFIED\", ...}\nend\n\nlocal function ok_duplicate(...)\n  return {1, \"DUPLICATE\", ...}\nend\n\n---------------------------------------------------------------------------\n-- Data access\n---------------------------------------------------------------------------\n\n-- Converts HGETALL flat array {k1, v1, k2, v2, ...} to a Lua dict table.\n-- All RFC pseudocode uses core.field syntax which requires this conversion.\nlocal function hgetall_to_table(flat)\n  local t = {}\n  for i = 1, #flat, 2 do\n    t[flat[i]] = flat[i + 1]\n  end\n  return t\nend\n\n-- Safe nil/empty check. Valkey hashes cannot store nil: HGET on a missing\n-- field returns false (via Lua), and cleared fields store \"\". 
This helper\n-- handles both cases plus actual nil for fields absent from hgetall_to_table.\nlocal function is_set(v)\n  return v ~= nil and v ~= false and v ~= \"\"\nend\n\n---------------------------------------------------------------------------\n-- Lease validation (most widely shared \u{2014} prevents copy-paste drift)\n-- RFC-010 \u{a7}4.8: 7+ functions use this: complete, fail, suspend, delay,\n-- move_to_waiting_children, append_frame, report_usage.\n---------------------------------------------------------------------------\n\n-- Validates that the caller holds a valid, non-expired, non-revoked lease.\n-- Returns an error tuple on failure, or nil on success.\n-- @param core   table from hgetall_to_table(HGETALL exec_core)\n-- @param argv   table with lease_id, lease_epoch, attempt_id\n-- @param now_ms current timestamp in milliseconds\nlocal function validate_lease(core, argv, now_ms)\n  if core.lifecycle_phase ~= \"active\" then\n    return err(\"execution_not_active\",\n      core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n  end\n  if core.ownership_state == \"lease_revoked\" then\n    return err(\"lease_revoked\")\n  end\n  if tonumber(core.lease_expires_at or \"0\") <= now_ms then\n    return err(\"lease_expired\")\n  end\n  if core.current_lease_id ~= argv.lease_id then\n    return err(\"stale_lease\")\n  end\n  if core.current_lease_epoch ~= argv.lease_epoch then\n    return err(\"stale_lease\")\n  end\n  if core.current_attempt_id ~= argv.attempt_id then\n    return err(\"stale_lease\")\n  end\n  return nil\nend\n\n-- Sets ownership_state to lease_expired_reclaimable. Idempotent.\n--\n-- Also writes `closed_at`/`closed_reason=\"lease_expired\"` on the attempt\'s\n-- `stream_meta` hash when that stream exists, so `tail_stream` consumers\n-- observe the terminal signal without having to fall back to polling\n-- `execution_state`. 
This matters for the permanent-failure case (worker\n-- OOM or node dead, no replacement reclaims): the reclaim path that\n-- normally writes `closed_reason=\"reclaimed\"` may never run, and without\n-- this signal the tail poll loop waits forever.\n--\n-- Write order: we only write stream_meta if its existing `closed_at` is\n-- empty. A later `ff_reclaim_execution` that overwrites `closed_reason`\n-- to \"reclaimed\" still wins because it unconditionally HSETs the field;\n-- this function intentionally does NOT overwrite a pre-existing close.\n--\n-- Key construction: the stream_meta key is derived from core_key\'s\n-- `{p:N}` hash tag + current_attempt_index. All three keys share the\n-- same hash tag, so this stays single-slot in cluster mode despite not\n-- being declared in KEYS upfront \u{2014} mirrors the dynamic attempt/lane key\n-- construction in `ff_create_execution`.\n--\n-- @param keys   table with core_key, lease_history_key\n-- @param core   table from hgetall_to_table\n-- @param now_ms current timestamp in milliseconds\n-- @param maxlen MAXLEN for lease_history stream\nlocal function mark_expired(keys, core, now_ms, maxlen)\n  if core.ownership_state == \"lease_expired_reclaimable\" then\n    return -- idempotent\n  end\n  -- ALL 7 dims (preserve lifecycle_phase=active, eligibility_state, terminal_outcome=none)\n  redis.call(\"HSET\", keys.core_key,\n    \"lifecycle_phase\", core.lifecycle_phase or \"active\",     -- preserve\n    \"ownership_state\", \"lease_expired_reclaimable\",\n    \"eligibility_state\", core.eligibility_state or \"not_applicable\", -- preserve\n    \"blocking_reason\", \"waiting_for_worker\",\n    \"blocking_detail\", \"lease expired, awaiting reclaim\",\n    \"terminal_outcome\", core.terminal_outcome or \"none\",     -- preserve\n    \"attempt_state\", \"attempt_interrupted\",\n    \"public_state\", \"active\",\n    \"lease_expired_at\", now_ms,\n    \"last_mutation_at\", now_ms)\n  redis.call(\"XADD\", 
keys.lease_history_key, \"MAXLEN\", \"~\", maxlen, \"*\",\n    \"event\", \"expired\",\n    \"lease_id\", core.current_lease_id or \"\",\n    \"lease_epoch\", core.current_lease_epoch or \"\",\n    \"attempt_index\", core.current_attempt_index or \"\",\n    \"attempt_id\", core.current_attempt_id or \"\",\n    \"worker_id\", core.current_worker_id or \"\",\n    \"worker_instance_id\", core.current_worker_instance_id or \"\",\n    \"ts\", now_ms)\n\n  -- Close stream_meta (if the stream was lazily created) so tail_stream\n  -- consumers receive the terminal signal. Core key format:\n  --   ff:exec:{p:N}:<eid>:core\n  -- Stream meta key format:\n  --   ff:stream:{p:N}:<eid>:<attempt_index>:meta\n  local att_idx = core.current_attempt_index\n  if att_idx ~= nil and att_idx ~= \"\" then\n    local tag_open  = string.find(keys.core_key, \"{\", 1, true)\n    local tag_close = tag_open and string.find(keys.core_key, \"}\", tag_open, true)\n    if tag_open and tag_close then\n      local tag = string.sub(keys.core_key, tag_open, tag_close)\n      -- After `}:` comes `<eid>:core`. Walk past the `}:` delimiter.\n      local after_tag = string.sub(keys.core_key, tag_close + 2)\n      local eid_end = string.find(after_tag, \":core\", 1, true)\n      if eid_end then\n        local eid = string.sub(after_tag, 1, eid_end - 1)\n        local stream_meta_key = \"ff:stream:\" .. tag .. \":\" .. eid\n                                .. \":\" .. tostring(att_idx) .. 
\":meta\"\n        if redis.call(\"EXISTS\", stream_meta_key) == 1 then\n          local existing_closed_at = redis.call(\"HGET\", stream_meta_key, \"closed_at\")\n          if not is_set(existing_closed_at) then\n            redis.call(\"HSET\", stream_meta_key,\n              \"closed_at\", tostring(now_ms),\n              \"closed_reason\", \"lease_expired\")\n          end\n        end\n      end\n    end\n  end\nend\n\n-- Validates lease AND atomically marks expired if lease has lapsed.\n-- Use this variant for write-path callers (complete, fail, suspend, delay,\n-- move_to_waiting_children) that have the lease_history key available.\n-- For read-only callers (append_frame, report_usage) use validate_lease.\n-- @param core   table from hgetall_to_table\n-- @param argv   table with lease_id, lease_epoch, attempt_id\n-- @param now_ms current timestamp in milliseconds\n-- @param keys   table with core_key, lease_history_key\n-- @param maxlen MAXLEN for lease_history stream\nlocal function validate_lease_and_mark_expired(core, argv, now_ms, keys, maxlen)\n  if core.lifecycle_phase ~= \"active\" then\n    return err(\"execution_not_active\",\n      core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n  end\n  if core.ownership_state == \"lease_revoked\" then\n    return err(\"lease_revoked\")\n  end\n  if tonumber(core.lease_expires_at or \"0\") <= now_ms then\n    mark_expired(keys, core, now_ms, maxlen)\n    return err(\"lease_expired\")\n  end\n  if core.current_lease_id ~= argv.lease_id then\n    return err(\"stale_lease\")\n  end\n  if core.current_lease_epoch ~= argv.lease_epoch then\n    return err(\"stale_lease\")\n  end\n  if core.current_attempt_id ~= argv.attempt_id then\n    return err(\"stale_lease\")\n  end\n  return nil\nend\n\n-- Consolidates the ~15-line lease release block shared by 7 functions.\n-- DEL lease_current, ZREM lease_expiry + worker_leases + active_index,\n-- clear lease fields on exec_core, XADD lease_history 
\"released\".\n-- @param keys    table with lease_current_key, lease_expiry_key,\n--                worker_leases_key, active_index_key, lease_history_key,\n--                attempt_timeout_key, core_key\n-- @param core    table from hgetall_to_table\n-- @param reason  string reason for release (e.g. \"completed\", \"suspend\")\n-- @param now_ms  current timestamp in milliseconds\n-- @param maxlen  MAXLEN for lease_history stream\nlocal function clear_lease_and_indexes(keys, core, reason, now_ms, maxlen)\n  local eid = core.execution_id or \"\"\n\n  -- DEL lease record\n  redis.call(\"DEL\", keys.lease_current_key)\n\n  -- ZREM/SREM from scheduling indexes\n  redis.call(\"ZREM\", keys.lease_expiry_key, eid)\n  redis.call(\"SREM\", keys.worker_leases_key, eid)\n  redis.call(\"ZREM\", keys.active_index_key, eid)\n  redis.call(\"ZREM\", keys.attempt_timeout_key, eid)\n\n  -- Clear lease fields on exec_core (including stale expiry/revocation markers)\n  redis.call(\"HSET\", keys.core_key,\n    \"current_lease_id\", \"\",\n    \"current_worker_id\", \"\",\n    \"current_worker_instance_id\", \"\",\n    \"lease_expires_at\", \"\",\n    \"lease_last_renewed_at\", \"\",\n    \"lease_renewal_deadline\", \"\",\n    \"lease_expired_at\", \"\",\n    \"lease_revoked_at\", \"\",\n    \"lease_revoke_reason\", \"\",\n    \"last_mutation_at\", now_ms)\n\n  -- Lease history event\n  redis.call(\"XADD\", keys.lease_history_key, \"MAXLEN\", \"~\", maxlen, \"*\",\n    \"event\", \"released\",\n    \"lease_id\", core.current_lease_id or \"\",\n    \"lease_epoch\", core.current_lease_epoch or \"\",\n    \"attempt_index\", core.current_attempt_index or \"\",\n    \"attempt_id\", core.current_attempt_id or \"\",\n    \"reason\", reason,\n    \"ts\", now_ms)\nend\n\n---------------------------------------------------------------------------\n-- Defensive index cleanup\n-- RFC-010 \u{a7}4.8: ZREM execution_id from all scheduling + timeout indexes\n-- except except_key. 
~14 ZREM/SREM calls.\n---------------------------------------------------------------------------\n\n-- @param keys       table with all index key names\n-- @param eid        execution_id string\n-- @param except_key optional key to skip (the target index for this transition)\nlocal function defensive_zrem_all_indexes(keys, eid, except_key)\n  -- Each index key and whether it uses ZREM or SREM\n  local zrem_keys = {\n    keys.eligible_key,\n    keys.delayed_key,\n    keys.active_index_key,\n    keys.suspended_key,\n    keys.terminal_key,\n    keys.blocked_deps_key,\n    keys.blocked_budget_key,\n    keys.blocked_quota_key,\n    keys.blocked_route_key,\n    keys.blocked_operator_key,\n    keys.lease_expiry_key,\n    keys.suspension_timeout_key,\n    keys.attempt_timeout_key,\n    keys.execution_deadline_key,\n  }\n  for _, k in ipairs(zrem_keys) do\n    if k and k ~= except_key then\n      redis.call(\"ZREM\", k, eid)\n    end\n  end\n  -- worker_leases is a SET, not ZSET\n  if keys.worker_leases_key and keys.worker_leases_key ~= except_key then\n    redis.call(\"SREM\", keys.worker_leases_key, eid)\n  end\nend\n\n---------------------------------------------------------------------------\n-- Suspension reason \u{2192} blocking_reason mapping\n-- RFC-004 \u{a7}Suspension Reason Categories\n---------------------------------------------------------------------------\n\nlocal REASON_TO_BLOCKING = {\n  waiting_for_signal       = \"waiting_for_signal\",\n  waiting_for_approval     = \"waiting_for_approval\",\n  waiting_for_callback     = \"waiting_for_callback\",\n  waiting_for_tool_result  = \"waiting_for_tool_result\",\n  waiting_for_operator_review = \"paused_by_operator\",\n  paused_by_policy         = \"paused_by_policy\",\n  paused_by_budget         = \"waiting_for_budget\",\n  step_boundary            = \"waiting_for_signal\",\n  manual_pause             = \"paused_by_operator\",\n}\n\nlocal function map_reason_to_blocking(reason_code)\n  return 
REASON_TO_BLOCKING[reason_code] or \"waiting_for_signal\"\nend\n\n---------------------------------------------------------------------------\n-- Resume condition evaluation (shared module)\n-- RFC-004 \u{a7}Resume Condition Model, RFC-005 \u{a7}8.3\n---------------------------------------------------------------------------\n\n-- Parse resume_condition_json into a matcher table used by\n-- evaluate_signal_against_condition and is_condition_satisfied.\n-- @param json  JSON string of the resume condition\n-- @return table with matchers array, match_mode, minimum_signal_count, etc.\nlocal function initialize_condition(json)\n  local spec = cjson.decode(json)\n  local matchers = {}\n  local names = spec.required_signal_names or {}\n  if #names == 0 then\n    -- Empty required_signal_names acts as wildcard: ANY signal satisfies the condition.\n    -- To require explicit operator resume (no signal match), pass a sentinel name\n    -- that no real signal will match, or use a different resume mechanism.\n    matchers[1] = { name = \"\", satisfied = false, signal_id = \"\" }\n  else\n    for i, name in ipairs(names) do\n      matchers[i] = { name = name, satisfied = false, signal_id = \"\" }\n    end\n  end\n  return {\n    condition_type       = spec.condition_type or \"signal_set\",\n    match_mode           = spec.signal_match_mode or \"any\",\n    minimum_signal_count = tonumber(spec.minimum_signal_count or \"1\"),\n    total_matchers       = #names > 0 and #names or 1,\n    satisfied_count      = 0,\n    matchers             = matchers,\n    closed               = false,\n  }\nend\n\n-- Write condition state to a dedicated condition hash key.\n-- @param key    Valkey key for the condition hash\n-- @param cond   condition table from initialize_condition\n-- @param now_ms current timestamp\nlocal function write_condition_hash(key, cond, now_ms)\n  local fields = {\n    \"condition_type\", cond.condition_type,\n    \"match_mode\", cond.match_mode,\n    
\"minimum_signal_count\", tostring(cond.minimum_signal_count),\n    \"total_matchers\", tostring(cond.total_matchers),\n    \"satisfied_count\", tostring(cond.satisfied_count),\n    \"closed\", cond.closed and \"1\" or \"0\",\n    \"updated_at\", tostring(now_ms),\n  }\n  for i = 1, cond.total_matchers do\n    local m = cond.matchers[i]\n    local idx = i - 1  -- external field names remain 0-based for wire compat\n    fields[#fields + 1] = \"matcher:\" .. idx .. \":name\"\n    fields[#fields + 1] = m.name\n    fields[#fields + 1] = \"matcher:\" .. idx .. \":satisfied\"\n    fields[#fields + 1] = m.satisfied and \"1\" or \"0\"\n    fields[#fields + 1] = \"matcher:\" .. idx .. \":signal_id\"\n    fields[#fields + 1] = m.signal_id\n  end\n  redis.call(\"HSET\", key, unpack(fields))\nend\n\n-- Match a signal against the condition\'s matchers. Mutates cond in-place.\n-- @param cond        condition table from initialize_condition\n-- @param signal_name signal name string\n-- @param signal_id   signal ID string\n-- @return true if this signal matched a matcher, false otherwise\nlocal function evaluate_signal_against_condition(cond, signal_name, signal_id)\n  for i = 1, cond.total_matchers do\n    local m = cond.matchers[i]\n    if not m.satisfied then\n      -- Empty name = wildcard matcher (matches any signal)\n      if m.name == \"\" or m.name == signal_name then\n        m.satisfied = true\n        m.signal_id = signal_id or \"\"\n        cond.satisfied_count = cond.satisfied_count + 1\n        return true\n      end\n    end\n  end\n  return false\nend\n\n-- Check if the overall condition is satisfied based on mode.\n-- @param cond  condition table\n-- @return true if condition is satisfied\nlocal function is_condition_satisfied(cond)\n  local mode = cond.match_mode\n  local min_count = cond.minimum_signal_count\n  if mode == \"any\" then\n    return cond.satisfied_count >= min_count\n  elseif mode == \"all\" then\n    return cond.satisfied_count >= 
cond.total_matchers
  end
  -- count(n) mode — same as any with minimum_signal_count = n
  return cond.satisfied_count >= min_count
end

-- Extract a named field from a Valkey Stream entry's flat field array.
-- Stream entries return {id, {field1, val1, field2, val2, ...}}.
-- This operates on the inner flat array.
-- Linear scan over name/value pairs; the FIRST occurrence of the name wins.
-- @param fields flat array from stream entry[2]
-- @param name   field name to extract
-- @return value string, or nil when the field is absent
local function extract_field(fields, name)
  for i = 1, #fields, 2 do
    if fields[i] == name then
      return fields[i + 1]
    end
  end
  return nil
end

---------------------------------------------------------------------------
-- Suspension helpers (RFC-004)
---------------------------------------------------------------------------

-- Returns an empty signal summary JSON string for initial suspension record.
local function initial_signal_summary_json()
  return '{"total_count":0,"matched_count":0,"signal_names":[]}'
end

-- Validates that a pending waitpoint can be activated by a suspension.
-- Returns error tuple on failure, nil on success.
-- Check order: existence → state → ownership (execution/attempt) → expiry.
-- @param wp_raw   flat array from HGETALL on waitpoint hash
-- @param eid      expected execution_id
-- @param att_idx  expected attempt_index (string)
-- @param now_ms   current timestamp
local function validate_pending_waitpoint(wp_raw, eid, att_idx, now_ms)
  if #wp_raw == 0 then
    return err("waitpoint_not_found")
  end
  local wp = hgetall_to_table(wp_raw)
  if wp.state ~= "pending" then
    return err("waitpoint_not_pending")
  end
  if wp.execution_id ~= eid then
    return err("invalid_waitpoint_for_execution")
  end
  -- tostring both sides: one may arrive as a Lua number, the other a string.
  if tostring(wp.attempt_index) ~= tostring(att_idx) then
    return err("invalid_waitpoint_for_execution")
  end
  -- Check if pending waitpoint has expired.
  -- A blank/unset expires_at means "never expires" (is_set gate); the <=
  -- boundary makes a waitpoint expiring exactly at now_ms count as expired.
  if is_set(wp.expires_at) and tonumber(wp.expires_at) <= now_ms then
    return err("pending_waitpoint_expired")
  end
  return nil
end

-- Validates that a suspension record exists and is open (not closed).
-- Returns error tuple on failure, nil on success. Also returns the parsed table.
-- Success shape is (nil, susp) so callers can destructure both results.
-- @param susp_raw flat array from HGETALL on suspension:current
local function assert_active_suspension(susp_raw)
  if #susp_raw == 0 then
    return err("execution_not_suspended")
  end
  local susp = hgetall_to_table(susp_raw)
  if not is_set(susp.suspension_id) then
    return err("execution_not_suspended")
  end
  -- A set closed_at marks the suspension as already closed.
  if is_set(susp.closed_at) then
    return err("execution_not_suspended")
  end
  return nil, susp
end

-- Validates that a waitpoint belongs to the expected execution + suspension.
-- Returns error tuple on failure, nil on success.
-- sid / wid are optional pins: a blank value skips that check (is_set gate).
-- @param wp_raw   flat array from HGETALL on waitpoint hash
-- @param eid      expected execution_id
-- @param sid      expected suspension_id
-- @param wid      expected waitpoint_id
local function assert_waitpoint_belongs(wp_raw, eid, sid, wid)
  if #wp_raw == 0 then
    return err("waitpoint_not_found")
  end
  local wp = hgetall_to_table(wp_raw)
  if wp.execution_id ~= eid then
    return err("invalid_waitpoint_for_execution")
  end
  if is_set(sid) and wp.suspension_id ~= sid then
    return err("invalid_waitpoint_for_execution")
  end
  if is_set(wid) and wp.waitpoint_id ~= wid then
    return err("invalid_waitpoint_for_execution")
  end
  return nil
end

---------------------------------------------------------------------------
-- Policy
---------------------------------------------------------------------------

-- Decode a JSON policy string into flat key-value pairs suitable for HSET.
-- Nested tables are re-encoded as JSON strings; scalars go through tostring.
-- NOTE(review): cjson.decode is NOT pcall-guarded here — malformed JSON
-- raises and aborts the calling FCALL. Callers appear to pre-validate
-- (e.g. ff_create_execution step 2b uses pcall before storing); confirm
-- before passing unvalidated input through this helper.
-- @param json  JSON string of the policy object
-- @return flat array {k1, v1, k2, v2, ...} for use with redis.call("HSET", key, unpack(...))
local function unpack_policy(json)
  local policy = cjson.decode(json)
  local flat = {}
  for k, v in pairs(policy) do
    flat[#flat + 1] = k
    if type(v) == "table" then
      flat[#flat + 1] = cjson.encode(v)
    else
      flat[#flat + 1] = tostring(v)
    end
  end
  return flat
end


-- source: lua/version.lua
-- FlowFabric library version check
-- Returns the library version string. Used by the loader to detect
-- whether the library is loaded and at the expected version.
--
-- Bump this string whenever any registered function's KEYS or ARGV arity
-- changes, or a new function is added. Mismatched versions force
-- `FUNCTION LOAD REPLACE` so old binaries cannot FCALL a library whose key
-- signatures they expect a different shape for.
--
-- SINGLE SOURCE OF TRUTH: Rust's `LIBRARY_VERSION` is extracted from the
-- `return 'X'` literal below by `scripts/gen-ff-script-lua.sh`, which
-- writes `crates/ff-script/src/flowfabric_lua_version`. Rust reads that
-- file via `include_str!`. Do NOT maintain a separate Rust literal.
-- Extract contract: the body MUST contain exactly one `return 'X'` literal
-- with single quotes (not double). CI runs the gen script and diffs; any
-- drift fails the build.

redis.register_function('ff_version', function(keys, args)
  return '6'
end)


-- source: lua/lease.lua
-- FlowFabric lease management functions
-- Reference: RFC-003 (Lease and Fencing), RFC-010 §4 (function inventory)
--
-- Depends on helpers: ok, err, ok_already_satisfied, hgetall_to_table,
--   is_set, validate_lease, mark_expired

---------------------------------------------------------------------------
-- #6  ff_renew_lease
--
-- Extends a still-valid lease.
Preserves lease_id and lease_epoch.
-- KEYS (4): exec_core, lease_current, lease_history, lease_expiry_zset
-- ARGV (7): execution_id, attempt_index, attempt_id,
--           lease_id, lease_epoch,
--           lease_ttl_ms, lease_history_grace_ms
---------------------------------------------------------------------------
redis.register_function('ff_renew_lease', function(keys, args)
  -- Positional KEYS
  local K = {
    core_key         = keys[1],
    lease_current    = keys[2],
    lease_history    = keys[3],
    lease_expiry_key = keys[4],
  }

  -- Positional ARGV
  -- require_number yields an err() table on malformed input; the type()
  -- check propagates that error tuple to the caller unchanged.
  local lease_ttl_n = require_number(args[6], "lease_ttl_ms")
  if type(lease_ttl_n) == "table" then return lease_ttl_n end
  local grace_n = require_number(args[7], "lease_history_grace_ms")
  if type(grace_n) == "table" then return grace_n end

  local A = {
    execution_id         = args[1],
    attempt_index        = args[2],
    attempt_id           = args[3],
    lease_id             = args[4],
    lease_epoch          = args[5],
    lease_ttl_ms         = lease_ttl_n,
    lease_history_grace_ms = grace_n,
  }

  -- Server time in ms: TIME returns {seconds, microseconds}.
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- Read execution core
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(raw)

  -- Validate lifecycle
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "", core.current_lease_epoch or "")
  end

  -- Check revocation
  if core.ownership_state == "lease_revoked" or is_set(core.lease_revoked_at) then
    return err("lease_revoked")
  end

  -- Check expiry (if expired, mark and reject).
  -- <= boundary: a lease expiring exactly at now_ms is already expired.
  -- NOTE(review): the trailing 1000 is presumably the lease_history trim
  -- bound (matches XADD MAXLEN ~ 1000 in ff_revoke_lease) — confirm against
  -- mark_expired in lua/helpers.lua.
  if tonumber(core.lease_expires_at or "0") <= now_ms then
    mark_expired(
      { core_key = K.core_key, lease_history_key = K.lease_history },
      core, now_ms, 1000)
    return err("lease_expired")
  end

  -- Validate caller identity — all four coordinates must match exactly;
  -- numeric-ish fields go through tostring to normalize number vs string.
  if tostring(core.current_attempt_index) ~= A.attempt_index then
    return err("stale_lease")
  end
  if core.current_attempt_id ~= A.attempt_id then
    return err("stale_lease")
  end
  if core.current_lease_id ~= A.lease_id then
    return err("stale_lease")
  end
  if tostring(core.current_lease_epoch) ~= A.lease_epoch then
    return err("stale_lease")
  end

  -- Compute new deadlines: the renewal deadline sits at two-thirds of the
  -- new TTL window.
  local new_expires_at = now_ms + A.lease_ttl_ms
  local new_renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)

  -- Update exec_core
  redis.call("HSET", K.core_key,
    "lease_last_renewed_at", tostring(now_ms),
    "lease_renewal_deadline", tostring(new_renewal_deadline),
    "lease_expires_at", tostring(new_expires_at),
    "last_mutation_at", tostring(now_ms))

  -- Update lease_current; the key self-expires a grace window past the
  -- lease deadline (PEXPIREAT below) so post-mortem reads still see the
  -- final lease record.
  redis.call("HSET", K.lease_current,
    "last_renewed_at", tostring(now_ms),
    "renewal_deadline", tostring(new_renewal_deadline),
    "expires_at", tostring(new_expires_at))
  redis.call("PEXPIREAT", K.lease_current,
    new_expires_at + A.lease_history_grace_ms)

  -- Update lease_expiry index
  redis.call("ZADD", K.lease_expiry_key, new_expires_at, A.execution_id)

  -- Renewal history event OFF for v1 (RFC-010 §4.8g).
  -- The exec_core field lease_last_renewed_at provides the latest renewal
  -- timestamp without stream overhead.  Enable per-lane when detailed
  -- ownership audit trails are needed.

  return ok(tostring(new_expires_at))
end)

---------------------------------------------------------------------------
-- #28  ff_mark_lease_expired_if_due
--
-- Called by the lease expiry scanner.  Re-validates that the lease is
-- actually expired (guards against renewal since the ZRANGEBYSCORE read).
-- Idempotent: no-op if already expired/reclaimed/revoked or not yet due.
--
-- KEYS (4): exec_core, lease_current, lease_expiry_zset, lease_history
-- ARGV (1): execution_id
---------------------------------------------------------------------------
redis.register_function('ff_mark_lease_expired_if_due', function(keys, args)
  -- Positional KEYS
  local K = {
    core_key         = keys[1],
    lease_current    = keys[2],
    lease_expiry_key = keys[3],
    lease_history    = keys[4],
  }

  local execution_id = args[1]

  -- Server time
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- Read execution core
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    -- Execution gone — clean up stale index entry
    redis.call("ZREM", K.lease_expiry_key, execution_id)
    return ok_already_satisfied("execution_not_found")
  end
  local core = hgetall_to_table(raw)

  -- Guard: not active → nothing to expire
  if core.lifecycle_phase ~= "active" then
    -- Stale index entry: execution already left active phase.
    -- Clean up and return.
    redis.call("ZREM", K.lease_expiry_key, execution_id)
    return ok_already_satisfied("not_active")
  end

  -- Guard: already marked expired or ownership already cleared
  if core.ownership_state == "lease_expired_reclaimable" then
    return ok_already_satisfied("already_expired")
  end
  if core.ownership_state == "lease_revoked" then
    return ok_already_satisfied("already_revoked")
  end
  if core.ownership_state == "unowned" then
    -- Shouldn't happen for active phase, but defensive
    redis.call("ZREM", K.lease_expiry_key, execution_id)
    return ok_already_satisfied("unowned")
  end

  -- Check if lease is actually expired
  local expires_at = tonumber(core.lease_expires_at or "0")
  if expires_at > now_ms then
    -- Lease was renewed since the scanner read the index.
    -- The ZADD in renew_lease already updated the score.
    return ok_already_satisfied("not_yet_expired")
  end

  -- Mark expired using shared helper
  -- Sets ownership_state=lease_expired_reclaimable, blocking_reason,
  -- blocking_detail, lease_expired_at, XADD lease_history "expired" event.
  mark_expired(
    { core_key = K.core_key, lease_history_key = K.lease_history },
    core, now_ms, 1000)

  -- NOTE: Do NOT ZREM from lease_expiry_key here.
  -- The entry stays so the scheduler (or a subsequent scanner cycle)
  -- can discover reclaimable executions from the same index.
  -- reclaim_execution or cancel_execution removes it.

  return ok("marked_expired")
end)

---------------------------------------------------------------------------
-- #8  ff_revoke_lease
--
-- Operator-initiated lease revocation.  Immediate — no grace period.
-- Does NOT terminal-transition the execution.
It clears the current
-- owner and places the execution into lease_revoked ownership condition.
-- The scheduler or operator must subsequently reclaim or cancel.
--
-- KEYS (5): exec_core, lease_current, lease_history,
--           lease_expiry_zset, worker_leases
-- ARGV (3): execution_id, expected_lease_id (or "" to skip check),
--           revoke_reason
---------------------------------------------------------------------------
redis.register_function('ff_revoke_lease', function(keys, args)
  -- Positional KEYS
  local K = {
    core_key         = keys[1],
    lease_current    = keys[2],
    lease_history    = keys[3],
    lease_expiry_key = keys[4],
    worker_leases    = keys[5],
  }

  -- Positional ARGV
  local A = {
    execution_id     = args[1],
    expected_lease_id = args[2],
    revoke_reason    = args[3],
  }

  -- Server time
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- Read execution core
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(raw)

  -- Must be active + leased
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "", core.current_lease_epoch or "")
  end
  if core.ownership_state ~= "leased" then
    if core.ownership_state == "lease_revoked" then
      return ok_already_satisfied("already_revoked")
    end
    if core.ownership_state == "lease_expired_reclaimable" then
      return ok_already_satisfied("already_expired")
    end
    return err("no_active_lease")
  end

  -- Optional lease_id check (for targeted revocation): "" skips the check.
  if is_set(A.expected_lease_id) and core.current_lease_id ~= A.expected_lease_id then
    return err("stale_lease",
      "expected " .. A.expected_lease_id .. " but current is " .. (core.current_lease_id or ""))
  end

  -- Capture identity for history before clearing
  local lease_id = core.current_lease_id or ""
  local lease_epoch = core.current_lease_epoch or ""
  local attempt_index = core.current_attempt_index or ""
  local attempt_id = core.current_attempt_id or ""
  local worker_id = core.current_worker_id or ""
  local worker_instance_id = core.current_worker_instance_id or ""

  -- Update exec_core: all 7 state vector dimensions + public_state
  redis.call("HSET", K.core_key,
    -- 7 state vector dimensions
    "lifecycle_phase",   "active",
    "ownership_state",   "lease_revoked",
    "eligibility_state", core.eligibility_state or "not_applicable",
    "blocking_reason",   "waiting_for_worker",
    "blocking_detail",   "lease revoked: " .. (A.revoke_reason or "operator"),
    "terminal_outcome",  "none",
    "attempt_state",     "attempt_interrupted",
    -- Derived public state (RFC-001 §2.4 D3: active → public_state=active)
    "public_state",      "active",
    -- Revocation fields
    "lease_revoked_at",    tostring(now_ms),
    "lease_revoke_reason", A.revoke_reason or "operator",
    "last_mutation_at",    tostring(now_ms))

  -- DEL lease_current
  redis.call("DEL", K.lease_current)

  -- ZREM from lease expiry index
  redis.call("ZREM", K.lease_expiry_key, A.execution_id)

  -- SREM from worker leases set
  redis.call("SREM", K.worker_leases, A.execution_id)

  -- Append revoked event to lease history.
  -- MAXLEN ~ 1000: "~" requests approximate trimming (cheaper for Valkey).
  redis.call("XADD", K.lease_history, "MAXLEN", "~", 1000, "*",
    "event",              "revoked",
    "lease_id",           lease_id,
    "lease_epoch",        lease_epoch,
    "attempt_index",      attempt_index,
    "attempt_id",         attempt_id,
    "worker_id",          worker_id,
    "worker_instance_id", worker_instance_id,
    "reason",             A.revoke_reason or "operator",
    "ts",                 tostring(now_ms))

  return ok("revoked", lease_id, lease_epoch)
end)
-- NOTE: ff_issue_reclaim_grant is in scheduling.lua
-- NOTE: ff_reclaim_execution is in execution.lua


-- source: lua/execution.lua
-- FlowFabric execution lifecycle functions
-- Reference: RFC-001 (Execution), RFC-010 §4 (function inventory)
--
-- Depends on helpers: ok, err, hgetall_to_table, is_set,
--   validate_lease, validate_lease_and_mark_expired,
--   clear_lease_and_indexes, defensive_zrem_all_indexes, unpack_policy

---------------------------------------------------------------------------
-- #0  ff_create_execution
--
-- Creates a new execution: core hash, payload, policy, tags, indexes.
-- Idempotent via idempotency key (SET NX with TTL).
--
-- KEYS (8): exec_core, payload_key, policy_key, tags_key,
--           eligible_or_delayed_zset, idem_key,
--           execution_deadline_zset, all_executions_set
-- ARGV (13): execution_id, namespace, lane_id, execution_kind,
--            priority, creator_identity, policy_json,
--            input_payload, delay_until, dedup_ttl_ms,
--            tags_json, execution_deadline_at, partition_id
---------------------------------------------------------------------------
redis.register_function('ff_create_execution', function(keys, args)
  local K = {
    core_key            = keys[1],
    payload_key         = keys[2],
    policy_key          = keys[3],
    tags_key            = keys[4],
    scheduling_zset     = keys[5],  -- eligible or delayed
    idem_key            = keys[6],
    deadline_zset       = keys[7],
    all_executions_set  = keys[8],
  }

  local priority_n = require_number(args[5], "priority")
  if type(priority_n) == "table" then return priority_n end

  local A = {
    execution_id        = args[1],
    namespace           = args[2],
    lane_id             = args[3],
    execution_kind      = args[4],
    priority            = priority_n,
    creator_identity    = args[6],
    policy_json         = args[7],
    input_payload       = args[8],
    delay_until         = args[9],   -- "" or ms timestamp
    dedup_ttl_ms        = args[10],  -- "" or ms
    tags_json           = args[11],  -- "" or JSON object
    execution_deadline_at = args[12], -- "" or ms timestamp
    partition_id        = args[13],
  }

  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- Clamp priority to [0, 9000]. The composite eligible-ZSET score formula
  -- -(priority * 1_000_000_000_000) + created_at uses Lua doubles (IEEE 754,
  -- 53-bit mantissa). priority > 9007 overflows the multiplication step;
  -- priority > ~8300 overflows the combined score when created_at is added.
  -- Clamping to 9000 keeps a safe margin while supporting 4+ orders of magnitude.
  if A.priority < 0 then A.priority = 0 end
  if A.priority > 9000 then A.priority = 9000 end

  -- 1. Idempotency check (only when idem_key is a real key, not the noop placeholder).
  -- ("ff:noop:" contains no Lua pattern magic characters, so the non-plain
  -- string.find is a safe substring test here.)
  if K.idem_key ~= "" and not string.find(K.idem_key, "ff:noop:") then
    local existing = redis.call("GET", K.idem_key)
    if existing then
      return ok_duplicate(existing)
    end
  end

  -- 2. Guard: execution already exists
  -- NOTE(review): raw {1, "DUPLICATE", id} tuple rather than ok_duplicate();
  -- presumably intentional to distinguish "core exists without idem key" —
  -- confirm the shape matches what ok_duplicate produces for callers.
  if redis.call("EXISTS", K.core_key) == 1 then
    return {1, "DUPLICATE", A.execution_id}
  end

  -- 2b. Policy JSON validation — extract required_capabilities CSV now, BEFORE
  -- any write, so an error aborts without leaving a half-written exec_core.
  -- Fail-CLOSED on any malformed/typed input rather than silently defaulting
  -- to the empty-required wildcard — required_capabilities is security-
  -- sensitive (an empty set matches any worker).
  --
  --   * An empty / "{}" policy_json → no required_capabilities field written
  --     → wildcard match, intentional.
  --   * A non-empty policy_json that fails cjson.decode → fail with
  --     invalid_policy_json. An operator who MEANT "no policy" passes "".
  --   * routing_requirements.required_capabilities present but not an array
  --     → fail with invalid_capabilities:required_not_array.
  --   * Any element that is not a non-empty string → fail with
  --     invalid_capabilities:required:non_string_token. Silent-drop on
  --     `["gpu", null, 42]` would erase real requirements.
  --   * Comma in a token → fail (reserved delimiter, see ff_issue_claim_grant).
  --   * ASCII control byte (0x00-0x1F, 0x7F) or space (0x20) → fail
  --     (invalid_capabilities:required:control_or_whitespace). Mirrors the
  --     Rust ingress in ff-sdk::FlowFabricWorker::connect and
  --     ff-scheduler::Scheduler::claim_for_worker (R3 relaxed printable-ASCII
  --     check to allow UTF-8 printable above 0x7F while still rejecting
  --     whitespace/control). This is the "last line of defense" for admin
  --     direct-HSET bypass: a required cap containing "\n" or "\0" is
  --     impossible to type, impossible to debug, and would silently pin an
  --     execution as unclaimable forever.
  --   * Bounds: same 4096 bytes / 256 tokens ceiling as the worker CSV.
  --
  -- Note on UTF-8: the byte-range test rejects ASCII control + space but
  -- accepts every byte ≥ 0x21 except 0x7F (DEL). UTF-8 multibyte sequences
  -- use only bytes 0x80-0xBF (continuation) or 0xC0-0xFD (lead), all above
  -- 0x7F, so i18n caps like "东京-gpu" pass through intact. See RFC-009 §7.5.
  local required_caps_csv = nil
  if is_set(A.policy_json) and A.policy_json ~= "{}" then
    local ok_decode, policy = pcall(cjson.decode, A.policy_json)
    if not ok_decode then
      return err("invalid_policy_json", "malformed")
    end
    if type(policy) == "table" and policy.routing_requirements ~= nil then
      if type(policy.routing_requirements) ~= "table" then
        return err("invalid_policy_json", "routing_requirements:not_object")
      end
      local caps = policy.routing_requirements.required_capabilities
      if caps ~= nil then
        if type(caps) ~= "table" then
          return err("invalid_capabilities", "required:not_array")
        end
        local list = {}
        for _, cap in ipairs(caps) do
          if type(cap) ~= "string" then
            return err("invalid_capabilities", "required:non_string_token")
          end
          if #cap == 0 then
            return err("invalid_capabilities", "required:empty_token")
          end
          if string.find(cap, ",", 1, true) then
            return err("invalid_capabilities", "required:comma_in_token")
          end
          -- Reject ASCII control (0x00-0x1F), DEL (0x7F), and space (0x20).
          -- Iterating byte-by-byte: any byte in 0x00..=0x20 or == 0x7F fails.
          for i = 1, #cap do
            local b = cap:byte(i)
            if b <= 0x20 or b == 0x7F then
              return err("invalid_capabilities", "required:control_or_whitespace")
            end
          end
          list[#list + 1] = cap
        end
        if #list > CAPS_MAX_TOKENS then
          return err("invalid_capabilities", "required:too_many_tokens")
        end
        -- Sorted so the stored CSV is canonical regardless of input order.
        table.sort(list)
        if #list > 0 then
          local csv = table.concat(list, ",")
          if #csv > CAPS_MAX_BYTES then
            return err("invalid_capabilities", "required:too_many_bytes")
          end
          required_caps_csv = csv
        end
      end
    end
  end

  -- 3. Determine initial eligibility
  local lifecycle_phase = "runnable"
  local eligibility_state, blocking_reason, blocking_detail, public_state
  local is_delayed = is_set(A.delay_until) and tonumber(A.delay_until) > now_ms

  if is_delayed then
    eligibility_state = "not_eligible_until_time"
    blocking_reason   = "waiting_for_delay"
    blocking_detail   = "delayed until " .. A.delay_until
    public_state      = "delayed"
  else
    eligibility_state = "eligible_now"
    blocking_reason   = "waiting_for_worker"
    blocking_detail   = ""
    public_state      = "waiting"
  end

  -- 4. Create exec_core — ALL 7 state vector dimensions
  redis.call("HSET", K.core_key,
    "execution_id", A.execution_id,
    "namespace", A.namespace,
    "lane_id", A.lane_id,
    "execution_kind", A.execution_kind,
    "partition_id", A.partition_id,
    "priority", A.priority,
    "creator_identity", A.creator_identity,
    -- 7 state vector dimensions
    "lifecycle_phase", lifecycle_phase,
    "ownership_state", "unowned",
    "eligibility_state", eligibility_state,
    "blocking_reason", blocking_reason,
    "blocking_detail", blocking_detail,
    "terminal_outcome", "none",
    "attempt_state", "pending_first_attempt",
    "public_state", public_state,
    -- accounting
    "total_attempt_count", "0",
    "current_attempt_index", "",
    "current_attempt_id", "",
    "current_lease_id", "",
    "current_lease_epoch", "0",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "current_lane", A.lane_id,
    "retry_count", "0",
    "replay_count", "0",
    "lease_reclaim_count", "0",
    -- timestamps
    "created_at", now_ms,
    "started_at", "",
    "completed_at", "",
    "last_transition_at", now_ms,
    "last_mutation_at", now_ms,
    -- lease fields (cleared)
    "lease_acquired_at", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "lease_expired_at", "",
    "lease_revoked_at", "",
    "lease_revoke_reason", "",
    -- delay
    "delay_until", is_delayed and A.delay_until or "",
    -- suspension (cleared)
    "current_suspension_id", "",
    "current_waitpoint_id", "",
    -- pending lineage (cleared)
    "pending_retry_reason", "",
    "pending_replay_reason", "",
    "pending_replay_requested_by", "",
    "pending_previous_attempt_index", "",
    -- progress
    "progress_pct", "",
    "progress_message", "",
    "progress_updated_at", "",
    -- flow
    "flow_id", "")

  -- 5. Store payload
  if is_set(A.input_payload) then
    redis.call("SET", K.payload_key, A.input_payload)
  end

  -- 6. Store policy + required_capabilities. Validation already ran before
  -- any write (step 2b); here we only persist the pre-validated CSV. See
  -- step 2b for the fail-closed rationale on malformed / typed inputs.
  if is_set(A.policy_json) then
    redis.call("SET", K.policy_key, A.policy_json)
  end
  if required_caps_csv then
    redis.call("HSET", K.core_key, "required_capabilities", required_caps_csv)
  end

  -- 7. Store tags.
  -- Best-effort: malformed tags_json is silently dropped (contrast with the
  -- fail-closed policy validation in step 2b — tags are not security-sensitive).
  if is_set(A.tags_json) and A.tags_json ~= "{}" then
    local ok_decode, tags = pcall(cjson.decode, A.tags_json)
    if ok_decode and type(tags) == "table" then
      local flat = {}
      for k, v in pairs(tags) do
        flat[#flat + 1] = k
        flat[#flat + 1] = tostring(v)
      end
      if #flat > 0 then
        redis.call("HSET", K.tags_key, unpack(flat))
      end
    end
  end

  -- 8. Add to scheduling index
  if is_delayed then
    redis.call("ZADD", K.scheduling_zset, tonumber(A.delay_until), A.execution_id)
  else
    -- Composite score: -(priority * 1_000_000_000_000) + created_at_ms
    local score = 0 - (A.priority * 1000000000000) + now_ms
    redis.call("ZADD", K.scheduling_zset, score, A.execution_id)
  end

  -- 9. SADD to all_executions partition index
  redis.call("SADD", K.all_executions_set, A.execution_id)

  -- 10. Execution deadline index
  if is_set(A.execution_deadline_at) then
    redis.call("ZADD", K.deadline_zset, tonumber(A.execution_deadline_at), A.execution_id)
  end

  -- 11. Set idempotency key with TTL
  -- Guard: PX 0 or PX <0 causes Valkey error ("invalid expire time"),
  -- which would abort the FCALL after exec_core was already written (step 4).
  local dedup_ms = tonumber(A.dedup_ttl_ms) or 0
  if dedup_ms > 0 and K.idem_key ~= "" and not string.find(K.idem_key, "ff:noop:") then
    redis.call("SET", K.idem_key, A.execution_id,
      "PX", dedup_ms)
  end

  return ok(A.execution_id, public_state)
end)

---------------------------------------------------------------------------
-- #1  ff_claim_execution (new attempt)
--
-- Consumes a claim grant, creates a new attempt + lease, transitions
-- runnable → active.
Attempt type derived from exec_core attempt_state.\n--\n-- KEYS (14): exec_core, claim_grant, eligible_zset, lease_expiry_zset,\n--            worker_leases, attempt_hash, attempt_usage, attempt_policy,\n--            attempts_zset, lease_current, lease_history, active_index,\n--            attempt_timeout_zset, execution_deadline_zset\n-- ARGV (12): execution_id, worker_id, worker_instance_id, lane,\n--            capability_snapshot_hash, lease_id, lease_ttl_ms,\n--            renew_before_ms, attempt_id, attempt_policy_json,\n--            attempt_timeout_ms, execution_deadline_at\n--\n-- KNOWN LIMITATION (flow-cancel race): ff_claim_execution reads\n-- exec_core on {p:N} but cannot atomically read flow_core on {fp:N}\n-- (cross-slot). If ff_cancel_flow fired and its async member dispatch\n-- was dropped by a transient Valkey error, the member\'s exec_core has\n-- NOT yet been flipped to terminal \u{2014} so a worker may still claim it and\n-- run it to completion inside a cancelled flow. 
The flow\'s own\n-- public_flow_state is correctly terminal; only this one member escapes.\n-- Mitigations already in place:\n--  * cancel_flow(wait=true) avoids the bg dispatch entirely.\n--  * ff_apply_dependency_to_child rejects additions to terminal flows,\n--    so children cannot be added mid-cancel.\n--  * retention eventually trims the stale member.\n-- Full fix (flag on exec_core maintained by a broadcast loop) is a\n-- deliberate design change deferred past Batch A.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_claim_execution\', function(keys, args)\n  local K = {\n    core_key          = keys[1],\n    claim_grant       = keys[2],\n    eligible_zset     = keys[3],\n    lease_expiry_key  = keys[4],\n    worker_leases_key = keys[5],\n    attempt_hash      = keys[6],\n    attempt_usage     = keys[7],\n    attempt_policy    = keys[8],\n    attempts_zset     = keys[9],\n    lease_current_key = keys[10],\n    lease_history_key = keys[11],\n    active_index_key  = keys[12],\n    attempt_timeout_key = keys[13],\n    execution_deadline_key = keys[14],\n  }\n\n  local lease_ttl_n = require_number(args[7], \"lease_ttl_ms\")\n  if type(lease_ttl_n) == \"table\" then return lease_ttl_n end\n  local renew_before_n = require_number(args[8], \"renew_before_ms\")\n  if type(renew_before_n) == \"table\" then return renew_before_n end\n\n  local A = {\n    execution_id         = args[1],\n    worker_id            = args[2],\n    worker_instance_id   = args[3],\n    lane                 = args[4],\n    capability_hash      = args[5],\n    lease_id             = args[6],\n    lease_ttl_ms         = lease_ttl_n,\n    renew_before_ms      = renew_before_n,\n    attempt_id           = args[9],\n    attempt_policy_json  = args[10],\n    attempt_timeout_ms   = args[11],  -- \"\" or ms\n    execution_deadline_at = args[12], -- \"\" or ms\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + 
math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Validate execution state\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  if core.lifecycle_phase ~= \"runnable\" then return err(\"execution_not_leaseable\") end\n  if core.ownership_state ~= \"unowned\" then return err(\"lease_conflict\") end\n  if core.eligibility_state ~= \"eligible_now\" then return err(\"execution_not_leaseable\") end\n  if core.terminal_outcome ~= \"none\" then return err(\"execution_not_leaseable\") end\n\n  -- Defense-in-depth: invariant A3\n  if core.attempt_state == \"running_attempt\" then\n    return err(\"active_attempt_exists\")\n  end\n  -- Dispatch: resume-from-suspension must use claim_resumed_execution\n  if core.attempt_state == \"attempt_interrupted\" then\n    return err(\"use_claim_resumed_execution\")\n  end\n\n  -- 2. Validate and consume claim grant\n  local grant_raw = redis.call(\"HGETALL\", K.claim_grant)\n  if #grant_raw == 0 then return err(\"invalid_claim_grant\") end\n  local grant = hgetall_to_table(grant_raw)\n\n  if grant.worker_id ~= A.worker_id then\n    return err(\"invalid_claim_grant\")\n  end\n  if is_set(grant.grant_expires_at) and tonumber(grant.grant_expires_at) < now_ms then\n    return err(\"claim_grant_expired\")\n  end\n  redis.call(\"DEL\", K.claim_grant)\n\n  -- 3. Compute lease fields\n  local next_epoch = tonumber(core.current_lease_epoch or \"0\") + 1\n  local expires_at = now_ms + A.lease_ttl_ms\n  local renewal_deadline = now_ms + A.renew_before_ms\n  local next_att_idx = tonumber(core.total_attempt_count or \"0\")\n\n  -- 4. 
Derive attempt type from exec core attempt_state\n  local attempt_type = \"initial\"\n  local lineage_fields = {}\n  if core.attempt_state == \"pending_retry_attempt\" then\n    attempt_type = \"retry\"\n    lineage_fields = {\n      \"retry_reason\", core.pending_retry_reason or \"\",\n      \"previous_attempt_index\", core.pending_previous_attempt_index or \"\"\n    }\n  elseif core.attempt_state == \"pending_replay_attempt\" then\n    attempt_type = \"replay\"\n    lineage_fields = {\n      \"replay_reason\", core.pending_replay_reason or \"\",\n      \"replay_requested_by\", core.pending_replay_requested_by or \"\",\n      \"replayed_from_attempt_index\", core.pending_previous_attempt_index or \"\"\n    }\n  end\n\n  -- 5. Create attempt record\n  --    Construct actual attempt key from hash tag + computed index.\n  --    KEYS[6] is a placeholder (always index 0); on retry/replay the\n  --    actual index is > 0, so we build the real key dynamically.\n  --    All attempt keys share the same {p:N} hash tag \u{2192} same Cluster slot.\n  local tag = string.match(K.core_key, \"(%b{})\")\n  local att_key = \"ff:attempt:\" .. tag .. \":\" .. A.execution_id .. \":\" .. tostring(next_att_idx)\n  local att_usage_key = att_key .. \":usage\"\n  local att_policy_key = att_key .. 
\":policy\"\n\n  local attempt_fields = {\n    \"attempt_id\", A.attempt_id,\n    \"execution_id\", A.execution_id,\n    \"attempt_index\", tostring(next_att_idx),\n    \"attempt_type\", attempt_type,\n    \"attempt_state\", \"started\",\n    \"created_at\", tostring(now_ms),\n    \"started_at\", tostring(now_ms),\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", tostring(next_epoch),\n    \"worker_id\", A.worker_id,\n    \"worker_instance_id\", A.worker_instance_id\n  }\n  for _, v in ipairs(lineage_fields) do\n    attempt_fields[#attempt_fields + 1] = v\n  end\n  redis.call(\"HSET\", att_key, unpack(attempt_fields))\n  redis.call(\"ZADD\", K.attempts_zset, now_ms, tostring(next_att_idx))\n\n  -- 5b. Initialize attempt usage counters\n  redis.call(\"HSET\", att_usage_key,\n    \"last_usage_report_seq\", \"0\")\n\n  -- 5c. Store attempt policy snapshot\n  if is_set(A.attempt_policy_json) then\n    local policy_flat = unpack_policy(A.attempt_policy_json)\n    if #policy_flat > 0 then\n      redis.call(\"HSET\", att_policy_key, unpack(policy_flat))\n    end\n  end\n\n  -- 6. Create lease record\n  redis.call(\"DEL\", K.lease_current_key)\n  redis.call(\"HSET\", K.lease_current_key,\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", tostring(next_epoch),\n    \"execution_id\", A.execution_id,\n    \"attempt_id\", A.attempt_id,\n    \"worker_id\", A.worker_id,\n    \"worker_instance_id\", A.worker_instance_id,\n    \"acquired_at\", tostring(now_ms),\n    \"expires_at\", tostring(expires_at),\n    \"last_renewed_at\", tostring(now_ms),\n    \"renewal_deadline\", tostring(renewal_deadline))\n\n  -- 7. 
Update execution core \u{2014} ALL 7 state vector dimensions\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"active\",\n    \"ownership_state\", \"leased\",\n    \"eligibility_state\", \"not_applicable\",\n    \"blocking_reason\", \"none\",\n    \"blocking_detail\", \"\",\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", \"running_attempt\",\n    \"public_state\", \"active\",\n    \"current_attempt_index\", tostring(next_att_idx),\n    \"total_attempt_count\", tostring(next_att_idx + 1),\n    \"current_attempt_id\", A.attempt_id,\n    \"current_lease_id\", A.lease_id,\n    \"current_lease_epoch\", tostring(next_epoch),\n    \"current_worker_id\", A.worker_id,\n    \"current_worker_instance_id\", A.worker_instance_id,\n    \"current_lane\", A.lane,\n    \"lease_acquired_at\", tostring(now_ms),\n    \"lease_expires_at\", tostring(expires_at),\n    \"lease_last_renewed_at\", tostring(now_ms),\n    \"lease_renewal_deadline\", tostring(renewal_deadline),\n    -- Preserve the first-claim timestamp across retries; only fall\n    -- back to `now_ms` when the stored value is empty (initial state\n    -- written at HSET, line 208, or after a reset). In Lua the empty\n    -- string is truthy, so `core.started_at or now_ms` would wedge at\n    -- \"\" forever on the first claim \u{2014} explicit emptiness check fixes\n    -- the scenario 4 stage_latency bench (exec_core surfaced this via\n    -- the new ExecutionInfo.started_at REST field).\n    \"started_at\", (core.started_at ~= nil and core.started_at ~= \"\") and core.started_at or tostring(now_ms),\n    -- Clear pending lineage fields (consumed above)\n    \"pending_retry_reason\", \"\",\n    \"pending_replay_reason\", \"\",\n    \"pending_replay_requested_by\", \"\",\n    \"pending_previous_attempt_index\", \"\",\n    \"last_transition_at\", tostring(now_ms),\n    \"last_mutation_at\", tostring(now_ms))\n\n  -- 8. 
Update indexes\n  redis.call(\"ZREM\", K.eligible_zset, A.execution_id)\n  redis.call(\"ZADD\", K.lease_expiry_key, expires_at, A.execution_id)\n  redis.call(\"SADD\", K.worker_leases_key, A.execution_id)\n  redis.call(\"ZADD\", K.active_index_key, expires_at, A.execution_id)\n\n  -- 8a. Timeout indexes\n  if is_set(A.attempt_timeout_ms) and A.attempt_timeout_ms ~= \"0\" then\n    redis.call(\"ZADD\", K.attempt_timeout_key,\n      now_ms + tonumber(A.attempt_timeout_ms), A.execution_id)\n  end\n  if is_set(A.execution_deadline_at) and A.execution_deadline_at ~= \"0\" then\n    redis.call(\"ZADD\", K.execution_deadline_key,\n      tonumber(A.execution_deadline_at), A.execution_id)\n  end\n\n  -- 9. Lease history event\n  redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", 1000, \"*\",\n    \"event\", \"acquired\",\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", tostring(next_epoch),\n    \"attempt_id\", A.attempt_id,\n    \"attempt_index\", tostring(next_att_idx),\n    \"worker_id\", A.worker_id,\n    \"ts\", tostring(now_ms))\n\n  return ok(A.lease_id, tostring(next_epoch), tostring(expires_at),\n            A.attempt_id, tostring(next_att_idx), attempt_type)\nend)\n\n---------------------------------------------------------------------------\n-- #3  ff_complete_execution\n--\n-- Validate lease, end attempt, store result, transition active\u{2192}terminal.\n--\n-- KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,\n--            terminal_zset, lease_current, lease_history, active_index,\n--            stream_meta, result_key, attempt_timeout_zset,\n--            execution_deadline_zset\n-- ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, result_payload\n---------------------------------------------------------------------------\nredis.register_function(\'ff_complete_execution\', function(keys, args)\n  local K = {\n    core_key            = keys[1],\n    attempt_hash        = keys[2],\n    lease_expiry_key    = keys[3],\n  
  worker_leases_key   = keys[4],\n    terminal_zset       = keys[5],\n    lease_current_key   = keys[6],\n    lease_history_key   = keys[7],\n    active_index_key    = keys[8],\n    stream_meta         = keys[9],\n    result_key          = keys[10],\n    attempt_timeout_key = keys[11],\n    execution_deadline_key = keys[12],\n  }\n\n  local A = {\n    execution_id  = args[1],\n    lease_id      = args[2],\n    lease_epoch   = args[3],\n    attempt_id    = args[4],\n    result_payload = args[5] or \"\",\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- Read + validate lease\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  local lease_err = validate_lease_and_mark_expired(\n    core, A, now_ms, K, 1000)\n  if lease_err then return lease_err end\n\n  -- End attempt\n  redis.call(\"HSET\", K.attempt_hash,\n    \"attempt_state\", \"ended_success\",\n    \"ended_at\", tostring(now_ms))\n\n  -- Close stream if exists\n  if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n    redis.call(\"HSET\", K.stream_meta,\n      \"closed_at\", tostring(now_ms),\n      \"closed_reason\", \"attempt_success\")\n  end\n\n  -- Store result\n  if A.result_payload ~= \"\" then\n    redis.call(\"SET\", K.result_key, A.result_payload)\n  end\n\n  -- Update execution core \u{2014} ALL 7 state vector dimensions\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"terminal\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", \"not_applicable\",\n    \"blocking_reason\", \"none\",\n    \"blocking_detail\", \"\",\n    \"terminal_outcome\", \"success\",\n    \"attempt_state\", \"attempt_terminal\",\n    \"public_state\", \"completed\",\n    \"completed_at\", tostring(now_ms),\n    \"current_lease_id\", \"\",\n    \"current_worker_id\", \"\",\n    \"current_worker_instance_id\", \"\",\n    
\"lease_expires_at\", \"\",\n    \"lease_last_renewed_at\", \"\",\n    \"lease_renewal_deadline\", \"\",\n    \"lease_expired_at\", \"\",\n    \"lease_revoked_at\", \"\",\n    \"lease_revoke_reason\", \"\",\n    \"last_transition_at\", tostring(now_ms),\n    \"last_mutation_at\", tostring(now_ms))\n\n  -- Update indexes\n  redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n  redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n  redis.call(\"ZADD\", K.terminal_zset, now_ms, A.execution_id)\n  redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n  redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n  redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n\n  -- Clean up lease\n  redis.call(\"DEL\", K.lease_current_key)\n  redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", 1000, \"*\",\n    \"event\", \"released\",\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", A.lease_epoch,\n    \"attempt_index\", core.current_attempt_index or \"\",\n    \"reason\", \"completed\",\n    \"ts\", tostring(now_ms))\n\n  -- Push-based DAG promotion (Batch C item 6). Only publish when the\n  -- execution actually belongs to a flow \u{2014} standalone executions never\n  -- have downstream edges for the engine to resolve. The engine\'s\n  -- CompletionListener scanner SUBSCRIBEs to ff:dag:completions and\n  -- calls dispatch_dependency_resolution per message. 
The\n  -- dependency_reconciler interval scan is retained as a safety net\n  -- for: missed messages during listener restart, ungated state\n  -- (flow_id empty on older executions), and cluster broadcast gaps.\n  if is_set(core.flow_id) then\n    local payload = cjson.encode({\n      execution_id = A.execution_id,\n      flow_id = core.flow_id,\n      outcome = \"success\",\n    })\n    redis.call(\"PUBLISH\", \"ff:dag:completions\", payload)\n  end\n\n  return ok(\"completed\")\nend)\n\n---------------------------------------------------------------------------\n-- #12a  ff_cancel_execution\n--\n-- Cancel from any non-terminal state. Multi-path:\n--   active \u{2192} validate lease or operator override, end attempt, clear lease\n--   runnable \u{2192} defensive ZREM all indexes\n--   suspended \u{2192} close suspension/waitpoint, end attempt\n-- All paths: terminal(cancelled), defensive ZREM, ZADD terminal.\n--\n-- KEYS (21): exec_core, attempt_hash, stream_meta, lease_current,\n--            lease_history, lease_expiry_zset, worker_leases,\n--            suspension_current, waitpoint_hash, wp_condition,\n--            suspension_timeout_zset, terminal_zset,\n--            attempt_timeout_zset, execution_deadline_zset,\n--            eligible_zset, delayed_zset, blocked_deps_zset,\n--            blocked_budget_zset, blocked_quota_zset,\n--            blocked_route_zset, blocked_operator_zset\n-- ARGV (5): execution_id, reason, source, lease_id, lease_epoch\n---------------------------------------------------------------------------\nredis.register_function(\'ff_cancel_execution\', function(keys, args)\n  local K = {\n    core_key              = keys[1],\n    attempt_hash          = keys[2],\n    stream_meta           = keys[3],\n    lease_current_key     = keys[4],\n    lease_history_key     = keys[5],\n    lease_expiry_key      = keys[6],\n    worker_leases_key     = keys[7],\n    suspension_current    = keys[8],\n    waitpoint_hash        = keys[9],\n    
wp_condition          = keys[10],\n    suspension_timeout_key = keys[11],\n    terminal_key          = keys[12],\n    attempt_timeout_key   = keys[13],\n    execution_deadline_key = keys[14],\n    eligible_key          = keys[15],\n    delayed_key           = keys[16],\n    blocked_deps_key      = keys[17],\n    blocked_budget_key    = keys[18],\n    blocked_quota_key     = keys[19],\n    blocked_route_key     = keys[20],\n    blocked_operator_key  = keys[21],\n    -- active_index_key and suspended_key are constructed dynamically below\n    -- from the hash tag + lane_id (avoids changing KEYS count).\n    active_index_key      = nil,\n    suspended_key         = nil,\n  }\n\n  local A = {\n    execution_id = args[1],\n    reason       = args[2],\n    source       = args[3] or \"\",  -- \"operator_override\" or \"\"\n    lease_id     = args[4] or \"\",\n    lease_epoch  = args[5] or \"\",\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- Construct lane:active and lane:suspended keys dynamically from hash tag.\n  -- These are not in the 21 KEYS array but defensive_zrem_all_indexes needs\n  -- them to clean stale entries when cancelling active or suspended executions.\n  local tag = string.match(K.core_key, \"(%b{})\")\n  local lane = core.lane_id or \"default\"\n  K.active_index_key = \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":active\"\n  K.suspended_key    = \"ff:idx:\" .. tag .. \":lane:\" .. lane .. 
\":suspended\"\n\n  -- Already terminal\n  if core.lifecycle_phase == \"terminal\" then\n    return err(\"execution_not_active\",\n      core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n  end\n\n  local cancelled_from = core.lifecycle_phase\n\n  -- PATH: Active\n  if core.lifecycle_phase == \"active\" then\n    -- Require lease validation or operator override\n    if A.source ~= \"operator_override\" then\n      if core.ownership_state == \"lease_revoked\" then\n        return err(\"lease_revoked\")\n      end\n      if is_set(A.lease_id) then\n        if core.current_lease_id ~= A.lease_id then return err(\"stale_lease\") end\n        if core.current_lease_epoch ~= A.lease_epoch then return err(\"stale_lease\") end\n      end\n    end\n\n    -- End attempt\n    if is_set(core.current_attempt_index) then\n      redis.call(\"HSET\", K.attempt_hash,\n        \"attempt_state\", \"ended_cancelled\",\n        \"ended_at\", tostring(now_ms),\n        \"failure_reason\", \"cancelled: \" .. 
A.reason)\n    end\n\n    -- Close stream if exists\n    if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n      redis.call(\"HSET\", K.stream_meta,\n        \"closed_at\", tostring(now_ms),\n        \"closed_reason\", \"attempt_cancelled\")\n    end\n\n    -- Release lease\n    redis.call(\"DEL\", K.lease_current_key)\n    redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n      \"event\", \"released\",\n      \"lease_id\", core.current_lease_id or \"\",\n      \"lease_epoch\", core.current_lease_epoch or \"\",\n      \"attempt_index\", core.current_attempt_index or \"\",\n      \"reason\", \"cancelled\",\n      \"ts\", tostring(now_ms))\n  end\n\n  -- PATH: Suspended\n  if core.lifecycle_phase == \"suspended\" then\n    -- End attempt (interrupted \u{2192} ended_cancelled)\n    if is_set(core.current_attempt_index) then\n      redis.call(\"HSET\", K.attempt_hash,\n        \"attempt_state\", \"ended_cancelled\",\n        \"ended_at\", tostring(now_ms),\n        \"failure_reason\", \"cancelled: \" .. 
A.reason)\n    end\n\n    -- Close stream if exists\n    if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n      redis.call(\"HSET\", K.stream_meta,\n        \"closed_at\", tostring(now_ms),\n        \"closed_reason\", \"attempt_cancelled\")\n    end\n\n    -- Close suspension + waitpoint + wp_condition\n    if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n      redis.call(\"HSET\", K.suspension_current,\n        \"closed_at\", tostring(now_ms),\n        \"close_reason\", \"cancelled\")\n    end\n    if redis.call(\"EXISTS\", K.waitpoint_hash) == 1 then\n      redis.call(\"HSET\", K.waitpoint_hash,\n        \"state\", \"closed\",\n        \"closed_at\", tostring(now_ms),\n        \"close_reason\", \"cancelled\")\n    end\n    if redis.call(\"EXISTS\", K.wp_condition) == 1 then\n      redis.call(\"HSET\", K.wp_condition,\n        \"closed\", \"1\",\n        \"closed_at\", tostring(now_ms),\n        \"closed_reason\", \"cancelled\")\n    end\n  end\n\n  -- ALL PATHS: exec_core FIRST for terminal transition (\u{a7}4.8b Rule 2)\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"terminal\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", \"not_applicable\",\n    \"blocking_reason\", \"none\",\n    \"blocking_detail\", \"\",\n    \"terminal_outcome\", \"cancelled\",\n    \"attempt_state\", is_set(core.current_attempt_index) and \"attempt_terminal\" or (core.attempt_state or \"none\"),\n    \"public_state\", \"cancelled\",\n    \"cancellation_reason\", A.reason,\n    \"cancelled_by\", A.source ~= \"\" and A.source or A.execution_id,\n    \"completed_at\", tostring(now_ms),\n    \"current_lease_id\", \"\",\n    \"current_worker_id\", \"\",\n    \"current_worker_instance_id\", \"\",\n    \"lease_expires_at\", \"\",\n    \"lease_last_renewed_at\", \"\",\n    \"lease_renewal_deadline\", \"\",\n    \"current_suspension_id\", \"\",\n    \"current_waitpoint_id\", \"\",\n    \"last_transition_at\", tostring(now_ms),\n    
\"last_mutation_at\", tostring(now_ms))\n\n  -- Defensive ZREM from ALL scheduling + timeout indexes\n  defensive_zrem_all_indexes(K, A.execution_id, K.terminal_key)\n\n  -- ZADD terminal (unconditional)\n  redis.call(\"ZADD\", K.terminal_key, now_ms, A.execution_id)\n\n  -- Push-based DAG promotion (Batch C item 6). Skip the publish when\n  -- the cancel came from `flow_cascade` \u{2014} that source means a parent\n  -- already propagated and the engine is walking the edge graph; an\n  -- additional publish here would trigger redundant dispatch work.\n  -- (Not a correctness issue since dispatch is idempotent, but it\n  -- avoids unnecessary load under wide fan-out cancellations.)\n  if is_set(core.flow_id) and A.source ~= \"flow_cascade\" then\n    local payload = cjson.encode({\n      execution_id = A.execution_id,\n      flow_id = core.flow_id,\n      outcome = \"cancelled\",\n    })\n    redis.call(\"PUBLISH\", \"ff:dag:completions\", payload)\n  end\n\n  return ok(\"cancelled\", cancelled_from)\nend)\n\n---------------------------------------------------------------------------\n-- #30  ff_delay_execution\n--\n-- Worker delays its own active execution. Releases lease, transitions\n-- to runnable + not_eligible_until_time. 
Same attempt continues (paused).\n--\n-- KEYS (9): exec_core, attempt_hash, lease_current, lease_history,\n--           lease_expiry_zset, worker_leases, active_index,\n--           delayed_zset, attempt_timeout_zset\n-- ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, delay_until\n---------------------------------------------------------------------------\nredis.register_function(\'ff_delay_execution\', function(keys, args)\n  local K = {\n    core_key            = keys[1],\n    attempt_hash        = keys[2],\n    lease_current_key   = keys[3],\n    lease_history_key   = keys[4],\n    lease_expiry_key    = keys[5],\n    worker_leases_key   = keys[6],\n    active_index_key    = keys[7],\n    delayed_zset        = keys[8],\n    attempt_timeout_key = keys[9],\n  }\n\n  local A = {\n    execution_id = args[1],\n    lease_id     = args[2],\n    lease_epoch  = args[3],\n    attempt_id   = args[4],\n    delay_until  = args[5],\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- Validate lease\n  local lease_err = validate_lease_and_mark_expired(\n    core, A, now_ms, K, 1000)\n  if lease_err then return lease_err end\n\n  -- OOM-SAFE WRITE ORDERING: exec_core FIRST (point of no return)\n  -- ALL 7 state vector dimensions\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"runnable\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", \"not_eligible_until_time\",\n    \"blocking_reason\", \"waiting_for_delay\",\n    \"blocking_detail\", \"delayed until \" .. 
A.delay_until,\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", \"attempt_interrupted\",\n    \"public_state\", \"delayed\",\n    \"delay_until\", A.delay_until,\n    \"current_lease_id\", \"\",\n    \"current_worker_id\", \"\",\n    \"current_worker_instance_id\", \"\",\n    \"lease_expires_at\", \"\",\n    \"lease_last_renewed_at\", \"\",\n    \"lease_renewal_deadline\", \"\",\n    \"last_transition_at\", tostring(now_ms),\n    \"last_mutation_at\", tostring(now_ms))\n\n  -- Pause attempt: started \u{2192} suspended\n  redis.call(\"HSET\", K.attempt_hash,\n    \"attempt_state\", \"suspended\",\n    \"suspended_at\", tostring(now_ms),\n    \"suspension_id\", \"worker_delay\")\n\n  -- Release lease + update indexes\n  redis.call(\"DEL\", K.lease_current_key)\n  redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n  redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n  redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n  redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n\n  -- Add to delayed set\n  redis.call(\"ZADD\", K.delayed_zset, tonumber(A.delay_until), A.execution_id)\n\n  -- Lease history event\n  redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n    \"event\", \"released\",\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", A.lease_epoch,\n    \"attempt_index\", core.current_attempt_index or \"\",\n    \"attempt_id\", A.attempt_id,\n    \"reason\", \"worker_delay\",\n    \"ts\", tostring(now_ms))\n\n  return ok(A.delay_until)\nend)\n\n---------------------------------------------------------------------------\n-- #31  ff_move_to_waiting_children\n--\n-- Worker blocks on child dependencies. Releases lease, transitions to\n-- runnable + blocked_by_dependencies. 
Same attempt continues (paused).\n--\n-- KEYS (9): exec_core, attempt_hash, lease_current, lease_history,\n--           lease_expiry_zset, worker_leases, active_index,\n--           blocked_deps_zset, attempt_timeout_zset\n-- ARGV (4): execution_id, lease_id, lease_epoch, attempt_id\n---------------------------------------------------------------------------\nredis.register_function(\'ff_move_to_waiting_children\', function(keys, args)\n  local K = {\n    core_key            = keys[1],\n    attempt_hash        = keys[2],\n    lease_current_key   = keys[3],\n    lease_history_key   = keys[4],\n    lease_expiry_key    = keys[5],\n    worker_leases_key   = keys[6],\n    active_index_key    = keys[7],\n    blocked_deps_zset   = keys[8],\n    attempt_timeout_key = keys[9],\n  }\n\n  local A = {\n    execution_id = args[1],\n    lease_id     = args[2],\n    lease_epoch  = args[3],\n    attempt_id   = args[4],\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- Validate lease\n  local lease_err = validate_lease_and_mark_expired(\n    core, A, now_ms, K, 1000)\n  if lease_err then return lease_err end\n\n  -- OOM-SAFE WRITE ORDERING: exec_core FIRST (point of no return, \u{a7}4.8b Rule 2)\n  -- ALL 7 state vector dimensions\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"runnable\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", \"blocked_by_dependencies\",\n    \"blocking_reason\", \"waiting_for_children\",\n    \"blocking_detail\", \"waiting for child executions to complete\",\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", \"attempt_interrupted\",\n    \"public_state\", \"waiting_children\",\n    \"current_lease_id\", \"\",\n    \"current_worker_id\", \"\",\n    \"current_worker_instance_id\", \"\",\n 
   \"lease_expires_at\", \"\",\n    \"lease_last_renewed_at\", \"\",\n    \"lease_renewal_deadline\", \"\",\n    \"last_transition_at\", tostring(now_ms),\n    \"last_mutation_at\", tostring(now_ms))\n\n  -- Pause attempt: started \u{2192} suspended (waiting children, not ended)\n  redis.call(\"HSET\", K.attempt_hash,\n    \"attempt_state\", \"suspended\",\n    \"suspended_at\", tostring(now_ms),\n    \"suspension_id\", \"waiting_children\")\n\n  -- Release lease + update indexes\n  redis.call(\"DEL\", K.lease_current_key)\n  redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n  redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n  redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n  redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n\n  -- Lease history event\n  redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n    \"event\", \"released\",\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", A.lease_epoch,\n    \"attempt_index\", core.current_attempt_index or \"\",\n    \"attempt_id\", A.attempt_id,\n    \"reason\", \"waiting_children\",\n    \"ts\", tostring(now_ms))\n\n  -- Add to blocked:dependencies set\n  redis.call(\"ZADD\", K.blocked_deps_zset,\n    tonumber(core.created_at or \"0\"), A.execution_id)\n\n  return ok()\nend)\n\n---------------------------------------------------------------------------\n-- #4  ff_fail_execution\n--\n-- Fail an active execution. If retries remain: set pending_retry_attempt\n-- + lineage on exec_core (deferred creation \u{2014} claim_execution creates\n-- the attempt, per R21 fix). 
If max retries reached: terminal failure.\n--\n-- KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,\n--            terminal_zset, delayed_zset, lease_current, lease_history,\n--            active_index, stream_meta, attempt_timeout_zset,\n--            execution_deadline_zset\n-- ARGV (7): execution_id, lease_id, lease_epoch, attempt_id,\n--           failure_reason, failure_category, retry_policy_json\n---------------------------------------------------------------------------\nredis.register_function(\'ff_fail_execution\', function(keys, args)\n  local K = {\n    core_key              = keys[1],\n    attempt_hash          = keys[2],\n    lease_expiry_key      = keys[3],\n    worker_leases_key     = keys[4],\n    terminal_key          = keys[5],\n    delayed_zset          = keys[6],\n    lease_current_key     = keys[7],\n    lease_history_key     = keys[8],\n    active_index_key      = keys[9],\n    stream_meta           = keys[10],\n    attempt_timeout_key   = keys[11],\n    execution_deadline_key = keys[12],\n  }\n\n  local A = {\n    execution_id     = args[1],\n    lease_id         = args[2],\n    lease_epoch      = args[3],\n    attempt_id       = args[4],\n    failure_reason   = args[5],\n    failure_category = args[6],\n    retry_policy_json = args[7] or \"\",\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Read + validate lease\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  local lease_err = validate_lease_and_mark_expired(\n    core, A, now_ms, K, 1000)\n  if lease_err then return lease_err end\n\n  -- 2. End current attempt\n  redis.call(\"HSET\", K.attempt_hash,\n    \"attempt_state\", \"ended_failure\",\n    \"ended_at\", tostring(now_ms),\n    \"failure_reason\", A.failure_reason,\n    \"failure_category\", A.failure_category)\n\n  -- 3. 
Close stream if exists\n  if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n    redis.call(\"HSET\", K.stream_meta,\n      \"closed_at\", tostring(now_ms),\n      \"closed_reason\", \"attempt_failure\")\n  end\n\n  -- 4. Determine retry eligibility\n  local retry_count = tonumber(core.retry_count or \"0\")\n  local max_retries = 0\n  local backoff_ms = 1000\n  local can_retry = false\n\n  if is_set(A.retry_policy_json) then\n    local ok_decode, policy = pcall(cjson.decode, A.retry_policy_json)\n    if ok_decode and type(policy) == \"table\" then\n      max_retries = tonumber(policy.max_retries or \"0\")\n      if retry_count < max_retries then\n        can_retry = true\n        local bt = policy.backoff or {}\n        if bt.type == \"exponential\" then\n          local initial = (tonumber(bt.initial_delay_ms) or 1000)\n          local max_d = (tonumber(bt.max_delay_ms) or 60000)\n          local mult = (tonumber(bt.multiplier) or 2)\n          backoff_ms = math.min(initial * (mult ^ retry_count), max_d)\n        elseif bt.type == \"fixed\" then\n          backoff_ms = (tonumber(bt.delay_ms) or 1000)\n        end\n      end\n    end\n  end\n\n  if can_retry then\n    -- RETRY PATH: deferred attempt creation (R21 fix)\n    -- Do NOT create attempt record here. claim_execution creates the\n    -- attempt with correct type (retry) by reading pending_retry_attempt.\n    local delay_until = now_ms + backoff_ms\n\n    -- ALL 7 state vector dimensions + pending lineage\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"runnable\",\n      \"ownership_state\", \"unowned\",\n      \"eligibility_state\", \"not_eligible_until_time\",\n      \"blocking_reason\", \"waiting_for_retry_backoff\",\n      \"blocking_detail\", \"retry backoff until \" .. tostring(delay_until) ..\n        \" (attempt \" .. (core.current_attempt_index or \"0\") ..\n        \" failed: \" .. A.failure_reason .. 
\")\",\n      \"terminal_outcome\", \"none\",\n      \"attempt_state\", \"pending_retry_attempt\",\n      \"public_state\", \"delayed\",\n      -- Pending lineage for claim_execution to consume\n      \"pending_retry_reason\", A.failure_reason,\n      \"pending_previous_attempt_index\", core.current_attempt_index or \"0\",\n      \"retry_count\", tostring(retry_count + 1),\n      \"current_attempt_id\", \"\",\n      \"current_lease_id\", \"\",\n      \"current_worker_id\", \"\",\n      \"current_worker_instance_id\", \"\",\n      \"lease_expires_at\", \"\",\n      \"lease_last_renewed_at\", \"\",\n      \"lease_renewal_deadline\", \"\",\n      \"delay_until\", tostring(delay_until),\n      \"failure_reason\", A.failure_reason,\n      \"last_transition_at\", tostring(now_ms),\n      \"last_mutation_at\", tostring(now_ms))\n\n    -- Release lease + update indexes\n    redis.call(\"DEL\", K.lease_current_key)\n    redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n    redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n    redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n    redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n\n    -- ZADD to delayed index\n    redis.call(\"ZADD\", K.delayed_zset, delay_until, A.execution_id)\n\n    -- Lease history\n    redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n      \"event\", \"released\",\n      \"lease_id\", A.lease_id,\n      \"lease_epoch\", A.lease_epoch,\n      \"attempt_index\", core.current_attempt_index or \"\",\n      \"reason\", \"failed_retry_scheduled\",\n      \"ts\", tostring(now_ms))\n\n    return ok(\"retry_scheduled\", tostring(delay_until))\n  else\n    -- TERMINAL PATH\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"terminal\",\n      \"ownership_state\", \"unowned\",\n      \"eligibility_state\", \"not_applicable\",\n      \"blocking_reason\", \"none\",\n      \"blocking_detail\", \"\",\n      \"terminal_outcome\", 
\"failed\",\n      \"attempt_state\", \"attempt_terminal\",\n      \"public_state\", \"failed\",\n      \"failure_reason\", A.failure_reason,\n      \"completed_at\", tostring(now_ms),\n      \"current_lease_id\", \"\",\n      \"current_worker_id\", \"\",\n      \"current_worker_instance_id\", \"\",\n      \"lease_expires_at\", \"\",\n      \"lease_last_renewed_at\", \"\",\n      \"lease_renewal_deadline\", \"\",\n      \"last_transition_at\", tostring(now_ms),\n      \"last_mutation_at\", tostring(now_ms))\n\n    -- Release lease + cleanup\n    redis.call(\"DEL\", K.lease_current_key)\n    redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n    redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n    redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n    redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n    redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n    redis.call(\"ZADD\", K.terminal_key, now_ms, A.execution_id)\n\n    redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n      \"event\", \"released\",\n      \"lease_id\", A.lease_id,\n      \"lease_epoch\", A.lease_epoch,\n      \"attempt_index\", core.current_attempt_index or \"\",\n      \"reason\", \"failed_terminal\",\n      \"ts\", tostring(now_ms))\n\n    -- Push-based DAG promotion (Batch C item 6). See ff_complete_execution\n    -- for rationale. 
-- A terminal-failed upstream triggers
    -- child-skip cascades via ff_resolve_dependency on the receiver side.
    if is_set(core.flow_id) then
      local payload = cjson.encode({
        execution_id = A.execution_id,
        flow_id = core.flow_id,
        outcome = "failed",
      })
      redis.call("PUBLISH", "ff:dag:completions", payload)
    end

    return ok("terminal_failed")
  end
end)

---------------------------------------------------------------------------
-- #26  ff_reclaim_execution
--
-- Atomically reclaim an expired/revoked execution: interrupt old attempt,
-- create new attempt + new lease.
--
-- KEYS (14): exec_core, claim_grant, old_attempt_hash, old_stream_meta,
--            new_attempt_hash, new_attempt_usage, attempts_zset,
--            lease_current, lease_history, lease_expiry_zset,
--            worker_leases, active_index, attempt_timeout_zset,
--            execution_deadline_zset
-- ARGV (8): execution_id, worker_id, worker_instance_id, lane,
--           lease_id, lease_ttl_ms, attempt_id, attempt_policy_json
---------------------------------------------------------------------------
redis.register_function('ff_reclaim_execution', function(keys, args)
  local K = {
    core_key              = keys[1],
    claim_grant           = keys[2],
    old_attempt_hash      = keys[3],
    old_stream_meta       = keys[4],
    new_attempt_hash      = keys[5],
    new_attempt_usage     = keys[6],
    attempts_zset         = keys[7],
    lease_current_key     = keys[8],
    lease_history_key     = keys[9],
    lease_expiry_key      = keys[10],
    worker_leases_key     = keys[11],
    active_index_key      = keys[12],
    attempt_timeout_key   = keys[13],
    execution_deadline_key = keys[14],
  }

  -- Validate the TTL argument up front; require_number returns an err tuple
  -- (a table) on malformed input, which is propagated verbatim.
  local reclaim_ttl_n = require_number(args[6], "lease_ttl_ms")
  if type(reclaim_ttl_n) == "table" then return reclaim_ttl_n end

  local A = {
    execution_id        = args[1],
    worker_id           = args[2],
    worker_instance_id  = args[3],
    lane                = args[4],
    lease_id            = args[5],
    lease_ttl_ms        = reclaim_ttl_n,
    attempt_id          = args[7],
    attempt_policy_json = args[8] or "",
  }

  -- Server-side clock: TIME returns {seconds, microseconds}; fold to ms.
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- 1. Validate execution state
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- Must be active + reclaimable
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_reclaimable")
  end
  if core.ownership_state ~= "lease_expired_reclaimable"
    and core.ownership_state ~= "lease_revoked" then
    return err("execution_not_reclaimable")
  end

  -- Check max_reclaim_count
  local reclaim_count = tonumber(core.lease_reclaim_count or "0")
  local max_reclaim = 100  -- default
  -- Read from policy if available
  -- ":core$" is a Lua pattern (anchored suffix); swaps the key suffix for
  -- ":policy" on the same hash tag so both land on the same shard.
  local policy_key = string.gsub(K.core_key, ":core$", ":policy")
  local policy_raw = redis.call("GET", policy_key)
  if policy_raw then
    -- pcall guards against malformed JSON in the policy value; on decode
    -- failure the 100 default stands.
    local ok_p, policy = pcall(cjson.decode, policy_raw)
    if ok_p and type(policy) == "table" then
      max_reclaim = tonumber(policy.max_reclaim_count or "100")
    end
  end

  if reclaim_count >= max_reclaim then
    -- Terminal: max reclaims exceeded
    -- NOTE(review): the claim grant (if one was issued for this reclaim) is
    -- not consumed on this path; it is left to lapse via its own TTL —
    -- confirm intended.
    redis.call("HSET", K.core_key,
      "lifecycle_phase", "terminal",
      "ownership_state", "unowned",
      "eligibility_state", "not_applicable",
      "blocking_reason", "none",
      "blocking_detail", "",
      "terminal_outcome", "failed",
      "attempt_state", "attempt_terminal",
      "public_state", "failed",
      "failure_reason", "max_reclaims_exceeded",
      "completed_at", tostring(now_ms),
      "current_lease_id", "",
      "current_worker_id", "",
      "current_worker_instance_id", "",
      "last_transition_at", tostring(now_ms),
      "last_mutation_at", tostring(now_ms))

    redis.call("DEL", K.lease_current_key)
    redis.call("ZREM", K.lease_expiry_key, A.execution_id)
    redis.call("SREM", K.worker_leases_key, A.execution_id)
    -- Dynamic worker_leases SREM: K.worker_leases_key targets the NEW (reclaiming)
    -- worker's set, but the entry is in the OLD (expired) worker's set. Use
    -- current_worker_instance_id from exec_core to SREM from the correct set.
    -- (The SREM above is an idempotent no-op when the member is absent.)
    local wiid = core.current_worker_instance_id or ""
    if wiid ~= "" then
      -- "%b{}" captures the balanced {hash-tag} portion of the key.
      local tag_wl = string.match(K.core_key, "(%b{})")
      redis.call("SREM", "ff:idx:" .. tag_wl .. ":worker:" .. wiid .. ":leases", A.execution_id)
    end
    redis.call("ZREM", K.active_index_key, A.execution_id)
    redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
    redis.call("ZREM", K.execution_deadline_key, A.execution_id)

    -- ZADD terminal (construct key from hash tag + lane)
    local tag = string.match(K.core_key, "(%b{})")
    local lane = core.lane_id or core.current_lane or "default"
    local terminal_key = "ff:idx:" .. tag .. ":lane:" .. lane .. ":terminal"
    redis.call("ZADD", terminal_key, now_ms, A.execution_id)

    return err("max_retries_exhausted")
  end

  -- 2. Validate and consume claim grant
  local grant_raw = redis.call("HGETALL", K.claim_grant)
  if #grant_raw == 0 then return err("invalid_claim_grant") end
  local grant = hgetall_to_table(grant_raw)
  if grant.worker_id ~= A.worker_id then return err("invalid_claim_grant") end
  redis.call("DEL", K.claim_grant)

  -- 3. Interrupt old attempt
  local old_att_idx = core.current_attempt_index or "0"
  redis.call("HSET", K.old_attempt_hash,
    "attempt_state", "interrupted_reclaimed",
    "ended_at", tostring(now_ms),
    "failure_reason", "lease_" .. (core.ownership_state or "expired"))

  -- Close old stream if exists
  if redis.call("EXISTS", K.old_stream_meta) == 1 then
    redis.call("HSET", K.old_stream_meta,
      "closed_at", tostring(now_ms),
      "closed_reason", "reclaimed")
  end

  -- 4. Create new attempt
  --    Construct actual attempt key from hash tag + computed index
  --    (same dynamic-key pattern as ff_claim_execution).
  local next_epoch = tonumber(core.current_lease_epoch or "0") + 1
  local next_att_idx = tonumber(core.total_attempt_count or "0")
  local expires_at = now_ms + A.lease_ttl_ms
  -- Renewal deadline at 2/3 of the TTL gives the worker one renewal window
  -- before hard expiry.
  local renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)

  local tag = string.match(K.core_key, "(%b{})")
  local att_key = "ff:attempt:" .. tag .. ":" .. A.execution_id .. ":" .. tostring(next_att_idx)
  local att_usage_key = att_key .. ":usage"

  redis.call("HSET", att_key,
    "attempt_id", A.attempt_id,
    "execution_id", A.execution_id,
    "attempt_index", tostring(next_att_idx),
    "attempt_type", "reclaim",
    "attempt_state", "started",
    "created_at", tostring(now_ms),
    "started_at", tostring(now_ms),
    "lease_id", A.lease_id,
    "lease_epoch", tostring(next_epoch),
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "reclaim_reason", "lease_" .. (core.ownership_state or "expired"),
    "previous_attempt_index", old_att_idx)
  redis.call("ZADD", K.attempts_zset, now_ms, tostring(next_att_idx))

  -- Initialize usage counters
  redis.call("HSET", att_usage_key, "last_usage_report_seq", "0")

  -- 5. Create new lease
  redis.call("DEL", K.lease_current_key)
  redis.call("HSET", K.lease_current_key,
    "lease_id", A.lease_id,
    "lease_epoch", tostring(next_epoch),
    "execution_id", A.execution_id,
    "attempt_id", A.attempt_id,
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "acquired_at", tostring(now_ms),
    "expires_at", tostring(expires_at),
    "last_renewed_at", tostring(now_ms),
    "renewal_deadline", tostring(renewal_deadline))

  -- 6. Update exec_core — ALL 7 state vector dimensions
  redis.call("HSET", K.core_key,
    "lifecycle_phase", "active",
    "ownership_state", "leased",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "attempt_state", "running_attempt",
    "public_state", "active",
    "current_attempt_index", tostring(next_att_idx),
    "total_attempt_count", tostring(next_att_idx + 1),
    "current_attempt_id", A.attempt_id,
    "current_lease_id", A.lease_id,
    "current_lease_epoch", tostring(next_epoch),
    "current_worker_id", A.worker_id,
    "current_worker_instance_id", A.worker_instance_id,
    "current_lane", A.lane,
    "lease_acquired_at", tostring(now_ms),
    "lease_expires_at", tostring(expires_at),
    "lease_last_renewed_at", tostring(now_ms),
    "lease_renewal_deadline", tostring(renewal_deadline),
    "lease_expired_at", "",
    "lease_revoked_at", "",
    "lease_revoke_reason", "",
    "lease_reclaim_count", tostring(reclaim_count + 1),
    "last_transition_at", tostring(now_ms),
    "last_mutation_at", tostring(now_ms))

  -- 7. Update indexes
  redis.call("ZADD", K.lease_expiry_key, expires_at, A.execution_id)
  redis.call("SADD", K.worker_leases_key, A.execution_id)
  redis.call("ZADD", K.active_index_key, expires_at, A.execution_id)

  -- 8. Lease history events
  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", 1000, "*",
    "event", "reclaimed",
    "old_lease_epoch", core.current_lease_epoch or "",
    "new_lease_id", A.lease_id,
    "new_lease_epoch", tostring(next_epoch),
    "new_attempt_id", A.attempt_id,
    "new_attempt_index", tostring(next_att_idx),
    "worker_id", A.worker_id,
    "ts", tostring(now_ms))

  return ok(A.lease_id, tostring(next_epoch), tostring(expires_at),
            A.attempt_id, tostring(next_att_idx), "reclaim")
end)

---------------------------------------------------------------------------
-- #29  ff_expire_execution
--
-- Timeout-based expiration. Handles active, runnable, and suspended phases.
-- Called by the attempt_timeout and execution_deadline scanners.
--
-- KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,
--            lease_history, lease_expiry_zset, worker_leases,
--            active_index, terminal_zset, attempt_timeout_zset,
--            execution_deadline_zset, suspended_zset,
--            suspension_timeout_zset, suspension_current
-- ARGV (2): execution_id, expire_reason
---------------------------------------------------------------------------
redis.register_function('ff_expire_execution', function(keys, args)
  local K = {
    core_key                = keys[1],
    attempt_hash            = keys[2],
    stream_meta             = keys[3],
    lease_current_key       = keys[4],
    lease_history_key       = keys[5],
    lease_expiry_key        = keys[6],
    worker_leases_key       = keys[7],
    active_index_key        = keys[8],
    terminal_key            = keys[9],
    attempt_timeout_key     = keys[10],
execution_deadline_key  = keys[11],
    suspended_zset          = keys[12],
    suspension_timeout_key  = keys[13],
    suspension_current      = keys[14],
  }

  local A = {
    execution_id = args[1],
    expire_reason = args[2] or "attempt_timeout",
  }

  -- Server-side clock: TIME returns {seconds, microseconds}; fold to ms.
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    -- Core hash gone: drop the stale scanner index entries so this id is
    -- not rescanned forever.
    redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
    redis.call("ZREM", K.execution_deadline_key, A.execution_id)
    return ok("not_found_cleaned")
  end
  local core = hgetall_to_table(raw)

  -- Already terminal — no-op
  if core.lifecycle_phase == "terminal" then
    redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
    redis.call("ZREM", K.execution_deadline_key, A.execution_id)
    return ok("already_terminal")
  end

  -- The three phase branches below do NOT return: each falls through to the
  -- shared "ALL PATHS: terminal transition" write at the bottom.

  -- PATH: Active
  if core.lifecycle_phase == "active" then
    -- End attempt
    if is_set(core.current_attempt_index) then
      redis.call("HSET", K.attempt_hash,
        "attempt_state", "ended_failure",
        "ended_at", tostring(now_ms),
        "failure_reason", A.expire_reason)
    end
    -- Close stream
    if redis.call("EXISTS", K.stream_meta) == 1 then
      redis.call("HSET", K.stream_meta,
        "closed_at", tostring(now_ms),
        "closed_reason", "expired")
    end
    -- Release lease
    redis.call("DEL", K.lease_current_key)
    redis.call("ZREM", K.lease_expiry_key, A.execution_id)
    redis.call("SREM", K.worker_leases_key, A.execution_id)
    -- Dynamic worker_leases SREM: the scanner passes empty WorkerInstanceId
    -- in KEYS[7] (it can't know which worker holds the lease). Use the actual
    -- worker_instance_id from exec_core to SREM from the correct set.
    local wiid = core.current_worker_instance_id or ""
    if wiid ~= "" then
      -- "%b{}" captures the balanced {hash-tag} portion of the key.
      local tag = string.match(K.core_key, "(%b{})")
      redis.call("SREM", "ff:idx:" .. tag .. ":worker:" .. wiid .. ":leases", A.execution_id)
    end
    redis.call("ZREM", K.active_index_key, A.execution_id)
    -- Lease history
    redis.call("XADD", K.lease_history_key, "MAXLEN", "~", "1000", "*",
      "event", "released",
      "lease_id", core.current_lease_id or "",
      "lease_epoch", core.current_lease_epoch or "",
      "attempt_index", core.current_attempt_index or "",
      "reason", "expired:" .. A.expire_reason,
      "ts", tostring(now_ms))
  end

  -- PATH: Suspended
  if core.lifecycle_phase == "suspended" then
    -- End attempt
    if is_set(core.current_attempt_index) then
      redis.call("HSET", K.attempt_hash,
        "attempt_state", "ended_failure",
        "ended_at", tostring(now_ms),
        "failure_reason", A.expire_reason)
    end
    -- Close suspension
    if redis.call("EXISTS", K.suspension_current) == 1 then
      redis.call("HSET", K.suspension_current,
        "closed_at", tostring(now_ms),
        "close_reason", "expired:" .. A.expire_reason)
    end
    -- Close stream
    if redis.call("EXISTS", K.stream_meta) == 1 then
      redis.call("HSET", K.stream_meta,
        "closed_at", tostring(now_ms),
        "closed_reason", "expired")
    end
    -- ZREM from suspended indexes
    redis.call("ZREM", K.suspended_zset, A.execution_id)
    redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
  end

  -- PATH: Runnable (execution_deadline fires while execution is waiting/delayed/blocked)
  -- These indexes are not in the 14 KEYS array, so construct dynamically from
  -- hash tag + lane_id (same C2 pattern as ff_cancel_execution).
  if core.lifecycle_phase == "runnable" then
    local tag = string.match(K.core_key, "(%b{})")
    local lane = core.lane_id or core.current_lane or "default"
    local es = core.eligibility_state or ""

    if es == "eligible_now" then
      redis.call("ZREM", "ff:idx:" .. tag .. ":lane:" .. lane .. ":eligible", A.execution_id)
    elseif es == "not_eligible_until_time" then
      redis.call("ZREM", "ff:idx:" .. tag .. ":lane:" .. lane .. ":delayed", A.execution_id)
    elseif es == "blocked_by_dependencies" then
      redis.call("ZREM", "ff:idx:" .. tag .. ":lane:" .. lane .. ":blocked:dependencies", A.execution_id)
    elseif es == "blocked_by_budget" then
      redis.call("ZREM", "ff:idx:" .. tag .. ":lane:" .. lane .. ":blocked:budget", A.execution_id)
    elseif es == "blocked_by_quota" then
      redis.call("ZREM", "ff:idx:" .. tag .. ":lane:" .. lane .. ":blocked:quota", A.execution_id)
    elseif es == "blocked_by_route" then
      redis.call("ZREM", "ff:idx:" .. tag .. ":lane:" .. lane .. ":blocked:route", A.execution_id)
    elseif es == "blocked_by_operator" then
      redis.call("ZREM", "ff:idx:" .. tag .. ":lane:" .. lane .. ":blocked:operator", A.execution_id)
    else
      -- Defensive catch-all: handles blocked_by_lane_state and any future
      -- eligibility states. ZREM from ALL runnable-state indexes — idempotent.
      local lp = "ff:idx:" .. tag .. ":lane:" .. lane
      redis.call("ZREM", lp .. ":eligible", A.execution_id)
      redis.call("ZREM", lp .. ":delayed", A.execution_id)
      redis.call("ZREM", lp .. ":blocked:dependencies", A.execution_id)
      redis.call("ZREM", lp .. ":blocked:budget", A.execution_id)
      redis.call("ZREM", lp .. ":blocked:quota", A.execution_id)
      redis.call("ZREM", lp .. ":blocked:route", A.execution_id)
      redis.call("ZREM", lp .. ":blocked:operator", A.execution_id)
    end
  end

  -- ALL PATHS: terminal transition
  -- If no attempt ever started, preserve the pre-claim attempt_state rather
  -- than forcing attempt_terminal.
  local att_state = "attempt_terminal"
  if not is_set(core.current_attempt_index) then
    att_state = core.attempt_state or "none"
  end

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "terminal",
    "ownership_state", "unowned",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "expired",
    "attempt_state", att_state,
    "public_state", "expired",
    "failure_reason", A.expire_reason,
    "completed_at", tostring(now_ms),
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "current_suspension_id", "",
    "current_waitpoint_id", "",
    "last_transition_at", tostring(now_ms),
    "last_mutation_at", tostring(now_ms))

  -- Cleanup timeout indexes + ZADD terminal
  redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
  redis.call("ZREM", K.execution_deadline_key, A.execution_id)
  redis.call("ZADD", K.terminal_key, now_ms, A.execution_id)

  return ok("expired",
core.lifecycle_phase)
end)


-- source: lua/scheduling.lua
-- FlowFabric scheduling functions
-- Reference: RFC-009 (Scheduling), RFC-010 §4 (function inventory)
--
-- Depends on helpers: ok, err, hgetall_to_table, is_set, validate_lease

---------------------------------------------------------------------------
-- Capability matching helpers (local to scheduling.lua)
-- Bounds CAPS_MAX_BYTES / CAPS_MAX_TOKENS live in helpers.lua and are
-- enforced symmetrically on worker caps here and on required caps in
-- ff_create_execution so neither side can smuggle in an oversized list.
---------------------------------------------------------------------------

-- Parse a capability CSV into a {token=true} set. Empty/nil → empty set.
-- Returns (set, nil) on success or (nil, err_tuple) on bound violation.
--
-- The "[^,]+" pattern never yields an empty token, so stray separators
-- ("a,,b") are skipped by the pattern itself before the count check; the
-- #tok guard inside the loop is pure defense in depth. Real oversize input
-- still fails because #csv > CAPS_MAX_BYTES catches it before the loop runs.
local function parse_capability_csv(csv, kind)
  -- Fast path: absent or empty CSV means "no capabilities required/held".
  if csv == nil or csv == "" then
    return {}, nil
  end
  -- Byte ceiling first: reject grossly oversized input before any parsing.
  if #csv > CAPS_MAX_BYTES then
    return nil, err("invalid_capabilities", kind .. ":too_many_bytes")
  end
  local caps = {}
  local seen = 0
  for tok in string.gmatch(csv, "[^,]+") do
    if #tok > 0 then
      seen = seen + 1
      if seen > CAPS_MAX_TOKENS then
        return nil, err("invalid_capabilities", kind .. ":too_many_tokens")
      end
      caps[tok] = true
    end
  end
  return caps, nil
end

-- Return sorted CSV of tokens present in `required` but missing from
-- `worker_caps`.
-- Empty result means worker satisfies all requirements.
local function missing_capabilities(required, worker_caps)
  -- Collect every required token the worker lacks, then emit them as a
  -- deterministically ordered (sorted) CSV so error payloads are stable.
  local absent = {}
  for capability in pairs(required) do
    if not worker_caps[capability] then
      absent[#absent + 1] = capability
    end
  end
  table.sort(absent)
  return table.concat(absent, ",")
end

---------------------------------------------------------------------------
-- #25  ff_issue_claim_grant
--
-- Scheduler issues a claim grant for an eligible execution.
-- Validates execution is eligible, writes grant hash with TTL,
-- removes from eligible set.
--
-- KEYS (3): exec_core, claim_grant_key, eligible_zset
-- ARGV (9): execution_id, worker_id, worker_instance_id,
--           lane_id, capability_hash, grant_ttl_ms,
--           route_snapshot_json, admission_summary,
--           worker_capabilities_csv  -- sorted CSV of worker caps (option a)
--
-- Capability matching (RFC-009):
--   If exec_core.required_capabilities (sorted CSV on exec_core) is empty,
--   any worker matches (backwards compat). Otherwise the worker's sorted
--   CSV must be a superset.
--   On mismatch: Lua stamps `last_capability_mismatch_at` (single scalar
--   field, idempotent write — no unbounded counter) and returns
--   err("capability_mismatch", missing_csv). The scheduler side MUST
--   then block the execution off the eligible ZSET (see
--   ff_block_execution_for_admission with reason `waiting_for_capability`),
--   otherwise ZRANGEBYSCORE keeps returning the same top-of-zset every
--   tick and 100 workers × 1 tick/s = hot-loop starvation.
-- RFC-009 §564.
---------------------------------------------------------------------------
redis.register_function('ff_issue_claim_grant', function(keys, args)
  local K = {
    core_key       = keys[1],
    claim_grant    = keys[2],
    eligible_zset  = keys[3],
  }

  -- args[6] (grant_ttl_ms) is validated separately below; A is built from
  -- the raw string arguments first.
  local A = {
    execution_id            = args[1],
    worker_id               = args[2],
    worker_instance_id      = args[3],
    lane_id                 = args[4],
    capability_hash         = args[5] or "",
    route_snapshot_json     = args[7] or "",
    admission_summary       = args[8] or "",
    worker_capabilities_csv = args[9] or "",
  }

  local grant_ttl_n = require_number(args[6], "grant_ttl_ms")
  if type(grant_ttl_n) == "table" then return grant_ttl_n end
  A.grant_ttl_ms = grant_ttl_n

  -- Server-side clock: TIME returns {seconds, microseconds}; fold to ms.
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- 1. Validate execution exists and is eligible
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  if core.lifecycle_phase ~= "runnable" then
    return err("execution_not_eligible")
  end
  if core.eligibility_state ~= "eligible_now" then
    return err("execution_not_eligible")
  end

  -- 2. Check no existing grant (prevent double-grant)
  if redis.call("EXISTS", K.claim_grant) == 1 then
    return err("grant_already_exists")
  end

  -- 3. Verify execution is in eligible set (TOCTOU guard)
  local score = redis.call("ZSCORE", K.eligible_zset, A.execution_id)
  if not score then
    return err("execution_not_in_eligible_set")
  end

  -- 4. Capability matching. On miss we stamp a SINGLE bounded field —
  -- `last_capability_mismatch_at` — so operators can SCAN for stuck
  -- executions via `HGET last_capability_mismatch_at < now - 1h` without
  -- needing a counter. An earlier version HINCRBY'd a counter; that was
  -- dropped because combined with the hot-loop bug (executions staying in
  -- the eligible ZSET after mismatch) the counter grew unboundedly (2.4M
  -- increments/day on one stuck exec_core under 100 workers). An HSET of
  -- a fixed field is idempotent w.r.t. size.
  --
  -- The scheduler MUST block the execution off the eligible ZSET after
  -- this err returns; otherwise the next tick picks the same top-of-zset
  -- and we wasted this validation. See Scheduler::claim_for_worker.
  local required_set, req_err = parse_capability_csv(
    core.required_capabilities or "", "required")
  if req_err then return req_err end
  local worker_set, wrk_err = parse_capability_csv(
    A.worker_capabilities_csv, "worker")
  if wrk_err then return wrk_err end
  if next(required_set) ~= nil then
    local missing = missing_capabilities(required_set, worker_set)
    if missing ~= "" then
      redis.call("HSET", K.core_key,
        "last_capability_mismatch_at", tostring(now_ms))
      return err("capability_mismatch", missing)
    end
  end

  -- 5. Write grant hash with TTL
  -- PEXPIREAT takes an absolute millisecond timestamp, matching the
  -- grant_expires_at field written into the hash.
  local grant_expires_at = now_ms + A.grant_ttl_ms
  redis.call("HSET", K.claim_grant,
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "lane_id", A.lane_id,
    "capability_hash", A.capability_hash,
    "route_snapshot_json", A.route_snapshot_json,
    "admission_summary", A.admission_summary,
    "created_at", tostring(now_ms),
    "grant_expires_at", tostring(grant_expires_at))
  redis.call("PEXPIREAT", K.claim_grant, grant_expires_at)

  -- 6. Do NOT ZREM from eligible here. ff_claim_execution does the ZREM
  -- when consuming the grant. If the grant expires unconsumed, the execution
  -- remains in the eligible set and is re-discovered by the next scheduler
  -- cycle.
  -- This prevents the "orphaned grant" stuck state where an execution
  -- is in no scheduling index after grant expiry.

  return ok(A.execution_id)
end)

---------------------------------------------------------------------------
-- #32  ff_change_priority
--
-- Update priority and re-score in eligible ZSET.
-- Only works for runnable + eligible_now executions.
--
-- KEYS (2): exec_core, eligible_zset
-- ARGV (2): execution_id, new_priority
---------------------------------------------------------------------------
redis.register_function('ff_change_priority', function(keys, args)
  local K = {
    core_key      = keys[1],
    eligible_zset = keys[2],
  }

  -- require_number returns an err tuple (a table) on malformed input;
  -- propagate it verbatim.
  local new_priority_n = require_number(args[2], "new_priority")
  if type(new_priority_n) == "table" then return new_priority_n end

  local A = {
    execution_id = args[1],
    new_priority = new_priority_n,
  }

  -- Server-side clock: TIME returns {seconds, microseconds}; fold to ms.
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- 1. Read and validate
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  if core.lifecycle_phase ~= "runnable" then
    return err("execution_not_eligible")
  end
  if core.eligibility_state ~= "eligible_now" then
    return err("execution_not_eligible")
  end

  -- Captured before the HSET below so the old value can be returned.
  local old_priority = tonumber(core.priority or "0")

  -- Clamp to safe range (same as ff_create_execution)
  if A.new_priority < 0 then A.new_priority = 0 end
  if A.new_priority > 9000 then A.new_priority = 9000 end

  -- 2. Update exec_core priority
  redis.call("HSET", K.core_key,
    "priority", tostring(A.new_priority),
    "last_mutation_at", tostring(now_ms))

  -- 3.
  -- Re-score in eligible ZSET
  -- Composite score: -(priority * 1_000_000_000_000) + created_at_ms
  -- Lower (more negative) score sorts first in the ZSET, so higher priority
  -- wins; created_at_ms breaks ties FIFO within a priority level.
  local created_at = tonumber(core.created_at or "0")
  local new_score = 0 - (A.new_priority * 1000000000000) + created_at
  redis.call("ZADD", K.eligible_zset, new_score, A.execution_id)

  return ok(tostring(old_priority), tostring(A.new_priority))
end)

---------------------------------------------------------------------------
-- #33  ff_update_progress
--
-- Update progress fields on exec_core. Validate lease (lite check:
-- lease_id + epoch only — attempt_id not required per §4 Class B).
--
-- KEYS (1): exec_core
-- ARGV (5): execution_id, lease_id, lease_epoch,
--           progress_pct, progress_message
---------------------------------------------------------------------------
redis.register_function('ff_update_progress', function(keys, args)
  local K = {
    core_key = keys[1],
  }

  local A = {
    execution_id    = args[1],
    lease_id        = args[2],
    lease_epoch     = args[3],
    progress_pct    = args[4] or "",
    progress_message = args[5] or "",
  }

  -- Server-side clock: TIME returns {seconds, microseconds}; fold to ms.
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- Read and validate
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "", core.current_lease_epoch or "")
  end
  if core.ownership_state == "lease_revoked" then
    return err("lease_revoked")
  end
  -- NOTE: expiry is checked before lease identity, so a caller holding a
  -- stale lease against an also-expired current lease sees lease_expired
  -- rather than stale_lease.
  if tonumber(core.lease_expires_at or "0") <= now_ms then
    return err("lease_expired")
  end
  if core.current_lease_id ~= A.lease_id then
    return err("stale_lease")
  end
  if core.current_lease_epoch ~= A.lease_epoch then
    return err("stale_lease")
  end

  -- Update progress
  -- fields
  -- Build the HSET field list incrementally: timestamps always, the two
  -- progress fields only when the caller supplied non-empty values.
  local fields = { "last_mutation_at", tostring(now_ms), "progress_updated_at", tostring(now_ms) }
  if is_set(A.progress_pct) then
    fields[#fields + 1] = "progress_pct"
    fields[#fields + 1] = A.progress_pct
  end
  if is_set(A.progress_message) then
    fields[#fields + 1] = "progress_message"
    fields[#fields + 1] = A.progress_message
  end
  -- unpack: global in Lua 5.1 (the Lua used by the Redis scripting engine).
  redis.call("HSET", K.core_key, unpack(fields))

  return ok()
end)

---------------------------------------------------------------------------
-- #27  ff_promote_delayed
--
-- Promote a delayed execution to eligible when its delay_until has passed.
-- Called by the delayed promoter scanner.
-- Preserves attempt_state (may be pending_retry, pending_first, or
-- attempt_interrupted from delay_execution).
--
-- KEYS (3): exec_core, delayed_zset, eligible_zset
-- ARGV (2): execution_id, now_ms
---------------------------------------------------------------------------
redis.register_function('ff_promote_delayed', function(keys, args)
  local K = {
    core_key      = keys[1],
    delayed_zset  = keys[2],
    eligible_zset = keys[3],
  }

  -- require_number returns an err tuple (a table) on malformed input;
  -- propagate it verbatim. now_ms is caller-supplied here (scanner time),
  -- unlike the TIME-based functions above.
  local now_ms_n = require_number(args[2], "now_ms")
  if type(now_ms_n) == "table" then return now_ms_n end

  local A = {
    execution_id = args[1],
    now_ms       = now_ms_n,
  }

  -- Read and validate
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    -- Execution gone — clean up stale index entry
    redis.call("ZREM", K.delayed_zset, A.execution_id)
    return ok("not_found_cleaned")
  end
  local core = hgetall_to_table(raw)

  -- Must be runnable + not_eligible_until_time
  if core.lifecycle_phase ~= "runnable" then
    redis.call("ZREM", K.delayed_zset, A.execution_id)
    return ok("not_runnable_cleaned")
  end
  if core.eligibility_state ~= "not_eligible_until_time" then
    redis.call("ZREM", K.delayed_zset, A.execution_id)
    return ok("not_delayed_cleaned")
  end

  -- Check delay_until has actually passed
  -- (Not yet due: leave the zset entry in place for a later scanner tick.)
  local delay_until = tonumber(core.delay_until or "0")
  if delay_until > A.now_ms then
    return ok("not_yet_due")
  end

  -- Promote: update 6 of 7 state vector dimensions.
  -- attempt_state is DELIBERATELY PRESERVED (not written). This is the 7th dim.
  --
  -- WHY: The caller that put this execution into the delayed set already set
  -- the attempt_state to reflect what should happen on next claim:
  --   * pending_retry_attempt  — from ff_fail_execution (retry backoff expired)
  --   * pending_replay_attempt — from ff_replay_execution (replay delay expired)
  --   * attempt_interrupted    — from ff_delay_execution (worker self-delay)
  --   * pending_first_attempt  — from ff_create_execution (initial delay_until)
  -- Overwriting it here would lose this routing information and break
  -- claim_execution's attempt_type derivation (initial vs retry vs replay)
  -- and the claim dispatch routing (claim_execution vs claim_resumed_execution).
  redis.call("HSET", K.core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", "eligible_now",
    "blocking_reason", "waiting_for_worker",
    "blocking_detail", "",
    "terminal_outcome", "none",
    -- attempt_state: NOT WRITTEN — see comment above
    "public_state", "waiting",
    "delay_until", "",
    "last_transition_at", tostring(A.now_ms),
    "last_mutation_at", tostring(A.now_ms))

  -- ZREM from delayed, ZADD to eligible with composite priority score
  -- (same -(priority * 1e12) + created_at_ms scoring as ff_change_priority).
  redis.call("ZREM", K.delayed_zset, A.execution_id)
  local priority = tonumber(core.priority or "0")
  local created_at = tonumber(core.created_at or "0")
  local score = 0 - (priority * 1000000000000) + created_at
  redis.call("ZADD", K.eligible_zset, score, A.execution_id)

  return
ok(\"promoted\")\nend)\n\n---------------------------------------------------------------------------\n-- #26  ff_issue_reclaim_grant\n--\n-- TODO(batch-c): This function has NO production Rust caller as of Batch B.\n-- The reclaim scanner that would invoke it (to recover leases from crashed\n-- workers) is scheduled for cairn Batch C. A worker dying mid-execution today\n-- leaves its execution stuck in `lease_expired_reclaimable` until operator\n-- intervention. Test-only callers exist in crates/ff-test/tests to exercise\n-- the Lua side. When the scheduler reclaim integration lands, the caller\n-- must apply the same block-on-capability-mismatch pattern used by\n-- `ff-scheduler::Scheduler::claim_for_worker` (see the IMPORTANT note\n-- below) \u{2014} otherwise an unmatchable reclaim recycles every scanner tick.\n--\n-- Scheduler issues a reclaim grant for an expired/revoked execution.\n-- Similar to ff_issue_claim_grant but validates reclaimable state.\n--\n-- KEYS (3): exec_core, claim_grant_key, lease_expiry_zset\n-- ARGV (9): execution_id, worker_id, worker_instance_id,\n--           lane_id, capability_hash, grant_ttl_ms,\n--           route_snapshot_json, admission_summary,\n--           worker_capabilities_csv\n--\n-- Capability matching identical to ff_issue_claim_grant: reclaiming a lease\n-- must respect the execution\'s required_capabilities just like an initial\n-- claim, so a re-issuance to a non-matching worker is blocked here too.\n--\n-- IMPORTANT: on capability_mismatch this function does NOT remove the exec\n-- from the lease_expiry pool. The reclaim SCANNER (to be added in Rust) MUST\n-- detect capability_mismatch and move the execution into blocked_route with\n-- reason `waiting_for_capable_worker` (mirroring the claim-grant path). If\n-- the scanner instead re-attempts the same execution every tick, a reclaim\n-- hot-loop develops that is analogous to the claim-path hot-loop and\n-- identical in cost (wasted FCALLs + log volume). 
Lease_expiry as an index\n-- has no natural sweeping mechanism for post-mismatch promotion \u{2014} the\n-- scheduler-side block + periodic sweep owns the lifecycle.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_issue_reclaim_grant\', function(keys, args)\n  local K = {\n    core_key       = keys[1],\n    claim_grant    = keys[2],\n    lease_expiry   = keys[3],\n  }\n\n  local A = {\n    execution_id            = args[1],\n    worker_id               = args[2],\n    worker_instance_id      = args[3],\n    lane_id                 = args[4],\n    capability_hash         = args[5] or \"\",\n    route_snapshot_json     = args[7] or \"\",\n    admission_summary       = args[8] or \"\",\n    worker_capabilities_csv = args[9] or \"\",\n  }\n\n  local grant_ttl_n = require_number(args[6], \"grant_ttl_ms\")\n  if type(grant_ttl_n) == \"table\" then return grant_ttl_n end\n  A.grant_ttl_ms = grant_ttl_n\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- Validate execution exists and is reclaimable\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  if core.lifecycle_phase ~= \"active\" then\n    return err(\"execution_not_reclaimable\")\n  end\n  if core.ownership_state ~= \"lease_expired_reclaimable\"\n    and core.ownership_state ~= \"lease_revoked\" then\n    return err(\"execution_not_reclaimable\")\n  end\n\n  -- Check no existing grant\n  if redis.call(\"EXISTS\", K.claim_grant) == 1 then\n    return err(\"grant_already_exists\")\n  end\n\n  -- Capability matching \u{2014} same policy as issue_claim_grant: stamp\n  -- last_capability_mismatch_at (single scalar) on miss so ops can surface\n  -- stuck reclaims via SCAN. 
Scheduler MUST also block-out the exec from\n  -- the lease_expiry reclaim pool; otherwise the reclaim scanner hits the\n  -- same mismatch every cycle. See Scheduler::reclaim_for_worker.\n  local required_set, req_err = parse_capability_csv(\n    core.required_capabilities or \"\", \"required\")\n  if req_err then return req_err end\n  local worker_set, wrk_err = parse_capability_csv(\n    A.worker_capabilities_csv, \"worker\")\n  if wrk_err then return wrk_err end\n  if next(required_set) ~= nil then\n    local missing = missing_capabilities(required_set, worker_set)\n    if missing ~= \"\" then\n      redis.call(\"HSET\", K.core_key,\n        \"last_capability_mismatch_at\", tostring(now_ms))\n      return err(\"capability_mismatch\", missing)\n    end\n  end\n\n  -- Write grant hash with TTL\n  local grant_expires_at = now_ms + A.grant_ttl_ms\n  redis.call(\"HSET\", K.claim_grant,\n    \"worker_id\", A.worker_id,\n    \"worker_instance_id\", A.worker_instance_id,\n    \"lane_id\", A.lane_id,\n    \"capability_hash\", A.capability_hash,\n    \"route_snapshot_json\", A.route_snapshot_json,\n    \"admission_summary\", A.admission_summary,\n    \"created_at\", tostring(now_ms),\n    \"grant_expires_at\", tostring(grant_expires_at))\n  redis.call(\"PEXPIREAT\", K.claim_grant, grant_expires_at)\n\n  -- Do NOT ZREM from lease_expiry \u{2014} stays for scheduler discovery\n\n  return ok(A.execution_id)\nend)\n\n\n-- source: lua/suspension.lua\n-- FlowFabric suspension and waitpoint functions\n-- Reference: RFC-004 (Suspension), RFC-005 (Signal), RFC-010 \u{a7}4\n--\n-- Depends on helpers: ok, err, ok_already_satisfied, hgetall_to_table,\n--   is_set, validate_lease_and_mark_expired, clear_lease_and_indexes,\n--   map_reason_to_blocking, initialize_condition, write_condition_hash,\n--   evaluate_signal_against_condition, is_condition_satisfied,\n--   extract_field, initial_signal_summary_json, validate_pending_waitpoint,\n--   assert_active_suspension, 
assert_waitpoint_belongs\n\n---------------------------------------------------------------------------\n-- #13  ff_suspend_execution\n--\n-- Validate lease, release ownership, create suspension + waitpoint\n-- (or activate pending), init condition, transition active \u{2192} suspended.\n-- Mints the waitpoint HMAC token (RFC-004 \u{a7}Waitpoint Security) returned\n-- alongside the waitpoint_id for external signal delivery.\n--\n-- KEYS (17): exec_core, attempt_record, lease_current, lease_history,\n--            lease_expiry_zset, worker_leases, suspension_current,\n--            waitpoint_hash, waitpoint_signals, suspension_timeout_zset,\n--            pending_wp_expiry_zset, active_index, suspended_zset,\n--            waitpoint_history, wp_condition, attempt_timeout_zset,\n--            hmac_secrets\n-- ARGV (17): execution_id, attempt_index, attempt_id, lease_id,\n--            lease_epoch, suspension_id, waitpoint_id, waitpoint_key,\n--            reason_code, requested_by, timeout_at, resume_condition_json,\n--            resume_policy_json, continuation_metadata_pointer,\n--            use_pending_waitpoint, timeout_behavior, lease_history_maxlen\n---------------------------------------------------------------------------\nredis.register_function(\'ff_suspend_execution\', function(keys, args)\n  local K = {\n    core_key              = keys[1],\n    attempt_record        = keys[2],\n    lease_current_key     = keys[3],\n    lease_history_key     = keys[4],\n    lease_expiry_key      = keys[5],\n    worker_leases_key     = keys[6],\n    suspension_current    = keys[7],\n    waitpoint_hash        = keys[8],\n    waitpoint_signals     = keys[9],\n    suspension_timeout_key = keys[10],\n    pending_wp_expiry_key = keys[11],\n    active_index_key      = keys[12],\n    suspended_zset        = keys[13],\n    waitpoint_history     = keys[14],\n    wp_condition          = keys[15],\n    attempt_timeout_key   = keys[16],\n    hmac_secrets          = keys[17],\n  
}\n\n  local A = {\n    execution_id              = args[1],\n    attempt_index             = args[2],\n    attempt_id                = args[3],\n    lease_id                  = args[4],\n    lease_epoch               = args[5],\n    suspension_id             = args[6],\n    waitpoint_id              = args[7],\n    waitpoint_key             = args[8],\n    reason_code               = args[9],\n    requested_by              = args[10],\n    timeout_at                = args[11] or \"\",\n    resume_condition_json     = args[12],\n    resume_policy_json        = args[13],\n    continuation_metadata_ptr = args[14] or \"\",\n    use_pending_waitpoint     = args[15] or \"\",\n    timeout_behavior          = args[16] or \"fail\",\n    lease_history_maxlen      = tonumber(args[17] or \"1000\"),\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Read execution core\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- 2. Validate lease (full check incl. expiry + revocation + identity)\n  local lease_err = validate_lease_and_mark_expired(\n    core, A, now_ms, K, A.lease_history_maxlen)\n  if lease_err then return lease_err end\n\n  -- 3. Validate attempt binding\n  if tostring(core.current_attempt_index) ~= A.attempt_index then\n    return err(\"invalid_lease_for_suspend\")\n  end\n\n  -- 4. Check for existing suspension: reject if open, archive if closed\n  if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n    local closed = redis.call(\"HGET\", K.suspension_current, \"closed_at\")\n    if not is_set(closed) then\n      return err(\"already_suspended\")\n    end\n    -- Previous suspension is closed. 
Archive old waitpoint_id for cleanup,\n    -- then DEL the stale record before creating a new one.\n    local old_wp = redis.call(\"HGET\", K.suspension_current, \"waitpoint_id\")\n    if is_set(old_wp) then\n      redis.call(\"SADD\", K.waitpoint_history, old_wp)\n    end\n    redis.call(\"DEL\", K.suspension_current)\n  end\n\n  -- 5. Create or activate waitpoint\n  local waitpoint_id = A.waitpoint_id\n  local waitpoint_key = A.waitpoint_key\n  local waitpoint_token = \"\"\n\n  if A.use_pending_waitpoint == \"1\" then\n    -- Activate existing pending waitpoint\n    local wp_raw = redis.call(\"HGETALL\", K.waitpoint_hash)\n    local wp_err = validate_pending_waitpoint(wp_raw, A.execution_id, A.attempt_index, now_ms)\n    if wp_err then return wp_err end\n\n    -- Read waitpoint_id, waitpoint_key, and existing token from pending record.\n    -- Token was minted at ff_create_pending_waitpoint time with that record\'s\n    -- created_at; we MUST keep using it so signals buffered before activation\n    -- validate against the same binding.\n    local wp = hgetall_to_table(wp_raw)\n    waitpoint_id = wp.waitpoint_id\n    waitpoint_key = wp.waitpoint_key\n    -- A pending waitpoint without a minted token is either a pre-HMAC-upgrade\n    -- record or a corrupted write. Activating with an empty token would return\n    -- \"\" to the SDK, and every subsequent signal delivery would reject with\n    -- missing_token \u{2014} fail-closed at the security boundary but silent about\n    -- the real degraded state. 
Surface the degradation AT the activation\n    -- point so operators see it immediately.\n    if not is_set(wp.waitpoint_token) then\n      return err(\"waitpoint_not_token_bound\")\n    end\n    waitpoint_token = wp.waitpoint_token\n\n    -- Activate the pending waitpoint\n    redis.call(\"HSET\", K.waitpoint_hash,\n      \"suspension_id\", A.suspension_id,\n      \"state\", \"active\",\n      \"activated_at\", tostring(now_ms),\n      \"expires_at\", is_set(A.timeout_at) and A.timeout_at or \"\")\n    redis.call(\"ZREM\", K.pending_wp_expiry_key, waitpoint_id)\n\n    -- CRITICAL: Evaluate buffered signals that arrived while waitpoint was pending.\n    -- If early signals already satisfy the resume condition, skip suspension entirely.\n    local buffered = redis.call(\"XRANGE\", K.waitpoint_signals, \"-\", \"+\")\n    if #buffered > 0 then\n      local wp_cond = initialize_condition(A.resume_condition_json)\n      for _, entry in ipairs(buffered) do\n        local fields = entry[2]\n        local sig_name = extract_field(fields, \"signal_name\")\n        local sig_id = extract_field(fields, \"signal_id\")\n        evaluate_signal_against_condition(wp_cond, sig_name, sig_id)\n      end\n      if is_condition_satisfied(wp_cond) then\n        -- Resume condition already met by buffered signals \u{2014} skip suspension.\n        redis.call(\"HSET\", K.waitpoint_hash,\n          \"state\", \"closed\", \"satisfied_at\", tostring(now_ms),\n          \"closed_at\", tostring(now_ms), \"close_reason\", \"resumed\")\n        write_condition_hash(K.wp_condition, wp_cond, now_ms)\n        -- Do NOT release lease, do NOT change execution state.\n        return ok_already_satisfied(A.suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)\n      end\n      -- Condition not yet satisfied \u{2014} proceed with suspension.\n      -- Write partial condition state (some matchers may be satisfied).\n      write_condition_hash(K.wp_condition, wp_cond, now_ms)\n    else\n      -- 
No buffered signals \u{2014} init condition from scratch\n      local wp_cond = initialize_condition(A.resume_condition_json)\n      write_condition_hash(K.wp_condition, wp_cond, now_ms)\n    end\n  else\n    -- Create new waitpoint \u{2014} mint HMAC token bound to (waitpoint_id,\n    -- waitpoint_key, created_at). The created_at written here is what the\n    -- signal-delivery path reads back for HMAC validation.\n    local token, token_err = mint_waitpoint_token(\n      K.hmac_secrets, waitpoint_id, waitpoint_key, now_ms)\n    if not token then return err(token_err) end\n    waitpoint_token = token\n\n    redis.call(\"HSET\", K.waitpoint_hash,\n      \"waitpoint_id\", waitpoint_id,\n      \"execution_id\", A.execution_id,\n      \"attempt_index\", A.attempt_index,\n      \"suspension_id\", A.suspension_id,\n      \"waitpoint_key\", waitpoint_key,\n      \"waitpoint_token\", waitpoint_token,\n      \"state\", \"active\",\n      \"created_at\", tostring(now_ms),\n      \"activated_at\", tostring(now_ms),\n      \"expires_at\", is_set(A.timeout_at) and A.timeout_at or \"\",\n      \"signal_count\", \"0\",\n      \"matched_signal_count\", \"0\",\n      \"last_signal_at\", \"\")\n\n    -- Initialize condition hash from resume condition spec\n    local wp_cond = initialize_condition(A.resume_condition_json)\n    write_condition_hash(K.wp_condition, wp_cond, now_ms)\n  end\n\n  -- 6. Record waitpoint_id in mandatory history set (required for cleanup cascade)\n  redis.call(\"SADD\", K.waitpoint_history, waitpoint_id)\n\n  -- OOM-SAFE WRITE ORDERING (per RFC-010 \u{a7}4.8b):\n  -- exec_core HSET is the \"point of no return\" \u{2014} write it FIRST.\n\n  -- 7. 
Transition exec_core (FIRST \u{2014} point of no return, all 7 dims)\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"suspended\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", \"not_applicable\",\n    \"blocking_reason\", map_reason_to_blocking(A.reason_code),\n    \"blocking_detail\", \"suspended: waitpoint \" .. waitpoint_id .. \" awaiting \" .. A.reason_code,\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", \"attempt_interrupted\",\n    \"public_state\", \"suspended\",\n    \"current_lease_id\", \"\",\n    \"current_worker_id\", \"\",\n    \"current_worker_instance_id\", \"\",\n    \"lease_expires_at\", \"\",\n    \"lease_last_renewed_at\", \"\",\n    \"lease_renewal_deadline\", \"\",\n    \"current_suspension_id\", A.suspension_id,\n    \"current_waitpoint_id\", waitpoint_id,\n    \"last_transition_at\", tostring(now_ms),\n    \"last_mutation_at\", tostring(now_ms))\n\n  -- 8. Pause the attempt: started -> suspended (RFC-002 suspend_attempt)\n  redis.call(\"HSET\", K.attempt_record,\n    \"attempt_state\", \"suspended\",\n    \"suspended_at\", tostring(now_ms),\n    \"suspension_id\", A.suspension_id)\n\n  -- 9. Release lease + update indexes\n  redis.call(\"DEL\", K.lease_current_key)\n  redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n  redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n  redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n  redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n  redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", A.lease_history_maxlen, \"*\",\n    \"event\", \"released\",\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", A.lease_epoch,\n    \"attempt_index\", A.attempt_index,\n    \"attempt_id\", core.current_attempt_id or \"\",\n    \"reason\", \"suspend\",\n    \"ts\", tostring(now_ms))\n\n  -- 10. 
Create suspension record\n  redis.call(\"HSET\", K.suspension_current,\n    \"suspension_id\", A.suspension_id,\n    \"execution_id\", A.execution_id,\n    \"attempt_index\", A.attempt_index,\n    \"waitpoint_id\", waitpoint_id,\n    \"waitpoint_key\", waitpoint_key,\n    \"reason_code\", A.reason_code,\n    \"requested_by\", A.requested_by,\n    \"created_at\", tostring(now_ms),\n    \"timeout_at\", A.timeout_at,\n    \"timeout_behavior\", A.timeout_behavior,\n    \"resume_condition_json\", A.resume_condition_json,\n    \"resume_policy_json\", A.resume_policy_json,\n    \"continuation_metadata_pointer\", A.continuation_metadata_ptr,\n    \"buffered_signal_summary_json\", initial_signal_summary_json(),\n    \"last_signal_at\", \"\",\n    \"satisfied_at\", \"\",\n    \"closed_at\", \"\",\n    \"close_reason\", \"\")\n\n  -- 11. Add to per-lane suspended index + suspension timeout\n  -- Score: timeout_at if set, otherwise MAX for \"no timeout\" ordering\n  redis.call(\"ZADD\", K.suspended_zset,\n    is_set(A.timeout_at) and tonumber(A.timeout_at) or 9999999999999,\n    A.execution_id)\n\n  if is_set(A.timeout_at) then\n    redis.call(\"ZADD\", K.suspension_timeout_key, tonumber(A.timeout_at), A.execution_id)\n  end\n\n  return ok(A.suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)\nend)\n\n---------------------------------------------------------------------------\n-- #14  ff_resume_execution\n--\n-- Transition suspended \u{2192} runnable. Called after signal satisfies condition\n-- or by operator override. 
Closes suspension + waitpoint, updates indexes.\n--\n-- KEYS (8): exec_core, suspension_current, waitpoint_hash,\n--           waitpoint_signals, suspension_timeout_zset,\n--           eligible_zset, delayed_zset, suspended_zset\n-- ARGV (3): execution_id, trigger_type, resume_delay_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_resume_execution\', function(keys, args)\n  local K = {\n    core_key               = keys[1],\n    suspension_current     = keys[2],\n    waitpoint_hash         = keys[3],\n    waitpoint_signals      = keys[4],\n    suspension_timeout_key = keys[5],\n    eligible_zset          = keys[6],\n    delayed_zset           = keys[7],\n    suspended_zset         = keys[8],\n  }\n\n  local A = {\n    execution_id   = args[1],\n    trigger_type   = args[2] or \"signal\",     -- \"signal\", \"operator\", \"auto_resume\"\n    resume_delay_ms = tonumber(args[3] or \"0\"),\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Read + validate execution is suspended\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  if core.lifecycle_phase ~= \"suspended\" then\n    return err(\"execution_not_suspended\")\n  end\n\n  -- 2. Validate active suspension\n  local susp_raw = redis.call(\"HGETALL\", K.suspension_current)\n  local susp_err, susp = assert_active_suspension(susp_raw)\n  if susp_err then return susp_err end\n\n  -- 3. Compute eligibility based on resume_delay_ms\n  local eligibility_state = \"eligible_now\"\n  local blocking_reason = \"waiting_for_worker\"\n  local blocking_detail = \"\"\n  local public_state = \"waiting\"\n\n  if A.resume_delay_ms > 0 then\n    eligibility_state = \"not_eligible_until_time\"\n    blocking_reason = \"waiting_for_resume_delay\"\n    blocking_detail = \"resume delay \" .. 
tostring(A.resume_delay_ms) .. \"ms\"\n    public_state = \"delayed\"\n  end\n\n  -- OOM-SAFE WRITE ORDERING: exec_core FIRST (point of no return)\n\n  -- 4. Transition exec_core (FIRST \u{2014} all 7 dims)\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"runnable\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", eligibility_state,\n    \"blocking_reason\", blocking_reason,\n    \"blocking_detail\", blocking_detail,\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", \"attempt_interrupted\",\n    \"public_state\", public_state,\n    \"current_suspension_id\", \"\",\n    \"current_waitpoint_id\", \"\",\n    \"last_transition_at\", tostring(now_ms),\n    \"last_mutation_at\", tostring(now_ms))\n\n  -- 5. Update scheduling indexes\n  redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n  if A.resume_delay_ms > 0 then\n    redis.call(\"ZADD\", K.delayed_zset,\n      now_ms + A.resume_delay_ms, A.execution_id)\n  else\n    -- ZADD eligible with composite priority score\n    local priority = tonumber(core.priority or \"0\")\n    local created_at = tonumber(core.created_at or \"0\")\n    local score = 0 - (priority * 1000000000000) + created_at\n    redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n  end\n\n  -- 6. 
Close sub-objects (safe to lose on OOM \u{2014} stale but not zombie)\n  -- Close waitpoint\n  redis.call(\"HSET\", K.waitpoint_hash,\n    \"state\", \"closed\",\n    \"satisfied_at\", tostring(now_ms),\n    \"closed_at\", tostring(now_ms),\n    \"close_reason\", \"resumed\")\n\n  -- Close suspension\n  redis.call(\"HSET\", K.suspension_current,\n    \"satisfied_at\", tostring(now_ms),\n    \"closed_at\", tostring(now_ms),\n    \"close_reason\", \"resumed\")\n\n  -- Remove from suspension timeout index\n  redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n\n  return ok(public_state)\nend)\n\n---------------------------------------------------------------------------\n-- #15  ff_create_pending_waitpoint\n--\n-- Pre-create a waitpoint before suspension commits. The waitpoint is\n-- externally addressable by waitpoint_key so early signals can be buffered.\n-- Requires the caller to still hold the active lease.\n-- Mints the waitpoint HMAC token up front so early signals targeting the\n-- pending waitpoint can be authenticated via ff_buffer_signal_for_pending_waitpoint.\n--\n-- KEYS (4): exec_core, waitpoint_hash, pending_wp_expiry_zset, hmac_secrets\n-- ARGV (5): execution_id, attempt_index, waitpoint_id, waitpoint_key,\n--           expires_at\n---------------------------------------------------------------------------\nredis.register_function(\'ff_create_pending_waitpoint\', function(keys, args)\n  local K = {\n    core_key              = keys[1],\n    waitpoint_hash        = keys[2],\n    pending_wp_expiry_key = keys[3],\n    hmac_secrets          = keys[4],\n  }\n\n  local A = {\n    execution_id  = args[1],\n    attempt_index = args[2],\n    waitpoint_id  = args[3],\n    waitpoint_key = args[4],\n    expires_at    = args[5],\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. 
Validate execution is active with a lease\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  if core.lifecycle_phase ~= \"active\" then\n    return err(\"execution_not_active\",\n      core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n  end\n  if core.ownership_state ~= \"leased\" then\n    return err(\"no_active_lease\")\n  end\n  -- Validate attempt binding\n  if tostring(core.current_attempt_index) ~= A.attempt_index then\n    return err(\"stale_lease\")\n  end\n\n  -- 2. Guard: waitpoint already exists\n  if redis.call(\"EXISTS\", K.waitpoint_hash) == 1 then\n    local existing_state = redis.call(\"HGET\", K.waitpoint_hash, \"state\")\n    if existing_state == \"pending\" or existing_state == \"active\" then\n      return err(\"waitpoint_already_exists\")\n    end\n    -- Old closed/expired waitpoint \u{2014} safe to overwrite\n  end\n\n  -- 3. Mint HMAC token bound to (waitpoint_id, waitpoint_key, now_ms).\n  -- The suspension activation path will reuse this token unchanged.\n  local waitpoint_token, token_err = mint_waitpoint_token(\n    K.hmac_secrets, A.waitpoint_id, A.waitpoint_key, now_ms)\n  if not waitpoint_token then return err(token_err) end\n\n  -- 4. Create pending waitpoint\n  redis.call(\"HSET\", K.waitpoint_hash,\n    \"waitpoint_id\", A.waitpoint_id,\n    \"execution_id\", A.execution_id,\n    \"attempt_index\", A.attempt_index,\n    \"suspension_id\", \"\",\n    \"waitpoint_key\", A.waitpoint_key,\n    \"waitpoint_token\", waitpoint_token,\n    \"state\", \"pending\",\n    \"created_at\", tostring(now_ms),\n    \"activated_at\", \"\",\n    \"satisfied_at\", \"\",\n    \"closed_at\", \"\",\n    \"expires_at\", A.expires_at,\n    \"close_reason\", \"\",\n    \"signal_count\", \"0\",\n    \"matched_signal_count\", \"0\",\n    \"last_signal_at\", \"\")\n\n  -- 5. 
Add to pending waitpoint expiry index\n  redis.call(\"ZADD\", K.pending_wp_expiry_key,\n    tonumber(A.expires_at), A.waitpoint_id)\n\n  return ok(A.waitpoint_id, A.waitpoint_key, waitpoint_token)\nend)\n\n---------------------------------------------------------------------------\n-- #16/#19  ff_expire_suspension  (Overlap group D \u{2014} one script)\n--\n-- Apply timeout behavior when suspension timeout fires.\n-- Re-validates that execution is still suspended and timeout is due.\n-- Handles all 5 timeout behaviors:\n--   fail    \u{2192} terminal(failed)\n--   cancel  \u{2192} terminal(cancelled)\n--   expire  \u{2192} terminal(expired)\n--   auto_resume \u{2192} close + resume to runnable\n--   escalate \u{2192} mutate suspension to operator-review\n--\n-- KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,\n--            attempt_hash, stream_meta, suspension_timeout_zset,\n--            suspended_zset, terminal_zset, eligible_zset, delayed_zset,\n--            lease_history\n-- ARGV (1): execution_id\n---------------------------------------------------------------------------\nredis.register_function(\'ff_expire_suspension\', function(keys, args)\n  local K = {\n    core_key               = keys[1],\n    suspension_current     = keys[2],\n    waitpoint_hash         = keys[3],\n    wp_condition           = keys[4],\n    attempt_hash           = keys[5],\n    stream_meta            = keys[6],\n    suspension_timeout_key = keys[7],\n    suspended_zset         = keys[8],\n    terminal_key           = keys[9],\n    eligible_zset          = keys[10],\n    delayed_zset           = keys[11],\n    lease_history_key      = keys[12],\n  }\n\n  local A = {\n    execution_id = args[1],\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. 
Read + validate execution is still suspended\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then\n    redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n    return ok(\"not_found_cleaned\")\n  end\n  local core = hgetall_to_table(raw)\n\n  if core.lifecycle_phase ~= \"suspended\" then\n    redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n    return ok(\"not_suspended_cleaned\")\n  end\n\n  -- 2. Read suspension and validate it\'s still open\n  local susp_raw = redis.call(\"HGETALL\", K.suspension_current)\n  local susp_err, susp = assert_active_suspension(susp_raw)\n  if susp_err then\n    redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n    return ok(\"no_active_suspension_cleaned\")\n  end\n\n  -- 3. Check timeout is actually due\n  local timeout_at = tonumber(susp.timeout_at or \"0\")\n  if timeout_at == 0 or timeout_at > now_ms then\n    return ok(\"not_yet_due\")\n  end\n\n  -- 4. Read timeout behavior\n  local behavior = susp.timeout_behavior or \"fail\"\n\n  -- 5. 
Apply behavior\n  if behavior == \"auto_resume\" or behavior == \"auto_resume_with_timeout_signal\" then\n    -- auto_resume: close suspension + resume to runnable (like ff_resume_execution)\n\n    -- OOM-SAFE: exec_core FIRST\n    local priority = tonumber(core.priority or \"0\")\n    local created_at = tonumber(core.created_at or \"0\")\n    local score = 0 - (priority * 1000000000000) + created_at\n\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"runnable\",\n      \"ownership_state\", \"unowned\",\n      \"eligibility_state\", \"eligible_now\",\n      \"blocking_reason\", \"waiting_for_worker\",\n      \"blocking_detail\", \"\",\n      \"terminal_outcome\", \"none\",\n      \"attempt_state\", \"attempt_interrupted\",\n      \"public_state\", \"waiting\",\n      \"current_suspension_id\", \"\",\n      \"current_waitpoint_id\", \"\",\n      \"last_transition_at\", tostring(now_ms),\n      \"last_mutation_at\", tostring(now_ms))\n\n    -- Update indexes\n    redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n    redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n    -- Close sub-objects\n    redis.call(\"HSET\", K.waitpoint_hash,\n      \"state\", \"closed\",\n      \"closed_at\", tostring(now_ms),\n      \"close_reason\", \"timed_out_auto_resume\")\n    redis.call(\"HSET\", K.wp_condition,\n      \"closed\", \"1\",\n      \"closed_at\", tostring(now_ms),\n      \"closed_reason\", \"timed_out_auto_resume\")\n    redis.call(\"HSET\", K.suspension_current,\n      \"closed_at\", tostring(now_ms),\n      \"close_reason\", \"timed_out_auto_resume\")\n\n    redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n\n    return ok(\"auto_resume\", \"waiting\")\n\n  elseif behavior == \"escalate\" then\n    -- escalate: mutate suspension to operator-review, keep suspended (ALL 7 dims)\n\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"suspended\",                        -- preserve\n      \"ownership_state\", 
\"unowned\",                          -- preserve\n      \"eligibility_state\", \"not_applicable\",                 -- preserve\n      \"blocking_reason\", \"paused_by_operator\",\n      \"blocking_detail\", \"suspension escalated: timeout at \" .. tostring(timeout_at),\n      \"terminal_outcome\", \"none\",                            -- preserve\n      \"attempt_state\", core.attempt_state or \"attempt_interrupted\", -- preserve\n      \"public_state\", \"suspended\",                           -- preserve\n      \"last_mutation_at\", tostring(now_ms))\n\n    redis.call(\"HSET\", K.suspension_current,\n      \"reason_code\", \"waiting_for_operator_review\",\n      \"timeout_at\", \"\",\n      \"timeout_behavior\", \"\")\n\n    -- Remove from timeout index (no longer has a timeout)\n    redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n\n    return ok(\"escalate\", \"suspended\")\n\n  else\n    -- Terminal paths: fail, cancel, expire\n\n    local terminal_outcome\n    local public_state_val\n    local close_reason\n\n    if behavior == \"cancel\" then\n      terminal_outcome = \"cancelled\"\n      public_state_val = \"cancelled\"\n      close_reason = \"timed_out_cancel\"\n    elseif behavior == \"expire\" then\n      terminal_outcome = \"expired\"\n      public_state_val = \"expired\"\n      close_reason = \"timed_out_expire\"\n    else\n      -- Default: fail\n      terminal_outcome = \"failed\"\n      public_state_val = \"failed\"\n      close_reason = \"timed_out_fail\"\n    end\n\n    -- OOM-SAFE: exec_core FIRST (\u{a7}4.8b Rule 2)\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"terminal\",\n      \"ownership_state\", \"unowned\",\n      \"eligibility_state\", \"not_applicable\",\n      \"blocking_reason\", \"none\",\n      \"blocking_detail\", \"\",\n      \"terminal_outcome\", terminal_outcome,\n      \"attempt_state\", \"attempt_terminal\",\n      \"public_state\", public_state_val,\n      \"failure_reason\", 
\"suspension_timeout:\" .. behavior,\n      \"completed_at\", tostring(now_ms),\n      \"current_suspension_id\", \"\",\n      \"current_waitpoint_id\", \"\",\n      \"last_transition_at\", tostring(now_ms),\n      \"last_mutation_at\", tostring(now_ms))\n\n    -- End attempt: suspended \u{2192} ended_failure/ended_cancelled\n    if is_set(core.current_attempt_index) then\n      local att_end_state = \"ended_failure\"\n      if behavior == \"cancel\" then\n        att_end_state = \"ended_cancelled\"\n      end\n      redis.call(\"HSET\", K.attempt_hash,\n        \"attempt_state\", att_end_state,\n        \"ended_at\", tostring(now_ms),\n        \"failure_reason\", \"suspension_timeout:\" .. behavior,\n        \"suspended_at\", \"\",\n        \"suspension_id\", \"\")\n    end\n\n    -- Close stream if exists\n    if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n      redis.call(\"HSET\", K.stream_meta,\n        \"closed_at\", tostring(now_ms),\n        \"closed_reason\", \"suspension_timeout\")\n    end\n\n    -- Close sub-objects\n    redis.call(\"HSET\", K.waitpoint_hash,\n      \"state\", \"closed\",\n      \"closed_at\", tostring(now_ms),\n      \"close_reason\", close_reason)\n    redis.call(\"HSET\", K.wp_condition,\n      \"closed\", \"1\",\n      \"closed_at\", tostring(now_ms),\n      \"closed_reason\", close_reason)\n    redis.call(\"HSET\", K.suspension_current,\n      \"closed_at\", tostring(now_ms),\n      \"close_reason\", close_reason)\n\n    -- Remove from suspension indexes, add to terminal\n    redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n    redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n    redis.call(\"ZADD\", K.terminal_key, now_ms, A.execution_id)\n\n    return ok(behavior, public_state_val)\n  end\nend)\n\n---------------------------------------------------------------------------\n-- #36  ff_close_waitpoint\n--\n-- Proactive close of pending or active waitpoint. 
Used by workers that\n-- created a pending waitpoint but decided not to suspend.\n--\n-- KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset\n-- ARGV (2): waitpoint_id, reason\n---------------------------------------------------------------------------\nredis.register_function(\'ff_close_waitpoint\', function(keys, args)\n  local K = {\n    core_key              = keys[1],\n    waitpoint_hash        = keys[2],\n    pending_wp_expiry_key = keys[3],\n  }\n\n  local A = {\n    waitpoint_id = args[1],\n    reason       = args[2] or \"proactive_close\",\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Read waitpoint\n  local wp_raw = redis.call(\"HGETALL\", K.waitpoint_hash)\n  if #wp_raw == 0 then\n    return err(\"waitpoint_not_found\")\n  end\n  local wp = hgetall_to_table(wp_raw)\n\n  -- 2. Validate state is pending or active\n  if wp.state ~= \"pending\" and wp.state ~= \"active\" then\n    if wp.state == \"closed\" or wp.state == \"expired\" then\n      return ok(\"already_closed\")\n    end\n    return err(\"waitpoint_not_open\")\n  end\n\n  -- 3. Close the waitpoint\n  redis.call(\"HSET\", K.waitpoint_hash,\n    \"state\", \"closed\",\n    \"closed_at\", tostring(now_ms),\n    \"close_reason\", A.reason)\n\n  -- 4. Remove from pending expiry index (no-op if not pending)\n  redis.call(\"ZREM\", K.pending_wp_expiry_key, A.waitpoint_id)\n\n  return ok()\nend)\n\n---------------------------------------------------------------------------\n-- ff_rotate_waitpoint_hmac_secret\n--\n-- Install a new waitpoint HMAC signing kid on a single partition. The\n-- server-side admin endpoint (ff-server POST /v1/admin/rotate-waitpoint-secret)\n-- delegates to this FCALL per partition. 
Direct-Valkey consumers (e.g.\n-- cairn-rs, issue #49) invoke it themselves across every partition.\n--\n-- FCALL atomicity is per-shard and per-call; previous implementations used\n-- a SETNX lock + read-modify-write from Rust. Here the script IS the\n-- atomicity boundary, so no lock is needed.\n--\n-- KEYS (1): hmac_secrets  (ff:sec:{p:N}:waitpoint_hmac)\n-- ARGV (3): new_kid, new_secret_hex, grace_ms\n--\n-- `now_ms` is derived server-side from `redis.call(\"TIME\")` to match\n-- the rest of the library (lua/flow.lua, validate_waitpoint_token), so\n-- GC and validation agree on \"now\". `grace_ms` is a duration, not a\n-- clock value, so taking it from ARGV is safe \u{2014} operators set it via\n-- FF_WAITPOINT_HMAC_GRACE_MS (ff-server) or pass their own value.\n--\n-- Outcomes:\n--   ok(\"rotated\", previous_kid_or_empty, new_kid, gc_count)\n--   ok(\"noop\",    kid)                           -- exact replay (same kid + secret)\n--   err(\"rotation_conflict\", kid)                -- same kid, different secret\n--   err(\"invalid_kid\")                           -- empty or contains \':\'\n--   err(\"invalid_secret_hex\")                    -- empty / odd length / non-hex\n--   err(\"invalid_grace_ms\")                      -- not a non-negative integer\n--\n-- Authoritative implementation of waitpoint HMAC rotation: idempotent\n-- replay, torn-write repair, orphan GC across expired kids. 
INVARIANT:\n-- expires_at:<new_kid> is never written (current_kid has no expiry).\n-- ff-server\'s admin endpoint delegates to this FCALL \u{2014} single source\n-- of truth lives here, not in Rust.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_rotate_waitpoint_hmac_secret\', function(keys, args)\n  local hmac_key       = keys[1]\n  local new_kid        = args[1]\n  local new_secret_hex = args[2]\n  local grace_ms_s     = args[3]\n\n  -- Combined validation (feedback: single condition is more readable).\n  if type(new_kid) ~= \"string\" or new_kid == \"\" or new_kid:find(\":\", 1, true) then\n    return err(\"invalid_kid\")\n  end\n  if type(new_secret_hex) ~= \"string\"\n      or new_secret_hex == \"\"\n      or #new_secret_hex % 2 ~= 0\n      or new_secret_hex:find(\"[^0-9a-fA-F]\") then\n    return err(\"invalid_secret_hex\")\n  end\n\n  -- grace_ms must be a finite non-negative integer. The math.floor check\n  -- rejects decimals but NOT infinities (math.floor(math.huge) == math.huge\n  -- which would stamp \"inf\" into expires_at:*). Cap at 2^53-1 so the\n  -- stored value stays within the IEEE-754 double-precision integer range\n  -- AND within i64, keeping the Rust parser happy. 
2^53-1 ms is ~285 years,\n  -- far beyond any operational grace window.\n  local grace_ms = tonumber(grace_ms_s)\n  if not grace_ms\n      or grace_ms ~= grace_ms -- NaN\n      or grace_ms < 0\n      or grace_ms > 9007199254740991 -- 2^53 - 1\n      or grace_ms ~= math.floor(grace_ms) then\n    return err(\"invalid_grace_ms\")\n  end\n\n  -- Server-side time via redis.call(\"TIME\"); never trust caller-supplied\n  -- timestamps for expiry decisions (consistency with flow.lua + helpers.lua\n  -- and with validate_waitpoint_token).\n  local now_ms          = server_time_ms()\n  local prev_expires_at = now_ms + grace_ms\n\n  local current_kid = redis.call(\"HGET\", hmac_key, \"current_kid\")\n  if current_kid == false then current_kid = nil end\n\n  -- Idempotency branch: same kid already installed.\n  if current_kid == new_kid then\n    local stored = redis.call(\"HGET\", hmac_key, \"secret:\" .. new_kid)\n    if stored == false then stored = nil end\n    if stored == new_secret_hex then\n      return ok(\"noop\", new_kid)\n    elseif stored then\n      return err(\"rotation_conflict\", new_kid)\n    else\n      -- Torn-write repair: current_kid=new_kid but secret:<new_kid> missing.\n      redis.call(\"HSET\", hmac_key, \"secret:\" .. new_kid, new_secret_hex)\n      return ok(\"noop\", new_kid)\n    end\n  end\n\n  -- Orphan GC: HGETALL once, collect kids whose expires_at has passed,\n  -- then a single HDEL. One entry per distinct kid ever installed plus\n  -- a handful of scalars; bounded in practice.\n  local raw = redis.call(\"HGETALL\", hmac_key)\n  local expired_fields = {}\n  local gc_count = 0\n  for i = 1, #raw, 2 do\n    local field = raw[i]\n    local value = raw[i + 1]\n    if field:sub(1, 11) == \"expires_at:\" then\n      local kid = field:sub(12)\n      local exp = tonumber(value)\n      -- GC strictness MUST match validate_waitpoint_token\'s `exp < now_ms`\n      -- (lua/helpers.lua). 
Reaping on `exp <= now_ms` would delete a kid\n      -- that the validator still considers in-grace at the boundary\n      -- exp == now_ms, causing tokens that should validate to fail.\n      if not exp or exp <= 0 or exp < now_ms then\n        expired_fields[#expired_fields + 1] = \"expires_at:\" .. kid\n        expired_fields[#expired_fields + 1] = \"secret:\" .. kid\n        gc_count = gc_count + 1\n      end\n    end\n  end\n  if #expired_fields > 0 then\n    redis.call(\"HDEL\", hmac_key, unpack(expired_fields))\n  end\n\n  -- Promote current \u{2192} previous. INVARIANT: expires_at:<new_kid> is NEVER\n  -- written \u{2014} current_kid has no expiry entry.\n  local prev_expires_str = tostring(prev_expires_at)\n  if current_kid then\n    redis.call(\"HSET\", hmac_key,\n      \"previous_kid\", current_kid,\n      \"previous_expires_at\", prev_expires_str,\n      \"expires_at:\" .. current_kid, prev_expires_str,\n      \"current_kid\", new_kid,\n      \"secret:\" .. new_kid, new_secret_hex)\n  else\n    redis.call(\"HSET\", hmac_key,\n      \"current_kid\", new_kid,\n      \"secret:\" .. new_kid, new_secret_hex)\n  end\n\n  return ok(\"rotated\", current_kid or \"\", new_kid, tostring(gc_count))\nend)\n\n---------------------------------------------------------------------------\n-- ff_list_waitpoint_hmac_kids\n--\n-- Read-back for the waitpoint HMAC keystore. Insulates consumers (cairn\n-- admin UI, ff-server audit surface) from the hash-field naming so future\n-- layout changes don\'t break them.\n--\n-- KEYS (1): hmac_secrets  (ff:sec:{p:N}:waitpoint_hmac)\n-- ARGV: none\n--\n-- Returns ok(current_kid_or_empty, n, kid1, exp1_ms, kid2, exp2_ms, ...)\n-- \"verifying\" kids = those with a FUTURE expires_at:<kid> entry. Kids\n-- whose grace has already elapsed are NOT reported here \u{2014} the contract\n-- promises kids that still validate tokens, so listing expired kids\n-- would mislead operators. 
Expired entries are swept by orphan GC on\n-- the next rotation. current_kid is excluded (it never has an expiry).\n-- Uninitialized \u{2192} ok(\"\", 0).\n---------------------------------------------------------------------------\nredis.register_function(\'ff_list_waitpoint_hmac_kids\', function(keys, args)\n  local hmac_key = keys[1]\n  local raw = redis.call(\"HGETALL\", hmac_key)\n  local now_ms = server_time_ms()\n\n  local current_kid = \"\"\n  local pairs_out = {}\n  local n = 0\n  for i = 1, #raw, 2 do\n    local field = raw[i]\n    local value = raw[i + 1]\n    if field == \"current_kid\" then\n      current_kid = value\n    elseif field:sub(1, 11) == \"expires_at:\" then\n      local kid = field:sub(12)\n      local exp = tonumber(value)\n      -- Match validator\'s `exp < now_ms` rejection rule so a kid listed\n      -- as verifying really does still validate tokens at call time.\n      if exp and exp > 0 and exp >= now_ms then\n        pairs_out[#pairs_out + 1] = kid\n        pairs_out[#pairs_out + 1] = value\n        n = n + 1\n      end\n    end\n  end\n\n  return ok(current_kid, tostring(n), unpack(pairs_out))\nend)\n\n\n-- source: lua/signal.lua\n-- FlowFabric signal delivery and resume-claim functions\n-- Reference: RFC-005 (Signal), RFC-001 (Execution), RFC-010 \u{a7}4.1 (#17, #18, #2)\n--\n-- Depends on helpers: ok, err, ok_duplicate, hgetall_to_table, is_set,\n--   initialize_condition, write_condition_hash, evaluate_signal_against_condition,\n--   is_condition_satisfied, extract_field\n\n---------------------------------------------------------------------------\n-- #17  ff_deliver_signal\n--\n-- Atomic signal delivery: validate target, check idempotency, record\n-- signal, evaluate resume condition, optionally close waitpoint +\n-- suspension + transition suspended -> runnable.\n--\n-- KEYS (14): exec_core, wp_condition, wp_signals_stream,\n--            exec_signals_zset, signal_hash, signal_payload,\n--            idem_key, waitpoint_hash, 
suspension_current,\n--            eligible_zset, suspended_zset, delayed_zset,\n--            suspension_timeout_zset, hmac_secrets\n-- ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,\n--            signal_category, source_type, source_identity,\n--            payload, payload_encoding, idempotency_key,\n--            correlation_id, target_scope, created_at,\n--            dedup_ttl_ms, resume_delay_ms, signal_maxlen,\n--            max_signals_per_execution, waitpoint_token\n---------------------------------------------------------------------------\nredis.register_function(\'ff_deliver_signal\', function(keys, args)\n  local K = {\n    core_key              = keys[1],\n    wp_condition          = keys[2],\n    wp_signals_stream     = keys[3],\n    exec_signals_zset     = keys[4],\n    signal_hash           = keys[5],\n    signal_payload        = keys[6],\n    idem_key              = keys[7],\n    waitpoint_hash        = keys[8],\n    suspension_current    = keys[9],\n    eligible_zset         = keys[10],\n    suspended_zset        = keys[11],\n    delayed_zset          = keys[12],\n    suspension_timeout_zset = keys[13],\n    hmac_secrets          = keys[14],\n  }\n\n  local A = {\n    signal_id        = args[1],\n    execution_id     = args[2],\n    waitpoint_id     = args[3],\n    signal_name      = args[4],\n    signal_category  = args[5],\n    source_type      = args[6],\n    source_identity  = args[7],\n    payload          = args[8] or \"\",\n    payload_encoding = args[9] or \"json\",\n    idempotency_key  = args[10] or \"\",\n    correlation_id   = args[11] or \"\",\n    target_scope     = args[12] or \"waitpoint\",\n    created_at       = args[13] or \"\",\n    dedup_ttl_ms     = tonumber(args[14] or \"86400000\"),\n    resume_delay_ms  = tonumber(args[15] or \"0\"),\n    signal_maxlen    = tonumber(args[16] or \"1000\"),\n    max_signals      = tonumber(args[17] or \"10000\"),\n    waitpoint_token  = args[18] or \"\",\n  }\n\n  local t 
= redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Validate execution exists\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then\n    return err(\"execution_not_found\")\n  end\n  local core = hgetall_to_table(raw)\n\n  -- 2. Validate HMAC token FIRST (RFC-004 \u{a7}Waitpoint Security).\n  --\n  -- Order matters: lifecycle / waitpoint-state checks below would otherwise\n  -- form a state oracle \u{2014} an attacker presenting ANY token (including an\n  -- invalid one) for an arbitrary (execution_id, waitpoint_id) pair could\n  -- distinguish \"execution is terminal\" vs \"waitpoint is pending\" vs\n  -- \"waitpoint is closed\" by the specific error code returned, without\n  -- having to produce a valid HMAC. Auth-first closes that oracle.\n  --\n  -- Missing-waitpoint is collapsed into `invalid_token` for the same\n  -- reason: an unauthenticated caller must not be able to probe which\n  -- (execution, waitpoint) tuples exist.\n  local wp_for_auth_raw = redis.call(\"HGETALL\", K.waitpoint_hash)\n  if #wp_for_auth_raw == 0 then\n    return err(\"invalid_token\")\n  end\n  local wp_for_auth = hgetall_to_table(wp_for_auth_raw)\n  if not wp_for_auth.created_at then\n    return err(\"invalid_token\")\n  end\n  local token_err = validate_waitpoint_token(\n    K.hmac_secrets, A.waitpoint_token,\n    A.waitpoint_id, wp_for_auth.waitpoint_key or \"\",\n    tonumber(wp_for_auth.created_at) or 0, now_ms)\n  if token_err then\n    -- Operator-visible counter (RFC-004 \u{a7}Waitpoint Security observability).\n    -- Single scalar HSET on exec_core \u{2014} bounded, amortized-free. 
Gives\n    -- operators a \"last time this execution saw an auth failure\" field to\n    -- correlate with key-rotation drift, client bugs, or attack traffic\n    -- without needing to tail Lua slowlog or FCALL error logs.\n    redis.call(\"HSET\", K.core_key, \"last_hmac_validation_failed_at\", tostring(now_ms))\n    return err(token_err)\n  end\n\n  -- 3. Validate execution is in a signalable state (post-auth).\n  local lp = core.lifecycle_phase\n  if lp == \"terminal\" then\n    return err(\"target_not_signalable\")\n  end\n\n  if lp == \"active\" or lp == \"runnable\" or lp == \"submitted\" then\n    -- Not suspended. wp_for_auth was just loaded above; reuse it.\n    if wp_for_auth.state == \"pending\" then\n      return err(\"waitpoint_pending_use_buffer_script\")\n    end\n    if wp_for_auth.state ~= \"active\" then\n      return err(\"target_not_signalable\")\n    end\n    -- Active waitpoint on non-suspended execution \u{2014} unusual but valid (race window)\n  end\n\n  -- 4. Validate waitpoint condition is open (post-auth).\n  local cond_raw = redis.call(\"HGETALL\", K.wp_condition)\n  if #cond_raw == 0 then\n    return err(\"waitpoint_not_found\")\n  end\n  local wp_cond = hgetall_to_table(cond_raw)\n  if wp_cond.closed == \"1\" then\n    return err(\"waitpoint_closed\")\n  end\n\n  -- 4. Signal count limit (prevents unbounded ZSET growth from webhook storms)\n  if A.max_signals > 0 then\n    local current_count = redis.call(\"ZCARD\", K.exec_signals_zset)\n    if current_count >= A.max_signals then\n      return err(\"signal_limit_exceeded\")\n    end\n  end\n\n  -- 5. 
Idempotency check\n  -- Guard: (A.dedup_ttl_ms or 0) handles nil from tonumber(\"\") safely.\n  local dedup_ms = A.dedup_ttl_ms or 0\n  if A.idempotency_key ~= \"\" and dedup_ms > 0 then\n    local existing = redis.call(\"GET\", K.idem_key)\n    if existing then\n      return ok_duplicate(existing)\n    end\n    redis.call(\"SET\", K.idem_key, A.signal_id,\n      \"PX\", dedup_ms, \"NX\")\n  end\n\n  -- 6. Record signal hash\n  local created_at = A.created_at ~= \"\" and A.created_at or tostring(now_ms)\n  redis.call(\"HSET\", K.signal_hash,\n    \"signal_id\", A.signal_id,\n    \"target_execution_id\", A.execution_id,\n    \"target_waitpoint_id\", A.waitpoint_id,\n    \"target_scope\", A.target_scope,\n    \"signal_name\", A.signal_name,\n    \"signal_category\", A.signal_category,\n    \"source_type\", A.source_type,\n    \"source_identity\", A.source_identity,\n    \"correlation_id\", A.correlation_id,\n    \"idempotency_key\", A.idempotency_key,\n    \"created_at\", created_at,\n    \"accepted_at\", tostring(now_ms),\n    \"matched_waitpoint_id\", A.waitpoint_id,\n    \"payload_encoding\", A.payload_encoding)\n\n  -- 6b. Store payload separately if present\n  if A.payload ~= \"\" then\n    redis.call(\"SET\", K.signal_payload, A.payload)\n  end\n\n  -- 7. Append to per-waitpoint signal stream + per-execution signal index\n  redis.call(\"XADD\", K.wp_signals_stream, \"MAXLEN\", \"~\",\n    tostring(A.signal_maxlen), \"*\",\n    \"signal_id\", A.signal_id,\n    \"signal_name\", A.signal_name,\n    \"signal_category\", A.signal_category,\n    \"source_type\", A.source_type,\n    \"source_identity\", A.source_identity,\n    \"matched\", \"0\",\n    \"accepted_at\", tostring(now_ms))\n  redis.call(\"ZADD\", K.exec_signals_zset, now_ms, A.signal_id)\n\n  -- 8. 
Evaluate resume condition\n  local effect = \"appended_to_waitpoint\"\n  local matched = false\n\n  local total = tonumber(wp_cond.total_matchers or \"0\")\n  for i = 0, total - 1 do\n    local sat_key = \"matcher:\" .. i .. \":satisfied\"\n    local name_key = \"matcher:\" .. i .. \":name\"\n    if wp_cond[sat_key] == \"0\" then\n      local matcher_name = wp_cond[name_key] or \"\"\n      if matcher_name == \"\" or matcher_name == A.signal_name then\n        -- Mark matcher as satisfied\n        redis.call(\"HSET\", K.wp_condition,\n          sat_key, \"1\",\n          \"matcher:\" .. i .. \":signal_id\", A.signal_id)\n        matched = true\n        local new_sat = tonumber(wp_cond.satisfied_count or \"0\") + 1\n        redis.call(\"HSET\", K.wp_condition, \"satisfied_count\", tostring(new_sat))\n\n        -- Check if overall condition is satisfied\n        local mode = wp_cond.match_mode or \"any\"\n        local min_count = tonumber(wp_cond.minimum_signal_count or \"1\")\n        local resume = false\n        if mode == \"any\" then\n          resume = (new_sat >= min_count)\n        elseif mode == \"all\" then\n          resume = (new_sat >= total)\n        else\n          -- count(n) mode\n          resume = (new_sat >= min_count)\n        end\n\n        if resume then\n          effect = \"resume_condition_satisfied\"\n\n          -- OOM-SAFE WRITE ORDERING (per RFC-010 \u{a7}4.8b):\n          -- exec_core HSET is the \"point of no return\" \u{2014} write it FIRST.\n          -- If OOM kills after exec_core but before closing sub-objects,\n          -- execution is runnable (correct) with stale suspension/waitpoint\n          -- records (generalized index reconciler catches this).\n\n          -- 9a. 
Transition execution: suspended -> runnable (WRITE FIRST)\n          -- Resume continues the SAME attempt (no new attempt created).\n          if lp == \"suspended\" then\n            local es, br, bd, ps\n            if A.resume_delay_ms > 0 then\n              es = \"not_eligible_until_time\"\n              br = \"waiting_for_resume_delay\"\n              bd = \"resume delay \" .. A.resume_delay_ms .. \"ms after signal \" .. A.signal_name\n              ps = \"delayed\"\n            else\n              es = \"eligible_now\"\n              br = \"waiting_for_worker\"\n              bd = \"\"\n              ps = \"waiting\"\n            end\n\n            -- ALL 7 state vector dimensions\n            redis.call(\"HSET\", K.core_key,\n              \"lifecycle_phase\", \"runnable\",\n              \"ownership_state\", \"unowned\",\n              \"eligibility_state\", es,\n              \"blocking_reason\", br,\n              \"blocking_detail\", bd,\n              \"terminal_outcome\", \"none\",\n              \"attempt_state\", \"attempt_interrupted\",\n              \"public_state\", ps,\n              \"current_suspension_id\", \"\",\n              \"current_waitpoint_id\", \"\",\n              \"last_transition_at\", tostring(now_ms),\n              \"last_mutation_at\", tostring(now_ms))\n\n            -- 9b. Update scheduling indexes\n            local priority = tonumber(core.priority or \"0\")\n            local created_at_exec = tonumber(core.created_at or \"0\")\n            redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n            if A.resume_delay_ms > 0 then\n              redis.call(\"ZADD\", K.delayed_zset,\n                now_ms + A.resume_delay_ms, A.execution_id)\n            else\n              redis.call(\"ZADD\", K.eligible_zset,\n                0 - (priority * 1000000000000) + created_at_exec,\n                A.execution_id)\n            end\n          end\n\n          -- 9c. 
Close waitpoint condition (after exec_core is safe)\n          redis.call(\"HSET\", K.wp_condition,\n            \"closed\", \"1\",\n            \"closed_at\", tostring(now_ms),\n            \"closed_reason\", \"satisfied\")\n\n          -- 9d. Close waitpoint record\n          redis.call(\"HSET\", K.waitpoint_hash,\n            \"state\", \"closed\",\n            \"satisfied_at\", tostring(now_ms),\n            \"closed_at\", tostring(now_ms),\n            \"close_reason\", \"resumed\")\n\n          -- 9e. Close suspension record\n          if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n            redis.call(\"HSET\", K.suspension_current,\n              \"satisfied_at\", tostring(now_ms),\n              \"closed_at\", tostring(now_ms),\n              \"close_reason\", \"resumed\")\n          end\n\n          -- 9f. Remove from suspension timeout index\n          redis.call(\"ZREM\", K.suspension_timeout_zset, A.execution_id)\n        end\n        break\n      end\n    end\n  end\n\n  if not matched then\n    effect = \"no_op\"\n  end\n\n  -- 10. Record observed effect on signal\n  redis.call(\"HSET\", K.signal_hash, \"observed_effect\", effect)\n\n  -- 11. Update waitpoint signal counts\n  redis.call(\"HINCRBY\", K.waitpoint_hash, \"signal_count\", 1)\n  if matched then\n    redis.call(\"HINCRBY\", K.waitpoint_hash, \"matched_signal_count\", 1)\n  end\n  redis.call(\"HSET\", K.waitpoint_hash, \"last_signal_at\", tostring(now_ms))\n\n  -- 12. 
Update suspension signal summary\n  if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n    redis.call(\"HSET\", K.suspension_current, \"last_signal_at\", tostring(now_ms))\n  end\n\n  return ok(A.signal_id, effect)\nend)\n\n---------------------------------------------------------------------------\n-- #18  ff_buffer_signal_for_pending_waitpoint\n--\n-- Accept signal for a pending (not yet committed) waitpoint.\n-- Records the signal but does NOT evaluate resume conditions.\n-- When suspend_execution activates the waitpoint, buffered signals\n-- are replayed through the full evaluation path.\n--\n-- KEYS (9): exec_core, wp_condition, wp_signals_stream,\n--           exec_signals_zset, signal_hash, signal_payload,\n--           idem_key, waitpoint_hash, hmac_secrets\n-- ARGV (18): same as ff_deliver_signal (17 + waitpoint_token)\n---------------------------------------------------------------------------\nredis.register_function(\'ff_buffer_signal_for_pending_waitpoint\', function(keys, args)\n  local K = {\n    core_key          = keys[1],\n    wp_condition      = keys[2],\n    wp_signals_stream = keys[3],\n    exec_signals_zset = keys[4],\n    signal_hash       = keys[5],\n    signal_payload    = keys[6],\n    idem_key          = keys[7],\n    waitpoint_hash    = keys[8],\n    hmac_secrets      = keys[9],\n  }\n\n  local A = {\n    signal_id        = args[1],\n    execution_id     = args[2],\n    waitpoint_id     = args[3],\n    signal_name      = args[4],\n    signal_category  = args[5],\n    source_type      = args[6],\n    source_identity  = args[7],\n    payload          = args[8] or \"\",\n    payload_encoding = args[9] or \"json\",\n    idempotency_key  = args[10] or \"\",\n    correlation_id   = args[11] or \"\",\n    target_scope     = args[12] or \"waitpoint\",\n    created_at       = args[13] or \"\",\n    dedup_ttl_ms     = tonumber(args[14] or \"86400000\"),\n    signal_maxlen    = tonumber(args[16] or \"1000\"),\n    max_signals      = 
tonumber(args[17] or \"10000\"),\n    waitpoint_token  = args[18] or \"\",\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Validate execution exists\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then\n    return err(\"execution_not_found\")\n  end\n\n  -- 1a. Validate HMAC token against the pending waitpoint\'s mint-time binding.\n  local wp_for_auth = hgetall_to_table(redis.call(\"HGETALL\", K.waitpoint_hash))\n  if not wp_for_auth.created_at then\n    return err(\"waitpoint_not_found\")\n  end\n  local token_err = validate_waitpoint_token(\n    K.hmac_secrets, A.waitpoint_token,\n    A.waitpoint_id, wp_for_auth.waitpoint_key or \"\",\n    tonumber(wp_for_auth.created_at) or 0, now_ms)\n  if token_err then\n    -- Operator-visible counter mirroring ff_deliver_signal. See comment\n    -- there for rationale.\n    redis.call(\"HSET\", K.core_key, \"last_hmac_validation_failed_at\", tostring(now_ms))\n    return err(token_err)\n  end\n\n  -- 1b. Gate on waitpoint state. ff_deliver_signal blocks replay-after-close\n  -- via wp_condition.closed, but wp_condition is not initialized for pending\n  -- waitpoints \u{2014} we must check wp.state directly. Without this, a caller\n  -- holding a valid token for a pending waitpoint that has since been\n  -- closed/expired can keep appending buffered signals that will replay\n  -- when suspend_execution(use_pending=1) later activates the waitpoint.\n  if wp_for_auth.state == \"closed\" or wp_for_auth.state == \"expired\" then\n    return err(\"waitpoint_closed\")\n  end\n\n  -- 2. Signal count limit\n  if A.max_signals > 0 then\n    local current_count = redis.call(\"ZCARD\", K.exec_signals_zset)\n    if current_count >= A.max_signals then\n      return err(\"signal_limit_exceeded\")\n    end\n  end\n\n  -- 3. 
Idempotency check\n  -- Guard: (A.dedup_ttl_ms or 0) handles nil from tonumber(\"\") safely.\n  local dedup_ms = A.dedup_ttl_ms or 0\n  if A.idempotency_key ~= \"\" and dedup_ms > 0 then\n    local existing = redis.call(\"GET\", K.idem_key)\n    if existing then\n      return ok_duplicate(existing)\n    end\n    redis.call(\"SET\", K.idem_key, A.signal_id,\n      \"PX\", dedup_ms, \"NX\")\n  end\n\n  -- 4. Record signal hash with tentative effect\n  local created_at = A.created_at ~= \"\" and A.created_at or tostring(now_ms)\n  redis.call(\"HSET\", K.signal_hash,\n    \"signal_id\", A.signal_id,\n    \"target_execution_id\", A.execution_id,\n    \"target_waitpoint_id\", A.waitpoint_id,\n    \"target_scope\", A.target_scope,\n    \"signal_name\", A.signal_name,\n    \"signal_category\", A.signal_category,\n    \"source_type\", A.source_type,\n    \"source_identity\", A.source_identity,\n    \"correlation_id\", A.correlation_id,\n    \"idempotency_key\", A.idempotency_key,\n    \"created_at\", created_at,\n    \"accepted_at\", tostring(now_ms),\n    \"matched_waitpoint_id\", A.waitpoint_id,\n    \"payload_encoding\", A.payload_encoding,\n    \"observed_effect\", \"buffered_for_pending_waitpoint\")\n\n  -- 4b. Store payload separately if present\n  if A.payload ~= \"\" then\n    redis.call(\"SET\", K.signal_payload, A.payload)\n  end\n\n  -- 5. 
Append to per-waitpoint signal stream + per-execution signal index\n  -- These are recorded so suspend_execution can XRANGE and replay them.\n  redis.call(\"XADD\", K.wp_signals_stream, \"MAXLEN\", \"~\",\n    tostring(A.signal_maxlen), \"*\",\n    \"signal_id\", A.signal_id,\n    \"signal_name\", A.signal_name,\n    \"signal_category\", A.signal_category,\n    \"source_type\", A.source_type,\n    \"source_identity\", A.source_identity,\n    \"matched\", \"0\",\n    \"accepted_at\", tostring(now_ms))\n  redis.call(\"ZADD\", K.exec_signals_zset, now_ms, A.signal_id)\n\n  -- No resume condition evaluation \u{2014} waitpoint is pending, not active.\n\n  return ok(A.signal_id, \"buffered_for_pending_waitpoint\")\nend)\n\n---------------------------------------------------------------------------\n-- #2  ff_claim_resumed_execution\n--\n-- Consume claim-grant, resume existing attempt (interrupted -> started),\n-- create new lease bound to SAME attempt. Does NOT create a new attempt.\n--\n-- KEYS (11): exec_core, claim_grant, eligible_zset, lease_expiry_zset,\n--            worker_leases, existing_attempt_hash, lease_current,\n--            lease_history, active_index, attempt_timeout_zset,\n--            execution_deadline_zset\n-- ARGV (8): execution_id, worker_id, worker_instance_id, lane,\n--           capability_snapshot_hash, lease_id, lease_ttl_ms,\n--           remaining_attempt_timeout_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_claim_resumed_execution\', function(keys, args)\n  local K = {\n    core_key               = keys[1],\n    claim_grant_key        = keys[2],\n    eligible_zset          = keys[3],\n    lease_expiry_key       = keys[4],\n    worker_leases_key      = keys[5],\n    attempt_hash           = keys[6],\n    lease_current_key      = keys[7],\n    lease_history_key      = keys[8],\n    active_index_key       = keys[9],\n    attempt_timeout_key    = keys[10],\n    
execution_deadline_key = keys[11],\n  }\n\n  local lease_ttl_n = require_number(args[7], \"lease_ttl_ms\")\n  if type(lease_ttl_n) == \"table\" then return lease_ttl_n end\n\n  local A = {\n    execution_id                  = args[1],\n    worker_id                     = args[2],\n    worker_instance_id            = args[3],\n    lane                          = args[4],\n    capability_snapshot_hash      = args[5] or \"\",\n    lease_id                      = args[6],\n    lease_ttl_ms                  = lease_ttl_n,\n    remaining_attempt_timeout_ms  = args[8] or \"\",\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. Validate execution exists\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- 2. Must be runnable\n  if core.lifecycle_phase ~= \"runnable\" then\n    return err(\"execution_not_leaseable\")\n  end\n\n  -- 3. Must be attempt_interrupted (resumed after suspension/delay)\n  if core.attempt_state ~= \"attempt_interrupted\" then\n    return err(\"not_a_resumed_execution\")\n  end\n\n  -- 4. Validate claim grant\n  local grant_raw = redis.call(\"HGETALL\", K.claim_grant_key)\n  if #grant_raw == 0 then\n    return err(\"invalid_claim_grant\")\n  end\n  local grant = hgetall_to_table(grant_raw)\n\n  -- Validate grant matches (grant key is execution-scoped, so only check worker_id)\n  if grant.worker_id ~= A.worker_id then\n    return err(\"invalid_claim_grant\")\n  end\n\n  -- Check grant expiry\n  if is_set(grant.grant_expires_at) and tonumber(grant.grant_expires_at) < now_ms then\n    redis.call(\"DEL\", K.claim_grant_key)\n    return err(\"claim_grant_expired\")\n  end\n\n  -- Consume grant (DEL)\n  redis.call(\"DEL\", K.claim_grant_key)\n\n  -- 5. 
Resume existing attempt: attempt_interrupted -> started\n  --    Same attempt continues \u{2014} no new attempt_index.\n  local att_idx = core.current_attempt_index\n  local att_id = core.current_attempt_id\n  local epoch = tonumber(core.current_lease_epoch or \"0\") + 1\n  local expires_at = now_ms + A.lease_ttl_ms\n  local renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)\n\n  redis.call(\"HSET\", K.attempt_hash,\n    \"attempt_state\", \"started\",\n    \"resumed_at\", tostring(now_ms),\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", tostring(epoch),\n    \"worker_id\", A.worker_id,\n    \"worker_instance_id\", A.worker_instance_id,\n    \"suspended_at\", \"\",\n    \"suspension_id\", \"\")\n\n  -- 6. Create new lease bound to same attempt\n  redis.call(\"DEL\", K.lease_current_key)\n  redis.call(\"HSET\", K.lease_current_key,\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", tostring(epoch),\n    \"execution_id\", A.execution_id,\n    \"attempt_id\", att_id,\n    \"worker_id\", A.worker_id,\n    \"worker_instance_id\", A.worker_instance_id,\n    \"acquired_at\", tostring(now_ms),\n    \"expires_at\", tostring(expires_at),\n    \"last_renewed_at\", tostring(now_ms),\n    \"renewal_deadline\", tostring(renewal_deadline))\n\n  -- 7. 
Update exec_core \u{2014} ALL 7 state vector dimensions\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"active\",\n    \"ownership_state\", \"leased\",\n    \"eligibility_state\", \"not_applicable\",\n    \"blocking_reason\", \"none\",\n    \"blocking_detail\", \"\",\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", \"running_attempt\",\n    \"public_state\", \"active\",\n    \"current_lease_id\", A.lease_id,\n    \"current_lease_epoch\", tostring(epoch),\n    \"current_worker_id\", A.worker_id,\n    \"current_worker_instance_id\", A.worker_instance_id,\n    \"current_lane\", A.lane,\n    \"lease_acquired_at\", tostring(now_ms),\n    \"lease_expires_at\", tostring(expires_at),\n    \"lease_last_renewed_at\", tostring(now_ms),\n    \"lease_renewal_deadline\", tostring(renewal_deadline),\n    \"lease_expired_at\", \"\",\n    \"lease_revoked_at\", \"\",\n    \"lease_revoke_reason\", \"\",\n    \"last_transition_at\", tostring(now_ms),\n    \"last_mutation_at\", tostring(now_ms))\n\n  -- 8. Update indexes\n  redis.call(\"ZREM\", K.eligible_zset, A.execution_id)\n  redis.call(\"ZADD\", K.lease_expiry_key, expires_at, A.execution_id)\n  redis.call(\"SADD\", K.worker_leases_key, A.execution_id)\n  redis.call(\"ZADD\", K.active_index_key, expires_at, A.execution_id)\n\n  -- 9. ZADD attempt_timeout with remaining timeout\n  if is_set(A.remaining_attempt_timeout_ms) then\n    local remaining = tonumber(A.remaining_attempt_timeout_ms)\n    if remaining > 0 then\n      redis.call(\"ZADD\", K.attempt_timeout_key,\n        now_ms + remaining, A.execution_id)\n    end\n  end\n\n  -- 10. 
Lease history event\n  redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", 1000, \"*\",\n    \"event\", \"acquired\",\n    \"lease_id\", A.lease_id,\n    \"lease_epoch\", tostring(epoch),\n    \"attempt_index\", att_idx,\n    \"attempt_id\", att_id,\n    \"worker_id\", A.worker_id,\n    \"reason\", \"claim_resumed\",\n    \"ts\", tostring(now_ms))\n\n  return ok(A.lease_id, tostring(epoch), tostring(expires_at),\n            att_id, att_idx, \"resumed\")\nend)\n\n\n-- source: lua/stream.lua\n-- FlowFabric stream append function\n-- Reference: RFC-006 (Stream), RFC-010 \u{a7}4.1 (#20)\n--\n-- Depends on helpers: ok, err, is_set\n\n---------------------------------------------------------------------------\n-- #20  ff_append_frame\n--\n-- Append a frame to the attempt-scoped output stream. Highest-throughput\n-- function \u{2014} called once per token during LLM streaming. Uses lite lease\n-- validation (HMGET, not HGETALL) for minimal overhead. Class B operation.\n--\n-- KEYS (3): exec_core, stream_data, stream_meta\n-- ARGV (13): execution_id, attempt_index, lease_id, lease_epoch,\n--            frame_type, ts, payload, encoding, correlation_id,\n--            source, retention_maxlen, attempt_id, max_payload_bytes\n---------------------------------------------------------------------------\nredis.register_function(\'ff_append_frame\', function(keys, args)\n  local K = {\n    core_key    = keys[1],\n    stream_key  = keys[2],\n    stream_meta = keys[3],\n  }\n\n  local A = {\n    execution_id     = args[1],\n    attempt_index    = args[2],\n    lease_id         = args[3],\n    lease_epoch      = args[4],\n    frame_type       = args[5],\n    ts               = args[6] or \"\",\n    payload          = args[7] or \"\",\n    encoding         = args[8] or \"utf8\",\n    correlation_id   = args[9] or \"\",\n    source           = args[10] or \"worker\",\n    retention_maxlen = tonumber(args[11] or \"0\"),\n    attempt_id       = args[12] or \"\",\n    
max_payload_bytes = tonumber(args[13] or \"65536\"),\n  }\n\n  -- 1. Payload size guard (v1 default: 64KB)\n  if #A.payload > A.max_payload_bytes then\n    return err(\"retention_limit_exceeded\")\n  end\n\n  -- 2. Lite lease validation via HMGET (Class B \u{2014} no full HGETALL)\n  local core = redis.call(\"HMGET\", K.core_key,\n    \"current_attempt_index\",   -- [1]\n    \"current_lease_id\",        -- [2]\n    \"current_lease_epoch\",     -- [3]\n    \"lease_expires_at\",        -- [4]\n    \"lifecycle_phase\",         -- [5]\n    \"ownership_state\")         -- [6]\n\n  -- Execution must be active\n  if core[5] ~= \"active\" then\n    return err(\"stream_closed\")\n  end\n\n  -- Ownership must not be expired/revoked\n  if core[6] == \"lease_expired_reclaimable\" or core[6] == \"lease_revoked\" then\n    return err(\"stale_owner_cannot_append\")\n  end\n\n  -- Lease must not be expired (server time check)\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n  if tonumber(core[4] or \"0\") <= now_ms then\n    return err(\"stale_owner_cannot_append\")\n  end\n\n  -- Attempt index must match\n  if tostring(core[1]) ~= A.attempt_index then\n    return err(\"stale_owner_cannot_append\")\n  end\n\n  -- Lease identity must match\n  if core[2] ~= A.lease_id or tostring(core[3]) ~= A.lease_epoch then\n    return err(\"stale_owner_cannot_append\")\n  end\n\n  -- 3. Lazy-create stream metadata on first append\n  if redis.call(\"EXISTS\", K.stream_meta) == 0 then\n    redis.call(\"HSET\", K.stream_meta,\n      \"stream_id\", A.execution_id .. \":\" .. 
A.attempt_index,\n      \"execution_id\", A.execution_id,\n      \"attempt_id\", A.attempt_id,\n      \"attempt_index\", A.attempt_index,\n      \"created_at\", tostring(now_ms),\n      \"closed_at\", \"\",\n      \"closed_reason\", \"\",\n      \"durability_mode\", \"durable_full\",\n      \"retention_maxlen\", tostring(A.retention_maxlen),\n      \"last_sequence\", \"\",\n      \"frame_count\", \"0\",\n      \"total_bytes\", \"0\",\n      \"last_frame_at\", \"\")\n  end\n\n  -- 4. Check stream not closed\n  local closed = redis.call(\"HGET\", K.stream_meta, \"closed_at\")\n  if is_set(closed) then\n    return err(\"stream_closed\")\n  end\n\n  -- 5. Append frame via XADD\n  local ts = A.ts ~= \"\" and A.ts or tostring(now_ms)\n  local xadd_args = {\n    K.stream_key, \"*\",\n    \"frame_type\", A.frame_type,\n    \"ts\", ts,\n    \"payload\", A.payload,\n    \"encoding\", A.encoding,\n    \"source\", A.source,\n  }\n  -- Only include correlation_id if non-empty (saves memory on high-throughput paths)\n  if A.correlation_id ~= \"\" then\n    xadd_args[#xadd_args + 1] = \"correlation_id\"\n    xadd_args[#xadd_args + 1] = A.correlation_id\n  end\n\n  local entry_id = redis.call(\"XADD\", unpack(xadd_args))\n\n  -- 6. Update stream metadata.\n  --\n  -- `frame_count` is the LIFETIME append counter \u{2014} it is NOT the number\n  -- of frames currently retained in the stream. 
XTRIM below prunes old\n  -- entries without decrementing this counter, so on a 10k-cap stream\n  -- that has seen 1M appends `frame_count==1_000_000` while `XLEN==10_000`.\n  -- Consumers that want the retained count must `XLEN` the stream\n  -- directly; `frame_count` is the right number for metering, billing,\n  -- per-attempt usage attribution \u{2014} anything that needs \"how much was\n  -- produced\", not \"how much is still here\".\n  local frame_count = redis.call(\"HINCRBY\", K.stream_meta, \"frame_count\", 1)\n  redis.call(\"HINCRBY\", K.stream_meta, \"total_bytes\", #A.payload)\n  redis.call(\"HSET\", K.stream_meta,\n    \"last_sequence\", entry_id,\n    \"last_frame_at\", tostring(now_ms))\n\n  -- 7. Apply retention trim.\n  --\n  -- XTRIM MAXLEN `~` (approximate) is the default: it trims at macro-node\n  -- boundaries for throughput, so actual retained length floats slightly\n  -- above the target. Under a token-per-frame burst this can briefly\n  -- hold up to 2x the requested retention.\n  --\n  -- When the caller explicitly passes `retention_maxlen > 0` they\'ve\n  -- opted into a specific bound; honor it EXACTLY with `=`. Bursty LLM\n  -- workloads that care about predictable memory pay a small XTRIM-rate\n  -- cost for the tighter bound. Default (A.retention_maxlen == 0) still\n  -- uses `~` for the lane-level unbounded-growth guard, where throughput\n  -- matters more than exact retention.\n  local maxlen = A.retention_maxlen\n  local trim_op\n  if maxlen == 0 then\n    maxlen = 10000       -- default cap prevents unbounded growth\n    trim_op = \"~\"\n  else\n    trim_op = \"=\"        -- caller-supplied bound is honored exactly\n  end\n  redis.call(\"XTRIM\", K.stream_key, \"MAXLEN\", trim_op, maxlen)\n\n  return ok(entry_id, tostring(frame_count))\nend)\n\n---------------------------------------------------------------------------\n-- ff_read_attempt_stream\n--\n-- Read frames from an attempt-scoped output stream via XRANGE. 
Non-blocking\n-- (safe in Lua Functions). Cluster-safe: stream_key and stream_meta share\n-- the {p:N} hash tag.\n--\n-- Returns an empty array when the stream key does not exist (not an error \u{2014}\n-- the attempt may not have produced frames yet). Also reports\n-- (closed_at, closed_reason) from stream_meta so callers can stop polling.\n--\n-- KEYS (2): stream_data, stream_meta\n-- ARGV (3): from_id, to_id, count_limit (must be 1..=HARD_CAP; 0 rejected)\n---------------------------------------------------------------------------\nredis.register_function(\'ff_read_attempt_stream\', function(keys, args)\n  local stream_key  = keys[1]\n  local stream_meta = keys[2]\n\n  local from_id = args[1] or \"-\"\n  local to_id   = args[2] or \"+\"\n  local count_limit = tonumber(args[3] or \"0\")\n\n  -- Explicit reject on zero/negative AND on over-cap. The REST and SDK\n  -- layers reject both at their boundary (R2/R3); the Lua check is the\n  -- last line of defense for direct FCALL callers (tests, future\n  -- consumers) so they get a clear error instead of a silently-clamped\n  -- whole-stream read or reply-size blowup.\n  --\n  -- Before PR#7 this was asymmetric: < 1 rejected with invalid_input,\n  -- but > HARD_CAP was silently clamped. That contradicted RFC-006\n  -- \u{a7}Input validation (\"both bounds reject, neither silently clamps\").\n  -- Now both edges reject symmetrically.\n  --\n  -- HARD_CAP mirrors ff_core::contracts::STREAM_READ_HARD_CAP \u{2014} keep in sync.\n  local HARD_CAP = 10000\n  if count_limit == nil or count_limit < 1 then\n    return err(\"invalid_input\", \"count_limit must be >= 1\")\n  end\n  if count_limit > HARD_CAP then\n    return err(\"invalid_input\", \"count_limit_exceeds_hard_cap\")\n  end\n\n  -- Stream may legitimately not exist (never-written attempt). 
XRANGE on a\n  -- missing key returns empty, so no pre-check is needed.\n  local entries = redis.call(\"XRANGE\", stream_key, from_id, to_id,\n                             \"COUNT\", count_limit)\n\n  -- Fetch terminal markers from stream_meta. A never-written attempt has\n  -- no stream_meta hash; HMGET returns nils for both fields which we\n  -- normalize to empty strings on the return path so Rust can decode a\n  -- consistent shape.\n  local meta = redis.call(\"HMGET\", stream_meta, \"closed_at\", \"closed_reason\")\n  local closed_at     = meta[1] or \"\"\n  local closed_reason = meta[2] or \"\"\n\n  -- entries is an array of [entry_id, [f1, v1, f2, v2, ...]].\n  -- Return shape: ok(entries, closed_at, closed_reason). Rust parses the\n  -- three fields positionally.\n  return ok(entries, closed_at, closed_reason)\nend)\n\n\n-- source: lua/budget.lua\n-- FlowFabric budget functions\n-- Reference: RFC-008 (Budget), RFC-010 \u{a7}4.3 (#30, #31), \u{a7}4.1 (#29a, #29b)\n--\n-- Depends on helpers: ok, err, is_set, hgetall_to_table\n\n---------------------------------------------------------------------------\n-- ff_create_budget  (on {b:M})\n--\n-- Create a new budget policy with hard/soft limits on N dimensions.\n-- Idempotent: if EXISTS budget_def \u{2192} return ok_already_satisfied.\n--\n-- KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,\n--           budget_policies_index\n-- ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,\n--   on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,\n--   dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N\n---------------------------------------------------------------------------\nredis.register_function(\'ff_create_budget\', function(keys, args)\n  local K = {\n    def_key         = keys[1],\n    limits_key      = keys[2],\n    usage_key       = keys[3],\n    resets_zset     = keys[4],\n    policies_index  = keys[5],\n  }\n\n  local A = {\n    budget_id         = 
args[1],\n    scope_type        = args[2],\n    scope_id          = args[3],\n    enforcement_mode  = args[4],\n    on_hard_limit     = args[5],\n    on_soft_limit     = args[6],\n    reset_interval_ms = args[7],\n    now_ms            = args[8],\n  }\n\n  -- Maintain budget_policies_index BEFORE the idempotency guard. SADD is\n  -- itself idempotent (no-op on existing members), and hoisting it heals\n  -- any pre-existing budget_def that was created before this index was\n  -- introduced \u{2014} no migration script required.\n  redis.call(\"SADD\", K.policies_index, A.budget_id)\n\n  -- Idempotency: already exists \u{2192} return immediately\n  if redis.call(\"EXISTS\", K.def_key) == 1 then\n    return ok_already_satisfied(A.budget_id)\n  end\n\n  local dim_count = require_number(args[9], \"dim_count\")\n  if type(dim_count) == \"table\" then return dim_count end\n\n  -- HSET budget definition\n  redis.call(\"HSET\", K.def_key,\n    \"budget_id\", A.budget_id,\n    \"scope_type\", A.scope_type,\n    \"scope_id\", A.scope_id,\n    \"enforcement_mode\", A.enforcement_mode,\n    \"on_hard_limit\", A.on_hard_limit,\n    \"on_soft_limit\", A.on_soft_limit,\n    \"reset_interval_ms\", A.reset_interval_ms,\n    \"breach_count\", \"0\",\n    \"soft_breach_count\", \"0\",\n    \"created_at\", A.now_ms,\n    \"last_updated_at\", A.now_ms)\n\n  -- HSET per-dimension hard and soft limits\n  for i = 1, dim_count do\n    local dim  = args[9 + i]\n    local hard = args[9 + dim_count + i]\n    local soft = args[9 + 2 * dim_count + i]\n    redis.call(\"HSET\", K.limits_key, \"hard:\" .. dim, hard, \"soft:\" .. 
dim, soft)\n  end\n\n  -- budget_usage left empty \u{2014} first report_usage will create fields\n\n  -- Schedule periodic reset if reset_interval_ms > 0\n  local interval_ms = tonumber(A.reset_interval_ms)\n  if interval_ms > 0 then\n    local next_reset_at = tostring(tonumber(A.now_ms) + interval_ms)\n    redis.call(\"HSET\", K.def_key, \"next_reset_at\", next_reset_at)\n    redis.call(\"ZADD\", K.resets_zset, tonumber(next_reset_at), A.budget_id)\n  end\n\n  return ok(A.budget_id)\nend)\n\n---------------------------------------------------------------------------\n-- #30  ff_report_usage_and_check  (on {b:M})\n--\n-- Check-before-increment: read current usage, check hard limits. If any\n-- dimension would breach, return HARD_BREACH without incrementing. If safe,\n-- HINCRBY all dimensions, then check soft limits.\n--\n-- Atomic Lua serialization on {b:M} guarantees zero overshoot.\n--\n-- KEYS (3): budget_usage, budget_limits, budget_def\n-- ARGV (variable): dimension_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]\n---------------------------------------------------------------------------\nredis.register_function(\'ff_report_usage_and_check\', function(keys, args)\n  local K = {\n    usage_key  = keys[1],\n    limits_key = keys[2],\n    def_key    = keys[3],\n  }\n\n  local dim_count = require_number(args[1], \"dim_count\")\n  if type(dim_count) == \"table\" then return dim_count end\n  local now_ms = args[2 * dim_count + 2]\n  local dedup_key = args[2 * dim_count + 3] or \"\"\n\n  -- Idempotency: if dedup_key provided, check for prior application\n  if dedup_key ~= \"\" then\n    local existing = redis.call(\"GET\", dedup_key)\n    if existing then\n      return {1, \"ALREADY_APPLIED\"}\n    end\n  end\n\n  -- Phase 1: CHECK all dimensions BEFORE any increment.\n  -- If any hard limit would be breached, reject the entire report.\n  for i = 1, dim_count do\n    local dim = args[1 + i]\n    local delta = tonumber(args[1 + dim_count + i])\n    local 
current = tonumber(redis.call(\"HGET\", K.usage_key, dim) or \"0\")\n    local new_total = current + delta\n\n    local hard_limit = redis.call(\"HGET\", K.limits_key, \"hard:\" .. dim)\n    if hard_limit and hard_limit ~= \"\" and hard_limit ~= false then\n      local limit_val = tonumber(hard_limit)\n      if limit_val > 0 and new_total > limit_val then\n        -- Record breach metadata but DO NOT increment\n        redis.call(\"HINCRBY\", K.def_key, \"breach_count\", 1)\n        redis.call(\"HSET\", K.def_key,\n          \"last_breach_at\", now_ms,\n          \"last_breach_dim\", dim,\n          \"last_updated_at\", now_ms)\n        return {1, \"HARD_BREACH\", dim, tostring(current), tostring(hard_limit)}\n      end\n    end\n  end\n\n  -- Phase 2: No hard breach detected \u{2014} safe to increment all dimensions.\n  local breached_soft = nil\n  for i = 1, dim_count do\n    local dim = args[1 + i]\n    local delta = tonumber(args[1 + dim_count + i])\n    local new_val = redis.call(\"HINCRBY\", K.usage_key, dim, delta)\n\n    -- Check soft limit (advisory \u{2014} increment still happens)\n    local soft_limit = redis.call(\"HGET\", K.limits_key, \"soft:\" .. dim)\n    if soft_limit and soft_limit ~= \"\" and soft_limit ~= false then\n      local limit_val = tonumber(soft_limit)\n      if limit_val > 0 and new_val > limit_val then\n        if not breached_soft then\n          breached_soft = dim\n        end\n      end\n    end\n  end\n\n  -- Update metadata\n  redis.call(\"HSET\", K.def_key, \"last_updated_at\", now_ms)\n\n  -- Mark dedup key after successful increment (24h TTL)\n  if dedup_key ~= \"\" then\n    redis.call(\"SET\", dedup_key, \"1\", \"PX\", 86400000)\n  end\n\n  if breached_soft then\n    redis.call(\"HINCRBY\", K.def_key, \"soft_breach_count\", 1)\n    local soft_val = tonumber(redis.call(\"HGET\", K.limits_key, \"soft:\" .. 
breached_soft) or \"0\")\n    local cur_val = tonumber(redis.call(\"HGET\", K.usage_key, breached_soft) or \"0\")\n    return {1, \"SOFT_BREACH\", breached_soft, tostring(cur_val), tostring(soft_val)}\n  end\n\n  return {1, \"OK\"}\nend)\n\n---------------------------------------------------------------------------\n-- #31  ff_reset_budget  (on {b:M})\n--\n-- Scanner-called periodic reset. Zero all usage fields, record reset,\n-- compute next_reset_at, re-score in reset index.\n--\n-- KEYS (3): budget_def, budget_usage, budget_resets_zset\n-- ARGV (2): budget_id, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_reset_budget\', function(keys, args)\n  local K = {\n    def_key       = keys[1],\n    usage_key     = keys[2],\n    resets_zset   = keys[3],\n  }\n\n  local A = {\n    budget_id = args[1],\n    now_ms    = args[2],\n  }\n\n  -- 1. Read usage fields and zero them\n  local usage_fields = redis.call(\"HKEYS\", K.usage_key)\n  if #usage_fields > 0 then\n    local zero_args = {}\n    for _, field in ipairs(usage_fields) do\n      zero_args[#zero_args + 1] = field\n      zero_args[#zero_args + 1] = \"0\"\n    end\n    redis.call(\"HSET\", K.usage_key, unpack(zero_args))\n  end\n\n  -- 2. Update budget_def: last_reset_at, reset_count\n  redis.call(\"HINCRBY\", K.def_key, \"reset_count\", 1)\n  redis.call(\"HSET\", K.def_key,\n    \"last_reset_at\", A.now_ms,\n    \"last_updated_at\", A.now_ms,\n    \"last_breach_at\", \"\",\n    \"last_breach_dim\", \"\")\n\n  -- 3. 
Compute next_reset_at from reset_interval_ms\n  local interval_ms = tonumber(redis.call(\"HGET\", K.def_key, \"reset_interval_ms\") or \"0\")\n  local next_reset_at = \"0\"\n  if interval_ms > 0 then\n    next_reset_at = tostring(tonumber(A.now_ms) + interval_ms)\n    redis.call(\"HSET\", K.def_key, \"next_reset_at\", next_reset_at)\n    redis.call(\"ZADD\", K.resets_zset, tonumber(next_reset_at), A.budget_id)\n  else\n    -- No recurring reset \u{2014} remove from schedule\n    redis.call(\"ZREM\", K.resets_zset, A.budget_id)\n  end\n\n  return ok(next_reset_at)\nend)\n\n---------------------------------------------------------------------------\n-- #29b  ff_unblock_execution  (on {p:N})\n--\n-- Re-evaluate blocked execution, set eligible_now, ZREM blocked set,\n-- ZADD eligible. All 7 dims.\n--\n-- KEYS (3): exec_core, blocked_zset, eligible_zset\n-- ARGV (3): execution_id, now_ms, expected_blocking_reason\n---------------------------------------------------------------------------\nredis.register_function(\'ff_unblock_execution\', function(keys, args)\n  local K = {\n    core_key     = keys[1],\n    blocked_zset = keys[2],\n    eligible_zset = keys[3],\n  }\n\n  local A = {\n    execution_id             = args[1],\n    now_ms                   = args[2],\n    expected_blocking_reason = args[3] or \"\",\n  }\n\n  -- 1. Read execution core\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- 2. Must be runnable\n  if core.lifecycle_phase ~= \"runnable\" then\n    return err(\"execution_not_eligible\")\n  end\n\n  -- 3. Must be blocked\n  local es = core.eligibility_state\n  if es ~= \"blocked_by_budget\" and es ~= \"blocked_by_quota\"\n     and es ~= \"blocked_by_route\" and es ~= \"blocked_by_operator\" then\n    return err(\"execution_not_eligible\")\n  end\n\n  -- 4. 
Validate expected blocking reason (prevent stale unblock)\n  if A.expected_blocking_reason ~= \"\" then\n    if core.blocking_reason ~= A.expected_blocking_reason then\n      return err(\"execution_not_eligible\")\n    end\n  end\n\n  -- 5. Transition: all 7 dims\n  local priority = tonumber(core.priority or \"0\")\n  local created_at = tonumber(core.created_at or \"0\")\n  local score = 0 - (priority * 1000000000000) + created_at\n\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"runnable\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", \"eligible_now\",\n    \"blocking_reason\", \"waiting_for_worker\",\n    \"blocking_detail\", \"\",\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", core.attempt_state or \"pending_first_attempt\",\n    \"public_state\", \"waiting\",\n    \"last_transition_at\", A.now_ms,\n    \"last_mutation_at\", A.now_ms)\n\n  -- 6. Move from blocked to eligible\n  redis.call(\"ZREM\", K.blocked_zset, A.execution_id)\n  redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n  return ok(\"unblocked\")\nend)\n\n---------------------------------------------------------------------------\n-- #29a  ff_block_execution_for_admission  (on {p:N})\n--\n-- Parameterized block: set eligibility/blocking for budget/quota/route/lane\n-- denial, ZREM eligible, ZADD target blocked set. All 7 dims.\n--\n-- KEYS (3): exec_core, eligible_zset, target_blocked_zset\n-- ARGV (4): execution_id, blocking_reason, blocking_detail, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_block_execution_for_admission\', function(keys, args)\n  local K = {\n    core_key       = keys[1],\n    eligible_zset  = keys[2],\n    blocked_zset   = keys[3],\n  }\n\n  local A = {\n    execution_id    = args[1],\n    blocking_reason = args[2],\n    blocking_detail = args[3] or \"\",\n    now_ms          = args[4],\n  }\n\n  -- 1. 
Read execution core\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- 2. Must be runnable\n  if core.lifecycle_phase ~= \"runnable\" then\n    if core.lifecycle_phase == \"terminal\" then\n      return err(\"execution_not_active\", core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n    end\n    return err(\"execution_not_eligible\")\n  end\n\n  -- 3. Map blocking_reason to eligibility_state\n  local REASON_TO_ELIGIBILITY = {\n    waiting_for_budget         = \"blocked_by_budget\",\n    waiting_for_quota          = \"blocked_by_quota\",\n    waiting_for_capable_worker = \"blocked_by_route\",\n    paused_by_operator         = \"blocked_by_operator\",\n    paused_by_policy           = \"blocked_by_lane_state\",\n  }\n  local eligibility = REASON_TO_ELIGIBILITY[A.blocking_reason]\n  if not eligibility then\n    return err(\"invalid_blocking_reason\")\n  end\n\n  -- 4. Transition: all 7 dims\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", \"runnable\",\n    \"ownership_state\", \"unowned\",\n    \"eligibility_state\", eligibility,\n    \"blocking_reason\", A.blocking_reason,\n    \"blocking_detail\", A.blocking_detail,\n    \"terminal_outcome\", \"none\",\n    \"attempt_state\", core.attempt_state or \"pending_first_attempt\",\n    \"public_state\", \"rate_limited\",\n    \"last_transition_at\", A.now_ms,\n    \"last_mutation_at\", A.now_ms)\n\n  -- 5. 
Move from eligible to blocked\n  redis.call(\"ZREM\", K.eligible_zset, A.execution_id)\n  redis.call(\"ZADD\", K.blocked_zset, tonumber(A.now_ms), A.execution_id)\n\n  return ok(\"blocked\")\nend)\n\n\n-- source: lua/quota.lua\n-- FlowFabric quota and rate-limit functions\n-- Reference: RFC-008 (Quota), RFC-010 \u{a7}4.4 (#32)\n--\n-- Depends on helpers: ok, err, is_set\n\n---------------------------------------------------------------------------\n-- ff_create_quota_policy  (on {q:K})\n--\n-- Create a new quota/rate-limit policy.\n-- Idempotent: if EXISTS quota_def \u{2192} return ok_already_satisfied.\n--\n-- KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,\n--           admitted_set, quota_policies_index\n-- ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,\n--           max_concurrent, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_create_quota_policy\', function(keys, args)\n  local K = {\n    def_key          = keys[1],\n    window_zset      = keys[2],\n    concurrency_key  = keys[3],\n    admitted_set     = keys[4],\n    policies_index   = keys[5],\n  }\n\n  local A = {\n    quota_policy_id         = args[1],\n    window_seconds          = args[2],\n    max_requests_per_window = args[3],\n    max_concurrent          = args[4],\n    now_ms                  = args[5],\n  }\n\n  -- Idempotency: already exists \u{2192} return immediately\n  if redis.call(\"EXISTS\", K.def_key) == 1 then\n    return ok_already_satisfied(A.quota_policy_id)\n  end\n\n  -- HSET quota definition\n  redis.call(\"HSET\", K.def_key,\n    \"quota_policy_id\", A.quota_policy_id,\n    \"requests_per_window_seconds\", A.window_seconds,\n    \"max_requests_per_window\", A.max_requests_per_window,\n    \"active_concurrency_cap\", A.max_concurrent,\n    \"created_at\", A.now_ms)\n\n  -- Init concurrency counter to 0\n  redis.call(\"SET\", K.concurrency_key, \"0\")\n\n  -- Register in 
partition-level policy index (for cluster-safe discovery)\n  redis.call(\"SADD\", K.policies_index, A.quota_policy_id)\n\n  -- admitted_set + quota_window_zset left empty (populated on admission)\n\n  return ok(A.quota_policy_id)\nend)\n\n---------------------------------------------------------------------------\n-- #32  ff_check_admission_and_record  (on {q:K})\n--\n-- Idempotent sliding-window rate check + concurrency check.\n-- If admitted: ZADD window, SET NX guard, optional INCR concurrency,\n-- SADD to admitted_set (for cluster-safe reconciler discovery).\n--\n-- KEYS (5): window_zset, concurrency_counter, quota_def, admitted_guard_key,\n--           admitted_set\n-- ARGV (6): now_ms, window_seconds, rate_limit, concurrency_cap,\n--           execution_id, jitter_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_check_admission_and_record\', function(keys, args)\n  local K = {\n    window_zset        = keys[1],\n    concurrency_key    = keys[2],\n    quota_def          = keys[3],\n    admitted_guard_key = keys[4],\n    admitted_set       = keys[5],\n  }\n\n  local now_ms_n          = require_number(args[1], \"now_ms\")\n  if type(now_ms_n) == \"table\" then return now_ms_n end\n  local window_seconds_n  = require_number(args[2], \"window_seconds\")\n  if type(window_seconds_n) == \"table\" then return window_seconds_n end\n  local rate_limit_n      = require_number(args[3], \"rate_limit\")\n  if type(rate_limit_n) == \"table\" then return rate_limit_n end\n  local concurrency_cap_n = require_number(args[4], \"concurrency_cap\")\n  if type(concurrency_cap_n) == \"table\" then return concurrency_cap_n end\n\n  local A = {\n    now_ms          = now_ms_n,\n    window_seconds  = window_seconds_n,\n    rate_limit      = rate_limit_n,\n    concurrency_cap = concurrency_cap_n,\n    execution_id    = args[5],\n    jitter_ms       = tonumber(args[6] or \"0\"),\n  }\n\n  local window_ms = A.window_seconds * 
1000\n\n  -- 1. Idempotency guard: already admitted in this window?\n  if redis.call(\"EXISTS\", K.admitted_guard_key) == 1 then\n    return { \"ALREADY_ADMITTED\" }\n  end\n\n  -- 2. Sliding window: remove expired entries\n  redis.call(\"ZREMRANGEBYSCORE\", K.window_zset, \"-inf\", A.now_ms - window_ms)\n\n  -- 3. Check rate limit\n  if A.rate_limit > 0 then\n    local current_count = redis.call(\"ZCARD\", K.window_zset)\n    if current_count >= A.rate_limit then\n      -- Compute retry_after from oldest entry\n      local oldest = redis.call(\"ZRANGE\", K.window_zset, 0, 0, \"WITHSCORES\")\n      local retry_after_ms = 0\n      if #oldest >= 2 then\n        retry_after_ms = tonumber(oldest[2]) + window_ms - A.now_ms\n        if retry_after_ms < 0 then retry_after_ms = 0 end\n      end\n      local jitter = 0\n      if A.jitter_ms > 0 then\n        jitter = math.random(0, A.jitter_ms)\n      end\n      return { \"RATE_EXCEEDED\", tostring(retry_after_ms + jitter) }\n    end\n  end\n\n  -- 4. Check concurrency cap\n  if A.concurrency_cap > 0 then\n    local active = tonumber(redis.call(\"GET\", K.concurrency_key) or \"0\")\n    if active >= A.concurrency_cap then\n      return { \"CONCURRENCY_EXCEEDED\" }\n    end\n  end\n\n  -- 5. Admit: record in sliding window (execution_id as member \u{2014} idempotent)\n  redis.call(\"ZADD\", K.window_zset, A.now_ms, A.execution_id)\n\n  -- 6. Set admitted guard key with TTL = window size\n  -- Guard: PX 0 or PX <0 causes Valkey error inside Lua (after ZADD committed).\n  if window_ms > 0 then\n    redis.call(\"SET\", K.admitted_guard_key, \"1\", \"PX\", window_ms, \"NX\")\n  end\n\n  -- 7. Increment concurrency counter if cap is set\n  if A.concurrency_cap > 0 then\n    redis.call(\"INCR\", K.concurrency_key)\n  end\n\n  -- 8. 
Track in admitted set (for cluster-safe reconciler \u{2014} replaces SCAN)\n  redis.call(\"SADD\", K.admitted_set, A.execution_id)\n\n  return { \"ADMITTED\" }\nend)\n\n---------------------------------------------------------------------------\n-- ff_release_admission  (on {q:K})\n--\n-- Release a previously-recorded admission slot. Called when a claim grant\n-- fails after admission was recorded, preventing leaked concurrency slots.\n--\n-- KEYS (3): admitted_guard_key, admitted_set, concurrency_counter\n-- ARGV (1): execution_id\n---------------------------------------------------------------------------\nredis.register_function(\'ff_release_admission\', function(keys, args)\n  local K = {\n    admitted_guard_key = keys[1],\n    admitted_set       = keys[2],\n    concurrency_key    = keys[3],\n  }\n\n  local execution_id = args[1]\n\n  -- 1. Delete the guard key (idempotent \u{2014} DEL returns 0 if absent)\n  redis.call(\"DEL\", K.admitted_guard_key)\n\n  -- 2. Remove from admitted set\n  redis.call(\"SREM\", K.admitted_set, execution_id)\n\n  -- 3. 
Decrement concurrency counter (floor at 0)\n  local current = tonumber(redis.call(\"GET\", K.concurrency_key) or \"0\")\n  if current > 0 then\n    redis.call(\"DECR\", K.concurrency_key)\n  end\n\n  return {1, \"OK\", \"released\"}\nend)\n\n\n-- source: lua/flow.lua\n-- FlowFabric flow coordination and dependency functions\n-- Reference: RFC-007 (Flow), RFC-010 \u{a7}4.1 (#22-24, #35), \u{a7}4.2 (#29)\n--\n-- Depends on helpers: ok, err, is_set, hgetall_to_table\n\n---------------------------------------------------------------------------\n-- Cycle detection helper\n---------------------------------------------------------------------------\n\n-- Max nodes to visit during cycle detection BFS.\nlocal MAX_CYCLE_CHECK_NODES = 1000\n\n-- Detect if adding an edge upstream\u{2192}downstream would create a cycle.\n-- BFS from downstream through existing outgoing edges: if upstream is\n-- reachable, the new edge closes a loop (A\u{2192}B\u{2192}C\u{2192}A deadlock).\n-- All keys share the same {fp:N} slot (flow-partition co-location).\n-- @param flow_prefix  e.g. \"ff:flow:{fp:0}:<flow_id>\"\n-- @param start_eid    downstream of proposed edge (BFS start)\n-- @param target_eid   upstream of proposed edge (looking for this)\n-- @return true if a cycle would be created\nlocal function detect_cycle(flow_prefix, start_eid, target_eid)\n  local visited = {}\n  local queue = {start_eid}\n  local count = 0\n\n  while #queue > 0 do\n    local next_queue = {}\n    for _, eid in ipairs(queue) do\n      if eid == target_eid then\n        return true\n      end\n      if not visited[eid] then\n        visited[eid] = true\n        count = count + 1\n        if count > MAX_CYCLE_CHECK_NODES then\n          return true  -- graph too large to verify; reject conservatively\n        end\n        local out_key = flow_prefix .. \":out:\" .. eid\n        local edges = redis.call(\"SMEMBERS\", out_key)\n        for _, edge_id in ipairs(edges) do\n          local edge_key = flow_prefix .. 
\":edge:\" .. edge_id\n          local next_eid = redis.call(\"HGET\", edge_key, \"downstream_execution_id\")\n          if next_eid and next_eid ~= \"\" and not visited[next_eid] then\n            next_queue[#next_queue + 1] = next_eid\n          end\n        end\n      end\n    end\n    queue = next_queue\n  end\n\n  return false\nend\n\n---------------------------------------------------------------------------\n-- ff_create_flow  (on {fp:N})\n--\n-- Create a new flow container. Idempotent: if flow_core already exists,\n-- returns ok_already_satisfied.\n--\n-- KEYS (3): flow_core, members_set, flow_index\n-- ARGV (4): flow_id, flow_kind, namespace, now_ms (IGNORED \u{2014} see note below)\n--\n-- NOTE: ARGV[4] (`now_ms`) is accepted for caller compatibility but NOT\n-- used for stored timestamps. We read server time via redis.call(\"TIME\")\n-- so created_at / last_mutation_at agree with fields written by other\n-- Lua functions (ff_complete_execution etc.) under client clock skew.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_create_flow\', function(keys, args)\n  local K = {\n    flow_core   = keys[1],\n    members_set = keys[2],\n    flow_index  = keys[3],\n  }\n\n  local A = {\n    flow_id   = args[1],\n    flow_kind = args[2],\n    namespace = args[3],\n    -- args[4] is client-provided now_ms; intentionally ignored.\n  }\n\n  -- Server time (not client-provided) so created_at / last_mutation_at\n  -- agree with timestamps written by ff_complete_execution and peers.\n  local now_ms = server_time_ms()\n\n  -- Maintain flow_index BEFORE the idempotency guard. 
SADD is itself\n  -- idempotent (no-op on existing members), and hoisting it heals any\n  -- pre-existing flow_core that was created before this index was\n  -- introduced \u{2014} no migration script required.\n  redis.call(\"SADD\", K.flow_index, A.flow_id)\n\n  -- Idempotency: if flow already exists, return already_satisfied\n  if redis.call(\"EXISTS\", K.flow_core) == 1 then\n    return ok_already_satisfied(A.flow_id)\n  end\n\n  -- Create flow core record\n  redis.call(\"HSET\", K.flow_core,\n    \"flow_id\", A.flow_id,\n    \"flow_kind\", A.flow_kind,\n    \"namespace\", A.namespace,\n    \"graph_revision\", 0,\n    \"node_count\", 0,\n    \"edge_count\", 0,\n    \"public_flow_state\", \"open\",\n    \"created_at\", now_ms,\n    \"last_mutation_at\", now_ms)\n\n  return ok(A.flow_id)\nend)\n\n---------------------------------------------------------------------------\n-- ff_add_execution_to_flow  (on {fp:N} \u{2014} single atomic FCALL)\n--\n-- Add a member execution to a flow AND stamp the flow_id back-pointer\n-- on exec_core in one atomic commit. Per RFC-011 \u{a7}7.3, exec keys\n-- co-locate with their parent flow\'s partition under hash-tag routing,\n-- so exec_core shares the `{fp:N}` hash-tag with flow_core / members_set\n-- / flow_index. All four KEYS hash to the same slot; no CROSSSLOT.\n--\n-- KEYS (4): flow_core, members_set, flow_index, exec_core\n-- ARGV (3): flow_id, execution_id, now_ms (IGNORED \u{2014} server time used)\n--\n-- Validates-before-writing: flow_not_found / flow_already_terminal\n-- early-returns fire BEFORE any write (step 1 below). On those error\n-- paths, zero state mutates \u{2014} atomicity by construction at the Lua\n-- level (Valkey scripting contract: no redis.call() before error_reply\n-- means nothing to roll back). 
See RFC-011 \u{a7}7.3.1 tests for the\n-- structural pin.\n--\n-- Invariant (post-RFC-011): a successful call commits BOTH the flow-\n-- index updates AND the exec_core.flow_id stamp in one atomic unit.\n-- Readers can assume exec_core.flow_id == flow_id iff the exec is in\n-- members_set. The pre-RFC-011 two-phase contract + \u{a7}5.5 orphan-window\n-- + issue #21 reconciliation-scanner plan are all superseded.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_add_execution_to_flow\', function(keys, args)\n  local K = {\n    flow_core   = keys[1],\n    members_set = keys[2],\n    flow_index  = keys[3],\n    exec_core   = keys[4],\n  }\n\n  local A = {\n    flow_id      = args[1],\n    execution_id = args[2],\n    -- args[3] is client-provided now_ms; intentionally ignored in favour\n    -- of redis.call(\"TIME\") to keep last_mutation_at consistent with\n    -- timestamps stamped by ff_complete_execution and peers.\n  }\n\n  local now_ms = server_time_ms()\n\n  -- 1. Validate flow exists and is not terminal, and execution exists.\n  --    Validates-before-writing: no redis.call() writes happen before\n  --    these guards, so the error paths commit zero state (symmetric\n  --    with step 2\'s cross-flow guard on AlreadyMember).\n  local raw = redis.call(\"HGETALL\", K.flow_core)\n  if #raw == 0 then return err(\"flow_not_found\") end\n  local flow = hgetall_to_table(raw)\n  local pfs = flow.public_flow_state or \"\"\n  if pfs == \"cancelled\" or pfs == \"completed\" or pfs == \"failed\" then\n    return err(\"flow_already_terminal\")\n  end\n  -- Execution must exist \u{2014} otherwise step 5\'s HSET exec_core would\n  -- silently create a hash for a non-existent exec, leading to an\n  -- inconsistent members_set \u{2194} exec_core state. 
Symmetric with the\n  -- flow_not_found guard above.\n  if redis.call(\"EXISTS\", K.exec_core) == 0 then\n    return err(\"execution_not_found\")\n  end\n\n  -- Self-heal flow_index for LIVE flows only. The projector may have\n  -- SREMd this flow after observing an all-terminal sample, yet the\n  -- flow is still \"open\" per flow_core and can accept new members.\n  -- Re-add idempotently so the next projector cycle picks the flow\n  -- back up. Runs only after the terminal-state guard above so we do\n  -- not resurrect cancelled/completed/failed flows into the active\n  -- index. Same {fp:N} slot as the other KEYS, so atomic with the\n  -- membership mutation below.\n  redis.call(\"SADD\", K.flow_index, A.flow_id)\n\n  -- 2. Idempotency: already a member of THIS flow\'s members_set.\n  --    Still stamp exec_core.flow_id defensively \u{2014} an earlier call\n  --    may have committed members_set but crashed before exec_core\n  --    HSET under the legacy two-phase shape. Stamping here is a\n  --    no-op if flow_id already matches; heals any pre-RFC-011\n  --    orphans encountered on a rolling upgrade.\n  --\n  --    Cross-flow guard on the orphan case: if exec_core.flow_id is\n  --    already set to a DIFFERENT flow (corrupted-state orphan that\n  --    IS in this flow\'s members_set but stamped wrong), refuse\n  --    instead of silently re-stamping. Symmetric with step 3\'s\n  --    guard on the not-yet-a-member branch \u{2014} catches the same\n  --    invariant violation earlier in the path. Empty existing\n  --    flow_id goes through the heal path normally.\n  if redis.call(\"SISMEMBER\", K.members_set, A.execution_id) == 1 then\n    local existing = redis.call(\"HGET\", K.exec_core, \"flow_id\")\n    if existing and existing ~= \"\" and existing ~= A.flow_id then\n      return err(\"already_member_of_different_flow:\" .. 
existing)\n    end\n    redis.call(\"HSET\", K.exec_core, \"flow_id\", A.flow_id)\n    local nc = redis.call(\"HGET\", K.flow_core, \"node_count\") or \"0\"\n    return ok_already_satisfied(A.execution_id, nc)\n  end\n\n  -- 3. Cross-flow guard: if exec_core.flow_id is already set to a\n  --    DIFFERENT flow, refuse \u{2014} silently re-stamping would orphan\n  --    the other flow\'s accounting. An exec belongs to at most one\n  --    flow at a time per RFC-007.\n  local existing_flow_id = redis.call(\"HGET\", K.exec_core, \"flow_id\")\n  if existing_flow_id and existing_flow_id ~= \"\" and existing_flow_id ~= A.flow_id then\n    return err(\"already_member_of_different_flow:\" .. existing_flow_id)\n  end\n\n  -- 4. Add to membership set\n  redis.call(\"SADD\", K.members_set, A.execution_id)\n\n  -- 5. Stamp the flow_id back-pointer on exec_core. Co-located with\n  --    the flow\'s partition under RFC-011 \u{a7}7.3 hash-tag routing; this\n  --    HSET is part of the same atomic FCALL as the SADD above.\n  redis.call(\"HSET\", K.exec_core, \"flow_id\", A.flow_id)\n\n  -- 6. Increment node_count and graph_revision\n  local new_nc = redis.call(\"HINCRBY\", K.flow_core, \"node_count\", 1)\n  local new_rev = redis.call(\"HINCRBY\", K.flow_core, \"graph_revision\", 1)\n  redis.call(\"HSET\", K.flow_core, \"last_mutation_at\", now_ms)\n\n  return ok(A.execution_id, tostring(new_nc))\nend)\n\n---------------------------------------------------------------------------\n-- ff_cancel_flow  (on {fp:N})\n--\n-- Cancel a flow. 
Returns the member list for the caller to dispatch\n-- individual cancellations cross-partition.\n--\n-- KEYS (3): flow_core, members_set, flow_index (RESERVED \u{2014} see below)\n-- ARGV (4): flow_id, reason, cancellation_policy, now_ms (IGNORED \u{2014}\n--   server time used so `cancelled_at` agrees with peer Lua fields)\n--\n-- KEYS[3] (flow_index) is accepted for caller-compatibility with the\n-- shared FlowStructOpKeys wrapper, but this function does NOT mutate\n-- flow_index. The projector is the sole SREM writer (see the \"4b\" note\n-- in the body below).\n---------------------------------------------------------------------------\nredis.register_function(\'ff_cancel_flow\', function(keys, args)\n  local K = {\n    flow_core   = keys[1],\n    members_set = keys[2],\n    -- keys[3] is flow_index; present in KEYS for wrapper symmetry but\n    -- unused in this function (see rationale near the end of the body).\n  }\n\n  local A = {\n    flow_id              = args[1],\n    reason               = args[2],\n    cancellation_policy  = args[3],\n    -- args[4] is client-provided now_ms; intentionally ignored.\n  }\n\n  local now_ms = server_time_ms()\n\n  -- 1. Validate flow exists\n  local raw = redis.call(\"HGETALL\", K.flow_core)\n  if #raw == 0 then return err(\"flow_not_found\") end\n  local flow = hgetall_to_table(raw)\n\n  -- 2. Check not already terminal\n  local pfs = flow.public_flow_state or \"\"\n  if pfs == \"cancelled\" or pfs == \"completed\" or pfs == \"failed\" then\n    return err(\"flow_already_terminal\")\n  end\n\n  -- 3. Get all member execution IDs\n  local members = redis.call(\"SMEMBERS\", K.members_set)\n\n  -- 4. 
Update flow state\n  -- cancellation_policy is persisted so an AlreadyTerminal retry can\n  -- return the authoritative stored policy instead of echoing the\n  -- caller\'s retry intent.\n  --\n  -- NOTE: this field is persisted from this library version onward.\n  -- Flows cancelled before this deploy reach public_flow_state=\'cancelled\'\n  -- without a cancellation_policy value. The Rust caller detects the\n  -- empty field on HMGET and falls back to args.cancellation_policy, so\n  -- no backfill migration is needed.\n  redis.call(\"HSET\", K.flow_core,\n    \"public_flow_state\", \"cancelled\",\n    \"cancelled_at\", now_ms,\n    \"cancel_reason\", A.reason,\n    \"cancellation_policy\", A.cancellation_policy,\n    \"last_mutation_at\", now_ms)\n\n  -- Do NOT SREM flow_index here. Member cancellations dispatch\n  -- asynchronously from ff-server; flow_projector needs to keep\n  -- projecting the flow while those cancels land so the summary\n  -- reflects the real progression (running/blocked \u{2192} cancelled). The\n  -- projector owns the SREM once it observes sampled==true_total\n  -- all-terminal (see crates/ff-engine/src/scanner/flow_projector.rs).\n  -- A projector-owned SREM is also the right place because it is\n  -- the only writer that can prove every member has actually reached\n  -- terminal state. Removing the entry here would freeze the summary\n  -- at whatever snapshot was current when cancel_flow fired.\n\n  -- 5. 
Return: ok(cancellation_policy, member1, member2, ...)\n  -- Build array manually to include variable member list.\n  local result = {1, \"OK\", A.cancellation_policy}\n  for _, eid in ipairs(members) do\n    result[#result + 1] = eid\n  end\n\n  return result\nend)\n\n---------------------------------------------------------------------------\n-- #29  ff_stage_dependency_edge  (on {fp:N})\n--\n-- Validate membership + topology, check graph_revision, create edge,\n-- increment graph_revision.\n--\n-- KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set,\n--           grant_hash\n-- ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,\n--           dependency_kind, data_passing_ref, expected_graph_revision,\n--           now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_stage_dependency_edge\', function(keys, args)\n  local K = {\n    flow_core    = keys[1],\n    members_set  = keys[2],\n    edge_hash    = keys[3],\n    out_adj_set  = keys[4],\n    in_adj_set   = keys[5],\n    grant_hash   = keys[6],\n  }\n\n  local A = {\n    flow_id                  = args[1],\n    edge_id                  = args[2],\n    upstream_eid             = args[3],\n    downstream_eid           = args[4],\n    dependency_kind          = args[5] or \"success_only\",\n    data_passing_ref         = args[6] or \"\",\n    expected_graph_revision  = args[7],\n    now_ms                   = args[8],\n  }\n\n  -- 1. Reject self-referencing edges\n  if A.upstream_eid == A.downstream_eid then\n    return err(\"self_referencing_edge\")\n  end\n\n  -- 2. Read flow core\n  local raw = redis.call(\"HGETALL\", K.flow_core)\n  if #raw == 0 then return err(\"flow_not_found\") end\n  local flow = hgetall_to_table(raw)\n\n  -- 2b. 
Reject mutations on terminal flows\n  local pfs = flow.public_flow_state or \"\"\n  if pfs == \"cancelled\" or pfs == \"completed\" or pfs == \"failed\" then\n    return err(\"flow_already_terminal\")\n  end\n\n  -- 3. Check graph_revision\n  if tostring(flow.graph_revision or \"0\") ~= A.expected_graph_revision then\n    return err(\"stale_graph_revision\")\n  end\n\n  -- 4. Verify both executions are members\n  if redis.call(\"SISMEMBER\", K.members_set, A.upstream_eid) == 0 then\n    return err(\"execution_not_in_flow\")\n  end\n  if redis.call(\"SISMEMBER\", K.members_set, A.downstream_eid) == 0 then\n    return err(\"execution_not_in_flow\")\n  end\n\n  -- 4b. Transitive cycle detection: walk from downstream through outgoing\n  -- edges to check if upstream is reachable (A\u{2192}B\u{2192}C\u{2192}A deadlock prevention).\n  local flow_prefix = string.sub(K.flow_core, 1, -6)  -- strip \":core\"\n  if detect_cycle(flow_prefix, A.downstream_eid, A.upstream_eid) then\n    return err(\"cycle_detected\")\n  end\n\n  -- 5. Check edge doesn\'t already exist\n  if redis.call(\"EXISTS\", K.edge_hash) == 1 then\n    return err(\"dependency_already_exists\")\n  end\n\n  -- 6. Create edge record\n  redis.call(\"HSET\", K.edge_hash,\n    \"edge_id\", A.edge_id,\n    \"flow_id\", A.flow_id,\n    \"upstream_execution_id\", A.upstream_eid,\n    \"downstream_execution_id\", A.downstream_eid,\n    \"dependency_kind\", A.dependency_kind,\n    \"satisfaction_condition\", \"all_required\",\n    \"data_passing_ref\", A.data_passing_ref,\n    \"edge_state\", \"pending\",\n    \"created_at\", A.now_ms,\n    \"created_by\", \"engine\")\n\n  -- 7. Update adjacency sets\n  redis.call(\"SADD\", K.out_adj_set, A.edge_id)\n  redis.call(\"SADD\", K.in_adj_set, A.edge_id)\n\n  -- 8. 
Increment graph_revision and edge_count\n  local new_rev = redis.call(\"HINCRBY\", K.flow_core, \"graph_revision\", 1)\n  redis.call(\"HINCRBY\", K.flow_core, \"edge_count\", 1)\n  redis.call(\"HSET\", K.flow_core, \"last_mutation_at\", A.now_ms)\n\n  return ok(A.edge_id, tostring(new_rev))\nend)\n\n---------------------------------------------------------------------------\n-- #22  ff_apply_dependency_to_child  (on {p:N})\n--\n-- Create dep record on child execution partition, increment unsatisfied\n-- count. If child is runnable: set blocked_by_dependencies.\n--\n-- KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,\n--           eligible_zset, blocked_deps_zset, deps_all_edges\n-- ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,\n--           dependency_kind, data_passing_ref, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_apply_dependency_to_child\', function(keys, args)\n  local K = {\n    core_key         = keys[1],\n    deps_meta        = keys[2],\n    unresolved_set   = keys[3],\n    dep_hash         = keys[4],\n    eligible_zset    = keys[5],\n    blocked_deps_zset = keys[6],\n    deps_all_edges   = keys[7],\n  }\n\n  local A = {\n    flow_id          = args[1],\n    edge_id          = args[2],\n    upstream_eid     = args[3],\n    graph_revision   = args[4],\n    dependency_kind  = args[5] or \"success_only\",\n    data_passing_ref = args[6] or \"\",\n    now_ms           = args[7],\n  }\n\n  -- 1. Read execution core\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- 2. Validate flow membership (RFC-007 assert_flow_membership)\n  if is_set(core.flow_id) and core.flow_id ~= A.flow_id then\n    return err(\"execution_already_in_flow\")\n  end\n\n  -- 3. 
Idempotency: dep already applied\n  if redis.call(\"EXISTS\", K.dep_hash) == 1 then\n    return ok(\"already_applied\")\n  end\n\n  -- 4. Create dep record\n  redis.call(\"HSET\", K.dep_hash,\n    \"edge_id\", A.edge_id,\n    \"flow_id\", A.flow_id,\n    \"upstream_execution_id\", A.upstream_eid,\n    \"downstream_execution_id\", core.execution_id or \"\",\n    \"dependency_kind\", A.dependency_kind,\n    \"state\", \"unsatisfied\",\n    \"data_passing_ref\", A.data_passing_ref,\n    \"last_resolved_at\", \"\")\n\n  -- 5. Update deps:meta\n  redis.call(\"SADD\", K.unresolved_set, A.edge_id)\n  -- Register edge in the per-execution all-edges index (cluster-safe\n  -- retention discovery; retained across resolve, purged wholesale on\n  -- retention trim).\n  redis.call(\"SADD\", K.deps_all_edges, A.edge_id)\n  local unresolved = redis.call(\"HINCRBY\", K.deps_meta, \"unsatisfied_required_count\", 1)\n  redis.call(\"HSET\", K.deps_meta,\n    \"flow_id\", A.flow_id,\n    \"last_flow_graph_revision\", A.graph_revision,\n    \"last_dependency_update_at\", A.now_ms)\n\n  -- 6. If runnable: block by dependencies (ALL 7 dims)\n  if core.lifecycle_phase == \"runnable\" and core.terminal_outcome == \"none\" then\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", core.lifecycle_phase,             -- preserve\n      \"ownership_state\", core.ownership_state or \"unowned\", -- preserve\n      \"eligibility_state\", \"blocked_by_dependencies\",\n      \"blocking_reason\", \"waiting_for_children\",\n      \"blocking_detail\", unresolved .. \" dep(s) unresolved incl \" .. 
A.edge_id,\n      \"terminal_outcome\", \"none\",                          -- preserve\n      \"attempt_state\", core.attempt_state or \"pending_first_attempt\", -- preserve\n      \"public_state\", \"waiting_children\",\n      \"last_transition_at\", A.now_ms,\n      \"last_mutation_at\", A.now_ms)\n    redis.call(\"ZREM\", K.eligible_zset, core.execution_id or \"\")\n    redis.call(\"ZADD\", K.blocked_deps_zset,\n      tonumber(core.created_at or \"0\"), core.execution_id or \"\")\n  end\n\n  return ok(tostring(unresolved))\nend)\n\n---------------------------------------------------------------------------\n-- #23  ff_resolve_dependency  (on {p:N})\n--\n-- Resolve one dependency edge: satisfied (upstream success) or impossible\n-- (upstream failed/cancelled/expired). Updates child eligibility.\n--\n-- On satisfaction, if the edge was staged with a non-empty\n-- `data_passing_ref`, atomically COPYs the upstream\'s result key into\n-- the downstream\'s input_payload key before flipping the child to\n-- eligible. 
Upstream + downstream are guaranteed co-located on the\n-- same {fp:N} slot by flow membership (RFC-011 \u{a7}7.3).\n--\n-- KEYS (11): exec_core, deps_meta, unresolved_set, dep_hash,\n--            eligible_zset, terminal_zset, blocked_deps_zset,\n--            attempt_hash, stream_meta, downstream_payload,\n--            upstream_result\n-- ARGV (3): edge_id, upstream_outcome, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_resolve_dependency\', function(keys, args)\n  local K = {\n    core_key          = keys[1],\n    deps_meta         = keys[2],\n    unresolved_set    = keys[3],\n    dep_hash          = keys[4],\n    eligible_zset     = keys[5],\n    terminal_zset     = keys[6],\n    blocked_deps_zset = keys[7],\n    attempt_hash      = keys[8],\n    stream_meta       = keys[9],\n    downstream_payload = keys[10],\n    upstream_result    = keys[11],\n  }\n\n  local A = {\n    edge_id           = args[1],\n    upstream_outcome  = args[2],\n    now_ms            = args[3],\n  }\n\n  -- 1. Read dep record\n  local dep_raw = redis.call(\"HGETALL\", K.dep_hash)\n  if #dep_raw == 0 then return err(\"invalid_dependency\") end\n  local dep = hgetall_to_table(dep_raw)\n\n  -- 2. Already resolved?\n  if dep.state == \"satisfied\" or dep.state == \"impossible\" then\n    return ok(\"already_resolved\")\n  end\n\n  -- 3. 
Satisfaction path (upstream completed successfully)\n  if A.upstream_outcome == \"success\" then\n    redis.call(\"HSET\", K.dep_hash,\n      \"state\", \"satisfied\", \"last_resolved_at\", A.now_ms)\n    redis.call(\"SREM\", K.unresolved_set, A.edge_id)\n    local remaining = redis.call(\"HINCRBY\", K.deps_meta,\n      \"unsatisfied_required_count\", -1)\n    redis.call(\"HSET\", K.deps_meta, \"last_dependency_update_at\", A.now_ms)\n\n    -- Check if all deps now satisfied\n    local raw = redis.call(\"HGETALL\", K.core_key)\n    if #raw == 0 then return ok(\"satisfied\", \"\") end\n    local core = hgetall_to_table(raw)\n\n    -- Server-side data_passing_ref resolution (Batch C item 3). When\n    -- the edge was staged with a non-empty `data_passing_ref`, replace\n    -- the downstream\'s input_payload with the upstream\'s result. COPY\n    -- is a single-slot server-internal op (no round-trip to Lua\n    -- memory) so large result payloads don\'t inflate the FCALL\'s\n    -- working set.\n    --\n    -- Terminal-child guard: a late satisfaction can race with the\n    -- child being cancelled or skipped. 
Don\'t overwrite the payload\n    -- of a child that has already reached a terminal state \u{2014} it\'s at\n    -- best pointless (the worker will never read it) and at worst\n    -- noisy for post-mortem debugging.\n    --\n    -- Write-ordering note (RFC-010 \u{a7}4.8b): COPY runs BEFORE the\n    -- eligibility transition below so a crash between the two leaves\n    -- the child blocked (or late-satisfied on reconciler retry) with\n    -- the correct payload rather than eligible with a stale one.\n    --\n    -- Void-completion path: if the upstream called complete(None), the\n    -- result key does not exist \u{2014} COPY returns 0 and data_injected\n    -- stays empty, leaving the child\'s original input_payload intact.\n    local data_injected = \"\"\n    if is_set(dep.data_passing_ref)\n       and core.terminal_outcome == \"none\" then\n      local copied = redis.call(\n        \"COPY\", K.upstream_result, K.downstream_payload, \"REPLACE\")\n      if copied == 1 then\n        data_injected = \"data_injected\"\n      end\n    end\n\n    if remaining == 0\n       and core.lifecycle_phase == \"runnable\"\n       and core.ownership_state == \"unowned\"\n       and core.terminal_outcome == \"none\"\n       and core.eligibility_state == \"blocked_by_dependencies\" then\n      -- Preserve attempt_state\n      local new_attempt_state = core.attempt_state\n      if not is_set(new_attempt_state) or new_attempt_state == \"none\" then\n        new_attempt_state = \"pending_first_attempt\"\n      end\n      -- ALL 7 dims\n      redis.call(\"HSET\", K.core_key,\n        \"lifecycle_phase\", core.lifecycle_phase,             -- preserve (runnable)\n        \"ownership_state\", core.ownership_state or \"unowned\", -- preserve\n        \"eligibility_state\", \"eligible_now\",\n        \"blocking_reason\", \"waiting_for_worker\",\n        \"blocking_detail\", \"\",\n        \"terminal_outcome\", \"none\",                          -- preserve\n        \"attempt_state\", 
new_attempt_state,\n        \"public_state\", \"waiting\",\n        \"last_transition_at\", A.now_ms,\n        \"last_mutation_at\", A.now_ms)\n      redis.call(\"ZREM\", K.blocked_deps_zset, core.execution_id or \"\")\n      local priority = tonumber(core.priority or \"0\")\n      local created_at_ms = tonumber(core.created_at or \"0\")\n      local score = 0 - (priority * 1000000000000) + created_at_ms\n      redis.call(\"ZADD\", K.eligible_zset, score, core.execution_id or \"\")\n    end\n\n    return ok(\"satisfied\", data_injected)\n  end\n\n  -- 4. Impossible path (upstream failed/cancelled/expired/skipped)\n  redis.call(\"HSET\", K.dep_hash,\n    \"state\", \"impossible\", \"last_resolved_at\", A.now_ms)\n  redis.call(\"SREM\", K.unresolved_set, A.edge_id)\n  redis.call(\"HINCRBY\", K.deps_meta, \"unsatisfied_required_count\", -1)\n  redis.call(\"HINCRBY\", K.deps_meta, \"impossible_required_count\", 1)\n  redis.call(\"HSET\", K.deps_meta, \"last_dependency_update_at\", A.now_ms)\n\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return ok(\"impossible\", \"\") end\n  local core = hgetall_to_table(raw)\n\n  local child_skipped = false\n\n  if core.terminal_outcome == \"none\" then\n    -- Determine attempt_state for skip\n    local skip_attempt_state = core.attempt_state or \"none\"\n    if skip_attempt_state == \"running_attempt\"\n       or skip_attempt_state == \"attempt_interrupted\" then\n      -- NOTE: If the child is active (worker holding lease), this FCALL runs\n      -- on {p:N} (exec partition) so we CAN write exec_core and attempt_hash.\n      -- However, lease_current, lease_expiry_zset, worker_leases, and\n      -- active_index also live on {p:N} \u{2014} but the KEYS array for this\n      -- function does not include them (only 11 KEYS).  Cleaning them here\n      -- would require adding more KEYS slots and pre-reading worker_instance_id\n      -- to construct the worker_leases key.  
Instead, lease cleanup is\n      -- delegated to the lease_expiry scanner (1.5s default interval):\n      --   1. lease_expiry_scanner sees expired lease \u{2192} ff_mark_lease_expired_if_due\n      --   2. Worker\'s renewal sees terminal \u{2192} stops with terminal error\n      --   3. ff_expire_execution (attempt_timeout/deadline scanner) does full cleanup\n      -- Race window: between this skip and scanner cleanup, exec_core is\n      -- terminal(skipped) but stale entries remain in active/lease indexes.\n      -- Bounded by lease_expiry_interval (default 1.5s).  Index reconciler\n      -- detects and logs any residual inconsistency at 45s intervals.\n      skip_attempt_state = \"attempt_terminal\"\n      -- End real attempt + close stream\n      redis.call(\"HSET\", K.attempt_hash,\n        \"attempt_state\", \"ended_cancelled\",\n        \"ended_at\", A.now_ms,\n        \"failure_reason\", \"dependency_impossible\")\n      if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n        redis.call(\"HSET\", K.stream_meta,\n          \"closed_at\", A.now_ms,\n          \"closed_reason\", \"dependency_impossible\")\n      end\n    elseif is_set(skip_attempt_state) and skip_attempt_state ~= \"none\" then\n      skip_attempt_state = \"none\"\n    end\n\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"terminal\",\n      \"ownership_state\", \"unowned\",\n      \"eligibility_state\", \"not_applicable\",\n      \"blocking_reason\", \"none\",\n      \"blocking_detail\", \"\",\n      \"terminal_outcome\", \"skipped\",\n      \"attempt_state\", skip_attempt_state,\n      \"public_state\", \"skipped\",\n      \"completed_at\", A.now_ms,\n      \"last_transition_at\", A.now_ms,\n      \"last_mutation_at\", A.now_ms)\n    redis.call(\"ZREM\", K.blocked_deps_zset, core.execution_id or \"\")\n    redis.call(\"ZADD\", K.terminal_zset, tonumber(A.now_ms), core.execution_id or \"\")\n    child_skipped = true\n  end\n\n  return ok(\"impossible\", child_skipped 
and \"child_skipped\" or \"\")\nend)\n\n---------------------------------------------------------------------------\n-- #24  ff_evaluate_flow_eligibility  (on {p:N})\n--\n-- Read-only check of execution + dependency state. Class C.\n--\n-- KEYS (2): exec_core, deps_meta\n-- ARGV (0)\n---------------------------------------------------------------------------\nredis.register_function(\'ff_evaluate_flow_eligibility\', function(keys, args)\n  local raw = redis.call(\"HGETALL\", keys[1])\n  if #raw == 0 then return ok(\"not_found\") end\n  local core = hgetall_to_table(raw)\n\n  if core.lifecycle_phase ~= \"runnable\" then\n    return ok(\"not_runnable\")\n  end\n  if core.ownership_state ~= \"unowned\" then\n    return ok(\"owned\")\n  end\n  if core.terminal_outcome ~= \"none\" then\n    return ok(\"terminal\")\n  end\n\n  local deps_raw = redis.call(\"HGETALL\", keys[2])\n  if #deps_raw == 0 then\n    return ok(\"eligible\")\n  end\n  local deps = hgetall_to_table(deps_raw)\n\n  local impossible = tonumber(deps.impossible_required_count or \"0\")\n  if impossible > 0 then\n    return ok(\"impossible\")\n  end\n\n  local unresolved = tonumber(deps.unsatisfied_required_count or \"0\")\n  if unresolved > 0 then\n    return ok(\"blocked_by_dependencies\")\n  end\n\n  return ok(\"eligible\")\nend)\n\n---------------------------------------------------------------------------\n-- #35  ff_promote_blocked_to_eligible  (on {p:N})\n--\n-- Promote zero-dep flow member from blocked:dependencies to eligible.\n--\n-- KEYS (5): exec_core, blocked_deps_zset, eligible_zset, deps_meta,\n--           deps_unresolved\n-- ARGV (2): execution_id, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_promote_blocked_to_eligible\', function(keys, args)\n  local K = {\n    core_key          = keys[1],\n    blocked_deps_zset = keys[2],\n    eligible_zset     = keys[3],\n    deps_meta         = keys[4],\n    deps_unresolved   = 
keys[5],\n  }\n\n  local A = {\n    execution_id = args[1],\n    now_ms       = args[2],\n  }\n\n  -- 1. Read execution core\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  if core.lifecycle_phase ~= \"runnable\" then\n    return err(\"not_runnable\")\n  end\n  if core.eligibility_state ~= \"blocked_by_dependencies\" then\n    return err(\"not_blocked_by_deps\")\n  end\n  if core.terminal_outcome ~= \"none\" then\n    return err(\"terminal\")\n  end\n\n  -- 2. Verify zero deps\n  local unsatisfied = tonumber(\n    redis.call(\"HGET\", K.deps_meta, \"unsatisfied_required_count\") or \"0\")\n  local unresolved_count = redis.call(\"SCARD\", K.deps_unresolved)\n  if unsatisfied > 0 or unresolved_count > 0 then\n    return err(\"deps_not_satisfied\", tostring(unsatisfied), tostring(unresolved_count))\n  end\n\n  -- 3. Preserve attempt_state\n  local new_attempt_state = core.attempt_state\n  if not is_set(new_attempt_state) or new_attempt_state == \"none\" then\n    new_attempt_state = \"pending_first_attempt\"\n  end\n\n  -- 4. 
Transition (ALL 7 dims)\n  redis.call(\"HSET\", K.core_key,\n    \"lifecycle_phase\", core.lifecycle_phase,             -- preserve (runnable)\n    \"ownership_state\", core.ownership_state or \"unowned\", -- preserve\n    \"eligibility_state\", \"eligible_now\",\n    \"blocking_reason\", \"waiting_for_worker\",\n    \"blocking_detail\", \"\",\n    \"terminal_outcome\", \"none\",                          -- preserve\n    \"attempt_state\", new_attempt_state,\n    \"public_state\", \"waiting\",\n    \"last_transition_at\", A.now_ms,\n    \"last_mutation_at\", A.now_ms)\n\n  redis.call(\"ZREM\", K.blocked_deps_zset, A.execution_id)\n  local priority = tonumber(core.priority or \"0\")\n  local created_at_ms = tonumber(core.created_at or \"0\")\n  local score = 0 - (priority * 1000000000000) + created_at_ms\n  redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n  return ok()\nend)\n\n---------------------------------------------------------------------------\n-- #12b  ff_replay_execution  (on {p:N})\n--\n-- Reset a terminal execution for replay. If skipped flow member: reset\n-- impossible deps back to unsatisfied, recompute counts, set\n-- blocked_by_dependencies instead of eligible_now.\n--\n-- KEYS (4+N): exec_core, terminal_zset, eligible_zset, lease_history,\n--             [blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N]\n-- ARGV (2+N): execution_id, now_ms, [edge_id_0..N]\n---------------------------------------------------------------------------\nredis.register_function(\'ff_replay_execution\', function(keys, args)\n  local K = {\n    core_key       = keys[1],\n    terminal_zset  = keys[2],\n    eligible_zset  = keys[3],\n    lease_history  = keys[4],\n  }\n\n  local A = {\n    execution_id = args[1],\n  }\n\n  local t = redis.call(\"TIME\")\n  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n  -- 1. 
Read execution core\n  local raw = redis.call(\"HGETALL\", K.core_key)\n  if #raw == 0 then return err(\"execution_not_found\") end\n  local core = hgetall_to_table(raw)\n\n  -- 2. Must be terminal\n  if core.lifecycle_phase ~= \"terminal\" then\n    return err(\"execution_not_terminal\")\n  end\n\n  -- 3. Check replay limit (read from policy, same pattern as ff_reclaim_execution)\n  local replay_count = tonumber(core.replay_count or \"0\")\n  local max_replays = 10  -- default\n  local policy_key = string.gsub(K.core_key, \":core$\", \":policy\")\n  local policy_raw = redis.call(\"GET\", policy_key)\n  if policy_raw then\n    local ok_p, pol = pcall(cjson.decode, policy_raw)\n    if ok_p and type(pol) == \"table\" then\n      max_replays = tonumber(pol.max_replay_count) or 10\n    end\n  end\n  if replay_count >= max_replays then\n    return err(\"max_replays_exhausted\")\n  end\n\n  -- 4. Determine replay path\n  local is_skipped_flow_member = (core.terminal_outcome == \"skipped\") and is_set(core.flow_id)\n\n  if is_skipped_flow_member then\n    -- SKIPPED FLOW MEMBER PATH: reset impossible deps \u{2192} blocked on deps\n    local blocked_deps_zset = keys[5]\n    local deps_meta         = keys[6]\n    local deps_unresolved   = keys[7]\n\n    -- Reset impossible dep edges back to unsatisfied\n    local num_edges = #args - 2\n    local new_unsatisfied = 0\n    for i = 1, num_edges do\n      local edge_id = args[2 + i]\n      local dep_key = keys[7 + i]  -- dep_edge keys start at KEYS[8]\n\n      local dep_state = redis.call(\"HGET\", dep_key, \"state\")\n      if dep_state == \"impossible\" then\n        redis.call(\"HSET\", dep_key,\n          \"state\", \"unsatisfied\",\n          \"last_resolved_at\", \"\")\n        redis.call(\"SADD\", deps_unresolved, edge_id)\n        new_unsatisfied = new_unsatisfied + 1\n      elseif dep_state == \"unsatisfied\" then\n        new_unsatisfied = new_unsatisfied + 1\n      end\n      -- satisfied edges remain satisfied 
(upstream already succeeded)\n    end\n\n    -- Recompute deps:meta counts\n    redis.call(\"HSET\", deps_meta,\n      \"unsatisfied_required_count\", tostring(new_unsatisfied),\n      \"impossible_required_count\", \"0\",\n      \"last_dependency_update_at\", tostring(now_ms))\n\n    -- Transition: terminal \u{2192} runnable/blocked_by_dependencies\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"runnable\",\n      \"ownership_state\", \"unowned\",\n      \"eligibility_state\", \"blocked_by_dependencies\",\n      \"blocking_reason\", \"waiting_for_children\",\n      \"blocking_detail\", tostring(new_unsatisfied) .. \" dep(s) unsatisfied after replay\",\n      \"terminal_outcome\", \"none\",\n      \"attempt_state\", \"pending_replay_attempt\",\n      \"public_state\", \"waiting_children\",\n      \"pending_replay_attempt\", \"1\",\n      \"replay_count\", tostring(replay_count + 1),\n      \"completed_at\", \"\",\n      \"last_transition_at\", tostring(now_ms),\n      \"last_mutation_at\", tostring(now_ms))\n\n    -- Move from terminal \u{2192} blocked:deps\n    redis.call(\"ZREM\", K.terminal_zset, A.execution_id)\n    redis.call(\"ZADD\", blocked_deps_zset,\n      tonumber(core.created_at or \"0\"), A.execution_id)\n\n    -- Lease history\n    redis.call(\"XADD\", K.lease_history, \"MAXLEN\", \"~\", 1000, \"*\",\n      \"event\", \"replay_initiated\",\n      \"replay_count\", tostring(replay_count + 1),\n      \"replay_type\", \"skipped_flow_member\",\n      \"ts\", tostring(now_ms))\n\n    return ok(tostring(new_unsatisfied))\n  else\n    -- NORMAL REPLAY PATH: terminal \u{2192} runnable/eligible\n    local priority = tonumber(core.priority or \"0\")\n    local created_at = tonumber(core.created_at or \"0\")\n    local score = 0 - (priority * 1000000000000) + created_at\n\n    redis.call(\"HSET\", K.core_key,\n      \"lifecycle_phase\", \"runnable\",\n      \"ownership_state\", \"unowned\",\n      \"eligibility_state\", \"eligible_now\",\n    
  \"blocking_reason\", \"waiting_for_worker\",\n      \"blocking_detail\", \"\",\n      \"terminal_outcome\", \"none\",\n      \"attempt_state\", \"pending_replay_attempt\",\n      \"public_state\", \"waiting\",\n      \"pending_replay_attempt\", \"1\",\n      \"replay_count\", tostring(replay_count + 1),\n      \"completed_at\", \"\",\n      \"last_transition_at\", tostring(now_ms),\n      \"last_mutation_at\", tostring(now_ms))\n\n    -- Move from terminal \u{2192} eligible\n    redis.call(\"ZREM\", K.terminal_zset, A.execution_id)\n    redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n    -- Lease history\n    redis.call(\"XADD\", K.lease_history, \"MAXLEN\", \"~\", 1000, \"*\",\n      \"event\", \"replay_initiated\",\n      \"replay_count\", tostring(replay_count + 1),\n      \"replay_type\", \"normal\",\n      \"ts\", tostring(now_ms))\n\n    return ok(\"0\")\n  end\nend)\n\n";
Expand description

The generated FlowFabric Lua library source — a single Valkey `FUNCTION LOAD` payload (note the `#!lua name=flowfabric` shebang) containing all registered `flowfabric` library functions.

Generated from `lua/*.lua` by `scripts/gen-ff-script-lua.sh` and checked into the crate as `flowfabric.lua` so that it ships inside the published tarball. CI (`matrix.yml`) fails if this checked-in file drifts from what the generator script would produce.