#!lua name=flowfabric
-- Upper bound on the byte length of the sorted, comma-joined
-- required-capabilities string accepted by ff_create_execution.
local CAPS_MAX_BYTES = 4096
-- Upper bound on the number of required-capability tokens accepted by
-- ff_create_execution.
local CAPS_MAX_TOKENS = 256
--- Decode a hexadecimal string into its raw byte string.
-- @param hex  even-length string made only of hex digits (either case)
-- @return the decoded bytes, or nil when `hex` is not a string, has odd
--         length, or contains any non-hex character
local function hex_to_bytes(hex)
  if type(hex) ~= "string" or #hex % 2 ~= 0 then
    return nil
  end
  -- tonumber(s, 16) on its own is too lenient: it tolerates surrounding
  -- whitespace and sign characters, so " f" would decode and "-1" would
  -- reach string.char with an out-of-range value. Require hex digits only.
  if hex:find("%X") then
    return nil
  end
  local out = {}
  for i = 1, #hex - 1, 2 do
    out[#out + 1] = string.char(tonumber(hex:sub(i, i + 1), 16))
  end
  return table.concat(out)
end
--- XOR two byte strings position by position.
-- Walks every position of `a`; `b` must supply a byte at each of those
-- positions (no length check is made here).
-- @return a string of #a bytes
local function xor_bytes(a, b)
  local pieces = {}
  local pos = 0
  while pos < #a do
    pos = pos + 1
    local mixed = bit.bxor(string.byte(a, pos), string.byte(b, pos))
    pieces[pos] = string.char(mixed)
  end
  return table.concat(pieces)
end
--- Compute HMAC-SHA1 over `message` with a hex-encoded key.
-- Keys longer than the 64-byte block are replaced by their SHA-1 digest and
-- shorter keys are zero-padded to the block size, then the inner (0x36) and
-- outer (0x5c) padded hashes are chained using redis.sha1hex.
-- @param key_hex  key as a hex string (decoded with hex_to_bytes)
-- @param message  string to authenticate
-- @return hex digest as produced by redis.sha1hex, or nil when `key_hex`
--         cannot be decoded
local function hmac_sha1_hex(key_hex, message)
  local block_size = 64
  local key_bytes = hex_to_bytes(key_hex)
  if key_bytes == nil then
    return nil
  end
  if #key_bytes > block_size then
    key_bytes = hex_to_bytes(redis.sha1hex(key_bytes))
  end
  local missing = block_size - #key_bytes
  if missing > 0 then
    key_bytes = key_bytes .. string.rep("\0", missing)
  end
  local inner_key = xor_bytes(key_bytes, string.rep(string.char(0x36), block_size))
  local outer_key = xor_bytes(key_bytes, string.rep(string.char(0x5c), block_size))
  local inner_hex = redis.sha1hex(inner_key .. message)
  return redis.sha1hex(outer_key .. hex_to_bytes(inner_hex))
end
--- Compare two strings without stopping at the first differing byte.
-- Differences are OR-accumulated across the whole length; only the type and
-- length checks return early.
-- @return true when both arguments are strings of equal length and content
local function constant_time_eq(a, b)
  if type(a) == "string" and type(b) == "string" and #a == #b then
    local diff = 0
    for pos = 1, #a do
      diff = bit.bor(diff, bit.bxor(string.byte(a, pos), string.byte(b, pos)))
    end
    return diff == 0
  end
  return false
end
--- Load the waitpoint HMAC secrets hash and index it by key id (kid).
-- Reads the whole hash with HGETALL. Fields consulted: `current_kid`,
-- `previous_kid`, `previous_expires_at`, `secret:<kid>` and
-- `expires_at:<kid>`.
-- @param secrets_key  Redis key of the secrets hash
-- @return nil when the hash is empty/missing; otherwise a table with
--         current_kid, previous_kid, previous_expires_at, current_secret,
--         previous_secret and kid_secrets[kid] = { secret, expires_at }
local function load_waitpoint_secrets(secrets_key)
  local flat = redis.call("HGETALL", secrets_key)
  if #flat == 0 then
    return nil
  end

  -- HGETALL replies with alternating field/value entries.
  local fields = {}
  for i = 1, #flat, 2 do
    fields[flat[i]] = flat[i + 1]
  end

  local SECRET_PREFIX = "secret:"
  local per_kid = {}
  for field, value in pairs(fields) do
    -- A bare "secret:" field (empty kid) is ignored.
    if #field > #SECRET_PREFIX
      and string.sub(field, 1, #SECRET_PREFIX) == SECRET_PREFIX then
      local kid = string.sub(field, #SECRET_PREFIX + 1)
      per_kid[kid] = {
        secret = value,
        expires_at = fields["expires_at:" .. kid],
      }
    end
  end

  local current_kid = fields.current_kid
  local previous_kid = fields.previous_kid
  local result = {
    current_kid = current_kid,
    previous_kid = previous_kid,
    previous_expires_at = fields.previous_expires_at,
    kid_secrets = per_kid,
  }
  if current_kid then
    result.current_secret = fields[SECRET_PREFIX .. current_kid]
  end
  if previous_kid then
    result.previous_secret = fields[SECRET_PREFIX .. previous_kid]
  end
  return result
end
--- Build the string that is signed for a waitpoint token.
-- @return "<waitpoint_id>|<waitpoint_key>|<created_at_ms>"
local function waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)
  local identity = waitpoint_id .. "|" .. waitpoint_key
  local stamp = tostring(created_at_ms)
  return identity .. "|" .. stamp
end
--- Mint a waitpoint token signed with the current secret.
-- @param secrets_key    Redis key of the secrets hash (non-empty string)
-- @param waitpoint_id   waitpoint identifier (part of the signed input)
-- @param waitpoint_key  waitpoint key (part of the signed input)
-- @param created_at_ms  creation timestamp (part of the signed input)
-- @return on success: "<kid>:<hex digest>", kid
-- @return on failure: nil, error code ("invalid_keys_missing_hmac",
--         "hmac_secret_not_initialized" or "invalid_secret")
local function mint_waitpoint_token(secrets_key, waitpoint_id, waitpoint_key, created_at_ms)
  if not (type(secrets_key) == "string" and secrets_key ~= "") then
    return nil, "invalid_keys_missing_hmac"
  end

  local ring = load_waitpoint_secrets(secrets_key)
  local kid = ring and ring.current_kid
  local secret_hex = ring and ring.current_secret
  if not kid or not secret_hex then
    return nil, "hmac_secret_not_initialized"
  end

  local signed_text = waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)
  local mac = hmac_sha1_hex(secret_hex, signed_text)
  if not mac then
    -- hmac_sha1_hex yields nil when the stored secret is not decodable hex.
    return nil, "invalid_secret"
  end
  return kid .. ":" .. mac, kid
end
--- Validate a presented waitpoint token of the form "<kid>:<40-char digest>".
-- The secret is chosen by kid: the current kid uses the current secret; any
-- other known kid is accepted only while its `expires_at` is a positive
-- number not earlier than `now_ms`.
-- @param secrets_key    Redis key of the secrets hash (non-empty string)
-- @param token          presented token
-- @param waitpoint_id   waitpoint identifier (part of the signed input)
-- @param waitpoint_key  waitpoint key (part of the signed input)
-- @param created_at_ms  creation timestamp (part of the signed input)
-- @param now_ms         current time used for the kid expiry check
-- @return nil when the token is valid, otherwise an error code string:
--         "invalid_keys_missing_hmac", "missing_token", "invalid_token",
--         "hmac_secret_not_initialized", "token_expired" or "invalid_secret"
local function validate_waitpoint_token(
  secrets_key, token, waitpoint_id, waitpoint_key, created_at_ms, now_ms
)
  if not (type(secrets_key) == "string" and secrets_key ~= "") then
    return "invalid_keys_missing_hmac"
  end
  if not (type(token) == "string" and token ~= "") then
    return "missing_token"
  end

  -- Split on the first ':'; both the kid and the digest must be non-empty.
  local colon = string.find(token, ":", 1, true)
  if colon == nil or colon < 2 or colon >= #token then
    return "invalid_token"
  end
  local kid = string.sub(token, 1, colon - 1)
  local presented = string.sub(token, colon + 1)
  if #presented ~= 40 then
    return "invalid_token"
  end

  local ring = load_waitpoint_secrets(secrets_key)
  if not ring or not ring.current_kid then
    return "hmac_secret_not_initialized"
  end

  local secret
  if kid == ring.current_kid then
    secret = ring.current_secret
  else
    local entry = ring.kid_secrets and ring.kid_secrets[kid]
    if not entry then
      -- Unknown kid.
      return "invalid_token"
    end
    local exp = tonumber(entry.expires_at)
    if not exp or exp <= 0 or exp < now_ms then
      -- Known kid whose acceptance window is missing or over.
      return "token_expired"
    end
    secret = entry.secret
  end
  if not secret then
    return "invalid_token"
  end

  local signed_text = waitpoint_hmac_input(waitpoint_id, waitpoint_key, created_at_ms)
  local expected = hmac_sha1_hex(secret, signed_text)
  if not expected then
    return "invalid_secret"
  end
  if constant_time_eq(expected, presented) then
    return nil
  end
  return "invalid_token"
end
--- Current Redis server time in milliseconds.
-- Combines the seconds and microseconds parts of the TIME reply.
local function server_time_ms()
  local clock = redis.call("TIME")
  local seconds = tonumber(clock[1])
  local micros = tonumber(clock[2])
  return seconds * 1000 + math.floor(micros / 1000)
end
--- Build a success reply: { 1, "OK", ...extra values }.
local function ok(...)
  local reply = { 1, "OK" }
  for i = 1, select("#", ...) do
    reply[i + 2] = (select(i, ...))
  end
  return reply
end
--- Build a failure reply: { 0, ...error code and details }.
local function err(...)
  local reply = { 0 }
  for i = 1, select("#", ...) do
    reply[i + 1] = (select(i, ...))
  end
  return reply
end
--- Parse a required numeric argument.
-- @param val   raw argument value
-- @param name  argument name used in the error detail
-- @return the parsed finite number, or an err(...) reply table when `val`
--         is not a usable number. Callers tell the two apart with
--         `type(result) == "table"`.
local function require_number(val, name)
  local n = tonumber(val)
  -- Lua 5.1's tonumber parses through strtod, which on common platforms
  -- accepts "nan" and "inf". Such values pass a plain nil check and then
  -- break sorted-set scores or expiry timestamps further down, so reject
  -- them here. `n ~= n` is true only for NaN.
  if n == nil or n ~= n or n == math.huge or n == -math.huge then
    return err("invalid_input", name .. " must be a number, got: " .. tostring(val))
  end
  return n
end
--- Build a success reply for a no-op: { 1, "ALREADY_SATISFIED", ...extras }.
local function ok_already_satisfied(...)
  local reply = { 1, "ALREADY_SATISFIED" }
  for i = 1, select("#", ...) do
    reply[i + 2] = (select(i, ...))
  end
  return reply
end
--- Build a success reply for a duplicate request: { 1, "DUPLICATE", ...extras }.
local function ok_duplicate(...)
  local reply = { 1, "DUPLICATE" }
  for i = 1, select("#", ...) do
    reply[i + 2] = (select(i, ...))
  end
  return reply
end
--- Convert a flat { field1, value1, field2, value2, ... } array (the shape
-- of an HGETALL reply) into a field -> value table.
local function hgetall_to_table(flat)
  local map = {}
  local pos = 1
  while pos <= #flat do
    map[flat[pos]] = flat[pos + 1]
    pos = pos + 2
  end
  return map
end
--- True when `v` carries a value: anything except nil, false or "".
local function is_set(v)
  if v == nil or v == false or v == "" then
    return false
  end
  return true
end
--- Check that the caller still holds the execution's lease.
-- @param core    execution core fields (field -> string table)
-- @param argv    table with lease_id, lease_epoch and attempt_id
-- @param now_ms  current time in milliseconds
-- @return nil when the lease is valid, otherwise an err(...) reply:
--         "execution_not_active" (with terminal outcome, lease epoch,
--         lifecycle phase and attempt id), "lease_revoked", "lease_expired"
--         or "stale_lease"
local function validate_lease(core, argv, now_ms)
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "",
      core.current_lease_epoch or "",
      core.lifecycle_phase or "",
      core.current_attempt_id or "")
  end
  if core.ownership_state == "lease_revoked" then
    return err("lease_revoked")
  end
  -- lease_expires_at is stored as "" when cleared; tonumber("") is nil, so
  -- default after the conversion. A missing or unparsable expiry counts as
  -- already expired instead of raising on a nil comparison.
  if (tonumber(core.lease_expires_at) or 0) <= now_ms then
    return err("lease_expired")
  end
  if core.current_lease_id ~= argv.lease_id then
    return err("stale_lease")
  end
  if core.current_lease_epoch ~= argv.lease_epoch then
    return err("stale_lease")
  end
  if core.current_attempt_id ~= argv.attempt_id then
    return err("stale_lease")
  end
  return nil
end
--- Record that the current lease has expired and is awaiting reclaim.
-- Does nothing when ownership_state is already "lease_expired_reclaimable".
-- Otherwise it rewrites the state fields on the core hash, appends an
-- "expired" entry to the lease history stream and, when a stream meta hash
-- exists for the current attempt and is not yet closed, stamps it with
-- closed_at / closed_reason = "lease_expired".
-- @param keys    table with core_key and lease_history_key
-- @param core    execution core fields as read before this call
-- @param now_ms  current time in milliseconds
-- @param maxlen  approximate MAXLEN cap for the lease history stream
local function mark_expired(keys, core, now_ms, maxlen)
  if core.ownership_state == "lease_expired_reclaimable" then
    return
  end

  local core_key = keys.core_key
  redis.call("HSET", core_key,
    "lifecycle_phase", core.lifecycle_phase or "active",
    "ownership_state", "lease_expired_reclaimable",
    "eligibility_state", core.eligibility_state or "not_applicable",
    "blocking_reason", "waiting_for_worker",
    "blocking_detail", "lease expired, awaiting reclaim",
    "terminal_outcome", core.terminal_outcome or "none",
    "attempt_state", "attempt_interrupted",
    "public_state", "active",
    "lease_expired_at", now_ms,
    "last_mutation_at", now_ms)

  redis.call("XADD", keys.lease_history_key, "MAXLEN", "~", maxlen, "*",
    "event", "expired",
    "lease_id", core.current_lease_id or "",
    "lease_epoch", core.current_lease_epoch or "",
    "attempt_index", core.current_attempt_index or "",
    "attempt_id", core.current_attempt_id or "",
    "worker_id", core.current_worker_id or "",
    "worker_instance_id", core.current_worker_instance_id or "",
    "ts", now_ms)

  local attempt_index = core.current_attempt_index
  if attempt_index == nil or attempt_index == "" then
    return
  end

  -- Rebuild the stream meta key from the core key: take the "{...}" hash
  -- tag, then the text between the ":" after the tag and the first ":core".
  local open_at = string.find(core_key, "{", 1, true)
  if not open_at then
    return
  end
  local close_at = string.find(core_key, "}", open_at, true)
  if not close_at then
    return
  end
  local hash_tag = string.sub(core_key, open_at, close_at)
  local remainder = string.sub(core_key, close_at + 2)
  local suffix_at = string.find(remainder, ":core", 1, true)
  if not suffix_at then
    return
  end
  local execution_id = string.sub(remainder, 1, suffix_at - 1)
  local stream_meta_key = "ff:stream:" .. hash_tag .. ":" .. execution_id
    .. ":" .. tostring(attempt_index) .. ":meta"

  if redis.call("EXISTS", stream_meta_key) ~= 1 then
    return
  end
  -- Leave an already-closed stream untouched.
  if is_set(redis.call("HGET", stream_meta_key, "closed_at")) then
    return
  end
  redis.call("HSET", stream_meta_key,
    "closed_at", tostring(now_ms),
    "closed_reason", "lease_expired")
end
--- Check that the caller still holds the lease and, when the lease turns
-- out to be expired, record that through mark_expired before failing.
-- @param core    execution core fields (field -> string table)
-- @param argv    table with lease_id, lease_epoch and attempt_id
-- @param now_ms  current time in milliseconds
-- @param keys    key table forwarded to mark_expired
-- @param maxlen  lease history MAXLEN forwarded to mark_expired
-- @return nil when the lease is valid, otherwise an err(...) reply:
--         "execution_not_active", "lease_revoked", "lease_expired" or
--         "stale_lease"
local function validate_lease_and_mark_expired(core, argv, now_ms, keys, maxlen)
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "",
      core.current_lease_epoch or "",
      core.lifecycle_phase or "",
      core.current_attempt_id or "")
  end
  if core.ownership_state == "lease_revoked" then
    return err("lease_revoked")
  end
  -- lease_expires_at is stored as "" when cleared; tonumber("") is nil, so
  -- default after the conversion. A missing or unparsable expiry counts as
  -- already expired instead of raising on a nil comparison.
  if (tonumber(core.lease_expires_at) or 0) <= now_ms then
    mark_expired(keys, core, now_ms, maxlen)
    return err("lease_expired")
  end
  if core.current_lease_id ~= argv.lease_id then
    return err("stale_lease")
  end
  if core.current_lease_epoch ~= argv.lease_epoch then
    return err("stale_lease")
  end
  if core.current_attempt_id ~= argv.attempt_id then
    return err("stale_lease")
  end
  return nil
end
--- Decide which lease fence (lease_id, lease_epoch, attempt_id) applies.
-- @param core  execution core fields
-- @param argv  caller arguments; may carry none, some or all of the triple
-- @return fence, true    when the caller supplied the full triple
-- @return fence, false   when the caller supplied none of it; the fence is
--                        then filled from the core's current values
-- @return nil, err(...)  ("partial_fence_triple") when only part of the
--                        triple was supplied
local function resolve_lease_fence(core, argv)
  local supplied = 0
  if is_set(argv.lease_id) then supplied = supplied + 1 end
  if is_set(argv.lease_epoch) then supplied = supplied + 1 end
  if is_set(argv.attempt_id) then supplied = supplied + 1 end

  if supplied == 0 then
    local from_core = {
      lease_id = core.current_lease_id or "",
      lease_epoch = core.current_lease_epoch or "",
      attempt_id = core.current_attempt_id or "",
    }
    return from_core, false
  end
  if supplied < 3 then
    return nil, err("partial_fence_triple")
  end
  local from_caller = {
    lease_id = argv.lease_id,
    lease_epoch = argv.lease_epoch,
    attempt_id = argv.attempt_id,
  }
  return from_caller, true
end
--- Release the current lease: delete the lease record, remove the execution
-- from the lease/active/timeout indexes, blank the lease fields on the core
-- hash and append a "released" entry to the lease history stream.
-- @param keys    table with lease_current_key, lease_expiry_key,
--                worker_leases_key, active_index_key, attempt_timeout_key,
--                core_key and lease_history_key
-- @param core    execution core fields as read before this call (its lease
--                values are what the history entry records)
-- @param reason  value stored in the history entry's "reason" field
-- @param now_ms  current time in milliseconds
-- @param maxlen  approximate MAXLEN cap for the lease history stream
local function clear_lease_and_indexes(keys, core, reason, now_ms, maxlen)
  local execution_id = core.execution_id or ""

  redis.call("DEL", keys.lease_current_key)

  -- Index removals, in the same order as they were always issued.
  local removals = {
    { "ZREM", keys.lease_expiry_key },
    { "SREM", keys.worker_leases_key },
    { "ZREM", keys.active_index_key },
    { "ZREM", keys.attempt_timeout_key },
  }
  for i = 1, #removals do
    local removal = removals[i]
    redis.call(removal[1], removal[2], execution_id)
  end

  redis.call("HSET", keys.core_key,
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "lease_expired_at", "",
    "lease_revoked_at", "",
    "lease_revoke_reason", "",
    "last_mutation_at", now_ms)

  redis.call("XADD", keys.lease_history_key, "MAXLEN", "~", maxlen, "*",
    "event", "released",
    "lease_id", core.current_lease_id or "",
    "lease_epoch", core.current_lease_epoch or "",
    "attempt_index", core.current_attempt_index or "",
    "attempt_id", core.current_attempt_id or "",
    "reason", reason,
    "ts", now_ms)
end
--- Remove an execution from every index key the caller supplied, except
-- one optional key to keep.
-- Index keys missing from `keys` are skipped individually.
-- @param keys        table of index keys; any subset of the fields listed
--                    below plus worker_leases_key may be present
-- @param eid         execution id (the member to remove)
-- @param except_key  key to leave untouched, or nil
local function defensive_zrem_all_indexes(keys, eid, except_key)
  -- Iterate over field NAMES, not looked-up values: a list of values
  -- contains nil holes whenever a key is absent, and ipairs stops at the
  -- first hole, which would silently skip every index after it.
  local zset_fields = {
    "eligible_key",
    "delayed_key",
    "active_index_key",
    "suspended_key",
    "terminal_key",
    "blocked_deps_key",
    "blocked_budget_key",
    "blocked_quota_key",
    "blocked_route_key",
    "blocked_operator_key",
    "lease_expiry_key",
    "suspension_timeout_key",
    "attempt_timeout_key",
    "execution_deadline_key",
  }
  for i = 1, #zset_fields do
    local k = keys[zset_fields[i]]
    if k and k ~= except_key then
      redis.call("ZREM", k, eid)
    end
  end
  -- worker_leases_key is removed with SREM, not ZREM.
  if keys.worker_leases_key and keys.worker_leases_key ~= except_key then
    redis.call("SREM", keys.worker_leases_key, eid)
  end
end
-- Lookup table from a reason code to the blocking reason it maps to.
local REASON_TO_BLOCKING = {
  waiting_for_signal = "waiting_for_signal",
  waiting_for_approval = "waiting_for_approval",
  waiting_for_callback = "waiting_for_callback",
  waiting_for_tool_result = "waiting_for_tool_result",
  waiting_for_operator_review = "paused_by_operator",
  paused_by_policy = "paused_by_policy",
  paused_by_budget = "waiting_for_budget",
  step_boundary = "waiting_for_signal",
  manual_pause = "paused_by_operator",
}

--- Translate a reason code into a blocking reason.
-- @return the mapped value, or "waiting_for_signal" for any code that is
--         not in REASON_TO_BLOCKING (including nil)
local function map_reason_to_blocking(reason_code)
  local blocking = REASON_TO_BLOCKING[reason_code]
  if blocking == nil then
    return "waiting_for_signal"
  end
  return blocking
end
--- Build the initial in-memory state of a resume condition from its JSON
-- spec.
-- Spec fields read: required_signal_names, condition_type,
-- signal_match_mode, minimum_signal_count. With no required names a single
-- wildcard matcher (name "") is created.
-- @param json  JSON text of the condition spec (decoded with cjson)
-- @return table with condition_type, match_mode, minimum_signal_count,
--         total_matchers, satisfied_count, matchers and closed
local function initialize_condition(json)
  local spec = cjson.decode(json)

  -- Anything that is not a table (absent, JSON null, a scalar) is treated
  -- as "no required names" rather than failing on the length operator.
  local names = spec.required_signal_names
  if type(names) ~= "table" then
    names = {}
  end

  local matchers = {}
  if #names == 0 then
    matchers[1] = { name = "", satisfied = false, signal_id = "" }
  else
    for i, name in ipairs(names) do
      matchers[i] = { name = name, satisfied = false, signal_id = "" }
    end
  end

  return {
    condition_type = spec.condition_type or "signal_set",
    match_mode = spec.signal_match_mode or "any",
    -- Fall back to 1 after the conversion so a missing, null or
    -- non-numeric value cannot leave nil here and break later comparisons.
    minimum_signal_count = tonumber(spec.minimum_signal_count) or 1,
    total_matchers = #names > 0 and #names or 1,
    satisfied_count = 0,
    matchers = matchers,
    closed = false,
  }
end
--- Persist a condition's state into a Redis hash with a single HSET.
-- Matchers are written under zero-based field names:
-- "matcher:<i>:name", "matcher:<i>:satisfied", "matcher:<i>:signal_id".
-- Booleans are stored as "1" / "0".
-- @param key     Redis key of the condition hash
-- @param cond    condition table (shape produced by initialize_condition)
-- @param now_ms  timestamp stored in "updated_at"
local function write_condition_hash(key, cond, now_ms)
  local fields = {
    "condition_type", cond.condition_type,
    "match_mode", cond.match_mode,
    "minimum_signal_count", tostring(cond.minimum_signal_count),
    "total_matchers", tostring(cond.total_matchers),
    "satisfied_count", tostring(cond.satisfied_count),
    "closed", cond.closed and "1" or "0",
    "updated_at", tostring(now_ms),
  }
  local count = #fields
  for slot = 1, cond.total_matchers do
    local matcher = cond.matchers[slot]
    local prefix = "matcher:" .. (slot - 1) .. ":"
    fields[count + 1] = prefix .. "name"
    fields[count + 2] = matcher.name
    fields[count + 3] = prefix .. "satisfied"
    fields[count + 4] = matcher.satisfied and "1" or "0"
    fields[count + 5] = prefix .. "signal_id"
    fields[count + 6] = matcher.signal_id
    count = count + 6
  end
  redis.call("HSET", key, unpack(fields))
end
--- Try to satisfy one matcher of `cond` with an incoming signal.
-- The first unsatisfied matcher whose name is "" (wildcard) or equals
-- `signal_name` is marked satisfied, records the signal id and bumps
-- cond.satisfied_count. At most one matcher is consumed per call.
-- @return true when a matcher was satisfied, false otherwise
local function evaluate_signal_against_condition(cond, signal_name, signal_id)
  for slot = 1, cond.total_matchers do
    local matcher = cond.matchers[slot]
    local accepts = matcher.name == "" or matcher.name == signal_name
    if accepts and not matcher.satisfied then
      matcher.satisfied = true
      matcher.signal_id = signal_id or ""
      cond.satisfied_count = cond.satisfied_count + 1
      return true
    end
  end
  return false
end
--- Decide whether a condition has collected enough signals.
-- In "all" mode every matcher must be satisfied; in every other mode
-- (including "any") minimum_signal_count satisfied matchers are enough.
local function is_condition_satisfied(cond)
  local threshold = cond.minimum_signal_count
  if cond.match_mode == "all" then
    threshold = cond.total_matchers
  end
  return cond.satisfied_count >= threshold
end
--- Look up a value in a flat { name1, value1, name2, value2, ... } array.
-- @return the value paired with the first matching name, or nil
local function extract_field(fields, name)
  local pos = 1
  while pos <= #fields do
    if fields[pos] == name then
      return fields[pos + 1]
    end
    pos = pos + 2
  end
  return nil
end
--- JSON text of an empty signal summary (zero counts, no signal names).
local function initial_signal_summary_json()
  local empty_summary = '{"total_count":0,"matched_count":0,"signal_names":[]}'
  return empty_summary
end
--- Check that a waitpoint hash is a usable pending waitpoint for the given
-- execution attempt.
-- @param wp_raw   flat HGETALL reply of the waitpoint hash
-- @param eid      expected execution id
-- @param att_idx  expected attempt index (compared as strings)
-- @param now_ms   current time in milliseconds
-- @return nil when usable, otherwise an err(...) reply:
--         "waitpoint_not_found", "waitpoint_not_pending",
--         "invalid_waitpoint_for_execution" or "pending_waitpoint_expired"
local function validate_pending_waitpoint(wp_raw, eid, att_idx, now_ms)
  if #wp_raw == 0 then
    return err("waitpoint_not_found")
  end
  local waitpoint = hgetall_to_table(wp_raw)
  if waitpoint.state ~= "pending" then
    return err("waitpoint_not_pending")
  end

  local same_execution = waitpoint.execution_id == eid
  local same_attempt = tostring(waitpoint.attempt_index) == tostring(att_idx)
  if not (same_execution and same_attempt) then
    return err("invalid_waitpoint_for_execution")
  end

  -- An unset expires_at means there is no expiry to enforce.
  local deadline = waitpoint.expires_at
  if is_set(deadline) and tonumber(deadline) <= now_ms then
    return err("pending_waitpoint_expired")
  end
  return nil
end
--- Check that a suspension hash describes an open suspension.
-- A suspension is open when it has a suspension_id and no closed_at.
-- @param susp_raw  flat HGETALL reply of the suspension hash
-- @return nil, suspension_table  when the suspension is open
-- @return err("execution_not_suspended") otherwise
local function assert_active_suspension(susp_raw)
  if #susp_raw > 0 then
    local suspension = hgetall_to_table(susp_raw)
    if is_set(suspension.suspension_id) and not is_set(suspension.closed_at) then
      return nil, suspension
    end
  end
  return err("execution_not_suspended")
end
--- Check that a waitpoint hash belongs to the given execution and, when
-- provided, to the given suspension and waitpoint ids.
-- @param wp_raw  flat HGETALL reply of the waitpoint hash
-- @param eid     expected execution id
-- @param sid     expected suspension id; not checked when unset
-- @param wid     expected waitpoint id; not checked when unset
-- @return nil when it belongs, otherwise err("waitpoint_not_found") or
--         err("invalid_waitpoint_for_execution")
local function assert_waitpoint_belongs(wp_raw, eid, sid, wid)
  if #wp_raw == 0 then
    return err("waitpoint_not_found")
  end
  local waitpoint = hgetall_to_table(wp_raw)
  local mismatch = waitpoint.execution_id ~= eid
    or (is_set(sid) and waitpoint.suspension_id ~= sid)
    or (is_set(wid) and waitpoint.waitpoint_id ~= wid)
  if mismatch then
    return err("invalid_waitpoint_for_execution")
  end
  return nil
end
--- Flatten a JSON object into a { key1, value1, key2, value2, ... } array.
-- Nested tables are re-encoded as JSON text; every other value goes through
-- tostring. Pair order follows `pairs` and is therefore unspecified.
-- @param json  JSON text (decoded with cjson; malformed input raises)
local function unpack_policy(json)
  local flat = {}
  local count = 0
  for field, value in pairs(cjson.decode(json)) do
    local text
    if type(value) == "table" then
      text = cjson.encode(value)
    else
      text = tostring(value)
    end
    flat[count + 1] = field
    flat[count + 2] = text
    count = count + 2
  end
  return flat
end
-- ff_version: returns the library's version string. Takes no keys or args.
redis.register_function('ff_version', function(keys, args)
  local version = '15'
  return version
end)
-- ff_renew_lease: extend the current lease of an active execution.
--
-- KEYS: 1 core hash, 2 current-lease hash, 3 lease history stream,
--       4 lease expiry sorted set.
-- ARGS: 1 execution_id, 2 attempt_index, 3 attempt_id, 4 lease_id,
--       5 lease_epoch, 6 lease_ttl_ms, 7 lease_history_grace_ms.
--
-- The caller must supply the full fence triple (lease_id, lease_epoch,
-- attempt_id). Returns ok(new_expires_at) or an err(...) reply:
-- invalid_input, execution_not_found, partial_fence_triple, fence_required,
-- execution_not_active, lease_revoked, lease_expired, stale_lease.
redis.register_function('ff_renew_lease', function(keys, args)
  local K = {
    core_key = keys[1],
    lease_current = keys[2],
    lease_history = keys[3],
    lease_expiry_key = keys[4],
  }
  local lease_ttl_n = require_number(args[6], "lease_ttl_ms")
  if type(lease_ttl_n) == "table" then return lease_ttl_n end
  local grace_n = require_number(args[7], "lease_history_grace_ms")
  if type(grace_n) == "table" then return grace_n end
  local A = {
    execution_id = args[1],
    attempt_index = args[2],
    attempt_id = args[3] or "",
    lease_id = args[4] or "",
    lease_epoch = args[5] or "",
    lease_ttl_ms = lease_ttl_n,
    lease_history_grace_ms = grace_n,
  }
  local now_ms = server_time_ms()

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(raw)

  -- resolve_lease_fence returns (fence, caller_supplied) or (nil, err).
  -- Renewal is only allowed with a caller-supplied fence.
  local fence, must_check_or_err = resolve_lease_fence(core, A)
  if not fence then return must_check_or_err end
  if not must_check_or_err then return err("fence_required") end

  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "",
      core.current_lease_epoch or "",
      core.lifecycle_phase or "",
      core.current_attempt_id or "")
  end
  if core.ownership_state == "lease_revoked" or is_set(core.lease_revoked_at) then
    return err("lease_revoked")
  end
  -- lease_expires_at is stored as "" when cleared; tonumber("") is nil, so
  -- default after the conversion. A missing or unparsable expiry counts as
  -- already expired instead of raising on a nil comparison.
  if (tonumber(core.lease_expires_at) or 0) <= now_ms then
    mark_expired(
      { core_key = K.core_key, lease_history_key = K.lease_history },
      core, now_ms, 1000)
    return err("lease_expired")
  end

  if tostring(core.current_attempt_index) ~= A.attempt_index then
    return err("stale_lease")
  end
  if core.current_attempt_id ~= A.attempt_id then
    return err("stale_lease")
  end
  if core.current_lease_id ~= A.lease_id then
    return err("stale_lease")
  end
  if tostring(core.current_lease_epoch) ~= A.lease_epoch then
    return err("stale_lease")
  end

  local new_expires_at = now_ms + A.lease_ttl_ms
  -- The renewal deadline is placed two thirds of the way through the TTL.
  local new_renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)
  redis.call("HSET", K.core_key,
    "lease_last_renewed_at", tostring(now_ms),
    "lease_renewal_deadline", tostring(new_renewal_deadline),
    "lease_expires_at", tostring(new_expires_at),
    "last_mutation_at", tostring(now_ms))
  redis.call("HSET", K.lease_current,
    "last_renewed_at", tostring(now_ms),
    "renewal_deadline", tostring(new_renewal_deadline),
    "expires_at", tostring(new_expires_at))
  -- The lease record outlives the lease itself by the history grace period.
  redis.call("PEXPIREAT", K.lease_current,
    new_expires_at + A.lease_history_grace_ms)
  redis.call("ZADD", K.lease_expiry_key, new_expires_at, A.execution_id)
  return ok(tostring(new_expires_at))
end)
-- ff_mark_lease_expired_if_due: mark an execution's lease as expired when
-- its expiry time has passed.
--
-- KEYS: 1 core hash, 2 current-lease hash (not touched here),
--       3 lease expiry sorted set, 4 lease history stream.
-- ARGS: 1 execution_id.
--
-- Returns ok("marked_expired") when the lease was marked, otherwise
-- ok_already_satisfied(reason) with reason execution_not_found, not_active,
-- already_expired, already_revoked, unowned or not_yet_expired. The
-- execution is dropped from the lease expiry set when it no longer exists,
-- is not active, or is unowned.
redis.register_function('ff_mark_lease_expired_if_due', function(keys, args)
  local K = {
    core_key = keys[1],
    lease_current = keys[2],
    lease_expiry_key = keys[3],
    lease_history = keys[4],
  }
  local execution_id = args[1]
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    redis.call("ZREM", K.lease_expiry_key, execution_id)
    return ok_already_satisfied("execution_not_found")
  end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "active" then
    redis.call("ZREM", K.lease_expiry_key, execution_id)
    return ok_already_satisfied("not_active")
  end
  if core.ownership_state == "lease_expired_reclaimable" then
    return ok_already_satisfied("already_expired")
  end
  if core.ownership_state == "lease_revoked" then
    return ok_already_satisfied("already_revoked")
  end
  if core.ownership_state == "unowned" then
    redis.call("ZREM", K.lease_expiry_key, execution_id)
    return ok_already_satisfied("unowned")
  end

  -- lease_expires_at is stored as "" when cleared; tonumber("") is nil, so
  -- default after the conversion. A missing or unparsable expiry counts as
  -- due instead of raising on a nil comparison.
  local expires_at = tonumber(core.lease_expires_at) or 0
  if expires_at > now_ms then
    return ok_already_satisfied("not_yet_expired")
  end
  mark_expired(
    { core_key = K.core_key, lease_history_key = K.lease_history },
    core, now_ms, 1000)
  return ok("marked_expired")
end)
-- ff_revoke_lease: revoke the lease currently held on an active execution.
--
-- KEYS: 1 core hash, 2 current-lease key, 3 lease history stream,
--       4 lease expiry sorted set, 5 worker leases set.
-- ARGS: 1 execution_id, 2 expected_lease_id (optional guard),
--       3 revoke_reason ("operator" is used when absent).
--
-- Returns ok("revoked", lease_id, lease_epoch);
-- ok_already_satisfied("already_revoked" | "already_expired") when there is
-- nothing left to revoke; or an err(...) reply: execution_not_found,
-- execution_not_active, no_active_lease, stale_lease.
redis.register_function('ff_revoke_lease', function(keys, args)
  local core_key, lease_current_key, lease_history_key = keys[1], keys[2], keys[3]
  local lease_expiry_key, worker_leases_key = keys[4], keys[5]
  local execution_id, expected_lease_id, revoke_reason = args[1], args[2], args[3]

  local clock = redis.call("TIME")
  local now_ms = tonumber(clock[1]) * 1000 + math.floor(tonumber(clock[2]) / 1000)

  local raw = redis.call("HGETALL", core_key)
  if #raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(raw)

  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "",
      core.current_lease_epoch or "",
      core.lifecycle_phase or "",
      core.current_attempt_id or "")
  end

  local ownership = core.ownership_state
  if ownership == "lease_revoked" then
    return ok_already_satisfied("already_revoked")
  end
  if ownership == "lease_expired_reclaimable" then
    return ok_already_satisfied("already_expired")
  end
  if ownership ~= "leased" then
    return err("no_active_lease")
  end

  -- Optional guard: only revoke the lease the caller believes is current.
  if is_set(expected_lease_id) and core.current_lease_id ~= expected_lease_id then
    return err("stale_lease",
      "expected " .. expected_lease_id .. " but current is " .. (core.current_lease_id or ""))
  end

  local lease_id = core.current_lease_id or ""
  local lease_epoch = core.current_lease_epoch or ""
  local reason = revoke_reason or "operator"
  local now_text = tostring(now_ms)

  redis.call("HSET", core_key,
    "lifecycle_phase", "active",
    "ownership_state", "lease_revoked",
    "eligibility_state", core.eligibility_state or "not_applicable",
    "blocking_reason", "waiting_for_worker",
    "blocking_detail", "lease revoked: " .. reason,
    "terminal_outcome", "none",
    "attempt_state", "attempt_interrupted",
    "public_state", "active",
    "lease_revoked_at", now_text,
    "lease_revoke_reason", reason,
    "last_mutation_at", now_text)

  redis.call("DEL", lease_current_key)
  redis.call("ZREM", lease_expiry_key, execution_id)
  redis.call("SREM", worker_leases_key, execution_id)

  redis.call("XADD", lease_history_key, "MAXLEN", "~", 1000, "*",
    "event", "revoked",
    "lease_id", lease_id,
    "lease_epoch", lease_epoch,
    "attempt_index", core.current_attempt_index or "",
    "attempt_id", core.current_attempt_id or "",
    "worker_id", core.current_worker_id or "",
    "worker_instance_id", core.current_worker_instance_id or "",
    "reason", reason,
    "ts", now_text)

  return ok("revoked", lease_id, lease_epoch)
end)
-- ff_create_execution: create a new execution record and index it for
-- scheduling.
--
-- KEYS: 1 core hash, 2 payload string, 3 policy string, 4 tags hash,
--       5 scheduling sorted set, 6 idempotency key ("" or a key containing
--       "ff:noop:" disables idempotency), 7 deadline sorted set,
--       8 set of all executions.
-- ARGS: 1 execution_id, 2 namespace, 3 lane_id, 4 execution_kind,
--       5 priority (clamped to 0..9000), 6 creator_identity, 7 policy_json,
--       8 input_payload, 9 delay_until, 10 dedup_ttl_ms, 11 tags_json,
--       12 execution_deadline_at, 13 partition_id.
--
-- Returns ok(execution_id, public_state), ok_duplicate(existing id) when
-- the idempotency key or the core hash already exists, or an err(...)
-- reply: invalid_input, invalid_policy_json, invalid_capabilities.
--
-- All input validation happens before the first write, so a rejected
-- request leaves no partial state behind.
redis.register_function('ff_create_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    payload_key = keys[2],
    policy_key = keys[3],
    tags_key = keys[4],
    scheduling_zset = keys[5],
    idem_key = keys[6],
    deadline_zset = keys[7],
    all_executions_set = keys[8],
  }
  local priority_n = require_number(args[5], "priority")
  if type(priority_n) == "table" then return priority_n end
  local A = {
    execution_id = args[1],
    namespace = args[2],
    lane_id = args[3],
    execution_kind = args[4],
    priority = priority_n,
    creator_identity = args[6],
    policy_json = args[7],
    input_payload = args[8],
    delay_until = args[9],
    dedup_ttl_ms = args[10],
    tags_json = args[11],
    execution_deadline_at = args[12],
    partition_id = args[13],
  }
  local now_ms = server_time_ms()

  -- Clamp priority into the supported range.
  if A.priority < 0 then A.priority = 0 end
  if A.priority > 9000 then A.priority = 9000 end

  -- Idempotency: an existing idempotency key or core hash means this
  -- request was already handled.
  if K.idem_key ~= "" and not string.find(K.idem_key, "ff:noop:") then
    local existing = redis.call("GET", K.idem_key)
    if existing then
      return ok_duplicate(existing)
    end
  end
  if redis.call("EXISTS", K.core_key) == 1 then
    return ok_duplicate(A.execution_id)
  end

  -- Validate the policy and derive the sorted, comma-joined required
  -- capabilities string.
  local required_caps_csv = nil
  if is_set(A.policy_json) and A.policy_json ~= "{}" then
    local ok_decode, policy = pcall(cjson.decode, A.policy_json)
    if not ok_decode then
      return err("invalid_policy_json", "malformed")
    end
    if type(policy) == "table" and policy.routing_requirements ~= nil then
      if type(policy.routing_requirements) ~= "table" then
        return err("invalid_policy_json", "routing_requirements:not_object")
      end
      local caps = policy.routing_requirements.required_capabilities
      if caps ~= nil then
        if type(caps) ~= "table" then
          return err("invalid_capabilities", "required:not_array")
        end
        local list = {}
        for _, cap in ipairs(caps) do
          if type(cap) ~= "string" then
            return err("invalid_capabilities", "required:non_string_token")
          end
          if #cap == 0 then
            return err("invalid_capabilities", "required:empty_token")
          end
          -- Commas would be ambiguous in the comma-joined form.
          if string.find(cap, ",", 1, true) then
            return err("invalid_capabilities", "required:comma_in_token")
          end
          for i = 1, #cap do
            local b = cap:byte(i)
            -- Reject bytes up to and including space (0x20) and DEL (0x7F).
            if b <= 0x20 or b == 0x7F then
              return err("invalid_capabilities", "required:control_or_whitespace")
            end
          end
          list[#list + 1] = cap
        end
        if #list > CAPS_MAX_TOKENS then
          return err("invalid_capabilities", "required:too_many_tokens")
        end
        table.sort(list)
        if #list > 0 then
          local csv = table.concat(list, ",")
          if #csv > CAPS_MAX_BYTES then
            return err("invalid_capabilities", "required:too_many_bytes")
          end
          required_caps_csv = csv
        end
      end
    end
  end

  -- Validate both timestamps before anything is written. Previously a
  -- non-numeric delay_until raised a script error, and a non-numeric
  -- execution_deadline_at raised only after the core hash, payload and
  -- scheduling index had already been written.
  local delay_until_n = nil
  if is_set(A.delay_until) then
    delay_until_n = require_number(A.delay_until, "delay_until")
    if type(delay_until_n) == "table" then return delay_until_n end
  end
  local deadline_n = nil
  if is_set(A.execution_deadline_at) then
    deadline_n = require_number(A.execution_deadline_at, "execution_deadline_at")
    if type(deadline_n) == "table" then return deadline_n end
  end

  local lifecycle_phase = "runnable"
  local eligibility_state, blocking_reason, blocking_detail, public_state
  local is_delayed = delay_until_n ~= nil and delay_until_n > now_ms
  if is_delayed then
    eligibility_state = "not_eligible_until_time"
    blocking_reason = "waiting_for_delay"
    blocking_detail = "delayed until " .. A.delay_until
    public_state = "delayed"
  else
    eligibility_state = "eligible_now"
    blocking_reason = "waiting_for_worker"
    blocking_detail = ""
    public_state = "waiting"
  end

  redis.call("HSET", K.core_key,
    "execution_id", A.execution_id,
    "namespace", A.namespace,
    "lane_id", A.lane_id,
    "execution_kind", A.execution_kind,
    "partition_id", A.partition_id,
    "priority", A.priority,
    "creator_identity", A.creator_identity,
    "lifecycle_phase", lifecycle_phase,
    "ownership_state", "unowned",
    "eligibility_state", eligibility_state,
    "blocking_reason", blocking_reason,
    "blocking_detail", blocking_detail,
    "terminal_outcome", "none",
    "attempt_state", "pending_first_attempt",
    "public_state", public_state,
    "total_attempt_count", "0",
    "current_attempt_index", "",
    "current_attempt_id", "",
    "current_lease_id", "",
    "current_lease_epoch", "0",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "current_lane", A.lane_id,
    "retry_count", "0",
    "replay_count", "0",
    "lease_reclaim_count", "0",
    "created_at", now_ms,
    "started_at", "",
    "completed_at", "",
    "last_transition_at", now_ms,
    "last_mutation_at", now_ms,
    "lease_acquired_at", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "lease_expired_at", "",
    "lease_revoked_at", "",
    "lease_revoke_reason", "",
    "delay_until", is_delayed and A.delay_until or "",
    "current_suspension_id", "",
    "current_waitpoint_id", "",
    "pending_retry_reason", "",
    "pending_replay_reason", "",
    "pending_replay_requested_by", "",
    "pending_previous_attempt_index", "",
    "progress_pct", "",
    "progress_message", "",
    "progress_updated_at", "",
    "flow_id", "")

  if is_set(A.input_payload) then
    redis.call("SET", K.payload_key, A.input_payload)
  end
  if is_set(A.policy_json) then
    redis.call("SET", K.policy_key, A.policy_json)
  end
  if required_caps_csv then
    redis.call("HSET", K.core_key, "required_capabilities", required_caps_csv)
  end

  -- Tags are stored only when tags_json decodes to a table; anything else
  -- is skipped without failing the request.
  if is_set(A.tags_json) and A.tags_json ~= "{}" then
    local ok_decode, tags = pcall(cjson.decode, A.tags_json)
    if ok_decode and type(tags) == "table" then
      local flat = {}
      for k, v in pairs(tags) do
        flat[#flat + 1] = k
        flat[#flat + 1] = tostring(v)
      end
      if #flat > 0 then
        redis.call("HSET", K.tags_key, unpack(flat))
      end
    end
  end

  if is_delayed then
    -- Delayed executions are scored by their release time.
    redis.call("ZADD", K.scheduling_zset, delay_until_n, A.execution_id)
  else
    -- Higher priority gives a lower score; now_ms orders equal priorities
    -- by creation time.
    local score = 0 - (A.priority * 1000000000000) + now_ms
    redis.call("ZADD", K.scheduling_zset, score, A.execution_id)
  end
  redis.call("SADD", K.all_executions_set, A.execution_id)
  if deadline_n ~= nil then
    redis.call("ZADD", K.deadline_zset, deadline_n, A.execution_id)
  end

  local dedup_ms = tonumber(A.dedup_ttl_ms) or 0
  if dedup_ms > 0 and K.idem_key ~= "" and not string.find(K.idem_key, "ff:noop:") then
    redis.call("SET", K.idem_key, A.execution_id,
      "PX", dedup_ms)
  end
  return ok(A.execution_id, public_state)
end)
-- ff_claim_execution: claim a runnable execution for a worker. Consumes the
-- worker's claim grant, starts a new attempt and acquires a lease.
--
-- KEYS: 1 core hash, 2 claim grant hash, 3 eligible sorted set,
--       4 lease expiry sorted set, 5 worker leases set, 6 attempt hash,
--       7 attempt usage hash, 8 attempt policy hash, 9 attempts sorted set,
--       10 current-lease hash, 11 lease history stream,
--       12 active index sorted set, 13 attempt timeout sorted set,
--       14 execution deadline sorted set.
--       NOTE(review): the attempt, usage and policy keys actually written
--       are rebuilt from the core key's hash tag, the execution id and the
--       attempt index; KEYS 6-8 are not read here. Presumably they name the
--       same keys — confirm against the caller.
-- ARGS: 1 execution_id, 2 worker_id, 3 worker_instance_id, 4 lane,
--       5 capability_hash, 6 lease_id, 7 lease_ttl_ms, 8 renew_before_ms,
--       9 attempt_id, 10 attempt_policy_json, 11 attempt_timeout_ms,
--       12 execution_deadline_at.
--
-- Returns ok(lease_id, lease_epoch, expires_at, attempt_id, attempt_index,
-- attempt_type) or an err(...) reply: invalid_input, invalid_policy_json,
-- execution_not_found, execution_not_leaseable, lease_conflict,
-- active_attempt_exists, use_claim_resumed_execution, invalid_claim_grant,
-- claim_grant_expired.
--
-- Every input that could make a later step fail is validated BEFORE the
-- claim grant is deleted, so a rejected claim neither burns the grant nor
-- leaves half-written attempt or lease state.
redis.register_function('ff_claim_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    claim_grant = keys[2],
    eligible_zset = keys[3],
    lease_expiry_key = keys[4],
    worker_leases_key = keys[5],
    attempt_hash = keys[6],
    attempt_usage = keys[7],
    attempt_policy = keys[8],
    attempts_zset = keys[9],
    lease_current_key = keys[10],
    lease_history_key = keys[11],
    active_index_key = keys[12],
    attempt_timeout_key = keys[13],
    execution_deadline_key = keys[14],
  }
  local lease_ttl_n = require_number(args[7], "lease_ttl_ms")
  if type(lease_ttl_n) == "table" then return lease_ttl_n end
  local renew_before_n = require_number(args[8], "renew_before_ms")
  if type(renew_before_n) == "table" then return renew_before_n end
  local A = {
    execution_id = args[1],
    worker_id = args[2],
    worker_instance_id = args[3],
    lane = args[4],
    capability_hash = args[5],
    lease_id = args[6],
    lease_ttl_ms = lease_ttl_n,
    renew_before_ms = renew_before_n,
    attempt_id = args[9],
    attempt_policy_json = args[10],
    attempt_timeout_ms = args[11],
    execution_deadline_at = args[12],
  }
  local now_ms = server_time_ms()
  local now_str = tostring(now_ms)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- The execution must be runnable, unowned, eligible and non-terminal.
  if core.lifecycle_phase ~= "runnable" then return err("execution_not_leaseable") end
  if core.ownership_state ~= "unowned" then return err("lease_conflict") end
  if core.eligibility_state ~= "eligible_now" then return err("execution_not_leaseable") end
  if core.terminal_outcome ~= "none" then return err("execution_not_leaseable") end
  if core.attempt_state == "running_attempt" then
    return err("active_attempt_exists")
  end
  if core.attempt_state == "attempt_interrupted" then
    return err("use_claim_resumed_execution")
  end

  -- The claim grant must exist, belong to this worker and be unexpired.
  local grant_raw = redis.call("HGETALL", K.claim_grant)
  if #grant_raw == 0 then return err("invalid_claim_grant") end
  local grant = hgetall_to_table(grant_raw)
  if grant.worker_id ~= A.worker_id then
    return err("invalid_claim_grant")
  end
  if is_set(grant.grant_expires_at) and tonumber(grant.grant_expires_at) < now_ms then
    return err("claim_grant_expired")
  end

  -- Pre-flight validation: everything below used to fail only after the
  -- grant was deleted (and, for some inputs, after attempt records had
  -- been written).
  local tag = string.match(K.core_key, "(%b{})")
  if not tag then
    return err("invalid_input", "core_key has no {hash tag}: " .. tostring(K.core_key))
  end
  local policy_flat = nil
  if is_set(A.attempt_policy_json) then
    -- unpack_policy raises on malformed JSON or a non-object document.
    local decoded_ok, flat = pcall(unpack_policy, A.attempt_policy_json)
    if not decoded_ok then
      return err("invalid_policy_json", "malformed")
    end
    policy_flat = flat
  end
  local attempt_timeout_n = nil
  if is_set(A.attempt_timeout_ms) and A.attempt_timeout_ms ~= "0" then
    attempt_timeout_n = require_number(A.attempt_timeout_ms, "attempt_timeout_ms")
    if type(attempt_timeout_n) == "table" then return attempt_timeout_n end
  end
  local deadline_n = nil
  if is_set(A.execution_deadline_at) and A.execution_deadline_at ~= "0" then
    deadline_n = require_number(A.execution_deadline_at, "execution_deadline_at")
    if type(deadline_n) == "table" then return deadline_n end
  end
  -- Default after the conversion so an empty stored counter cannot raise.
  local next_epoch = (tonumber(core.current_lease_epoch) or 0) + 1
  local next_att_idx = tonumber(core.total_attempt_count) or 0

  -- First mutation: the grant is single-use.
  redis.call("DEL", K.claim_grant)

  local expires_at = now_ms + A.lease_ttl_ms
  local renewal_deadline = now_ms + A.renew_before_ms
  local epoch_str = tostring(next_epoch)
  local att_idx_str = tostring(next_att_idx)
  local expires_str = tostring(expires_at)
  local renewal_str = tostring(renewal_deadline)

  -- The attempt type and its lineage fields follow from the pending state
  -- left on the core hash.
  local attempt_type = "initial"
  local lineage_fields = {}
  if core.attempt_state == "pending_retry_attempt" then
    attempt_type = "retry"
    lineage_fields = {
      "retry_reason", core.pending_retry_reason or "",
      "previous_attempt_index", core.pending_previous_attempt_index or ""
    }
  elseif core.attempt_state == "pending_replay_attempt" then
    attempt_type = "replay"
    lineage_fields = {
      "replay_reason", core.pending_replay_reason or "",
      "replay_requested_by", core.pending_replay_requested_by or "",
      "replayed_from_attempt_index", core.pending_previous_attempt_index or ""
    }
  end

  local att_key = "ff:attempt:" .. tag .. ":" .. A.execution_id .. ":" .. att_idx_str
  local att_usage_key = att_key .. ":usage"
  local att_policy_key = att_key .. ":policy"

  local attempt_fields = {
    "attempt_id", A.attempt_id,
    "execution_id", A.execution_id,
    "attempt_index", att_idx_str,
    "attempt_type", attempt_type,
    "attempt_state", "started",
    "created_at", now_str,
    "started_at", now_str,
    "lease_id", A.lease_id,
    "lease_epoch", epoch_str,
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id
  }
  for _, v in ipairs(lineage_fields) do
    attempt_fields[#attempt_fields + 1] = v
  end
  redis.call("HSET", att_key, unpack(attempt_fields))
  redis.call("ZADD", K.attempts_zset, now_ms, att_idx_str)
  redis.call("HSET", att_usage_key,
    "last_usage_report_seq", "0")
  if policy_flat and #policy_flat > 0 then
    redis.call("HSET", att_policy_key, unpack(policy_flat))
  end

  -- Replace any stale lease record with the new lease.
  redis.call("DEL", K.lease_current_key)
  redis.call("HSET", K.lease_current_key,
    "lease_id", A.lease_id,
    "lease_epoch", epoch_str,
    "execution_id", A.execution_id,
    "attempt_id", A.attempt_id,
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "acquired_at", now_str,
    "expires_at", expires_str,
    "last_renewed_at", now_str,
    "renewal_deadline", renewal_str)

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "active",
    "ownership_state", "leased",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "attempt_state", "running_attempt",
    "public_state", "active",
    "current_attempt_index", att_idx_str,
    "total_attempt_count", tostring(next_att_idx + 1),
    "current_attempt_id", A.attempt_id,
    "current_lease_id", A.lease_id,
    "current_lease_epoch", epoch_str,
    "current_worker_id", A.worker_id,
    "current_worker_instance_id", A.worker_instance_id,
    "current_lane", A.lane,
    "lease_acquired_at", now_str,
    "lease_expires_at", expires_str,
    "lease_last_renewed_at", now_str,
    "lease_renewal_deadline", renewal_str,
    -- started_at is kept once set, so it records the first start only.
    "started_at", is_set(core.started_at) and core.started_at or now_str,
    "pending_retry_reason", "",
    "pending_replay_reason", "",
    "pending_replay_requested_by", "",
    "pending_previous_attempt_index", "",
    "last_transition_at", now_str,
    "last_mutation_at", now_str)

  -- Move the execution from the eligible index to the lease/active indexes.
  redis.call("ZREM", K.eligible_zset, A.execution_id)
  redis.call("ZADD", K.lease_expiry_key, expires_at, A.execution_id)
  redis.call("SADD", K.worker_leases_key, A.execution_id)
  redis.call("ZADD", K.active_index_key, expires_at, A.execution_id)
  if attempt_timeout_n ~= nil then
    redis.call("ZADD", K.attempt_timeout_key,
      now_ms + attempt_timeout_n, A.execution_id)
  end
  if deadline_n ~= nil then
    redis.call("ZADD", K.execution_deadline_key,
      deadline_n, A.execution_id)
  end

  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", 1000, "*",
    "event", "acquired",
    "lease_id", A.lease_id,
    "lease_epoch", epoch_str,
    "attempt_id", A.attempt_id,
    "attempt_index", att_idx_str,
    "worker_id", A.worker_id,
    "ts", now_str)

  return ok(A.lease_id, epoch_str, expires_str,
    A.attempt_id, att_idx_str, attempt_type)
end)
--- ff_complete_execution: end the current attempt as a success and move the
-- execution to the terminal "completed" state.
--
-- KEYS: [1] core hash, [2] attempt hash, [3] lease-expiry zset,
--   [4] worker-leases set, [5] terminal zset, [6] current-lease key,
--   [7] lease-history stream, [8] active index zset, [9] stream-meta hash,
--   [10] result key, [11] attempt-timeout zset, [12] execution-deadline zset.
-- ARGV: [1] execution_id, [2] lease_id, [3] lease_epoch, [4] attempt_id,
--   [5] result_payload, [6] source. ARGV 2..6 default to "".
-- Returns ok("completed"); err("execution_not_found") when the core hash is
-- empty; err("fence_required") when no fence was supplied by a caller other
-- than "operator_override"; or whatever error reply resolve_lease_fence /
-- validate_lease_and_mark_expired hand back.
redis.register_function('ff_complete_execution', function(keys, args)
  -- K and A are passed to the lease helpers below, so their field names are
  -- kept exactly as the rest of the library spells them.
  local K = {}
  local key_names = {
    "core_key", "attempt_hash", "lease_expiry_key", "worker_leases_key",
    "terminal_zset", "lease_current_key", "lease_history_key",
    "active_index_key", "stream_meta", "result_key",
    "attempt_timeout_key", "execution_deadline_key",
  }
  for position, name in ipairs(key_names) do
    K[name] = keys[position]
  end

  local A = { execution_id = args[1] }
  A.lease_id = args[2] or ""
  A.lease_epoch = args[3] or ""
  A.attempt_id = args[4] or ""
  A.result_payload = args[5] or ""
  A.source = args[6] or ""

  -- Server clock in milliseconds (TIME returns seconds and microseconds).
  local clock = redis.call("TIME")
  local now_ms = tonumber(clock[1]) * 1000 + math.floor(tonumber(clock[2]) / 1000)
  local now_str = tostring(now_ms)

  local core_raw = redis.call("HGETALL", K.core_key)
  if #core_raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(core_raw)

  -- resolve_lease_fence yields (fence, flag_or_error): a falsy fence means
  -- the second value is the reply to return; a falsy second value means the
  -- fence fields are adopted here, which only "operator_override" may do.
  local fence, check_or_err = resolve_lease_fence(core, A)
  if not fence then
    return check_or_err
  end
  if not check_or_err then
    if A.source ~= "operator_override" then
      return err("fence_required")
    end
    A.lease_id = fence.lease_id
    A.lease_epoch = fence.lease_epoch
    A.attempt_id = fence.attempt_id
  end

  local lease_err = validate_lease_and_mark_expired(core, A, now_ms, K, 1000)
  if lease_err then
    return lease_err
  end

  -- Close the attempt, and its stream metadata when that hash exists.
  redis.call("HSET", K.attempt_hash,
    "attempt_state", "ended_success",
    "ended_at", now_str)
  local has_stream_meta = redis.call("EXISTS", K.stream_meta) == 1
  if has_stream_meta then
    redis.call("HSET", K.stream_meta,
      "closed_at", now_str,
      "closed_reason", "attempt_success")
  end

  -- An empty payload leaves the result key untouched.
  if A.result_payload ~= "" then
    redis.call("SET", K.result_key, A.result_payload)
  end

  local core_update = {
    "lifecycle_phase", "terminal",
    "ownership_state", "unowned",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "success",
    "attempt_state", "attempt_terminal",
    "public_state", "completed",
    "completed_at", now_str,
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "lease_expired_at", "",
    "lease_revoked_at", "",
    "lease_revoke_reason", "",
    "last_transition_at", now_str,
    "last_mutation_at", now_str,
  }
  redis.call("HSET", K.core_key, unpack(core_update))

  -- Index maintenance: leave the lease/active/timer indexes, enter terminal.
  local exec_id = A.execution_id
  redis.call("ZREM", K.lease_expiry_key, exec_id)
  redis.call("SREM", K.worker_leases_key, exec_id)
  redis.call("ZADD", K.terminal_zset, now_ms, exec_id)
  redis.call("ZREM", K.active_index_key, exec_id)
  redis.call("ZREM", K.attempt_timeout_key, exec_id)
  redis.call("ZREM", K.execution_deadline_key, exec_id)
  redis.call("DEL", K.lease_current_key)

  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", 1000, "*",
    "event", "released",
    "lease_id", A.lease_id,
    "lease_epoch", A.lease_epoch,
    "attempt_index", core.current_attempt_index or "",
    "reason", "completed",
    "ts", now_str)

  -- Executions that belong to a flow announce their outcome on the
  -- ff:dag:completions channel.
  if is_set(core.flow_id) then
    local payload = cjson.encode({
      execution_id = A.execution_id,
      flow_id = core.flow_id,
      outcome = "success",
    })
    redis.call("PUBLISH", "ff:dag:completions", payload)
  end

  return ok("completed")
end)
--- ff_cancel_execution: move a non-terminal execution to the terminal
-- "cancelled" state, closing its attempt, stream and suspension records.
--
-- KEYS: [1] core hash, [2] attempt hash, [3] stream-meta hash,
--   [4] current-lease key, [5] lease-history stream, [6] lease-expiry key,
--   [7] worker-leases key, [8] current-suspension hash, [9] waitpoint hash,
--   [10] waitpoint-condition hash, [11] suspension-timeout key,
--   [12] terminal zset, [13] attempt-timeout key, [14] execution-deadline key,
--   [15] eligible key, [16] delayed key, [17..21] blocked-by
--   dependencies/budget/quota/route/operator keys.
--   The active and suspended lane index keys are derived from the core key's
--   {hash tag} and the execution's lane_id rather than passed in.
-- ARGV: [1] execution_id, [2] reason, [3] source, [4] lease_id,
--   [5] lease_epoch. ARGV 2..5 default to "".
-- Returns ok("cancelled", <lifecycle phase before the cancel>), or one of
-- err("execution_not_found"), err("execution_not_active", ...),
-- err("lease_revoked"), err("stale_lease").
redis.register_function('ff_cancel_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    attempt_hash = keys[2],
    stream_meta = keys[3],
    lease_current_key = keys[4],
    lease_history_key = keys[5],
    lease_expiry_key = keys[6],
    worker_leases_key = keys[7],
    suspension_current = keys[8],
    waitpoint_hash = keys[9],
    wp_condition = keys[10],
    suspension_timeout_key = keys[11],
    terminal_key = keys[12],
    attempt_timeout_key = keys[13],
    execution_deadline_key = keys[14],
    eligible_key = keys[15],
    delayed_key = keys[16],
    blocked_deps_key = keys[17],
    blocked_budget_key = keys[18],
    blocked_quota_key = keys[19],
    blocked_route_key = keys[20],
    blocked_operator_key = keys[21],
    -- Filled in below once the lane is known.
    active_index_key = nil,
    suspended_key = nil,
  }
  local A = {
    execution_id = args[1],
    -- Defaulted like the other optional arguments: a nil reason would
    -- otherwise raise on the string concatenation / HSET below instead of
    -- cancelling the execution.
    reason = args[2] or "",
    source = args[3] or "",
    lease_id = args[4] or "",
    lease_epoch = args[5] or "",
  }

  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local now_str = tostring(now_ms)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  local tag = string.match(K.core_key, "(%b{})")
  local lane = core.lane_id or "default"
  K.active_index_key = "ff:idx:" .. tag .. ":lane:" .. lane .. ":active"
  K.suspended_key = "ff:idx:" .. tag .. ":lane:" .. lane .. ":suspended"

  local phase = core.lifecycle_phase
  if phase == "terminal" then
    return err("execution_not_active",
      core.terminal_outcome or "",
      core.current_lease_epoch or "",
      "terminal",
      core.current_attempt_id or "")
  end
  local cancelled_from = phase

  -- Fencing applies only to active executions and is skipped entirely for
  -- operator overrides. A caller that supplies no lease_id is not fenced.
  if phase == "active" and A.source ~= "operator_override" then
    if core.ownership_state == "lease_revoked" then
      return err("lease_revoked")
    end
    if is_set(A.lease_id) then
      if core.current_lease_id ~= A.lease_id then return err("stale_lease") end
      if core.current_lease_epoch ~= A.lease_epoch then return err("stale_lease") end
    end
  end

  -- Active and suspended executions both close their attempt and stream.
  if phase == "active" or phase == "suspended" then
    if is_set(core.current_attempt_index) then
      redis.call("HSET", K.attempt_hash,
        "attempt_state", "ended_cancelled",
        "ended_at", now_str,
        "failure_reason", "cancelled: " .. A.reason)
    end
    if redis.call("EXISTS", K.stream_meta) == 1 then
      redis.call("HSET", K.stream_meta,
        "closed_at", now_str,
        "closed_reason", "attempt_cancelled")
    end
  end

  if phase == "active" then
    -- Drop the lease record and log its release.
    redis.call("DEL", K.lease_current_key)
    redis.call("XADD", K.lease_history_key, "MAXLEN", "~", "1000", "*",
      "event", "released",
      "lease_id", core.current_lease_id or "",
      "lease_epoch", core.current_lease_epoch or "",
      "attempt_index", core.current_attempt_index or "",
      "reason", "cancelled",
      "ts", now_str)
  elseif phase == "suspended" then
    -- Close whichever suspension / waitpoint records exist; records that
    -- are absent are not created.
    if redis.call("EXISTS", K.suspension_current) == 1 then
      redis.call("HSET", K.suspension_current,
        "closed_at", now_str,
        "close_reason", "cancelled")
    end
    if redis.call("EXISTS", K.waitpoint_hash) == 1 then
      redis.call("HSET", K.waitpoint_hash,
        "state", "closed",
        "closed_at", now_str,
        "close_reason", "cancelled")
    end
    if redis.call("EXISTS", K.wp_condition) == 1 then
      redis.call("HSET", K.wp_condition,
        "closed", "1",
        "closed_at", now_str,
        "closed_reason", "cancelled")
    end
  end

  -- An execution that never had an attempt keeps its prior attempt_state.
  local attempt_state
  if is_set(core.current_attempt_index) then
    attempt_state = "attempt_terminal"
  else
    attempt_state = core.attempt_state or "none"
  end
  -- With no source given, the execution itself is recorded as canceller.
  local cancelled_by = A.execution_id
  if A.source ~= "" then
    cancelled_by = A.source
  end

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "terminal",
    "ownership_state", "unowned",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "cancelled",
    "attempt_state", attempt_state,
    "public_state", "cancelled",
    "cancellation_reason", A.reason,
    "cancelled_by", cancelled_by,
    "completed_at", now_str,
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "current_suspension_id", "",
    "current_waitpoint_id", "",
    "last_transition_at", now_str,
    "last_mutation_at", now_str)

  -- NOTE(review): presumably removes the execution from every index in K
  -- except the terminal one passed as third argument; confirm against the
  -- helper's definition.
  defensive_zrem_all_indexes(K, A.execution_id, K.terminal_key)
  redis.call("ZADD", K.terminal_key, now_ms, A.execution_id)

  -- Flow members announce the outcome, except when the cancel itself came
  -- from a flow cascade.
  if is_set(core.flow_id) and A.source ~= "flow_cascade" then
    local payload = cjson.encode({
      execution_id = A.execution_id,
      flow_id = core.flow_id,
      outcome = "cancelled",
    })
    redis.call("PUBLISH", "ff:dag:completions", payload)
  end

  return ok("cancelled", cancelled_from)
end)
--- ff_delay_execution: release the current lease and park the execution in
-- the delayed index until `delay_until`.
--
-- KEYS: [1] core hash, [2] attempt hash, [3] current-lease key,
--   [4] lease-history stream, [5] lease-expiry zset, [6] worker-leases set,
--   [7] active index zset, [8] delayed zset, [9] attempt-timeout zset.
-- ARGV: [1] execution_id, [2] lease_id, [3] lease_epoch, [4] attempt_id,
--   [5] delay_until (numeric score for the delayed zset), [6] source.
--   ARGV 2..6 default to "".
-- Returns ok(delay_until); err("execution_not_found");
-- err("fence_required"); err("invalid_input", ...) when delay_until is not
-- numeric; or the error reply from the lease helpers.
redis.register_function('ff_delay_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    attempt_hash = keys[2],
    lease_current_key = keys[3],
    lease_history_key = keys[4],
    lease_expiry_key = keys[5],
    worker_leases_key = keys[6],
    active_index_key = keys[7],
    delayed_zset = keys[8],
    attempt_timeout_key = keys[9],
  }
  local A = {
    execution_id = args[1],
    lease_id = args[2] or "",
    lease_epoch = args[3] or "",
    attempt_id = args[4] or "",
    delay_until = args[5] or "",
    source = args[6] or "",
  }

  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local now_str = tostring(now_ms)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- A falsy fence carries the error reply in the second value; a falsy
  -- second value means the fence is adopted, which only an
  -- "operator_override" caller may do.
  local fence, must_check_or_err = resolve_lease_fence(core, A)
  if not fence then return must_check_or_err end
  if not must_check_or_err then
    if A.source ~= "operator_override" then return err("fence_required") end
    A.lease_id = fence.lease_id
    A.lease_epoch = fence.lease_epoch
    A.attempt_id = fence.attempt_id
  end

  local lease_err = validate_lease_and_mark_expired(
    core, A, now_ms, K, 1000)
  if lease_err then return lease_err end

  -- Validate the score BEFORE any write below. Script effects are not
  -- rolled back on error, so a failing ZADD at the end would leave the
  -- execution marked delayed with its lease dropped but absent from the
  -- delayed index.
  local delay_score = tonumber(A.delay_until)
  if not delay_score then
    return err("invalid_input", "delay_until must be a number")
  end

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", "not_eligible_until_time",
    "blocking_reason", "waiting_for_delay",
    "blocking_detail", "delayed until " .. A.delay_until,
    "terminal_outcome", "none",
    "attempt_state", "attempt_interrupted",
    "public_state", "delayed",
    "delay_until", A.delay_until,
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "last_transition_at", now_str,
    "last_mutation_at", now_str)

  -- The attempt is marked suspended rather than ended.
  redis.call("HSET", K.attempt_hash,
    "attempt_state", "suspended",
    "suspended_at", now_str,
    "suspension_id", "worker_delay")

  -- Release the lease and swap the active index for the delayed one.
  redis.call("DEL", K.lease_current_key)
  redis.call("ZREM", K.lease_expiry_key, A.execution_id)
  redis.call("SREM", K.worker_leases_key, A.execution_id)
  redis.call("ZREM", K.active_index_key, A.execution_id)
  redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
  redis.call("ZADD", K.delayed_zset, delay_score, A.execution_id)

  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", "1000", "*",
    "event", "released",
    "lease_id", A.lease_id,
    "lease_epoch", A.lease_epoch,
    "attempt_index", core.current_attempt_index or "",
    "attempt_id", A.attempt_id,
    "reason", "worker_delay",
    "ts", now_str)

  return ok(A.delay_until)
end)
--- ff_move_to_waiting_children: release the current lease and block the
-- execution on its dependencies ("waiting_children").
--
-- KEYS: [1] core hash, [2] attempt hash, [3] current-lease key,
--   [4] lease-history stream, [5] lease-expiry zset, [6] worker-leases set,
--   [7] active index zset, [8] blocked-dependencies zset,
--   [9] attempt-timeout zset.
-- ARGV: [1] execution_id, [2] lease_id, [3] lease_epoch, [4] attempt_id,
--   [5] source. ARGV 2..5 default to "".
-- Returns ok(); err("execution_not_found"); err("fence_required"); or the
-- error reply from the lease helpers.
redis.register_function('ff_move_to_waiting_children', function(keys, args)
  local K = {
    core_key = keys[1],
    attempt_hash = keys[2],
    lease_current_key = keys[3],
    lease_history_key = keys[4],
    lease_expiry_key = keys[5],
    worker_leases_key = keys[6],
    active_index_key = keys[7],
    blocked_deps_zset = keys[8],
    attempt_timeout_key = keys[9],
  }
  local A = {
    execution_id = args[1],
    lease_id = args[2] or "",
    lease_epoch = args[3] or "",
    attempt_id = args[4] or "",
    source = args[5] or "",
  }

  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local now_str = tostring(now_ms)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- A falsy fence carries the error reply in the second value; a falsy
  -- second value means the fence is adopted, which only an
  -- "operator_override" caller may do.
  local fence, must_check_or_err = resolve_lease_fence(core, A)
  if not fence then return must_check_or_err end
  if not must_check_or_err then
    if A.source ~= "operator_override" then return err("fence_required") end
    A.lease_id = fence.lease_id
    A.lease_epoch = fence.lease_epoch
    A.attempt_id = fence.attempt_id
  end

  local lease_err = validate_lease_and_mark_expired(
    core, A, now_ms, K, 1000)
  if lease_err then return lease_err end

  -- Score for the blocked index is the creation time. Computed before any
  -- write and with a 0 fallback: an empty or malformed created_at would
  -- otherwise turn into a nil score and abort the final ZADD after the
  -- state had already been mutated.
  local blocked_score = tonumber(core.created_at or "0") or 0

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", "blocked_by_dependencies",
    "blocking_reason", "waiting_for_children",
    "blocking_detail", "waiting for child executions to complete",
    "terminal_outcome", "none",
    "attempt_state", "attempt_interrupted",
    "public_state", "waiting_children",
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "last_transition_at", now_str,
    "last_mutation_at", now_str)

  -- The attempt is marked suspended rather than ended.
  redis.call("HSET", K.attempt_hash,
    "attempt_state", "suspended",
    "suspended_at", now_str,
    "suspension_id", "waiting_children")

  -- Release the lease and leave the active / timeout indexes.
  redis.call("DEL", K.lease_current_key)
  redis.call("ZREM", K.lease_expiry_key, A.execution_id)
  redis.call("SREM", K.worker_leases_key, A.execution_id)
  redis.call("ZREM", K.active_index_key, A.execution_id)
  redis.call("ZREM", K.attempt_timeout_key, A.execution_id)

  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", "1000", "*",
    "event", "released",
    "lease_id", A.lease_id,
    "lease_epoch", A.lease_epoch,
    "attempt_index", core.current_attempt_index or "",
    "attempt_id", A.attempt_id,
    "reason", "waiting_children",
    "ts", now_str)

  redis.call("ZADD", K.blocked_deps_zset, blocked_score, A.execution_id)
  return ok()
end)
--- ff_fail_execution: end the current attempt as a failure, then either
-- schedule a retry after a backoff or move the execution to terminal
-- "failed".
--
-- KEYS: [1] core hash, [2] attempt hash, [3] lease-expiry zset,
--   [4] worker-leases set, [5] terminal zset, [6] delayed zset,
--   [7] current-lease key, [8] lease-history stream, [9] active index zset,
--   [10] stream-meta hash, [11] attempt-timeout zset,
--   [12] execution-deadline zset.
-- ARGV: [1] execution_id, [2] lease_id, [3] lease_epoch, [4] attempt_id,
--   [5] failure_reason, [6] failure_category, [7] retry_policy_json,
--   [8] source. ARGV 2..8 default to "".
--   retry_policy_json fields read here: max_retries, backoff.type
--   ("exponential" | "fixed"), backoff.initial_delay_ms,
--   backoff.max_delay_ms, backoff.multiplier, backoff.delay_ms.
-- Returns ok("retry_scheduled", delay_until) or ok("terminal_failed");
-- err("execution_not_found"); err("fence_required"); or the error reply
-- from the lease helpers.
redis.register_function('ff_fail_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    attempt_hash = keys[2],
    lease_expiry_key = keys[3],
    worker_leases_key = keys[4],
    terminal_key = keys[5],
    delayed_zset = keys[6],
    lease_current_key = keys[7],
    lease_history_key = keys[8],
    active_index_key = keys[9],
    stream_meta = keys[10],
    attempt_timeout_key = keys[11],
    execution_deadline_key = keys[12],
  }
  local A = {
    execution_id = args[1],
    lease_id = args[2] or "",
    lease_epoch = args[3] or "",
    attempt_id = args[4] or "",
    failure_reason = args[5] or "",
    failure_category = args[6] or "",
    retry_policy_json = args[7] or "",
    source = args[8] or "",
  }

  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local now_str = tostring(now_ms)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- A falsy fence carries the error reply in the second value; a falsy
  -- second value means the fence is adopted, which only an
  -- "operator_override" caller may do.
  local fence, must_check_or_err = resolve_lease_fence(core, A)
  if not fence then return must_check_or_err end
  if not must_check_or_err then
    if A.source ~= "operator_override" then return err("fence_required") end
    A.lease_id = fence.lease_id
    A.lease_epoch = fence.lease_epoch
    A.attempt_id = fence.attempt_id
  end

  local lease_err = validate_lease_and_mark_expired(
    core, A, now_ms, K, 1000)
  if lease_err then return lease_err end

  redis.call("HSET", K.attempt_hash,
    "attempt_state", "ended_failure",
    "ended_at", now_str,
    "failure_reason", A.failure_reason,
    "failure_category", A.failure_category)
  if redis.call("EXISTS", K.stream_meta) == 1 then
    redis.call("HSET", K.stream_meta,
      "closed_at", now_str,
      "closed_reason", "attempt_failure")
  end

  -- Retry decision. Every numeric read is nil-safe: the attempt hash has
  -- already been written, so a Lua error from here on (effects are not
  -- rolled back) would leave the execution half-transitioned.
  local retry_count = tonumber(core.retry_count or "0") or 0
  local backoff_ms = 1000
  local can_retry = false
  if is_set(A.retry_policy_json) then
    local ok_decode, policy = pcall(cjson.decode, A.retry_policy_json)
    if ok_decode and type(policy) == "table" then
      -- Missing, non-numeric or JSON-null max_retries all mean "no retries".
      local max_retries = tonumber(policy.max_retries) or 0
      if retry_count < max_retries then
        can_retry = true
        -- JSON null decodes to cjson.null, which is truthy but not
        -- indexable, so require an actual table.
        local bt = policy.backoff
        if type(bt) ~= "table" then
          bt = {}
        end
        if bt.type == "exponential" then
          local initial = tonumber(bt.initial_delay_ms) or 1000
          local max_d = tonumber(bt.max_delay_ms) or 60000
          local mult = tonumber(bt.multiplier) or 2
          backoff_ms = math.min(initial * (mult ^ retry_count), max_d)
        elseif bt.type == "fixed" then
          backoff_ms = tonumber(bt.delay_ms) or 1000
        end
        -- Keep delay_until a whole number of milliseconds even when the
        -- multiplier or configured delays are fractional.
        backoff_ms = math.floor(backoff_ms)
      end
    end
  end

  -- Steps shared by both outcomes: drop the lease record and leave the
  -- lease / active / attempt-timeout indexes.
  local function release_lease_indexes()
    redis.call("DEL", K.lease_current_key)
    redis.call("ZREM", K.lease_expiry_key, A.execution_id)
    redis.call("SREM", K.worker_leases_key, A.execution_id)
    redis.call("ZREM", K.active_index_key, A.execution_id)
    redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
  end

  local function log_release(reason)
    redis.call("XADD", K.lease_history_key, "MAXLEN", "~", "1000", "*",
      "event", "released",
      "lease_id", A.lease_id,
      "lease_epoch", A.lease_epoch,
      "attempt_index", core.current_attempt_index or "",
      "reason", reason,
      "ts", now_str)
  end

  if can_retry then
    local delay_until = now_ms + backoff_ms
    redis.call("HSET", K.core_key,
      "lifecycle_phase", "runnable",
      "ownership_state", "unowned",
      "eligibility_state", "not_eligible_until_time",
      "blocking_reason", "waiting_for_retry_backoff",
      "blocking_detail", "retry backoff until " .. tostring(delay_until) ..
        " (attempt " .. (core.current_attempt_index or "0") ..
        " failed: " .. A.failure_reason .. ")",
      "terminal_outcome", "none",
      "attempt_state", "pending_retry_attempt",
      "public_state", "delayed",
      "pending_retry_reason", A.failure_reason,
      "pending_previous_attempt_index", core.current_attempt_index or "0",
      "retry_count", tostring(retry_count + 1),
      "current_attempt_id", "",
      "current_lease_id", "",
      "current_worker_id", "",
      "current_worker_instance_id", "",
      "lease_expires_at", "",
      "lease_last_renewed_at", "",
      "lease_renewal_deadline", "",
      "delay_until", tostring(delay_until),
      "failure_reason", A.failure_reason,
      "last_transition_at", now_str,
      "last_mutation_at", now_str)
    release_lease_indexes()
    -- The execution-deadline entry is intentionally left in place here;
    -- only the terminal branch below removes it.
    redis.call("ZADD", K.delayed_zset, delay_until, A.execution_id)
    log_release("failed_retry_scheduled")
    return ok("retry_scheduled", tostring(delay_until))
  end

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "terminal",
    "ownership_state", "unowned",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "failed",
    "attempt_state", "attempt_terminal",
    "public_state", "failed",
    "failure_reason", A.failure_reason,
    "completed_at", now_str,
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "last_transition_at", now_str,
    "last_mutation_at", now_str)
  release_lease_indexes()
  redis.call("ZREM", K.execution_deadline_key, A.execution_id)
  redis.call("ZADD", K.terminal_key, now_ms, A.execution_id)
  log_release("failed_terminal")

  -- Flow members announce the terminal failure.
  if is_set(core.flow_id) then
    local payload = cjson.encode({
      execution_id = A.execution_id,
      flow_id = core.flow_id,
      outcome = "failed",
    })
    redis.call("PUBLISH", "ff:dag:completions", payload)
  end
  return ok("terminal_failed")
end)
--- ff_reclaim_execution: hand an active execution whose lease expired or was
-- revoked to a new worker, starting a fresh "reclaim" attempt; or fail the
-- execution terminally once the reclaim budget is used up.
--
-- KEYS: [1] core hash, [2] claim-grant hash, [3] old attempt hash,
--   [4] old stream-meta hash, [5] new attempt hash, [6] new attempt usage,
--   [7] attempts zset, [8] current-lease hash, [9] lease-history stream,
--   [10] lease-expiry zset, [11] worker-leases set, [12] active index zset,
--   [13] attempt-timeout zset, [14] execution-deadline zset.
--   KEYS[5] and KEYS[6] are mapped but not used in this body: the new
--   attempt keys are rebuilt from the core key's {hash tag}, the execution
--   id and the next attempt index.
-- ARGV: [1] execution_id, [2] worker_id, [3] worker_instance_id, [4] lane,
--   [5] lease_id, [6] lease_ttl_ms (number), [7] attempt_id,
--   [8] attempt_policy_json (mapped but not read in this body).
-- Returns ok(lease_id, lease_epoch, expires_at, attempt_id, attempt_index,
-- "reclaim"), or err("execution_not_found"),
-- err("execution_not_reclaimable"), err("invalid_claim_grant"),
-- err("max_retries_exhausted") (after marking the execution failed), or the
-- reply produced by require_number for a bad lease_ttl_ms.
redis.register_function('ff_reclaim_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    claim_grant = keys[2],
    old_attempt_hash = keys[3],
    old_stream_meta = keys[4],
    new_attempt_hash = keys[5],
    new_attempt_usage = keys[6],
    attempts_zset = keys[7],
    lease_current_key = keys[8],
    lease_history_key = keys[9],
    lease_expiry_key = keys[10],
    worker_leases_key = keys[11],
    active_index_key = keys[12],
    attempt_timeout_key = keys[13],
    execution_deadline_key = keys[14],
  }

  -- require_number yields either the number or a table-shaped reply, which
  -- is returned to the caller unchanged.
  local reclaim_ttl_n = require_number(args[6], "lease_ttl_ms")
  if type(reclaim_ttl_n) == "table" then return reclaim_ttl_n end

  local A = {
    execution_id = args[1],
    worker_id = args[2],
    worker_instance_id = args[3],
    lane = args[4],
    lease_id = args[5],
    lease_ttl_ms = reclaim_ttl_n,
    attempt_id = args[7],
    attempt_policy_json = args[8] or "",
  }

  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local now_str = tostring(now_ms)

  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)

  -- Only active executions with an expired-reclaimable or revoked lease
  -- qualify.
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_reclaimable")
  end
  if core.ownership_state ~= "lease_expired_reclaimable"
      and core.ownership_state ~= "lease_revoked" then
    return err("execution_not_reclaimable")
  end

  -- Hash tag ("{...}") of the core key, used to build sibling key names.
  local tag = string.match(K.core_key, "(%b{})")
  local old_wiid = core.current_worker_instance_id or ""

  -- Reclaim budget: 100 unless the execution's policy document overrides it.
  -- A missing, non-numeric or JSON-null max_reclaim_count falls back to 100
  -- instead of making the comparison below raise on nil.
  local reclaim_count = tonumber(core.lease_reclaim_count or "0") or 0
  local max_reclaim = 100
  local policy_key = string.gsub(K.core_key, ":core$", ":policy")
  local policy_raw = redis.call("GET", policy_key)
  if policy_raw then
    local ok_p, policy = pcall(cjson.decode, policy_raw)
    if ok_p and type(policy) == "table" then
      max_reclaim = tonumber(policy.max_reclaim_count) or 100
    end
  end

  if reclaim_count >= max_reclaim then
    -- Budget exhausted: fail the execution terminally.
    -- NOTE(review): unlike the complete/fail/expire/cancel transitions in
    -- this file, this branch neither publishes to ff:dag:completions nor
    -- updates the old attempt hash; confirm that is intentional.
    redis.call("HSET", K.core_key,
      "lifecycle_phase", "terminal",
      "ownership_state", "unowned",
      "eligibility_state", "not_applicable",
      "blocking_reason", "none",
      "blocking_detail", "",
      "terminal_outcome", "failed",
      "attempt_state", "attempt_terminal",
      "public_state", "failed",
      "failure_reason", "max_reclaims_exceeded",
      "completed_at", now_str,
      "current_lease_id", "",
      "current_worker_id", "",
      "current_worker_instance_id", "",
      "last_transition_at", now_str,
      "last_mutation_at", now_str)
    redis.call("DEL", K.lease_current_key)
    redis.call("ZREM", K.lease_expiry_key, A.execution_id)
    redis.call("SREM", K.worker_leases_key, A.execution_id)
    -- Also clear the previous holder's lease set, which may differ from
    -- KEYS[11].
    if old_wiid ~= "" then
      redis.call("SREM", "ff:idx:" .. tag .. ":worker:" .. old_wiid .. ":leases", A.execution_id)
    end
    redis.call("ZREM", K.active_index_key, A.execution_id)
    redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
    redis.call("ZREM", K.execution_deadline_key, A.execution_id)
    local lane = core.lane_id or core.current_lane or "default"
    local terminal_key = "ff:idx:" .. tag .. ":lane:" .. lane .. ":terminal"
    redis.call("ZADD", terminal_key, now_ms, A.execution_id)
    return err("max_retries_exhausted")
  end

  -- The grant must exist and name this worker; it is consumed on use.
  local grant_raw = redis.call("HGETALL", K.claim_grant)
  if #grant_raw == 0 then return err("invalid_claim_grant") end
  local grant = hgetall_to_table(grant_raw)
  if grant.worker_id ~= A.worker_id then return err("invalid_claim_grant") end
  redis.call("DEL", K.claim_grant)

  -- Close out the interrupted attempt and its stream.
  local old_att_idx = core.current_attempt_index or "0"
  local reclaim_reason = "lease_" .. (core.ownership_state or "expired")
  redis.call("HSET", K.old_attempt_hash,
    "attempt_state", "interrupted_reclaimed",
    "ended_at", now_str,
    "failure_reason", reclaim_reason)
  if redis.call("EXISTS", K.old_stream_meta) == 1 then
    redis.call("HSET", K.old_stream_meta,
      "closed_at", now_str,
      "closed_reason", "reclaimed")
  end

  -- New lease epoch and attempt index; the renewal deadline sits at two
  -- thirds of the TTL.
  local next_epoch = tonumber(core.current_lease_epoch or "0") + 1
  local next_att_idx = tonumber(core.total_attempt_count or "0")
  local expires_at = now_ms + A.lease_ttl_ms
  local renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)
  local att_key = "ff:attempt:" .. tag .. ":" .. A.execution_id .. ":" .. tostring(next_att_idx)
  local att_usage_key = att_key .. ":usage"

  redis.call("HSET", att_key,
    "attempt_id", A.attempt_id,
    "execution_id", A.execution_id,
    "attempt_index", tostring(next_att_idx),
    "attempt_type", "reclaim",
    "attempt_state", "started",
    "created_at", now_str,
    "started_at", now_str,
    "lease_id", A.lease_id,
    "lease_epoch", tostring(next_epoch),
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "reclaim_reason", reclaim_reason,
    "previous_attempt_index", old_att_idx)
  redis.call("ZADD", K.attempts_zset, now_ms, tostring(next_att_idx))
  redis.call("HSET", att_usage_key, "last_usage_report_seq", "0")

  -- Replace the lease record wholesale so no stale field survives.
  redis.call("DEL", K.lease_current_key)
  redis.call("HSET", K.lease_current_key,
    "lease_id", A.lease_id,
    "lease_epoch", tostring(next_epoch),
    "execution_id", A.execution_id,
    "attempt_id", A.attempt_id,
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "acquired_at", now_str,
    "expires_at", tostring(expires_at),
    "last_renewed_at", now_str,
    "renewal_deadline", tostring(renewal_deadline))

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "active",
    "ownership_state", "leased",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "attempt_state", "running_attempt",
    "public_state", "active",
    "current_attempt_index", tostring(next_att_idx),
    "total_attempt_count", tostring(next_att_idx + 1),
    "current_attempt_id", A.attempt_id,
    "current_lease_id", A.lease_id,
    "current_lease_epoch", tostring(next_epoch),
    "current_worker_id", A.worker_id,
    "current_worker_instance_id", A.worker_instance_id,
    "current_lane", A.lane,
    "lease_acquired_at", now_str,
    "lease_expires_at", tostring(expires_at),
    "lease_last_renewed_at", now_str,
    "lease_renewal_deadline", tostring(renewal_deadline),
    "lease_expired_at", "",
    "lease_revoked_at", "",
    "lease_revoke_reason", "",
    "lease_reclaim_count", tostring(reclaim_count + 1),
    "last_transition_at", now_str,
    "last_mutation_at", now_str)

  redis.call("ZADD", K.lease_expiry_key, expires_at, A.execution_id)
  -- Move the execution from the previous worker instance's lease set to
  -- the new one when the instance changed.
  if old_wiid ~= "" and old_wiid ~= A.worker_instance_id then
    redis.call("SREM",
      "ff:idx:" .. tag .. ":worker:" .. old_wiid .. ":leases",
      A.execution_id)
  end
  redis.call("SADD", K.worker_leases_key, A.execution_id)
  redis.call("ZADD", K.active_index_key, expires_at, A.execution_id)

  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", 1000, "*",
    "event", "reclaimed",
    "old_lease_epoch", core.current_lease_epoch or "",
    "new_lease_id", A.lease_id,
    "new_lease_epoch", tostring(next_epoch),
    "new_attempt_id", A.attempt_id,
    "new_attempt_index", tostring(next_att_idx),
    "worker_id", A.worker_id,
    "ts", now_str)

  return ok(A.lease_id, tostring(next_epoch), tostring(expires_at),
    A.attempt_id, tostring(next_att_idx), "reclaim")
end)
--- ff_expire_execution: move an execution to the terminal "expired" state
-- from whichever phase it is in (active, suspended or runnable).
--
-- KEYS: [1] core hash, [2] attempt hash, [3] stream-meta hash,
--   [4] current-lease key, [5] lease-history stream, [6] lease-expiry zset,
--   [7] worker-leases set, [8] active index zset, [9] terminal zset,
--   [10] attempt-timeout zset, [11] execution-deadline zset,
--   [12] suspended zset, [13] suspension-timeout zset,
--   [14] current-suspension hash.
--   Lane index keys for the runnable case are built from the core key's
--   {hash tag} and the execution's lane.
-- ARGV: [1] execution_id, [2] expire_reason (default "attempt_timeout").
-- Returns ok("expired", <phase before expiry>), ok("not_found_cleaned") when
-- the core hash is missing, or ok("already_terminal"); the last two only
-- clear the timer index entries.
redis.register_function('ff_expire_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    attempt_hash = keys[2],
    stream_meta = keys[3],
    lease_current_key = keys[4],
    lease_history_key = keys[5],
    lease_expiry_key = keys[6],
    worker_leases_key = keys[7],
    active_index_key = keys[8],
    terminal_key = keys[9],
    attempt_timeout_key = keys[10],
    execution_deadline_key = keys[11],
    suspended_zset = keys[12],
    suspension_timeout_key = keys[13],
    suspension_current = keys[14],
  }
  local A = {
    execution_id = args[1],
    expire_reason = args[2] or "attempt_timeout",
  }
  local exec_id = A.execution_id
  local reason = A.expire_reason

  local clock = redis.call("TIME")
  local now_ms = tonumber(clock[1]) * 1000 + math.floor(tonumber(clock[2]) / 1000)
  local now_str = tostring(now_ms)

  -- Removes the execution from both timer indexes.
  local function drop_timer_entries()
    redis.call("ZREM", K.attempt_timeout_key, exec_id)
    redis.call("ZREM", K.execution_deadline_key, exec_id)
  end

  local core_raw = redis.call("HGETALL", K.core_key)
  if #core_raw == 0 then
    drop_timer_entries()
    return ok("not_found_cleaned")
  end
  local core = hgetall_to_table(core_raw)
  local phase = core.lifecycle_phase
  if phase == "terminal" then
    drop_timer_entries()
    return ok("already_terminal")
  end

  if phase == "active" then
    if is_set(core.current_attempt_index) then
      redis.call("HSET", K.attempt_hash,
        "attempt_state", "ended_failure",
        "ended_at", now_str,
        "failure_reason", reason)
    end
    if redis.call("EXISTS", K.stream_meta) == 1 then
      redis.call("HSET", K.stream_meta,
        "closed_at", now_str,
        "closed_reason", "expired")
    end
    -- Release the lease, including the holder's own lease set, which is
    -- addressed by worker instance id.
    redis.call("DEL", K.lease_current_key)
    redis.call("ZREM", K.lease_expiry_key, exec_id)
    redis.call("SREM", K.worker_leases_key, exec_id)
    local holder = core.current_worker_instance_id or ""
    if holder ~= "" then
      local hash_tag = string.match(K.core_key, "(%b{})")
      redis.call("SREM", "ff:idx:" .. hash_tag .. ":worker:" .. holder .. ":leases", exec_id)
    end
    redis.call("ZREM", K.active_index_key, exec_id)
    redis.call("XADD", K.lease_history_key, "MAXLEN", "~", "1000", "*",
      "event", "released",
      "lease_id", core.current_lease_id or "",
      "lease_epoch", core.current_lease_epoch or "",
      "attempt_index", core.current_attempt_index or "",
      "reason", "expired:" .. reason,
      "ts", now_str)
  elseif phase == "suspended" then
    if is_set(core.current_attempt_index) then
      redis.call("HSET", K.attempt_hash,
        "attempt_state", "ended_failure",
        "ended_at", now_str,
        "failure_reason", reason)
    end
    if redis.call("EXISTS", K.suspension_current) == 1 then
      redis.call("HSET", K.suspension_current,
        "closed_at", now_str,
        "close_reason", "expired:" .. reason)
    end
    if redis.call("EXISTS", K.stream_meta) == 1 then
      redis.call("HSET", K.stream_meta,
        "closed_at", now_str,
        "closed_reason", "expired")
    end
    redis.call("ZREM", K.suspended_zset, exec_id)
    redis.call("ZREM", K.suspension_timeout_key, exec_id)
  elseif phase == "runnable" then
    -- A runnable execution sits in one lane index chosen by its eligibility
    -- state. An unrecognised state is swept from all of them.
    local hash_tag = string.match(K.core_key, "(%b{})")
    local lane = core.lane_id or core.current_lane or "default"
    local lane_prefix = "ff:idx:" .. hash_tag .. ":lane:" .. lane
    local suffix_by_state = {
      eligible_now = ":eligible",
      not_eligible_until_time = ":delayed",
      blocked_by_dependencies = ":blocked:dependencies",
      blocked_by_budget = ":blocked:budget",
      blocked_by_quota = ":blocked:quota",
      blocked_by_route = ":blocked:route",
      blocked_by_operator = ":blocked:operator",
    }
    local suffix = suffix_by_state[core.eligibility_state or ""]
    if suffix then
      redis.call("ZREM", lane_prefix .. suffix, exec_id)
    else
      local every_suffix = {
        ":eligible",
        ":delayed",
        ":blocked:dependencies",
        ":blocked:budget",
        ":blocked:quota",
        ":blocked:route",
        ":blocked:operator",
      }
      for _, each in ipairs(every_suffix) do
        redis.call("ZREM", lane_prefix .. each, exec_id)
      end
    end
  end

  -- An execution that never had an attempt keeps its prior attempt_state.
  local att_state
  if is_set(core.current_attempt_index) then
    att_state = "attempt_terminal"
  else
    att_state = core.attempt_state or "none"
  end

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "terminal",
    "ownership_state", "unowned",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "expired",
    "attempt_state", att_state,
    "public_state", "expired",
    "failure_reason", reason,
    "completed_at", now_str,
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "current_suspension_id", "",
    "current_waitpoint_id", "",
    "last_transition_at", now_str,
    "last_mutation_at", now_str)

  drop_timer_entries()
  redis.call("ZADD", K.terminal_key, now_ms, exec_id)

  -- Flow members announce the expiry.
  if is_set(core.flow_id) then
    local payload = cjson.encode({
      execution_id = A.execution_id,
      flow_id = core.flow_id,
      outcome = "expired",
    })
    redis.call("PUBLISH", "ff:dag:completions", payload)
  end

  -- `phase` is the value read before the update, i.e. where the execution
  -- was expired from.
  return ok("expired", phase)
end)
--- ff_set_execution_tags: write tag key/value pairs onto an execution.
-- KEYS[1] = execution core hash, KEYS[2] = execution tags hash.
-- ARGV    = flat, non-empty, even-length list: key1, value1, key2, value2, ...
-- A tag key must start with a lowercase letter, continue with lowercase
-- letters, digits or underscores, then contain a dot followed by at least one
-- non-dot character (the pattern is anchored at the start only).
-- Returns ok(<pair count>) or err(invalid_input | execution_not_found |
-- invalid_tag_key).
redis.register_function('ff_set_execution_tags', function(keys, args)
  local core_key, tags_key = keys[1], keys[2]
  local argc = #args
  if argc == 0 or (argc % 2) ~= 0 then
    return err("invalid_input", "tags must be non-empty even-length key/value pairs")
  end
  if redis.call("EXISTS", core_key) == 0 then
    return err("execution_not_found")
  end
  -- Check every key before writing so a malformed key never causes a
  -- partially applied tag update.
  local idx = 1
  while idx <= argc do
    local tag_key = args[idx]
    local well_formed = type(tag_key) == "string"
      and string.find(tag_key, "^[a-z][a-z0-9_]*%.[^.]") ~= nil
    if not well_formed then
      return err("invalid_tag_key", tostring(tag_key))
    end
    idx = idx + 2
  end
  redis.call("HSET", tags_key, unpack(args))
  redis.call("HSET", core_key, "last_mutation_at", tostring(server_time_ms()))
  return ok(tostring(argc / 2))
end)
--- Turn a comma-separated capability list into a set-like table.
-- @param csv  string or nil; nil and "" both yield an empty set
-- @param kind label prefixed to the error detail (callers pass "required" or "worker")
-- @return set table (capability -> true) and nil on success;
--         nil and an err("invalid_capabilities", ...) value when the input is
--         longer than CAPS_MAX_BYTES or holds more than CAPS_MAX_TOKENS tokens.
-- Empty segments (",,") are skipped; repeated tokens each count toward the limit.
local function parse_capability_csv(csv, kind)
  if csv == nil or csv == "" then
    return {}, nil
  end
  if #csv > CAPS_MAX_BYTES then
    return nil, err("invalid_capabilities", kind .. ":too_many_bytes")
  end
  local capabilities = {}
  local seen = 0
  -- "[^,]+" never yields an empty match, so every token is non-empty.
  for name in string.gmatch(csv, "[^,]+") do
    if seen == CAPS_MAX_TOKENS then
      return nil, err("invalid_capabilities", kind .. ":too_many_tokens")
    end
    seen = seen + 1
    capabilities[name] = true
  end
  return capabilities, nil
end
--- List the required capabilities that a worker does not offer.
-- @param required    set table (capability -> truthy)
-- @param worker_caps set table (capability -> truthy)
-- @return the absent capability names, sorted and comma-joined; "" when
--         nothing is missing.
local function missing_capabilities(required, worker_caps)
  local absent = {}
  local cap = next(required)
  while cap ~= nil do
    if not worker_caps[cap] then
      table.insert(absent, cap)
    end
    cap = next(required, cap)
  end
  -- Sort so the result does not depend on table traversal order.
  table.sort(absent)
  return table.concat(absent, ",")
end
--- ff_issue_claim_grant: record a short-lived claim grant for a worker.
-- KEYS[1] = execution core hash, KEYS[2] = claim grant hash,
-- KEYS[3] = eligible zset the execution must be a member of.
-- ARGV: 1 execution_id, 2 worker_id, 3 worker_instance_id, 4 lane_id,
--       5 capability_hash, 6 grant_ttl_ms (number), 7 route_snapshot_json,
--       8 admission_summary, 9 worker_capabilities_csv.
-- The execution must be runnable and eligible_now, must not already have a
-- grant, and the worker must offer every capability listed in the core
-- field required_capabilities. A capability mismatch stamps
-- last_capability_mismatch_at on the core hash before returning the error.
-- Returns ok(execution_id) on success.
redis.register_function('ff_issue_claim_grant', function(keys, args)
  local core_key, grant_key, eligible_key = keys[1], keys[2], keys[3]
  local execution_id = args[1]
  local worker_id = args[2]
  local worker_instance_id = args[3]
  local lane_id = args[4]
  local capability_hash = args[5] or ""
  local route_snapshot_json = args[7] or ""
  local admission_summary = args[8] or ""
  local worker_caps_csv = args[9] or ""

  -- require_number hands back an error table when the argument is unusable.
  local ttl_ms = require_number(args[6], "grant_ttl_ms")
  if type(ttl_ms) == "table" then
    return ttl_ms
  end

  local clock = redis.call("TIME")
  local now_ms = tonumber(clock[1]) * 1000 + math.floor(tonumber(clock[2]) / 1000)

  local core_raw = redis.call("HGETALL", core_key)
  if #core_raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(core_raw)
  if core.lifecycle_phase ~= "runnable" or core.eligibility_state ~= "eligible_now" then
    return err("execution_not_eligible")
  end
  if redis.call("EXISTS", grant_key) == 1 then
    return err("grant_already_exists")
  end
  if not redis.call("ZSCORE", eligible_key, execution_id) then
    return err("execution_not_in_eligible_set")
  end

  local required, required_err = parse_capability_csv(
    core.required_capabilities or "", "required")
  if required_err then
    return required_err
  end
  local offered, offered_err = parse_capability_csv(worker_caps_csv, "worker")
  if offered_err then
    return offered_err
  end
  if next(required) ~= nil then
    local lacking = missing_capabilities(required, offered)
    if lacking ~= "" then
      redis.call("HSET", core_key,
        "last_capability_mismatch_at", tostring(now_ms))
      return err("capability_mismatch", lacking)
    end
  end

  -- The grant hash expires at the same instant recorded in grant_expires_at.
  local expires_at = now_ms + ttl_ms
  redis.call("HSET", grant_key,
    "worker_id", worker_id,
    "worker_instance_id", worker_instance_id,
    "lane_id", lane_id,
    "capability_hash", capability_hash,
    "route_snapshot_json", route_snapshot_json,
    "admission_summary", admission_summary,
    "created_at", tostring(now_ms),
    "grant_expires_at", tostring(expires_at))
  redis.call("PEXPIREAT", grant_key, expires_at)
  return ok(execution_id)
end)
--- ff_change_priority: change the priority of a runnable, eligible execution.
-- KEYS[1] = execution core hash, KEYS[2] = eligible zset.
-- ARGV: 1 execution_id, 2 new_priority (number; clamped to 0..9000).
-- The eligible-zset score is rewritten as created_at - priority * 1e12.
-- Returns ok(old_priority, new_priority) or
-- err(execution_not_found | execution_not_eligible).
redis.register_function('ff_change_priority', function(keys, args)
  local core_key, eligible_key = keys[1], keys[2]

  local priority = require_number(args[2], "new_priority")
  if type(priority) == "table" then
    return priority
  end
  local execution_id = args[1]

  local clock = redis.call("TIME")
  local now_ms = tonumber(clock[1]) * 1000 + math.floor(tonumber(clock[2]) / 1000)

  local core_raw = redis.call("HGETALL", core_key)
  if #core_raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(core_raw)
  if core.lifecycle_phase ~= "runnable" or core.eligibility_state ~= "eligible_now" then
    return err("execution_not_eligible")
  end

  local previous = tonumber(core.priority or "0")

  -- Clamp into the supported range.
  if priority < 0 then
    priority = 0
  elseif priority > 9000 then
    priority = 9000
  end

  redis.call("HSET", core_key,
    "priority", tostring(priority),
    "last_mutation_at", tostring(now_ms))

  local created_at = tonumber(core.created_at or "0")
  local rescored = created_at - priority * 1000000000000
  redis.call("ZADD", eligible_key, rescored, execution_id)
  return ok(tostring(previous), tostring(priority))
end)
--- ff_update_progress: record progress for an actively leased execution.
-- KEYS[1] = execution core hash.
-- ARGV: 1 execution_id, 2 lease_id, 3 lease_epoch,
--       4 progress_pct (optional), 5 progress_message (optional).
-- The caller's lease_id and lease_epoch must match the core hash and the
-- lease must not be revoked or expired. progress_pct / progress_message are
-- only written when is_set() accepts them.
-- Returns ok() or err(execution_not_found | execution_not_active |
-- lease_revoked | lease_expired | stale_lease).
redis.register_function('ff_update_progress', function(keys, args)
  local K = {
    core_key = keys[1],
  }
  local A = {
    execution_id = args[1],
    lease_id = args[2],
    lease_epoch = args[3],
    progress_pct = args[4] or "",
    progress_message = args[5] or "",
  }
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "",
      core.current_lease_epoch or "",
      core.lifecycle_phase or "",
      core.current_attempt_id or "")
  end
  if core.ownership_state == "lease_revoked" then
    return err("lease_revoked")
  end
  -- lease_expires_at is cleared to "" elsewhere in this library; "" is truthy
  -- so `or "0"` would not catch it and tonumber("") is nil. Treat a missing or
  -- unparseable expiry as already expired instead of raising on nil <= number.
  local lease_expires_at = tonumber(core.lease_expires_at or "") or 0
  if lease_expires_at <= now_ms then
    return err("lease_expired")
  end
  if core.current_lease_id ~= A.lease_id
    or core.current_lease_epoch ~= A.lease_epoch then
    return err("stale_lease")
  end
  local stamp = tostring(now_ms)
  local fields = { "last_mutation_at", stamp, "progress_updated_at", stamp }
  if is_set(A.progress_pct) then
    fields[#fields + 1] = "progress_pct"
    fields[#fields + 1] = A.progress_pct
  end
  if is_set(A.progress_message) then
    fields[#fields + 1] = "progress_message"
    fields[#fields + 1] = A.progress_message
  end
  redis.call("HSET", K.core_key, unpack(fields))
  return ok()
end)
--- ff_promote_delayed: move a delayed execution to the eligible set once due.
-- KEYS[1] = execution core hash, KEYS[2] = delayed zset, KEYS[3] = eligible zset.
-- ARGV: 1 execution_id, 2 now_ms (number supplied by the caller).
-- Stale delayed-zset entries (missing execution, not runnable, or not in
-- not_eligible_until_time) are removed and reported via ok("..._cleaned").
-- Returns ok("not_yet_due") while delay_until is still in the future,
-- otherwise ok("promoted").
redis.register_function('ff_promote_delayed', function(keys, args)
  local K = {
    core_key = keys[1],
    delayed_zset = keys[2],
    eligible_zset = keys[3],
  }
  local now_ms_n = require_number(args[2], "now_ms")
  if type(now_ms_n) == "table" then return now_ms_n end
  local A = {
    execution_id = args[1],
    now_ms = now_ms_n,
  }
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    redis.call("ZREM", K.delayed_zset, A.execution_id)
    return ok("not_found_cleaned")
  end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "runnable" then
    redis.call("ZREM", K.delayed_zset, A.execution_id)
    return ok("not_runnable_cleaned")
  end
  if core.eligibility_state ~= "not_eligible_until_time" then
    redis.call("ZREM", K.delayed_zset, A.execution_id)
    return ok("not_delayed_cleaned")
  end
  -- This function itself resets delay_until to "" on promotion, and "" is
  -- truthy, so `or "0"` does not protect tonumber from it. A missing or
  -- unparseable value is treated as 0 (already due) rather than raising on
  -- nil > number.
  local delay_until = tonumber(core.delay_until or "") or 0
  if delay_until > A.now_ms then
    return ok("not_yet_due")
  end
  -- Compute the score before the first write so a malformed priority or
  -- created_at cannot fail the script after the core hash has been updated.
  local priority = tonumber(core.priority or "") or 0
  local created_at = tonumber(core.created_at or "") or 0
  local score = 0 - (priority * 1000000000000) + created_at
  redis.call("HSET", K.core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", "eligible_now",
    "blocking_reason", "waiting_for_worker",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "public_state", "waiting",
    "delay_until", "",
    "last_transition_at", tostring(A.now_ms),
    "last_mutation_at", tostring(A.now_ms))
  redis.call("ZREM", K.delayed_zset, A.execution_id)
  redis.call("ZADD", K.eligible_zset, score, A.execution_id)
  return ok("promoted")
end)
--- ff_issue_reclaim_grant: record a claim grant for re-taking an execution
-- whose lease has expired or been revoked.
-- KEYS[1] = execution core hash, KEYS[2] = claim grant hash,
-- KEYS[3] = lease expiry key (accepted but not read here).
-- ARGV: 1 execution_id, 2 worker_id, 3 worker_instance_id, 4 lane_id,
--       5 capability_hash, 6 grant_ttl_ms (number), 7 route_snapshot_json,
--       8 admission_summary, 9 worker_capabilities_csv.
-- The execution must be active with ownership_state lease_expired_reclaimable
-- or lease_revoked, must not already have a grant, and the worker must offer
-- every capability in the core field required_capabilities.
-- Returns ok(execution_id) on success.
redis.register_function('ff_issue_reclaim_grant', function(keys, args)
  local core_key, grant_key = keys[1], keys[2]
  local execution_id = args[1]
  local worker_id = args[2]
  local worker_instance_id = args[3]
  local lane_id = args[4]
  local capability_hash = args[5] or ""
  local route_snapshot_json = args[7] or ""
  local admission_summary = args[8] or ""
  local worker_caps_csv = args[9] or ""

  -- require_number hands back an error table when the argument is unusable.
  local ttl_ms = require_number(args[6], "grant_ttl_ms")
  if type(ttl_ms) == "table" then
    return ttl_ms
  end

  local clock = redis.call("TIME")
  local now_ms = tonumber(clock[1]) * 1000 + math.floor(tonumber(clock[2]) / 1000)

  local core_raw = redis.call("HGETALL", core_key)
  if #core_raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(core_raw)
  local ownership = core.ownership_state
  local reclaimable = core.lifecycle_phase == "active"
    and (ownership == "lease_expired_reclaimable" or ownership == "lease_revoked")
  if not reclaimable then
    return err("execution_not_reclaimable")
  end
  if redis.call("EXISTS", grant_key) == 1 then
    return err("grant_already_exists")
  end

  local required, required_err = parse_capability_csv(
    core.required_capabilities or "", "required")
  if required_err then
    return required_err
  end
  local offered, offered_err = parse_capability_csv(worker_caps_csv, "worker")
  if offered_err then
    return offered_err
  end
  if next(required) ~= nil then
    local lacking = missing_capabilities(required, offered)
    if lacking ~= "" then
      redis.call("HSET", core_key,
        "last_capability_mismatch_at", tostring(now_ms))
      return err("capability_mismatch", lacking)
    end
  end

  -- The grant hash expires at the same instant recorded in grant_expires_at.
  local expires_at = now_ms + ttl_ms
  redis.call("HSET", grant_key,
    "worker_id", worker_id,
    "worker_instance_id", worker_instance_id,
    "lane_id", lane_id,
    "capability_hash", capability_hash,
    "route_snapshot_json", route_snapshot_json,
    "admission_summary", admission_summary,
    "created_at", tostring(now_ms),
    "grant_expires_at", tostring(expires_at))
  redis.call("PEXPIREAT", grant_key, expires_at)
  return ok(execution_id)
end)
--- ff_suspend_execution: release the caller's lease and park the execution
-- on a waitpoint.
-- KEYS (17): core hash, attempt record, current lease, lease history stream,
--   lease expiry zset, worker leases set, current suspension hash, waitpoint
--   hash, waitpoint signals stream, suspension timeout zset, pending waitpoint
--   expiry zset, active index zset, suspended zset, waitpoint history set,
--   waitpoint condition hash, attempt timeout zset, HMAC secrets hash.
-- ARGV: 1 execution_id, 2 attempt_index, 3 attempt_id, 4 lease_id,
--   5 lease_epoch, 6 suspension_id, 7 waitpoint_id, 8 waitpoint_key,
--   9 reason_code, 10 requested_by, 11 timeout_at (ms, optional),
--   12 resume_condition_json, 13 resume_policy_json,
--   14 continuation_metadata_ptr, 15 use_pending_waitpoint ("1" to adopt an
--   existing pending waitpoint), 16 timeout_behavior (default "fail"),
--   17 lease_history_maxlen (default 1000).
-- Returns ok(suspension_id, waitpoint_id, waitpoint_key, waitpoint_token), or
-- ok_already_satisfied(...) when signals buffered on an adopted pending
-- waitpoint already satisfy the resume condition.
-- timeout_at and lease_history_maxlen are validated before the first
-- suspension write: Redis does not roll back writes when a script raises, so
-- a late failure would otherwise leave the execution half-suspended.
redis.register_function('ff_suspend_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    attempt_record = keys[2],
    lease_current_key = keys[3],
    lease_history_key = keys[4],
    lease_expiry_key = keys[5],
    worker_leases_key = keys[6],
    suspension_current = keys[7],
    waitpoint_hash = keys[8],
    waitpoint_signals = keys[9],
    suspension_timeout_key = keys[10],
    pending_wp_expiry_key = keys[11],
    active_index_key = keys[12],
    suspended_zset = keys[13],
    waitpoint_history = keys[14],
    wp_condition = keys[15],
    attempt_timeout_key = keys[16],
    hmac_secrets = keys[17],
  }
  local A = {
    execution_id = args[1],
    attempt_index = args[2],
    attempt_id = args[3] or "",
    lease_id = args[4] or "",
    lease_epoch = args[5] or "",
    suspension_id = args[6],
    waitpoint_id = args[7],
    waitpoint_key = args[8],
    reason_code = args[9],
    requested_by = args[10],
    timeout_at = args[11] or "",
    resume_condition_json = args[12],
    resume_policy_json = args[13],
    continuation_metadata_ptr = args[14] or "",
    use_pending_waitpoint = args[15] or "",
    timeout_behavior = args[16] or "fail",
    -- A non-numeric value would otherwise reach XADD as nil; use the default.
    lease_history_maxlen = tonumber(args[17] or "1000") or 1000,
  }
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)
  local fence, must_check_or_err = resolve_lease_fence(core, A)
  if not fence then return must_check_or_err end
  if not must_check_or_err then return err("fence_required") end
  local lease_err = validate_lease_and_mark_expired(
    core, A, now_ms, K, A.lease_history_maxlen)
  if lease_err then return lease_err end
  if tostring(core.current_attempt_index) ~= A.attempt_index then
    return err("invalid_lease_for_suspend")
  end

  -- Reject an unparseable timeout before any suspension state is written.
  -- Previously `is_set(x) and tonumber(x) or 9999999999999` silently fell
  -- through to the sentinel and the later ZADD on the timeout index raised
  -- with a nil score after the core hash and lease had been rewritten.
  local timeout_at_n = nil
  if is_set(A.timeout_at) then
    timeout_at_n = tonumber(A.timeout_at)
    if not timeout_at_n then
      return err("invalid_input", "timeout_at must be numeric")
    end
  end

  -- A previous suspension record may remain only if it was closed; remember
  -- its waitpoint in the history set and discard the record.
  if redis.call("EXISTS", K.suspension_current) == 1 then
    local closed = redis.call("HGET", K.suspension_current, "closed_at")
    if not is_set(closed) then
      return err("already_suspended")
    end
    local old_wp = redis.call("HGET", K.suspension_current, "waitpoint_id")
    if is_set(old_wp) then
      redis.call("SADD", K.waitpoint_history, old_wp)
    end
    redis.call("DEL", K.suspension_current)
  end

  local waitpoint_id = A.waitpoint_id
  local waitpoint_key = A.waitpoint_key
  local waitpoint_token = ""
  if A.use_pending_waitpoint == "1" then
    -- Adopt a previously created pending waitpoint: its stored id, key and
    -- token take precedence over the arguments.
    local wp_raw = redis.call("HGETALL", K.waitpoint_hash)
    local wp_err = validate_pending_waitpoint(wp_raw, A.execution_id, A.attempt_index, now_ms)
    if wp_err then return wp_err end
    local wp = hgetall_to_table(wp_raw)
    waitpoint_id = wp.waitpoint_id
    waitpoint_key = wp.waitpoint_key
    if not is_set(wp.waitpoint_token) then
      return err("waitpoint_not_token_bound")
    end
    waitpoint_token = wp.waitpoint_token
    redis.call("HSET", K.waitpoint_hash,
      "suspension_id", A.suspension_id,
      "state", "active",
      "activated_at", tostring(now_ms),
      "expires_at", is_set(A.timeout_at) and A.timeout_at or "")
    redis.call("ZREM", K.pending_wp_expiry_key, waitpoint_id)
    -- Replay signals that arrived while the waitpoint was still pending.
    local buffered = redis.call("XRANGE", K.waitpoint_signals, "-", "+")
    if #buffered > 0 then
      local wp_cond = initialize_condition(A.resume_condition_json)
      for _, entry in ipairs(buffered) do
        local fields = entry[2]
        local sig_name = extract_field(fields, "signal_name")
        local sig_id = extract_field(fields, "signal_id")
        evaluate_signal_against_condition(wp_cond, sig_name, sig_id)
      end
      if is_condition_satisfied(wp_cond) then
        -- Already satisfied: close the waitpoint and return without
        -- suspending; the lease and core state are left untouched.
        redis.call("HSET", K.waitpoint_hash,
          "state", "closed", "satisfied_at", tostring(now_ms),
          "closed_at", tostring(now_ms), "close_reason", "resumed")
        write_condition_hash(K.wp_condition, wp_cond, now_ms)
        return ok_already_satisfied(A.suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
      end
      write_condition_hash(K.wp_condition, wp_cond, now_ms)
    else
      local wp_cond = initialize_condition(A.resume_condition_json)
      write_condition_hash(K.wp_condition, wp_cond, now_ms)
    end
  else
    -- Fresh waitpoint: mint a token and create the hash already active.
    local token, token_err = mint_waitpoint_token(
      K.hmac_secrets, waitpoint_id, waitpoint_key, now_ms)
    if not token then return err(token_err) end
    waitpoint_token = token
    redis.call("HSET", K.waitpoint_hash,
      "waitpoint_id", waitpoint_id,
      "execution_id", A.execution_id,
      "attempt_index", A.attempt_index,
      "suspension_id", A.suspension_id,
      "waitpoint_key", waitpoint_key,
      "waitpoint_token", waitpoint_token,
      "state", "active",
      "created_at", tostring(now_ms),
      "activated_at", tostring(now_ms),
      "expires_at", is_set(A.timeout_at) and A.timeout_at or "",
      "signal_count", "0",
      "matched_signal_count", "0",
      "last_signal_at", "")
    local wp_cond = initialize_condition(A.resume_condition_json)
    write_condition_hash(K.wp_condition, wp_cond, now_ms)
  end
  redis.call("SADD", K.waitpoint_history, waitpoint_id)

  -- Core state: suspended, unowned, lease fields cleared.
  redis.call("HSET", K.core_key,
    "lifecycle_phase", "suspended",
    "ownership_state", "unowned",
    "eligibility_state", "not_applicable",
    "blocking_reason", map_reason_to_blocking(A.reason_code),
    "blocking_detail", "suspended: waitpoint " .. waitpoint_id .. " awaiting " .. A.reason_code,
    "terminal_outcome", "none",
    "attempt_state", "attempt_interrupted",
    "public_state", "suspended",
    "current_lease_id", "",
    "current_worker_id", "",
    "current_worker_instance_id", "",
    "lease_expires_at", "",
    "lease_last_renewed_at", "",
    "lease_renewal_deadline", "",
    "current_suspension_id", A.suspension_id,
    "current_waitpoint_id", waitpoint_id,
    "last_transition_at", tostring(now_ms),
    "last_mutation_at", tostring(now_ms))
  redis.call("HSET", K.attempt_record,
    "attempt_state", "suspended",
    "suspended_at", tostring(now_ms),
    "suspension_id", A.suspension_id)

  -- Drop the lease and every index entry that tracks an active attempt.
  redis.call("DEL", K.lease_current_key)
  redis.call("ZREM", K.lease_expiry_key, A.execution_id)
  redis.call("SREM", K.worker_leases_key, A.execution_id)
  redis.call("ZREM", K.active_index_key, A.execution_id)
  redis.call("ZREM", K.attempt_timeout_key, A.execution_id)
  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", A.lease_history_maxlen, "*",
    "event", "released",
    "lease_id", A.lease_id,
    "lease_epoch", A.lease_epoch,
    "attempt_index", A.attempt_index,
    "attempt_id", core.current_attempt_id or "",
    "reason", "suspend",
    "ts", tostring(now_ms))

  redis.call("HSET", K.suspension_current,
    "suspension_id", A.suspension_id,
    "execution_id", A.execution_id,
    "attempt_index", A.attempt_index,
    "waitpoint_id", waitpoint_id,
    "waitpoint_key", waitpoint_key,
    "reason_code", A.reason_code,
    "requested_by", A.requested_by,
    "created_at", tostring(now_ms),
    "timeout_at", A.timeout_at,
    "timeout_behavior", A.timeout_behavior,
    "resume_condition_json", A.resume_condition_json,
    "resume_policy_json", A.resume_policy_json,
    "continuation_metadata_pointer", A.continuation_metadata_ptr,
    "buffered_signal_summary_json", initial_signal_summary_json(),
    "last_signal_at", "",
    "satisfied_at", "",
    "closed_at", "",
    "close_reason", "")

  -- Suspensions without a timeout are scored with a far-future sentinel and
  -- are not added to the timeout index.
  redis.call("ZADD", K.suspended_zset,
    timeout_at_n or 9999999999999,
    A.execution_id)
  if timeout_at_n then
    redis.call("ZADD", K.suspension_timeout_key, timeout_at_n, A.execution_id)
  end
  return ok(A.suspension_id, waitpoint_id, waitpoint_key, waitpoint_token)
end)
--- ff_resume_execution: move a suspended execution back to runnable.
-- KEYS: 1 core hash, 2 current suspension hash, 3 waitpoint hash,
--   4 waitpoint signals stream (not read here), 5 suspension timeout zset,
--   6 eligible zset, 7 delayed zset, 8 suspended zset.
-- ARGV: 1 execution_id, 2 trigger_type (default "signal"; not read here),
--   3 resume_delay_ms (optional; nil or "" means no delay).
-- With a positive resume_delay_ms the execution is parked in the delayed
-- zset and core delay_until is written, which is the field
-- ff_promote_delayed compares against its now_ms. Otherwise it goes straight
-- to the eligible zset.
-- Returns ok(public_state) ("waiting" or "delayed") or
-- err(execution_not_found | execution_not_suspended | invalid_input | ...).
redis.register_function('ff_resume_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    suspension_current = keys[2],
    waitpoint_hash = keys[3],
    waitpoint_signals = keys[4],
    suspension_timeout_key = keys[5],
    eligible_zset = keys[6],
    delayed_zset = keys[7],
    suspended_zset = keys[8],
  }
  local A = {
    execution_id = args[1],
    trigger_type = args[2] or "signal",
    resume_delay_ms = 0,
  }
  -- Reject a non-numeric delay explicitly; tonumber() would return nil and
  -- the `> 0` comparisons below would raise.
  if args[3] ~= nil and args[3] ~= "" then
    local parsed_delay = tonumber(args[3])
    if not parsed_delay then
      return err("invalid_input", "resume_delay_ms must be numeric")
    end
    A.resume_delay_ms = parsed_delay
  end
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "suspended" then
    return err("execution_not_suspended")
  end
  local susp_raw = redis.call("HGETALL", K.suspension_current)
  local susp_err = assert_active_suspension(susp_raw)
  if susp_err then return susp_err end

  local delayed = A.resume_delay_ms > 0
  local eligibility_state = "eligible_now"
  local blocking_reason = "waiting_for_worker"
  local blocking_detail = ""
  local public_state = "waiting"
  local delay_until = nil
  if delayed then
    eligibility_state = "not_eligible_until_time"
    blocking_reason = "waiting_for_resume_delay"
    blocking_detail = "resume delay " .. tostring(A.resume_delay_ms) .. "ms"
    public_state = "delayed"
    delay_until = now_ms + A.resume_delay_ms
  end

  -- Parse before writing so malformed fields cannot fail the script after
  -- the core hash has been updated. "" is truthy, hence `or ""` then `or 0`.
  local priority = tonumber(core.priority or "") or 0
  local created_at = tonumber(core.created_at or "") or 0

  local core_fields = {
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", eligibility_state,
    "blocking_reason", blocking_reason,
    "blocking_detail", blocking_detail,
    "terminal_outcome", "none",
    "attempt_state", "attempt_interrupted",
    "public_state", public_state,
    "current_suspension_id", "",
    "current_waitpoint_id", "",
    "last_transition_at", tostring(now_ms),
    "last_mutation_at", tostring(now_ms),
  }
  if delayed then
    -- Keep the core hash in step with the delayed-zset score. Without this,
    -- delay_until may be absent or "" (ff_promote_delayed clears it on
    -- promotion) when the promoter later reads it.
    core_fields[#core_fields + 1] = "delay_until"
    core_fields[#core_fields + 1] = tostring(delay_until)
  end
  redis.call("HSET", K.core_key, unpack(core_fields))

  redis.call("ZREM", K.suspended_zset, A.execution_id)
  if delayed then
    redis.call("ZADD", K.delayed_zset, delay_until, A.execution_id)
  else
    local score = 0 - (priority * 1000000000000) + created_at
    redis.call("ZADD", K.eligible_zset, score, A.execution_id)
  end

  -- Close out the waitpoint and the suspension record.
  redis.call("HSET", K.waitpoint_hash,
    "state", "closed",
    "satisfied_at", tostring(now_ms),
    "closed_at", tostring(now_ms),
    "close_reason", "resumed")
  redis.call("HSET", K.suspension_current,
    "satisfied_at", tostring(now_ms),
    "closed_at", tostring(now_ms),
    "close_reason", "resumed")
  redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
  return ok(public_state)
end)
--- ff_create_pending_waitpoint: pre-create a waitpoint in "pending" state for
-- an actively leased execution and mint its token.
-- KEYS: 1 core hash, 2 waitpoint hash, 3 pending waitpoint expiry zset,
--   4 HMAC secrets hash.
-- ARGV: 1 execution_id, 2 attempt_index, 3 waitpoint_id, 4 waitpoint_key,
--   5 expires_at (numeric; used as the score in the expiry zset).
-- Returns ok(waitpoint_id, waitpoint_key, waitpoint_token) or
-- err(execution_not_found | execution_not_active | no_active_lease |
-- stale_lease | waitpoint_already_exists | invalid_input | <token error>).
redis.register_function('ff_create_pending_waitpoint', function(keys, args)
  local K = {
    core_key = keys[1],
    waitpoint_hash = keys[2],
    pending_wp_expiry_key = keys[3],
    hmac_secrets = keys[4],
  }
  local A = {
    execution_id = args[1],
    attempt_index = args[2],
    waitpoint_id = args[3],
    waitpoint_key = args[4],
    expires_at = args[5],
  }
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "active" then
    return err("execution_not_active",
      core.terminal_outcome or "",
      core.current_lease_epoch or "",
      core.lifecycle_phase or "",
      core.current_attempt_id or "")
  end
  if core.ownership_state ~= "leased" then
    return err("no_active_lease")
  end
  if tostring(core.current_attempt_index) ~= A.attempt_index then
    return err("stale_lease")
  end
  -- A waitpoint hash may be reused only if it is no longer pending or active.
  if redis.call("EXISTS", K.waitpoint_hash) == 1 then
    local existing_state = redis.call("HGET", K.waitpoint_hash, "state")
    if existing_state == "pending" or existing_state == "active" then
      return err("waitpoint_already_exists")
    end
  end
  -- Validate expires_at before the first write. It was previously converted
  -- only at the final ZADD, so a non-numeric value raised after the hash had
  -- been stored as "pending", leaving a waitpoint with no expiry index entry.
  local expires_at_ms = tonumber(A.expires_at or "")
  if not expires_at_ms then
    return err("invalid_input", "expires_at must be numeric")
  end
  local waitpoint_token, token_err = mint_waitpoint_token(
    K.hmac_secrets, A.waitpoint_id, A.waitpoint_key, now_ms)
  if not waitpoint_token then return err(token_err) end
  redis.call("HSET", K.waitpoint_hash,
    "waitpoint_id", A.waitpoint_id,
    "execution_id", A.execution_id,
    "attempt_index", A.attempt_index,
    "suspension_id", "",
    "waitpoint_key", A.waitpoint_key,
    "waitpoint_token", waitpoint_token,
    "state", "pending",
    "created_at", tostring(now_ms),
    "activated_at", "",
    "satisfied_at", "",
    "closed_at", "",
    "expires_at", A.expires_at,
    "close_reason", "",
    "signal_count", "0",
    "matched_signal_count", "0",
    "last_signal_at", "")
  redis.call("ZADD", K.pending_wp_expiry_key,
    expires_at_ms, A.waitpoint_id)
  return ok(A.waitpoint_id, A.waitpoint_key, waitpoint_token)
end)
--- ff_expire_suspension: apply the timeout behavior of a suspension whose
-- timeout_at has passed.
-- KEYS: 1 core hash, 2 current suspension hash, 3 waitpoint hash,
--   4 waitpoint condition hash, 5 attempt hash, 6 stream meta hash,
--   7 suspension timeout zset, 8 suspended zset, 9 terminal zset,
--   10 eligible zset, 11 delayed zset (unused here),
--   12 lease history stream (unused here).
-- ARGV: 1 execution_id.
-- Stale timeout-index entries (execution missing, not suspended, or without
-- an active suspension) are removed and reported via ok("..._cleaned").
-- Returns ok("not_yet_due") when no timeout is set or it lies in the future;
-- otherwise ok(<behavior>, <public_state>) after applying one of:
--   auto_resume / auto_resume_with_timeout_signal -> runnable, eligible now
--   escalate -> stays suspended, marked paused_by_operator, timeout cleared
--   cancel / expire / anything else (treated as fail) -> terminal
redis.register_function('ff_expire_suspension', function(keys, args)
  local K = {
    core_key = keys[1],
    suspension_current = keys[2],
    waitpoint_hash = keys[3],
    wp_condition = keys[4],
    attempt_hash = keys[5],
    stream_meta = keys[6],
    suspension_timeout_key = keys[7],
    suspended_zset = keys[8],
    terminal_key = keys[9],
    eligible_zset = keys[10],
    delayed_zset = keys[11],
    lease_history_key = keys[12],
  }
  local A = {
    execution_id = args[1],
  }
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
    return ok("not_found_cleaned")
  end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "suspended" then
    redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
    return ok("not_suspended_cleaned")
  end
  local susp_raw = redis.call("HGETALL", K.suspension_current)
  local susp_err, susp = assert_active_suspension(susp_raw)
  if susp_err then
    redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
    return ok("no_active_suspension_cleaned")
  end
  -- timeout_at is stored as "" when no timeout was requested and is reset to
  -- "" by the escalate branch below. "" is truthy, so `or "0"` does not catch
  -- it and tonumber("") is nil; fall back to 0 ("no timeout") instead of
  -- raising on nil > number.
  local timeout_at = tonumber(susp.timeout_at or "") or 0
  if timeout_at == 0 or timeout_at > now_ms then
    return ok("not_yet_due")
  end
  local behavior = susp.timeout_behavior or "fail"
  if behavior == "auto_resume" or behavior == "auto_resume_with_timeout_signal" then
    -- Parsed before the first write so malformed fields cannot fail the
    -- script after the core hash has been updated.
    local priority = tonumber(core.priority or "") or 0
    local created_at = tonumber(core.created_at or "") or 0
    local score = 0 - (priority * 1000000000000) + created_at
    redis.call("HSET", K.core_key,
      "lifecycle_phase", "runnable",
      "ownership_state", "unowned",
      "eligibility_state", "eligible_now",
      "blocking_reason", "waiting_for_worker",
      "blocking_detail", "",
      "terminal_outcome", "none",
      "attempt_state", "attempt_interrupted",
      "public_state", "waiting",
      "current_suspension_id", "",
      "current_waitpoint_id", "",
      "last_transition_at", tostring(now_ms),
      "last_mutation_at", tostring(now_ms))
    redis.call("ZREM", K.suspended_zset, A.execution_id)
    redis.call("ZADD", K.eligible_zset, score, A.execution_id)
    redis.call("HSET", K.waitpoint_hash,
      "state", "closed",
      "closed_at", tostring(now_ms),
      "close_reason", "timed_out_auto_resume")
    redis.call("HSET", K.wp_condition,
      "closed", "1",
      "closed_at", tostring(now_ms),
      "closed_reason", "timed_out_auto_resume")
    redis.call("HSET", K.suspension_current,
      "closed_at", tostring(now_ms),
      "close_reason", "timed_out_auto_resume")
    redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
    return ok("auto_resume", "waiting")
  elseif behavior == "escalate" then
    -- Stay suspended, hand over to an operator, and clear the timeout so
    -- this suspension is not picked up by the timeout index again.
    redis.call("HSET", K.core_key,
      "lifecycle_phase", "suspended",
      "ownership_state", "unowned",
      "eligibility_state", "not_applicable",
      "blocking_reason", "paused_by_operator",
      "blocking_detail", "suspension escalated: timeout at " .. tostring(timeout_at),
      "terminal_outcome", "none",
      "attempt_state", core.attempt_state or "attempt_interrupted",
      "public_state", "suspended",
      "last_mutation_at", tostring(now_ms))
    redis.call("HSET", K.suspension_current,
      "reason_code", "waiting_for_operator_review",
      "timeout_at", "",
      "timeout_behavior", "")
    redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
    return ok("escalate", "suspended")
  else
    -- Terminal outcomes; any unrecognised behavior is treated as "fail".
    local terminal_outcome
    local public_state_val
    local close_reason
    if behavior == "cancel" then
      terminal_outcome = "cancelled"
      public_state_val = "cancelled"
      close_reason = "timed_out_cancel"
    elseif behavior == "expire" then
      terminal_outcome = "expired"
      public_state_val = "expired"
      close_reason = "timed_out_expire"
    else
      terminal_outcome = "failed"
      public_state_val = "failed"
      close_reason = "timed_out_fail"
    end
    redis.call("HSET", K.core_key,
      "lifecycle_phase", "terminal",
      "ownership_state", "unowned",
      "eligibility_state", "not_applicable",
      "blocking_reason", "none",
      "blocking_detail", "",
      "terminal_outcome", terminal_outcome,
      "attempt_state", "attempt_terminal",
      "public_state", public_state_val,
      "failure_reason", "suspension_timeout:" .. behavior,
      "completed_at", tostring(now_ms),
      "current_suspension_id", "",
      "current_waitpoint_id", "",
      "last_transition_at", tostring(now_ms),
      "last_mutation_at", tostring(now_ms))
    if is_set(core.current_attempt_index) then
      local att_end_state = "ended_failure"
      if behavior == "cancel" then
        att_end_state = "ended_cancelled"
      end
      redis.call("HSET", K.attempt_hash,
        "attempt_state", att_end_state,
        "ended_at", tostring(now_ms),
        "failure_reason", "suspension_timeout:" .. behavior,
        "suspended_at", "",
        "suspension_id", "")
    end
    -- Only stamp stream metadata that already exists.
    if redis.call("EXISTS", K.stream_meta) == 1 then
      redis.call("HSET", K.stream_meta,
        "closed_at", tostring(now_ms),
        "closed_reason", "suspension_timeout")
    end
    redis.call("HSET", K.waitpoint_hash,
      "state", "closed",
      "closed_at", tostring(now_ms),
      "close_reason", close_reason)
    redis.call("HSET", K.wp_condition,
      "closed", "1",
      "closed_at", tostring(now_ms),
      "closed_reason", close_reason)
    redis.call("HSET", K.suspension_current,
      "closed_at", tostring(now_ms),
      "close_reason", close_reason)
    redis.call("ZREM", K.suspension_timeout_key, A.execution_id)
    redis.call("ZREM", K.suspended_zset, A.execution_id)
    redis.call("ZADD", K.terminal_key, now_ms, A.execution_id)
    -- Executions that belong to a flow announce their terminal outcome.
    if is_set(core.flow_id) then
      local payload = cjson.encode({
        execution_id = A.execution_id,
        flow_id = core.flow_id,
        outcome = terminal_outcome,
      })
      redis.call("PUBLISH", "ff:dag:completions", payload)
    end
    return ok(behavior, public_state_val)
  end
end)
--- ff_close_waitpoint: close a pending or active waitpoint.
-- KEYS: 1 core hash (accepted but not read here), 2 waitpoint hash,
--   3 pending waitpoint expiry zset.
-- ARGV: 1 waitpoint_id, 2 reason (default "proactive_close").
-- Returns ok() after closing, ok("already_closed") when the waitpoint is
-- already closed or expired, err("waitpoint_not_found") when the hash is
-- empty, and err("waitpoint_not_open") for any other state.
redis.register_function('ff_close_waitpoint', function(keys, args)
  local waitpoint_key, pending_expiry_key = keys[2], keys[3]
  local waitpoint_id = args[1]
  local close_reason = args[2] or "proactive_close"

  local clock = redis.call("TIME")
  local now_ms = tonumber(clock[1]) * 1000 + math.floor(tonumber(clock[2]) / 1000)

  local stored = redis.call("HGETALL", waitpoint_key)
  if #stored == 0 then
    return err("waitpoint_not_found")
  end

  local state = hgetall_to_table(stored).state
  if state == "closed" or state == "expired" then
    return ok("already_closed")
  end
  if state ~= "pending" and state ~= "active" then
    return err("waitpoint_not_open")
  end

  redis.call("HSET", waitpoint_key,
    "state", "closed",
    "closed_at", tostring(now_ms),
    "close_reason", close_reason)
  redis.call("ZREM", pending_expiry_key, waitpoint_id)
  return ok()
end)
--- ff_rotate_waitpoint_hmac_secret: install a new current HMAC key id and
-- secret, keeping the outgoing key valid for a grace period.
-- KEYS: 1 HMAC secrets hash.
-- ARGV: 1 new_kid (non-empty, no ':'), 2 new_secret_hex (non-empty,
--   even-length hex), 3 grace_ms (non-negative integer up to 2^53-1).
-- Hash fields used: current_kid, previous_kid, previous_expires_at,
--   secret:<kid>, expires_at:<kid>.
-- Returns:
--   ok("noop", kid) when new_kid is already current with the same secret (or
--     its secret field was missing and has been restored);
--   err("rotation_conflict", kid) when new_kid is current with another secret;
--   ok("rotated", previous_kid_or_empty, new_kid, gc_count) otherwise, where
--     gc_count is the number of expired kids removed during this call.
redis.register_function('ff_rotate_waitpoint_hmac_secret', function(keys, args)
  local hmac_key = keys[1]
  local new_kid = args[1]
  local new_secret_hex = args[2]
  local grace_ms_s = args[3]
  -- ':' is the separator in the "secret:<kid>" / "expires_at:<kid>" field names.
  if type(new_kid) ~= "string" or new_kid == "" or new_kid:find(":", 1, true) then
    return err("invalid_kid")
  end
  if type(new_secret_hex) ~= "string"
    or new_secret_hex == ""
    or #new_secret_hex % 2 ~= 0
    or new_secret_hex:find("[^0-9a-fA-F]") then
    return err("invalid_secret_hex")
  end
  local grace_ms = tonumber(grace_ms_s)
  -- grace_ms ~= grace_ms is the NaN check.
  if not grace_ms
    or grace_ms ~= grace_ms or grace_ms < 0
    or grace_ms > 9007199254740991 or grace_ms ~= math.floor(grace_ms) then
    return err("invalid_grace_ms")
  end
  local now_ms = server_time_ms()
  local prev_expires_at = now_ms + grace_ms
  local current_kid = redis.call("HGET", hmac_key, "current_kid")
  if current_kid == false then current_kid = nil end

  -- Re-submitting the current kid is idempotent when the secret matches.
  if current_kid == new_kid then
    local stored = redis.call("HGET", hmac_key, "secret:" .. new_kid)
    if stored == false then stored = nil end
    if stored == new_secret_hex then
      return ok("noop", new_kid)
    elseif stored then
      return err("rotation_conflict", new_kid)
    else
      redis.call("HSET", hmac_key, "secret:" .. new_kid, new_secret_hex)
      return ok("noop", new_kid)
    end
  end

  -- Garbage-collect kids whose grace period has ended. The current kid is
  -- never collected: it becomes previous_kid below and its secret must
  -- survive the grace period even if a stale expires_at entry exists for it.
  local raw = redis.call("HGETALL", hmac_key)
  local expired_fields = {}
  local gc_count = 0
  for i = 1, #raw, 2 do
    local field = raw[i]
    local value = raw[i + 1]
    if field:sub(1, 11) == "expires_at:" then
      local kid = field:sub(12)
      local exp = tonumber(value)
      if kid ~= current_kid and (not exp or exp <= 0 or exp < now_ms) then
        expired_fields[#expired_fields + 1] = "expires_at:" .. kid
        expired_fields[#expired_fields + 1] = "secret:" .. kid
        gc_count = gc_count + 1
      end
    end
  end
  if #expired_fields > 0 then
    redis.call("HDEL", hmac_key, unpack(expired_fields))
  end

  -- The incoming kid may be a retired kid being rotated back in. Drop any
  -- expiry left from its retirement so the current key is not reported as
  -- expiring and is not collected by a later rotation.
  redis.call("HDEL", hmac_key, "expires_at:" .. new_kid)

  local prev_expires_str = tostring(prev_expires_at)
  if current_kid then
    redis.call("HSET", hmac_key,
      "previous_kid", current_kid,
      "previous_expires_at", prev_expires_str,
      "expires_at:" .. current_kid, prev_expires_str,
      "current_kid", new_kid,
      "secret:" .. new_kid, new_secret_hex)
  else
    -- First installation: there is no outgoing key to retire.
    redis.call("HSET", hmac_key,
      "current_kid", new_kid,
      "secret:" .. new_kid, new_secret_hex)
  end
  return ok("rotated", current_kid or "", new_kid, tostring(gc_count))
end)
--- ff_list_waitpoint_hmac_kids: report the current key id and every key id
-- whose expires_at:<kid> entry has not yet passed.
-- KEYS: 1 HMAC secrets hash. ARGV: none.
-- Returns ok(current_kid_or_empty, count, kid1, expires_at1, kid2, ...),
-- where count is the number of kid/expiry pairs that follow. Expiry values
-- are returned exactly as stored.
redis.register_function('ff_list_waitpoint_hmac_kids', function(keys, args)
  local entries = redis.call("HGETALL", keys[1])
  local now_ms = server_time_ms()
  local active_kid = ""
  local listing = {}
  local live = 0
  local pos = 1
  while pos <= #entries do
    local name, stored = entries[pos], entries[pos + 1]
    if name == "current_kid" then
      active_kid = stored
    elseif name:sub(1, 11) == "expires_at:" then
      local expiry = tonumber(stored)
      -- Skip unparseable, non-positive and already elapsed expiries.
      if expiry and expiry > 0 and expiry >= now_ms then
        listing[#listing + 1] = name:sub(12)
        listing[#listing + 1] = stored
        live = live + 1
      end
    end
    pos = pos + 2
  end
  return ok(active_kid, tostring(live), unpack(listing))
end)
--- ff_deliver_signal
-- Authenticates a signal against a waitpoint token, records the signal, and
-- applies it to the waitpoint's matcher set. When the resume condition is met
-- the waitpoint is closed and, if the execution is suspended, the execution is
-- moved back to `runnable` (eligible immediately or after a resume delay).
--
-- KEYS: 1 core_key, 2 wp_condition, 3 wp_signals_stream, 4 exec_signals_zset,
--       5 signal_hash, 6 signal_payload, 7 idem_key, 8 waitpoint_hash,
--       9 suspension_current, 10 eligible_zset, 11 suspended_zset,
--       12 delayed_zset, 13 suspension_timeout_zset, 14 hmac_secrets
-- ARGV: 1 signal_id, 2 execution_id, 3 waitpoint_id, 4 signal_name,
--       5 signal_category, 6 source_type, 7 source_identity, 8 payload,
--       9 payload_encoding ("json"), 10 idempotency_key, 11 correlation_id,
--       12 target_scope ("waitpoint"), 13 created_at, 14 dedup_ttl_ms
--       (86400000), 15 resume_delay_ms (0), 16 signal_maxlen (1000),
--       17 max_signals (10000), 18 waitpoint_token
-- Returns ok(signal_id, effect) with effect one of "no_op",
--         "appended_to_waitpoint", "resume_condition_satisfied";
--         ok_duplicate(existing_signal_id) on an idempotency hit;
--         otherwise an err(...) reply.
redis.register_function('ff_deliver_signal', function(keys, args)
  local K = {
    core_key = keys[1],
    wp_condition = keys[2],
    wp_signals_stream = keys[3],
    exec_signals_zset = keys[4],
    signal_hash = keys[5],
    signal_payload = keys[6],
    idem_key = keys[7],
    waitpoint_hash = keys[8],
    suspension_current = keys[9],
    eligible_zset = keys[10],
    suspended_zset = keys[11],
    delayed_zset = keys[12],
    suspension_timeout_zset = keys[13],
    hmac_secrets = keys[14],
  }
  -- Optional numeric ARGV: nil or "" selects the default, anything else must
  -- pass require_number. These values are used after the first writes
  -- (XADD MAXLEN, resume-delay comparison), so they are validated up front:
  -- a script error raised mid-way would leave the earlier writes in place.
  local function optional_number(raw, default, name)
    if raw == nil or raw == "" then
      raw = default
    end
    return require_number(raw, name)
  end
  local resume_delay_n = optional_number(args[15], "0", "resume_delay_ms")
  if type(resume_delay_n) == "table" then return resume_delay_n end
  local signal_maxlen_n = optional_number(args[16], "1000", "signal_maxlen")
  if type(signal_maxlen_n) == "table" then return signal_maxlen_n end
  local max_signals_n = optional_number(args[17], "10000", "max_signals")
  if type(max_signals_n) == "table" then return max_signals_n end
  local A = {
    signal_id = args[1],
    execution_id = args[2],
    waitpoint_id = args[3],
    signal_name = args[4],
    signal_category = args[5],
    source_type = args[6],
    source_identity = args[7],
    payload = args[8] or "",
    payload_encoding = args[9] or "json",
    idempotency_key = args[10] or "",
    correlation_id = args[11] or "",
    target_scope = args[12] or "waitpoint",
    created_at = args[13] or "",
    -- May be nil for an unparsable value; treated as 0 (dedup off) below.
    dedup_ttl_ms = tonumber(args[14] or "86400000"),
    resume_delay_ms = resume_delay_n,
    signal_maxlen = signal_maxlen_n,
    max_signals = max_signals_n,
    waitpoint_token = args[18] or "",
  }
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- Load the execution and authenticate the token before anything else.
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(raw)
  local wp_for_auth_raw = redis.call("HGETALL", K.waitpoint_hash)
  if #wp_for_auth_raw == 0 then
    -- A missing waitpoint is reported with the same code as a bad token.
    return err("invalid_token")
  end
  local wp_for_auth = hgetall_to_table(wp_for_auth_raw)
  if not wp_for_auth.created_at then
    return err("invalid_token")
  end
  local token_err = validate_waitpoint_token(
    K.hmac_secrets, A.waitpoint_token,
    A.waitpoint_id, wp_for_auth.waitpoint_key or "",
    tonumber(wp_for_auth.created_at) or 0, now_ms)
  if token_err then
    redis.call("HSET", K.core_key, "last_hmac_validation_failed_at", tostring(now_ms))
    return err(token_err)
  end

  -- Lifecycle / waitpoint state gates.
  local lp = core.lifecycle_phase
  if lp == "terminal" then
    return err("target_not_signalable")
  end
  if lp == "active" or lp == "runnable" or lp == "submitted" then
    if wp_for_auth.state == "pending" then
      return err("waitpoint_pending_use_buffer_script")
    end
    if wp_for_auth.state ~= "active" then
      return err("target_not_signalable")
    end
  end
  local cond_raw = redis.call("HGETALL", K.wp_condition)
  if #cond_raw == 0 then
    return err("waitpoint_not_found")
  end
  local wp_cond = hgetall_to_table(cond_raw)
  if wp_cond.closed == "1" then
    return err("waitpoint_closed")
  end

  -- Per-execution signal cap (a value <= 0 disables the cap).
  if A.max_signals > 0 then
    local current_count = redis.call("ZCARD", K.exec_signals_zset)
    if current_count >= A.max_signals then
      return err("signal_limit_exceeded")
    end
  end

  -- Idempotency: an existing key short-circuits with the original signal id.
  local dedup_ms = A.dedup_ttl_ms or 0
  if A.idempotency_key ~= "" and dedup_ms > 0 then
    local existing = redis.call("GET", K.idem_key)
    if existing then
      return ok_duplicate(existing)
    end
    redis.call("SET", K.idem_key, A.signal_id,
      "PX", dedup_ms, "NX")
  end

  -- Record the signal: hash, optional payload, waitpoint stream, exec index.
  local created_at = A.created_at ~= "" and A.created_at or tostring(now_ms)
  redis.call("HSET", K.signal_hash,
    "signal_id", A.signal_id,
    "target_execution_id", A.execution_id,
    "target_waitpoint_id", A.waitpoint_id,
    "target_scope", A.target_scope,
    "signal_name", A.signal_name,
    "signal_category", A.signal_category,
    "source_type", A.source_type,
    "source_identity", A.source_identity,
    "correlation_id", A.correlation_id,
    "idempotency_key", A.idempotency_key,
    "created_at", created_at,
    "accepted_at", tostring(now_ms),
    "matched_waitpoint_id", A.waitpoint_id,
    "payload_encoding", A.payload_encoding)
  if A.payload ~= "" then
    redis.call("SET", K.signal_payload, A.payload)
  end
  redis.call("XADD", K.wp_signals_stream, "MAXLEN", "~",
    tostring(A.signal_maxlen), "*",
    "signal_id", A.signal_id,
    "signal_name", A.signal_name,
    "signal_category", A.signal_category,
    "source_type", A.source_type,
    "source_identity", A.source_identity,
    "matched", "0",
    "accepted_at", tostring(now_ms))
  redis.call("ZADD", K.exec_signals_zset, now_ms, A.signal_id)

  -- Matching: the first unsatisfied matcher whose name is empty (wildcard)
  -- or equals the signal name consumes this signal.
  local effect = "appended_to_waitpoint"
  local matched = false
  local total = tonumber(wp_cond.total_matchers or "0")
  for i = 0, total - 1 do
    local sat_key = "matcher:" .. i .. ":satisfied"
    local name_key = "matcher:" .. i .. ":name"
    if wp_cond[sat_key] == "0" then
      local matcher_name = wp_cond[name_key] or ""
      if matcher_name == "" or matcher_name == A.signal_name then
        redis.call("HSET", K.wp_condition,
          sat_key, "1",
          "matcher:" .. i .. ":signal_id", A.signal_id)
        matched = true
        local new_sat = tonumber(wp_cond.satisfied_count or "0") + 1
        redis.call("HSET", K.wp_condition, "satisfied_count", tostring(new_sat))
        local mode = wp_cond.match_mode or "any"
        local min_count = tonumber(wp_cond.minimum_signal_count or "1")
        local resume
        if mode == "all" then
          resume = (new_sat >= total)
        else
          -- "any" and every other mode value use the minimum-count rule.
          resume = (new_sat >= min_count)
        end
        if resume then
          effect = "resume_condition_satisfied"
          if lp == "suspended" then
            -- Move the execution back to runnable, delayed or eligible now.
            local es, br, bd, ps
            if A.resume_delay_ms > 0 then
              es = "not_eligible_until_time"
              br = "waiting_for_resume_delay"
              bd = "resume delay " .. A.resume_delay_ms .. "ms after signal " .. A.signal_name
              ps = "delayed"
            else
              es = "eligible_now"
              br = "waiting_for_worker"
              bd = ""
              ps = "waiting"
            end
            redis.call("HSET", K.core_key,
              "lifecycle_phase", "runnable",
              "ownership_state", "unowned",
              "eligibility_state", es,
              "blocking_reason", br,
              "blocking_detail", bd,
              "terminal_outcome", "none",
              "attempt_state", "attempt_interrupted",
              "public_state", ps,
              "current_suspension_id", "",
              "current_waitpoint_id", "",
              "last_transition_at", tostring(now_ms),
              "last_mutation_at", tostring(now_ms))
            local priority = tonumber(core.priority or "0")
            local created_at_exec = tonumber(core.created_at or "0")
            redis.call("ZREM", K.suspended_zset, A.execution_id)
            if A.resume_delay_ms > 0 then
              redis.call("ZADD", K.delayed_zset,
                now_ms + A.resume_delay_ms, A.execution_id)
            else
              -- Score: higher priority sorts first, then older created_at.
              redis.call("ZADD", K.eligible_zset,
                0 - (priority * 1000000000000) + created_at_exec,
                A.execution_id)
            end
          end
          -- Close the waitpoint (and the current suspension, if present).
          redis.call("HSET", K.wp_condition,
            "closed", "1",
            "closed_at", tostring(now_ms),
            "closed_reason", "satisfied")
          redis.call("HSET", K.waitpoint_hash,
            "state", "closed",
            "satisfied_at", tostring(now_ms),
            "closed_at", tostring(now_ms),
            "close_reason", "resumed")
          if redis.call("EXISTS", K.suspension_current) == 1 then
            redis.call("HSET", K.suspension_current,
              "satisfied_at", tostring(now_ms),
              "closed_at", tostring(now_ms),
              "close_reason", "resumed")
          end
          redis.call("ZREM", K.suspension_timeout_zset, A.execution_id)
        end
        -- One signal satisfies at most one matcher.
        break
      end
    end
  end
  if not matched then
    effect = "no_op"
  end

  -- Bookkeeping on the signal, the waitpoint and the current suspension.
  redis.call("HSET", K.signal_hash, "observed_effect", effect)
  redis.call("HINCRBY", K.waitpoint_hash, "signal_count", 1)
  if matched then
    redis.call("HINCRBY", K.waitpoint_hash, "matched_signal_count", 1)
  end
  redis.call("HSET", K.waitpoint_hash, "last_signal_at", tostring(now_ms))
  if redis.call("EXISTS", K.suspension_current) == 1 then
    redis.call("HSET", K.suspension_current, "last_signal_at", tostring(now_ms))
  end
  return ok(A.signal_id, effect)
end)
--- ff_buffer_signal_for_pending_waitpoint
-- Authenticates a signal against a waitpoint token and records it (signal
-- hash, optional payload, waitpoint stream, per-execution index) with the
-- observed effect "buffered_for_pending_waitpoint". No matcher evaluation or
-- execution state transition happens here.
--
-- KEYS: 1 core_key, 2 wp_condition, 3 wp_signals_stream, 4 exec_signals_zset,
--       5 signal_hash, 6 signal_payload, 7 idem_key, 8 waitpoint_hash,
--       9 hmac_secrets
-- ARGV: 1 signal_id, 2 execution_id, 3 waitpoint_id, 4 signal_name,
--       5 signal_category, 6 source_type, 7 source_identity, 8 payload,
--       9 payload_encoding ("json"), 10 idempotency_key, 11 correlation_id,
--       12 target_scope ("waitpoint"), 13 created_at, 14 dedup_ttl_ms
--       (86400000), 16 signal_maxlen (1000), 17 max_signals (10000),
--       18 waitpoint_token. ARGV[15] is not read by this function.
-- Returns ok(signal_id, "buffered_for_pending_waitpoint"),
--         ok_duplicate(existing_signal_id), or an err(...) reply.
redis.register_function('ff_buffer_signal_for_pending_waitpoint', function(keys, args)
  local K = {
    core_key = keys[1],
    wp_condition = keys[2],
    wp_signals_stream = keys[3],
    exec_signals_zset = keys[4],
    signal_hash = keys[5],
    signal_payload = keys[6],
    idem_key = keys[7],
    waitpoint_hash = keys[8],
    hmac_secrets = keys[9],
  }
  -- Optional numeric ARGV: nil or "" selects the default, anything else must
  -- pass require_number. signal_maxlen is consumed by XADD after the signal
  -- hash and idempotency key are written, so it is validated before any
  -- write; a script error mid-way would leave those writes in place.
  local function optional_number(raw, default, name)
    if raw == nil or raw == "" then
      raw = default
    end
    return require_number(raw, name)
  end
  local signal_maxlen_n = optional_number(args[16], "1000", "signal_maxlen")
  if type(signal_maxlen_n) == "table" then return signal_maxlen_n end
  local max_signals_n = optional_number(args[17], "10000", "max_signals")
  if type(max_signals_n) == "table" then return max_signals_n end
  local A = {
    signal_id = args[1],
    execution_id = args[2],
    waitpoint_id = args[3],
    signal_name = args[4],
    signal_category = args[5],
    source_type = args[6],
    source_identity = args[7],
    payload = args[8] or "",
    payload_encoding = args[9] or "json",
    idempotency_key = args[10] or "",
    correlation_id = args[11] or "",
    target_scope = args[12] or "waitpoint",
    created_at = args[13] or "",
    -- May be nil for an unparsable value; treated as 0 (dedup off) below.
    dedup_ttl_ms = tonumber(args[14] or "86400000"),
    signal_maxlen = signal_maxlen_n,
    max_signals = max_signals_n,
    waitpoint_token = args[18] or "",
  }
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- The execution record is only checked for existence.
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then
    return err("execution_not_found")
  end
  local wp_for_auth = hgetall_to_table(redis.call("HGETALL", K.waitpoint_hash))
  if not wp_for_auth.created_at then
    return err("waitpoint_not_found")
  end
  local token_err = validate_waitpoint_token(
    K.hmac_secrets, A.waitpoint_token,
    A.waitpoint_id, wp_for_auth.waitpoint_key or "",
    tonumber(wp_for_auth.created_at) or 0, now_ms)
  if token_err then
    redis.call("HSET", K.core_key, "last_hmac_validation_failed_at", tostring(now_ms))
    return err(token_err)
  end
  if wp_for_auth.state == "closed" or wp_for_auth.state == "expired" then
    return err("waitpoint_closed")
  end

  -- Per-execution signal cap (a value <= 0 disables the cap).
  if A.max_signals > 0 then
    local current_count = redis.call("ZCARD", K.exec_signals_zset)
    if current_count >= A.max_signals then
      return err("signal_limit_exceeded")
    end
  end

  -- Idempotency: an existing key short-circuits with the original signal id.
  local dedup_ms = A.dedup_ttl_ms or 0
  if A.idempotency_key ~= "" and dedup_ms > 0 then
    local existing = redis.call("GET", K.idem_key)
    if existing then
      return ok_duplicate(existing)
    end
    redis.call("SET", K.idem_key, A.signal_id,
      "PX", dedup_ms, "NX")
  end

  -- Record the signal.
  local created_at = A.created_at ~= "" and A.created_at or tostring(now_ms)
  redis.call("HSET", K.signal_hash,
    "signal_id", A.signal_id,
    "target_execution_id", A.execution_id,
    "target_waitpoint_id", A.waitpoint_id,
    "target_scope", A.target_scope,
    "signal_name", A.signal_name,
    "signal_category", A.signal_category,
    "source_type", A.source_type,
    "source_identity", A.source_identity,
    "correlation_id", A.correlation_id,
    "idempotency_key", A.idempotency_key,
    "created_at", created_at,
    "accepted_at", tostring(now_ms),
    "matched_waitpoint_id", A.waitpoint_id,
    "payload_encoding", A.payload_encoding,
    "observed_effect", "buffered_for_pending_waitpoint")
  if A.payload ~= "" then
    redis.call("SET", K.signal_payload, A.payload)
  end
  redis.call("XADD", K.wp_signals_stream, "MAXLEN", "~",
    tostring(A.signal_maxlen), "*",
    "signal_id", A.signal_id,
    "signal_name", A.signal_name,
    "signal_category", A.signal_category,
    "source_type", A.source_type,
    "source_identity", A.source_identity,
    "matched", "0",
    "accepted_at", tostring(now_ms))
  redis.call("ZADD", K.exec_signals_zset, now_ms, A.signal_id)
  return ok(A.signal_id, "buffered_for_pending_waitpoint")
end)
--- ff_claim_resumed_execution
-- Lets a worker holding a claim grant take a lease on an execution that is
-- `runnable` with attempt_state `attempt_interrupted`: consumes the grant,
-- bumps the lease epoch, rewrites the attempt / lease / core hashes, updates
-- the scheduling indexes and appends an "acquired" lease-history event.
--
-- KEYS: 1 core_key, 2 claim_grant_key, 3 eligible_zset, 4 lease_expiry_key,
--       5 worker_leases_key, 6 attempt_hash, 7 lease_current_key,
--       8 lease_history_key, 9 active_index_key, 10 attempt_timeout_key,
--       11 execution_deadline_key (not read by this function)
-- ARGV: 1 execution_id, 2 worker_id, 3 worker_instance_id, 4 lane,
--       5 capability_snapshot_hash (not read after parsing), 6 lease_id,
--       7 lease_ttl_ms (required number), 8 remaining_attempt_timeout_ms
--       (optional number)
-- Returns ok(lease_id, epoch, expires_at, attempt_id, attempt_index,
--         "resumed") or an err(...) reply.
redis.register_function('ff_claim_resumed_execution', function(keys, args)
  local K = {
    core_key = keys[1],
    claim_grant_key = keys[2],
    eligible_zset = keys[3],
    lease_expiry_key = keys[4],
    worker_leases_key = keys[5],
    attempt_hash = keys[6],
    lease_current_key = keys[7],
    lease_history_key = keys[8],
    active_index_key = keys[9],
    attempt_timeout_key = keys[10],
    execution_deadline_key = keys[11],
  }
  local lease_ttl_n = require_number(args[7], "lease_ttl_ms")
  if type(lease_ttl_n) == "table" then return lease_ttl_n end
  local A = {
    execution_id = args[1],
    worker_id = args[2],
    worker_instance_id = args[3],
    lane = args[4],
    capability_snapshot_hash = args[5] or "",
    lease_id = args[6],
    lease_ttl_ms = lease_ttl_n,
    remaining_attempt_timeout_ms = args[8] or "",
  }
  -- Parse the optional timeout before any mutation. It is only consumed
  -- after the grant is deleted and the lease is written, so a non-numeric
  -- value must be rejected here rather than raise a script error there.
  local remaining_timeout_n = nil
  if is_set(A.remaining_attempt_timeout_ms) then
    local parsed = require_number(A.remaining_attempt_timeout_ms,
      "remaining_attempt_timeout_ms")
    if type(parsed) == "table" then return parsed end
    remaining_timeout_n = parsed
  end
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)

  -- Preconditions on the execution.
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "runnable" then
    return err("execution_not_leaseable")
  end
  if core.attempt_state ~= "attempt_interrupted" then
    return err("not_a_resumed_execution")
  end

  -- Claim grant: must exist, belong to this worker, and not be expired.
  local grant_raw = redis.call("HGETALL", K.claim_grant_key)
  if #grant_raw == 0 then
    return err("invalid_claim_grant")
  end
  local grant = hgetall_to_table(grant_raw)
  if grant.worker_id ~= A.worker_id then
    return err("invalid_claim_grant")
  end
  if is_set(grant.grant_expires_at) and tonumber(grant.grant_expires_at) < now_ms then
    redis.call("DEL", K.claim_grant_key)
    return err("claim_grant_expired")
  end
  -- The grant is single-use.
  redis.call("DEL", K.claim_grant_key)

  local att_idx = core.current_attempt_index
  local att_id = core.current_attempt_id
  local epoch = tonumber(core.current_lease_epoch or "0") + 1
  local expires_at = now_ms + A.lease_ttl_ms
  -- Renewal is due after two thirds of the TTL.
  local renewal_deadline = now_ms + math.floor(A.lease_ttl_ms * 2 / 3)

  redis.call("HSET", K.attempt_hash,
    "attempt_state", "started",
    "resumed_at", tostring(now_ms),
    "lease_id", A.lease_id,
    "lease_epoch", tostring(epoch),
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "suspended_at", "",
    "suspension_id", "")
  -- Replace the current-lease hash wholesale so no stale field survives.
  redis.call("DEL", K.lease_current_key)
  redis.call("HSET", K.lease_current_key,
    "lease_id", A.lease_id,
    "lease_epoch", tostring(epoch),
    "execution_id", A.execution_id,
    "attempt_id", att_id,
    "worker_id", A.worker_id,
    "worker_instance_id", A.worker_instance_id,
    "acquired_at", tostring(now_ms),
    "expires_at", tostring(expires_at),
    "last_renewed_at", tostring(now_ms),
    "renewal_deadline", tostring(renewal_deadline))
  redis.call("HSET", K.core_key,
    "lifecycle_phase", "active",
    "ownership_state", "leased",
    "eligibility_state", "not_applicable",
    "blocking_reason", "none",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "attempt_state", "running_attempt",
    "public_state", "active",
    "current_lease_id", A.lease_id,
    "current_lease_epoch", tostring(epoch),
    "current_worker_id", A.worker_id,
    "current_worker_instance_id", A.worker_instance_id,
    "current_lane", A.lane,
    "lease_acquired_at", tostring(now_ms),
    "lease_expires_at", tostring(expires_at),
    "lease_last_renewed_at", tostring(now_ms),
    "lease_renewal_deadline", tostring(renewal_deadline),
    "lease_expired_at", "",
    "lease_revoked_at", "",
    "lease_revoke_reason", "",
    "last_transition_at", tostring(now_ms),
    "last_mutation_at", tostring(now_ms))

  -- Scheduling indexes.
  redis.call("ZREM", K.eligible_zset, A.execution_id)
  redis.call("ZADD", K.lease_expiry_key, expires_at, A.execution_id)
  redis.call("SADD", K.worker_leases_key, A.execution_id)
  redis.call("ZADD", K.active_index_key, expires_at, A.execution_id)
  if remaining_timeout_n and remaining_timeout_n > 0 then
    redis.call("ZADD", K.attempt_timeout_key,
      now_ms + remaining_timeout_n, A.execution_id)
  end

  redis.call("XADD", K.lease_history_key, "MAXLEN", "~", 1000, "*",
    "event", "acquired",
    "lease_id", A.lease_id,
    "lease_epoch", tostring(epoch),
    "attempt_index", att_idx,
    "attempt_id", att_id,
    "worker_id", A.worker_id,
    "reason", "claim_resumed",
    "ts", tostring(now_ms))
  return ok(A.lease_id, tostring(epoch), tostring(expires_at),
    att_id, att_idx, "resumed")
end)
--- ff_append_frame
-- Appends one frame to an attempt's output stream after checking that the
-- caller still owns the execution (active phase, unexpired lease, matching
-- attempt index / lease id / lease epoch). Creates the stream metadata hash
-- on first use, updates its counters, and trims the stream.
--
-- KEYS: 1 core_key, 2 stream_key, 3 stream_meta
-- ARGV: 1 execution_id, 2 attempt_index, 3 lease_id, 4 lease_epoch,
--       5 frame_type, 6 ts, 7 payload, 8 encoding ("utf8"),
--       9 correlation_id, 10 source ("worker"), 11 retention_maxlen (0),
--       12 attempt_id, 13 max_payload_bytes (65536)
-- Returns ok(entry_id, frame_count) or an err(...) reply.
redis.register_function('ff_append_frame', function(keys, args)
  local K = {
    core_key = keys[1],
    stream_key = keys[2],
    stream_meta = keys[3],
  }
  -- Optional numeric ARGV: nil or "" selects the default, anything else must
  -- pass require_number. retention_maxlen is consumed by XTRIM after the
  -- frame has been appended, so it is validated before any write.
  local function optional_number(raw, default, name)
    if raw == nil or raw == "" then
      raw = default
    end
    return require_number(raw, name)
  end
  local retention_maxlen_n = optional_number(args[11], "0", "retention_maxlen")
  if type(retention_maxlen_n) == "table" then return retention_maxlen_n end
  if retention_maxlen_n < 0 then
    return err("invalid_input", "retention_maxlen must be >= 0")
  end
  local max_payload_bytes_n = optional_number(args[13], "65536", "max_payload_bytes")
  if type(max_payload_bytes_n) == "table" then return max_payload_bytes_n end
  local A = {
    execution_id = args[1],
    attempt_index = args[2],
    lease_id = args[3],
    lease_epoch = args[4],
    frame_type = args[5],
    ts = args[6] or "",
    payload = args[7] or "",
    encoding = args[8] or "utf8",
    correlation_id = args[9] or "",
    source = args[10] or "worker",
    retention_maxlen = retention_maxlen_n,
    attempt_id = args[12] or "",
    max_payload_bytes = max_payload_bytes_n,
  }
  if #A.payload > A.max_payload_bytes then
    return err("retention_limit_exceeded")
  end

  -- Ownership checks. HMGET order: attempt index, lease id, lease epoch,
  -- lease expiry, lifecycle phase, ownership state.
  local core = redis.call("HMGET", K.core_key,
    "current_attempt_index", "current_lease_id", "current_lease_epoch", "lease_expires_at", "lifecycle_phase", "ownership_state")
  if core[5] ~= "active" then
    return err("stream_closed")
  end
  if core[6] == "lease_expired_reclaimable" or core[6] == "lease_revoked" then
    return err("stale_owner_cannot_append")
  end
  local t = redis.call("TIME")
  local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
  -- A missing or unparsable expiry counts as already expired.
  if (tonumber(core[4] or "0") or 0) <= now_ms then
    return err("stale_owner_cannot_append")
  end
  if tostring(core[1]) ~= A.attempt_index then
    return err("stale_owner_cannot_append")
  end
  if core[2] ~= A.lease_id or tostring(core[3]) ~= A.lease_epoch then
    return err("stale_owner_cannot_append")
  end

  -- Lazily create the stream metadata on the first frame.
  if redis.call("EXISTS", K.stream_meta) == 0 then
    redis.call("HSET", K.stream_meta,
      "stream_id", A.execution_id .. ":" .. A.attempt_index,
      "execution_id", A.execution_id,
      "attempt_id", A.attempt_id,
      "attempt_index", A.attempt_index,
      "created_at", tostring(now_ms),
      "closed_at", "",
      "closed_reason", "",
      "durability_mode", "durable_full",
      "retention_maxlen", tostring(A.retention_maxlen),
      "last_sequence", "",
      "frame_count", "0",
      "total_bytes", "0",
      "last_frame_at", "")
  end
  local closed = redis.call("HGET", K.stream_meta, "closed_at")
  if is_set(closed) then
    return err("stream_closed")
  end

  -- Append the frame; correlation_id is only stored when non-empty.
  local ts = A.ts ~= "" and A.ts or tostring(now_ms)
  local xadd_args = {
    K.stream_key, "*",
    "frame_type", A.frame_type,
    "ts", ts,
    "payload", A.payload,
    "encoding", A.encoding,
    "source", A.source,
  }
  if A.correlation_id ~= "" then
    xadd_args[#xadd_args + 1] = "correlation_id"
    xadd_args[#xadd_args + 1] = A.correlation_id
  end
  local entry_id = redis.call("XADD", unpack(xadd_args))
  local frame_count = redis.call("HINCRBY", K.stream_meta, "frame_count", 1)
  redis.call("HINCRBY", K.stream_meta, "total_bytes", #A.payload)
  redis.call("HSET", K.stream_meta,
    "last_sequence", entry_id,
    "last_frame_at", tostring(now_ms))

  -- Trim: retention_maxlen 0 means an approximate cap of 10000 entries,
  -- any other value is enforced exactly.
  local maxlen = A.retention_maxlen
  local trim_op
  if maxlen == 0 then
    maxlen = 10000
    trim_op = "~"
  else
    trim_op = "="
  end
  redis.call("XTRIM", K.stream_key, "MAXLEN", trim_op, maxlen)
  return ok(entry_id, tostring(frame_count))
end)
--- ff_read_attempt_stream
-- Reads a bounded range of entries from an attempt stream and reports the
-- stream's closed_at / closed_reason metadata alongside them.
--
-- KEYS: 1 stream key, 2 stream metadata hash
-- ARGV: 1 from id ("-" when absent), 2 to id ("+" when absent),
--       3 count limit (must be between 1 and 10000 inclusive)
-- Returns ok(entries, closed_at, closed_reason); the two metadata values are
--         "" when the hash has no such field. Invalid limits return
--         err("invalid_input", ...).
redis.register_function('ff_read_attempt_stream', function(keys, args)
  local MAX_COUNT = 10000
  local range_start = args[1] or "-"
  local range_end = args[2] or "+"
  local limit = tonumber(args[3] or "0")
  if not limit or limit < 1 then
    return err("invalid_input", "count_limit must be >= 1")
  elseif limit > MAX_COUNT then
    return err("invalid_input", "count_limit_exceeds_hard_cap")
  end
  local frames = redis.call("XRANGE", keys[1], range_start, range_end,
    "COUNT", limit)
  local closure = redis.call("HMGET", keys[2], "closed_at", "closed_reason")
  -- Missing hash fields are normalised to empty strings.
  return ok(frames, closure[1] or "", closure[2] or "")
end)
--- ff_create_budget
-- Creates a budget definition with per-dimension hard/soft limits and, when a
-- positive reset interval is given, schedules its first reset.
--
-- KEYS: 1 def_key, 2 limits_key, 3 usage_key (not read here), 4 resets_zset,
--       5 policies_index
-- ARGV: 1 budget_id, 2 scope_type, 3 scope_id, 4 enforcement_mode,
--       5 on_hard_limit, 6 on_soft_limit, 7 reset_interval_ms, 8 now_ms,
--       9 dim_count, then dim_count dimension names, dim_count hard limits
--       and dim_count soft limits, in that order.
-- Returns ok(budget_id); ok_already_satisfied(budget_id) when def_key already
--         exists; or an err(...) reply for invalid input.
redis.register_function('ff_create_budget', function(keys, args)
  local K = {
    def_key = keys[1],
    limits_key = keys[2],
    usage_key = keys[3],
    resets_zset = keys[4],
    policies_index = keys[5],
  }
  local A = {
    budget_id = args[1],
    scope_type = args[2],
    scope_id = args[3],
    enforcement_mode = args[4],
    on_hard_limit = args[5],
    on_soft_limit = args[6],
    reset_interval_ms = args[7],
    now_ms = args[8],
  }
  -- Index membership is asserted even when the definition already exists.
  redis.call("SADD", K.policies_index, A.budget_id)
  if redis.call("EXISTS", K.def_key) == 1 then
    return ok_already_satisfied(A.budget_id)
  end

  -- Validate everything that is consumed after def_key is written. A script
  -- error past that point would leave a half-created budget that answers
  -- "already satisfied" on retry.
  local dim_count = require_number(args[9], "dim_count")
  if type(dim_count) == "table" then return dim_count end
  local interval_ms = require_number(A.reset_interval_ms, "reset_interval_ms")
  if type(interval_ms) == "table" then return interval_ms end
  local now_n = require_number(A.now_ms, "now_ms")
  if type(now_n) == "table" then return now_n end
  for i = 1, dim_count do
    if args[9 + i] == nil
        or args[9 + dim_count + i] == nil
        or args[9 + 2 * dim_count + i] == nil then
      return err("invalid_input", "dimension arguments do not match dim_count")
    end
  end

  redis.call("HSET", K.def_key,
    "budget_id", A.budget_id,
    "scope_type", A.scope_type,
    "scope_id", A.scope_id,
    "enforcement_mode", A.enforcement_mode,
    "on_hard_limit", A.on_hard_limit,
    "on_soft_limit", A.on_soft_limit,
    "reset_interval_ms", A.reset_interval_ms,
    "breach_count", "0",
    "soft_breach_count", "0",
    "created_at", A.now_ms,
    "last_updated_at", A.now_ms)
  for i = 1, dim_count do
    local dim = args[9 + i]
    local hard = args[9 + dim_count + i]
    local soft = args[9 + 2 * dim_count + i]
    redis.call("HSET", K.limits_key, "hard:" .. dim, hard, "soft:" .. dim, soft)
  end

  -- A positive interval schedules the first reset.
  if interval_ms > 0 then
    local next_reset_at = tostring(now_n + interval_ms)
    redis.call("HSET", K.def_key, "next_reset_at", next_reset_at)
    redis.call("ZADD", K.resets_zset, tonumber(next_reset_at), A.budget_id)
  end
  return ok(A.budget_id)
end)
--- ff_report_usage_and_check
-- Applies usage deltas to a budget in two passes: first every dimension is
-- checked against its hard limit without writing usage; only if none would be
-- exceeded are the deltas applied and soft limits evaluated.
--
-- KEYS: 1 usage_key, 2 limits_key, 3 def_key
-- ARGV: 1 dim_count, then dim_count dimension names, then dim_count deltas,
--       then now_ms, then an optional dedup key name.
-- Returns {1, "OK"}; {1, "ALREADY_APPLIED"} when the dedup key exists;
--         {1, "HARD_BREACH", dim, current_usage, hard_limit} (usage not
--         changed); {1, "SOFT_BREACH", dim, current_usage, soft_limit}
--         (usage applied); or an err(...) reply for invalid input.
redis.register_function('ff_report_usage_and_check', function(keys, args)
  local K = {
    usage_key = keys[1],
    limits_key = keys[2],
    def_key = keys[3],
  }
  local dim_count = require_number(args[1], "dim_count")
  if type(dim_count) == "table" then return dim_count end
  local now_ms = args[2 * dim_count + 2]
  local dedup_key = args[2 * dim_count + 3] or ""
  if dedup_key ~= "" then
    local existing = redis.call("GET", dedup_key)
    if existing then
      return {1, "ALREADY_APPLIED"}
    end
  end

  -- Validate every input before the first write. The dedup marker is only
  -- set after all increments, so a script error in the middle of the
  -- increment loop would leave usage partly applied and a retry would count
  -- it again.
  if now_ms == nil then
    return err("invalid_input", "now_ms is required")
  end
  local dims, deltas = {}, {}
  for i = 1, dim_count do
    local dim = args[1 + i]
    local delta = tonumber(args[1 + dim_count + i])
    -- HINCRBY only accepts integers.
    if dim == nil or delta == nil or delta ~= math.floor(delta) then
      return err("invalid_input", "each dimension needs a name and an integer delta")
    end
    dims[i] = dim
    deltas[i] = delta
  end

  -- Pass 1: hard limits, checked against current + delta, no usage writes.
  for i = 1, dim_count do
    local dim = dims[i]
    local current = tonumber(redis.call("HGET", K.usage_key, dim) or "0")
    local new_total = current + deltas[i]
    local hard_limit = redis.call("HGET", K.limits_key, "hard:" .. dim)
    if hard_limit and hard_limit ~= "" then
      local limit_val = tonumber(hard_limit)
      -- A limit that is unparsable or <= 0 is treated as "no limit".
      if limit_val and limit_val > 0 and new_total > limit_val then
        redis.call("HINCRBY", K.def_key, "breach_count", 1)
        redis.call("HSET", K.def_key,
          "last_breach_at", now_ms,
          "last_breach_dim", dim,
          "last_updated_at", now_ms)
        return {1, "HARD_BREACH", dim, tostring(current), tostring(hard_limit)}
      end
    end
  end

  -- Pass 2: apply the deltas and remember the first soft-limit breach.
  local breached_soft = nil
  for i = 1, dim_count do
    local dim = dims[i]
    local new_val = redis.call("HINCRBY", K.usage_key, dim, deltas[i])
    local soft_limit = redis.call("HGET", K.limits_key, "soft:" .. dim)
    if soft_limit and soft_limit ~= "" then
      local limit_val = tonumber(soft_limit)
      if limit_val and limit_val > 0 and new_val > limit_val then
        if not breached_soft then
          breached_soft = dim
        end
      end
    end
  end
  redis.call("HSET", K.def_key, "last_updated_at", now_ms)
  if dedup_key ~= "" then
    -- Dedup marker lives for 24 hours.
    redis.call("SET", dedup_key, "1", "PX", 86400000)
  end
  if breached_soft then
    redis.call("HINCRBY", K.def_key, "soft_breach_count", 1)
    local soft_val = tonumber(redis.call("HGET", K.limits_key, "soft:" .. breached_soft) or "0")
    local cur_val = tonumber(redis.call("HGET", K.usage_key, breached_soft) or "0")
    return {1, "SOFT_BREACH", breached_soft, tostring(cur_val), tostring(soft_val)}
  end
  return {1, "OK"}
end)
--- ff_reset_budget
-- Zeroes every usage field of a budget, records the reset on the definition
-- hash, and either schedules the next reset (positive stored
-- reset_interval_ms) or removes the budget from the reset schedule.
--
-- KEYS: 1 def_key, 2 usage_key, 3 resets_zset
-- ARGV: 1 budget_id, 2 now_ms (must be numeric)
-- Returns ok(next_reset_at), where next_reset_at is "0" when no further
--         reset is scheduled; or an err(...) reply for invalid input.
redis.register_function('ff_reset_budget', function(keys, args)
  local K = {
    def_key = keys[1],
    usage_key = keys[2],
    resets_zset = keys[3],
  }
  local A = {
    budget_id = args[1],
    now_ms = args[2],
  }
  -- now_ms feeds arithmetic after usage has been zeroed; reject a bad value
  -- before any write so the reset cannot stop half-way.
  local now_n = require_number(A.now_ms, "now_ms")
  if type(now_n) == "table" then return now_n end

  local usage_fields = redis.call("HKEYS", K.usage_key)
  if #usage_fields > 0 then
    local zero_args = {}
    for _, field in ipairs(usage_fields) do
      zero_args[#zero_args + 1] = field
      zero_args[#zero_args + 1] = "0"
    end
    redis.call("HSET", K.usage_key, unpack(zero_args))
  end

  -- NOTE(review): HINCRBY/HSET create def_key when it does not exist, so
  -- resetting an unknown budget leaves a partial definition behind.
  -- Confirm callers only reset existing budgets.
  redis.call("HINCRBY", K.def_key, "reset_count", 1)
  redis.call("HSET", K.def_key,
    "last_reset_at", A.now_ms,
    "last_updated_at", A.now_ms,
    "last_breach_at", "",
    "last_breach_dim", "")

  -- A missing or unparsable stored interval counts as 0 (no schedule).
  local interval_ms = tonumber(redis.call("HGET", K.def_key, "reset_interval_ms") or "0") or 0
  local next_reset_at = "0"
  if interval_ms > 0 then
    next_reset_at = tostring(now_n + interval_ms)
    redis.call("HSET", K.def_key, "next_reset_at", next_reset_at)
    redis.call("ZADD", K.resets_zset, tonumber(next_reset_at), A.budget_id)
  else
    redis.call("ZREM", K.resets_zset, A.budget_id)
  end
  return ok(next_reset_at)
end)
--- ff_unblock_execution
-- Moves a runnable execution whose eligibility_state is one of
-- blocked_by_budget / blocked_by_quota / blocked_by_route /
-- blocked_by_operator from the blocked index to the eligible index and marks
-- it eligible_now.
--
-- KEYS: 1 core_key, 2 blocked_zset, 3 eligible_zset
-- ARGV: 1 execution_id, 2 now_ms, 3 expected_blocking_reason (optional; when
--       non-empty it must equal the stored blocking_reason)
-- Returns ok("unblocked"), err("execution_not_found") or
--         err("execution_not_eligible").
redis.register_function('ff_unblock_execution', function(keys, args)
  local core_key, blocked_zset, eligible_zset = keys[1], keys[2], keys[3]
  local execution_id, now_ms = args[1], args[2]
  local expected_reason = args[3] or ""

  -- Eligibility states this function is willing to clear.
  local UNBLOCKABLE = {
    blocked_by_budget = true,
    blocked_by_quota = true,
    blocked_by_route = true,
    blocked_by_operator = true,
  }

  local fields = redis.call("HGETALL", core_key)
  if #fields == 0 then
    return err("execution_not_found")
  end
  local exec = hgetall_to_table(fields)
  if exec.lifecycle_phase ~= "runnable" then
    return err("execution_not_eligible")
  end
  if not UNBLOCKABLE[exec.eligibility_state] then
    return err("execution_not_eligible")
  end
  if expected_reason ~= "" and exec.blocking_reason ~= expected_reason then
    return err("execution_not_eligible")
  end

  redis.call("HSET", core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", "eligible_now",
    "blocking_reason", "waiting_for_worker",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "attempt_state", exec.attempt_state or "pending_first_attempt",
    "public_state", "waiting",
    "last_transition_at", now_ms,
    "last_mutation_at", now_ms)
  redis.call("ZREM", blocked_zset, execution_id)

  -- Score: higher priority sorts first, then older created_at.
  local prio = tonumber(exec.priority or "0")
  local born_at = tonumber(exec.created_at or "0")
  redis.call("ZADD", eligible_zset,
    0 - (prio * 1000000000000) + born_at, execution_id)
  return ok("unblocked")
end)
--- ff_block_execution_for_admission
-- Marks a runnable execution as blocked for the given reason and moves it
-- from the eligible index to the blocked index (scored by now_ms).
--
-- KEYS: 1 core_key, 2 eligible_zset, 3 blocked_zset
-- ARGV: 1 execution_id, 2 blocking_reason (one of waiting_for_budget,
--       waiting_for_quota, waiting_for_capable_worker, paused_by_operator,
--       paused_by_policy), 3 blocking_detail (optional), 4 now_ms (numeric)
-- Returns ok("blocked"); err("execution_not_found");
--         err("execution_not_active", terminal_outcome, lease_epoch,
--         "terminal", attempt_id) for a terminal execution;
--         err("execution_not_eligible") for any other non-runnable phase;
--         err("invalid_blocking_reason"); or an err(...) reply for a
--         non-numeric now_ms.
redis.register_function('ff_block_execution_for_admission', function(keys, args)
  local K = {
    core_key = keys[1],
    eligible_zset = keys[2],
    blocked_zset = keys[3],
  }
  local A = {
    execution_id = args[1],
    blocking_reason = args[2],
    blocking_detail = args[3] or "",
    now_ms = args[4],
  }
  local raw = redis.call("HGETALL", K.core_key)
  if #raw == 0 then return err("execution_not_found") end
  local core = hgetall_to_table(raw)
  if core.lifecycle_phase ~= "runnable" then
    if core.lifecycle_phase == "terminal" then
      return err("execution_not_active",
        core.terminal_outcome or "",
        core.current_lease_epoch or "",
        "terminal",
        core.current_attempt_id or "")
    end
    return err("execution_not_eligible")
  end
  local REASON_TO_ELIGIBILITY = {
    waiting_for_budget = "blocked_by_budget",
    waiting_for_quota = "blocked_by_quota",
    waiting_for_capable_worker = "blocked_by_route",
    paused_by_operator = "blocked_by_operator",
    paused_by_policy = "blocked_by_lane_state",
  }
  local eligibility = REASON_TO_ELIGIBILITY[A.blocking_reason]
  if not eligibility then
    return err("invalid_blocking_reason")
  end
  -- now_ms becomes the blocked_zset score. Validate it before the first
  -- write: if ZADD failed after the ZREM below, the execution would sit in
  -- neither index.
  local blocked_score = require_number(A.now_ms, "now_ms")
  if type(blocked_score) == "table" then return blocked_score end

  redis.call("HSET", K.core_key,
    "lifecycle_phase", "runnable",
    "ownership_state", "unowned",
    "eligibility_state", eligibility,
    "blocking_reason", A.blocking_reason,
    "blocking_detail", A.blocking_detail,
    "terminal_outcome", "none",
    "attempt_state", core.attempt_state or "pending_first_attempt",
    "public_state", "rate_limited",
    "last_transition_at", A.now_ms,
    "last_mutation_at", A.now_ms)
  redis.call("ZREM", K.eligible_zset, A.execution_id)
  redis.call("ZADD", K.blocked_zset, blocked_score, A.execution_id)
  return ok("blocked")
end)
--- ff_create_quota_policy
-- Creates a quota policy definition, initialises its concurrency counter to
-- "0" and adds the policy id to the policies index. Does nothing when the
-- definition key already exists.
--
-- KEYS: 1 def_key, 2 window_zset (not touched here), 3 concurrency_key,
--       4 admitted_set (not touched here), 5 policies_index
-- ARGV: 1 quota_policy_id, 2 window_seconds, 3 max_requests_per_window,
--       4 max_concurrent, 5 now_ms
-- Returns ok(quota_policy_id) or ok_already_satisfied(quota_policy_id).
redis.register_function('ff_create_quota_policy', function(keys, args)
  local def_key, concurrency_key, policies_index = keys[1], keys[3], keys[5]
  local policy_id = args[1]

  local already_defined = redis.call("EXISTS", def_key) == 1
  if already_defined then
    return ok_already_satisfied(policy_id)
  end

  redis.call("HSET", def_key,
    "quota_policy_id", policy_id,
    "requests_per_window_seconds", args[2],
    "max_requests_per_window", args[3],
    "active_concurrency_cap", args[4],
    "created_at", args[5])
  redis.call("SET", concurrency_key, "0")
  redis.call("SADD", policies_index, policy_id)
  return ok(policy_id)
end)
--- ff_check_admission_and_record
-- Sliding-window rate and concurrency admission check. On admission the
-- execution is recorded in the window, a per-execution guard key is set for
-- the window length, the concurrency counter is incremented (when a cap is
-- configured) and the execution is added to the admitted set.
--
-- KEYS: 1 window_zset, 2 concurrency_key, 3 quota_def (not read here),
--       4 admitted_guard_key, 5 admitted_set
-- ARGV: 1 now_ms, 2 window_seconds, 3 rate_limit (<= 0 disables),
--       4 concurrency_cap (<= 0 disables), 5 execution_id,
--       6 jitter_ms (optional; missing or non-numeric means 0)
-- Returns {"ADMITTED"}, {"ALREADY_ADMITTED"},
--         {"RATE_EXCEEDED", retry_after_ms}, {"CONCURRENCY_EXCEEDED"},
--         or the error reply produced by require_number.
redis.register_function('ff_check_admission_and_record', function(keys, args)
  local K = {
    window_zset = keys[1],
    concurrency_key = keys[2],
    quota_def = keys[3],
    admitted_guard_key = keys[4],
    admitted_set = keys[5],
  }
  local now_ms_n = require_number(args[1], "now_ms")
  if type(now_ms_n) == "table" then return now_ms_n end
  local window_seconds_n = require_number(args[2], "window_seconds")
  if type(window_seconds_n) == "table" then return window_seconds_n end
  local rate_limit_n = require_number(args[3], "rate_limit")
  if type(rate_limit_n) == "table" then return rate_limit_n end
  local concurrency_cap_n = require_number(args[4], "concurrency_cap")
  if type(concurrency_cap_n) == "table" then return concurrency_cap_n end
  local A = {
    now_ms = now_ms_n,
    window_seconds = window_seconds_n,
    rate_limit = rate_limit_n,
    concurrency_cap = concurrency_cap_n,
    execution_id = args[5],
    -- Jitter only pads the retry hint; an unparsable value falls back to 0
    -- instead of raising on the rate-exceeded path.
    jitter_ms = tonumber(args[6] or "0") or 0,
  }
  local window_ms = A.window_seconds * 1000

  -- An execution already admitted within the window is not counted twice.
  if redis.call("EXISTS", K.admitted_guard_key) == 1 then
    return { "ALREADY_ADMITTED" }
  end

  -- Drop window entries that have aged out.
  redis.call("ZREMRANGEBYSCORE", K.window_zset, "-inf", A.now_ms - window_ms)
  if A.rate_limit > 0 then
    local current_count = redis.call("ZCARD", K.window_zset)
    if current_count >= A.rate_limit then
      -- Retry once the oldest entry leaves the window, plus optional jitter.
      local oldest = redis.call("ZRANGE", K.window_zset, 0, 0, "WITHSCORES")
      local retry_after_ms = 0
      if #oldest >= 2 then
        retry_after_ms = tonumber(oldest[2]) + window_ms - A.now_ms
        if retry_after_ms < 0 then retry_after_ms = 0 end
      end
      local jitter = 0
      if A.jitter_ms > 0 then
        jitter = math.random(0, A.jitter_ms)
      end
      return { "RATE_EXCEEDED", tostring(retry_after_ms + jitter) }
    end
  end
  if A.concurrency_cap > 0 then
    local active = tonumber(redis.call("GET", K.concurrency_key) or "0")
    if active >= A.concurrency_cap then
      return { "CONCURRENCY_EXCEEDED" }
    end
  end

  -- Admit.
  redis.call("ZADD", K.window_zset, A.now_ms, A.execution_id)
  if window_ms > 0 then
    redis.call("SET", K.admitted_guard_key, "1", "PX", window_ms, "NX")
  end
  if A.concurrency_cap > 0 then
    redis.call("INCR", K.concurrency_key)
  end
  redis.call("SADD", K.admitted_set, A.execution_id)
  return { "ADMITTED" }
end)
--- ff_release_admission
-- Releases an admission: deletes the per-execution guard key, removes the
-- execution from the admitted set, and decrements the concurrency counter
-- when it is above zero.
--
-- KEYS: 1 admitted_guard_key, 2 admitted_set, 3 concurrency_key
-- ARGV: 1 execution_id
-- Returns {1, "OK", "released"} unconditionally.
redis.register_function('ff_release_admission', function(keys, args)
  local guard_key, admitted_key, counter_key = keys[1], keys[2], keys[3]

  redis.call("DEL", guard_key)
  redis.call("SREM", admitted_key, args[1])

  -- NOTE(review): the decrement does not depend on whether SREM removed
  -- anything, so releasing the same execution twice decrements twice --
  -- confirm callers release at most once.
  local in_flight = tonumber(redis.call("GET", counter_key) or "0")
  if in_flight > 0 then
    redis.call("DECR", counter_key)
  end
  return {1, "OK", "released"}
end)
-- Upper bound on the number of distinct nodes detect_cycle will expand.
local MAX_CYCLE_CHECK_NODES = 1000

--- Breadth-first reachability test over a flow's edge graph.
-- Starting at start_eid, follows `<flow_prefix>:out:<eid>` edge sets and each
-- edge's `downstream_execution_id` field, and reports whether target_eid can
-- be reached.
-- @param flow_prefix key prefix of the flow
-- @param start_eid   execution id the walk starts from
-- @param target_eid  execution id being searched for
-- @return true when target_eid is reached, and also when more than
--         MAX_CYCLE_CHECK_NODES distinct nodes had to be expanded (the walk
--         gives up and answers true); false otherwise
local function detect_cycle(flow_prefix, start_eid, target_eid)
  local seen = {}
  -- Single FIFO queue; `head` is the index of the next node to process.
  local frontier = { start_eid }
  local head = 1
  local expanded = 0
  while head <= #frontier do
    local node = frontier[head]
    head = head + 1
    if node == target_eid then
      return true
    end
    if not seen[node] then
      seen[node] = true
      expanded = expanded + 1
      if expanded > MAX_CYCLE_CHECK_NODES then
        return true
      end
      local edge_ids = redis.call("SMEMBERS", flow_prefix .. ":out:" .. node)
      for i = 1, #edge_ids do
        local child = redis.call("HGET",
          flow_prefix .. ":edge:" .. edge_ids[i], "downstream_execution_id")
        if child and child ~= "" and not seen[child] then
          frontier[#frontier + 1] = child
        end
      end
    end
  end
  return false
end
--- ff_create_flow
-- Registers a flow id in the flow index and, unless the flow core hash
-- already exists, creates it in the "open" state with zeroed counters.
--
-- KEYS: 1 flow_core, 2 members_set (not touched here), 3 flow_index
-- ARGV: 1 flow_id, 2 flow_kind, 3 namespace
-- Returns ok(flow_id) or ok_already_satisfied(flow_id).
redis.register_function('ff_create_flow', function(keys, args)
  local core_key, index_key = keys[1], keys[3]
  local flow_id, kind, ns = args[1], args[2], args[3]
  local stamp = server_time_ms()

  -- The index entry is written even when the flow already exists.
  redis.call("SADD", index_key, flow_id)

  local exists = redis.call("EXISTS", core_key)
  if exists == 1 then
    return ok_already_satisfied(flow_id)
  end

  redis.call("HSET", core_key,
    "flow_id", flow_id,
    "flow_kind", kind,
    "namespace", ns,
    "graph_revision", 0,
    "node_count", 0,
    "edge_count", 0,
    "public_flow_state", "open",
    "created_at", stamp,
    "last_mutation_at", stamp)
  return ok(flow_id)
end)
--- ff_add_execution_to_flow: attach an execution to a non-terminal flow.
--
-- KEYS[1] flow core hash, KEYS[2] members set, KEYS[3] flow index set,
-- KEYS[4] execution core hash
-- ARGV[1] flow id, ARGV[2] execution id
--
-- Errors: flow_not_found, flow_already_terminal, execution_not_found,
-- already_member_of_different_flow:<other flow id>.
-- Returns ok_already_satisfied(eid, node_count) when the execution is already
-- in the members set, else ok(eid, new node_count as string).
redis.register_function('ff_add_execution_to_flow', function(keys, args)
  local flow_core_key = keys[1]
  local members_key = keys[2]
  local flow_index_key = keys[3]
  local exec_core_key = keys[4]
  local flow_id, execution_id = args[1], args[2]

  local now_ms = server_time_ms()

  local flow_fields = redis.call("HGETALL", flow_core_key)
  if #flow_fields == 0 then
    return err("flow_not_found")
  end
  local flow = hgetall_to_table(flow_fields)

  local state = flow.public_flow_state or ""
  if state == "cancelled" or state == "completed" or state == "failed" then
    return err("flow_already_terminal")
  end

  if redis.call("EXISTS", exec_core_key) == 0 then
    return err("execution_not_found")
  end

  redis.call("SADD", flow_index_key, flow_id)

  local already_member = redis.call("SISMEMBER", members_key, execution_id) == 1

  -- Both the "already a member" and the "new member" paths refuse an
  -- execution whose core hash is stamped with a different flow id.
  local bound_flow = redis.call("HGET", exec_core_key, "flow_id")
  if bound_flow and bound_flow ~= "" and bound_flow ~= flow_id then
    return err("already_member_of_different_flow:" .. bound_flow)
  end

  if already_member then
    -- Re-stamp the back-pointer and report the current node count unchanged.
    redis.call("HSET", exec_core_key, "flow_id", flow_id)
    local node_count = redis.call("HGET", flow_core_key, "node_count") or "0"
    return ok_already_satisfied(execution_id, node_count)
  end

  redis.call("SADD", members_key, execution_id)
  redis.call("HSET", exec_core_key, "flow_id", flow_id)
  local node_count = redis.call("HINCRBY", flow_core_key, "node_count", 1)
  redis.call("HINCRBY", flow_core_key, "graph_revision", 1)
  redis.call("HSET", flow_core_key, "last_mutation_at", now_ms)

  return ok(execution_id, tostring(node_count))
end)
--- ff_cancel_flow: mark a flow cancelled and, for policy "cancel_all",
-- queue every member for cancellation.
--
-- KEYS[1] flow core hash, KEYS[2] members set,
-- KEYS[4] pending-cancels set, KEYS[5] cancel backlog zset
-- (KEYS[3] is not read by this function)
-- ARGV[1] flow id, ARGV[2] reason, ARGV[3] cancellation policy,
-- ARGV[5] optional grace period in ms (ARGV[4] is not read); defaults to
-- 30000 when absent or empty.
--
-- Errors: invalid_grace_ms, flow_not_found, flow_already_terminal.
-- Returns {1, "OK", policy, member1, member2, ...}.
redis.register_function('ff_cancel_flow', function(keys, args)
  local flow_core_key = keys[1]
  local members_key = keys[2]
  local pending_cancels_key = keys[4]
  local cancel_backlog_key = keys[5]

  local flow_id = args[1]
  local reason = args[2]
  local policy = args[3]

  -- Grace period: must be a finite, non-negative integer no larger than
  -- 2^53 - 1 when supplied.
  local grace_ms = 30000
  local grace_arg = args[5]
  if grace_arg ~= nil and grace_arg ~= "" then
    local parsed = tonumber(grace_arg)
    local acceptable = parsed
      and parsed == parsed
      and parsed >= 0
      and parsed <= 9007199254740991
      and parsed == math.floor(parsed)
    if not acceptable then
      return err("invalid_grace_ms")
    end
    grace_ms = parsed
  end

  local now_ms = server_time_ms()

  local flow_fields = redis.call("HGETALL", flow_core_key)
  if #flow_fields == 0 then
    return err("flow_not_found")
  end
  local flow = hgetall_to_table(flow_fields)

  local state = flow.public_flow_state or ""
  if state == "cancelled" or state == "completed" or state == "failed" then
    return err("flow_already_terminal")
  end

  local members = redis.call("SMEMBERS", members_key)
  local member_count = #members

  redis.call("HSET", flow_core_key,
    "public_flow_state", "cancelled",
    "cancelled_at", now_ms,
    "cancel_reason", reason,
    "cancellation_policy", policy,
    "last_mutation_at", now_ms)

  if policy == "cancel_all" and member_count > 0 then
    -- Feed SADD in slices of at most 1000 ids so a single call never has to
    -- unpack the whole member list at once.
    local first = 1
    while first <= member_count do
      local last = math.min(first + 999, member_count)
      redis.call("SADD", pending_cancels_key, unpack(members, first, last))
      first = last + 1
    end
    -- Backlog score is the moment the grace period ends.
    redis.call("ZADD", cancel_backlog_key, now_ms + grace_ms, flow_id)
  end

  local reply = {1, "OK", policy}
  for idx = 1, member_count do
    reply[#reply + 1] = members[idx]
  end
  return reply
end)
--- ff_ack_cancel_member: acknowledge that one member's cancel was handled.
--
-- KEYS[1] pending-cancels set, KEYS[2] cancel backlog zset
-- ARGV[1] execution id, ARGV[2] flow id
--
-- Removes the execution from the pending set; once that set no longer
-- exists, the flow is dropped from the backlog. Always returns ok().
redis.register_function('ff_ack_cancel_member', function(keys, args)
  local pending_key, backlog_key = keys[1], keys[2]
  local execution_id, flow_id = args[1], args[2]

  redis.call("SREM", pending_key, execution_id)

  local pending_gone = redis.call("EXISTS", pending_key) == 0
  if pending_gone then
    redis.call("ZREM", backlog_key, flow_id)
  end

  return ok()
end)
--- ff_stage_dependency_edge: record a new upstream -> downstream edge in a
-- flow's graph.
--
-- KEYS[1] flow core hash, KEYS[2] members set, KEYS[3] edge hash,
-- KEYS[4] upstream's outgoing adjacency set,
-- KEYS[5] downstream's incoming adjacency set
-- (KEYS[6] is accepted but not used by this function)
-- ARGV[1] flow id, ARGV[2] edge id, ARGV[3] upstream execution id,
-- ARGV[4] downstream execution id, ARGV[5] dependency kind (default
-- "success_only"), ARGV[6] data passing ref (default ""),
-- ARGV[7] expected graph revision, ARGV[8] timestamp written to the records
--
-- Errors: self_referencing_edge, flow_not_found, flow_already_terminal,
-- stale_graph_revision, execution_not_in_flow, cycle_detected,
-- dependency_already_exists.
-- Returns ok(edge_id, new graph revision as string).
redis.register_function('ff_stage_dependency_edge', function(keys, args)
  local flow_core_key = keys[1]
  local members_key = keys[2]
  local edge_key = keys[3]
  local out_adj_key = keys[4]
  local in_adj_key = keys[5]

  local flow_id = args[1]
  local edge_id = args[2]
  local upstream_eid = args[3]
  local downstream_eid = args[4]
  local dependency_kind = args[5] or "success_only"
  local data_passing_ref = args[6] or ""
  local expected_revision = args[7]
  local now_ms = args[8]

  if upstream_eid == downstream_eid then
    return err("self_referencing_edge")
  end

  local flow_fields = redis.call("HGETALL", flow_core_key)
  if #flow_fields == 0 then
    return err("flow_not_found")
  end
  local flow = hgetall_to_table(flow_fields)

  local terminal_states = { cancelled = true, completed = true, failed = true }
  if terminal_states[flow.public_flow_state or ""] then
    return err("flow_already_terminal")
  end

  -- Optimistic concurrency: the caller must have seen the current revision.
  if tostring(flow.graph_revision or "0") ~= expected_revision then
    return err("stale_graph_revision")
  end

  -- Both endpoints have to be members; upstream is checked first.
  local endpoints = { upstream_eid, downstream_eid }
  for idx = 1, 2 do
    if redis.call("SISMEMBER", members_key, endpoints[idx]) == 0 then
      return err("execution_not_in_flow")
    end
  end

  -- Drop the last five characters of the core key to get the flow prefix
  -- (presumably a ":core" suffix - confirm against the key builder).
  local flow_prefix = flow_core_key:sub(1, -6)
  -- If upstream is already reachable from downstream, the new edge would
  -- close a loop.
  if detect_cycle(flow_prefix, downstream_eid, upstream_eid) then
    return err("cycle_detected")
  end

  if redis.call("EXISTS", edge_key) == 1 then
    return err("dependency_already_exists")
  end

  redis.call("HSET", edge_key,
    "edge_id", edge_id,
    "flow_id", flow_id,
    "upstream_execution_id", upstream_eid,
    "downstream_execution_id", downstream_eid,
    "dependency_kind", dependency_kind,
    "satisfaction_condition", "all_required",
    "data_passing_ref", data_passing_ref,
    "edge_state", "pending",
    "created_at", now_ms,
    "created_by", "engine")
  redis.call("SADD", out_adj_key, edge_id)
  redis.call("SADD", in_adj_key, edge_id)

  local revision = redis.call("HINCRBY", flow_core_key, "graph_revision", 1)
  redis.call("HINCRBY", flow_core_key, "edge_count", 1)
  redis.call("HSET", flow_core_key, "last_mutation_at", now_ms)

  return ok(edge_id, tostring(revision))
end)
--- ff_apply_dependency_to_child: register one dependency edge on the
-- downstream execution and block it if it is currently runnable.
--
-- KEYS[1] execution core hash, KEYS[2] deps meta hash,
-- KEYS[3] unresolved-edges set, KEYS[4] per-edge dependency hash,
-- KEYS[5] eligible zset, KEYS[6] blocked-by-deps zset,
-- KEYS[7] all-edges set
-- ARGV[1] flow id, ARGV[2] edge id, ARGV[3] upstream execution id,
-- ARGV[4] graph revision, ARGV[5] dependency kind (default "success_only"),
-- ARGV[6] data passing ref (default ""), ARGV[7] timestamp
--
-- Errors: execution_not_found, execution_already_in_flow.
-- Returns ok("already_applied") when the per-edge hash already exists,
-- otherwise ok(<unsatisfied count after increment, as string>).
redis.register_function('ff_apply_dependency_to_child', function(keys, args)
  local core_key = keys[1]
  local deps_meta_key = keys[2]
  local unresolved_key = keys[3]
  local dep_key = keys[4]
  local eligible_key = keys[5]
  local blocked_key = keys[6]
  local all_edges_key = keys[7]

  local flow_id = args[1]
  local edge_id = args[2]
  local upstream_eid = args[3]
  local graph_revision = args[4]
  local dependency_kind = args[5] or "success_only"
  local data_passing_ref = args[6] or ""
  local now_ms = args[7]

  local core_fields = redis.call("HGETALL", core_key)
  if #core_fields == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(core_fields)

  if is_set(core.flow_id) and core.flow_id ~= flow_id then
    return err("execution_already_in_flow")
  end

  -- The per-edge hash doubles as the idempotency marker.
  if redis.call("EXISTS", dep_key) == 1 then
    return ok("already_applied")
  end

  local child_eid = core.execution_id or ""

  redis.call("HSET", dep_key,
    "edge_id", edge_id,
    "flow_id", flow_id,
    "upstream_execution_id", upstream_eid,
    "downstream_execution_id", child_eid,
    "dependency_kind", dependency_kind,
    "state", "unsatisfied",
    "data_passing_ref", data_passing_ref,
    "last_resolved_at", "")
  redis.call("SADD", unresolved_key, edge_id)
  redis.call("SADD", all_edges_key, edge_id)

  local pending_deps = redis.call(
    "HINCRBY", deps_meta_key, "unsatisfied_required_count", 1)
  redis.call("HSET", deps_meta_key,
    "flow_id", flow_id,
    "last_flow_graph_revision", graph_revision,
    "last_dependency_update_at", now_ms)

  local runnable_and_live =
    core.lifecycle_phase == "runnable" and core.terminal_outcome == "none"
  if runnable_and_live then
    -- Move the execution from the eligible index to the blocked index,
    -- keeping phase, ownership and attempt state as they were.
    redis.call("HSET", core_key,
      "lifecycle_phase", core.lifecycle_phase,
      "ownership_state", core.ownership_state or "unowned",
      "eligibility_state", "blocked_by_dependencies",
      "blocking_reason", "waiting_for_children",
      "blocking_detail", pending_deps .. " dep(s) unresolved incl " .. edge_id,
      "terminal_outcome", "none",
      "attempt_state", core.attempt_state or "pending_first_attempt",
      "public_state", "waiting_children",
      "last_transition_at", now_ms,
      "last_mutation_at", now_ms)
    redis.call("ZREM", eligible_key, child_eid)
    redis.call("ZADD", blocked_key,
      tonumber(core.created_at or "0"), child_eid)
  end

  return ok(tostring(pending_deps))
end)
-- ff_resolve_dependency: resolve one dependency edge on a downstream
-- execution according to the upstream's outcome.
--
-- KEYS[1] execution core hash      KEYS[2] deps meta hash
-- KEYS[3] unresolved-edges set     KEYS[4] per-edge dependency hash
-- KEYS[5] eligible zset            KEYS[6] terminal zset
-- KEYS[7] blocked-by-deps zset     KEYS[8] attempt hash
-- KEYS[9] stream meta hash         KEYS[10] downstream payload key
-- KEYS[11] upstream result key
-- ARGV[1] edge id, ARGV[2] upstream outcome, ARGV[3] timestamp
--
-- Outcome "success" marks the edge satisfied; ANY other outcome value marks
-- it impossible. Returns:
--   err("invalid_dependency")            per-edge hash missing
--   ok("already_resolved")               edge already satisfied/impossible
--   ok("satisfied", "" | "data_injected")
--   ok("impossible", "" | "child_skipped")
redis.register_function('ff_resolve_dependency', function(keys, args)
local K = {
core_key = keys[1],
deps_meta = keys[2],
unresolved_set = keys[3],
dep_hash = keys[4],
eligible_zset = keys[5],
terminal_zset = keys[6],
blocked_deps_zset = keys[7],
attempt_hash = keys[8],
stream_meta = keys[9],
downstream_payload = keys[10],
upstream_result = keys[11],
}
local A = {
edge_id = args[1],
upstream_outcome = args[2],
now_ms = args[3],
}
local dep_raw = redis.call("HGETALL", K.dep_hash)
if #dep_raw == 0 then return err("invalid_dependency") end
local dep = hgetall_to_table(dep_raw)
-- Idempotency guard: a resolved edge is never resolved (or counted) twice.
if dep.state == "satisfied" or dep.state == "impossible" then
return ok("already_resolved")
end
-- ---- Satisfied path ----------------------------------------------------
if A.upstream_outcome == "success" then
redis.call("HSET", K.dep_hash,
"state", "satisfied", "last_resolved_at", A.now_ms)
redis.call("SREM", K.unresolved_set, A.edge_id)
local remaining = redis.call("HINCRBY", K.deps_meta,
"unsatisfied_required_count", -1)
redis.call("HSET", K.deps_meta, "last_dependency_update_at", A.now_ms)
local raw = redis.call("HGETALL", K.core_key)
-- Edge bookkeeping is already done; with no core hash there is nothing to
-- promote.
if #raw == 0 then return ok("satisfied", "") end
local core = hgetall_to_table(raw)
local data_injected = ""
-- When the edge carries a data passing ref and the child is not terminal,
-- overwrite the downstream payload key with a copy of the upstream result.
if is_set(dep.data_passing_ref)
and core.terminal_outcome == "none" then
local copied = redis.call(
"COPY", K.upstream_result, K.downstream_payload, "REPLACE")
if copied == 1 then
data_injected = "data_injected"
end
end
-- Promote only when this was the last unsatisfied edge AND the child is
-- still runnable, unowned, non-terminal and blocked on dependencies.
if remaining == 0
and core.lifecycle_phase == "runnable"
and core.ownership_state == "unowned"
and core.terminal_outcome == "none"
and core.eligibility_state == "blocked_by_dependencies" then
local new_attempt_state = core.attempt_state
if not is_set(new_attempt_state) or new_attempt_state == "none" then
new_attempt_state = "pending_first_attempt"
end
redis.call("HSET", K.core_key,
"lifecycle_phase", core.lifecycle_phase, "ownership_state", core.ownership_state or "unowned", "eligibility_state", "eligible_now",
"blocking_reason", "waiting_for_worker",
"blocking_detail", "",
"terminal_outcome", "none", "attempt_state", new_attempt_state,
"public_state", "waiting",
"last_transition_at", A.now_ms,
"last_mutation_at", A.now_ms)
redis.call("ZREM", K.blocked_deps_zset, core.execution_id or "")
local priority = tonumber(core.priority or "0")
local created_at_ms = tonumber(core.created_at or "0")
-- Score falls as priority rises; created_at breaks ties within a priority.
-- NOTE(review): presumably the eligible zset is consumed lowest-score-first
-- - confirm against the claim path.
local score = 0 - (priority * 1000000000000) + created_at_ms
redis.call("ZADD", K.eligible_zset, score, core.execution_id or "")
end
return ok("satisfied", data_injected)
end
-- ---- Impossible path (every outcome other than "success") --------------
redis.call("HSET", K.dep_hash,
"state", "impossible", "last_resolved_at", A.now_ms)
redis.call("SREM", K.unresolved_set, A.edge_id)
redis.call("HINCRBY", K.deps_meta, "unsatisfied_required_count", -1)
redis.call("HINCRBY", K.deps_meta, "impossible_required_count", 1)
redis.call("HSET", K.deps_meta, "last_dependency_update_at", A.now_ms)
local raw = redis.call("HGETALL", K.core_key)
if #raw == 0 then return ok("impossible", "") end
local core = hgetall_to_table(raw)
local child_skipped = false
-- A non-terminal child is moved straight to terminal/"skipped".
if core.terminal_outcome == "none" then
local skip_attempt_state = core.attempt_state or "none"
-- An attempt that is running or interrupted is closed as cancelled, and
-- its stream meta (if the key exists) is stamped as closed.
if skip_attempt_state == "running_attempt"
or skip_attempt_state == "attempt_interrupted" then
skip_attempt_state = "attempt_terminal"
redis.call("HSET", K.attempt_hash,
"attempt_state", "ended_cancelled",
"ended_at", A.now_ms,
"failure_reason", "dependency_impossible")
if redis.call("EXISTS", K.stream_meta) == 1 then
redis.call("HSET", K.stream_meta,
"closed_at", A.now_ms,
"closed_reason", "dependency_impossible")
end
-- Any other non-empty attempt state is reset to "none".
elseif is_set(skip_attempt_state) and skip_attempt_state ~= "none" then
skip_attempt_state = "none"
end
redis.call("HSET", K.core_key,
"lifecycle_phase", "terminal",
"ownership_state", "unowned",
"eligibility_state", "not_applicable",
"blocking_reason", "none",
"blocking_detail", "",
"terminal_outcome", "skipped",
"attempt_state", skip_attempt_state,
"public_state", "skipped",
"completed_at", A.now_ms,
"last_transition_at", A.now_ms,
"last_mutation_at", A.now_ms)
-- Only the blocked index is cleaned here; the eligible zset is not touched
-- on this path.
redis.call("ZREM", K.blocked_deps_zset, core.execution_id or "")
redis.call("ZADD", K.terminal_zset, tonumber(A.now_ms), core.execution_id or "")
child_skipped = true
-- Announce the skip on the "ff:dag:completions" channel as a JSON payload
-- when both the flow id and the execution id are known.
if is_set(core.flow_id) and is_set(core.execution_id) then
local payload = cjson.encode({
execution_id = core.execution_id,
flow_id = core.flow_id,
outcome = "skipped",
})
redis.call("PUBLISH", "ff:dag:completions", payload)
end
end
return ok("impossible", child_skipped and "child_skipped" or "")
end)
--- ff_evaluate_flow_eligibility: read-only classification of an execution.
--
-- KEYS[1] execution core hash, KEYS[2] deps meta hash
--
-- Returns ok(<status>) where status is the first match of:
--   "not_found"                core hash is empty
--   "not_runnable"             lifecycle_phase is not "runnable"
--   "owned"                    ownership_state is not "unowned"
--   "terminal"                 terminal_outcome is not "none"
--   "eligible"                 deps meta hash is empty
--   "impossible"               impossible_required_count > 0
--   "blocked_by_dependencies"  unsatisfied_required_count > 0
--   "eligible"                 otherwise
redis.register_function('ff_evaluate_flow_eligibility', function(keys, args)
  local function classify()
    local core_fields = redis.call("HGETALL", keys[1])
    if #core_fields == 0 then
      return "not_found"
    end

    local core = hgetall_to_table(core_fields)
    if core.lifecycle_phase ~= "runnable" then
      return "not_runnable"
    elseif core.ownership_state ~= "unowned" then
      return "owned"
    elseif core.terminal_outcome ~= "none" then
      return "terminal"
    end

    -- The deps hash is only read once the core checks have passed.
    local dep_fields = redis.call("HGETALL", keys[2])
    if #dep_fields == 0 then
      return "eligible"
    end

    local deps = hgetall_to_table(dep_fields)
    local impossible_count = tonumber(deps.impossible_required_count or "0")
    if impossible_count > 0 then
      return "impossible"
    end
    local unsatisfied_count = tonumber(deps.unsatisfied_required_count or "0")
    if unsatisfied_count > 0 then
      return "blocked_by_dependencies"
    end
    return "eligible"
  end

  return ok(classify())
end)
--- ff_promote_blocked_to_eligible: move a dependency-blocked execution to
-- the eligible index once no dependencies remain outstanding.
--
-- KEYS[1] execution core hash, KEYS[2] blocked-by-deps zset,
-- KEYS[3] eligible zset, KEYS[4] deps meta hash,
-- KEYS[5] unresolved-edges set
-- ARGV[1] execution id, ARGV[2] timestamp
--
-- Errors: execution_not_found, not_runnable, not_blocked_by_deps, terminal,
-- deps_not_satisfied (with both counters as strings). Returns ok().
redis.register_function('ff_promote_blocked_to_eligible', function(keys, args)
  local core_key = keys[1]
  local blocked_key = keys[2]
  local eligible_key = keys[3]
  local deps_meta_key = keys[4]
  local unresolved_key = keys[5]
  local execution_id, now_ms = args[1], args[2]

  local core_fields = redis.call("HGETALL", core_key)
  if #core_fields == 0 then
    return err("execution_not_found")
  end
  local core = hgetall_to_table(core_fields)

  if core.lifecycle_phase ~= "runnable" then
    return err("not_runnable")
  end
  if core.eligibility_state ~= "blocked_by_dependencies" then
    return err("not_blocked_by_deps")
  end
  if core.terminal_outcome ~= "none" then
    return err("terminal")
  end

  -- Both the counter and the set have to agree that nothing is outstanding.
  local counter_raw = redis.call("HGET", deps_meta_key, "unsatisfied_required_count")
  local unsatisfied = tonumber(counter_raw or "0")
  local unresolved_count = redis.call("SCARD", unresolved_key)
  if unsatisfied > 0 or unresolved_count > 0 then
    return err("deps_not_satisfied", tostring(unsatisfied), tostring(unresolved_count))
  end

  local attempt_state = core.attempt_state
  if not is_set(attempt_state) or attempt_state == "none" then
    attempt_state = "pending_first_attempt"
  end

  redis.call("HSET", core_key,
    "lifecycle_phase", core.lifecycle_phase,
    "ownership_state", core.ownership_state or "unowned",
    "eligibility_state", "eligible_now",
    "blocking_reason", "waiting_for_worker",
    "blocking_detail", "",
    "terminal_outcome", "none",
    "attempt_state", attempt_state,
    "public_state", "waiting",
    "last_transition_at", now_ms,
    "last_mutation_at", now_ms)
  redis.call("ZREM", blocked_key, execution_id)

  -- Score falls as priority rises; created_at breaks ties within a priority.
  local priority = tonumber(core.priority or "0")
  local created_at_ms = tonumber(core.created_at or "0")
  local eligible_score = 0 - (priority * 1000000000000) + created_at_ms
  redis.call("ZADD", eligible_key, eligible_score, execution_id)

  return ok()
end)
-- ff_replay_execution: put a terminal execution back into a runnable state.
--
-- KEYS[1] execution core hash   KEYS[2] terminal zset
-- KEYS[3] eligible zset         KEYS[4] lease history stream
-- Only read when the execution is a skipped flow member:
--   KEYS[5] blocked-by-deps zset, KEYS[6] deps meta hash,
--   KEYS[7] unresolved-edges set, KEYS[8..] one per-edge dependency hash
--   per edge id
-- ARGV[1] execution id; ARGV[2] is not read by this function;
-- ARGV[3..] edge ids, positionally matching KEYS[8..]
--
-- Errors: execution_not_found, execution_not_terminal, max_replays_exhausted.
-- Returns ok(<unsatisfied dep count as string>) for a skipped flow member,
-- ok("0") otherwise.
redis.register_function('ff_replay_execution', function(keys, args)
local K = {
core_key = keys[1],
terminal_zset = keys[2],
eligible_zset = keys[3],
lease_history = keys[4],
}
local A = {
execution_id = args[1],
}
-- Server clock in milliseconds: TIME returns {seconds, microseconds}.
local t = redis.call("TIME")
local now_ms = tonumber(t[1]) * 1000 + math.floor(tonumber(t[2]) / 1000)
local raw = redis.call("HGETALL", K.core_key)
if #raw == 0 then return err("execution_not_found") end
local core = hgetall_to_table(raw)
if core.lifecycle_phase ~= "terminal" then
return err("execution_not_terminal")
end
local replay_count = tonumber(core.replay_count or "0")
-- Replay budget defaults to 10. The policy key is the core key with a
-- trailing ":core" swapped for ":policy"; only gsub's first return value
-- (the string) is kept.
local max_replays = 10 local policy_key = string.gsub(K.core_key, ":core$", ":policy")
local policy_raw = redis.call("GET", policy_key)
if policy_raw then
-- The policy is decoded under pcall, so malformed JSON leaves the default.
local ok_p, pol = pcall(cjson.decode, policy_raw)
if ok_p and type(pol) == "table" then
max_replays = tonumber(pol.max_replay_count) or 10
end
end
if replay_count >= max_replays then
return err("max_replays_exhausted")
end
-- A skipped execution that belongs to a flow is re-blocked on its
-- dependencies; everything else goes straight back to the eligible index.
local is_skipped_flow_member = (core.terminal_outcome == "skipped") and is_set(core.flow_id)
if is_skipped_flow_member then
local blocked_deps_zset = keys[5]
local deps_meta = keys[6]
local deps_unresolved = keys[7]
-- Edge ids start at args[3]; their hashes start at keys[8].
local num_edges = #args - 2
local new_unsatisfied = 0
for i = 1, num_edges do
local edge_id = args[2 + i]
local dep_key = keys[7 + i]
local dep_state = redis.call("HGET", dep_key, "state")
-- Impossible edges are reset to unsatisfied and re-added to the unresolved
-- set; edges already unsatisfied are only counted. Any other state
-- (e.g. "satisfied") is left alone and not counted.
if dep_state == "impossible" then
redis.call("HSET", dep_key,
"state", "unsatisfied",
"last_resolved_at", "")
redis.call("SADD", deps_unresolved, edge_id)
new_unsatisfied = new_unsatisfied + 1
elseif dep_state == "unsatisfied" then
new_unsatisfied = new_unsatisfied + 1
end
end
-- Counters are overwritten from the recount rather than adjusted.
redis.call("HSET", deps_meta,
"unsatisfied_required_count", tostring(new_unsatisfied),
"impossible_required_count", "0",
"last_dependency_update_at", tostring(now_ms))
-- The execution is parked as blocked_by_dependencies even when the recount
-- is zero. NOTE(review): promotion presumably happens later via
-- ff_promote_blocked_to_eligible - confirm with the caller.
redis.call("HSET", K.core_key,
"lifecycle_phase", "runnable",
"ownership_state", "unowned",
"eligibility_state", "blocked_by_dependencies",
"blocking_reason", "waiting_for_children",
"blocking_detail", tostring(new_unsatisfied) .. " dep(s) unsatisfied after replay",
"terminal_outcome", "none",
"attempt_state", "pending_replay_attempt",
"public_state", "waiting_children",
"pending_replay_attempt", "1",
"replay_count", tostring(replay_count + 1),
"completed_at", "",
"last_transition_at", tostring(now_ms),
"last_mutation_at", tostring(now_ms))
redis.call("ZREM", K.terminal_zset, A.execution_id)
redis.call("ZADD", blocked_deps_zset,
tonumber(core.created_at or "0"), A.execution_id)
-- Audit entry; the stream is trimmed approximately to 1000 entries.
redis.call("XADD", K.lease_history, "MAXLEN", "~", 1000, "*",
"event", "replay_initiated",
"replay_count", tostring(replay_count + 1),
"replay_type", "skipped_flow_member",
"ts", tostring(now_ms))
return ok(tostring(new_unsatisfied))
else
local priority = tonumber(core.priority or "0")
local created_at = tonumber(core.created_at or "0")
-- Score falls as priority rises; created_at breaks ties within a priority.
local score = 0 - (priority * 1000000000000) + created_at
redis.call("HSET", K.core_key,
"lifecycle_phase", "runnable",
"ownership_state", "unowned",
"eligibility_state", "eligible_now",
"blocking_reason", "waiting_for_worker",
"blocking_detail", "",
"terminal_outcome", "none",
"attempt_state", "pending_replay_attempt",
"public_state", "waiting",
"pending_replay_attempt", "1",
"replay_count", tostring(replay_count + 1),
"completed_at", "",
"last_transition_at", tostring(now_ms),
"last_mutation_at", tostring(now_ms))
redis.call("ZREM", K.terminal_zset, A.execution_id)
redis.call("ZADD", K.eligible_zset, score, A.execution_id)
redis.call("XADD", K.lease_history, "MAXLEN", "~", 1000, "*",
"event", "replay_initiated",
"replay_count", tostring(replay_count + 1),
"replay_type", "normal",
"ts", tostring(now_ms))
return ok("0")
end
end)
--- ff_set_flow_tags: write namespaced tags for a flow into its tags hash.
--
-- KEYS[1] flow core hash, KEYS[2] flow tags hash
-- ARGV    alternating tag key / value pairs; every key must match
--         "^[a-z][a-z0-9_]*%.[^.]"
--
-- On the first call for a flow (core field "tags_migrated" is not "1"),
-- any core-hash field whose name matches the tag pattern is moved into the
-- tags hash before the new tags are written.
--
-- Errors: invalid_input, flow_not_found, invalid_tag_key.
-- Returns ok(<number of pairs supplied, as string>).
redis.register_function('ff_set_flow_tags', function(keys, args)
  local flow_core_key, tags_key = keys[1], keys[2]
  local TAG_KEY_PATTERN = "^[a-z][a-z0-9_]*%.[^.]"

  -- True when `name` is a string matching the tag-key pattern.
  local function is_tag_key(name)
    return type(name) == "string" and string.find(name, TAG_KEY_PATTERN) ~= nil
  end

  local argc = #args
  if argc == 0 or (argc % 2) ~= 0 then
    return err("invalid_input", "tags must be non-empty even-length key/value pairs")
  end

  if redis.call("EXISTS", flow_core_key) == 0 then
    return err("flow_not_found")
  end

  -- Validate every key before writing anything.
  for pos = 1, argc, 2 do
    local tag_name = args[pos]
    if not is_tag_key(tag_name) then
      return err("invalid_tag_key", tostring(tag_name))
    end
  end

  if redis.call("HGET", flow_core_key, "tags_migrated") ~= "1" then
    local core_fields = redis.call("HGETALL", flow_core_key)
    local moved_pairs, moved_names = {}, {}
    for pos = 1, #core_fields, 2 do
      local field_name = core_fields[pos]
      if is_tag_key(field_name) then
        moved_pairs[#moved_pairs + 1] = field_name
        moved_pairs[#moved_pairs + 1] = core_fields[pos + 1]
        moved_names[#moved_names + 1] = field_name
      end
    end
    if #moved_pairs > 0 then
      redis.call("HSET", tags_key, unpack(moved_pairs))
      redis.call("HDEL", flow_core_key, unpack(moved_names))
    end
    redis.call("HSET", flow_core_key, "tags_migrated", "1")
  end

  -- The new tags go in after migration, so they override migrated values.
  redis.call("HSET", tags_key, unpack(args))

  local now_ms = server_time_ms()
  redis.call("HSET", flow_core_key, "last_mutation_at", tostring(now_ms))

  return ok(tostring(argc / 2))
end)