-- Public module table; the functions defined below are attached to it.
local M = {}
-- Base URL of the workflow engine; assigned by M.connect.
local _engine_url = nil
-- Workflow handlers keyed by workflow type name (filled by M.define).
local _workflows = {}
-- Activity handlers keyed by activity name (filled by M.activity).
local _activities = {}
-- Worker id taken from the engine's registration response in M.listen.
local _worker_id = nil
-- Optional bearer token; when set, M._api sends it as an Authorization header.
local _auth_token = nil
--- Percent-encode a value for use in a URL path segment or query value.
-- The value is converted with tostring(); every byte other than ASCII
-- letters, digits, "-", "_", "." and "~" becomes "%XX" (uppercase hex).
-- @param s  value to encode
-- @return   the encoded string (exactly one return value)
local function url_encode(s)
  local function escape(ch)
    return string.format("%%%02X", string.byte(ch))
  end
  -- Assigning to a single local drops gsub's second result (the match count).
  local encoded = tostring(s):gsub("([^A-Za-z0-9%-_.~])", escape)
  return encoded
end
--- Point the client at a workflow engine and verify it is reachable.
-- Stores the base URL (with one trailing "/" removed) and, when supplied,
-- `opts.token` as the bearer token, then issues GET /health.
-- @param url   engine base URL (string)
-- @param opts  optional table; `opts.token` is the auth token
-- Raises an error when the health check does not answer with status 200.
-- NOTE(review): a token from an earlier connect() is kept when this call
-- passes none — confirm that is intended when switching engines.
function M.connect(url, opts)
  -- Single-target assignment keeps only gsub's first result (the string).
  local base = url:gsub("/$", "")
  _engine_url = base
  local token = opts and opts.token
  if token then
    _auth_token = token
  end
  local health = M._api("GET", "/health")
  if health.status ~= 200 then
    error("workflow.connect: cannot reach engine at " .. url)
  end
  log.info("Connected to workflow engine at " .. url)
end
--- Register a workflow handler.
-- @param name     workflow type name used as the registry key
-- @param handler  value stored for that name; the worker later calls it
--                 as handler(ctx, input)
function M.define(name, handler)
  _workflows[name] = handler
end
--- Register an activity handler.
-- @param name     activity name used as the registry key
-- @param handler  value stored for that name; the worker later calls it
--                 as handler(ctx, input)
function M.activity(name, handler)
  _activities[name] = handler
end
--- Start a workflow execution via POST /workflows.
-- @param opts  table with `workflow_type`, `workflow_id`, `input` and an
--              optional `task_queue` (defaults to "default")
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 201.
function M.start(opts)
  local resp = M._api("POST", "/workflows", {
    workflow_type = opts.workflow_type,
    workflow_id = opts.workflow_id,
    input = opts.input,
    task_queue = opts.task_queue or "default",
  })
  if resp.status == 201 then
    return json.parse(resp.body)
  end
  error("workflow.start failed: " .. (resp.body or "unknown error"))
end
--- Send a signal to a workflow.
-- POSTs to /workflows/<id>/signal/<name>. Both path segments are
-- percent-encoded so ids or signal names containing "/", "?", "#" or spaces
-- cannot change the request path.
-- @param workflow_id  target workflow id (required)
-- @param signal_name  signal name (required)
-- @param payload      optional value sent as `payload` in the request body
-- Raises an error when an argument is missing or the status is not 200.
function M.signal(workflow_id, signal_name, payload)
  -- url_encode() stringifies its argument, so reject nil explicitly rather
  -- than silently addressing a workflow called "nil".
  if workflow_id == nil or signal_name == nil then
    error("workflow.signal: workflow_id and signal_name are required")
  end
  local body = payload and { payload = payload } or {}
  local path = "/workflows/" .. url_encode(workflow_id)
    .. "/signal/" .. url_encode(signal_name)
  local resp = M._api("POST", path, body)
  if resp.status ~= 200 then
    error("workflow.signal failed: " .. (resp.body or "unknown error"))
  end
end
--- Fetch a workflow via GET /workflows/<id>.
-- The id is percent-encoded so reserved characters cannot alter the path.
-- @param workflow_id  workflow id (required)
-- @return             the parsed JSON response body
-- Raises an error when the id is missing or the status is not 200.
function M.describe(workflow_id)
  -- url_encode() would turn nil into the literal "nil"; fail fast instead.
  if workflow_id == nil then
    error("workflow.describe: workflow_id is required")
  end
  local resp = M._api("GET", "/workflows/" .. url_encode(workflow_id))
  if resp.status ~= 200 then
    error("workflow.describe failed: " .. (resp.body or "unknown error"))
  end
  return json.parse(resp.body)
end
--- Request cancellation via POST /workflows/<id>/cancel.
-- The id is percent-encoded so reserved characters cannot alter the path.
-- @param workflow_id  workflow id (required)
-- Raises an error when the id is missing or the status is not 200.
function M.cancel(workflow_id)
  -- url_encode() would turn nil into the literal "nil"; fail fast instead.
  if workflow_id == nil then
    error("workflow.cancel: workflow_id is required")
  end
  local resp = M._api("POST", "/workflows/" .. url_encode(workflow_id) .. "/cancel")
  if resp.status ~= 200 then
    error("workflow.cancel failed: " .. (resp.body or "unknown error"))
  end
end
--- Terminate a workflow via POST /workflows/<id>/terminate.
-- The id is percent-encoded so reserved characters cannot alter the path.
-- @param workflow_id  workflow id (required)
-- @param reason       optional value sent as `reason` in the request body
-- Raises an error when the id is missing or the status is not 200.
function M.terminate(workflow_id, reason)
  -- url_encode() would turn nil into the literal "nil"; fail fast instead.
  if workflow_id == nil then
    error("workflow.terminate: workflow_id is required")
  end
  local body = reason and { reason = reason } or {}
  local path = "/workflows/" .. url_encode(workflow_id) .. "/terminate"
  local resp = M._api("POST", path, body)
  if resp.status ~= 200 then
    error("workflow.terminate failed: " .. (resp.body or "unknown error"))
  end
end
--- List workflows via GET /workflows with optional filters.
-- Recognised `opts` fields, emitted in this order when truthy: `namespace`,
-- `status`, `type`, `search_attrs` (JSON-encoded, then percent-encoded),
-- `limit` and `offset` (stringified with tostring, not percent-encoded).
-- @param opts  optional filter table
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.list(opts)
  opts = opts or {}
  local query = {}
  local function push(fragment)
    query[#query + 1] = fragment
  end

  if opts.namespace then push("namespace=" .. url_encode(opts.namespace)) end
  if opts.status then push("status=" .. url_encode(opts.status)) end
  if opts.type then push("type=" .. url_encode(opts.type)) end
  if opts.search_attrs then
    push("search_attrs=" .. url_encode(json.encode(opts.search_attrs)))
  end
  if opts.limit then push("limit=" .. tostring(opts.limit)) end
  if opts.offset then push("offset=" .. tostring(opts.offset)) end

  local path = "/workflows"
  if #query > 0 then
    path = path .. "?" .. table.concat(query, "&")
  end

  local resp = M._api("GET", path)
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.list failed: " .. (resp.body or "unknown error"))
end
--- Fetch a workflow's events via GET /workflows/<id>/events.
-- The id is percent-encoded so reserved characters cannot alter the path.
-- @param workflow_id  workflow id (required)
-- @return             the parsed JSON response body
-- Raises an error when the id is missing or the status is not 200.
function M.get_events(workflow_id)
  -- url_encode() would turn nil into the literal "nil"; fail fast instead.
  if workflow_id == nil then
    error("workflow.get_events: workflow_id is required")
  end
  local resp = M._api("GET", "/workflows/" .. url_encode(workflow_id) .. "/events")
  if resp.status ~= 200 then
    error("workflow.get_events failed: " .. (resp.body or "unknown error"))
  end
  return json.parse(resp.body)
end
--- Read workflow state via GET /workflows/<id>/state[/<name>].
-- Both the id and the optional name are percent-encoded so reserved
-- characters cannot alter the request path.
-- @param workflow_id  workflow id (required)
-- @param name         optional state entry name appended to the path
-- @return             the parsed JSON body, or nil when the engine answers 404
-- Raises an error when the id is missing or the status is neither 200 nor 404.
function M.get_state(workflow_id, name)
  -- url_encode() would turn nil into the literal "nil"; fail fast instead.
  if workflow_id == nil then
    error("workflow.get_state: workflow_id is required")
  end
  local path = "/workflows/" .. url_encode(workflow_id) .. "/state"
  if name then
    path = path .. "/" .. url_encode(name)
  end
  local resp = M._api("GET", path)
  if resp.status == 404 then
    return nil
  end
  if resp.status ~= 200 then
    error("workflow.get_state failed: " .. (resp.body or "unknown error"))
  end
  return json.parse(resp.body)
end
--- List child workflows via GET /workflows/<id>/children.
-- The id is percent-encoded so reserved characters cannot alter the path.
-- @param workflow_id  parent workflow id (required)
-- @return             the parsed JSON response body
-- Raises an error when the id is missing or the status is not 200.
function M.list_children(workflow_id)
  -- url_encode() would turn nil into the literal "nil"; fail fast instead.
  if workflow_id == nil then
    error("workflow.list_children: workflow_id is required")
  end
  local resp = M._api("GET", "/workflows/" .. url_encode(workflow_id) .. "/children")
  if resp.status ~= 200 then
    error("workflow.list_children failed: " .. (resp.body or "unknown error"))
  end
  return json.parse(resp.body)
end
--- Continue a workflow as new via POST /workflows/<id>/continue-as-new.
-- The id is percent-encoded so reserved characters cannot alter the path.
-- @param workflow_id  workflow id (required)
-- @param input        value sent as `input` in the request body
-- @return             the parsed JSON response body
-- Raises an error when the id is missing or the status is not 201.
function M.continue_as_new(workflow_id, input)
  -- url_encode() would turn nil into the literal "nil"; fail fast instead.
  if workflow_id == nil then
    error("workflow.continue_as_new: workflow_id is required")
  end
  local path = "/workflows/" .. url_encode(workflow_id) .. "/continue-as-new"
  local resp = M._api("POST", path, { input = input })
  if resp.status ~= 201 then
    error("workflow.continue_as_new failed: " .. (resp.body or "unknown error"))
  end
  return json.parse(resp.body)
end
-- Namespace table for schedule operations.
M.schedules = {}

--- Create a schedule via POST /schedules; `opts` is sent as the body.
-- @param opts  table that must contain `name`, `workflow_type`, `cron_expr`
-- @return      the parsed JSON response body
-- Raises an error when a required field is missing or the status is not 201.
function M.schedules.create(opts)
  local complete = opts and opts.name and opts.workflow_type and opts.cron_expr
  if not complete then
    error("workflow.schedules.create: name, workflow_type, cron_expr required")
  end
  local resp = M._api("POST", "/schedules", opts)
  if resp.status == 201 then
    return json.parse(resp.body)
  end
  error("workflow.schedules.create failed: " .. (resp.body or "unknown error"))
end
--- List schedules via GET /schedules?namespace=<ns>.
-- @param opts  optional table; `opts.namespace` defaults to "main"
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.schedules.list(opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/schedules?namespace=" .. url_encode(namespace)
  local resp = M._api("GET", path)
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.schedules.list failed: " .. (resp.body or "unknown error"))
end
--- Fetch one schedule via GET /schedules/<name>?namespace=<ns>.
-- @param name  schedule name (percent-encoded into the path)
-- @param opts  optional table; `opts.namespace` defaults to "main"
-- @return      the parsed JSON body, or nil when the engine answers 404
-- Raises an error for any status other than 200 or 404.
function M.schedules.describe(name, opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/schedules/" .. url_encode(name)
    .. "?namespace=" .. url_encode(namespace)
  local resp = M._api("GET", path)
  local status = resp.status
  if status == 404 then
    return nil
  end
  if status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.schedules.describe failed: " .. (resp.body or "unknown error"))
end
--- Update a schedule via PATCH /schedules/<name>?namespace=<ns>.
-- @param name   schedule name (percent-encoded into the path)
-- @param patch  table sent as the request body; an empty table when omitted
-- @param opts   optional table; `opts.namespace` defaults to "main"
-- @return       the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.schedules.patch(name, patch, opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/schedules/" .. url_encode(name)
    .. "?namespace=" .. url_encode(namespace)
  local resp = M._api("PATCH", path, patch or {})
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.schedules.patch failed: " .. (resp.body or "unknown error"))
end
--- Pause a schedule via POST /schedules/<name>/pause?namespace=<ns>.
-- @param name  schedule name (percent-encoded into the path)
-- @param opts  optional table; `opts.namespace` defaults to "main"
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.schedules.pause(name, opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/schedules/" .. url_encode(name)
    .. "/pause?namespace=" .. url_encode(namespace)
  local resp = M._api("POST", path)
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.schedules.pause failed: " .. (resp.body or "unknown error"))
end
--- Resume a schedule via POST /schedules/<name>/resume?namespace=<ns>.
-- @param name  schedule name (percent-encoded into the path)
-- @param opts  optional table; `opts.namespace` defaults to "main"
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.schedules.resume(name, opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/schedules/" .. url_encode(name)
    .. "/resume?namespace=" .. url_encode(namespace)
  local resp = M._api("POST", path)
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.schedules.resume failed: " .. (resp.body or "unknown error"))
end
--- Delete a schedule via DELETE /schedules/<name>?namespace=<ns>.
-- @param name  schedule name (percent-encoded into the path)
-- @param opts  optional table; `opts.namespace` defaults to "main"
-- Returns nothing; raises an error when the status is not 200.
function M.schedules.delete(name, opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/schedules/" .. url_encode(name)
    .. "?namespace=" .. url_encode(namespace)
  local resp = M._api("DELETE", path)
  if resp.status == 200 then
    return
  end
  error("workflow.schedules.delete failed: " .. (resp.body or "unknown error"))
end
-- Namespace table for namespace-management operations.
M.namespaces = {}

--- Create a namespace via POST /namespaces with body { name = name }.
-- @param name  namespace name
-- Returns nothing; raises an error unless the status is 200 or 201.
function M.namespaces.create(name)
  local resp = M._api("POST", "/namespaces", { name = name })
  local status = resp.status
  if status == 201 or status == 200 then
    return
  end
  error("workflow.namespaces.create failed: " .. (resp.body or "unknown error"))
end
--- List namespaces via GET /namespaces.
-- @return  the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.namespaces.list()
  local resp = M._api("GET", "/namespaces")
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.namespaces.list failed: " .. (resp.body or "unknown error"))
end
--- Fetch a namespace via GET /namespaces/<name>.
-- @param name  namespace name (percent-encoded into the path)
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.namespaces.stats(name)
  local path = "/namespaces/" .. url_encode(name)
  local resp = M._api("GET", path)
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.namespaces.stats failed: " .. (resp.body or "unknown error"))
end
--- Alias for M.namespaces.stats: forwards `name` and returns its result.
-- The lookup happens at call time, so a replaced `stats` is honoured.
function M.namespaces.describe(name)
  return M.namespaces.stats(name)
end
--- Delete a namespace via DELETE /namespaces/<name>.
-- @param name  namespace name (percent-encoded into the path)
-- Returns nothing; raises an error when the status is not 200.
function M.namespaces.delete(name)
  local path = "/namespaces/" .. url_encode(name)
  local resp = M._api("DELETE", path)
  if resp.status == 200 then
    return
  end
  error("workflow.namespaces.delete failed: " .. (resp.body or "unknown error"))
end
-- Namespace table for worker queries.
M.workers = {}

--- List workers via GET /workers?namespace=<ns>.
-- @param opts  optional table; `opts.namespace` defaults to "main"
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.workers.list(opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/workers?namespace=" .. url_encode(namespace)
  local resp = M._api("GET", path)
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.workers.list failed: " .. (resp.body or "unknown error"))
end
-- Namespace table for task-queue queries.
M.queues = {}

--- Fetch queue data via GET /queues?namespace=<ns>.
-- @param opts  optional table; `opts.namespace` defaults to "main"
-- @return      the parsed JSON response body
-- Raises an error when the engine does not answer with status 200.
function M.queues.stats(opts)
  local namespace = (opts and opts.namespace) or "main"
  local path = "/queues?namespace=" .. url_encode(namespace)
  local resp = M._api("GET", path)
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("workflow.queues.stats failed: " .. (resp.body or "unknown error"))
end
--- Register this process as a worker and run the polling loop forever.
-- Registers the names of all defined workflows and activities with
-- POST /workers/register, then repeatedly heartbeats and polls for a
-- workflow task, falling back to an activity task when none was handled.
-- When neither poll did any work the loop sleeps 0.5 before retrying.
-- @param opts  optional table: `queue` (default "default"), `identity`
--              (default "assay-worker-<hostname or unknown>"),
--              `max_concurrent_workflows` (default 10),
--              `max_concurrent_activities` (default 20)
-- Never returns normally. Raises an error when connect() has not been
-- called, when registration does not answer 200, or when the registration
-- response carries no worker_id.
function M.listen(opts)
  if not _engine_url then
    error("workflow.listen: call workflow.connect() first")
  end
  -- Every option has a default, so accept a missing options table.
  opts = opts or {}
  local queue = opts.queue or "default"
  -- os.hostname is not part of standard Lua; use it only when the host
  -- environment provides it.
  local identity = opts.identity or
    ("assay-worker-" .. (os.hostname and os.hostname() or "unknown"))

  local wf_names, act_names = {}, {}
  for name in pairs(_workflows) do wf_names[#wf_names + 1] = name end
  for name in pairs(_activities) do act_names[#act_names + 1] = name end

  local reg_resp = M._api("POST", "/workers/register", {
    identity = identity,
    queue = queue,
    workflows = wf_names,
    activities = act_names,
    max_concurrent_workflows = opts.max_concurrent_workflows or 10,
    max_concurrent_activities = opts.max_concurrent_activities or 20,
  })
  if reg_resp.status ~= 200 then
    error("workflow.listen: registration failed: " .. (reg_resp.body or "unknown"))
  end

  -- Fail with a clear message instead of a nil-concatenation error below.
  local registration = json.parse(reg_resp.body)
  local worker_id = registration and registration.worker_id
  if not worker_id then
    error("workflow.listen: registration response did not include a worker_id")
  end
  _worker_id = worker_id
  log.info("Registered as worker " .. _worker_id .. " on queue '" .. queue .. "'")

  while true do
    M._api("POST", "/workers/heartbeat", { worker_id = _worker_id })
    -- Short-circuit: activities are polled only when no workflow task ran.
    local did_work = M._poll_workflow_task(queue) or M._poll_activity_task(queue)
    if not did_work then sleep(0.5) end
  end
end
--- Poll for one workflow task, run it, and submit the resulting commands.
-- POSTs to /workflow-tasks/poll; a non-200 status, a missing/empty/"null"
-- body, or a task without `workflow_id` counts as "no work".
-- The workflow id is percent-encoded in the commands path so ids with
-- reserved characters cannot alter the request path.
-- @param queue  task queue name to poll
-- @return       true when a task was handled, false otherwise
function M._poll_workflow_task(queue)
  local resp = M._api("POST", "/workflow-tasks/poll", {
    queue = queue,
    worker_id = _worker_id,
  })
  if resp.status ~= 200 or not resp.body or resp.body == "null" or resp.body == "" then
    return false
  end
  local task = json.parse(resp.body)
  if not task or not task.workflow_id then
    return false
  end
  local commands = M._handle_workflow_task(task)
  -- The response to the commands submission is not inspected.
  local path = "/workflow-tasks/" .. url_encode(task.workflow_id) .. "/commands"
  M._api("POST", path, {
    worker_id = _worker_id,
    commands = commands,
  })
  return true
end
--- Poll for one activity task, execute it, and report the outcome.
-- POSTs to /tasks/poll; a non-200 status, a missing/empty/"null" body, or a
-- task without `id` counts as "no work". The activity runs under pcall:
-- success is reported to /tasks/<id>/complete with `result`, failure to
-- /tasks/<id>/fail with the stringified error.
-- @param queue  task queue name to poll
-- @return       true when a task was handled, false otherwise
function M._poll_activity_task(queue)
  local resp = M._api("POST", "/tasks/poll", {
    queue = queue,
    worker_id = _worker_id,
  })
  local body = resp.body
  local has_task = resp.status == 200 and body and body ~= "null" and body ~= ""
  if not has_task then
    return false
  end

  local task = json.parse(body)
  if not (task and task.id) then
    return false
  end

  local ok, outcome = pcall(M._execute_activity, task)
  local task_path = "/tasks/" .. task.id
  if ok then
    M._api("POST", task_path .. "/complete", { result = outcome })
  else
    M._api("POST", task_path .. "/fail", { error = tostring(outcome) })
  end
  return true
end
--- Run one replay of a workflow handler and translate it into commands.
-- The handler runs inside a coroutine with a context built from the task's
-- history and is resumed exactly once. The result is one of:
--   * FailWorkflow     - no handler registered, or the handler raised
--   * CancelWorkflow   - the raised error contains the cancellation marker
--   * CompleteWorkflow - the coroutine finished; carries the return value
--   * the yielded command, or the `commands` list of a yielded batch
-- When query handlers are registered, a RecordSnapshot command is placed
-- first in the list (not for the "no handler" case).
-- @param task  task table with `workflow_type`, `workflow_id`, `history`, `input`
-- @return      list of command tables
function M._handle_workflow_task(task)
  local handler = _workflows[task.workflow_type]
  if not handler then
    local missing = {
      type = "FailWorkflow",
      error = "no workflow handler registered for type: " .. tostring(task.workflow_type),
    }
    return { missing }
  end

  local ctx = M._make_workflow_ctx(task.workflow_id, task.history or {})
  local co = coroutine.create(function()
    return handler(ctx, task.input)
  end)
  local ok, outcome = coroutine.resume(co)

  -- The snapshot is taken after the handler ran so queries see its state.
  local snapshot = M._collect_snapshot(ctx)
  local function finish(cmds)
    if snapshot then
      table.insert(cmds, 1, snapshot)
    end
    return cmds
  end

  if not ok then
    local message = tostring(outcome)
    local cancelled = message:find("__ASSAY_WORKFLOW_CANCELLED__", 1, true)
    if cancelled then
      return finish({ { type = "CancelWorkflow" } })
    end
    return finish({ { type = "FailWorkflow", error = message } })
  end

  if coroutine.status(co) == "dead" then
    return finish({ { type = "CompleteWorkflow", result = outcome } })
  end

  local is_batch = type(outcome) == "table" and outcome._batch
  if is_batch then
    -- The snapshot is inserted into the yielded commands table itself.
    return finish(outcome.commands)
  end
  return finish({ outcome })
end
--- Evaluate a context's registered query handlers into a snapshot command.
-- Each handler in `ctx._queries` is called under pcall; handlers that raise
-- are skipped, and nil results leave no entry in the state table.
-- @param ctx  workflow context (may have no `_queries` field)
-- @return     { type = "RecordSnapshot", state = ... }, or nil when there
--             are no queries or no handler produced a value
function M._collect_snapshot(ctx)
  local queries = ctx._queries
  if not queries or not next(queries) then
    return nil
  end

  local state = {}
  for name, fn in pairs(queries) do
    local ok, value = pcall(fn)
    if ok then
      state[name] = value
    end
  end

  if next(state) ~= nil then
    return { type = "RecordSnapshot", state = state }
  end
  return nil
end
--- Build the replay context passed to a workflow handler.
-- The history is scanned once to index completed activities, fired timers,
-- received signals, recorded side effects, child-workflow outcomes and a
-- cancellation request. Each ctx method first consults that index: when the
-- outcome is already known it returns (or raises) immediately, otherwise it
-- yields a command table out of the handler coroutine. The coroutine is not
-- expected to be resumed after a yield, so the code after each yield raises.
-- @param workflow_id  id stored on the context as `ctx.workflow_id`
-- @param history      list of events with `event_type` and `payload` fields
-- @return             the context table
function M._make_workflow_ctx(workflow_id, history)
  local activity_results, fired_timers, child_results = {}, {}, {}
  -- Side effects are stored wrapped as { value = v } so that a recorded nil
  -- value is still recognised as "already recorded" during replay.
  local side_effects = {}
  local signals_by_name = {}
  local cancel_requested = false

  for _, event in ipairs(history) do
    local kind, p = event.event_type, event.payload
    if kind == "ActivityCompleted" and p and p.activity_seq then
      activity_results[p.activity_seq] = { ok = true, value = p.result }
    elseif kind == "ActivityFailed" and p and p.activity_seq then
      activity_results[p.activity_seq] = { ok = false, err = p.error }
    elseif kind == "TimerFired" and p and p.timer_seq then
      fired_timers[p.timer_seq] = true
    elseif kind == "SignalReceived" and p and p.signal then
      signals_by_name[p.signal] = signals_by_name[p.signal] or {}
      table.insert(signals_by_name[p.signal], p.payload)
    elseif kind == "SideEffectRecorded" and p and p.side_effect_seq then
      side_effects[p.side_effect_seq] = { value = p.value }
    elseif kind == "ChildWorkflowCompleted" and p and p.child_workflow_id then
      child_results[p.child_workflow_id] = { ok = true, value = p.result }
    elseif kind == "ChildWorkflowFailed" and p and p.child_workflow_id then
      child_results[p.child_workflow_id] = { ok = false, err = p.error }
    elseif kind == "WorkflowCancelRequested" then
      cancel_requested = true
    end
  end

  -- Per-signal-name count of arrivals already handed to the handler.
  local signal_cursor = {}
  -- Call counters; the n-th call of a kind maps to sequence number n.
  local activity_seq, timer_seq, side_effect_seq = 0, 0, 0
  local ctx = { workflow_id = workflow_id }

  -- Raise the cancellation marker recognised by M._handle_workflow_task.
  local function check_cancel()
    if cancel_requested then
      error("__ASSAY_WORKFLOW_CANCELLED__")
    end
  end

  -- Build a ScheduleActivity command; `opts` may be nil.
  local function schedule_activity_cmd(seq, name, input, opts)
    opts = opts or {}
    return {
      type = "ScheduleActivity",
      seq = seq,
      name = name,
      task_queue = opts.task_queue or "default",
      input = input,
      max_attempts = opts.max_attempts,
      initial_interval_secs = opts.initial_interval_secs,
      backoff_coefficient = opts.backoff_coefficient,
      start_to_close_secs = opts.start_to_close_secs,
      heartbeat_timeout_secs = opts.heartbeat_timeout_secs,
    }
  end

  --- Run an activity: return its recorded result, raise its recorded
  -- failure, or yield a ScheduleActivity command when no result exists yet.
  function ctx:execute_activity(name, input, opts)
    check_cancel()
    activity_seq = activity_seq + 1
    local r = activity_results[activity_seq]
    if r then
      if r.ok then return r.value end
      error("activity '" .. name .. "' failed: " .. tostring(r.err))
    end
    coroutine.yield(schedule_activity_cmd(activity_seq, name, input, opts))
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  --- Run several activities as a batch.
  -- `activities` is a non-empty list of { name, input, opts } tables. When
  -- every entry has a recorded outcome, returns the list of results (or
  -- raises the first recorded failure); otherwise yields a batch holding a
  -- ScheduleActivity command for each entry without a recorded outcome.
  function ctx:execute_parallel(activities)
    check_cancel()
    if type(activities) ~= "table" or #activities == 0 then
      error("ctx:execute_parallel: activities must be a non-empty list")
    end
    local results, pending_cmds = {}, {}
    local all_done, first_error = true, nil
    for i, a in ipairs(activities) do
      activity_seq = activity_seq + 1
      local r = activity_results[activity_seq]
      if r then
        if r.ok then
          results[i] = r.value
        else
          first_error = first_error
            or ("activity '" .. (a.name or "?")
              .. "' failed: " .. tostring(r.err))
        end
      else
        all_done = false
        pending_cmds[#pending_cmds + 1] =
          schedule_activity_cmd(activity_seq, a.name, a.input, a.opts)
      end
    end
    if all_done then
      if first_error then error(first_error) end
      return results
    end
    coroutine.yield({ _batch = true, commands = pending_cmds })
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  --- Durable sleep: returns when the matching timer has already fired in
  -- history, otherwise yields a ScheduleTimer command.
  function ctx:sleep(seconds)
    check_cancel()
    timer_seq = timer_seq + 1
    if fired_timers[timer_seq] then return end
    coroutine.yield({
      type = "ScheduleTimer",
      seq = timer_seq,
      duration_secs = seconds,
    })
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  --- Run `fn` once and record its value.
  -- On replay the recorded value is returned without calling `fn` again,
  -- including when that value is nil or false.
  function ctx:side_effect(name, fn)
    check_cancel()
    side_effect_seq = side_effect_seq + 1
    local recorded = side_effects[side_effect_seq]
    if recorded then
      return recorded.value
    end
    local value = fn()
    coroutine.yield({
      type = "RecordSideEffect",
      seq = side_effect_seq,
      name = name,
      value = value,
    })
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  --- Start a child workflow keyed by `opts.workflow_id` (required).
  -- Returns the recorded result or raises the recorded failure; yields a
  -- StartChildWorkflow command when no outcome is in history yet.
  function ctx:start_child_workflow(workflow_type, opts)
    check_cancel()
    if not opts or not opts.workflow_id then
      error("ctx:start_child_workflow: opts.workflow_id is required")
    end
    local cached = child_results[opts.workflow_id]
    if cached then
      if cached.ok then return cached.value end
      error("child workflow '" .. opts.workflow_id ..
        "' failed: " .. tostring(cached.err))
    end
    coroutine.yield({
      type = "StartChildWorkflow",
      workflow_type = workflow_type,
      workflow_id = opts.workflow_id,
      input = opts.input,
      task_queue = opts.task_queue or "default",
    })
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  --- Yield an UpsertSearchAttributes command carrying `patch` (a table).
  -- NOTE(review): no history event is consulted here, so this call yields on
  -- every replay — confirm how the engine lets the handler progress past it.
  function ctx:upsert_search_attributes(patch)
    check_cancel()
    if type(patch) ~= "table" then
      error("ctx:upsert_search_attributes: patch must be a table")
    end
    coroutine.yield({
      type = "UpsertSearchAttributes",
      patch = patch,
    })
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  --- Yield a ContinueAsNew command carrying `input`.
  function ctx:continue_as_new(input)
    check_cancel()
    coroutine.yield({
      type = "ContinueAsNew",
      input = input,
    })
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  --- Register a query handler; M._collect_snapshot calls it after each run.
  -- `name` must be a non-empty string and `fn` a function.
  function ctx:register_query(name, fn)
    if type(name) ~= "string" or name == "" then
      error("ctx:register_query: name must be a non-empty string")
    end
    if type(fn) ~= "function" then
      error("ctx:register_query: handler must be a function")
    end
    self._queries = self._queries or {}
    self._queries[name] = fn
  end

  --- Return the next unconsumed payload of signal `name`, in arrival order;
  -- yields a WaitForSignal command when none is left in history.
  function ctx:wait_for_signal(name)
    check_cancel()
    local consumed = signal_cursor[name] or 0
    local arrivals = signals_by_name[name] or {}
    if consumed < #arrivals then
      consumed = consumed + 1
      signal_cursor[name] = consumed
      return arrivals[consumed]
    end
    coroutine.yield({
      type = "WaitForSignal",
      name = name,
    })
    error("workflow ctx: yielded but resumed unexpectedly")
  end

  return ctx
end
--- Run the registered handler for an activity task.
-- A string `task.input` is parsed as JSON first; any other value is passed
-- through unchanged. The handler receives an activity context and the input.
-- @param task  activity task with `name` and `input`
-- @return      whatever the handler returns
-- Raises an error when no handler is registered under `task.name`.
function M._execute_activity(task)
  local name = task.name
  local handler = _activities[name]
  if not handler then
    error("No handler registered for activity: " .. (name or "?"))
  end

  local payload = task.input
  if type(payload) == "string" then
    payload = json.parse(payload)
  end
  return handler(M._make_activity_ctx(task), payload)
end
--- Build the context object handed to an activity handler.
-- The context has a single method, `ctx:heartbeat(details)`, which POSTs
-- `details` to /tasks/<task.id>/heartbeat and returns nothing.
-- @param task  activity task; `task.id` is read when heartbeat is called
-- @return      the context table
function M._make_activity_ctx(task)
  return {
    heartbeat = function(_, details)
      M._api("POST", "/tasks/" .. task.id .. "/heartbeat", { details = details })
    end,
  }
end
--- Issue an HTTP request against the engine's /api/v1 prefix.
-- Adds an "Authorization: Bearer <token>" header when a token is set.
-- POST and PATCH send `body`, or an empty table when it is omitted; GET and
-- DELETE send no body.
-- @param method  "GET", "POST", "PATCH" or "DELETE"
-- @param path    path below /api/v1, starting with "/"
-- @param body    optional request body table
-- @return        the response returned by the `http` client
-- Raises an error when connect() has not been called or the method is
-- unsupported.
function M._api(method, path, body)
  -- Without this guard a missing connect() fails with a nil-concatenation
  -- error that does not say what is wrong.
  if not _engine_url then
    error("workflow._api: not connected; call workflow.connect() first")
  end
  local url = _engine_url .. "/api/v1" .. path
  local opts = { headers = {} }
  if _auth_token then
    opts.headers["Authorization"] = "Bearer " .. _auth_token
  end
  if method == "GET" then
    return http.get(url, opts)
  elseif method == "POST" then
    return http.post(url, body or {}, opts)
  elseif method == "PATCH" then
    return http.patch(url, body or {}, opts)
  elseif method == "DELETE" then
    return http.delete(url, opts)
  else
    -- tostring keeps this message intact when `method` is nil.
    error("workflow._api: unsupported method: " .. tostring(method))
  end
end
return M