assay-lua 0.11.2

General-purpose enhanced Lua runtime. Batteries-included scripting, automation, and web services.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
--- assay.workflow — client library for the assay workflow engine.
---
--- An assay Lua app that calls `workflow.listen()` becomes a worker for a
--- running `assay serve` instance. Workflows are written as plain Lua
--- functions; activities are also plain Lua functions. The engine drives
--- execution via two task types polled by the same worker:
---
---  - **Workflow tasks**: orchestration. The handler is re-run from scratch
---    on every "step" — each `ctx:execute_activity / sleep / wait_for_signal`
---    call either returns a value cached in the workflow's event history
---    (deterministic replay) or yields a command for the engine to schedule.
---  - **Activity tasks**: concrete work. The handler runs once; its return
---    value is persisted as `ActivityCompleted` so future replays of the
---    parent workflow short-circuit at that step.
---
--- The worker survives engine restarts, network blips, and worker crashes:
--- on resume any other worker on the queue can pick up the workflow task,
--- replay from the event log, and reach the same point. Side effects are
--- never executed twice as long as workflow code is deterministic — see
--- `ctx:side_effect` for the escape hatch when it isn't.
---
--- Usage:
---   local workflow = require("assay.workflow")
---   workflow.connect("http://assay-server:8080")
---
---   workflow.define("IngestData", function(ctx, input)
---       local data = ctx:execute_activity("fetch_s3", { bucket = input.source })
---       ctx:sleep(10)
---       ctx:execute_activity("load_warehouse", { data = data })
---       return { status = "done" }
---   end)
---
---   workflow.activity("fetch_s3", function(ctx, input)
---       return http.get("https://s3/" .. input.bucket).body
---   end)
---
---   workflow.listen({ queue = "data-pipeline" })

local M = {}

-- Internal module state (one set per worker process; mutated by the
-- public functions below, read by the poll loop and ctx closures).
local _engine_url = nil   -- base engine URL set by M.connect (no trailing slash)
local _workflows = {}     -- [workflow_type] = handler(ctx, input)
local _activities = {}    -- [activity_name] = handler(ctx, input)
local _worker_id = nil    -- assigned by the engine on /workers/register
local _auth_token = nil   -- bearer token added to every request, if configured

--- Connect to the workflow engine.
--- Probes GET /health and raises if the engine is unreachable, so callers
--- fail fast instead of at the first poll.
--- @param url string Engine URL (e.g. "http://localhost:8080")
--- @param opts? table Optional: { token = "..." } — the raw token; every
---   API call sends it as `Authorization: Bearer <token>`.
function M.connect(url, opts)
    _engine_url = url:gsub("/+$", "") -- strip trailing slash(es)
    if opts and opts.token then
        -- Accept both a raw token and a full "Bearer <token>" header value.
        -- M._api always prepends "Bearer ", so strip a duplicate prefix here
        -- to avoid sending "Authorization: Bearer Bearer <token>".
        _auth_token = opts.token:gsub("^Bearer%s+", "")
    end
    -- Verify connectivity before the caller registers any work.
    local resp = M._api("GET", "/health")
    if resp.status ~= 200 then
        error("workflow.connect: cannot reach engine at " .. url)
    end
    log.info("Connected to workflow engine at " .. url)
end

--- Define a workflow type. The handler receives a `ctx` whose methods
--- (`execute_activity`, `sleep`, `wait_for_signal`, `side_effect`) drive
--- the engine — see the module-level docstring for the replay model.
--- Raises immediately on a bad registration rather than deferring the
--- failure to the first dispatched workflow task.
--- @param name string Workflow type name (matches `workflow_type` on start)
--- @param handler function(ctx, input) -> result
function M.define(name, handler)
    if type(name) ~= "string" then
        error("workflow.define: name must be a string", 2)
    end
    if type(handler) ~= "function" then
        error("workflow.define: handler must be a function", 2)
    end
    _workflows[name] = handler
end

--- Define an activity implementation.
--- Raises immediately on a bad registration rather than deferring the
--- failure to the first dispatched activity task.
--- @param name string Activity name
--- @param handler function(ctx, input) -> result
function M.activity(name, handler)
    if type(name) ~= "string" then
        error("workflow.activity: name must be a string", 2)
    end
    if type(handler) ~= "function" then
        error("workflow.activity: handler must be a function", 2)
    end
    _activities[name] = handler
end

--- Start a workflow on the engine (client-side, not as a worker).
--- `workflow_type` and `workflow_id` are mandatory — validated here so the
--- caller gets a clear error instead of an engine-side 4xx (consistent with
--- the explicit check in ctx:start_child_workflow).
--- @param opts table { workflow_type, workflow_id, input?, task_queue? }
--- @return table { workflow_id, run_id, status }
function M.start(opts)
    if type(opts) ~= "table" or not opts.workflow_type or not opts.workflow_id then
        error("workflow.start: opts.workflow_type and opts.workflow_id are required", 2)
    end
    local body = {
        workflow_type = opts.workflow_type,
        workflow_id = opts.workflow_id,
        input = opts.input,
        task_queue = opts.task_queue or "default",
    }
    local resp = M._api("POST", "/workflows", body)
    if resp.status ~= 201 then
        error("workflow.start failed: " .. (resp.body or "unknown error"))
    end
    return json.parse(resp.body)
end

--- Send a signal to a running workflow.
--- @param workflow_id string Target workflow.
--- @param signal_name string Signal name the workflow is waiting on.
--- @param payload? any Optional JSON-serialisable payload delivered to
---   the matching ctx:wait_for_signal call.
function M.signal(workflow_id, signal_name, payload)
    local path = "/workflows/" .. workflow_id .. "/signal/" .. signal_name
    local body = {}
    if payload then
        body = { payload = payload }
    end
    local resp = M._api("POST", path, body)
    if resp.status == 200 then
        return
    end
    error("workflow.signal failed: " .. (resp.body or "unknown error"))
end

--- Query a workflow's current state.
--- @param workflow_id string Workflow to inspect.
--- @return table Parsed engine response describing the workflow.
function M.describe(workflow_id)
    local resp = M._api("GET", "/workflows/" .. workflow_id)
    if resp.status == 200 then
        return json.parse(resp.body)
    end
    error("workflow.describe failed: " .. (resp.body or "unknown error"))
end

--- Cancel a running workflow.
--- @param workflow_id string Workflow to cancel.
function M.cancel(workflow_id)
    local resp = M._api("POST", "/workflows/" .. workflow_id .. "/cancel")
    if resp.status == 200 then
        return
    end
    error("workflow.cancel failed: " .. (resp.body or "unknown error"))
end

--- Start listening for tasks. Blocks until cancelled.
--- Polls workflow tasks first (cheap orchestration) then activity tasks.
--- All options are optional; `opts` itself may be omitted entirely.
--- @param opts? table { identity?, queue?, max_concurrent_workflows?, max_concurrent_activities? }
function M.listen(opts)
    -- Every documented field is optional, so a bare workflow.listen() must
    -- work instead of crashing on opts.queue.
    opts = opts or {}

    if not _engine_url then
        error("workflow.listen: call workflow.connect() first")
    end

    local queue = opts.queue or "default"
    local identity = opts.identity or
        ("assay-worker-" .. (os.hostname and os.hostname() or "unknown"))

    -- Collect registered workflow and activity names for the registration
    -- payload so the engine knows what this worker can execute.
    local wf_names, act_names = {}, {}
    for name in pairs(_workflows) do wf_names[#wf_names + 1] = name end
    for name in pairs(_activities) do act_names[#act_names + 1] = name end

    -- Register as a worker
    local reg_resp = M._api("POST", "/workers/register", {
        identity = identity,
        queue = queue,
        workflows = wf_names,
        activities = act_names,
        max_concurrent_workflows = opts.max_concurrent_workflows or 10,
        max_concurrent_activities = opts.max_concurrent_activities or 20,
    })
    if reg_resp.status ~= 200 then
        error("workflow.listen: registration failed: " .. (reg_resp.body or "unknown"))
    end
    _worker_id = json.parse(reg_resp.body).worker_id
    log.info("Registered as worker " .. _worker_id .. " on queue '" .. queue .. "'")

    -- Poll loop: heartbeat, then try one workflow task, then one activity
    -- task; back off briefly only when an iteration found nothing to do.
    while true do
        M._api("POST", "/workers/heartbeat", { worker_id = _worker_id })

        local did_work = M._poll_workflow_task(queue) or M._poll_activity_task(queue)

        if not did_work then sleep(0.5) end
    end
end

--- Poll one workflow task and process it. Returns true if work was done.
--- An empty / "null" poll body means the queue had nothing for us.
function M._poll_workflow_task(queue)
    local poll = M._api("POST", "/workflow-tasks/poll", {
        queue = queue,
        worker_id = _worker_id,
    })
    local raw = poll.body
    if poll.status ~= 200 or not raw or raw == "" or raw == "null" then
        return false
    end

    local task = json.parse(raw)
    if not (task and task.workflow_id) then
        return false
    end

    -- Replay the workflow handler against its history and submit whatever
    -- commands that run produced.
    M._api("POST", "/workflow-tasks/" .. task.workflow_id .. "/commands", {
        worker_id = _worker_id,
        commands = M._handle_workflow_task(task),
    })
    return true
end

--- Poll one activity task and execute it. Returns true if work was done.
--- A raised error in the handler is reported back as a task failure
--- instead of killing the worker loop.
function M._poll_activity_task(queue)
    local poll = M._api("POST", "/tasks/poll", {
        queue = queue,
        worker_id = _worker_id,
    })
    local raw = poll.body
    if poll.status ~= 200 or not raw or raw == "" or raw == "null" then
        return false
    end

    local task = json.parse(raw)
    if not (task and task.id) then
        return false
    end

    local ok, outcome = pcall(function()
        return M._execute_activity(task)
    end)
    if not ok then
        M._api("POST", "/tasks/" .. task.id .. "/fail", { error = tostring(outcome) })
    else
        M._api("POST", "/tasks/" .. task.id .. "/complete", { result = outcome })
    end
    return true
end

--- Run the workflow handler against the current event history. Returns
--- the list of commands to submit back to the engine — exactly one of:
---  * a single yielded command (`ScheduleActivity`, etc.) when the
---    handler reached an unfulfilled step
---  * `CompleteWorkflow` when the handler returned normally
---  * `FailWorkflow` when the handler raised an error
---
--- One yield per replay keeps the model simple; future versions can batch
--- commands when a workflow yields multiple parallel awaits in one go.
function M._handle_workflow_task(task)
    local handler = _workflows[task.workflow_type]
    if handler == nil then
        return {{
            type = "FailWorkflow",
            error = "no workflow handler registered for type: " .. tostring(task.workflow_type),
        }}
    end

    local ctx = M._make_workflow_ctx(task.workflow_id, task.history or {})
    local co = coroutine.create(function()
        return handler(ctx, task.input)
    end)

    local ok, value = coroutine.resume(co)
    if not ok then
        local message = tostring(value)
        -- Cancellation is a clean exit, not a failure — translate the
        -- sentinel raised by ctx:check_cancel back into a CancelWorkflow
        -- command so the engine flips status to CANCELLED (not FAILED).
        if message:find("__ASSAY_WORKFLOW_CANCELLED__", 1, true) then
            return {{ type = "CancelWorkflow" }}
        end
        return {{ type = "FailWorkflow", error = message }}
    end

    if coroutine.status(co) ~= "dead" then
        -- Handler yielded a command — submit it and let a subsequent
        -- replay continue past this step.
        return { value }
    end
    -- Handler returned normally: its (first) return value is the result.
    return {{ type = "CompleteWorkflow", result = value }}
end

--- Build the workflow ctx object used during replay. Each `ctx:` call
--- increments an internal seq counter and either returns the cached
--- value from history (replay) or yields a command (first time through).
function M._make_workflow_ctx(workflow_id, history)
    -- Pre-index history by per-command seq for O(1) lookups during replay.
    -- Each command type has its own seq space — activity, timer, signal
    -- counters are independent. Signals are matched by name (workflows
    -- typically wait on a specific signal name), and the signal queue
    -- preserves arrival order so multiple of the same name are consumed
    -- in turn.
    --
    -- side_effects entries are wrapper tables ({ value = ... }) rather than
    -- raw values: the wrapper's presence records "this seq ran", even when
    -- the recorded value itself is nil. Caching the raw value would make a
    -- nil-valued side effect invisible on replay and re-execute it — the
    -- exact non-determinism side_effect exists to prevent.
    local activity_results, fired_timers, side_effects, child_results = {}, {}, {}, {}
    local signals_by_name = {} -- [name] = list of payloads in arrival order
    local cancel_requested = false
    for _, event in ipairs(history) do
        local p = event.payload
        if event.event_type == "ActivityCompleted" and p and p.activity_seq then
            activity_results[p.activity_seq] = { ok = true, value = p.result }
        elseif event.event_type == "ActivityFailed" and p and p.activity_seq then
            activity_results[p.activity_seq] = { ok = false, err = p.error }
        elseif event.event_type == "TimerFired" and p and p.timer_seq then
            fired_timers[p.timer_seq] = true
        elseif event.event_type == "SignalReceived" and p and p.signal then
            signals_by_name[p.signal] = signals_by_name[p.signal] or {}
            table.insert(signals_by_name[p.signal], p.payload)
        elseif event.event_type == "SideEffectRecorded" and p and p.side_effect_seq then
            side_effects[p.side_effect_seq] = { value = p.value }
        elseif event.event_type == "ChildWorkflowCompleted" and p and p.child_workflow_id then
            child_results[p.child_workflow_id] = { ok = true, value = p.result }
        elseif event.event_type == "ChildWorkflowFailed" and p and p.child_workflow_id then
            child_results[p.child_workflow_id] = { ok = false, err = p.error }
        elseif event.event_type == "WorkflowCancelRequested" then
            cancel_requested = true
        end
    end

    -- Per-workflow-execution signal cursors track how many signals of each
    -- name have already been consumed by ctx:wait_for_signal calls in
    -- this replay, so a workflow that waits twice for "approve" gets the
    -- first arrival on the first call, the second on the second call.
    local signal_cursor = {}
    local activity_seq, timer_seq, side_effect_seq = 0, 0, 0
    local ctx = { workflow_id = workflow_id }

    -- Helper: any ctx method bails out via this if the workflow has been
    -- requested to cancel. The runner catches the sentinel and emits a
    -- CancelWorkflow command, which finalises the workflow state.
    local function check_cancel()
        if cancel_requested then
            error("__ASSAY_WORKFLOW_CANCELLED__")
        end
    end

    --- Schedule an activity and (synchronously, for the workflow author)
    --- return its result. On replay, returns the cached result from
    --- history; on first execution at this seq, yields a ScheduleActivity
    --- command and the workflow run ends until the activity completes
    --- and the workflow becomes dispatchable again.
    function ctx:execute_activity(name, input, opts)
        check_cancel()
        activity_seq = activity_seq + 1
        local r = activity_results[activity_seq]
        if r then
            if r.ok then return r.value end
            error("activity '" .. name .. "' failed: " .. tostring(r.err))
        end
        coroutine.yield({
            type = "ScheduleActivity",
            seq = activity_seq,
            name = name,
            task_queue = (opts and opts.task_queue) or "default",
            input = input,
            max_attempts = opts and opts.max_attempts,
            initial_interval_secs = opts and opts.initial_interval_secs,
            backoff_coefficient = opts and opts.backoff_coefficient,
            start_to_close_secs = opts and opts.start_to_close_secs,
            heartbeat_timeout_secs = opts and opts.heartbeat_timeout_secs,
        })
        -- Unreachable in single-yield mode — yielding ends this replay.
        error("workflow ctx: yielded but resumed unexpectedly")
    end

    --- Pause the workflow durably for `seconds`. The timer is persisted in
    --- the engine; if the worker dies the timer still fires when due and
    --- another worker picks up the workflow. On replay, returns immediately
    --- once the matching TimerFired event is in history.
    function ctx:sleep(seconds)
        check_cancel()
        timer_seq = timer_seq + 1
        if fired_timers[timer_seq] then return end
        coroutine.yield({
            type = "ScheduleTimer",
            seq = timer_seq,
            duration_secs = seconds,
        })
        error("workflow ctx: yielded but resumed unexpectedly")
    end

    --- Run a non-deterministic operation exactly once. The result is
    --- recorded in the workflow event log and returned from cache on all
    --- subsequent replays — so calls like `crypto.uuid()`, `os.time()`,
    --- or anything reading external mutable state can safely live inside
    --- a workflow handler. A recorded nil value replays as nil; the
    --- function is never re-run once a SideEffectRecorded event exists
    --- for this seq.
    ---
    --- Conceptually a checkpoint: the function runs in the worker, the
    --- worker yields the value to the engine to record, and the engine
    --- re-dispatches the workflow so it continues with the cached value.
    function ctx:side_effect(name, fn)
        check_cancel()
        side_effect_seq = side_effect_seq + 1
        local cached = side_effects[side_effect_seq]
        if cached then
            -- Wrapper presence (not value non-nilness) decides replay, so
            -- nil-valued side effects still short-circuit here.
            return cached.value
        end
        local value = fn()
        coroutine.yield({
            type = "RecordSideEffect",
            seq = side_effect_seq,
            name = name,
            value = value,
        })
        error("workflow ctx: yielded but resumed unexpectedly")
    end

    --- Start a child workflow and (synchronously, for the workflow author)
    --- wait for it to complete. Returns the child's result, or raises if
    --- the child failed. The parent yields and is paused until the child
    --- reaches a terminal state — at which point the engine appends a
    --- ChildWorkflowCompleted/Failed event to the parent and re-dispatches
    --- so the parent's handler can replay past this call.
    ---
    --- `opts.workflow_id` MUST be deterministic — repeated calls during
    --- replay must produce the same id, otherwise idempotency breaks.
    function ctx:start_child_workflow(workflow_type, opts)
        check_cancel()
        if not opts or not opts.workflow_id then
            error("ctx:start_child_workflow: opts.workflow_id is required")
        end
        local cached = child_results[opts.workflow_id]
        if cached then
            if cached.ok then return cached.value end
            error("child workflow '" .. opts.workflow_id ..
                "' failed: " .. tostring(cached.err))
        end
        coroutine.yield({
            type = "StartChildWorkflow",
            workflow_type = workflow_type,
            workflow_id = opts.workflow_id,
            input = opts.input,
            task_queue = opts.task_queue or "default",
        })
        error("workflow ctx: yielded but resumed unexpectedly")
    end

    --- Block until a signal with the given name arrives. Returns the
    --- signal's JSON payload (or nil if signaled with no payload).
    --- The "wait" is purely declarative — the workflow yields, the worker
    --- releases its lease, and a future call to send_signal wakes the
    --- workflow back up via mark_workflow_dispatchable. Multiple waits for
    --- the same signal name consume signals in arrival order.
    function ctx:wait_for_signal(name)
        check_cancel()
        local consumed = signal_cursor[name] or 0
        local arrivals = signals_by_name[name] or {}
        if consumed < #arrivals then
            consumed = consumed + 1
            signal_cursor[name] = consumed
            return arrivals[consumed]
        end
        coroutine.yield({
            type = "WaitForSignal",
            name = name,
        })
        error("workflow ctx: yielded but resumed unexpectedly")
    end

    return ctx
end

--- Execute an activity task (the concrete work; runs once, result persisted).
--- Raises if no handler is registered for the task's activity name; the
--- caller converts that into a task failure report.
function M._execute_activity(task)
    local handler = _activities[task.name]
    if handler == nil then
        error("No handler registered for activity: " .. (task.name or "?"))
    end

    -- The engine may deliver input as a JSON string; decode it so the
    -- handler always sees a Lua value.
    local input = task.input
    if type(input) == "string" then
        input = json.parse(input)
    end

    return handler(M._make_activity_ctx(task), input)
end

--- Activity ctx — minimal, just exposes a heartbeat so long-running
--- activities can prove they're still alive.
function M._make_activity_ctx(task)
    local ctx = {}

    --- Report liveness (and optional progress details) for this task.
    function ctx:heartbeat(details)
        local path = "/tasks/" .. task.id .. "/heartbeat"
        M._api("POST", path, { details = details })
    end

    return ctx
end

--- Internal: make an API call to the engine.
--- Raises a clear "not connected" error when M.connect has not been
--- called yet, instead of the cryptic "attempt to concatenate a nil
--- value" that client-side calls (start/signal/...) used to hit.
--- @param method string "GET" | "POST" | "DELETE"
--- @param path string Path under /api/v1 (leading slash included)
--- @param body? table POST body (defaults to {})
--- @return table HTTP response with `status` and `body` fields
function M._api(method, path, body)
    if not _engine_url then
        error("workflow._api: not connected — call workflow.connect() first")
    end
    local url = _engine_url .. "/api/v1" .. path
    local opts = { headers = {} }

    if _auth_token then
        opts.headers["Authorization"] = "Bearer " .. _auth_token
    end

    if method == "GET" then
        return http.get(url, opts)
    elseif method == "POST" then
        return http.post(url, body or {}, opts)
    elseif method == "DELETE" then
        return http.delete(url, opts)
    else
        error("workflow._api: unsupported method: " .. method)
    end
end

return M