pub const LIBRARY_SOURCE: &str = "#!lua name=flowfabric\n\n-- source: lua/helpers.lua\n-- FlowFabric shared library-local helpers\n-- These are local functions available to all registered functions in the\n-- flowfabric library. They are NOT independently FCALL-able.\n-- Reference: RFC-010 \u{a7}4.8, RFC-004 \u{a7}Waitpoint Security (HMAC tokens)\n\n---------------------------------------------------------------------------\n-- Capability CSV bounds (RFC-009 \u{a7}7.5)\n---------------------------------------------------------------------------\n-- Shared ceiling for BOTH the worker-side CSV (ff_issue_claim_grant ARGV[9])\n-- AND the execution-side CSV (exec_core.required_capabilities). Defense in\n-- depth against runaway field sizes: a 10k-token list turns into a multi-MB\n-- HSET value and a per-candidate O(N) atomic scan that blocks the shard.\n--\n-- Inclusivity: these are MAXIMUM accepted values. `#csv == CAPS_MAX_BYTES`\n-- and `n == CAPS_MAX_TOKENS` are accepted; one more rejects. Rust-side\n-- ingress (ff-sdk::FlowFabricWorker::connect, ff-scheduler::Scheduler::\n-- claim_for_worker, ff-core::policy::RoutingRequirements deserialization\n-- via lua/execution.lua) enforces the same ceilings so the Lua check is a\n-- defense-in-depth backstop, not the primary validator.\nlocal CAPS_MAX_BYTES = 4096\nlocal CAPS_MAX_TOKENS = 256\n\n---------------------------------------------------------------------------\n-- Hex / binary helpers (for HMAC-SHA1 token derivation)\n---------------------------------------------------------------------------\n\n-- Convert a hex string to a binary (byte) string. Accepts mixed case.\n-- Returns nil on ANY malformed input: non-string, odd length, OR any\n-- non-hex char (including whitespace, unicode, control chars). 
Callers\n-- treat nil as invalid_secret.\n--\n-- Rust side (ServerConfig) already validates the env secret as even-length\n-- 0-9a-fA-F, but an operator writing directly to Valkey (or a torn write\n-- during rotation) could bypass that validator. We refuse the conversion\n-- here instead of silently coercing bad pairs to 0 bytes (which would\n-- produce a bogus but valid-looking MAC).\nlocal function hex_to_bytes(hex)\n if type(hex) ~= \"string\" or #hex % 2 ~= 0 then\n return nil\n end\n local out = {}\n for i = 1, #hex - 1, 2 do\n local byte = tonumber(hex:sub(i, i + 1), 16)\n if not byte then\n return nil\n end\n out[#out + 1] = string.char(byte)\n end\n return table.concat(out)\nend\n\n-- XOR two equal-length byte strings. Used for HMAC key-pad construction.\nlocal function xor_bytes(a, b)\n local out = {}\n for i = 1, #a do\n out[i] = string.char(bit.bxor(a:byte(i), b:byte(i)))\n end\n return table.concat(out)\nend\n\n-- HMAC-SHA1(key_hex, message) \u{2192} lowercase hex digest (40 chars), or nil on\n-- malformed key_hex (odd-length / non-string). Callers must treat nil as\n-- an invalid-secret error \u{2014} never pass it to HSET / concat / return.\n-- Reference: RFC 2104. SHA1 block size = 64 bytes.\nlocal function hmac_sha1_hex(key_hex, message)\n local key = hex_to_bytes(key_hex)\n if not key then\n return nil\n end\n local block_size = 64\n if #key > block_size then\n -- Reduce oversized key via SHA1 (per RFC 2104). sha1hex output is 40\n -- lowercase hex chars, so the inner hex_to_bytes cannot fail.\n key = hex_to_bytes(redis.sha1hex(key))\n end\n if #key < block_size then\n key = key .. string.rep(\"\\0\", block_size - #key)\n end\n local ipad = string.rep(string.char(0x36), block_size)\n local opad = string.rep(string.char(0x5c), block_size)\n local inner = redis.sha1hex(xor_bytes(key, ipad) .. message)\n return redis.sha1hex(xor_bytes(key, opad) .. hex_to_bytes(inner))\nend\n\n-- Constant-time string equality. 
Returns true iff strings are equal in\n-- both length and content. Uses XOR-accumulation to avoid early-exit\n-- timing leaks on byte mismatches during HMAC token validation.\n-- Reference: Remote timing attacks on authentication (e.g., CVE-2011-3389 class).\n--\n-- Safety note on the length check: a length-mismatch early return reveals\n-- whether the presented string matches the expected length, which is a\n-- timing side channel IF attacker-controlled length is used to probe the\n-- expected length. In this codebase the caller normalizes to a fixed shape\n-- BEFORE reaching here \u{2014} validate_waitpoint_token already requires\n-- #presented == 40 (SHA1 hex digest length) at the parsing boundary, so\n-- any input reaching constant_time_eq has a length already known to be 40\n-- by the attacker. The only length variation here is on `expected`, which\n-- is server-computed and constant. Hence this early return does not leak\n-- secret-dependent timing.\nlocal function constant_time_eq(a, b)\n if type(a) ~= \"string\" or type(b) ~= \"string\" then\n return false\n end\n if #a ~= #b then\n return false\n end\n local acc = 0\n for i = 1, #a do\n acc = bit.bor(acc, bit.bxor(a:byte(i), b:byte(i)))\n end\n return acc == 0\nend\n\n---------------------------------------------------------------------------\n-- Waitpoint HMAC tokens (RFC-004 \u{a7}Waitpoint Security)\n---------------------------------------------------------------------------\n--\n-- Token format: \"kid:40hex\" \u{2014} kid identifies which key signed the token,\n-- enabling zero-downtime rotation. ANY kid present in the secrets hash\n-- with a future `expires_at:<kid>` (or the current kid, which has no\n-- expiry) accepts tokens. 
This supports rapid rotation: rotating A\u{2192}B\u{2192}C\n-- within a grace window keeps A\'s secret validatable as long as\n-- expires_at:A is still future.\n--\n-- HMAC input binds (waitpoint_id | waitpoint_key | created_at_ms) with a\n-- pipe delimiter so field-boundary confusion cannot produce collisions\n-- across waitpoints.\n--\n-- Secret storage: per-partition replicated hash at\n-- ff:sec:{p:N}:waitpoint_hmac\n-- Fields:\n-- current_kid \u{2014} the kid minting new tokens (no expiry)\n-- secret:<kid> \u{2014} hex-encoded HMAC key for each kid ever installed\n-- expires_at:<kid> \u{2014} unix ms; accept tokens under <kid> iff exp > now_ms\n-- INVARIANT: expires_at:<current_kid> is NEVER written\n-- previous_kid \u{2014} observability/audit only: the kid immediately\n-- preceding current_kid (NOT the only acceptable one)\n-- previous_expires_at \u{2014} observability/audit only: matches\n-- expires_at:<previous_kid>\n--\n-- Replication is required for Valkey cluster mode (all FCALL KEYS must\n-- hash to the same slot); rotation fans out across partitions.\n---------------------------------------------------------------------------\n\n-- Read the hmac_secrets hash. 
Returns a table with:\n-- current_kid, current_secret \u{2014} the minting kid (nil if not initialized)\n-- kid_secrets = { [kid] = { secret = <hex>, expires_at = <ms or nil> } }\n-- includes current_kid (expires_at = nil \u{2192} no expiry)\n-- includes every secret:<kid> present in the hash\n-- previous_kid, previous_secret, previous_expires_at \u{2014} kept for back-compat\n-- (audit log / observability); validate path does NOT depend on them.\n-- Returns nil if the hash is absent.\nlocal function load_waitpoint_secrets(secrets_key)\n local raw = redis.call(\"HGETALL\", secrets_key)\n if #raw == 0 then\n return nil\n end\n local t = {}\n for i = 1, #raw, 2 do\n t[raw[i]] = raw[i + 1]\n end\n local out = {\n current_kid = t.current_kid,\n previous_kid = t.previous_kid,\n previous_expires_at = t.previous_expires_at,\n kid_secrets = {},\n }\n if out.current_kid then\n out.current_secret = t[\"secret:\" .. out.current_kid]\n end\n if out.previous_kid then\n out.previous_secret = t[\"secret:\" .. out.previous_kid]\n end\n -- Multi-kid scan: every secret:<kid> becomes a validation candidate.\n -- current_kid has no expiry entry (intentional \u{2014} it\'s always valid).\n -- Other kids are accepted iff expires_at:<kid> is set AND > now_ms; the\n -- expiry check runs in validate_waitpoint_token so we simply carry the\n -- raw expires_at string here.\n for k, v in pairs(t) do\n if k:sub(1, 7) == \"secret:\" then\n local kid = k:sub(8)\n if kid ~= \"\" then\n out.kid_secrets[kid] = {\n secret = v,\n expires_at = t[\"expires_at:\" .. kid],\n }\n end\n end\n end\n return out\nend\n\n-- Build the HMAC input string. Pipe delimiter prevents concatenation\n-- collisions across distinct (waitpoint_id, waitpoint_key) pairs.\nlocal function waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)\n return waitpoint_id .. \"|\" .. waitpoint_key .. \"|\" .. 
tostring(created_at_ms)\nend\n\n-- Mint a waitpoint token using the current kid.\n-- Returns (token, kid) on success or (nil, error_code) on failure.\n-- Defense-in-depth: returns a typed error for missing secrets_key / missing\n-- secrets hash so external callers that construct FCALL KEYS by hand cannot\n-- produce the \"arguments must be strings or integers\" Lua panic via nil.\nlocal function mint_waitpoint_token(secrets_key, waitpoint_id, waitpoint_key, created_at_ms)\n if type(secrets_key) ~= \"string\" or secrets_key == \"\" then\n return nil, \"invalid_keys_missing_hmac\"\n end\n local secrets = load_waitpoint_secrets(secrets_key)\n if not secrets or not secrets.current_kid or not secrets.current_secret then\n return nil, \"hmac_secret_not_initialized\"\n end\n local input = waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)\n local digest = hmac_sha1_hex(secrets.current_secret, input)\n if not digest then\n return nil, \"invalid_secret\"\n end\n return secrets.current_kid .. \":\" .. digest, secrets.current_kid\nend\n\n-- Validate a waitpoint token against the (waitpoint_id, waitpoint_key,\n-- created_at_ms) that were bound at mint time. 
Accepts tokens signed with\n-- current_kid, or previous_kid if previous_expires_at has not passed.\n-- Returns nil on success or an error code string on failure.\nlocal function validate_waitpoint_token(\n secrets_key, token, waitpoint_id, waitpoint_key, created_at_ms, now_ms\n)\n if type(secrets_key) ~= \"string\" or secrets_key == \"\" then\n return \"invalid_keys_missing_hmac\"\n end\n if type(token) ~= \"string\" or token == \"\" then\n return \"missing_token\"\n end\n local sep = token:find(\":\", 1, true)\n if not sep or sep < 2 or sep >= #token then\n return \"invalid_token\"\n end\n local kid = token:sub(1, sep - 1)\n local presented = token:sub(sep + 1)\n if #presented ~= 40 then\n -- SHA1 hex digest is always 40 chars.\n return \"invalid_token\"\n end\n\n local secrets = load_waitpoint_secrets(secrets_key)\n if not secrets or not secrets.current_kid then\n return \"hmac_secret_not_initialized\"\n end\n\n -- Multi-kid validation. ANY secret:<kid> present in the hash is a\n -- candidate IF:\n -- - kid == current_kid (no expiry, always valid), OR\n -- - expires_at:<kid> is a positive integer AND > now_ms.\n --\n -- Rationale: rapid rotation (A\u{2192}B\u{2192}C inside a grace window) must keep\n -- in-flight A-signed tokens valid. The previous 2-slot model\n -- (current + previous) evicted A as soon as B became previous, even\n -- though expires_at:A was still future. 
RFC-004 \u{a7}Waitpoint Security\n -- promises grace duration, not \"grace until next rotation\".\n --\n -- Fail-CLOSED on malformed expires_at: a corrupted/non-numeric value\n -- means \"no affirmative unexpired proof\" \u{2014} reject.\n local secret = nil\n local expiry_state = nil -- \"known_kid_expired\" | \"unknown_kid\"\n if kid == secrets.current_kid then\n secret = secrets.current_secret\n else\n local entry = secrets.kid_secrets and secrets.kid_secrets[kid]\n if entry then\n local exp = tonumber(entry.expires_at)\n if not exp or exp <= 0 or exp < now_ms then\n -- secret:<kid> is present but its grace has elapsed (or was\n -- never recorded). Distinguishable from unknown_kid so the\n -- caller can log the more-actionable \"token_expired\".\n expiry_state = \"known_kid_expired\"\n else\n secret = entry.secret\n end\n else\n expiry_state = \"unknown_kid\"\n end\n end\n\n if not secret then\n if expiry_state == \"known_kid_expired\" then\n return \"token_expired\"\n end\n return \"invalid_token\"\n end\n\n local input = waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)\n local expected = hmac_sha1_hex(secret, input)\n if not expected then\n return \"invalid_secret\"\n end\n if not constant_time_eq(expected, presented) then\n return \"invalid_token\"\n end\n return nil\nend\n\n---------------------------------------------------------------------------\n-- Time\n---------------------------------------------------------------------------\n\n-- Returns the Valkey server time as milliseconds. Always prefer this over\n-- a caller-supplied now_ms for fields that are used in retention windows,\n-- eligibility scoring, lease expiry, or any cross-execution causal\n-- comparison. 
Client-supplied timestamps are trivially skewable and\n-- produce observability drift when compared against fields written by\n-- other Lua functions (which already use redis.call(\"TIME\")).\nlocal function server_time_ms()\n local t = redis.call(\"TIME\")\n return tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\nend\n\n---------------------------------------------------------------------------\n-- Return wrappers (\u{a7}4.9)\n---------------------------------------------------------------------------\n\nlocal function ok(...)\n return {1, \"OK\", ...}\nend\n\nlocal function err(...)\n return {0, ...}\nend\n\n-- Require a numeric value from ARGV. Returns the number on success or\n-- an err() tuple on failure. Callers must check: if type(n) == \"table\"\n-- then return n end (the table IS the err tuple).\nlocal function require_number(val, name)\n local n = tonumber(val)\n if n == nil then\n return err(\"invalid_input\", name .. \" must be a number, got: \" .. tostring(val))\n end\n return n\nend\n\nlocal function ok_already_satisfied(...)\n return {1, \"ALREADY_SATISFIED\", ...}\nend\n\nlocal function ok_duplicate(...)\n return {1, \"DUPLICATE\", ...}\nend\n\n---------------------------------------------------------------------------\n-- Data access\n---------------------------------------------------------------------------\n\n-- Converts HGETALL flat array {k1, v1, k2, v2, ...} to a Lua dict table.\n-- All RFC pseudocode uses core.field syntax which requires this conversion.\nlocal function hgetall_to_table(flat)\n local t = {}\n for i = 1, #flat, 2 do\n t[flat[i]] = flat[i + 1]\n end\n return t\nend\n\n-- Safe nil/empty check. Valkey hashes cannot store nil: HGET on a missing\n-- field returns false (via Lua), and cleared fields store \"\". 
This helper\n-- handles both cases plus actual nil for fields absent from hgetall_to_table.\nlocal function is_set(v)\n return v ~= nil and v ~= false and v ~= \"\"\nend\n\n---------------------------------------------------------------------------\n-- Lease validation (most widely shared \u{2014} prevents copy-paste drift)\n-- RFC-010 \u{a7}4.8: 7+ functions use this: complete, fail, suspend, delay,\n-- move_to_waiting_children, append_frame, report_usage.\n---------------------------------------------------------------------------\n\n-- Validates that the caller holds a valid, non-expired, non-revoked lease.\n-- Returns an error tuple on failure, or nil on success.\n-- @param core table from hgetall_to_table(HGETALL exec_core)\n-- @param argv table with lease_id, lease_epoch, attempt_id\n-- @param now_ms current timestamp in milliseconds\nlocal function validate_lease(core, argv, now_ms)\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_active\",\n core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n end\n if core.ownership_state == \"lease_revoked\" then\n return err(\"lease_revoked\")\n end\n if tonumber(core.lease_expires_at or \"0\") <= now_ms then\n return err(\"lease_expired\")\n end\n if core.current_lease_id ~= argv.lease_id then\n return err(\"stale_lease\")\n end\n if core.current_lease_epoch ~= argv.lease_epoch then\n return err(\"stale_lease\")\n end\n if core.current_attempt_id ~= argv.attempt_id then\n return err(\"stale_lease\")\n end\n return nil\nend\n\n-- Sets ownership_state to lease_expired_reclaimable. Idempotent.\n--\n-- Also writes `closed_at`/`closed_reason=\"lease_expired\"` on the attempt\'s\n-- `stream_meta` hash when that stream exists, so `tail_stream` consumers\n-- observe the terminal signal without having to fall back to polling\n-- `execution_state`. 
This matters for the permanent-failure case (worker\n-- OOM or node dead, no replacement reclaims): the reclaim path that\n-- normally writes `closed_reason=\"reclaimed\"` may never run, and without\n-- this signal the tail poll loop waits forever.\n--\n-- Write order: we only write stream_meta if its existing `closed_at` is\n-- empty. A later `ff_reclaim_execution` that overwrites `closed_reason`\n-- to \"reclaimed\" still wins because it unconditionally HSETs the field;\n-- this function intentionally does NOT overwrite a pre-existing close.\n--\n-- Key construction: the stream_meta key is derived from core_key\'s\n-- `{p:N}` hash tag + current_attempt_index. All three keys share the\n-- same hash tag, so this stays single-slot in cluster mode despite not\n-- being declared in KEYS upfront \u{2014} mirrors the dynamic attempt/lane key\n-- construction in `ff_create_execution`.\n--\n-- @param keys table with core_key, lease_history_key\n-- @param core table from hgetall_to_table\n-- @param now_ms current timestamp in milliseconds\n-- @param maxlen MAXLEN for lease_history stream\nlocal function mark_expired(keys, core, now_ms, maxlen)\n if core.ownership_state == \"lease_expired_reclaimable\" then\n return -- idempotent\n end\n -- ALL 7 dims (preserve lifecycle_phase=active, eligibility_state, terminal_outcome=none)\n redis.call(\"HSET\", keys.core_key,\n \"lifecycle_phase\", core.lifecycle_phase or \"active\", -- preserve\n \"ownership_state\", \"lease_expired_reclaimable\",\n \"eligibility_state\", core.eligibility_state or \"not_applicable\", -- preserve\n \"blocking_reason\", \"waiting_for_worker\",\n \"blocking_detail\", \"lease expired, awaiting reclaim\",\n \"terminal_outcome\", core.terminal_outcome or \"none\", -- preserve\n \"attempt_state\", \"attempt_interrupted\",\n \"public_state\", \"active\",\n \"lease_expired_at\", now_ms,\n \"last_mutation_at\", now_ms)\n redis.call(\"XADD\", keys.lease_history_key, \"MAXLEN\", \"~\", maxlen, \"*\",\n \"event\", 
\"expired\",\n \"lease_id\", core.current_lease_id or \"\",\n \"lease_epoch\", core.current_lease_epoch or \"\",\n \"attempt_index\", core.current_attempt_index or \"\",\n \"attempt_id\", core.current_attempt_id or \"\",\n \"worker_id\", core.current_worker_id or \"\",\n \"worker_instance_id\", core.current_worker_instance_id or \"\",\n \"ts\", now_ms)\n\n -- Close stream_meta (if the stream was lazily created) so tail_stream\n -- consumers receive the terminal signal. Core key format:\n -- ff:exec:{p:N}:<eid>:core\n -- Stream meta key format:\n -- ff:stream:{p:N}:<eid>:<attempt_index>:meta\n local att_idx = core.current_attempt_index\n if att_idx ~= nil and att_idx ~= \"\" then\n local tag_open = string.find(keys.core_key, \"{\", 1, true)\n local tag_close = tag_open and string.find(keys.core_key, \"}\", tag_open, true)\n if tag_open and tag_close then\n local tag = string.sub(keys.core_key, tag_open, tag_close)\n -- After `}:` comes `<eid>:core`. Walk past the `}:` delimiter.\n local after_tag = string.sub(keys.core_key, tag_close + 2)\n local eid_end = string.find(after_tag, \":core\", 1, true)\n if eid_end then\n local eid = string.sub(after_tag, 1, eid_end - 1)\n local stream_meta_key = \"ff:stream:\" .. tag .. \":\" .. eid\n .. \":\" .. tostring(att_idx) .. 
\":meta\"\n if redis.call(\"EXISTS\", stream_meta_key) == 1 then\n local existing_closed_at = redis.call(\"HGET\", stream_meta_key, \"closed_at\")\n if not is_set(existing_closed_at) then\n redis.call(\"HSET\", stream_meta_key,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"lease_expired\")\n end\n end\n end\n end\n end\nend\n\n-- Validates lease AND atomically marks expired if lease has lapsed.\n-- Use this variant for write-path callers (complete, fail, suspend, delay,\n-- move_to_waiting_children) that have the lease_history key available.\n-- For read-only callers (append_frame, report_usage) use validate_lease.\n-- @param core table from hgetall_to_table\n-- @param argv table with lease_id, lease_epoch, attempt_id\n-- @param now_ms current timestamp in milliseconds\n-- @param keys table with core_key, lease_history_key\n-- @param maxlen MAXLEN for lease_history stream\nlocal function validate_lease_and_mark_expired(core, argv, now_ms, keys, maxlen)\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_active\",\n core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n end\n if core.ownership_state == \"lease_revoked\" then\n return err(\"lease_revoked\")\n end\n if tonumber(core.lease_expires_at or \"0\") <= now_ms then\n mark_expired(keys, core, now_ms, maxlen)\n return err(\"lease_expired\")\n end\n if core.current_lease_id ~= argv.lease_id then\n return err(\"stale_lease\")\n end\n if core.current_lease_epoch ~= argv.lease_epoch then\n return err(\"stale_lease\")\n end\n if core.current_attempt_id ~= argv.attempt_id then\n return err(\"stale_lease\")\n end\n return nil\nend\n\n-- Consolidates the ~15-line lease release block shared by 7 functions.\n-- DEL lease_current, ZREM lease_expiry + worker_leases + active_index,\n-- clear lease fields on exec_core, XADD lease_history \"released\".\n-- @param keys table with lease_current_key, lease_expiry_key,\n-- worker_leases_key, active_index_key, lease_history_key,\n-- 
attempt_timeout_key, core_key\n-- @param core table from hgetall_to_table\n-- @param reason string reason for release (e.g. \"completed\", \"suspend\")\n-- @param now_ms current timestamp in milliseconds\n-- @param maxlen MAXLEN for lease_history stream\nlocal function clear_lease_and_indexes(keys, core, reason, now_ms, maxlen)\n local eid = core.execution_id or \"\"\n\n -- DEL lease record\n redis.call(\"DEL\", keys.lease_current_key)\n\n -- ZREM/SREM from scheduling indexes\n redis.call(\"ZREM\", keys.lease_expiry_key, eid)\n redis.call(\"SREM\", keys.worker_leases_key, eid)\n redis.call(\"ZREM\", keys.active_index_key, eid)\n redis.call(\"ZREM\", keys.attempt_timeout_key, eid)\n\n -- Clear lease fields on exec_core (including stale expiry/revocation markers)\n redis.call(\"HSET\", keys.core_key,\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"lease_expired_at\", \"\",\n \"lease_revoked_at\", \"\",\n \"lease_revoke_reason\", \"\",\n \"last_mutation_at\", now_ms)\n\n -- Lease history event\n redis.call(\"XADD\", keys.lease_history_key, \"MAXLEN\", \"~\", maxlen, \"*\",\n \"event\", \"released\",\n \"lease_id\", core.current_lease_id or \"\",\n \"lease_epoch\", core.current_lease_epoch or \"\",\n \"attempt_index\", core.current_attempt_index or \"\",\n \"attempt_id\", core.current_attempt_id or \"\",\n \"reason\", reason,\n \"ts\", now_ms)\nend\n\n---------------------------------------------------------------------------\n-- Defensive index cleanup\n-- RFC-010 \u{a7}4.8: ZREM execution_id from all scheduling + timeout indexes\n-- except except_key. 
~14 ZREM/SREM calls.\n---------------------------------------------------------------------------\n\n-- @param keys table with all index key names\n-- @param eid execution_id string\n-- @param except_key optional key to skip (the target index for this transition)\nlocal function defensive_zrem_all_indexes(keys, eid, except_key)\n -- Each index key and whether it uses ZREM or SREM\n local zrem_keys = {\n keys.eligible_key,\n keys.delayed_key,\n keys.active_index_key,\n keys.suspended_key,\n keys.terminal_key,\n keys.blocked_deps_key,\n keys.blocked_budget_key,\n keys.blocked_quota_key,\n keys.blocked_route_key,\n keys.blocked_operator_key,\n keys.lease_expiry_key,\n keys.suspension_timeout_key,\n keys.attempt_timeout_key,\n keys.execution_deadline_key,\n }\n for _, k in ipairs(zrem_keys) do\n if k and k ~= except_key then\n redis.call(\"ZREM\", k, eid)\n end\n end\n -- worker_leases is a SET, not ZSET\n if keys.worker_leases_key and keys.worker_leases_key ~= except_key then\n redis.call(\"SREM\", keys.worker_leases_key, eid)\n end\nend\n\n---------------------------------------------------------------------------\n-- Suspension reason \u{2192} blocking_reason mapping\n-- RFC-004 \u{a7}Suspension Reason Categories\n---------------------------------------------------------------------------\n\nlocal REASON_TO_BLOCKING = {\n waiting_for_signal = \"waiting_for_signal\",\n waiting_for_approval = \"waiting_for_approval\",\n waiting_for_callback = \"waiting_for_callback\",\n waiting_for_tool_result = \"waiting_for_tool_result\",\n waiting_for_operator_review = \"paused_by_operator\",\n paused_by_policy = \"paused_by_policy\",\n paused_by_budget = \"waiting_for_budget\",\n step_boundary = \"waiting_for_signal\",\n manual_pause = \"paused_by_operator\",\n}\n\nlocal function map_reason_to_blocking(reason_code)\n return REASON_TO_BLOCKING[reason_code] or \"waiting_for_signal\"\nend\n\n---------------------------------------------------------------------------\n-- Resume 
condition evaluation (shared module)\n-- RFC-004 \u{a7}Resume Condition Model, RFC-005 \u{a7}8.3\n---------------------------------------------------------------------------\n\n-- Parse resume_condition_json into a matcher table used by\n-- evaluate_signal_against_condition and is_condition_satisfied.\n-- @param json JSON string of the resume condition\n-- @return table with matchers array, match_mode, minimum_signal_count, etc.\nlocal function initialize_condition(json)\n local spec = cjson.decode(json)\n local matchers = {}\n local names = spec.required_signal_names or {}\n if #names == 0 then\n -- Empty required_signal_names acts as wildcard: ANY signal satisfies the condition.\n -- To require explicit operator resume (no signal match), pass a sentinel name\n -- that no real signal will match, or use a different resume mechanism.\n matchers[1] = { name = \"\", satisfied = false, signal_id = \"\" }\n else\n for i, name in ipairs(names) do\n matchers[i] = { name = name, satisfied = false, signal_id = \"\" }\n end\n end\n return {\n condition_type = spec.condition_type or \"signal_set\",\n match_mode = spec.signal_match_mode or \"any\",\n minimum_signal_count = tonumber(spec.minimum_signal_count or \"1\"),\n total_matchers = #names > 0 and #names or 1,\n satisfied_count = 0,\n matchers = matchers,\n closed = false,\n }\nend\n\n-- Write condition state to a dedicated condition hash key.\n-- @param key Valkey key for the condition hash\n-- @param cond condition table from initialize_condition\n-- @param now_ms current timestamp\nlocal function write_condition_hash(key, cond, now_ms)\n local fields = {\n \"condition_type\", cond.condition_type,\n \"match_mode\", cond.match_mode,\n \"minimum_signal_count\", tostring(cond.minimum_signal_count),\n \"total_matchers\", tostring(cond.total_matchers),\n \"satisfied_count\", tostring(cond.satisfied_count),\n \"closed\", cond.closed and \"1\" or \"0\",\n \"updated_at\", tostring(now_ms),\n }\n for i = 1, cond.total_matchers 
do\n local m = cond.matchers[i]\n local idx = i - 1 -- external field names remain 0-based for wire compat\n fields[#fields + 1] = \"matcher:\" .. idx .. \":name\"\n fields[#fields + 1] = m.name\n fields[#fields + 1] = \"matcher:\" .. idx .. \":satisfied\"\n fields[#fields + 1] = m.satisfied and \"1\" or \"0\"\n fields[#fields + 1] = \"matcher:\" .. idx .. \":signal_id\"\n fields[#fields + 1] = m.signal_id\n end\n redis.call(\"HSET\", key, unpack(fields))\nend\n\n-- Match a signal against the condition\'s matchers. Mutates cond in-place.\n-- @param cond condition table from initialize_condition\n-- @param signal_name signal name string\n-- @param signal_id signal ID string\n-- @return true if this signal matched a matcher, false otherwise\nlocal function evaluate_signal_against_condition(cond, signal_name, signal_id)\n for i = 1, cond.total_matchers do\n local m = cond.matchers[i]\n if not m.satisfied then\n -- Empty name = wildcard matcher (matches any signal)\n if m.name == \"\" or m.name == signal_name then\n m.satisfied = true\n m.signal_id = signal_id or \"\"\n cond.satisfied_count = cond.satisfied_count + 1\n return true\n end\n end\n end\n return false\nend\n\n-- Check if the overall condition is satisfied based on mode.\n-- @param cond condition table\n-- @return true if condition is satisfied\nlocal function is_condition_satisfied(cond)\n local mode = cond.match_mode\n local min_count = cond.minimum_signal_count\n if mode == \"any\" then\n return cond.satisfied_count >= min_count\n elseif mode == \"all\" then\n return cond.satisfied_count >= cond.total_matchers\n end\n -- count(n) mode \u{2014} same as any with minimum_signal_count = n\n return cond.satisfied_count >= min_count\nend\n\n-- Extract a named field from a Valkey Stream entry\'s flat field array.\n-- Stream entries return {id, {field1, val1, field2, val2, ...}}.\n-- This operates on the inner flat array.\n-- @param fields flat array from stream entry[2]\n-- @param name field name to extract\n-- 
@return value string or nil\nlocal function extract_field(fields, name)\n for i = 1, #fields, 2 do\n if fields[i] == name then\n return fields[i + 1]\n end\n end\n return nil\nend\n\n---------------------------------------------------------------------------\n-- Suspension helpers (RFC-004)\n---------------------------------------------------------------------------\n\n-- Returns an empty signal summary JSON string for initial suspension record.\nlocal function initial_signal_summary_json()\n return \'{\"total_count\":0,\"matched_count\":0,\"signal_names\":[]}\'\nend\n\n-- Validates that a pending waitpoint can be activated by a suspension.\n-- Returns error tuple on failure, nil on success.\n-- @param wp_raw flat array from HGETALL on waitpoint hash\n-- @param eid expected execution_id\n-- @param att_idx expected attempt_index (string)\n-- @param now_ms current timestamp\nlocal function validate_pending_waitpoint(wp_raw, eid, att_idx, now_ms)\n if #wp_raw == 0 then\n return err(\"waitpoint_not_found\")\n end\n local wp = hgetall_to_table(wp_raw)\n if wp.state ~= \"pending\" then\n return err(\"waitpoint_not_pending\")\n end\n if wp.execution_id ~= eid then\n return err(\"invalid_waitpoint_for_execution\")\n end\n if tostring(wp.attempt_index) ~= tostring(att_idx) then\n return err(\"invalid_waitpoint_for_execution\")\n end\n -- Check if pending waitpoint has expired\n if is_set(wp.expires_at) and tonumber(wp.expires_at) <= now_ms then\n return err(\"pending_waitpoint_expired\")\n end\n return nil\nend\n\n-- Validates that a suspension record exists and is open (not closed).\n-- Returns error tuple on failure, nil on success. 
Also returns the parsed table.\n-- @param susp_raw flat array from HGETALL on suspension:current\nlocal function assert_active_suspension(susp_raw)\n if #susp_raw == 0 then\n return err(\"execution_not_suspended\")\n end\n local susp = hgetall_to_table(susp_raw)\n if not is_set(susp.suspension_id) then\n return err(\"execution_not_suspended\")\n end\n if is_set(susp.closed_at) then\n return err(\"execution_not_suspended\")\n end\n return nil, susp\nend\n\n-- Validates that a waitpoint belongs to the expected execution + suspension.\n-- Returns error tuple on failure, nil on success.\n-- @param wp_raw flat array from HGETALL on waitpoint hash\n-- @param eid expected execution_id\n-- @param sid expected suspension_id\n-- @param wid expected waitpoint_id\nlocal function assert_waitpoint_belongs(wp_raw, eid, sid, wid)\n if #wp_raw == 0 then\n return err(\"waitpoint_not_found\")\n end\n local wp = hgetall_to_table(wp_raw)\n if wp.execution_id ~= eid then\n return err(\"invalid_waitpoint_for_execution\")\n end\n if is_set(sid) and wp.suspension_id ~= sid then\n return err(\"invalid_waitpoint_for_execution\")\n end\n if is_set(wid) and wp.waitpoint_id ~= wid then\n return err(\"invalid_waitpoint_for_execution\")\n end\n return nil\nend\n\n---------------------------------------------------------------------------\n-- Policy\n---------------------------------------------------------------------------\n\n-- Decode a JSON policy string into flat key-value pairs suitable for HSET.\n-- @param json JSON string of the policy object\n-- @return flat array {k1, v1, k2, v2, ...} for use with redis.call(\"HSET\", key, unpack(...))\nlocal function unpack_policy(json)\n local policy = cjson.decode(json)\n local flat = {}\n for k, v in pairs(policy) do\n flat[#flat + 1] = k\n if type(v) == \"table\" then\n flat[#flat + 1] = cjson.encode(v)\n else\n flat[#flat + 1] = tostring(v)\n end\n end\n return flat\nend\n\n\n-- source: lua/version.lua\n-- FlowFabric library version check\n-- 
Returns the library version string. Used by the loader to detect\n-- whether the library is loaded and at the expected version.\n--\n-- Bump this string whenever any registered function\'s KEYS or ARGV arity\n-- changes, or a new function is added. Mismatched versions force\n-- `FUNCTION LOAD REPLACE` so old binaries cannot FCALL a library whose key\n-- signatures they expect a different shape for.\n--\n-- SINGLE SOURCE OF TRUTH: Rust\'s `LIBRARY_VERSION` is extracted from the\n-- `return \'X\'` literal below by `scripts/gen-ff-script-lua.sh`, which\n-- writes `crates/ff-script/src/flowfabric_lua_version`. Rust reads that\n-- file via `include_str!`. Do NOT maintain a separate Rust literal.\n-- Extract contract: the body MUST contain exactly one `return \'X\'` literal\n-- with single quotes (not double). CI runs the gen script and diffs; any\n-- drift fails the build.\n\nredis.register_function(\'ff_version\', function(keys, args)\n return \'5\'\nend)\n\n\n-- source: lua/lease.lua\n-- FlowFabric lease management functions\n-- Reference: RFC-003 (Lease and Fencing), RFC-010 \u{a7}4 (function inventory)\n--\n-- Depends on helpers: ok, err, ok_already_satisfied, hgetall_to_table,\n-- is_set, validate_lease, mark_expired\n\n---------------------------------------------------------------------------\n-- #6 ff_renew_lease\n--\n-- Extends a still-valid lease. 
Preserves lease_id and lease_epoch.\n-- KEYS (4): exec_core, lease_current, lease_history, lease_expiry_zset\n-- ARGV (7): execution_id, attempt_index, attempt_id,\n-- lease_id, lease_epoch,\n-- lease_ttl_ms, lease_history_grace_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_renew_lease\', function(keys, args)\n -- Positional KEYS\n local K = {\n core_key = keys[1],\n lease_current = keys[2],\n lease_history = keys[3],\n lease_expiry_key = keys[4],\n }\n\n -- Positional ARGV\n local lease_ttl_n = require_number(args[6], \"lease_ttl_ms\")\n if type(lease_ttl_n) == \"table\" then return lease_ttl_n end\n local grace_n = require_number(args[7], \"lease_history_grace_ms\")\n if type(grace_n) == \"table\" then return grace_n end\n\n local A = {\n execution_id = args[1],\n attempt_index = args[2],\n attempt_id = args[3],\n lease_id = args[4],\n lease_epoch = args[5],\n lease_ttl_ms = lease_ttl_n,\n lease_history_grace_ms = grace_n,\n }\n\n -- Server time\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- Read execution core\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n return err(\"execution_not_found\")\n end\n local core = hgetall_to_table(raw)\n\n -- Validate lifecycle\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_active\",\n core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n end\n\n -- Check revocation\n if core.ownership_state == \"lease_revoked\" or is_set(core.lease_revoked_at) then\n return err(\"lease_revoked\")\n end\n\n -- Check expiry (if expired, mark and reject)\n if tonumber(core.lease_expires_at or \"0\") <= now_ms then\n mark_expired(\n { core_key = K.core_key, lease_history_key = K.lease_history },\n core, now_ms, 1000)\n return err(\"lease_expired\")\n end\n\n -- Validate caller identity\n if tostring(core.current_attempt_index) ~= A.attempt_index then\n 
return err(\"stale_lease\")\n end\n if core.current_attempt_id ~= A.attempt_id then\n return err(\"stale_lease\")\n end\n if core.current_lease_id ~= A.lease_id then\n return err(\"stale_lease\")\n end\n if tostring(core.current_lease_epoch) ~= A.lease_epoch then\n return err(\"stale_lease\")\n end\n\n -- Compute new deadlines\n local new_expires_at = now_ms + A.lease_ttl_ms\n local new_renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)\n\n -- Update exec_core\n redis.call(\"HSET\", K.core_key,\n \"lease_last_renewed_at\", tostring(now_ms),\n \"lease_renewal_deadline\", tostring(new_renewal_deadline),\n \"lease_expires_at\", tostring(new_expires_at),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Update lease_current\n redis.call(\"HSET\", K.lease_current,\n \"last_renewed_at\", tostring(now_ms),\n \"renewal_deadline\", tostring(new_renewal_deadline),\n \"expires_at\", tostring(new_expires_at))\n redis.call(\"PEXPIREAT\", K.lease_current,\n new_expires_at + A.lease_history_grace_ms)\n\n -- Update lease_expiry index\n redis.call(\"ZADD\", K.lease_expiry_key, new_expires_at, A.execution_id)\n\n -- Renewal history event OFF for v1 (RFC-010 \u{a7}4.8g).\n -- The exec_core field lease_last_renewed_at provides the latest renewal\n -- timestamp without stream overhead. Enable per-lane when detailed\n -- ownership audit trails are needed.\n\n return ok(tostring(new_expires_at))\nend)\n\n---------------------------------------------------------------------------\n-- #28 ff_mark_lease_expired_if_due\n--\n-- Called by the lease expiry scanner. 
Re-validates that the lease is\n-- actually expired (guards against renewal since the ZRANGEBYSCORE read).\n-- Idempotent: no-op if already expired/reclaimed/revoked or not yet due.\n--\n-- KEYS (4): exec_core, lease_current, lease_expiry_zset, lease_history\n-- ARGV (1): execution_id\n---------------------------------------------------------------------------\nredis.register_function(\'ff_mark_lease_expired_if_due\', function(keys, args)\n -- Positional KEYS\n local K = {\n core_key = keys[1],\n lease_current = keys[2],\n lease_expiry_key = keys[3],\n lease_history = keys[4],\n }\n\n local execution_id = args[1]\n\n -- Server time\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- Read execution core\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n -- Execution gone \u{2014} clean up stale index entry\n redis.call(\"ZREM\", K.lease_expiry_key, execution_id)\n return ok_already_satisfied(\"execution_not_found\")\n end\n local core = hgetall_to_table(raw)\n\n -- Guard: not active \u{2192} nothing to expire\n if core.lifecycle_phase ~= \"active\" then\n -- Stale index entry: execution already left active phase.\n -- Clean up and return.\n redis.call(\"ZREM\", K.lease_expiry_key, execution_id)\n return ok_already_satisfied(\"not_active\")\n end\n\n -- Guard: already marked expired or ownership already cleared\n if core.ownership_state == \"lease_expired_reclaimable\" then\n return ok_already_satisfied(\"already_expired\")\n end\n if core.ownership_state == \"lease_revoked\" then\n return ok_already_satisfied(\"already_revoked\")\n end\n if core.ownership_state == \"unowned\" then\n -- Shouldn\'t happen for active phase, but defensive\n redis.call(\"ZREM\", K.lease_expiry_key, execution_id)\n return ok_already_satisfied(\"unowned\")\n end\n\n -- Check if lease is actually expired\n local expires_at = tonumber(core.lease_expires_at or \"0\")\n if expires_at > now_ms then\n -- Lease 
was renewed since the scanner read the index.\n -- The ZADD in renew_lease already updated the score.\n return ok_already_satisfied(\"not_yet_expired\")\n end\n\n -- Mark expired using shared helper\n -- Sets ownership_state=lease_expired_reclaimable, blocking_reason,\n -- blocking_detail, lease_expired_at, XADD lease_history \"expired\" event.\n mark_expired(\n { core_key = K.core_key, lease_history_key = K.lease_history },\n core, now_ms, 1000)\n\n -- NOTE: Do NOT ZREM from lease_expiry_key here.\n -- The entry stays so the scheduler (or a subsequent scanner cycle)\n -- can discover reclaimable executions from the same index.\n -- reclaim_execution or cancel_execution removes it.\n\n return ok(\"marked_expired\")\nend)\n\n---------------------------------------------------------------------------\n-- #8 ff_revoke_lease\n--\n-- Operator-initiated lease revocation. Immediate \u{2014} no grace period.\n-- Does NOT terminal-transition the execution. It clears the current\n-- owner and places the execution into lease_revoked ownership condition.\n-- The scheduler or operator must subsequently reclaim or cancel.\n--\n-- KEYS (5): exec_core, lease_current, lease_history,\n-- lease_expiry_zset, worker_leases\n-- ARGV (3): execution_id, expected_lease_id (or \"\" to skip check),\n-- revoke_reason\n---------------------------------------------------------------------------\nredis.register_function(\'ff_revoke_lease\', function(keys, args)\n -- Positional KEYS\n local K = {\n core_key = keys[1],\n lease_current = keys[2],\n lease_history = keys[3],\n lease_expiry_key = keys[4],\n worker_leases = keys[5],\n }\n\n -- Positional ARGV\n local A = {\n execution_id = args[1],\n expected_lease_id = args[2],\n revoke_reason = args[3],\n }\n\n -- Server time\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- Read execution core\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n return 
err(\"execution_not_found\")\n end\n local core = hgetall_to_table(raw)\n\n -- Must be active + leased\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_active\",\n core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n end\n if core.ownership_state ~= \"leased\" then\n if core.ownership_state == \"lease_revoked\" then\n return ok_already_satisfied(\"already_revoked\")\n end\n if core.ownership_state == \"lease_expired_reclaimable\" then\n return ok_already_satisfied(\"already_expired\")\n end\n return err(\"no_active_lease\")\n end\n\n -- Optional lease_id check (for targeted revocation)\n if is_set(A.expected_lease_id) and core.current_lease_id ~= A.expected_lease_id then\n return err(\"stale_lease\",\n \"expected \" .. A.expected_lease_id .. \" but current is \" .. (core.current_lease_id or \"\"))\n end\n\n -- Capture identity for history before clearing\n local lease_id = core.current_lease_id or \"\"\n local lease_epoch = core.current_lease_epoch or \"\"\n local attempt_index = core.current_attempt_index or \"\"\n local attempt_id = core.current_attempt_id or \"\"\n local worker_id = core.current_worker_id or \"\"\n local worker_instance_id = core.current_worker_instance_id or \"\"\n\n -- Update exec_core: all 7 state vector dimensions + public_state\n redis.call(\"HSET\", K.core_key,\n -- 7 state vector dimensions\n \"lifecycle_phase\", \"active\",\n \"ownership_state\", \"lease_revoked\",\n \"eligibility_state\", core.eligibility_state or \"not_applicable\",\n \"blocking_reason\", \"waiting_for_worker\",\n \"blocking_detail\", \"lease revoked: \" .. 
(A.revoke_reason or \"operator\"),\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"attempt_interrupted\",\n -- Derived public state (RFC-001 \u{a7}2.4 D3: active \u{2192} public_state=active)\n \"public_state\", \"active\",\n -- Revocation fields\n \"lease_revoked_at\", tostring(now_ms),\n \"lease_revoke_reason\", A.revoke_reason or \"operator\",\n \"last_mutation_at\", tostring(now_ms))\n\n -- DEL lease_current\n redis.call(\"DEL\", K.lease_current)\n\n -- ZREM from lease expiry index\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n\n -- SREM from worker leases set\n redis.call(\"SREM\", K.worker_leases, A.execution_id)\n\n -- Append revoked event to lease history\n redis.call(\"XADD\", K.lease_history, \"MAXLEN\", \"~\", 1000, \"*\",\n \"event\", \"revoked\",\n \"lease_id\", lease_id,\n \"lease_epoch\", lease_epoch,\n \"attempt_index\", attempt_index,\n \"attempt_id\", attempt_id,\n \"worker_id\", worker_id,\n \"worker_instance_id\", worker_instance_id,\n \"reason\", A.revoke_reason or \"operator\",\n \"ts\", tostring(now_ms))\n\n return ok(\"revoked\", lease_id, lease_epoch)\nend)\n-- NOTE: ff_issue_reclaim_grant is in scheduling.lua\n-- NOTE: ff_reclaim_execution is in execution.lua\n\n\n-- source: lua/execution.lua\n-- FlowFabric execution lifecycle functions\n-- Reference: RFC-001 (Execution), RFC-010 \u{a7}4 (function inventory)\n--\n-- Depends on helpers: ok, err, hgetall_to_table, is_set,\n-- validate_lease, validate_lease_and_mark_expired,\n-- clear_lease_and_indexes, defensive_zrem_all_indexes, unpack_policy\n\n---------------------------------------------------------------------------\n-- #0 ff_create_execution\n--\n-- Creates a new execution: core hash, payload, policy, tags, indexes.\n-- Idempotent via idempotency key (SET NX with TTL).\n--\n-- KEYS (8): exec_core, payload_key, policy_key, tags_key,\n-- eligible_or_delayed_zset, idem_key,\n-- execution_deadline_zset, all_executions_set\n-- ARGV (13): execution_id, namespace, 
lane_id, execution_kind,\n-- priority, creator_identity, policy_json,\n-- input_payload, delay_until, dedup_ttl_ms,\n-- tags_json, execution_deadline_at, partition_id\n---------------------------------------------------------------------------\nredis.register_function(\'ff_create_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n payload_key = keys[2],\n policy_key = keys[3],\n tags_key = keys[4],\n scheduling_zset = keys[5], -- eligible or delayed\n idem_key = keys[6],\n deadline_zset = keys[7],\n all_executions_set = keys[8],\n }\n\n local priority_n = require_number(args[5], \"priority\")\n if type(priority_n) == \"table\" then return priority_n end\n\n local A = {\n execution_id = args[1],\n namespace = args[2],\n lane_id = args[3],\n execution_kind = args[4],\n priority = priority_n,\n creator_identity = args[6],\n policy_json = args[7],\n input_payload = args[8],\n delay_until = args[9], -- \"\" or ms timestamp\n dedup_ttl_ms = args[10], -- \"\" or ms\n tags_json = args[11], -- \"\" or JSON object\n execution_deadline_at = args[12], -- \"\" or ms timestamp\n partition_id = args[13],\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- Clamp priority to [0, 9000]. The composite eligible-ZSET score formula\n -- -(priority * 1_000_000_000_000) + created_at uses Lua doubles (IEEE 754,\n -- 53-bit mantissa). priority > 9007 overflows the multiplication step;\n -- priority > ~8300 overflows the combined score when created_at is added.\n -- Clamping to 9000 keeps a safe margin while supporting 4+ orders of magnitude.\n if A.priority < 0 then A.priority = 0 end\n if A.priority > 9000 then A.priority = 9000 end\n\n -- 1. 
Idempotency check (only when idem_key is a real key, not the noop placeholder)\n if K.idem_key ~= \"\" and not string.find(K.idem_key, \"ff:noop:\") then\n local existing = redis.call(\"GET\", K.idem_key)\n if existing then\n return ok_duplicate(existing)\n end\n end\n\n -- 2. Guard: execution already exists\n if redis.call(\"EXISTS\", K.core_key) == 1 then\n return {1, \"DUPLICATE\", A.execution_id}\n end\n\n -- 2b. Policy JSON validation \u{2014} extract required_capabilities CSV now, BEFORE\n -- any write, so an error aborts without leaving a half-written exec_core.\n -- Fail-CLOSED on any malformed/typed input rather than silently defaulting\n -- to the empty-required wildcard \u{2014} required_capabilities is security-\n -- sensitive (an empty set matches any worker).\n --\n -- * An empty / \"{}\" policy_json \u{2192} no required_capabilities field written\n -- \u{2192} wildcard match, intentional.\n -- * A non-empty policy_json that fails cjson.decode \u{2192} fail with\n -- invalid_policy_json. An operator who MEANT \"no policy\" passes \"\".\n -- * routing_requirements.required_capabilities present but not an array\n -- \u{2192} fail with invalid_capabilities:required_not_array.\n -- * Any element that is not a non-empty string \u{2192} fail with\n -- invalid_capabilities:required:non_string_token. Silent-drop on\n -- `[\"gpu\", null, 42]` would erase real requirements.\n -- * Comma in a token \u{2192} fail (reserved delimiter, see ff_issue_claim_grant).\n -- * ASCII control byte (0x00-0x1F, 0x7F) or space (0x20) \u{2192} fail\n -- (invalid_capabilities:required:control_or_whitespace). Mirrors the\n -- Rust ingress in ff-sdk::FlowFabricWorker::connect and\n -- ff-scheduler::Scheduler::claim_for_worker (R3 relaxed printable-ASCII\n -- check to allow UTF-8 printable above 0x7F while still rejecting\n -- whitespace/control). 
This is the \"last line of defense\" for admin\n -- direct-HSET bypass: a required cap containing \"\\n\" or \"\\0\" is\n -- impossible to type, impossible to debug, and would silently pin an\n -- execution as unclaimable forever.\n -- * Bounds: same 4096 bytes / 256 tokens ceiling as the worker CSV.\n --\n -- Note on UTF-8: the byte-range test rejects ASCII control + space but\n -- accepts every byte \u{2265} 0x21 except 0x7F (DEL). UTF-8 multibyte sequences\n -- use only bytes 0x80-0xBF (continuation) or 0xC0-0xFD (lead), all above\n -- 0x7F, so i18n caps like \"\u{4e1c}\u{4eac}-gpu\" pass through intact. See RFC-009 \u{a7}7.5.\n local required_caps_csv = nil\n if is_set(A.policy_json) and A.policy_json ~= \"{}\" then\n local ok_decode, policy = pcall(cjson.decode, A.policy_json)\n if not ok_decode then\n return err(\"invalid_policy_json\", \"malformed\")\n end\n if type(policy) == \"table\" and policy.routing_requirements ~= nil then\n if type(policy.routing_requirements) ~= \"table\" then\n return err(\"invalid_policy_json\", \"routing_requirements:not_object\")\n end\n local caps = policy.routing_requirements.required_capabilities\n if caps ~= nil then\n if type(caps) ~= \"table\" then\n return err(\"invalid_capabilities\", \"required:not_array\")\n end\n local list = {}\n for _, cap in ipairs(caps) do\n if type(cap) ~= \"string\" then\n return err(\"invalid_capabilities\", \"required:non_string_token\")\n end\n if #cap == 0 then\n return err(\"invalid_capabilities\", \"required:empty_token\")\n end\n if string.find(cap, \",\", 1, true) then\n return err(\"invalid_capabilities\", \"required:comma_in_token\")\n end\n -- Reject ASCII control (0x00-0x1F), DEL (0x7F), and space (0x20).\n -- Iterating byte-by-byte: any byte in 0x00..=0x20 or == 0x7F fails.\n for i = 1, #cap do\n local b = cap:byte(i)\n if b <= 0x20 or b == 0x7F then\n return err(\"invalid_capabilities\", \"required:control_or_whitespace\")\n end\n end\n list[#list + 1] = cap\n end\n if #list > 
CAPS_MAX_TOKENS then\n return err(\"invalid_capabilities\", \"required:too_many_tokens\")\n end\n table.sort(list)\n if #list > 0 then\n local csv = table.concat(list, \",\")\n if #csv > CAPS_MAX_BYTES then\n return err(\"invalid_capabilities\", \"required:too_many_bytes\")\n end\n required_caps_csv = csv\n end\n end\n end\n end\n\n -- 3. Determine initial eligibility\n local lifecycle_phase = \"runnable\"\n local eligibility_state, blocking_reason, blocking_detail, public_state\n local is_delayed = is_set(A.delay_until) and tonumber(A.delay_until) > now_ms\n\n if is_delayed then\n eligibility_state = \"not_eligible_until_time\"\n blocking_reason = \"waiting_for_delay\"\n blocking_detail = \"delayed until \" .. A.delay_until\n public_state = \"delayed\"\n else\n eligibility_state = \"eligible_now\"\n blocking_reason = \"waiting_for_worker\"\n blocking_detail = \"\"\n public_state = \"waiting\"\n end\n\n -- 4. Create exec_core \u{2014} ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"execution_id\", A.execution_id,\n \"namespace\", A.namespace,\n \"lane_id\", A.lane_id,\n \"execution_kind\", A.execution_kind,\n \"partition_id\", A.partition_id,\n \"priority\", A.priority,\n \"creator_identity\", A.creator_identity,\n -- 7 state vector dimensions\n \"lifecycle_phase\", lifecycle_phase,\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", eligibility_state,\n \"blocking_reason\", blocking_reason,\n \"blocking_detail\", blocking_detail,\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"pending_first_attempt\",\n \"public_state\", public_state,\n -- accounting\n \"total_attempt_count\", \"0\",\n \"current_attempt_index\", \"\",\n \"current_attempt_id\", \"\",\n \"current_lease_id\", \"\",\n \"current_lease_epoch\", \"0\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"current_lane\", A.lane_id,\n \"retry_count\", \"0\",\n \"replay_count\", \"0\",\n \"lease_reclaim_count\", \"0\",\n -- timestamps\n 
\"created_at\", now_ms,\n \"started_at\", \"\",\n \"completed_at\", \"\",\n \"last_transition_at\", now_ms,\n \"last_mutation_at\", now_ms,\n -- lease fields (cleared)\n \"lease_acquired_at\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"lease_expired_at\", \"\",\n \"lease_revoked_at\", \"\",\n \"lease_revoke_reason\", \"\",\n -- delay\n \"delay_until\", is_delayed and A.delay_until or \"\",\n -- suspension (cleared)\n \"current_suspension_id\", \"\",\n \"current_waitpoint_id\", \"\",\n -- pending lineage (cleared)\n \"pending_retry_reason\", \"\",\n \"pending_replay_reason\", \"\",\n \"pending_replay_requested_by\", \"\",\n \"pending_previous_attempt_index\", \"\",\n -- progress\n \"progress_pct\", \"\",\n \"progress_message\", \"\",\n \"progress_updated_at\", \"\",\n -- flow\n \"flow_id\", \"\")\n\n -- 5. Store payload\n if is_set(A.input_payload) then\n redis.call(\"SET\", K.payload_key, A.input_payload)\n end\n\n -- 6. Store policy + required_capabilities. Validation already ran before\n -- any write (step 2b); here we only persist the pre-validated CSV. See\n -- step 2b for the fail-closed rationale on malformed / typed inputs.\n if is_set(A.policy_json) then\n redis.call(\"SET\", K.policy_key, A.policy_json)\n end\n if required_caps_csv then\n redis.call(\"HSET\", K.core_key, \"required_capabilities\", required_caps_csv)\n end\n\n -- 7. Store tags\n if is_set(A.tags_json) and A.tags_json ~= \"{}\" then\n local ok_decode, tags = pcall(cjson.decode, A.tags_json)\n if ok_decode and type(tags) == \"table\" then\n local flat = {}\n for k, v in pairs(tags) do\n flat[#flat + 1] = k\n flat[#flat + 1] = tostring(v)\n end\n if #flat > 0 then\n redis.call(\"HSET\", K.tags_key, unpack(flat))\n end\n end\n end\n\n -- 8. 
Add to scheduling index\n if is_delayed then\n redis.call(\"ZADD\", K.scheduling_zset, tonumber(A.delay_until), A.execution_id)\n else\n -- Composite score: -(priority * 1_000_000_000_000) + created_at_ms\n local score = 0 - (A.priority * 1000000000000) + now_ms\n redis.call(\"ZADD\", K.scheduling_zset, score, A.execution_id)\n end\n\n -- 9. SADD to all_executions partition index\n redis.call(\"SADD\", K.all_executions_set, A.execution_id)\n\n -- 10. Execution deadline index\n if is_set(A.execution_deadline_at) then\n redis.call(\"ZADD\", K.deadline_zset, tonumber(A.execution_deadline_at), A.execution_id)\n end\n\n -- 11. Set idempotency key with TTL\n -- Guard: PX 0 or PX <0 causes Valkey error (\"invalid expire time\"),\n -- which would abort the FCALL after exec_core was already written (step 4).\n local dedup_ms = tonumber(A.dedup_ttl_ms) or 0\n if dedup_ms > 0 and K.idem_key ~= \"\" and not string.find(K.idem_key, \"ff:noop:\") then\n redis.call(\"SET\", K.idem_key, A.execution_id,\n \"PX\", dedup_ms)\n end\n\n return ok(A.execution_id, public_state)\nend)\n\n---------------------------------------------------------------------------\n-- #1 ff_claim_execution (new attempt)\n--\n-- Consumes a claim grant, creates a new attempt + lease, transitions\n-- runnable \u{2192} active. Attempt type derived from exec_core attempt_state.\n--\n-- KEYS (14): exec_core, claim_grant, eligible_zset, lease_expiry_zset,\n-- worker_leases, attempt_hash, attempt_usage, attempt_policy,\n-- attempts_zset, lease_current, lease_history, active_index,\n-- attempt_timeout_zset, execution_deadline_zset\n-- ARGV (12): execution_id, worker_id, worker_instance_id, lane,\n-- capability_snapshot_hash, lease_id, lease_ttl_ms,\n-- renew_before_ms, attempt_id, attempt_policy_json,\n-- attempt_timeout_ms, execution_deadline_at\n--\n-- KNOWN LIMITATION (flow-cancel race): ff_claim_execution reads\n-- exec_core on {p:N} but cannot atomically read flow_core on {fp:N}\n-- (cross-slot). 
If ff_cancel_flow fired and its async member dispatch\n-- was dropped by a transient Valkey error, the member\'s exec_core has\n-- NOT yet been flipped to terminal \u{2014} so a worker may still claim it and\n-- run it to completion inside a cancelled flow. The flow\'s own\n-- public_flow_state is correctly terminal; only this one member escapes.\n-- Mitigations already in place:\n-- * cancel_flow(wait=true) avoids the bg dispatch entirely.\n-- * ff_apply_dependency_to_child rejects additions to terminal flows,\n-- so children cannot be added mid-cancel.\n-- * retention eventually trims the stale member.\n-- Full fix (flag on exec_core maintained by a broadcast loop) is a\n-- deliberate design change deferred past Batch A.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_claim_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n claim_grant = keys[2],\n eligible_zset = keys[3],\n lease_expiry_key = keys[4],\n worker_leases_key = keys[5],\n attempt_hash = keys[6],\n attempt_usage = keys[7],\n attempt_policy = keys[8],\n attempts_zset = keys[9],\n lease_current_key = keys[10],\n lease_history_key = keys[11],\n active_index_key = keys[12],\n attempt_timeout_key = keys[13],\n execution_deadline_key = keys[14],\n }\n\n local lease_ttl_n = require_number(args[7], \"lease_ttl_ms\")\n if type(lease_ttl_n) == \"table\" then return lease_ttl_n end\n local renew_before_n = require_number(args[8], \"renew_before_ms\")\n if type(renew_before_n) == \"table\" then return renew_before_n end\n\n local A = {\n execution_id = args[1],\n worker_id = args[2],\n worker_instance_id = args[3],\n lane = args[4],\n capability_hash = args[5],\n lease_id = args[6],\n lease_ttl_ms = lease_ttl_n,\n renew_before_ms = renew_before_n,\n attempt_id = args[9],\n attempt_policy_json = args[10],\n attempt_timeout_ms = args[11], -- \"\" or ms\n execution_deadline_at = args[12], -- \"\" or ms\n }\n\n local t = 
redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Validate execution state\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"runnable\" then return err(\"execution_not_leaseable\") end\n if core.ownership_state ~= \"unowned\" then return err(\"lease_conflict\") end\n if core.eligibility_state ~= \"eligible_now\" then return err(\"execution_not_leaseable\") end\n if core.terminal_outcome ~= \"none\" then return err(\"execution_not_leaseable\") end\n\n -- Defense-in-depth: invariant A3\n if core.attempt_state == \"running_attempt\" then\n return err(\"active_attempt_exists\")\n end\n -- Dispatch: resume-from-suspension must use claim_resumed_execution\n if core.attempt_state == \"attempt_interrupted\" then\n return err(\"use_claim_resumed_execution\")\n end\n\n -- 2. Validate and consume claim grant\n local grant_raw = redis.call(\"HGETALL\", K.claim_grant)\n if #grant_raw == 0 then return err(\"invalid_claim_grant\") end\n local grant = hgetall_to_table(grant_raw)\n\n if grant.worker_id ~= A.worker_id then\n return err(\"invalid_claim_grant\")\n end\n if is_set(grant.grant_expires_at) and tonumber(grant.grant_expires_at) < now_ms then\n return err(\"claim_grant_expired\")\n end\n redis.call(\"DEL\", K.claim_grant)\n\n -- 3. Compute lease fields\n local next_epoch = tonumber(core.current_lease_epoch or \"0\") + 1\n local expires_at = now_ms + A.lease_ttl_ms\n local renewal_deadline = now_ms + A.renew_before_ms\n local next_att_idx = tonumber(core.total_attempt_count or \"0\")\n\n -- 4. 
Derive attempt type from exec core attempt_state\n local attempt_type = \"initial\"\n local lineage_fields = {}\n if core.attempt_state == \"pending_retry_attempt\" then\n attempt_type = \"retry\"\n lineage_fields = {\n \"retry_reason\", core.pending_retry_reason or \"\",\n \"previous_attempt_index\", core.pending_previous_attempt_index or \"\"\n }\n elseif core.attempt_state == \"pending_replay_attempt\" then\n attempt_type = \"replay\"\n lineage_fields = {\n \"replay_reason\", core.pending_replay_reason or \"\",\n \"replay_requested_by\", core.pending_replay_requested_by or \"\",\n \"replayed_from_attempt_index\", core.pending_previous_attempt_index or \"\"\n }\n end\n\n -- 5. Create attempt record\n -- Construct actual attempt key from hash tag + computed index.\n -- KEYS[6] is a placeholder (always index 0); on retry/replay the\n -- actual index is > 0, so we build the real key dynamically.\n -- All attempt keys share the same {p:N} hash tag \u{2192} same Cluster slot.\n local tag = string.match(K.core_key, \"(%b{})\")\n local att_key = \"ff:attempt:\" .. tag .. \":\" .. A.execution_id .. \":\" .. tostring(next_att_idx)\n local att_usage_key = att_key .. \":usage\"\n local att_policy_key = att_key .. \":policy\"\n\n local attempt_fields = {\n \"attempt_id\", A.attempt_id,\n \"execution_id\", A.execution_id,\n \"attempt_index\", tostring(next_att_idx),\n \"attempt_type\", attempt_type,\n \"attempt_state\", \"started\",\n \"created_at\", tostring(now_ms),\n \"started_at\", tostring(now_ms),\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(next_epoch),\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id\n }\n for _, v in ipairs(lineage_fields) do\n attempt_fields[#attempt_fields + 1] = v\n end\n redis.call(\"HSET\", att_key, unpack(attempt_fields))\n redis.call(\"ZADD\", K.attempts_zset, now_ms, tostring(next_att_idx))\n\n -- 5b. 
Initialize attempt usage counters\n redis.call(\"HSET\", att_usage_key,\n \"last_usage_report_seq\", \"0\")\n\n -- 5c. Store attempt policy snapshot\n if is_set(A.attempt_policy_json) then\n local policy_flat = unpack_policy(A.attempt_policy_json)\n if #policy_flat > 0 then\n redis.call(\"HSET\", att_policy_key, unpack(policy_flat))\n end\n end\n\n -- 6. Create lease record\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"HSET\", K.lease_current_key,\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(next_epoch),\n \"execution_id\", A.execution_id,\n \"attempt_id\", A.attempt_id,\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id,\n \"acquired_at\", tostring(now_ms),\n \"expires_at\", tostring(expires_at),\n \"last_renewed_at\", tostring(now_ms),\n \"renewal_deadline\", tostring(renewal_deadline))\n\n -- 7. Update execution core \u{2014} ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"active\",\n \"ownership_state\", \"leased\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"running_attempt\",\n \"public_state\", \"active\",\n \"current_attempt_index\", tostring(next_att_idx),\n \"total_attempt_count\", tostring(next_att_idx + 1),\n \"current_attempt_id\", A.attempt_id,\n \"current_lease_id\", A.lease_id,\n \"current_lease_epoch\", tostring(next_epoch),\n \"current_worker_id\", A.worker_id,\n \"current_worker_instance_id\", A.worker_instance_id,\n \"current_lane\", A.lane,\n \"lease_acquired_at\", tostring(now_ms),\n \"lease_expires_at\", tostring(expires_at),\n \"lease_last_renewed_at\", tostring(now_ms),\n \"lease_renewal_deadline\", tostring(renewal_deadline),\n -- Preserve the first-claim timestamp across retries; only fall\n -- back to `now_ms` when the stored value is empty (initial state\n -- written at HSET, line 208, or after a reset). 
In Lua the empty\n -- string is truthy, so `core.started_at or now_ms` would wedge at\n -- \"\" forever on the first claim \u{2014} explicit emptiness check fixes\n -- the scenario 4 stage_latency bench (exec_core surfaced this via\n -- the new ExecutionInfo.started_at REST field).\n \"started_at\", (core.started_at ~= nil and core.started_at ~= \"\") and core.started_at or tostring(now_ms),\n -- Clear pending lineage fields (consumed above)\n \"pending_retry_reason\", \"\",\n \"pending_replay_reason\", \"\",\n \"pending_replay_requested_by\", \"\",\n \"pending_previous_attempt_index\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- 8. Update indexes\n redis.call(\"ZREM\", K.eligible_zset, A.execution_id)\n redis.call(\"ZADD\", K.lease_expiry_key, expires_at, A.execution_id)\n redis.call(\"SADD\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZADD\", K.active_index_key, expires_at, A.execution_id)\n\n -- 8a. Timeout indexes\n if is_set(A.attempt_timeout_ms) and A.attempt_timeout_ms ~= \"0\" then\n redis.call(\"ZADD\", K.attempt_timeout_key,\n now_ms + tonumber(A.attempt_timeout_ms), A.execution_id)\n end\n if is_set(A.execution_deadline_at) and A.execution_deadline_at ~= \"0\" then\n redis.call(\"ZADD\", K.execution_deadline_key,\n tonumber(A.execution_deadline_at), A.execution_id)\n end\n\n -- 9. 
Lease history event\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", 1000, \"*\",\n \"event\", \"acquired\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(next_epoch),\n \"attempt_id\", A.attempt_id,\n \"attempt_index\", tostring(next_att_idx),\n \"worker_id\", A.worker_id,\n \"ts\", tostring(now_ms))\n\n return ok(A.lease_id, tostring(next_epoch), tostring(expires_at),\n A.attempt_id, tostring(next_att_idx), attempt_type)\nend)\n\n---------------------------------------------------------------------------\n-- #3 ff_complete_execution\n--\n-- Validate lease, end attempt, store result, transition active\u{2192}terminal.\n--\n-- KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,\n-- terminal_zset, lease_current, lease_history, active_index,\n-- stream_meta, result_key, attempt_timeout_zset,\n-- execution_deadline_zset\n-- ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, result_payload\n---------------------------------------------------------------------------\nredis.register_function(\'ff_complete_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n attempt_hash = keys[2],\n lease_expiry_key = keys[3],\n worker_leases_key = keys[4],\n terminal_zset = keys[5],\n lease_current_key = keys[6],\n lease_history_key = keys[7],\n active_index_key = keys[8],\n stream_meta = keys[9],\n result_key = keys[10],\n attempt_timeout_key = keys[11],\n execution_deadline_key = keys[12],\n }\n\n local A = {\n execution_id = args[1],\n lease_id = args[2],\n lease_epoch = args[3],\n attempt_id = args[4],\n result_payload = args[5] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- Read + validate lease\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n local lease_err = validate_lease_and_mark_expired(\n core, A, now_ms, K, 1000)\n if 
lease_err then return lease_err end\n\n -- End attempt\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"ended_success\",\n \"ended_at\", tostring(now_ms))\n\n -- Close stream if exists\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"attempt_success\")\n end\n\n -- Store result\n if A.result_payload ~= \"\" then\n redis.call(\"SET\", K.result_key, A.result_payload)\n end\n\n -- Update execution core \u{2014} ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"terminal\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"success\",\n \"attempt_state\", \"attempt_terminal\",\n \"public_state\", \"completed\",\n \"completed_at\", tostring(now_ms),\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"lease_expired_at\", \"\",\n \"lease_revoked_at\", \"\",\n \"lease_revoke_reason\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Update indexes\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZADD\", K.terminal_zset, now_ms, A.execution_id)\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n\n -- Clean up lease\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", 1000, \"*\",\n \"event\", \"released\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", A.lease_epoch,\n \"attempt_index\", core.current_attempt_index or \"\",\n \"reason\", 
\"completed\",\n \"ts\", tostring(now_ms))\n\n -- Push-based DAG promotion (Batch C item 6). Only publish when the\n -- execution actually belongs to a flow \u{2014} standalone executions never\n -- have downstream edges for the engine to resolve. The engine\'s\n -- CompletionListener scanner SUBSCRIBEs to ff:dag:completions and\n -- calls dispatch_dependency_resolution per message. The\n -- dependency_reconciler interval scan is retained as a safety net\n -- for: missed messages during listener restart, ungated state\n -- (flow_id empty on older executions), and cluster broadcast gaps.\n if is_set(core.flow_id) then\n local payload = cjson.encode({\n execution_id = A.execution_id,\n flow_id = core.flow_id,\n outcome = \"success\",\n })\n redis.call(\"PUBLISH\", \"ff:dag:completions\", payload)\n end\n\n return ok(\"completed\")\nend)\n\n---------------------------------------------------------------------------\n-- #12a ff_cancel_execution\n--\n-- Cancel from any non-terminal state. Multi-path:\n-- active \u{2192} validate lease or operator override, end attempt, clear lease\n-- runnable \u{2192} defensive ZREM all indexes\n-- suspended \u{2192} close suspension/waitpoint, end attempt\n-- All paths: terminal(cancelled), defensive ZREM, ZADD terminal.\n--\n-- KEYS (21): exec_core, attempt_hash, stream_meta, lease_current,\n-- lease_history, lease_expiry_zset, worker_leases,\n-- suspension_current, waitpoint_hash, wp_condition,\n-- suspension_timeout_zset, terminal_zset,\n-- attempt_timeout_zset, execution_deadline_zset,\n-- eligible_zset, delayed_zset, blocked_deps_zset,\n-- blocked_budget_zset, blocked_quota_zset,\n-- blocked_route_zset, blocked_operator_zset\n-- ARGV (5): execution_id, reason, source, lease_id, lease_epoch\n---------------------------------------------------------------------------\nredis.register_function(\'ff_cancel_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n attempt_hash = keys[2],\n stream_meta = keys[3],\n 
lease_current_key = keys[4],\n lease_history_key = keys[5],\n lease_expiry_key = keys[6],\n worker_leases_key = keys[7],\n suspension_current = keys[8],\n waitpoint_hash = keys[9],\n wp_condition = keys[10],\n suspension_timeout_key = keys[11],\n terminal_key = keys[12],\n attempt_timeout_key = keys[13],\n execution_deadline_key = keys[14],\n eligible_key = keys[15],\n delayed_key = keys[16],\n blocked_deps_key = keys[17],\n blocked_budget_key = keys[18],\n blocked_quota_key = keys[19],\n blocked_route_key = keys[20],\n blocked_operator_key = keys[21],\n -- active_index_key and suspended_key are constructed dynamically below\n -- from the hash tag + lane_id (avoids changing KEYS count).\n active_index_key = nil,\n suspended_key = nil,\n }\n\n local A = {\n execution_id = args[1],\n reason = args[2],\n source = args[3] or \"\", -- \"operator_override\" or \"\"\n lease_id = args[4] or \"\",\n lease_epoch = args[5] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n -- Construct lane:active and lane:suspended keys dynamically from hash tag.\n -- These are not in the 21 KEYS array but defensive_zrem_all_indexes needs\n -- them to clean stale entries when cancelling active or suspended executions.\n local tag = string.match(K.core_key, \"(%b{})\")\n local lane = core.lane_id or \"default\"\n K.active_index_key = \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":active\"\n K.suspended_key = \"ff:idx:\" .. tag .. \":lane:\" .. lane .. 
\":suspended\"\n\n -- Already terminal\n if core.lifecycle_phase == \"terminal\" then\n return err(\"execution_not_active\",\n core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n end\n\n local cancelled_from = core.lifecycle_phase\n\n -- PATH: Active\n if core.lifecycle_phase == \"active\" then\n -- Require lease validation or operator override\n if A.source ~= \"operator_override\" then\n if core.ownership_state == \"lease_revoked\" then\n return err(\"lease_revoked\")\n end\n if is_set(A.lease_id) then\n if core.current_lease_id ~= A.lease_id then return err(\"stale_lease\") end\n if core.current_lease_epoch ~= A.lease_epoch then return err(\"stale_lease\") end\n end\n end\n\n -- End attempt\n if is_set(core.current_attempt_index) then\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"ended_cancelled\",\n \"ended_at\", tostring(now_ms),\n \"failure_reason\", \"cancelled: \" .. A.reason)\n end\n\n -- Close stream if exists\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"attempt_cancelled\")\n end\n\n -- Release lease\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n \"event\", \"released\",\n \"lease_id\", core.current_lease_id or \"\",\n \"lease_epoch\", core.current_lease_epoch or \"\",\n \"attempt_index\", core.current_attempt_index or \"\",\n \"reason\", \"cancelled\",\n \"ts\", tostring(now_ms))\n end\n\n -- PATH: Suspended\n if core.lifecycle_phase == \"suspended\" then\n -- End attempt (interrupted \u{2192} ended_cancelled)\n if is_set(core.current_attempt_index) then\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"ended_cancelled\",\n \"ended_at\", tostring(now_ms),\n \"failure_reason\", \"cancelled: \" .. 
A.reason)\n end\n\n -- Close stream if exists\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"attempt_cancelled\")\n end\n\n -- Close suspension + waitpoint + wp_condition\n if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n redis.call(\"HSET\", K.suspension_current,\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"cancelled\")\n end\n if redis.call(\"EXISTS\", K.waitpoint_hash) == 1 then\n redis.call(\"HSET\", K.waitpoint_hash,\n \"state\", \"closed\",\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"cancelled\")\n end\n if redis.call(\"EXISTS\", K.wp_condition) == 1 then\n redis.call(\"HSET\", K.wp_condition,\n \"closed\", \"1\",\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"cancelled\")\n end\n end\n\n -- ALL PATHS: exec_core FIRST for terminal transition (\u{a7}4.8b Rule 2)\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"terminal\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"cancelled\",\n \"attempt_state\", is_set(core.current_attempt_index) and \"attempt_terminal\" or (core.attempt_state or \"none\"),\n \"public_state\", \"cancelled\",\n \"cancellation_reason\", A.reason,\n \"cancelled_by\", A.source ~= \"\" and A.source or A.execution_id,\n \"completed_at\", tostring(now_ms),\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"current_suspension_id\", \"\",\n \"current_waitpoint_id\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Defensive ZREM from ALL scheduling + timeout indexes\n defensive_zrem_all_indexes(K, A.execution_id, K.terminal_key)\n\n -- ZADD terminal (unconditional)\n 
redis.call(\"ZADD\", K.terminal_key, now_ms, A.execution_id)\n\n -- Push-based DAG promotion (Batch C item 6). Skip the publish when\n -- the cancel came from `flow_cascade` \u{2014} that source means a parent\n -- already propagated and the engine is walking the edge graph; an\n -- additional publish here would trigger redundant dispatch work.\n -- (Not a correctness issue since dispatch is idempotent, but it\n -- avoids unnecessary load under wide fan-out cancellations.)\n if is_set(core.flow_id) and A.source ~= \"flow_cascade\" then\n local payload = cjson.encode({\n execution_id = A.execution_id,\n flow_id = core.flow_id,\n outcome = \"cancelled\",\n })\n redis.call(\"PUBLISH\", \"ff:dag:completions\", payload)\n end\n\n return ok(\"cancelled\", cancelled_from)\nend)\n\n---------------------------------------------------------------------------\n-- #30 ff_delay_execution\n--\n-- Worker delays its own active execution. Releases lease, transitions\n-- to runnable + not_eligible_until_time. 
Same attempt continues (paused).\n--\n-- KEYS (9): exec_core, attempt_hash, lease_current, lease_history,\n-- lease_expiry_zset, worker_leases, active_index,\n-- delayed_zset, attempt_timeout_zset\n-- ARGV (5): execution_id, lease_id, lease_epoch, attempt_id, delay_until\n---------------------------------------------------------------------------\nredis.register_function(\'ff_delay_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n attempt_hash = keys[2],\n lease_current_key = keys[3],\n lease_history_key = keys[4],\n lease_expiry_key = keys[5],\n worker_leases_key = keys[6],\n active_index_key = keys[7],\n delayed_zset = keys[8],\n attempt_timeout_key = keys[9],\n }\n\n local A = {\n execution_id = args[1],\n lease_id = args[2],\n lease_epoch = args[3],\n attempt_id = args[4],\n delay_until = args[5],\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n -- Validate lease\n local lease_err = validate_lease_and_mark_expired(\n core, A, now_ms, K, 1000)\n if lease_err then return lease_err end\n\n -- OOM-SAFE WRITE ORDERING: exec_core FIRST (point of no return)\n -- ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_eligible_until_time\",\n \"blocking_reason\", \"waiting_for_delay\",\n \"blocking_detail\", \"delayed until \" .. 
A.delay_until,\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"attempt_interrupted\",\n \"public_state\", \"delayed\",\n \"delay_until\", A.delay_until,\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Pause attempt: started \u{2192} suspended\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"suspended\",\n \"suspended_at\", tostring(now_ms),\n \"suspension_id\", \"worker_delay\")\n\n -- Release lease + update indexes\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n\n -- Add to delayed set\n redis.call(\"ZADD\", K.delayed_zset, tonumber(A.delay_until), A.execution_id)\n\n -- Lease history event\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n \"event\", \"released\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", A.lease_epoch,\n \"attempt_index\", core.current_attempt_index or \"\",\n \"attempt_id\", A.attempt_id,\n \"reason\", \"worker_delay\",\n \"ts\", tostring(now_ms))\n\n return ok(A.delay_until)\nend)\n\n---------------------------------------------------------------------------\n-- #31 ff_move_to_waiting_children\n--\n-- Worker blocks on child dependencies. Releases lease, transitions to\n-- runnable + blocked_by_dependencies. 
Same attempt continues (paused).\n--\n-- KEYS (9): exec_core, attempt_hash, lease_current, lease_history,\n-- lease_expiry_zset, worker_leases, active_index,\n-- blocked_deps_zset, attempt_timeout_zset\n-- ARGV (4): execution_id, lease_id, lease_epoch, attempt_id\n---------------------------------------------------------------------------\nredis.register_function(\'ff_move_to_waiting_children\', function(keys, args)\n local K = {\n core_key = keys[1],\n attempt_hash = keys[2],\n lease_current_key = keys[3],\n lease_history_key = keys[4],\n lease_expiry_key = keys[5],\n worker_leases_key = keys[6],\n active_index_key = keys[7],\n blocked_deps_zset = keys[8],\n attempt_timeout_key = keys[9],\n }\n\n local A = {\n execution_id = args[1],\n lease_id = args[2],\n lease_epoch = args[3],\n attempt_id = args[4],\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n -- Validate lease\n local lease_err = validate_lease_and_mark_expired(\n core, A, now_ms, K, 1000)\n if lease_err then return lease_err end\n\n -- OOM-SAFE WRITE ORDERING: exec_core FIRST (point of no return, \u{a7}4.8b Rule 2)\n -- ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"blocked_by_dependencies\",\n \"blocking_reason\", \"waiting_for_children\",\n \"blocking_detail\", \"waiting for child executions to complete\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"attempt_interrupted\",\n \"public_state\", \"waiting_children\",\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"last_transition_at\", tostring(now_ms),\n 
\"last_mutation_at\", tostring(now_ms))\n\n -- Pause attempt: started \u{2192} suspended (waiting children, not ended)\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"suspended\",\n \"suspended_at\", tostring(now_ms),\n \"suspension_id\", \"waiting_children\")\n\n -- Release lease + update indexes\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n\n -- Lease history event\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n \"event\", \"released\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", A.lease_epoch,\n \"attempt_index\", core.current_attempt_index or \"\",\n \"attempt_id\", A.attempt_id,\n \"reason\", \"waiting_children\",\n \"ts\", tostring(now_ms))\n\n -- Add to blocked:dependencies set\n redis.call(\"ZADD\", K.blocked_deps_zset,\n tonumber(core.created_at or \"0\"), A.execution_id)\n\n return ok()\nend)\n\n---------------------------------------------------------------------------\n-- #4 ff_fail_execution\n--\n-- Fail an active execution. If retries remain: set pending_retry_attempt\n-- + lineage on exec_core (deferred creation \u{2014} claim_execution creates\n-- the attempt, per R21 fix). 
If max retries reached: terminal failure.\n--\n-- KEYS (12): exec_core, attempt_hash, lease_expiry_zset, worker_leases,\n-- terminal_zset, delayed_zset, lease_current, lease_history,\n-- active_index, stream_meta, attempt_timeout_zset,\n-- execution_deadline_zset\n-- ARGV (7): execution_id, lease_id, lease_epoch, attempt_id,\n-- failure_reason, failure_category, retry_policy_json\n---------------------------------------------------------------------------\nredis.register_function(\'ff_fail_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n attempt_hash = keys[2],\n lease_expiry_key = keys[3],\n worker_leases_key = keys[4],\n terminal_key = keys[5],\n delayed_zset = keys[6],\n lease_current_key = keys[7],\n lease_history_key = keys[8],\n active_index_key = keys[9],\n stream_meta = keys[10],\n attempt_timeout_key = keys[11],\n execution_deadline_key = keys[12],\n }\n\n local A = {\n execution_id = args[1],\n lease_id = args[2],\n lease_epoch = args[3],\n attempt_id = args[4],\n failure_reason = args[5],\n failure_category = args[6],\n retry_policy_json = args[7] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Read + validate lease\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n local lease_err = validate_lease_and_mark_expired(\n core, A, now_ms, K, 1000)\n if lease_err then return lease_err end\n\n -- 2. End current attempt\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"ended_failure\",\n \"ended_at\", tostring(now_ms),\n \"failure_reason\", A.failure_reason,\n \"failure_category\", A.failure_category)\n\n -- 3. Close stream if exists\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"attempt_failure\")\n end\n\n -- 4. 
Determine retry eligibility\n local retry_count = tonumber(core.retry_count or \"0\")\n local max_retries = 0\n local backoff_ms = 1000\n local can_retry = false\n\n if is_set(A.retry_policy_json) then\n local ok_decode, policy = pcall(cjson.decode, A.retry_policy_json)\n if ok_decode and type(policy) == \"table\" then\n max_retries = tonumber(policy.max_retries or \"0\")\n if retry_count < max_retries then\n can_retry = true\n local bt = policy.backoff or {}\n if bt.type == \"exponential\" then\n local initial = (tonumber(bt.initial_delay_ms) or 1000)\n local max_d = (tonumber(bt.max_delay_ms) or 60000)\n local mult = (tonumber(bt.multiplier) or 2)\n backoff_ms = math.min(initial * (mult ^ retry_count), max_d)\n elseif bt.type == \"fixed\" then\n backoff_ms = (tonumber(bt.delay_ms) or 1000)\n end\n end\n end\n end\n\n if can_retry then\n -- RETRY PATH: deferred attempt creation (R21 fix)\n -- Do NOT create attempt record here. claim_execution creates the\n -- attempt with correct type (retry) by reading pending_retry_attempt.\n local delay_until = now_ms + backoff_ms\n\n -- ALL 7 state vector dimensions + pending lineage\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_eligible_until_time\",\n \"blocking_reason\", \"waiting_for_retry_backoff\",\n \"blocking_detail\", \"retry backoff until \" .. tostring(delay_until) ..\n \" (attempt \" .. (core.current_attempt_index or \"0\") ..\n \" failed: \" .. A.failure_reason .. 
\")\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"pending_retry_attempt\",\n \"public_state\", \"delayed\",\n -- Pending lineage for claim_execution to consume\n \"pending_retry_reason\", A.failure_reason,\n \"pending_previous_attempt_index\", core.current_attempt_index or \"0\",\n \"retry_count\", tostring(retry_count + 1),\n \"current_attempt_id\", \"\",\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"delay_until\", tostring(delay_until),\n \"failure_reason\", A.failure_reason,\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Release lease + update indexes\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n\n -- ZADD to delayed index\n redis.call(\"ZADD\", K.delayed_zset, delay_until, A.execution_id)\n\n -- Lease history\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n \"event\", \"released\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", A.lease_epoch,\n \"attempt_index\", core.current_attempt_index or \"\",\n \"reason\", \"failed_retry_scheduled\",\n \"ts\", tostring(now_ms))\n\n return ok(\"retry_scheduled\", tostring(delay_until))\n else\n -- TERMINAL PATH\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"terminal\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"failed\",\n \"attempt_state\", \"attempt_terminal\",\n \"public_state\", \"failed\",\n \"failure_reason\", A.failure_reason,\n \"completed_at\", tostring(now_ms),\n \"current_lease_id\", \"\",\n 
\"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Release lease + cleanup\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n redis.call(\"ZADD\", K.terminal_key, now_ms, A.execution_id)\n\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n \"event\", \"released\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", A.lease_epoch,\n \"attempt_index\", core.current_attempt_index or \"\",\n \"reason\", \"failed_terminal\",\n \"ts\", tostring(now_ms))\n\n -- Push-based DAG promotion (Batch C item 6). See ff_complete_execution\n -- for rationale. 
A terminal-failed upstream triggers\n -- child-skip cascades via ff_resolve_dependency on the receiver side.\n if is_set(core.flow_id) then\n local payload = cjson.encode({\n execution_id = A.execution_id,\n flow_id = core.flow_id,\n outcome = \"failed\",\n })\n redis.call(\"PUBLISH\", \"ff:dag:completions\", payload)\n end\n\n return ok(\"terminal_failed\")\n end\nend)\n\n---------------------------------------------------------------------------\n-- #26 ff_reclaim_execution\n--\n-- Atomically reclaim an expired/revoked execution: interrupt old attempt,\n-- create new attempt + new lease.\n--\n-- KEYS (14): exec_core, claim_grant, old_attempt_hash, old_stream_meta,\n-- new_attempt_hash, new_attempt_usage, attempts_zset,\n-- lease_current, lease_history, lease_expiry_zset,\n-- worker_leases, active_index, attempt_timeout_zset,\n-- execution_deadline_zset\n-- ARGV (8): execution_id, worker_id, worker_instance_id, lane,\n-- lease_id, lease_ttl_ms, attempt_id, attempt_policy_json\n---------------------------------------------------------------------------\nredis.register_function(\'ff_reclaim_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n claim_grant = keys[2],\n old_attempt_hash = keys[3],\n old_stream_meta = keys[4],\n new_attempt_hash = keys[5],\n new_attempt_usage = keys[6],\n attempts_zset = keys[7],\n lease_current_key = keys[8],\n lease_history_key = keys[9],\n lease_expiry_key = keys[10],\n worker_leases_key = keys[11],\n active_index_key = keys[12],\n attempt_timeout_key = keys[13],\n execution_deadline_key = keys[14],\n }\n\n local reclaim_ttl_n = require_number(args[6], \"lease_ttl_ms\")\n if type(reclaim_ttl_n) == \"table\" then return reclaim_ttl_n end\n\n local A = {\n execution_id = args[1],\n worker_id = args[2],\n worker_instance_id = args[3],\n lane = args[4],\n lease_id = args[5],\n lease_ttl_ms = reclaim_ttl_n,\n attempt_id = args[7],\n attempt_policy_json = args[8] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local 
now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Validate execution state\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n -- Must be active + reclaimable\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_reclaimable\")\n end\n if core.ownership_state ~= \"lease_expired_reclaimable\"\n and core.ownership_state ~= \"lease_revoked\" then\n return err(\"execution_not_reclaimable\")\n end\n\n -- Check max_reclaim_count\n local reclaim_count = tonumber(core.lease_reclaim_count or \"0\")\n local max_reclaim = 100 -- default\n -- Read from policy if available\n local policy_key = string.gsub(K.core_key, \":core$\", \":policy\")\n local policy_raw = redis.call(\"GET\", policy_key)\n if policy_raw then\n local ok_p, policy = pcall(cjson.decode, policy_raw)\n if ok_p and type(policy) == \"table\" then\n max_reclaim = tonumber(policy.max_reclaim_count or \"100\")\n end\n end\n\n if reclaim_count >= max_reclaim then\n -- Terminal: max reclaims exceeded\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"terminal\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"failed\",\n \"attempt_state\", \"attempt_terminal\",\n \"public_state\", \"failed\",\n \"failure_reason\", \"max_reclaims_exceeded\",\n \"completed_at\", tostring(now_ms),\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n -- Dynamic worker_leases SREM: K.worker_leases_key targets the NEW (reclaiming)\n -- worker\'s set, but the entry is in the 
OLD (expired) worker\'s set. Use\n -- current_worker_instance_id from exec_core to SREM from the correct set.\n local wiid = core.current_worker_instance_id or \"\"\n if wiid ~= \"\" then\n local tag_wl = string.match(K.core_key, \"(%b{})\")\n redis.call(\"SREM\", \"ff:idx:\" .. tag_wl .. \":worker:\" .. wiid .. \":leases\", A.execution_id)\n end\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n\n -- ZADD terminal (construct key from hash tag + lane)\n local tag = string.match(K.core_key, \"(%b{})\")\n local lane = core.lane_id or core.current_lane or \"default\"\n local terminal_key = \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":terminal\"\n redis.call(\"ZADD\", terminal_key, now_ms, A.execution_id)\n\n return err(\"max_retries_exhausted\")\n end\n\n -- 2. Validate and consume claim grant\n local grant_raw = redis.call(\"HGETALL\", K.claim_grant)\n if #grant_raw == 0 then return err(\"invalid_claim_grant\") end\n local grant = hgetall_to_table(grant_raw)\n if grant.worker_id ~= A.worker_id then return err(\"invalid_claim_grant\") end\n redis.call(\"DEL\", K.claim_grant)\n\n -- 3. Interrupt old attempt\n local old_att_idx = core.current_attempt_index or \"0\"\n redis.call(\"HSET\", K.old_attempt_hash,\n \"attempt_state\", \"interrupted_reclaimed\",\n \"ended_at\", tostring(now_ms),\n \"failure_reason\", \"lease_\" .. (core.ownership_state or \"expired\"))\n\n -- Close old stream if exists\n if redis.call(\"EXISTS\", K.old_stream_meta) == 1 then\n redis.call(\"HSET\", K.old_stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"reclaimed\")\n end\n\n -- 4. 
Create new attempt\n -- Construct actual attempt key from hash tag + computed index\n -- (same dynamic-key pattern as ff_claim_execution).\n local next_epoch = tonumber(core.current_lease_epoch or \"0\") + 1\n local next_att_idx = tonumber(core.total_attempt_count or \"0\")\n local expires_at = now_ms + A.lease_ttl_ms\n local renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)\n\n local tag = string.match(K.core_key, \"(%b{})\")\n local att_key = \"ff:attempt:\" .. tag .. \":\" .. A.execution_id .. \":\" .. tostring(next_att_idx)\n local att_usage_key = att_key .. \":usage\"\n\n redis.call(\"HSET\", att_key,\n \"attempt_id\", A.attempt_id,\n \"execution_id\", A.execution_id,\n \"attempt_index\", tostring(next_att_idx),\n \"attempt_type\", \"reclaim\",\n \"attempt_state\", \"started\",\n \"created_at\", tostring(now_ms),\n \"started_at\", tostring(now_ms),\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(next_epoch),\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id,\n \"reclaim_reason\", \"lease_\" .. (core.ownership_state or \"expired\"),\n \"previous_attempt_index\", old_att_idx)\n redis.call(\"ZADD\", K.attempts_zset, now_ms, tostring(next_att_idx))\n\n -- Initialize usage counters\n redis.call(\"HSET\", att_usage_key, \"last_usage_report_seq\", \"0\")\n\n -- 5. Create new lease\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"HSET\", K.lease_current_key,\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(next_epoch),\n \"execution_id\", A.execution_id,\n \"attempt_id\", A.attempt_id,\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id,\n \"acquired_at\", tostring(now_ms),\n \"expires_at\", tostring(expires_at),\n \"last_renewed_at\", tostring(now_ms),\n \"renewal_deadline\", tostring(renewal_deadline))\n\n -- 6. 
Update exec_core \u{2014} ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"active\",\n \"ownership_state\", \"leased\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"running_attempt\",\n \"public_state\", \"active\",\n \"current_attempt_index\", tostring(next_att_idx),\n \"total_attempt_count\", tostring(next_att_idx + 1),\n \"current_attempt_id\", A.attempt_id,\n \"current_lease_id\", A.lease_id,\n \"current_lease_epoch\", tostring(next_epoch),\n \"current_worker_id\", A.worker_id,\n \"current_worker_instance_id\", A.worker_instance_id,\n \"current_lane\", A.lane,\n \"lease_acquired_at\", tostring(now_ms),\n \"lease_expires_at\", tostring(expires_at),\n \"lease_last_renewed_at\", tostring(now_ms),\n \"lease_renewal_deadline\", tostring(renewal_deadline),\n \"lease_expired_at\", \"\",\n \"lease_revoked_at\", \"\",\n \"lease_revoke_reason\", \"\",\n \"lease_reclaim_count\", tostring(reclaim_count + 1),\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- 7. Update indexes\n redis.call(\"ZADD\", K.lease_expiry_key, expires_at, A.execution_id)\n redis.call(\"SADD\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZADD\", K.active_index_key, expires_at, A.execution_id)\n\n -- 8. 
Lease history events\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", 1000, \"*\",\n \"event\", \"reclaimed\",\n \"old_lease_epoch\", core.current_lease_epoch or \"\",\n \"new_lease_id\", A.lease_id,\n \"new_lease_epoch\", tostring(next_epoch),\n \"new_attempt_id\", A.attempt_id,\n \"new_attempt_index\", tostring(next_att_idx),\n \"worker_id\", A.worker_id,\n \"ts\", tostring(now_ms))\n\n return ok(A.lease_id, tostring(next_epoch), tostring(expires_at),\n A.attempt_id, tostring(next_att_idx), \"reclaim\")\nend)\n\n---------------------------------------------------------------------------\n-- #29 ff_expire_execution\n--\n-- Timeout-based expiration. Handles active, runnable, and suspended phases.\n-- Called by the attempt_timeout and execution_deadline scanners.\n--\n-- KEYS (14): exec_core, attempt_hash, stream_meta, lease_current,\n-- lease_history, lease_expiry_zset, worker_leases,\n-- active_index, terminal_zset, attempt_timeout_zset,\n-- execution_deadline_zset, suspended_zset,\n-- suspension_timeout_zset, suspension_current\n-- ARGV (2): execution_id, expire_reason\n---------------------------------------------------------------------------\nredis.register_function(\'ff_expire_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n attempt_hash = keys[2],\n stream_meta = keys[3],\n lease_current_key = keys[4],\n lease_history_key = keys[5],\n lease_expiry_key = keys[6],\n worker_leases_key = keys[7],\n active_index_key = keys[8],\n terminal_key = keys[9],\n attempt_timeout_key = keys[10],\n execution_deadline_key = keys[11],\n suspended_zset = keys[12],\n suspension_timeout_key = keys[13],\n suspension_current = keys[14],\n }\n\n local A = {\n execution_id = args[1],\n expire_reason = args[2] or \"attempt_timeout\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n redis.call(\"ZREM\", 
K.attempt_timeout_key, A.execution_id)\n redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n return ok(\"not_found_cleaned\")\n end\n local core = hgetall_to_table(raw)\n\n -- Already terminal \u{2014} no-op\n if core.lifecycle_phase == \"terminal\" then\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n return ok(\"already_terminal\")\n end\n\n -- PATH: Active\n if core.lifecycle_phase == \"active\" then\n -- End attempt\n if is_set(core.current_attempt_index) then\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"ended_failure\",\n \"ended_at\", tostring(now_ms),\n \"failure_reason\", A.expire_reason)\n end\n -- Close stream\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"expired\")\n end\n -- Release lease\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n -- Dynamic worker_leases SREM: the scanner passes empty WorkerInstanceId\n -- in KEYS[7] (it can\'t know which worker holds the lease). Use the actual\n -- worker_instance_id from exec_core to SREM from the correct set.\n local wiid = core.current_worker_instance_id or \"\"\n if wiid ~= \"\" then\n local tag = string.match(K.core_key, \"(%b{})\")\n redis.call(\"SREM\", \"ff:idx:\" .. tag .. \":worker:\" .. wiid .. \":leases\", A.execution_id)\n end\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n -- Lease history\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", \"1000\", \"*\",\n \"event\", \"released\",\n \"lease_id\", core.current_lease_id or \"\",\n \"lease_epoch\", core.current_lease_epoch or \"\",\n \"attempt_index\", core.current_attempt_index or \"\",\n \"reason\", \"expired:\" .. 
A.expire_reason,\n \"ts\", tostring(now_ms))\n end\n\n -- PATH: Suspended\n if core.lifecycle_phase == \"suspended\" then\n -- End attempt\n if is_set(core.current_attempt_index) then\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"ended_failure\",\n \"ended_at\", tostring(now_ms),\n \"failure_reason\", A.expire_reason)\n end\n -- Close suspension\n if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n redis.call(\"HSET\", K.suspension_current,\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"expired:\" .. A.expire_reason)\n end\n -- Close stream\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"expired\")\n end\n -- ZREM from suspended indexes\n redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n end\n\n -- PATH: Runnable (execution_deadline fires while execution is waiting/delayed/blocked)\n -- These indexes are not in the 14 KEYS array, so construct dynamically from\n -- hash tag + lane_id (same C2 pattern as ff_cancel_execution).\n if core.lifecycle_phase == \"runnable\" then\n local tag = string.match(K.core_key, \"(%b{})\")\n local lane = core.lane_id or core.current_lane or \"default\"\n local es = core.eligibility_state or \"\"\n\n if es == \"eligible_now\" then\n redis.call(\"ZREM\", \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":eligible\", A.execution_id)\n elseif es == \"not_eligible_until_time\" then\n redis.call(\"ZREM\", \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":delayed\", A.execution_id)\n elseif es == \"blocked_by_dependencies\" then\n redis.call(\"ZREM\", \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":blocked:dependencies\", A.execution_id)\n elseif es == \"blocked_by_budget\" then\n redis.call(\"ZREM\", \"ff:idx:\" .. tag .. \":lane:\" .. lane .. 
\":blocked:budget\", A.execution_id)\n elseif es == \"blocked_by_quota\" then\n redis.call(\"ZREM\", \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":blocked:quota\", A.execution_id)\n elseif es == \"blocked_by_route\" then\n redis.call(\"ZREM\", \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":blocked:route\", A.execution_id)\n elseif es == \"blocked_by_operator\" then\n redis.call(\"ZREM\", \"ff:idx:\" .. tag .. \":lane:\" .. lane .. \":blocked:operator\", A.execution_id)\n else\n -- Defensive catch-all: handles blocked_by_lane_state and any future\n -- eligibility states. ZREM from ALL runnable-state indexes \u{2014} idempotent.\n local lp = \"ff:idx:\" .. tag .. \":lane:\" .. lane\n redis.call(\"ZREM\", lp .. \":eligible\", A.execution_id)\n redis.call(\"ZREM\", lp .. \":delayed\", A.execution_id)\n redis.call(\"ZREM\", lp .. \":blocked:dependencies\", A.execution_id)\n redis.call(\"ZREM\", lp .. \":blocked:budget\", A.execution_id)\n redis.call(\"ZREM\", lp .. \":blocked:quota\", A.execution_id)\n redis.call(\"ZREM\", lp .. \":blocked:route\", A.execution_id)\n redis.call(\"ZREM\", lp .. 
\":blocked:operator\", A.execution_id)\n end\n end\n\n -- ALL PATHS: terminal transition\n local att_state = \"attempt_terminal\"\n if not is_set(core.current_attempt_index) then\n att_state = core.attempt_state or \"none\"\n end\n\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"terminal\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"expired\",\n \"attempt_state\", att_state,\n \"public_state\", \"expired\",\n \"failure_reason\", A.expire_reason,\n \"completed_at\", tostring(now_ms),\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"current_suspension_id\", \"\",\n \"current_waitpoint_id\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Cleanup timeout indexes + ZADD terminal\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n redis.call(\"ZREM\", K.execution_deadline_key, A.execution_id)\n redis.call(\"ZADD\", K.terminal_key, now_ms, A.execution_id)\n\n return ok(\"expired\", core.lifecycle_phase)\nend)\n\n\n-- source: lua/scheduling.lua\n-- FlowFabric scheduling functions\n-- Reference: RFC-009 (Scheduling), RFC-010 \u{a7}4 (function inventory)\n--\n-- Depends on helpers: ok, err, hgetall_to_table, is_set, validate_lease\n\n---------------------------------------------------------------------------\n-- Capability matching helpers (local to scheduling.lua)\n-- Bounds CAPS_MAX_BYTES / CAPS_MAX_TOKENS live in helpers.lua and are\n-- enforced symmetrically on worker caps here and on required caps in\n-- ff_create_execution so neither side can smuggle in an oversized list.\n---------------------------------------------------------------------------\n\n-- Parse a capability CSV into a {token=true} set. 
Empty/nil \u{2192} empty set.\n-- Returns (set, nil) on success or (nil, err_tuple) on bound violation.\n--\n-- Empty tokens (from stray separators like \"a,,b\") are skipped BEFORE the\n-- count check so a legitimate list punctuated by noise isn\'t rejected.\n-- Real oversize input still fails because #csv > CAPS_MAX_BYTES catches it\n-- before this loop runs.\nlocal function parse_capability_csv(csv, kind)\n if csv == nil or csv == \"\" then\n return {}, nil\n end\n if #csv > CAPS_MAX_BYTES then\n return nil, err(\"invalid_capabilities\", kind .. \":too_many_bytes\")\n end\n local set = {}\n local n = 0\n for token in string.gmatch(csv, \"([^,]+)\") do\n if #token > 0 then\n n = n + 1\n if n > CAPS_MAX_TOKENS then\n return nil, err(\"invalid_capabilities\", kind .. \":too_many_tokens\")\n end\n set[token] = true\n end\n end\n return set, nil\nend\n\n-- Return sorted CSV of tokens present in `required` but missing from\n-- `worker_caps`. Empty result means worker satisfies all requirements.\nlocal function missing_capabilities(required, worker_caps)\n local missing = {}\n for cap, _ in pairs(required) do\n if not worker_caps[cap] then\n missing[#missing + 1] = cap\n end\n end\n table.sort(missing)\n return table.concat(missing, \",\")\nend\n\n---------------------------------------------------------------------------\n-- #25 ff_issue_claim_grant\n--\n-- Scheduler issues a claim grant for an eligible execution.\n-- Validates execution is eligible, writes grant hash with TTL,\n-- removes from eligible set.\n--\n-- KEYS (3): exec_core, claim_grant_key, eligible_zset\n-- ARGV (9): execution_id, worker_id, worker_instance_id,\n-- lane_id, capability_hash, grant_ttl_ms,\n-- route_snapshot_json, admission_summary,\n-- worker_capabilities_csv -- sorted CSV of worker caps (option a)\n--\n-- Capability matching (RFC-009):\n-- If exec_core.required_capabilities (sorted CSV on exec_core) is empty,\n-- any worker matches (backwards compat). 
Otherwise the worker\'s sorted\n-- CSV must be a superset.\n-- On mismatch: Lua stamps `last_capability_mismatch_at` (single scalar\n-- field, idempotent write \u{2014} no unbounded counter) and returns\n-- err(\"capability_mismatch\", missing_csv). The scheduler side MUST\n-- then block the execution off the eligible ZSET (see\n-- ff_block_execution_for_admission with reason `waiting_for_capability`),\n-- otherwise ZRANGEBYSCORE keeps returning the same top-of-zset every\n-- tick and 100 workers \u{d7} 1 tick/s = hot-loop starvation. RFC-009 \u{a7}564.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_issue_claim_grant\', function(keys, args)\n local K = {\n core_key = keys[1],\n claim_grant = keys[2],\n eligible_zset = keys[3],\n }\n\n local A = {\n execution_id = args[1],\n worker_id = args[2],\n worker_instance_id = args[3],\n lane_id = args[4],\n capability_hash = args[5] or \"\",\n route_snapshot_json = args[7] or \"\",\n admission_summary = args[8] or \"\",\n worker_capabilities_csv = args[9] or \"\",\n }\n\n local grant_ttl_n = require_number(args[6], \"grant_ttl_ms\")\n if type(grant_ttl_n) == \"table\" then return grant_ttl_n end\n A.grant_ttl_ms = grant_ttl_n\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Validate execution exists and is eligible\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"runnable\" then\n return err(\"execution_not_eligible\")\n end\n if core.eligibility_state ~= \"eligible_now\" then\n return err(\"execution_not_eligible\")\n end\n\n -- 2. Check no existing grant (prevent double-grant)\n if redis.call(\"EXISTS\", K.claim_grant) == 1 then\n return err(\"grant_already_exists\")\n end\n\n -- 3. 
Verify execution is in eligible set (TOCTOU guard)\n local score = redis.call(\"ZSCORE\", K.eligible_zset, A.execution_id)\n if not score then\n return err(\"execution_not_in_eligible_set\")\n end\n\n -- 4. Capability matching. On miss we stamp a SINGLE bounded field \u{2014}\n -- `last_capability_mismatch_at` \u{2014} so operators can SCAN for stuck\n -- executions via `HGET last_capability_mismatch_at < now - 1h` without\n -- needing a counter. An earlier version HINCRBY\'d a counter; that was\n -- dropped because combined with the hot-loop bug (executions staying in\n -- the eligible ZSET after mismatch) the counter grew unboundedly (2.4M\n -- increments/day on one stuck exec_core under 100 workers). An HSET of\n -- a fixed field is idempotent w.r.t. size.\n --\n -- The scheduler MUST block the execution off the eligible ZSET after\n -- this err returns; otherwise the next tick picks the same top-of-zset\n -- and we wasted this validation. See Scheduler::claim_for_worker.\n local required_set, req_err = parse_capability_csv(\n core.required_capabilities or \"\", \"required\")\n if req_err then return req_err end\n local worker_set, wrk_err = parse_capability_csv(\n A.worker_capabilities_csv, \"worker\")\n if wrk_err then return wrk_err end\n if next(required_set) ~= nil then\n local missing = missing_capabilities(required_set, worker_set)\n if missing ~= \"\" then\n redis.call(\"HSET\", K.core_key,\n \"last_capability_mismatch_at\", tostring(now_ms))\n return err(\"capability_mismatch\", missing)\n end\n end\n\n -- 5. 
Write grant hash with TTL\n local grant_expires_at = now_ms + A.grant_ttl_ms\n redis.call(\"HSET\", K.claim_grant,\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id,\n \"lane_id\", A.lane_id,\n \"capability_hash\", A.capability_hash,\n \"route_snapshot_json\", A.route_snapshot_json,\n \"admission_summary\", A.admission_summary,\n \"created_at\", tostring(now_ms),\n \"grant_expires_at\", tostring(grant_expires_at))\n redis.call(\"PEXPIREAT\", K.claim_grant, grant_expires_at)\n\n -- 6. Do NOT ZREM from eligible here. ff_claim_execution does the ZREM\n -- when consuming the grant. If the grant expires unconsumed, the execution\n -- remains in the eligible set and is re-discovered by the next scheduler\n -- cycle. This prevents the \"orphaned grant\" stuck state where an execution\n -- is in no scheduling index after grant expiry.\n\n return ok(A.execution_id)\nend)\n\n---------------------------------------------------------------------------\n-- #32 ff_change_priority\n--\n-- Update priority and re-score in eligible ZSET.\n-- Only works for runnable + eligible_now executions.\n--\n-- KEYS (2): exec_core, eligible_zset\n-- ARGV (2): execution_id, new_priority\n---------------------------------------------------------------------------\nredis.register_function(\'ff_change_priority\', function(keys, args)\n local K = {\n core_key = keys[1],\n eligible_zset = keys[2],\n }\n\n local new_priority_n = require_number(args[2], \"new_priority\")\n if type(new_priority_n) == \"table\" then return new_priority_n end\n\n local A = {\n execution_id = args[1],\n new_priority = new_priority_n,\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. 
Read and validate\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"runnable\" then\n return err(\"execution_not_eligible\")\n end\n if core.eligibility_state ~= \"eligible_now\" then\n return err(\"execution_not_eligible\")\n end\n\n local old_priority = tonumber(core.priority or \"0\")\n\n -- Clamp to safe range (same as ff_create_execution)\n if A.new_priority < 0 then A.new_priority = 0 end\n if A.new_priority > 9000 then A.new_priority = 9000 end\n\n -- 2. Update exec_core priority\n redis.call(\"HSET\", K.core_key,\n \"priority\", tostring(A.new_priority),\n \"last_mutation_at\", tostring(now_ms))\n\n -- 3. Re-score in eligible ZSET\n -- Composite score: -(priority * 1_000_000_000_000) + created_at_ms\n local created_at = tonumber(core.created_at or \"0\")\n local new_score = 0 - (A.new_priority * 1000000000000) + created_at\n redis.call(\"ZADD\", K.eligible_zset, new_score, A.execution_id)\n\n return ok(tostring(old_priority), tostring(A.new_priority))\nend)\n\n---------------------------------------------------------------------------\n-- #33 ff_update_progress\n--\n-- Update progress fields on exec_core. 
Validate lease (lite check:\n-- lease_id + epoch only \u{2014} attempt_id not required per \u{a7}4 Class B).\n--\n-- KEYS (1): exec_core\n-- ARGV (5): execution_id, lease_id, lease_epoch,\n-- progress_pct, progress_message\n---------------------------------------------------------------------------\nredis.register_function(\'ff_update_progress\', function(keys, args)\n local K = {\n core_key = keys[1],\n }\n\n local A = {\n execution_id = args[1],\n lease_id = args[2],\n lease_epoch = args[3],\n progress_pct = args[4] or \"\",\n progress_message = args[5] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- Read and validate\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_active\",\n core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n end\n if core.ownership_state == \"lease_revoked\" then\n return err(\"lease_revoked\")\n end\n if tonumber(core.lease_expires_at or \"0\") <= now_ms then\n return err(\"lease_expired\")\n end\n if core.current_lease_id ~= A.lease_id then\n return err(\"stale_lease\")\n end\n if core.current_lease_epoch ~= A.lease_epoch then\n return err(\"stale_lease\")\n end\n\n -- Update progress fields\n local fields = { \"last_mutation_at\", tostring(now_ms), \"progress_updated_at\", tostring(now_ms) }\n if is_set(A.progress_pct) then\n fields[#fields + 1] = \"progress_pct\"\n fields[#fields + 1] = A.progress_pct\n end\n if is_set(A.progress_message) then\n fields[#fields + 1] = \"progress_message\"\n fields[#fields + 1] = A.progress_message\n end\n redis.call(\"HSET\", K.core_key, unpack(fields))\n\n return ok()\nend)\n\n---------------------------------------------------------------------------\n-- #27 ff_promote_delayed\n--\n-- Promote a delayed execution to eligible when its 
delay_until has passed.\n-- Called by the delayed promoter scanner.\n-- Preserves attempt_state (may be pending_retry, pending_first, or\n-- attempt_interrupted from delay_execution).\n--\n-- KEYS (3): exec_core, delayed_zset, eligible_zset\n-- ARGV (2): execution_id, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_promote_delayed\', function(keys, args)\n local K = {\n core_key = keys[1],\n delayed_zset = keys[2],\n eligible_zset = keys[3],\n }\n\n local now_ms_n = require_number(args[2], \"now_ms\")\n if type(now_ms_n) == \"table\" then return now_ms_n end\n\n local A = {\n execution_id = args[1],\n now_ms = now_ms_n,\n }\n\n -- Read and validate\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n -- Execution gone \u{2014} clean up stale index entry\n redis.call(\"ZREM\", K.delayed_zset, A.execution_id)\n return ok(\"not_found_cleaned\")\n end\n local core = hgetall_to_table(raw)\n\n -- Must be runnable + not_eligible_until_time\n if core.lifecycle_phase ~= \"runnable\" then\n redis.call(\"ZREM\", K.delayed_zset, A.execution_id)\n return ok(\"not_runnable_cleaned\")\n end\n if core.eligibility_state ~= \"not_eligible_until_time\" then\n redis.call(\"ZREM\", K.delayed_zset, A.execution_id)\n return ok(\"not_delayed_cleaned\")\n end\n\n -- Check delay_until has actually passed\n local delay_until = tonumber(core.delay_until or \"0\")\n if delay_until > A.now_ms then\n return ok(\"not_yet_due\")\n end\n\n -- Promote: update 6 of 7 state vector dimensions.\n -- attempt_state is DELIBERATELY PRESERVED (not written). 
This is the 7th dim.\n --\n -- WHY: The caller that put this execution into the delayed set already set\n -- the attempt_state to reflect what should happen on next claim:\n -- * pending_retry_attempt \u{2014} from ff_fail_execution (retry backoff expired)\n -- * pending_replay_attempt \u{2014} from ff_replay_execution (replay delay expired)\n -- * attempt_interrupted \u{2014} from ff_delay_execution (worker self-delay)\n -- * pending_first_attempt \u{2014} from ff_create_execution (initial delay_until)\n -- Overwriting it here would lose this routing information and break\n -- claim_execution\'s attempt_type derivation (initial vs retry vs replay)\n -- and the claim dispatch routing (claim_execution vs claim_resumed_execution).\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"eligible_now\",\n \"blocking_reason\", \"waiting_for_worker\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\",\n -- attempt_state: NOT WRITTEN \u{2014} see comment above\n \"public_state\", \"waiting\",\n \"delay_until\", \"\",\n \"last_transition_at\", tostring(A.now_ms),\n \"last_mutation_at\", tostring(A.now_ms))\n\n -- ZREM from delayed, ZADD to eligible with composite priority score\n redis.call(\"ZREM\", K.delayed_zset, A.execution_id)\n local priority = tonumber(core.priority or \"0\")\n local created_at = tonumber(core.created_at or \"0\")\n local score = 0 - (priority * 1000000000000) + created_at\n redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n return ok(\"promoted\")\nend)\n\n---------------------------------------------------------------------------\n-- #26 ff_issue_reclaim_grant\n--\n-- TODO(batch-c): This function has NO production Rust caller as of Batch B.\n-- The reclaim scanner that would invoke it (to recover leases from crashed\n-- workers) is scheduled for cairn Batch C. 
A worker dying mid-execution today\n-- leaves its execution stuck in `lease_expired_reclaimable` until operator\n-- intervention. Test-only callers exist in crates/ff-test/tests to exercise\n-- the Lua side. When the scheduler reclaim integration lands, the caller\n-- must apply the same block-on-capability-mismatch pattern used by\n-- `ff-scheduler::Scheduler::claim_for_worker` (see the IMPORTANT note\n-- below) \u{2014} otherwise an unmatchable reclaim recycles every scanner tick.\n--\n-- Scheduler issues a reclaim grant for an expired/revoked execution.\n-- Similar to ff_issue_claim_grant but validates reclaimable state.\n--\n-- KEYS (3): exec_core, claim_grant_key, lease_expiry_zset\n-- ARGV (9): execution_id, worker_id, worker_instance_id,\n-- lane_id, capability_hash, grant_ttl_ms,\n-- route_snapshot_json, admission_summary,\n-- worker_capabilities_csv\n--\n-- Capability matching identical to ff_issue_claim_grant: reclaiming a lease\n-- must respect the execution\'s required_capabilities just like an initial\n-- claim, so a re-issuance to a non-matching worker is blocked here too.\n--\n-- IMPORTANT: on capability_mismatch this function does NOT remove the exec\n-- from the lease_expiry pool. The reclaim SCANNER (to be added in Rust) MUST\n-- detect capability_mismatch and move the execution into blocked_route with\n-- reason `waiting_for_capable_worker` (mirroring the claim-grant path). If\n-- the scanner instead re-attempts the same execution every tick, a reclaim\n-- hot-loop develops that is analogous to the claim-path hot-loop and\n-- identical in cost (wasted FCALLs + log volume). 
Lease_expiry as an index\n-- has no natural sweeping mechanism for post-mismatch promotion \u{2014} the\n-- scheduler-side block + periodic sweep owns the lifecycle.\n---------------------------------------------------------------------------\nredis.register_function(\'ff_issue_reclaim_grant\', function(keys, args)\n local K = {\n core_key = keys[1],\n claim_grant = keys[2],\n lease_expiry = keys[3],\n }\n\n local A = {\n execution_id = args[1],\n worker_id = args[2],\n worker_instance_id = args[3],\n lane_id = args[4],\n capability_hash = args[5] or \"\",\n route_snapshot_json = args[7] or \"\",\n admission_summary = args[8] or \"\",\n worker_capabilities_csv = args[9] or \"\",\n }\n\n local grant_ttl_n = require_number(args[6], \"grant_ttl_ms\")\n if type(grant_ttl_n) == \"table\" then return grant_ttl_n end\n A.grant_ttl_ms = grant_ttl_n\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- Validate execution exists and is reclaimable\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_reclaimable\")\n end\n if core.ownership_state ~= \"lease_expired_reclaimable\"\n and core.ownership_state ~= \"lease_revoked\" then\n return err(\"execution_not_reclaimable\")\n end\n\n -- Check no existing grant\n if redis.call(\"EXISTS\", K.claim_grant) == 1 then\n return err(\"grant_already_exists\")\n end\n\n -- Capability matching \u{2014} same policy as issue_claim_grant: stamp\n -- last_capability_mismatch_at (single scalar) on miss so ops can surface\n -- stuck reclaims via SCAN. Scheduler MUST also block-out the exec from\n -- the lease_expiry reclaim pool; otherwise the reclaim scanner hits the\n -- same mismatch every cycle. 
See Scheduler::reclaim_for_worker.\n local required_set, req_err = parse_capability_csv(\n core.required_capabilities or \"\", \"required\")\n if req_err then return req_err end\n local worker_set, wrk_err = parse_capability_csv(\n A.worker_capabilities_csv, \"worker\")\n if wrk_err then return wrk_err end\n if next(required_set) ~= nil then\n local missing = missing_capabilities(required_set, worker_set)\n if missing ~= \"\" then\n redis.call(\"HSET\", K.core_key,\n \"last_capability_mismatch_at\", tostring(now_ms))\n return err(\"capability_mismatch\", missing)\n end\n end\n\n -- Write grant hash with TTL\n local grant_expires_at = now_ms + A.grant_ttl_ms\n redis.call(\"HSET\", K.claim_grant,\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id,\n \"lane_id\", A.lane_id,\n \"capability_hash\", A.capability_hash,\n \"route_snapshot_json\", A.route_snapshot_json,\n \"admission_summary\", A.admission_summary,\n \"created_at\", tostring(now_ms),\n \"grant_expires_at\", tostring(grant_expires_at))\n redis.call(\"PEXPIREAT\", K.claim_grant, grant_expires_at)\n\n -- Do NOT ZREM from lease_expiry \u{2014} stays for scheduler discovery\n\n return ok(A.execution_id)\nend)\n\n\n-- source: lua/suspension.lua\n-- FlowFabric suspension and waitpoint functions\n-- Reference: RFC-004 (Suspension), RFC-005 (Signal), RFC-010 \u{a7}4\n--\n-- Depends on helpers: ok, err, ok_already_satisfied, hgetall_to_table,\n-- is_set, validate_lease_and_mark_expired, clear_lease_and_indexes,\n-- map_reason_to_blocking, initialize_condition, write_condition_hash,\n-- evaluate_signal_against_condition, is_condition_satisfied,\n-- extract_field, initial_signal_summary_json, validate_pending_waitpoint,\n-- assert_active_suspension, assert_waitpoint_belongs\n\n---------------------------------------------------------------------------\n-- #13 ff_suspend_execution\n--\n-- Validate lease, release ownership, create suspension + waitpoint\n-- (or activate pending), init condition, 
transition active \u{2192} suspended.\n-- Mints the waitpoint HMAC token (RFC-004 \u{a7}Waitpoint Security) returned\n-- alongside the waitpoint_id for external signal delivery.\n--\n-- KEYS (17): exec_core, attempt_record, lease_current, lease_history,\n-- lease_expiry_zset, worker_leases, suspension_current,\n-- waitpoint_hash, waitpoint_signals, suspension_timeout_zset,\n-- pending_wp_expiry_zset, active_index, suspended_zset,\n-- waitpoint_history, wp_condition, attempt_timeout_zset,\n-- hmac_secrets\n-- ARGV (17): execution_id, attempt_index, attempt_id, lease_id,\n-- lease_epoch, suspension_id, waitpoint_id, waitpoint_key,\n-- reason_code, requested_by, timeout_at, resume_condition_json,\n-- resume_policy_json, continuation_metadata_pointer,\n-- use_pending_waitpoint, timeout_behavior, lease_history_maxlen\n---------------------------------------------------------------------------\nredis.register_function(\'ff_suspend_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n attempt_record = keys[2],\n lease_current_key = keys[3],\n lease_history_key = keys[4],\n lease_expiry_key = keys[5],\n worker_leases_key = keys[6],\n suspension_current = keys[7],\n waitpoint_hash = keys[8],\n waitpoint_signals = keys[9],\n suspension_timeout_key = keys[10],\n pending_wp_expiry_key = keys[11],\n active_index_key = keys[12],\n suspended_zset = keys[13],\n waitpoint_history = keys[14],\n wp_condition = keys[15],\n attempt_timeout_key = keys[16],\n hmac_secrets = keys[17],\n }\n\n local A = {\n execution_id = args[1],\n attempt_index = args[2],\n attempt_id = args[3],\n lease_id = args[4],\n lease_epoch = args[5],\n suspension_id = args[6],\n waitpoint_id = args[7],\n waitpoint_key = args[8],\n reason_code = args[9],\n requested_by = args[10],\n timeout_at = args[11] or \"\",\n resume_condition_json = args[12],\n resume_policy_json = args[13],\n continuation_metadata_ptr = args[14] or \"\",\n use_pending_waitpoint = args[15] or \"\",\n timeout_behavior = 
args[16] or \"fail\",\n lease_history_maxlen = tonumber(args[17] or \"1000\"),\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Read execution core\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n -- 2. Validate lease (full check incl. expiry + revocation + identity)\n local lease_err = validate_lease_and_mark_expired(\n core, A, now_ms, K, A.lease_history_maxlen)\n if lease_err then return lease_err end\n\n -- 3. Validate attempt binding\n if tostring(core.current_attempt_index) ~= A.attempt_index then\n return err(\"invalid_lease_for_suspend\")\n end\n\n -- 4. Check for existing suspension: reject if open, archive if closed\n if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n local closed = redis.call(\"HGET\", K.suspension_current, \"closed_at\")\n if not is_set(closed) then\n return err(\"already_suspended\")\n end\n -- Previous suspension is closed. Archive old waitpoint_id for cleanup,\n -- then DEL the stale record before creating a new one.\n local old_wp = redis.call(\"HGET\", K.suspension_current, \"waitpoint_id\")\n if is_set(old_wp) then\n redis.call(\"SADD\", K.waitpoint_history, old_wp)\n end\n redis.call(\"DEL\", K.suspension_current)\n end\n\n -- 5. 
Create or activate waitpoint\n local waitpoint_id = A.waitpoint_id\n local waitpoint_key = A.waitpoint_key\n local waitpoint_token = \"\"\n\n if A.use_pending_waitpoint == \"1\" then\n -- Activate existing pending waitpoint\n local wp_raw = redis.call(\"HGETALL\", K.waitpoint_hash)\n local wp_err = validate_pending_waitpoint(wp_raw, A.execution_id, A.attempt_index, now_ms)\n if wp_err then return wp_err end\n\n -- Read waitpoint_id, waitpoint_key, and existing token from pending record.\n -- Token was minted at ff_create_pending_waitpoint time with that record\'s\n -- created_at; we MUST keep using it so signals buffered before activation\n -- validate against the same binding.\n local wp = hgetall_to_table(wp_raw)\n waitpoint_id = wp.waitpoint_id\n waitpoint_key = wp.waitpoint_key\n -- A pending waitpoint without a minted token is either a pre-HMAC-upgrade\n -- record or a corrupted write. Activating with an empty token would return\n -- \"\" to the SDK, and every subsequent signal delivery would reject with\n -- missing_token \u{2014} fail-closed at the security boundary but silent about\n -- the real degraded state. 
Surface the degradation AT the activation\n -- point so operators see it immediately.\n if not is_set(wp.waitpoint_token) then\n return err(\"waitpoint_not_token_bound\")\n end\n waitpoint_token = wp.waitpoint_token\n\n -- Activate the pending waitpoint\n redis.call(\"HSET\", K.waitpoint_hash,\n \"suspension_id\", A.suspension_id,\n \"state\", \"active\",\n \"activated_at\", tostring(now_ms),\n \"expires_at\", is_set(A.timeout_at) and A.timeout_at or \"\")\n redis.call(\"ZREM\", K.pending_wp_expiry_key, waitpoint_id)\n\n -- CRITICAL: Evaluate buffered signals that arrived while waitpoint was pending.\n -- If early signals already satisfy the resume condition, skip suspension entirely.\n local buffered = redis.call(\"XRANGE\", K.waitpoint_signals, \"-\", \"+\")\n if #buffered > 0 then\n local wp_cond = initialize_condition(A.resume_condition_json)\n for _, entry in ipairs(buffered) do\n local fields = entry[2]\n local sig_name = extract_field(fields, \"signal_name\")\n local sig_id = extract_field(fields, \"signal_id\")\n evaluate_signal_against_condition(wp_cond, sig_name, sig_id)\n end\n if is_condition_satisfied(wp_cond) then\n -- Resume condition already met by buffered signals \u{2014} skip suspension.\n redis.call(\"HSET\", K.waitpoint_hash,\n \"state\", \"closed\", \"satisfied_at\", tostring(now_ms),\n \"closed_at\", tostring(now_ms), \"close_reason\", \"resumed\")\n write_condition_hash(K.wp_condition, wp_cond, now_ms)\n -- Do NOT release lease, do NOT change execution state.\n return ok_already_satisfied(A.suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)\n end\n -- Condition not yet satisfied \u{2014} proceed with suspension.\n -- Write partial condition state (some matchers may be satisfied).\n write_condition_hash(K.wp_condition, wp_cond, now_ms)\n else\n -- No buffered signals \u{2014} init condition from scratch\n local wp_cond = initialize_condition(A.resume_condition_json)\n write_condition_hash(K.wp_condition, wp_cond, now_ms)\n end\n 
else\n -- Create new waitpoint \u{2014} mint HMAC token bound to (waitpoint_id,\n -- waitpoint_key, created_at). The created_at written here is what the\n -- signal-delivery path reads back for HMAC validation.\n local token, token_err = mint_waitpoint_token(\n K.hmac_secrets, waitpoint_id, waitpoint_key, now_ms)\n if not token then return err(token_err) end\n waitpoint_token = token\n\n redis.call(\"HSET\", K.waitpoint_hash,\n \"waitpoint_id\", waitpoint_id,\n \"execution_id\", A.execution_id,\n \"attempt_index\", A.attempt_index,\n \"suspension_id\", A.suspension_id,\n \"waitpoint_key\", waitpoint_key,\n \"waitpoint_token\", waitpoint_token,\n \"state\", \"active\",\n \"created_at\", tostring(now_ms),\n \"activated_at\", tostring(now_ms),\n \"expires_at\", is_set(A.timeout_at) and A.timeout_at or \"\",\n \"signal_count\", \"0\",\n \"matched_signal_count\", \"0\",\n \"last_signal_at\", \"\")\n\n -- Initialize condition hash from resume condition spec\n local wp_cond = initialize_condition(A.resume_condition_json)\n write_condition_hash(K.wp_condition, wp_cond, now_ms)\n end\n\n -- 6. Record waitpoint_id in mandatory history set (required for cleanup cascade)\n redis.call(\"SADD\", K.waitpoint_history, waitpoint_id)\n\n -- OOM-SAFE WRITE ORDERING (per RFC-010 \u{a7}4.8b):\n -- exec_core HSET is the \"point of no return\" \u{2014} write it FIRST.\n\n -- 7. Transition exec_core (FIRST \u{2014} point of no return, all 7 dims)\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"suspended\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", map_reason_to_blocking(A.reason_code),\n \"blocking_detail\", \"suspended: waitpoint \" .. waitpoint_id .. \" awaiting \" .. 
A.reason_code,\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"attempt_interrupted\",\n \"public_state\", \"suspended\",\n \"current_lease_id\", \"\",\n \"current_worker_id\", \"\",\n \"current_worker_instance_id\", \"\",\n \"lease_expires_at\", \"\",\n \"lease_last_renewed_at\", \"\",\n \"lease_renewal_deadline\", \"\",\n \"current_suspension_id\", A.suspension_id,\n \"current_waitpoint_id\", waitpoint_id,\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- 8. Pause the attempt: started -> suspended (RFC-002 suspend_attempt)\n redis.call(\"HSET\", K.attempt_record,\n \"attempt_state\", \"suspended\",\n \"suspended_at\", tostring(now_ms),\n \"suspension_id\", A.suspension_id)\n\n -- 9. Release lease + update indexes\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"ZREM\", K.lease_expiry_key, A.execution_id)\n redis.call(\"SREM\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZREM\", K.active_index_key, A.execution_id)\n redis.call(\"ZREM\", K.attempt_timeout_key, A.execution_id)\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", A.lease_history_maxlen, \"*\",\n \"event\", \"released\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", A.lease_epoch,\n \"attempt_index\", A.attempt_index,\n \"attempt_id\", core.current_attempt_id or \"\",\n \"reason\", \"suspend\",\n \"ts\", tostring(now_ms))\n\n -- 10. 
Create suspension record\n redis.call(\"HSET\", K.suspension_current,\n \"suspension_id\", A.suspension_id,\n \"execution_id\", A.execution_id,\n \"attempt_index\", A.attempt_index,\n \"waitpoint_id\", waitpoint_id,\n \"waitpoint_key\", waitpoint_key,\n \"reason_code\", A.reason_code,\n \"requested_by\", A.requested_by,\n \"created_at\", tostring(now_ms),\n \"timeout_at\", A.timeout_at,\n \"timeout_behavior\", A.timeout_behavior,\n \"resume_condition_json\", A.resume_condition_json,\n \"resume_policy_json\", A.resume_policy_json,\n \"continuation_metadata_pointer\", A.continuation_metadata_ptr,\n \"buffered_signal_summary_json\", initial_signal_summary_json(),\n \"last_signal_at\", \"\",\n \"satisfied_at\", \"\",\n \"closed_at\", \"\",\n \"close_reason\", \"\")\n\n -- 11. Add to per-lane suspended index + suspension timeout\n -- Score: timeout_at if set, otherwise MAX for \"no timeout\" ordering\n redis.call(\"ZADD\", K.suspended_zset,\n is_set(A.timeout_at) and tonumber(A.timeout_at) or 9999999999999,\n A.execution_id)\n\n if is_set(A.timeout_at) then\n redis.call(\"ZADD\", K.suspension_timeout_key, tonumber(A.timeout_at), A.execution_id)\n end\n\n return ok(A.suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)\nend)\n\n---------------------------------------------------------------------------\n-- #14 ff_resume_execution\n--\n-- Transition suspended \u{2192} runnable. Called after signal satisfies condition\n-- or by operator override. 
Closes suspension + waitpoint, updates indexes.\n--\n-- KEYS (8): exec_core, suspension_current, waitpoint_hash,\n-- waitpoint_signals, suspension_timeout_zset,\n-- eligible_zset, delayed_zset, suspended_zset\n-- ARGV (3): execution_id, trigger_type, resume_delay_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_resume_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n suspension_current = keys[2],\n waitpoint_hash = keys[3],\n waitpoint_signals = keys[4],\n suspension_timeout_key = keys[5],\n eligible_zset = keys[6],\n delayed_zset = keys[7],\n suspended_zset = keys[8],\n }\n\n local A = {\n execution_id = args[1],\n trigger_type = args[2] or \"signal\", -- \"signal\", \"operator\", \"auto_resume\"\n resume_delay_ms = tonumber(args[3] or \"0\"),\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Read + validate execution is suspended\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"suspended\" then\n return err(\"execution_not_suspended\")\n end\n\n -- 2. Validate active suspension\n local susp_raw = redis.call(\"HGETALL\", K.suspension_current)\n local susp_err, susp = assert_active_suspension(susp_raw)\n if susp_err then return susp_err end\n\n -- 3. Compute eligibility based on resume_delay_ms\n local eligibility_state = \"eligible_now\"\n local blocking_reason = \"waiting_for_worker\"\n local blocking_detail = \"\"\n local public_state = \"waiting\"\n\n if A.resume_delay_ms > 0 then\n eligibility_state = \"not_eligible_until_time\"\n blocking_reason = \"waiting_for_resume_delay\"\n blocking_detail = \"resume delay \" .. tostring(A.resume_delay_ms) .. \"ms\"\n public_state = \"delayed\"\n end\n\n -- OOM-SAFE WRITE ORDERING: exec_core FIRST (point of no return)\n\n -- 4. 
Transition exec_core (FIRST \u{2014} all 7 dims)\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", eligibility_state,\n \"blocking_reason\", blocking_reason,\n \"blocking_detail\", blocking_detail,\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"attempt_interrupted\",\n \"public_state\", public_state,\n \"current_suspension_id\", \"\",\n \"current_waitpoint_id\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- 5. Update scheduling indexes\n redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n if A.resume_delay_ms > 0 then\n redis.call(\"ZADD\", K.delayed_zset,\n now_ms + A.resume_delay_ms, A.execution_id)\n else\n -- ZADD eligible with composite priority score\n local priority = tonumber(core.priority or \"0\")\n local created_at = tonumber(core.created_at or \"0\")\n local score = 0 - (priority * 1000000000000) + created_at\n redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n end\n\n -- 6. Close sub-objects (safe to lose on OOM \u{2014} stale but not zombie)\n -- Close waitpoint\n redis.call(\"HSET\", K.waitpoint_hash,\n \"state\", \"closed\",\n \"satisfied_at\", tostring(now_ms),\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"resumed\")\n\n -- Close suspension\n redis.call(\"HSET\", K.suspension_current,\n \"satisfied_at\", tostring(now_ms),\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"resumed\")\n\n -- Remove from suspension timeout index\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n\n return ok(public_state)\nend)\n\n---------------------------------------------------------------------------\n-- #15 ff_create_pending_waitpoint\n--\n-- Pre-create a waitpoint before suspension commits. 
The waitpoint is\n-- externally addressable by waitpoint_key so early signals can be buffered.\n-- Requires the caller to still hold the active lease.\n-- Mints the waitpoint HMAC token up front so early signals targeting the\n-- pending waitpoint can be authenticated via ff_buffer_signal_for_pending_waitpoint.\n--\n-- KEYS (4): exec_core, waitpoint_hash, pending_wp_expiry_zset, hmac_secrets\n-- ARGV (5): execution_id, attempt_index, waitpoint_id, waitpoint_key,\n-- expires_at\n---------------------------------------------------------------------------\nredis.register_function(\'ff_create_pending_waitpoint\', function(keys, args)\n local K = {\n core_key = keys[1],\n waitpoint_hash = keys[2],\n pending_wp_expiry_key = keys[3],\n hmac_secrets = keys[4],\n }\n\n local A = {\n execution_id = args[1],\n attempt_index = args[2],\n waitpoint_id = args[3],\n waitpoint_key = args[4],\n expires_at = args[5],\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Validate execution is active with a lease\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"active\" then\n return err(\"execution_not_active\",\n core.terminal_outcome or \"\", core.current_lease_epoch or \"\")\n end\n if core.ownership_state ~= \"leased\" then\n return err(\"no_active_lease\")\n end\n -- Validate attempt binding\n if tostring(core.current_attempt_index) ~= A.attempt_index then\n return err(\"stale_lease\")\n end\n\n -- 2. Guard: waitpoint already exists\n if redis.call(\"EXISTS\", K.waitpoint_hash) == 1 then\n local existing_state = redis.call(\"HGET\", K.waitpoint_hash, \"state\")\n if existing_state == \"pending\" or existing_state == \"active\" then\n return err(\"waitpoint_already_exists\")\n end\n -- Old closed/expired waitpoint \u{2014} safe to overwrite\n end\n\n -- 3. 
Mint HMAC token bound to (waitpoint_id, waitpoint_key, now_ms).\n -- The suspension activation path will reuse this token unchanged.\n local waitpoint_token, token_err = mint_waitpoint_token(\n K.hmac_secrets, A.waitpoint_id, A.waitpoint_key, now_ms)\n if not waitpoint_token then return err(token_err) end\n\n -- 4. Create pending waitpoint\n redis.call(\"HSET\", K.waitpoint_hash,\n \"waitpoint_id\", A.waitpoint_id,\n \"execution_id\", A.execution_id,\n \"attempt_index\", A.attempt_index,\n \"suspension_id\", \"\",\n \"waitpoint_key\", A.waitpoint_key,\n \"waitpoint_token\", waitpoint_token,\n \"state\", \"pending\",\n \"created_at\", tostring(now_ms),\n \"activated_at\", \"\",\n \"satisfied_at\", \"\",\n \"closed_at\", \"\",\n \"expires_at\", A.expires_at,\n \"close_reason\", \"\",\n \"signal_count\", \"0\",\n \"matched_signal_count\", \"0\",\n \"last_signal_at\", \"\")\n\n -- 5. Add to pending waitpoint expiry index\n redis.call(\"ZADD\", K.pending_wp_expiry_key,\n tonumber(A.expires_at), A.waitpoint_id)\n\n return ok(A.waitpoint_id, A.waitpoint_key, waitpoint_token)\nend)\n\n---------------------------------------------------------------------------\n-- #16/#19 ff_expire_suspension (Overlap group D \u{2014} one script)\n--\n-- Apply timeout behavior when suspension timeout fires.\n-- Re-validates that execution is still suspended and timeout is due.\n-- Handles all 5 timeout behaviors:\n-- fail \u{2192} terminal(failed)\n-- cancel \u{2192} terminal(cancelled)\n-- expire \u{2192} terminal(expired)\n-- auto_resume \u{2192} close + resume to runnable\n-- escalate \u{2192} mutate suspension to operator-review\n--\n-- KEYS (12): exec_core, suspension_current, waitpoint_hash, wp_condition,\n-- attempt_hash, stream_meta, suspension_timeout_zset,\n-- suspended_zset, terminal_zset, eligible_zset, delayed_zset,\n-- lease_history\n-- ARGV (1): 
execution_id\n---------------------------------------------------------------------------\nredis.register_function(\'ff_expire_suspension\', function(keys, args)\n local K = {\n core_key = keys[1],\n suspension_current = keys[2],\n waitpoint_hash = keys[3],\n wp_condition = keys[4],\n attempt_hash = keys[5],\n stream_meta = keys[6],\n suspension_timeout_key = keys[7],\n suspended_zset = keys[8],\n terminal_key = keys[9],\n eligible_zset = keys[10],\n delayed_zset = keys[11],\n lease_history_key = keys[12],\n }\n\n local A = {\n execution_id = args[1],\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Read + validate execution is still suspended\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n return ok(\"not_found_cleaned\")\n end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"suspended\" then\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n return ok(\"not_suspended_cleaned\")\n end\n\n -- 2. Read suspension and validate it\'s still open\n local susp_raw = redis.call(\"HGETALL\", K.suspension_current)\n local susp_err, susp = assert_active_suspension(susp_raw)\n if susp_err then\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n return ok(\"no_active_suspension_cleaned\")\n end\n\n -- 3. Check timeout is actually due\n local timeout_at = tonumber(susp.timeout_at or \"0\")\n if timeout_at == 0 or timeout_at > now_ms then\n return ok(\"not_yet_due\")\n end\n\n -- 4. Read timeout behavior\n local behavior = susp.timeout_behavior or \"fail\"\n\n -- 5. 
Apply behavior\n if behavior == \"auto_resume\" or behavior == \"auto_resume_with_timeout_signal\" then\n -- auto_resume: close suspension + resume to runnable (like ff_resume_execution)\n\n -- OOM-SAFE: exec_core FIRST\n local priority = tonumber(core.priority or \"0\")\n local created_at = tonumber(core.created_at or \"0\")\n local score = 0 - (priority * 1000000000000) + created_at\n\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"eligible_now\",\n \"blocking_reason\", \"waiting_for_worker\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"attempt_interrupted\",\n \"public_state\", \"waiting\",\n \"current_suspension_id\", \"\",\n \"current_waitpoint_id\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Update indexes\n redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n -- Close sub-objects\n redis.call(\"HSET\", K.waitpoint_hash,\n \"state\", \"closed\",\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"timed_out_auto_resume\")\n redis.call(\"HSET\", K.wp_condition,\n \"closed\", \"1\",\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"timed_out_auto_resume\")\n redis.call(\"HSET\", K.suspension_current,\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"timed_out_auto_resume\")\n\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n\n return ok(\"auto_resume\", \"waiting\")\n\n elseif behavior == \"escalate\" then\n -- escalate: mutate suspension to operator-review, keep suspended (ALL 7 dims)\n\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"suspended\", -- preserve\n \"ownership_state\", \"unowned\", -- preserve\n \"eligibility_state\", \"not_applicable\", -- preserve\n \"blocking_reason\", \"paused_by_operator\",\n \"blocking_detail\", \"suspension escalated: timeout at 
\" .. tostring(timeout_at),\n \"terminal_outcome\", \"none\", -- preserve\n \"attempt_state\", core.attempt_state or \"attempt_interrupted\", -- preserve\n \"public_state\", \"suspended\", -- preserve\n \"last_mutation_at\", tostring(now_ms))\n\n redis.call(\"HSET\", K.suspension_current,\n \"reason_code\", \"waiting_for_operator_review\",\n \"timeout_at\", \"\",\n \"timeout_behavior\", \"\")\n\n -- Remove from timeout index (no longer has a timeout)\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n\n return ok(\"escalate\", \"suspended\")\n\n else\n -- Terminal paths: fail, cancel, expire\n\n local terminal_outcome\n local public_state_val\n local close_reason\n\n if behavior == \"cancel\" then\n terminal_outcome = \"cancelled\"\n public_state_val = \"cancelled\"\n close_reason = \"timed_out_cancel\"\n elseif behavior == \"expire\" then\n terminal_outcome = \"expired\"\n public_state_val = \"expired\"\n close_reason = \"timed_out_expire\"\n else\n -- Default: fail\n terminal_outcome = \"failed\"\n public_state_val = \"failed\"\n close_reason = \"timed_out_fail\"\n end\n\n -- OOM-SAFE: exec_core FIRST (\u{a7}4.8b Rule 2)\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"terminal\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", terminal_outcome,\n \"attempt_state\", \"attempt_terminal\",\n \"public_state\", public_state_val,\n \"failure_reason\", \"suspension_timeout:\" .. 
behavior,\n \"completed_at\", tostring(now_ms),\n \"current_suspension_id\", \"\",\n \"current_waitpoint_id\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- End attempt: suspended \u{2192} ended_failure/ended_cancelled\n if is_set(core.current_attempt_index) then\n local att_end_state = \"ended_failure\"\n if behavior == \"cancel\" then\n att_end_state = \"ended_cancelled\"\n end\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", att_end_state,\n \"ended_at\", tostring(now_ms),\n \"failure_reason\", \"suspension_timeout:\" .. behavior,\n \"suspended_at\", \"\",\n \"suspension_id\", \"\")\n end\n\n -- Close stream if exists\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"suspension_timeout\")\n end\n\n -- Close sub-objects\n redis.call(\"HSET\", K.waitpoint_hash,\n \"state\", \"closed\",\n \"closed_at\", tostring(now_ms),\n \"close_reason\", close_reason)\n redis.call(\"HSET\", K.wp_condition,\n \"closed\", \"1\",\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", close_reason)\n redis.call(\"HSET\", K.suspension_current,\n \"closed_at\", tostring(now_ms),\n \"close_reason\", close_reason)\n\n -- Remove from suspension indexes, add to terminal\n redis.call(\"ZREM\", K.suspension_timeout_key, A.execution_id)\n redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n redis.call(\"ZADD\", K.terminal_key, now_ms, A.execution_id)\n\n return ok(behavior, public_state_val)\n end\nend)\n\n---------------------------------------------------------------------------\n-- #36 ff_close_waitpoint\n--\n-- Proactive close of pending or active waitpoint. 
Used by workers that\n-- created a pending waitpoint but decided not to suspend.\n--\n-- KEYS (3): exec_core, waitpoint_hash, pending_wp_expiry_zset\n-- ARGV (2): waitpoint_id, reason\n---------------------------------------------------------------------------\nredis.register_function(\'ff_close_waitpoint\', function(keys, args)\n local K = {\n core_key = keys[1],\n waitpoint_hash = keys[2],\n pending_wp_expiry_key = keys[3],\n }\n\n local A = {\n waitpoint_id = args[1],\n reason = args[2] or \"proactive_close\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Read waitpoint\n local wp_raw = redis.call(\"HGETALL\", K.waitpoint_hash)\n if #wp_raw == 0 then\n return err(\"waitpoint_not_found\")\n end\n local wp = hgetall_to_table(wp_raw)\n\n -- 2. Validate state is pending or active\n if wp.state ~= \"pending\" and wp.state ~= \"active\" then\n if wp.state == \"closed\" or wp.state == \"expired\" then\n return ok(\"already_closed\")\n end\n return err(\"waitpoint_not_open\")\n end\n\n -- 3. Close the waitpoint\n redis.call(\"HSET\", K.waitpoint_hash,\n \"state\", \"closed\",\n \"closed_at\", tostring(now_ms),\n \"close_reason\", A.reason)\n\n -- 4. 
Remove from pending expiry index (no-op if not pending)\n redis.call(\"ZREM\", K.pending_wp_expiry_key, A.waitpoint_id)\n\n return ok()\nend)\n\n\n-- source: lua/signal.lua\n-- FlowFabric signal delivery and resume-claim functions\n-- Reference: RFC-005 (Signal), RFC-001 (Execution), RFC-010 \u{a7}4.1 (#17, #18, #2)\n--\n-- Depends on helpers: ok, err, ok_duplicate, hgetall_to_table, is_set,\n-- initialize_condition, write_condition_hash, evaluate_signal_against_condition,\n-- is_condition_satisfied, extract_field\n\n---------------------------------------------------------------------------\n-- #17 ff_deliver_signal\n--\n-- Atomic signal delivery: validate target, check idempotency, record\n-- signal, evaluate resume condition, optionally close waitpoint +\n-- suspension + transition suspended -> runnable.\n--\n-- KEYS (14): exec_core, wp_condition, wp_signals_stream,\n-- exec_signals_zset, signal_hash, signal_payload,\n-- idem_key, waitpoint_hash, suspension_current,\n-- eligible_zset, suspended_zset, delayed_zset,\n-- suspension_timeout_zset, hmac_secrets\n-- ARGV (18): signal_id, execution_id, waitpoint_id, signal_name,\n-- signal_category, source_type, source_identity,\n-- payload, payload_encoding, idempotency_key,\n-- correlation_id, target_scope, created_at,\n-- dedup_ttl_ms, resume_delay_ms, signal_maxlen,\n-- max_signals_per_execution, waitpoint_token\n---------------------------------------------------------------------------\nredis.register_function(\'ff_deliver_signal\', function(keys, args)\n local K = {\n core_key = keys[1],\n wp_condition = keys[2],\n wp_signals_stream = keys[3],\n exec_signals_zset = keys[4],\n signal_hash = keys[5],\n signal_payload = keys[6],\n idem_key = keys[7],\n waitpoint_hash = keys[8],\n suspension_current = keys[9],\n eligible_zset = keys[10],\n suspended_zset = keys[11],\n delayed_zset = keys[12],\n suspension_timeout_zset = keys[13],\n hmac_secrets = keys[14],\n }\n\n local A = {\n signal_id = args[1],\n execution_id = 
args[2],\n waitpoint_id = args[3],\n signal_name = args[4],\n signal_category = args[5],\n source_type = args[6],\n source_identity = args[7],\n payload = args[8] or \"\",\n payload_encoding = args[9] or \"json\",\n idempotency_key = args[10] or \"\",\n correlation_id = args[11] or \"\",\n target_scope = args[12] or \"waitpoint\",\n created_at = args[13] or \"\",\n dedup_ttl_ms = tonumber(args[14] or \"86400000\"),\n resume_delay_ms = tonumber(args[15] or \"0\"),\n signal_maxlen = tonumber(args[16] or \"1000\"),\n max_signals = tonumber(args[17] or \"10000\"),\n waitpoint_token = args[18] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Validate execution exists\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n return err(\"execution_not_found\")\n end\n local core = hgetall_to_table(raw)\n\n -- 2. Validate HMAC token FIRST (RFC-004 \u{a7}Waitpoint Security).\n --\n -- Order matters: lifecycle / waitpoint-state checks below would otherwise\n -- form a state oracle \u{2014} an attacker presenting ANY token (including an\n -- invalid one) for an arbitrary (execution_id, waitpoint_id) pair could\n -- distinguish \"execution is terminal\" vs \"waitpoint is pending\" vs\n -- \"waitpoint is closed\" by the specific error code returned, without\n -- having to produce a valid HMAC. 
Auth-first closes that oracle.\n --\n -- Missing-waitpoint is collapsed into `invalid_token` for the same\n -- reason: an unauthenticated caller must not be able to probe which\n -- (execution, waitpoint) tuples exist.\n local wp_for_auth_raw = redis.call(\"HGETALL\", K.waitpoint_hash)\n if #wp_for_auth_raw == 0 then\n return err(\"invalid_token\")\n end\n local wp_for_auth = hgetall_to_table(wp_for_auth_raw)\n if not wp_for_auth.created_at then\n return err(\"invalid_token\")\n end\n local token_err = validate_waitpoint_token(\n K.hmac_secrets, A.waitpoint_token,\n A.waitpoint_id, wp_for_auth.waitpoint_key or \"\",\n tonumber(wp_for_auth.created_at) or 0, now_ms)\n if token_err then\n -- Operator-visible counter (RFC-004 \u{a7}Waitpoint Security observability).\n -- Single scalar HSET on exec_core \u{2014} bounded, amortized-free. Gives\n -- operators a \"last time this execution saw an auth failure\" field to\n -- correlate with key-rotation drift, client bugs, or attack traffic\n -- without needing to tail Lua slowlog or FCALL error logs.\n redis.call(\"HSET\", K.core_key, \"last_hmac_validation_failed_at\", tostring(now_ms))\n return err(token_err)\n end\n\n -- 3. Validate execution is in a signalable state (post-auth).\n local lp = core.lifecycle_phase\n if lp == \"terminal\" then\n return err(\"target_not_signalable\")\n end\n\n if lp == \"active\" or lp == \"runnable\" or lp == \"submitted\" then\n -- Not suspended. wp_for_auth was just loaded above; reuse it.\n if wp_for_auth.state == \"pending\" then\n return err(\"waitpoint_pending_use_buffer_script\")\n end\n if wp_for_auth.state ~= \"active\" then\n return err(\"target_not_signalable\")\n end\n -- Active waitpoint on non-suspended execution \u{2014} unusual but valid (race window)\n end\n\n -- 4. 
Validate waitpoint condition is open (post-auth).\n local cond_raw = redis.call(\"HGETALL\", K.wp_condition)\n if #cond_raw == 0 then\n return err(\"waitpoint_not_found\")\n end\n local wp_cond = hgetall_to_table(cond_raw)\n if wp_cond.closed == \"1\" then\n return err(\"waitpoint_closed\")\n end\n\n -- 4. Signal count limit (prevents unbounded ZSET growth from webhook storms)\n if A.max_signals > 0 then\n local current_count = redis.call(\"ZCARD\", K.exec_signals_zset)\n if current_count >= A.max_signals then\n return err(\"signal_limit_exceeded\")\n end\n end\n\n -- 5. Idempotency check\n -- Guard: (A.dedup_ttl_ms or 0) handles nil from tonumber(\"\") safely.\n local dedup_ms = A.dedup_ttl_ms or 0\n if A.idempotency_key ~= \"\" and dedup_ms > 0 then\n local existing = redis.call(\"GET\", K.idem_key)\n if existing then\n return ok_duplicate(existing)\n end\n redis.call(\"SET\", K.idem_key, A.signal_id,\n \"PX\", dedup_ms, \"NX\")\n end\n\n -- 6. Record signal hash\n local created_at = A.created_at ~= \"\" and A.created_at or tostring(now_ms)\n redis.call(\"HSET\", K.signal_hash,\n \"signal_id\", A.signal_id,\n \"target_execution_id\", A.execution_id,\n \"target_waitpoint_id\", A.waitpoint_id,\n \"target_scope\", A.target_scope,\n \"signal_name\", A.signal_name,\n \"signal_category\", A.signal_category,\n \"source_type\", A.source_type,\n \"source_identity\", A.source_identity,\n \"correlation_id\", A.correlation_id,\n \"idempotency_key\", A.idempotency_key,\n \"created_at\", created_at,\n \"accepted_at\", tostring(now_ms),\n \"matched_waitpoint_id\", A.waitpoint_id,\n \"payload_encoding\", A.payload_encoding)\n\n -- 6b. Store payload separately if present\n if A.payload ~= \"\" then\n redis.call(\"SET\", K.signal_payload, A.payload)\n end\n\n -- 7. 
Append to per-waitpoint signal stream + per-execution signal index\n redis.call(\"XADD\", K.wp_signals_stream, \"MAXLEN\", \"~\",\n tostring(A.signal_maxlen), \"*\",\n \"signal_id\", A.signal_id,\n \"signal_name\", A.signal_name,\n \"signal_category\", A.signal_category,\n \"source_type\", A.source_type,\n \"source_identity\", A.source_identity,\n \"matched\", \"0\",\n \"accepted_at\", tostring(now_ms))\n redis.call(\"ZADD\", K.exec_signals_zset, now_ms, A.signal_id)\n\n -- 8. Evaluate resume condition\n local effect = \"appended_to_waitpoint\"\n local matched = false\n\n local total = tonumber(wp_cond.total_matchers or \"0\")\n for i = 0, total - 1 do\n local sat_key = \"matcher:\" .. i .. \":satisfied\"\n local name_key = \"matcher:\" .. i .. \":name\"\n if wp_cond[sat_key] == \"0\" then\n local matcher_name = wp_cond[name_key] or \"\"\n if matcher_name == \"\" or matcher_name == A.signal_name then\n -- Mark matcher as satisfied\n redis.call(\"HSET\", K.wp_condition,\n sat_key, \"1\",\n \"matcher:\" .. i .. \":signal_id\", A.signal_id)\n matched = true\n local new_sat = tonumber(wp_cond.satisfied_count or \"0\") + 1\n redis.call(\"HSET\", K.wp_condition, \"satisfied_count\", tostring(new_sat))\n\n -- Check if overall condition is satisfied\n local mode = wp_cond.match_mode or \"any\"\n local min_count = tonumber(wp_cond.minimum_signal_count or \"1\")\n local resume = false\n if mode == \"any\" then\n resume = (new_sat >= min_count)\n elseif mode == \"all\" then\n resume = (new_sat >= total)\n else\n -- count(n) mode\n resume = (new_sat >= min_count)\n end\n\n if resume then\n effect = \"resume_condition_satisfied\"\n\n -- OOM-SAFE WRITE ORDERING (per RFC-010 \u{a7}4.8b):\n -- exec_core HSET is the \"point of no return\" \u{2014} write it FIRST.\n -- If OOM kills after exec_core but before closing sub-objects,\n -- execution is runnable (correct) with stale suspension/waitpoint\n -- records (generalized index reconciler catches this).\n\n -- 9a. 
Transition execution: suspended -> runnable (WRITE FIRST)\n -- Resume continues the SAME attempt (no new attempt created).\n if lp == \"suspended\" then\n local es, br, bd, ps\n if A.resume_delay_ms > 0 then\n es = \"not_eligible_until_time\"\n br = \"waiting_for_resume_delay\"\n bd = \"resume delay \" .. A.resume_delay_ms .. \"ms after signal \" .. A.signal_name\n ps = \"delayed\"\n else\n es = \"eligible_now\"\n br = \"waiting_for_worker\"\n bd = \"\"\n ps = \"waiting\"\n end\n\n -- ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", es,\n \"blocking_reason\", br,\n \"blocking_detail\", bd,\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"attempt_interrupted\",\n \"public_state\", ps,\n \"current_suspension_id\", \"\",\n \"current_waitpoint_id\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- 9b. Update scheduling indexes\n local priority = tonumber(core.priority or \"0\")\n local created_at_exec = tonumber(core.created_at or \"0\")\n redis.call(\"ZREM\", K.suspended_zset, A.execution_id)\n if A.resume_delay_ms > 0 then\n redis.call(\"ZADD\", K.delayed_zset,\n now_ms + A.resume_delay_ms, A.execution_id)\n else\n redis.call(\"ZADD\", K.eligible_zset,\n 0 - (priority * 1000000000000) + created_at_exec,\n A.execution_id)\n end\n end\n\n -- 9c. Close waitpoint condition (after exec_core is safe)\n redis.call(\"HSET\", K.wp_condition,\n \"closed\", \"1\",\n \"closed_at\", tostring(now_ms),\n \"closed_reason\", \"satisfied\")\n\n -- 9d. Close waitpoint record\n redis.call(\"HSET\", K.waitpoint_hash,\n \"state\", \"closed\",\n \"satisfied_at\", tostring(now_ms),\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"resumed\")\n\n -- 9e. 
Close suspension record\n if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n redis.call(\"HSET\", K.suspension_current,\n \"satisfied_at\", tostring(now_ms),\n \"closed_at\", tostring(now_ms),\n \"close_reason\", \"resumed\")\n end\n\n -- 9f. Remove from suspension timeout index\n redis.call(\"ZREM\", K.suspension_timeout_zset, A.execution_id)\n end\n break\n end\n end\n end\n\n if not matched then\n effect = \"no_op\"\n end\n\n -- 10. Record observed effect on signal\n redis.call(\"HSET\", K.signal_hash, \"observed_effect\", effect)\n\n -- 11. Update waitpoint signal counts\n redis.call(\"HINCRBY\", K.waitpoint_hash, \"signal_count\", 1)\n if matched then\n redis.call(\"HINCRBY\", K.waitpoint_hash, \"matched_signal_count\", 1)\n end\n redis.call(\"HSET\", K.waitpoint_hash, \"last_signal_at\", tostring(now_ms))\n\n -- 12. Update suspension signal summary\n if redis.call(\"EXISTS\", K.suspension_current) == 1 then\n redis.call(\"HSET\", K.suspension_current, \"last_signal_at\", tostring(now_ms))\n end\n\n return ok(A.signal_id, effect)\nend)\n\n---------------------------------------------------------------------------\n-- #18 ff_buffer_signal_for_pending_waitpoint\n--\n-- Accept signal for a pending (not yet committed) waitpoint.\n-- Records the signal but does NOT evaluate resume conditions.\n-- When suspend_execution activates the waitpoint, buffered signals\n-- are replayed through the full evaluation path.\n--\n-- KEYS (9): exec_core, wp_condition, wp_signals_stream,\n-- exec_signals_zset, signal_hash, signal_payload,\n-- idem_key, waitpoint_hash, hmac_secrets\n-- ARGV (18): same as ff_deliver_signal (17 + waitpoint_token)\n---------------------------------------------------------------------------\nredis.register_function(\'ff_buffer_signal_for_pending_waitpoint\', function(keys, args)\n local K = {\n core_key = keys[1],\n wp_condition = keys[2],\n wp_signals_stream = keys[3],\n exec_signals_zset = keys[4],\n signal_hash = keys[5],\n signal_payload = 
keys[6],\n idem_key = keys[7],\n waitpoint_hash = keys[8],\n hmac_secrets = keys[9],\n }\n\n local A = {\n signal_id = args[1],\n execution_id = args[2],\n waitpoint_id = args[3],\n signal_name = args[4],\n signal_category = args[5],\n source_type = args[6],\n source_identity = args[7],\n payload = args[8] or \"\",\n payload_encoding = args[9] or \"json\",\n idempotency_key = args[10] or \"\",\n correlation_id = args[11] or \"\",\n target_scope = args[12] or \"waitpoint\",\n created_at = args[13] or \"\",\n dedup_ttl_ms = tonumber(args[14] or \"86400000\"),\n signal_maxlen = tonumber(args[16] or \"1000\"),\n max_signals = tonumber(args[17] or \"10000\"),\n waitpoint_token = args[18] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Validate execution exists\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then\n return err(\"execution_not_found\")\n end\n\n -- 1a. Validate HMAC token against the pending waitpoint\'s mint-time binding.\n local wp_for_auth = hgetall_to_table(redis.call(\"HGETALL\", K.waitpoint_hash))\n if not wp_for_auth.created_at then\n return err(\"waitpoint_not_found\")\n end\n local token_err = validate_waitpoint_token(\n K.hmac_secrets, A.waitpoint_token,\n A.waitpoint_id, wp_for_auth.waitpoint_key or \"\",\n tonumber(wp_for_auth.created_at) or 0, now_ms)\n if token_err then\n -- Operator-visible counter mirroring ff_deliver_signal. See comment\n -- there for rationale.\n redis.call(\"HSET\", K.core_key, \"last_hmac_validation_failed_at\", tostring(now_ms))\n return err(token_err)\n end\n\n -- 1b. Gate on waitpoint state. ff_deliver_signal blocks replay-after-close\n -- via wp_condition.closed, but wp_condition is not initialized for pending\n -- waitpoints \u{2014} we must check wp.state directly. 
Without this, a caller\n -- holding a valid token for a pending waitpoint that has since been\n -- closed/expired can keep appending buffered signals that will replay\n -- when suspend_execution(use_pending=1) later activates the waitpoint.\n if wp_for_auth.state == \"closed\" or wp_for_auth.state == \"expired\" then\n return err(\"waitpoint_closed\")\n end\n\n -- 2. Signal count limit\n if A.max_signals > 0 then\n local current_count = redis.call(\"ZCARD\", K.exec_signals_zset)\n if current_count >= A.max_signals then\n return err(\"signal_limit_exceeded\")\n end\n end\n\n -- 3. Idempotency check\n -- Guard: (A.dedup_ttl_ms or 0) handles nil from tonumber(\"\") safely.\n local dedup_ms = A.dedup_ttl_ms or 0\n if A.idempotency_key ~= \"\" and dedup_ms > 0 then\n local existing = redis.call(\"GET\", K.idem_key)\n if existing then\n return ok_duplicate(existing)\n end\n redis.call(\"SET\", K.idem_key, A.signal_id,\n \"PX\", dedup_ms, \"NX\")\n end\n\n -- 4. Record signal hash with tentative effect\n local created_at = A.created_at ~= \"\" and A.created_at or tostring(now_ms)\n redis.call(\"HSET\", K.signal_hash,\n \"signal_id\", A.signal_id,\n \"target_execution_id\", A.execution_id,\n \"target_waitpoint_id\", A.waitpoint_id,\n \"target_scope\", A.target_scope,\n \"signal_name\", A.signal_name,\n \"signal_category\", A.signal_category,\n \"source_type\", A.source_type,\n \"source_identity\", A.source_identity,\n \"correlation_id\", A.correlation_id,\n \"idempotency_key\", A.idempotency_key,\n \"created_at\", created_at,\n \"accepted_at\", tostring(now_ms),\n \"matched_waitpoint_id\", A.waitpoint_id,\n \"payload_encoding\", A.payload_encoding,\n \"observed_effect\", \"buffered_for_pending_waitpoint\")\n\n -- 4b. Store payload separately if present\n if A.payload ~= \"\" then\n redis.call(\"SET\", K.signal_payload, A.payload)\n end\n\n -- 5. 
Append to per-waitpoint signal stream + per-execution signal index\n -- These are recorded so suspend_execution can XRANGE and replay them.\n redis.call(\"XADD\", K.wp_signals_stream, \"MAXLEN\", \"~\",\n tostring(A.signal_maxlen), \"*\",\n \"signal_id\", A.signal_id,\n \"signal_name\", A.signal_name,\n \"signal_category\", A.signal_category,\n \"source_type\", A.source_type,\n \"source_identity\", A.source_identity,\n \"matched\", \"0\",\n \"accepted_at\", tostring(now_ms))\n redis.call(\"ZADD\", K.exec_signals_zset, now_ms, A.signal_id)\n\n -- No resume condition evaluation \u{2014} waitpoint is pending, not active.\n\n return ok(A.signal_id, \"buffered_for_pending_waitpoint\")\nend)\n\n---------------------------------------------------------------------------\n-- #2 ff_claim_resumed_execution\n--\n-- Consume claim-grant, resume existing attempt (interrupted -> started),\n-- create new lease bound to SAME attempt. Does NOT create a new attempt.\n--\n-- KEYS (11): exec_core, claim_grant, eligible_zset, lease_expiry_zset,\n-- worker_leases, existing_attempt_hash, lease_current,\n-- lease_history, active_index, attempt_timeout_zset,\n-- execution_deadline_zset\n-- ARGV (8): execution_id, worker_id, worker_instance_id, lane,\n-- capability_snapshot_hash, lease_id, lease_ttl_ms,\n-- remaining_attempt_timeout_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_claim_resumed_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n claim_grant_key = keys[2],\n eligible_zset = keys[3],\n lease_expiry_key = keys[4],\n worker_leases_key = keys[5],\n attempt_hash = keys[6],\n lease_current_key = keys[7],\n lease_history_key = keys[8],\n active_index_key = keys[9],\n attempt_timeout_key = keys[10],\n execution_deadline_key = keys[11],\n }\n\n local lease_ttl_n = require_number(args[7], \"lease_ttl_ms\")\n if type(lease_ttl_n) == \"table\" then return lease_ttl_n end\n\n local A = {\n execution_id = 
args[1],\n worker_id = args[2],\n worker_instance_id = args[3],\n lane = args[4],\n capability_snapshot_hash = args[5] or \"\",\n lease_id = args[6],\n lease_ttl_ms = lease_ttl_n,\n remaining_attempt_timeout_ms = args[8] or \"\",\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Validate execution exists\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n -- 2. Must be runnable\n if core.lifecycle_phase ~= \"runnable\" then\n return err(\"execution_not_leaseable\")\n end\n\n -- 3. Must be attempt_interrupted (resumed after suspension/delay)\n if core.attempt_state ~= \"attempt_interrupted\" then\n return err(\"not_a_resumed_execution\")\n end\n\n -- 4. Validate claim grant\n local grant_raw = redis.call(\"HGETALL\", K.claim_grant_key)\n if #grant_raw == 0 then\n return err(\"invalid_claim_grant\")\n end\n local grant = hgetall_to_table(grant_raw)\n\n -- Validate grant matches (grant key is execution-scoped, so only check worker_id)\n if grant.worker_id ~= A.worker_id then\n return err(\"invalid_claim_grant\")\n end\n\n -- Check grant expiry\n if is_set(grant.grant_expires_at) and tonumber(grant.grant_expires_at) < now_ms then\n redis.call(\"DEL\", K.claim_grant_key)\n return err(\"claim_grant_expired\")\n end\n\n -- Consume grant (DEL)\n redis.call(\"DEL\", K.claim_grant_key)\n\n -- 5. 
Resume existing attempt: attempt_interrupted -> started\n -- Same attempt continues \u{2014} no new attempt_index.\n local att_idx = core.current_attempt_index\n local att_id = core.current_attempt_id\n local epoch = tonumber(core.current_lease_epoch or \"0\") + 1\n local expires_at = now_ms + A.lease_ttl_ms\n local renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)\n\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"started\",\n \"resumed_at\", tostring(now_ms),\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(epoch),\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id,\n \"suspended_at\", \"\",\n \"suspension_id\", \"\")\n\n -- 6. Create new lease bound to same attempt\n redis.call(\"DEL\", K.lease_current_key)\n redis.call(\"HSET\", K.lease_current_key,\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(epoch),\n \"execution_id\", A.execution_id,\n \"attempt_id\", att_id,\n \"worker_id\", A.worker_id,\n \"worker_instance_id\", A.worker_instance_id,\n \"acquired_at\", tostring(now_ms),\n \"expires_at\", tostring(expires_at),\n \"last_renewed_at\", tostring(now_ms),\n \"renewal_deadline\", tostring(renewal_deadline))\n\n -- 7. 
Update exec_core \u{2014} ALL 7 state vector dimensions\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"active\",\n \"ownership_state\", \"leased\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"running_attempt\",\n \"public_state\", \"active\",\n \"current_lease_id\", A.lease_id,\n \"current_lease_epoch\", tostring(epoch),\n \"current_worker_id\", A.worker_id,\n \"current_worker_instance_id\", A.worker_instance_id,\n \"current_lane\", A.lane,\n \"lease_acquired_at\", tostring(now_ms),\n \"lease_expires_at\", tostring(expires_at),\n \"lease_last_renewed_at\", tostring(now_ms),\n \"lease_renewal_deadline\", tostring(renewal_deadline),\n \"lease_expired_at\", \"\",\n \"lease_revoked_at\", \"\",\n \"lease_revoke_reason\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- 8. Update indexes\n redis.call(\"ZREM\", K.eligible_zset, A.execution_id)\n redis.call(\"ZADD\", K.lease_expiry_key, expires_at, A.execution_id)\n redis.call(\"SADD\", K.worker_leases_key, A.execution_id)\n redis.call(\"ZADD\", K.active_index_key, expires_at, A.execution_id)\n\n -- 9. ZADD attempt_timeout with remaining timeout\n if is_set(A.remaining_attempt_timeout_ms) then\n local remaining = tonumber(A.remaining_attempt_timeout_ms)\n if remaining > 0 then\n redis.call(\"ZADD\", K.attempt_timeout_key,\n now_ms + remaining, A.execution_id)\n end\n end\n\n -- 10. 
Lease history event\n redis.call(\"XADD\", K.lease_history_key, \"MAXLEN\", \"~\", 1000, \"*\",\n \"event\", \"acquired\",\n \"lease_id\", A.lease_id,\n \"lease_epoch\", tostring(epoch),\n \"attempt_index\", att_idx,\n \"attempt_id\", att_id,\n \"worker_id\", A.worker_id,\n \"reason\", \"claim_resumed\",\n \"ts\", tostring(now_ms))\n\n return ok(A.lease_id, tostring(epoch), tostring(expires_at),\n att_id, att_idx, \"resumed\")\nend)\n\n\n-- source: lua/stream.lua\n-- FlowFabric stream append function\n-- Reference: RFC-006 (Stream), RFC-010 \u{a7}4.1 (#20)\n--\n-- Depends on helpers: ok, err, is_set\n\n---------------------------------------------------------------------------\n-- #20 ff_append_frame\n--\n-- Append a frame to the attempt-scoped output stream. Highest-throughput\n-- function \u{2014} called once per token during LLM streaming. Uses lite lease\n-- validation (HMGET, not HGETALL) for minimal overhead. Class B operation.\n--\n-- KEYS (3): exec_core, stream_data, stream_meta\n-- ARGV (13): execution_id, attempt_index, lease_id, lease_epoch,\n-- frame_type, ts, payload, encoding, correlation_id,\n-- source, retention_maxlen, attempt_id, max_payload_bytes\n---------------------------------------------------------------------------\nredis.register_function(\'ff_append_frame\', function(keys, args)\n local K = {\n core_key = keys[1],\n stream_key = keys[2],\n stream_meta = keys[3],\n }\n\n local A = {\n execution_id = args[1],\n attempt_index = args[2],\n lease_id = args[3],\n lease_epoch = args[4],\n frame_type = args[5],\n ts = args[6] or \"\",\n payload = args[7] or \"\",\n encoding = args[8] or \"utf8\",\n correlation_id = args[9] or \"\",\n source = args[10] or \"worker\",\n retention_maxlen = tonumber(args[11] or \"0\"),\n attempt_id = args[12] or \"\",\n max_payload_bytes = tonumber(args[13] or \"65536\"),\n }\n\n -- 1. 
Payload size guard (v1 default: 64KB)\n if #A.payload > A.max_payload_bytes then\n return err(\"retention_limit_exceeded\")\n end\n\n -- 2. Lite lease validation via HMGET (Class B \u{2014} no full HGETALL)\n local core = redis.call(\"HMGET\", K.core_key,\n \"current_attempt_index\", -- [1]\n \"current_lease_id\", -- [2]\n \"current_lease_epoch\", -- [3]\n \"lease_expires_at\", -- [4]\n \"lifecycle_phase\", -- [5]\n \"ownership_state\") -- [6]\n\n -- Execution must be active\n if core[5] ~= \"active\" then\n return err(\"stream_closed\")\n end\n\n -- Ownership must not be expired/revoked\n if core[6] == \"lease_expired_reclaimable\" or core[6] == \"lease_revoked\" then\n return err(\"stale_owner_cannot_append\")\n end\n\n -- Lease must not be expired (server time check)\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n if tonumber(core[4] or \"0\") <= now_ms then\n return err(\"stale_owner_cannot_append\")\n end\n\n -- Attempt index must match\n if tostring(core[1]) ~= A.attempt_index then\n return err(\"stale_owner_cannot_append\")\n end\n\n -- Lease identity must match\n if core[2] ~= A.lease_id or tostring(core[3]) ~= A.lease_epoch then\n return err(\"stale_owner_cannot_append\")\n end\n\n -- 3. Lazy-create stream metadata on first append\n if redis.call(\"EXISTS\", K.stream_meta) == 0 then\n redis.call(\"HSET\", K.stream_meta,\n \"stream_id\", A.execution_id .. \":\" .. A.attempt_index,\n \"execution_id\", A.execution_id,\n \"attempt_id\", A.attempt_id,\n \"attempt_index\", A.attempt_index,\n \"created_at\", tostring(now_ms),\n \"closed_at\", \"\",\n \"closed_reason\", \"\",\n \"durability_mode\", \"durable_full\",\n \"retention_maxlen\", tostring(A.retention_maxlen),\n \"last_sequence\", \"\",\n \"frame_count\", \"0\",\n \"total_bytes\", \"0\",\n \"last_frame_at\", \"\")\n end\n\n -- 4. 
Check stream not closed\n local closed = redis.call(\"HGET\", K.stream_meta, \"closed_at\")\n if is_set(closed) then\n return err(\"stream_closed\")\n end\n\n -- 5. Append frame via XADD\n local ts = A.ts ~= \"\" and A.ts or tostring(now_ms)\n local xadd_args = {\n K.stream_key, \"*\",\n \"frame_type\", A.frame_type,\n \"ts\", ts,\n \"payload\", A.payload,\n \"encoding\", A.encoding,\n \"source\", A.source,\n }\n -- Only include correlation_id if non-empty (saves memory on high-throughput paths)\n if A.correlation_id ~= \"\" then\n xadd_args[#xadd_args + 1] = \"correlation_id\"\n xadd_args[#xadd_args + 1] = A.correlation_id\n end\n\n local entry_id = redis.call(\"XADD\", unpack(xadd_args))\n\n -- 6. Update stream metadata.\n --\n -- `frame_count` is the LIFETIME append counter \u{2014} it is NOT the number\n -- of frames currently retained in the stream. XTRIM below prunes old\n -- entries without decrementing this counter, so on a 10k-cap stream\n -- that has seen 1M appends `frame_count==1_000_000` while `XLEN==10_000`.\n -- Consumers that want the retained count must `XLEN` the stream\n -- directly; `frame_count` is the right number for metering, billing,\n -- per-attempt usage attribution \u{2014} anything that needs \"how much was\n -- produced\", not \"how much is still here\".\n local frame_count = redis.call(\"HINCRBY\", K.stream_meta, \"frame_count\", 1)\n redis.call(\"HINCRBY\", K.stream_meta, \"total_bytes\", #A.payload)\n redis.call(\"HSET\", K.stream_meta,\n \"last_sequence\", entry_id,\n \"last_frame_at\", tostring(now_ms))\n\n -- 7. Apply retention trim.\n --\n -- XTRIM MAXLEN `~` (approximate) is the default: it trims at macro-node\n -- boundaries for throughput, so actual retained length floats slightly\n -- above the target. 
Under a token-per-frame burst this can briefly\n -- hold up to 2x the requested retention.\n --\n -- When the caller explicitly passes `retention_maxlen > 0` they\'ve\n -- opted into a specific bound; honor it EXACTLY with `=`. Bursty LLM\n -- workloads that care about predictable memory pay a small XTRIM-rate\n -- cost for the tighter bound. Default (A.retention_maxlen == 0) still\n -- uses `~` for the lane-level unbounded-growth guard, where throughput\n -- matters more than exact retention.\n local maxlen = A.retention_maxlen\n local trim_op\n if maxlen == 0 then\n maxlen = 10000 -- default cap prevents unbounded growth\n trim_op = \"~\"\n else\n trim_op = \"=\" -- caller-supplied bound is honored exactly\n end\n redis.call(\"XTRIM\", K.stream_key, \"MAXLEN\", trim_op, maxlen)\n\n return ok(entry_id, tostring(frame_count))\nend)\n\n---------------------------------------------------------------------------\n-- ff_read_attempt_stream\n--\n-- Read frames from an attempt-scoped output stream via XRANGE. Non-blocking\n-- (safe in Lua Functions). Cluster-safe: stream_key and stream_meta share\n-- the {p:N} hash tag.\n--\n-- Returns an empty array when the stream key does not exist (not an error \u{2014}\n-- the attempt may not have produced frames yet). Also reports\n-- (closed_at, closed_reason) from stream_meta so callers can stop polling.\n--\n-- KEYS (2): stream_data, stream_meta\n-- ARGV (3): from_id, to_id, count_limit (must be 1..=HARD_CAP; 0 rejected)\n---------------------------------------------------------------------------\nredis.register_function(\'ff_read_attempt_stream\', function(keys, args)\n local stream_key = keys[1]\n local stream_meta = keys[2]\n\n local from_id = args[1] or \"-\"\n local to_id = args[2] or \"+\"\n local count_limit = tonumber(args[3] or \"0\")\n\n -- Explicit reject on zero/negative AND on over-cap. 
The REST and SDK\n -- layers reject both at their boundary (R2/R3); the Lua check is the\n -- last line of defense for direct FCALL callers (tests, future\n -- consumers) so they get a clear error instead of a silently-clamped\n -- whole-stream read or reply-size blowup.\n --\n -- Before PR#7 this was asymmetric: < 1 rejected with invalid_input,\n -- but > HARD_CAP was silently clamped. That contradicted RFC-006\n -- \u{a7}Input validation (\"both bounds reject, neither silently clamps\").\n -- Now both edges reject symmetrically.\n --\n -- HARD_CAP mirrors ff_core::contracts::STREAM_READ_HARD_CAP \u{2014} keep in sync.\n local HARD_CAP = 10000\n if count_limit == nil or count_limit < 1 then\n return err(\"invalid_input\", \"count_limit must be >= 1\")\n end\n if count_limit > HARD_CAP then\n return err(\"invalid_input\", \"count_limit_exceeds_hard_cap\")\n end\n\n -- Stream may legitimately not exist (never-written attempt). XRANGE on a\n -- missing key returns empty, so no pre-check is needed.\n local entries = redis.call(\"XRANGE\", stream_key, from_id, to_id,\n \"COUNT\", count_limit)\n\n -- Fetch terminal markers from stream_meta. A never-written attempt has\n -- no stream_meta hash; HMGET returns nils for both fields which we\n -- normalize to empty strings on the return path so Rust can decode a\n -- consistent shape.\n local meta = redis.call(\"HMGET\", stream_meta, \"closed_at\", \"closed_reason\")\n local closed_at = meta[1] or \"\"\n local closed_reason = meta[2] or \"\"\n\n -- entries is an array of [entry_id, [f1, v1, f2, v2, ...]].\n -- Return shape: ok(entries, closed_at, closed_reason). 
Rust parses the\n -- three fields positionally.\n return ok(entries, closed_at, closed_reason)\nend)\n\n\n-- source: lua/budget.lua\n-- FlowFabric budget functions\n-- Reference: RFC-008 (Budget), RFC-010 \u{a7}4.3 (#30, #31), \u{a7}4.1 (#29a, #29b)\n--\n-- Depends on helpers: ok, err, is_set, hgetall_to_table\n\n---------------------------------------------------------------------------\n-- ff_create_budget (on {b:M})\n--\n-- Create a new budget policy with hard/soft limits on N dimensions.\n-- Idempotent: if EXISTS budget_def \u{2192} return ok_already_satisfied.\n--\n-- KEYS (5): budget_def, budget_limits, budget_usage, budget_resets_zset,\n-- budget_policies_index\n-- ARGV (variable): budget_id, scope_type, scope_id, enforcement_mode,\n-- on_hard_limit, on_soft_limit, reset_interval_ms, now_ms,\n-- dimension_count, dim_1..dim_N, hard_1..hard_N, soft_1..soft_N\n---------------------------------------------------------------------------\nredis.register_function(\'ff_create_budget\', function(keys, args)\n local K = {\n def_key = keys[1],\n limits_key = keys[2],\n usage_key = keys[3],\n resets_zset = keys[4],\n policies_index = keys[5],\n }\n\n local A = {\n budget_id = args[1],\n scope_type = args[2],\n scope_id = args[3],\n enforcement_mode = args[4],\n on_hard_limit = args[5],\n on_soft_limit = args[6],\n reset_interval_ms = args[7],\n now_ms = args[8],\n }\n\n -- Maintain budget_policies_index BEFORE the idempotency guard. 
SADD is\n -- itself idempotent (no-op on existing members), and hoisting it heals\n -- any pre-existing budget_def that was created before this index was\n -- introduced \u{2014} no migration script required.\n redis.call(\"SADD\", K.policies_index, A.budget_id)\n\n -- Idempotency: already exists \u{2192} return immediately\n if redis.call(\"EXISTS\", K.def_key) == 1 then\n return ok_already_satisfied(A.budget_id)\n end\n\n local dim_count = require_number(args[9], \"dim_count\")\n if type(dim_count) == \"table\" then return dim_count end\n\n -- HSET budget definition\n redis.call(\"HSET\", K.def_key,\n \"budget_id\", A.budget_id,\n \"scope_type\", A.scope_type,\n \"scope_id\", A.scope_id,\n \"enforcement_mode\", A.enforcement_mode,\n \"on_hard_limit\", A.on_hard_limit,\n \"on_soft_limit\", A.on_soft_limit,\n \"reset_interval_ms\", A.reset_interval_ms,\n \"breach_count\", \"0\",\n \"soft_breach_count\", \"0\",\n \"created_at\", A.now_ms,\n \"last_updated_at\", A.now_ms)\n\n -- HSET per-dimension hard and soft limits\n for i = 1, dim_count do\n local dim = args[9 + i]\n local hard = args[9 + dim_count + i]\n local soft = args[9 + 2 * dim_count + i]\n redis.call(\"HSET\", K.limits_key, \"hard:\" .. dim, hard, \"soft:\" .. dim, soft)\n end\n\n -- budget_usage left empty \u{2014} first report_usage will create fields\n\n -- Schedule periodic reset if reset_interval_ms > 0\n local interval_ms = tonumber(A.reset_interval_ms)\n if interval_ms > 0 then\n local next_reset_at = tostring(tonumber(A.now_ms) + interval_ms)\n redis.call(\"HSET\", K.def_key, \"next_reset_at\", next_reset_at)\n redis.call(\"ZADD\", K.resets_zset, tonumber(next_reset_at), A.budget_id)\n end\n\n return ok(A.budget_id)\nend)\n\n---------------------------------------------------------------------------\n-- #30 ff_report_usage_and_check (on {b:M})\n--\n-- Check-before-increment: read current usage, check hard limits. If any\n-- dimension would breach, return HARD_BREACH without incrementing. 
If safe,\n-- HINCRBY all dimensions, then check soft limits.\n--\n-- Atomic Lua serialization on {b:M} guarantees zero overshoot.\n--\n-- KEYS (3): budget_usage, budget_limits, budget_def\n-- ARGV (variable): dimension_count, dim_1..dim_N, delta_1..delta_N, now_ms, [dedup_key]\n---------------------------------------------------------------------------\nredis.register_function(\'ff_report_usage_and_check\', function(keys, args)\n local K = {\n usage_key = keys[1],\n limits_key = keys[2],\n def_key = keys[3],\n }\n\n local dim_count = require_number(args[1], \"dim_count\")\n if type(dim_count) == \"table\" then return dim_count end\n local now_ms = args[2 * dim_count + 2]\n local dedup_key = args[2 * dim_count + 3] or \"\"\n\n -- Idempotency: if dedup_key provided, check for prior application\n if dedup_key ~= \"\" then\n local existing = redis.call(\"GET\", dedup_key)\n if existing then\n return {1, \"ALREADY_APPLIED\"}\n end\n end\n\n -- Phase 1: CHECK all dimensions BEFORE any increment.\n -- If any hard limit would be breached, reject the entire report.\n for i = 1, dim_count do\n local dim = args[1 + i]\n local delta = tonumber(args[1 + dim_count + i])\n local current = tonumber(redis.call(\"HGET\", K.usage_key, dim) or \"0\")\n local new_total = current + delta\n\n local hard_limit = redis.call(\"HGET\", K.limits_key, \"hard:\" .. 
dim)\n if hard_limit and hard_limit ~= \"\" and hard_limit ~= false then\n local limit_val = tonumber(hard_limit)\n if limit_val > 0 and new_total > limit_val then\n -- Record breach metadata but DO NOT increment\n redis.call(\"HINCRBY\", K.def_key, \"breach_count\", 1)\n redis.call(\"HSET\", K.def_key,\n \"last_breach_at\", now_ms,\n \"last_breach_dim\", dim,\n \"last_updated_at\", now_ms)\n return {1, \"HARD_BREACH\", dim, tostring(current), tostring(hard_limit)}\n end\n end\n end\n\n -- Phase 2: No hard breach detected \u{2014} safe to increment all dimensions.\n local breached_soft = nil\n for i = 1, dim_count do\n local dim = args[1 + i]\n local delta = tonumber(args[1 + dim_count + i])\n local new_val = redis.call(\"HINCRBY\", K.usage_key, dim, delta)\n\n -- Check soft limit (advisory \u{2014} increment still happens)\n local soft_limit = redis.call(\"HGET\", K.limits_key, \"soft:\" .. dim)\n if soft_limit and soft_limit ~= \"\" and soft_limit ~= false then\n local limit_val = tonumber(soft_limit)\n if limit_val > 0 and new_val > limit_val then\n if not breached_soft then\n breached_soft = dim\n end\n end\n end\n end\n\n -- Update metadata\n redis.call(\"HSET\", K.def_key, \"last_updated_at\", now_ms)\n\n -- Mark dedup key after successful increment (24h TTL)\n if dedup_key ~= \"\" then\n redis.call(\"SET\", dedup_key, \"1\", \"PX\", 86400000)\n end\n\n if breached_soft then\n redis.call(\"HINCRBY\", K.def_key, \"soft_breach_count\", 1)\n local soft_val = tonumber(redis.call(\"HGET\", K.limits_key, \"soft:\" .. breached_soft) or \"0\")\n local cur_val = tonumber(redis.call(\"HGET\", K.usage_key, breached_soft) or \"0\")\n return {1, \"SOFT_BREACH\", breached_soft, tostring(cur_val), tostring(soft_val)}\n end\n\n return {1, \"OK\"}\nend)\n\n---------------------------------------------------------------------------\n-- #31 ff_reset_budget (on {b:M})\n--\n-- Scanner-called periodic reset. 
-- Zero all usage fields,
-- compute next_reset_at, re-score in reset index.
--
-- KEYS (3): budget_def, budget_usage, budget_resets_zset
-- ARGV (2): budget_id, now_ms
---------------------------------------------------------------------------
redis.register_function('ff_reset_budget', function(keys, args)
  local K = {
    def_key = keys[1],
    usage_key = keys[2],
    resets_zset = keys[3],
  }
  local budget_id, now_ms = args[1], args[2]

  -- 1. Zero every recorded usage dimension with one batched HSET.
  local usage_fields = redis.call("HKEYS", K.usage_key)
  if #usage_fields > 0 then
    local zeroes = {}
    for _, field in ipairs(usage_fields) do
      zeroes[#zeroes + 1] = field
      zeroes[#zeroes + 1] = "0"
    end
    redis.call("HSET", K.usage_key, unpack(zeroes))
  end

  -- 2. Record the reset on budget_def and clear breach markers.
  redis.call("HINCRBY", K.def_key, "reset_count", 1)
  redis.call("HSET", K.def_key,
    "last_reset_at", now_ms,
    "last_updated_at", now_ms,
    "last_breach_at", "",
    "last_breach_dim", "")

  -- 3. Compute next_reset_at from reset_interval_ms and re-score the
  --    reset index, or unschedule when no recurring interval is set.
  local interval_ms = tonumber(redis.call("HGET", K.def_key, "reset_interval_ms") or "0")
  local next_reset_at = "0"
  if interval_ms > 0 then
    next_reset_at = tostring(tonumber(now_ms) + interval_ms)
    redis.call("HSET", K.def_key, "next_reset_at", next_reset_at)
    redis.call("ZADD", K.resets_zset, tonumber(next_reset_at), budget_id)
  else
    -- No recurring reset — remove from schedule.
    redis.call("ZREM", K.resets_zset, budget_id)
  end

  return ok(next_reset_at)
end)

---------------------------------------------------------------------------
-- #29b ff_unblock_execution (on {p:N})
--
-- Re-evaluate blocked execution, set eligible_now, ZREM blocked set,
-- ZADD eligible.
-- All 7 dims.
--
-- KEYS (3): exec_core, blocked_zset, eligible_zset
-- ARGV (3): execution_id, now_ms, expected_blocking_reason
---------------------------------------------------------------------------
redis.register_function('ff_unblock_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    blocked_zset = keys[2],
    eligible_zset = keys[3],
  }
  local A = {
    execution_id = args[1],
    now_ms = args[2],
    expected_blocking_reason = args[3] or "",
  }

  -- 1. Read execution core.
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- 2. Must be runnable.
  if core.lifecycle_phase ~= "runnable" then
    return err("execution_not_eligible")
  end

  -- 3. Must currently sit in one of the admission-blocked states.
  local UNBLOCKABLE_STATES = {
    blocked_by_budget = true,
    blocked_by_quota = true,
    blocked_by_route = true,
    blocked_by_operator = true,
  }
  if not UNBLOCKABLE_STATES[core.eligibility_state] then
    return err("execution_not_eligible")
  end

  -- 4. Validate expected blocking reason (prevents a stale unblock racing
  --    a newer block with a different cause).
  if A.expected_blocking_reason ~= ""
      and core.blocking_reason ~= A.expected_blocking_reason then
    return err("execution_not_eligible")
  end

  -- 5. Transition: all 7 dims. Score = priority-major (negated so higher
  --    priority sorts first), created_at as tiebreaker.
  local priority = tonumber(core.priority or "0")
  local created_at = tonumber(core.created_at or "0")
  local score = 0 - (priority * 1000000000000) + created_at

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", "eligible_now",
    "blocking_reason", "waiting_for_worker",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "attempt_state", core.attempt_state or "pending_first_attempt",
    "public_state", "waiting",
    "last_transition_at", A.now_ms,
    "last_mutation_at", A.now_ms)

  -- 6. Move from blocked to eligible.
  redis.call("ZREM", K.blocked_zset, A.execution_id)
  redis.call("ZADD", K.eligible_zset, score, A.execution_id)

  return ok("unblocked")
end)

---------------------------------------------------------------------------
-- #29a ff_block_execution_for_admission (on {p:N})
--
-- Parameterized block: set eligibility/blocking for budget/quota/route/lane
-- denial, ZREM eligible, ZADD target blocked set. All 7 dims.
--
-- KEYS (3): exec_core, eligible_zset, target_blocked_zset
-- ARGV (4): execution_id, blocking_reason, blocking_detail, now_ms
---------------------------------------------------------------------------
redis.register_function('ff_block_execution_for_admission', function(keys, args)
  local K = {
    core_key = keys[1],
    eligible_zset = keys[2],
    blocked_zset = keys[3],
  }
  local A = {
    execution_id = args[1],
    blocking_reason = args[2],
    blocking_detail = args[3] or "",
    now_ms = args[4],
  }

  -- 1. Read execution core.
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- 2. Must be runnable.
  if core.lifecycle_phase ~= "runnable" then
    if core.lifecycle_phase == "terminal" then
      return err("execution_not_active", core.terminal_outcome or "", core.current_lease_epoch or "")
    end
    return err("execution_not_eligible")
  end

  -- 3. Map blocking_reason to eligibility_state.
  local REASON_TO_ELIGIBILITY = {
    waiting_for_budget = "blocked_by_budget",
    waiting_for_quota = "blocked_by_quota",
    waiting_for_capable_worker = "blocked_by_route",
    paused_by_operator = "blocked_by_operator",
    paused_by_policy = "blocked_by_lane_state",
  }
  local eligibility = REASON_TO_ELIGIBILITY[A.blocking_reason]
  if not eligibility then
    return err("invalid_blocking_reason")
  end

  -- 4.
-- Transition: all 7 dims.
  redis.call("HSET", K.core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", eligibility,
    "blocking_reason", A.blocking_reason,
    "blocking_detail", A.blocking_detail,
    "terminal_outcome", "none",
    "attempt_state", core.attempt_state or "pending_first_attempt",
    "public_state", "rate_limited",
    "last_transition_at", A.now_ms,
    "last_mutation_at", A.now_ms)

  -- 5. Move from eligible to blocked.
  redis.call("ZREM", K.eligible_zset, A.execution_id)
  redis.call("ZADD", K.blocked_zset, tonumber(A.now_ms), A.execution_id)

  return ok("blocked")
end)


-- source: lua/quota.lua
-- FlowFabric quota and rate-limit functions
-- Reference: RFC-008 (Quota), RFC-010 §4.4 (#32)
--
-- Depends on helpers: ok, err, is_set

---------------------------------------------------------------------------
-- ff_create_quota_policy (on {q:K})
--
-- Create a new quota/rate-limit policy.
-- Idempotent: if EXISTS quota_def → return ok_already_satisfied.
--
-- KEYS (5): quota_def, quota_window_zset, quota_concurrency_counter,
--           admitted_set, quota_policies_index
-- ARGV (5): quota_policy_id, window_seconds, max_requests_per_window,
--           max_concurrent, now_ms
---------------------------------------------------------------------------
redis.register_function('ff_create_quota_policy', function(keys, args)
  local K = {
    def_key = keys[1],
    window_zset = keys[2],
    concurrency_key = keys[3],
    admitted_set = keys[4],
    policies_index = keys[5],
  }
  local A = {
    quota_policy_id = args[1],
    window_seconds = args[2],
    max_requests_per_window = args[3],
    max_concurrent = args[4],
    now_ms = args[5],
  }

  -- Idempotency: already exists → return immediately.
  if redis.call("EXISTS", K.def_key) == 1 then
    return ok_already_satisfied(A.quota_policy_id)
  end

  -- Persist the quota definition.
  redis.call("HSET", K.def_key,
    "quota_policy_id", A.quota_policy_id,
    "requests_per_window_seconds", A.window_seconds,
    "max_requests_per_window", A.max_requests_per_window,
    "active_concurrency_cap", A.max_concurrent,
    "created_at", A.now_ms)

  -- Init concurrency counter to 0.
  redis.call("SET", K.concurrency_key, "0")

  -- Register in the partition-level policy index (cluster-safe discovery).
  redis.call("SADD", K.policies_index, A.quota_policy_id)

  -- admitted_set + quota_window_zset left empty (populated on admission).

  return ok(A.quota_policy_id)
end)

---------------------------------------------------------------------------
-- #32 ff_check_admission_and_record (on {q:K})
--
-- Idempotent sliding-window rate check + concurrency check.
-- If admitted: ZADD window, SET NX guard, optional INCR concurrency,
-- SADD to admitted_set (for cluster-safe reconciler discovery).
--
-- KEYS (5): window_zset, concurrency_counter, quota_def, admitted_guard_key,
--           admitted_set
-- ARGV (6): now_ms, window_seconds, rate_limit, concurrency_cap,
--           execution_id, jitter_ms
---------------------------------------------------------------------------
redis.register_function('ff_check_admission_and_record', function(keys, args)
  local K = {
    window_zset = keys[1],
    concurrency_key = keys[2],
    quota_def = keys[3],
    admitted_guard_key = keys[4],
    admitted_set = keys[5],
  }

  -- Numeric ARGV validation; require_number returns an error table on
  -- malformed input, which we propagate verbatim.
  local now_ms_n = require_number(args[1], "now_ms")
  if type(now_ms_n) == "table" then return now_ms_n end
  local window_seconds_n = require_number(args[2], "window_seconds")
  if type(window_seconds_n) == "table" then return window_seconds_n end
  local rate_limit_n = require_number(args[3], "rate_limit")
  if type(rate_limit_n) == "table" then return rate_limit_n end
  local concurrency_cap_n = require_number(args[4], "concurrency_cap")
  if type(concurrency_cap_n) == "table" then return concurrency_cap_n end

  local A = {
    now_ms = now_ms_n,
    window_seconds = window_seconds_n,
    rate_limit = rate_limit_n,
    concurrency_cap = concurrency_cap_n,
    execution_id = args[5],
    jitter_ms = tonumber(args[6] or "0"),
  }

  local window_ms = A.window_seconds * 1000

  -- 1. Idempotency guard: already admitted in this window?
  if redis.call("EXISTS", K.admitted_guard_key) == 1 then
    return { "ALREADY_ADMITTED" }
  end

  -- 2. Sliding window: drop entries older than one window.
  redis.call("ZREMRANGEBYSCORE", K.window_zset, "-inf", A.now_ms - window_ms)

  -- 3. Rate-limit check.
  if A.rate_limit > 0 then
    local current_count = redis.call("ZCARD", K.window_zset)
    if current_count >= A.rate_limit then
      -- Compute retry_after from the oldest surviving entry.
      local oldest = redis.call("ZRANGE", K.window_zset, 0, 0, "WITHSCORES")
      local retry_after_ms = 0
      if #oldest >= 2 then
        retry_after_ms = tonumber(oldest[2]) + window_ms - A.now_ms
        if retry_after_ms < 0 then retry_after_ms = 0 end
      end
      local jitter = 0
      if A.jitter_ms > 0 then
        jitter = math.random(0, A.jitter_ms)
      end
      return { "RATE_EXCEEDED", tostring(retry_after_ms + jitter) }
    end
  end

  -- 4. Concurrency-cap check.
  if A.concurrency_cap > 0 then
    local active = tonumber(redis.call("GET", K.concurrency_key) or "0")
    if active >= A.concurrency_cap then
      return { "CONCURRENCY_EXCEEDED" }
    end
  end

  -- 5. Admit: record in sliding window (execution_id as member — idempotent).
  redis.call("ZADD", K.window_zset, A.now_ms, A.execution_id)

  -- 6. Set admitted guard key with TTL = window size.
  --    Guard: PX 0 or PX <0 causes a Valkey error inside Lua (after the
  --    ZADD above already committed).
  if window_ms > 0 then
    redis.call("SET", K.admitted_guard_key, "1", "PX", window_ms, "NX")
  end

  -- 7. Increment concurrency counter if a cap is set.
  if A.concurrency_cap > 0 then
    redis.call("INCR", K.concurrency_key)
  end

  -- 8. Track in admitted set (cluster-safe reconciler — replaces SCAN).
  redis.call("SADD", K.admitted_set, A.execution_id)

  return { "ADMITTED" }
end)

---------------------------------------------------------------------------
-- ff_release_admission (on {q:K})
--
-- Release a previously-recorded admission slot. Called when a claim grant
-- fails after admission was recorded, preventing leaked concurrency slots.
--
-- KEYS (3): admitted_guard_key, admitted_set, concurrency_counter
-- ARGV (1): execution_id
---------------------------------------------------------------------------
redis.register_function('ff_release_admission', function(keys, args)
  local K = {
    admitted_guard_key = keys[1],
    admitted_set = keys[2],
    concurrency_key = keys[3],
  }

  local execution_id = args[1]

  -- 1. Delete the guard key (idempotent — DEL returns 0 if absent).
  redis.call("DEL", K.admitted_guard_key)

  -- 2. Remove from admitted set.
  redis.call("SREM", K.admitted_set, execution_id)

  -- 3.
-- Decrement concurrency counter (floor at 0).
  local current = tonumber(redis.call("GET", K.concurrency_key) or "0")
  if current > 0 then
    redis.call("DECR", K.concurrency_key)
  end

  return {1, "OK", "released"}
end)


-- source: lua/flow.lua
-- FlowFabric flow coordination and dependency functions
-- Reference: RFC-007 (Flow), RFC-010 §4.1 (#22-24, #35), §4.2 (#29)
--
-- Depends on helpers: ok, err, is_set, hgetall_to_table

---------------------------------------------------------------------------
-- Cycle detection helper
---------------------------------------------------------------------------

-- Max nodes to visit during cycle detection BFS.
local MAX_CYCLE_CHECK_NODES = 1000

-- Detect if adding an edge upstream→downstream would create a cycle.
-- BFS from downstream through existing outgoing edges: if upstream is
-- reachable, the new edge closes a loop (A→B→C→A deadlock).
-- All keys share the same {fp:N} slot (flow-partition co-location).
-- @param flow_prefix e.g. "ff:flow:{fp:0}:<flow_id>"
-- @param start_eid   downstream of proposed edge (BFS start)
-- @param target_eid  upstream of proposed edge (looking for this)
-- @return true if a cycle would be created
local function detect_cycle(flow_prefix, start_eid, target_eid)
  local visited = {}
  local frontier = {start_eid}
  local visited_count = 0

  while #frontier > 0 do
    local next_frontier = {}
    for _, eid in ipairs(frontier) do
      if eid == target_eid then
        return true
      end
      if not visited[eid] then
        visited[eid] = true
        visited_count = visited_count + 1
        if visited_count > MAX_CYCLE_CHECK_NODES then
          return true -- graph too large to verify; reject conservatively
        end
        local out_key = flow_prefix .. ":out:" .. eid
        for _, edge_id in ipairs(redis.call("SMEMBERS", out_key)) do
          local edge_key = flow_prefix .. ":edge:" .. edge_id
          local next_eid = redis.call("HGET", edge_key, "downstream_execution_id")
          if next_eid and next_eid ~= "" and not visited[next_eid] then
            next_frontier[#next_frontier + 1] = next_eid
          end
        end
      end
    end
    frontier = next_frontier
  end

  return false
end

---------------------------------------------------------------------------
-- ff_create_flow (on {fp:N})
--
-- Create a new flow container. Idempotent: if flow_core already exists,
-- returns ok_already_satisfied.
--
-- KEYS (3): flow_core, members_set, flow_index
-- ARGV (4): flow_id, flow_kind, namespace, now_ms (IGNORED — see note below)
--
-- NOTE: ARGV[4] (`now_ms`) is accepted for caller compatibility but NOT
-- used for stored timestamps. We read server time via redis.call("TIME")
-- so created_at / last_mutation_at agree with fields written by other
-- Lua functions (ff_complete_execution etc.) under client clock skew.
---------------------------------------------------------------------------
redis.register_function('ff_create_flow', function(keys, args)
  local K = {
    flow_core = keys[1],
    members_set = keys[2],
    flow_index = keys[3],
  }
  local A = {
    flow_id = args[1],
    flow_kind = args[2],
    namespace = args[3],
    -- args[4] is client-provided now_ms; intentionally ignored.
  }

  -- Server time (not client-provided) so created_at / last_mutation_at
  -- agree with timestamps written by ff_complete_execution and peers.
  local now_ms = server_time_ms()

  -- Maintain flow_index BEFORE the idempotency guard.
-- SADD is itself
  -- idempotent (no-op on existing members), and hoisting it heals any
  -- pre-existing flow_core that was created before this index was
  -- introduced — no migration script required.
  redis.call("SADD", K.flow_index, A.flow_id)

  -- Idempotency: if the flow already exists, report already_satisfied.
  if redis.call("EXISTS", K.flow_core) == 1 then
    return ok_already_satisfied(A.flow_id)
  end

  -- Create the flow core record.
  redis.call("HSET", K.flow_core,
    "flow_id", A.flow_id,
    "flow_kind", A.flow_kind,
    "namespace", A.namespace,
    "graph_revision", 0,
    "node_count", 0,
    "edge_count", 0,
    "public_flow_state", "open",
    "created_at", now_ms,
    "last_mutation_at", now_ms)

  return ok(A.flow_id)
end)

---------------------------------------------------------------------------
-- ff_add_execution_to_flow (on {fp:N} — single atomic FCALL)
--
-- Add a member execution to a flow AND stamp the flow_id back-pointer
-- on exec_core in one atomic commit. Per RFC-011 §7.3, exec keys
-- co-locate with their parent flow's partition under hash-tag routing,
-- so exec_core shares the `{fp:N}` hash-tag with flow_core / members_set
-- / flow_index. All four KEYS hash to the same slot; no CROSSSLOT.
--
-- KEYS (4): flow_core, members_set, flow_index, exec_core
-- ARGV (3): flow_id, execution_id, now_ms (IGNORED — server time used)
--
-- Validates-before-writing: flow_not_found / flow_already_terminal
-- early-returns fire BEFORE any write (step 1 below). On those error
-- paths, zero state mutates — atomicity by construction at the Lua
-- level (Valkey scripting contract: no redis.call() before error_reply
-- means nothing to roll back). See RFC-011 §7.3.1 tests for the
-- structural pin.
--
-- Invariant (post-RFC-011): a successful call commits BOTH the flow-
-- index updates AND the exec_core.flow_id stamp in one atomic unit.
-- Readers can assume exec_core.flow_id == flow_id iff the exec is in
-- members_set. The pre-RFC-011 two-phase contract + §5.5 orphan-window
-- + issue #21 reconciliation-scanner plan are all superseded.
---------------------------------------------------------------------------
redis.register_function('ff_add_execution_to_flow', function(keys, args)
  local K = {
    flow_core = keys[1],
    members_set = keys[2],
    flow_index = keys[3],
    exec_core = keys[4],
  }
  local A = {
    flow_id = args[1],
    execution_id = args[2],
    -- args[3] is client-provided now_ms; intentionally ignored in favour
    -- of redis.call("TIME") to keep last_mutation_at consistent with
    -- timestamps stamped by ff_complete_execution and peers.
  }

  local now_ms = server_time_ms()

  -- 1. Validate flow exists and is not terminal, and execution exists.
  --    Validates-before-writing: no writes happen before these guards,
  --    so the error paths commit zero state (symmetric with step 2's
  --    cross-flow guard on AlreadyMember).
  local raw = redis.call("HGETALL", K.flow_core)
  if #raw == 0 then return err("flow_not_found") end
  local flow = hgetall_to_table(raw)
  local pfs = flow.public_flow_state or ""
  if pfs == "cancelled" or pfs == "completed" or pfs == "failed" then
    return err("flow_already_terminal")
  end
  -- Execution must exist — otherwise step 5's HSET exec_core would
  -- silently create a hash for a non-existent exec, leading to an
  -- inconsistent members_set ↔ exec_core state. Symmetric with the
  -- flow_not_found guard above.
  if redis.call("EXISTS", K.exec_core) == 0 then
    return err("execution_not_found")
  end

  -- Self-heal flow_index for LIVE flows only. The projector may have
  -- SREMd this flow after observing an all-terminal sample, yet the
  -- flow is still "open" per flow_core and can accept new members.
  -- Re-add idempotently so the next projector cycle picks the flow
  -- back up. Runs only after the terminal-state guard above so we do
  -- not resurrect cancelled/completed/failed flows into the active
  -- index. Same {fp:N} slot as the other KEYS, so atomic with the
  -- membership mutation below.
  redis.call("SADD", K.flow_index, A.flow_id)

  -- 2. Idempotency: already a member of THIS flow's members_set.
  --    Still stamp exec_core.flow_id defensively — an earlier call
  --    may have committed members_set but crashed before exec_core
  --    HSET under the legacy two-phase shape. Stamping here is a
  --    no-op if flow_id already matches; heals any pre-RFC-011
  --    orphans encountered on a rolling upgrade.
  --
  --    Cross-flow guard on the orphan case: if exec_core.flow_id is
  --    already set to a DIFFERENT flow (corrupted-state orphan that
  --    IS in this flow's members_set but stamped wrong), refuse
  --    instead of silently re-stamping. Symmetric with step 3's
  --    guard on the not-yet-a-member branch — catches the same
  --    invariant violation earlier in the path. Empty existing
  --    flow_id goes through the heal path normally.
  if redis.call("SISMEMBER", K.members_set, A.execution_id) == 1 then
    local existing = redis.call("HGET", K.exec_core, "flow_id")
    if existing and existing ~= "" and existing ~= A.flow_id then
      return err("already_member_of_different_flow:" .. existing)
    end
    redis.call("HSET", K.exec_core, "flow_id", A.flow_id)
    local nc = redis.call("HGET", K.flow_core, "node_count") or "0"
    return ok_already_satisfied(A.execution_id, nc)
  end

  -- 3. Cross-flow guard: if exec_core.flow_id is already set to a
  --    DIFFERENT flow, refuse — silently re-stamping would orphan
  --    the other flow's accounting. An exec belongs to at most one
  --    flow at a time per RFC-007.
  local existing_flow_id = redis.call("HGET", K.exec_core, "flow_id")
  if existing_flow_id and existing_flow_id ~= "" and existing_flow_id ~= A.flow_id then
    return err("already_member_of_different_flow:" .. existing_flow_id)
  end

  -- 4. Add to membership set.
  redis.call("SADD", K.members_set, A.execution_id)

  -- 5. Stamp the flow_id back-pointer on exec_core. Co-located with
  --    the flow's partition under RFC-011 §7.3 hash-tag routing; this
  --    HSET is part of the same atomic FCALL as the SADD above.
  redis.call("HSET", K.exec_core, "flow_id", A.flow_id)

  -- 6. Increment node_count and graph_revision, touch mutation stamp.
  local new_nc = redis.call("HINCRBY", K.flow_core, "node_count", 1)
  redis.call("HINCRBY", K.flow_core, "graph_revision", 1)
  redis.call("HSET", K.flow_core, "last_mutation_at", now_ms)

  return ok(A.execution_id, tostring(new_nc))
end)

---------------------------------------------------------------------------
-- ff_cancel_flow (on {fp:N})
--
-- Cancel a flow. Returns the member list for the caller to dispatch
-- individual cancellations cross-partition.
--
-- KEYS (3): flow_core, members_set, flow_index (RESERVED — see below)
-- ARGV (4): flow_id, reason, cancellation_policy, now_ms (IGNORED —
-- server time used so `cancelled_at` agrees with peer Lua fields)
--
-- KEYS[3] (flow_index) is accepted for caller-compatibility with the
-- shared FlowStructOpKeys wrapper, but this function does NOT mutate
-- flow_index.
-- The projector is the sole SREM writer (see the "4b" note
-- in the body below).
---------------------------------------------------------------------------
redis.register_function('ff_cancel_flow', function(keys, args)
  local K = {
    flow_core = keys[1],
    members_set = keys[2],
    -- keys[3] is flow_index; present in KEYS for wrapper symmetry but
    -- unused in this function (see rationale near the end of the body).
  }
  local A = {
    flow_id = args[1],
    reason = args[2],
    cancellation_policy = args[3],
    -- args[4] is client-provided now_ms; intentionally ignored.
  }

  local now_ms = server_time_ms()

  -- 1. Validate flow exists.
  local raw = redis.call("HGETALL", K.flow_core)
  if #raw == 0 then return err("flow_not_found") end
  local flow = hgetall_to_table(raw)

  -- 2. Check not already terminal.
  local pfs = flow.public_flow_state or ""
  if pfs == "cancelled" or pfs == "completed" or pfs == "failed" then
    return err("flow_already_terminal")
  end

  -- 3. Get all member execution IDs.
  local members = redis.call("SMEMBERS", K.members_set)

  -- 4. Update flow state.
  --    cancellation_policy is persisted so an AlreadyTerminal retry can
  --    return the authoritative stored policy instead of echoing the
  --    caller's retry intent.
  --
  --    NOTE: this field is persisted from this library version onward.
  --    Flows cancelled before this deploy reach public_flow_state='cancelled'
  --    without a cancellation_policy value. The Rust caller detects the
  --    empty field on HMGET and falls back to args.cancellation_policy, so
  --    no backfill migration is needed.
  redis.call("HSET", K.flow_core,
    "public_flow_state", "cancelled",
    "cancelled_at", now_ms,
    "cancel_reason", A.reason,
    "cancellation_policy", A.cancellation_policy,
    "last_mutation_at", now_ms)

  -- Do NOT SREM flow_index here. Member cancellations dispatch
  -- asynchronously from ff-server; flow_projector needs to keep
  -- projecting the flow while those cancels land so the summary
  -- reflects the real progression (running/blocked → cancelled). The
  -- projector owns the SREM once it observes sampled==true_total
  -- all-terminal (see crates/ff-engine/src/scanner/flow_projector.rs).
  -- A projector-owned SREM is also the right place because it is
  -- the only writer that can prove every member has actually reached
  -- terminal state. Removing the entry here would freeze the summary
  -- at whatever snapshot was current when cancel_flow fired.

  -- 5. Return: ok(cancellation_policy, member1, member2, ...)
  --    Built manually to append the variable-length member list.
  local result = {1, "OK", A.cancellation_policy}
  for _, eid in ipairs(members) do
    result[#result + 1] = eid
  end

  return result
end)

---------------------------------------------------------------------------
-- #29 ff_stage_dependency_edge (on {fp:N})
--
-- Validate membership + topology, check graph_revision, create edge,
-- increment graph_revision.
--
-- KEYS (6): flow_core, members_set, edge_hash, out_adj_set, in_adj_set,
--           grant_hash
-- ARGV (8): flow_id, edge_id, upstream_eid, downstream_eid,
--           dependency_kind, data_passing_ref, expected_graph_revision,
--           now_ms
---------------------------------------------------------------------------
redis.register_function('ff_stage_dependency_edge', function(keys, args)
  local K = {
    flow_core = keys[1],
    members_set = keys[2],
    edge_hash = keys[3],
    out_adj_set = keys[4],
    in_adj_set = keys[5],
    grant_hash = keys[6],
  }
  local A = {
    flow_id = args[1],
    edge_id = args[2],
    upstream_eid = args[3],
    downstream_eid = args[4],
    dependency_kind = args[5] or "success_only",
    data_passing_ref = args[6] or "",
    expected_graph_revision = args[7],
    now_ms = args[8],
  }

  -- 1. Reject self-referencing edges.
  if A.upstream_eid == A.downstream_eid then
    return err("self_referencing_edge")
  end

  -- 2. Read flow core.
  local raw = redis.call("HGETALL", K.flow_core)
  if #raw == 0 then return err("flow_not_found") end
  local flow = hgetall_to_table(raw)

  -- 2b. Reject mutations on terminal flows.
  local pfs = flow.public_flow_state or ""
  if pfs == "cancelled" or pfs == "completed" or pfs == "failed" then
    return err("flow_already_terminal")
  end

  -- 3. Check graph_revision (optimistic concurrency against concurrent
  --    graph mutations).
  if tostring(flow.graph_revision or "0") ~= A.expected_graph_revision then
    return err("stale_graph_revision")
  end

  -- 4. Verify both endpoints are flow members.
  if redis.call("SISMEMBER", K.members_set, A.upstream_eid) == 0
      or redis.call("SISMEMBER", K.members_set, A.downstream_eid) == 0 then
    return err("execution_not_in_flow")
  end

  -- 4b. Transitive cycle detection: walk from downstream through outgoing
  --     edges to check if upstream is reachable (A→B→C→A deadlock
  --     prevention).
  local flow_prefix = string.sub(K.flow_core, 1, -6) -- strip ":core"
  if detect_cycle(flow_prefix, A.downstream_eid, A.upstream_eid) then
    return err("cycle_detected")
  end

  -- 5. Check the edge doesn't already exist.
  if redis.call("EXISTS", K.edge_hash) == 1 then
    return err("dependency_already_exists")
  end

  -- 6. Create edge record.
  redis.call("HSET", K.edge_hash,
    "edge_id", A.edge_id,
    "flow_id", A.flow_id,
    "upstream_execution_id", A.upstream_eid,
    "downstream_execution_id", A.downstream_eid,
    "dependency_kind", A.dependency_kind,
    "satisfaction_condition", "all_required",
    "data_passing_ref", A.data_passing_ref,
    "edge_state", "pending",
    "created_at", A.now_ms,
    "created_by", "engine")

  -- 7. Update adjacency sets.
  redis.call("SADD", K.out_adj_set, A.edge_id)
  redis.call("SADD", K.in_adj_set, A.edge_id)

  -- 8. Increment graph_revision and edge_count.
  local new_rev = redis.call("HINCRBY", K.flow_core, "graph_revision", 1)
  redis.call("HINCRBY", K.flow_core, "edge_count", 1)
  redis.call("HSET", K.flow_core, "last_mutation_at", A.now_ms)

  return ok(A.edge_id, tostring(new_rev))
end)

---------------------------------------------------------------------------
-- #22 ff_apply_dependency_to_child (on {p:N})
--
-- Create dep record on child execution partition, increment unsatisfied
-- count. If child is runnable: set blocked_by_dependencies.
--
-- KEYS (7): exec_core, deps_meta, unresolved_set, dep_hash,
--           eligible_zset, blocked_deps_zset, deps_all_edges
-- ARGV (7): flow_id, edge_id, upstream_eid, graph_revision,
--           dependency_kind, data_passing_ref, now_ms
---------------------------------------------------------------------------
redis.register_function('ff_apply_dependency_to_child', function(keys, args)
  local K = {
    core_key = keys[1],
    deps_meta = keys[2],
    unresolved_set = keys[3],
    dep_hash = keys[4],
    eligible_zset = keys[5],
    blocked_deps_zset = keys[6],
    deps_all_edges = keys[7],
  }
  local A = {
    flow_id = args[1],
    edge_id = args[2],
    upstream_eid = args[3],
    graph_revision = args[4],
    dependency_kind = args[5] or "success_only",
    data_passing_ref = args[6] or "",
    now_ms = args[7],
  }

  -- 1. Read execution core.
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- 2. Validate flow membership (RFC-007 assert_flow_membership).
  if is_set(core.flow_id) and core.flow_id ~= A.flow_id then
    return err("execution_already_in_flow")
  end

  -- 3. Idempotency: dep already applied.
  if redis.call("EXISTS", K.dep_hash) == 1 then
    return ok("already_applied")
  end

  -- 4. Create dep record.
  redis.call("HSET", K.dep_hash,
    "edge_id", A.edge_id,
    "flow_id", A.flow_id,
    "upstream_execution_id", A.upstream_eid,
    "downstream_execution_id", core.execution_id or "",
    "dependency_kind", A.dependency_kind,
    "state", "unsatisfied",
    "data_passing_ref", A.data_passing_ref,
    "last_resolved_at", "")

  -- 5. Update deps:meta. The edge also goes into the per-execution
  --    all-edges index (cluster-safe retention discovery; retained
  --    across resolve, purged wholesale on retention trim).
  redis.call("SADD", K.unresolved_set, A.edge_id)
  redis.call("SADD", K.deps_all_edges, A.edge_id)
  local unresolved = redis.call("HINCRBY", K.deps_meta, "unsatisfied_required_count", 1)
  redis.call("HSET", K.deps_meta,
    "flow_id", A.flow_id,
    "last_flow_graph_revision", A.graph_revision,
    "last_dependency_update_at", A.now_ms)

  -- 6. If runnable: block by dependencies (ALL 7 dims).
  if core.lifecycle_phase == "runnable" and core.terminal_outcome == "none" then
    redis.call("HSET", K.core_key,
      "lifecycle_phase", core.lifecycle_phase,                        -- preserve
      "ownership_state", core.ownership_state or "unowned",           -- preserve
      "eligibility_state", "blocked_by_dependencies",
      "blocking_reason", "waiting_for_children",
      "blocking_detail", unresolved .. " dep(s) unresolved incl " .. A.edge_id,
      "terminal_outcome", "none",                                     -- preserve
      "attempt_state", core.attempt_state or "pending_first_attempt", -- preserve
      "public_state", "waiting_children",
      "last_transition_at", A.now_ms,
      "last_mutation_at", A.now_ms)
    redis.call("ZREM", K.eligible_zset, core.execution_id or "")
    redis.call("ZADD", K.blocked_deps_zset,
      tonumber(core.created_at or "0"), core.execution_id or "")
  end

  return ok(tostring(unresolved))
end)

---------------------------------------------------------------------------
-- #23 ff_resolve_dependency (on {p:N})
--
-- Resolve one dependency edge: satisfied (upstream success) or impossible
-- (upstream failed/cancelled/expired). Updates child eligibility.
--
-- On satisfaction, if the edge was staged with a non-empty
-- `data_passing_ref`, atomically COPYs the upstream's result key into
-- the downstream's input_payload key before flipping the child to
-- eligible. Upstream + downstream are guaranteed co-located on the
-- same {fp:N} slot by flow membership (RFC-011 §7.3).
--
-- KEYS (11): exec_core, deps_meta, unresolved_set, dep_hash,
--            eligible_zset, terminal_zset, blocked_deps_zset,
--            attempt_hash, stream_meta, downstream_payload,
--            upstream_result
-- ARGV (3): edge_id, upstream_outcome, now_ms
---------------------------------------------------------------------------
redis.register_function('ff_resolve_dependency', function(keys, args)
  local K = {
    core_key = keys[1],
    deps_meta = keys[2],
    unresolved_set = keys[3],
    dep_hash = keys[4],
    eligible_zset = keys[5],
    terminal_zset = keys[6],
    blocked_deps_zset = keys[7],
    attempt_hash = keys[8],
    stream_meta = keys[9],
    downstream_payload = keys[10],
    upstream_result = keys[11],
  }
  local A = {
    edge_id = args[1],
    upstream_outcome = args[2],
    now_ms = args[3],
  }

  -- 1.
Read dep record\n local dep_raw = redis.call(\"HGETALL\", K.dep_hash)\n if #dep_raw == 0 then return err(\"invalid_dependency\") end\n local dep = hgetall_to_table(dep_raw)\n\n -- 2. Already resolved?\n if dep.state == \"satisfied\" or dep.state == \"impossible\" then\n return ok(\"already_resolved\")\n end\n\n -- 3. Satisfaction path (upstream completed successfully)\n if A.upstream_outcome == \"success\" then\n redis.call(\"HSET\", K.dep_hash,\n \"state\", \"satisfied\", \"last_resolved_at\", A.now_ms)\n redis.call(\"SREM\", K.unresolved_set, A.edge_id)\n local remaining = redis.call(\"HINCRBY\", K.deps_meta,\n \"unsatisfied_required_count\", -1)\n redis.call(\"HSET\", K.deps_meta, \"last_dependency_update_at\", A.now_ms)\n\n -- Check if all deps now satisfied\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return ok(\"satisfied\", \"\") end\n local core = hgetall_to_table(raw)\n\n -- Server-side data_passing_ref resolution (Batch C item 3). When\n -- the edge was staged with a non-empty `data_passing_ref`, replace\n -- the downstream\'s input_payload with the upstream\'s result. COPY\n -- is a single-slot server-internal op (no round-trip to Lua\n -- memory) so large result payloads don\'t inflate the FCALL\'s\n -- working set.\n --\n -- Terminal-child guard: a late satisfaction can race with the\n -- child being cancelled or skipped. 
Don\'t overwrite the payload\n -- of a child that has already reached a terminal state \u{2014} it\'s at\n -- best pointless (the worker will never read it) and at worst\n -- noisy for post-mortem debugging.\n --\n -- Write-ordering note (RFC-010 \u{a7}4.8b): COPY runs BEFORE the\n -- eligibility transition below so a crash between the two leaves\n -- the child blocked (or late-satisfied on reconciler retry) with\n -- the correct payload rather than eligible with a stale one.\n --\n -- Void-completion path: if the upstream called complete(None), the\n -- result key does not exist \u{2014} COPY returns 0 and data_injected\n -- stays empty, leaving the child\'s original input_payload intact.\n local data_injected = \"\"\n if is_set(dep.data_passing_ref)\n and core.terminal_outcome == \"none\" then\n local copied = redis.call(\n \"COPY\", K.upstream_result, K.downstream_payload, \"REPLACE\")\n if copied == 1 then\n data_injected = \"data_injected\"\n end\n end\n\n if remaining == 0\n and core.lifecycle_phase == \"runnable\"\n and core.ownership_state == \"unowned\"\n and core.terminal_outcome == \"none\"\n and core.eligibility_state == \"blocked_by_dependencies\" then\n -- Preserve attempt_state\n local new_attempt_state = core.attempt_state\n if not is_set(new_attempt_state) or new_attempt_state == \"none\" then\n new_attempt_state = \"pending_first_attempt\"\n end\n -- ALL 7 dims\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", core.lifecycle_phase, -- preserve (runnable)\n \"ownership_state\", core.ownership_state or \"unowned\", -- preserve\n \"eligibility_state\", \"eligible_now\",\n \"blocking_reason\", \"waiting_for_worker\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\", -- preserve\n \"attempt_state\", new_attempt_state,\n \"public_state\", \"waiting\",\n \"last_transition_at\", A.now_ms,\n \"last_mutation_at\", A.now_ms)\n redis.call(\"ZREM\", K.blocked_deps_zset, core.execution_id or \"\")\n local priority = tonumber(core.priority 
or \"0\")\n local created_at_ms = tonumber(core.created_at or \"0\")\n local score = 0 - (priority * 1000000000000) + created_at_ms\n redis.call(\"ZADD\", K.eligible_zset, score, core.execution_id or \"\")\n end\n\n return ok(\"satisfied\", data_injected)\n end\n\n -- 4. Impossible path (upstream failed/cancelled/expired/skipped)\n redis.call(\"HSET\", K.dep_hash,\n \"state\", \"impossible\", \"last_resolved_at\", A.now_ms)\n redis.call(\"SREM\", K.unresolved_set, A.edge_id)\n redis.call(\"HINCRBY\", K.deps_meta, \"unsatisfied_required_count\", -1)\n redis.call(\"HINCRBY\", K.deps_meta, \"impossible_required_count\", 1)\n redis.call(\"HSET\", K.deps_meta, \"last_dependency_update_at\", A.now_ms)\n\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return ok(\"impossible\", \"\") end\n local core = hgetall_to_table(raw)\n\n local child_skipped = false\n\n if core.terminal_outcome == \"none\" then\n -- Determine attempt_state for skip\n local skip_attempt_state = core.attempt_state or \"none\"\n if skip_attempt_state == \"running_attempt\"\n or skip_attempt_state == \"attempt_interrupted\" then\n -- NOTE: If the child is active (worker holding lease), this FCALL runs\n -- on {p:N} (exec partition) so we CAN write exec_core and attempt_hash.\n -- However, lease_current, lease_expiry_zset, worker_leases, and\n -- active_index also live on {p:N} \u{2014} but the KEYS array for this\n -- function does not include them (only 11 KEYS). Cleaning them here\n -- would require adding more KEYS slots and pre-reading worker_instance_id\n -- to construct the worker_leases key. Instead, lease cleanup is\n -- delegated to the lease_expiry scanner (1.5s default interval):\n -- 1. lease_expiry_scanner sees expired lease \u{2192} ff_mark_lease_expired_if_due\n -- 2. Worker\'s renewal sees terminal \u{2192} stops with terminal error\n -- 3. 
ff_expire_execution (attempt_timeout/deadline scanner) does full cleanup\n -- Race window: between this skip and scanner cleanup, exec_core is\n -- terminal(skipped) but stale entries remain in active/lease indexes.\n -- Bounded by lease_expiry_interval (default 1.5s). Index reconciler\n -- detects and logs any residual inconsistency at 45s intervals.\n skip_attempt_state = \"attempt_terminal\"\n -- End real attempt + close stream\n redis.call(\"HSET\", K.attempt_hash,\n \"attempt_state\", \"ended_cancelled\",\n \"ended_at\", A.now_ms,\n \"failure_reason\", \"dependency_impossible\")\n if redis.call(\"EXISTS\", K.stream_meta) == 1 then\n redis.call(\"HSET\", K.stream_meta,\n \"closed_at\", A.now_ms,\n \"closed_reason\", \"dependency_impossible\")\n end\n elseif is_set(skip_attempt_state) and skip_attempt_state ~= \"none\" then\n skip_attempt_state = \"none\"\n end\n\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"terminal\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"not_applicable\",\n \"blocking_reason\", \"none\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"skipped\",\n \"attempt_state\", skip_attempt_state,\n \"public_state\", \"skipped\",\n \"completed_at\", A.now_ms,\n \"last_transition_at\", A.now_ms,\n \"last_mutation_at\", A.now_ms)\n redis.call(\"ZREM\", K.blocked_deps_zset, core.execution_id or \"\")\n redis.call(\"ZADD\", K.terminal_zset, tonumber(A.now_ms), core.execution_id or \"\")\n child_skipped = true\n end\n\n return ok(\"impossible\", child_skipped and \"child_skipped\" or \"\")\nend)\n\n---------------------------------------------------------------------------\n-- #24 ff_evaluate_flow_eligibility (on {p:N})\n--\n-- Read-only check of execution + dependency state. 
Class C.\n--\n-- KEYS (2): exec_core, deps_meta\n-- ARGV (0)\n---------------------------------------------------------------------------\nredis.register_function(\'ff_evaluate_flow_eligibility\', function(keys, args)\n local raw = redis.call(\"HGETALL\", keys[1])\n if #raw == 0 then return ok(\"not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"runnable\" then\n return ok(\"not_runnable\")\n end\n if core.ownership_state ~= \"unowned\" then\n return ok(\"owned\")\n end\n if core.terminal_outcome ~= \"none\" then\n return ok(\"terminal\")\n end\n\n local deps_raw = redis.call(\"HGETALL\", keys[2])\n if #deps_raw == 0 then\n return ok(\"eligible\")\n end\n local deps = hgetall_to_table(deps_raw)\n\n local impossible = tonumber(deps.impossible_required_count or \"0\")\n if impossible > 0 then\n return ok(\"impossible\")\n end\n\n local unresolved = tonumber(deps.unsatisfied_required_count or \"0\")\n if unresolved > 0 then\n return ok(\"blocked_by_dependencies\")\n end\n\n return ok(\"eligible\")\nend)\n\n---------------------------------------------------------------------------\n-- #35 ff_promote_blocked_to_eligible (on {p:N})\n--\n-- Promote zero-dep flow member from blocked:dependencies to eligible.\n--\n-- KEYS (5): exec_core, blocked_deps_zset, eligible_zset, deps_meta,\n-- deps_unresolved\n-- ARGV (2): execution_id, now_ms\n---------------------------------------------------------------------------\nredis.register_function(\'ff_promote_blocked_to_eligible\', function(keys, args)\n local K = {\n core_key = keys[1],\n blocked_deps_zset = keys[2],\n eligible_zset = keys[3],\n deps_meta = keys[4],\n deps_unresolved = keys[5],\n }\n\n local A = {\n execution_id = args[1],\n now_ms = args[2],\n }\n\n -- 1. 
Read execution core\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n if core.lifecycle_phase ~= \"runnable\" then\n return err(\"not_runnable\")\n end\n if core.eligibility_state ~= \"blocked_by_dependencies\" then\n return err(\"not_blocked_by_deps\")\n end\n if core.terminal_outcome ~= \"none\" then\n return err(\"terminal\")\n end\n\n -- 2. Verify zero deps\n local unsatisfied = tonumber(\n redis.call(\"HGET\", K.deps_meta, \"unsatisfied_required_count\") or \"0\")\n local unresolved_count = redis.call(\"SCARD\", K.deps_unresolved)\n if unsatisfied > 0 or unresolved_count > 0 then\n return err(\"deps_not_satisfied\", tostring(unsatisfied), tostring(unresolved_count))\n end\n\n -- 3. Preserve attempt_state\n local new_attempt_state = core.attempt_state\n if not is_set(new_attempt_state) or new_attempt_state == \"none\" then\n new_attempt_state = \"pending_first_attempt\"\n end\n\n -- 4. Transition (ALL 7 dims)\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", core.lifecycle_phase, -- preserve (runnable)\n \"ownership_state\", core.ownership_state or \"unowned\", -- preserve\n \"eligibility_state\", \"eligible_now\",\n \"blocking_reason\", \"waiting_for_worker\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\", -- preserve\n \"attempt_state\", new_attempt_state,\n \"public_state\", \"waiting\",\n \"last_transition_at\", A.now_ms,\n \"last_mutation_at\", A.now_ms)\n\n redis.call(\"ZREM\", K.blocked_deps_zset, A.execution_id)\n local priority = tonumber(core.priority or \"0\")\n local created_at_ms = tonumber(core.created_at or \"0\")\n local score = 0 - (priority * 1000000000000) + created_at_ms\n redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n return ok()\nend)\n\n---------------------------------------------------------------------------\n-- #12b ff_replay_execution (on {p:N})\n--\n-- Reset a terminal execution for replay. 
If skipped flow member: reset\n-- impossible deps back to unsatisfied, recompute counts, set\n-- blocked_by_dependencies instead of eligible_now.\n--\n-- KEYS (4+N): exec_core, terminal_zset, eligible_zset, lease_history,\n-- [blocked_deps_zset, deps_meta, deps_unresolved, dep_edge_0..N]\n-- ARGV (2+N): execution_id, now_ms, [edge_id_0..N]\n---------------------------------------------------------------------------\nredis.register_function(\'ff_replay_execution\', function(keys, args)\n local K = {\n core_key = keys[1],\n terminal_zset = keys[2],\n eligible_zset = keys[3],\n lease_history = keys[4],\n }\n\n local A = {\n execution_id = args[1],\n }\n\n local t = redis.call(\"TIME\")\n local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)\n\n -- 1. Read execution core\n local raw = redis.call(\"HGETALL\", K.core_key)\n if #raw == 0 then return err(\"execution_not_found\") end\n local core = hgetall_to_table(raw)\n\n -- 2. Must be terminal\n if core.lifecycle_phase ~= \"terminal\" then\n return err(\"execution_not_terminal\")\n end\n\n -- 3. Check replay limit (read from policy, same pattern as ff_reclaim_execution)\n local replay_count = tonumber(core.replay_count or \"0\")\n local max_replays = 10 -- default\n local policy_key = string.gsub(K.core_key, \":core$\", \":policy\")\n local policy_raw = redis.call(\"GET\", policy_key)\n if policy_raw then\n local ok_p, pol = pcall(cjson.decode, policy_raw)\n if ok_p and type(pol) == \"table\" then\n max_replays = tonumber(pol.max_replay_count) or 10\n end\n end\n if replay_count >= max_replays then\n return err(\"max_replays_exhausted\")\n end\n\n -- 4. 
Determine replay path\n local is_skipped_flow_member = (core.terminal_outcome == \"skipped\") and is_set(core.flow_id)\n\n if is_skipped_flow_member then\n -- SKIPPED FLOW MEMBER PATH: reset impossible deps \u{2192} blocked on deps\n local blocked_deps_zset = keys[5]\n local deps_meta = keys[6]\n local deps_unresolved = keys[7]\n\n -- Reset impossible dep edges back to unsatisfied\n local num_edges = #args - 2\n local new_unsatisfied = 0\n for i = 1, num_edges do\n local edge_id = args[2 + i]\n local dep_key = keys[7 + i] -- dep_edge keys start at KEYS[8]\n\n local dep_state = redis.call(\"HGET\", dep_key, \"state\")\n if dep_state == \"impossible\" then\n redis.call(\"HSET\", dep_key,\n \"state\", \"unsatisfied\",\n \"last_resolved_at\", \"\")\n redis.call(\"SADD\", deps_unresolved, edge_id)\n new_unsatisfied = new_unsatisfied + 1\n elseif dep_state == \"unsatisfied\" then\n new_unsatisfied = new_unsatisfied + 1\n end\n -- satisfied edges remain satisfied (upstream already succeeded)\n end\n\n -- Recompute deps:meta counts\n redis.call(\"HSET\", deps_meta,\n \"unsatisfied_required_count\", tostring(new_unsatisfied),\n \"impossible_required_count\", \"0\",\n \"last_dependency_update_at\", tostring(now_ms))\n\n -- Transition: terminal \u{2192} runnable/blocked_by_dependencies\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"blocked_by_dependencies\",\n \"blocking_reason\", \"waiting_for_children\",\n \"blocking_detail\", tostring(new_unsatisfied) .. 
\" dep(s) unsatisfied after replay\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"pending_replay_attempt\",\n \"public_state\", \"waiting_children\",\n \"pending_replay_attempt\", \"1\",\n \"replay_count\", tostring(replay_count + 1),\n \"completed_at\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Move from terminal \u{2192} blocked:deps\n redis.call(\"ZREM\", K.terminal_zset, A.execution_id)\n redis.call(\"ZADD\", blocked_deps_zset,\n tonumber(core.created_at or \"0\"), A.execution_id)\n\n -- Lease history\n redis.call(\"XADD\", K.lease_history, \"MAXLEN\", \"~\", 1000, \"*\",\n \"event\", \"replay_initiated\",\n \"replay_count\", tostring(replay_count + 1),\n \"replay_type\", \"skipped_flow_member\",\n \"ts\", tostring(now_ms))\n\n return ok(tostring(new_unsatisfied))\n else\n -- NORMAL REPLAY PATH: terminal \u{2192} runnable/eligible\n local priority = tonumber(core.priority or \"0\")\n local created_at = tonumber(core.created_at or \"0\")\n local score = 0 - (priority * 1000000000000) + created_at\n\n redis.call(\"HSET\", K.core_key,\n \"lifecycle_phase\", \"runnable\",\n \"ownership_state\", \"unowned\",\n \"eligibility_state\", \"eligible_now\",\n \"blocking_reason\", \"waiting_for_worker\",\n \"blocking_detail\", \"\",\n \"terminal_outcome\", \"none\",\n \"attempt_state\", \"pending_replay_attempt\",\n \"public_state\", \"waiting\",\n \"pending_replay_attempt\", \"1\",\n \"replay_count\", tostring(replay_count + 1),\n \"completed_at\", \"\",\n \"last_transition_at\", tostring(now_ms),\n \"last_mutation_at\", tostring(now_ms))\n\n -- Move from terminal \u{2192} eligible\n redis.call(\"ZREM\", K.terminal_zset, A.execution_id)\n redis.call(\"ZADD\", K.eligible_zset, score, A.execution_id)\n\n -- Lease history\n redis.call(\"XADD\", K.lease_history, \"MAXLEN\", \"~\", 1000, \"*\",\n \"event\", \"replay_initiated\",\n \"replay_count\", tostring(replay_count + 1),\n \"replay_type\", \"normal\",\n 
\"ts\", tostring(now_ms))\n\n return ok(\"0\")\n end\nend)\n\n";
The compiled FlowFabric Lua library source.
Generated from lua/*.lua by scripts/gen-ff-script-lua.sh and checked
into the crate as flowfabric.lua so it ships inside the published
tarball. CI (matrix.yml) fails if this file drifts from what the
script would produce.