local M = {}
function M.client(url, opts)
opts = opts or {}
local base_url = url:gsub("/+$", "")
local default_ns = opts.namespace or "default"
local api_key = opts.api_key
--- Build the header table sent with API requests.
-- A JSON Content-Type is always present; an Authorization header carrying
-- the API key as a Bearer token is added only when an API key is set.
-- @treturn table a newly created header table
local function headers()
  local bearer = nil
  if api_key then
    bearer = "Bearer " .. api_key
  end
  -- A nil field in a table constructor simply leaves the key absent.
  return {
    ["Content-Type"] = "application/json",
    ["Authorization"] = bearer,
  }
end
--- Pick the namespace for a single call.
-- @param opts_override optional per-call options; its `namespace` field,
--   when truthy, takes precedence over the client default
-- @return the override namespace, otherwise the client's default namespace
local function resolve_ns(opts_override)
  local requested = opts_override and opts_override.namespace
  return requested or default_ns
end
--- Issue a GET against the API and decode the JSON response.
-- @param path_str path (and optional query string) appended to the base URL
-- @return whatever `json.parse` yields for the response body
-- Raises an error naming the path, status and body when the status is not 200.
local function api_get(path_str)
  local target = base_url .. path_str
  local resp = http.get(target, { headers = headers() })
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("temporal: GET " .. path_str .. " HTTP " .. resp.status .. ": " .. resp.body)
end
--- Issue a POST against the API and decode the JSON response.
-- @param path_str path (and optional query string) appended to the base URL
-- @param payload request body, handed to `http.post` unchanged
-- @return whatever `json.parse` yields for the response body
-- Raises an error naming the path, status and body when the status is not 200.
local function api_post(path_str, payload)
  local target = base_url .. path_str
  local resp = http.post(target, payload, { headers = headers() })
  if resp.status == 200 then
    return json.parse(resp.body)
  end
  error("temporal: POST " .. path_str .. " HTTP " .. resp.status .. ": " .. resp.body)
end
--- Percent-encode one query-string component.
-- Every byte outside the RFC 3986 "unreserved" set (letters, digits,
-- `-`, `.`, `_`, `~`) is escaped as %XX. A `%` that already starts a
-- well-formed %XX escape is passed through untouched so that values a
-- caller has pre-encoded are not double-encoded; any other `%` becomes %25.
-- @tparam string text raw component
-- @treturn string encoded component
local function encode_query_value(text)
  -- First pass: escape everything except unreserved characters and '%'.
  text = text:gsub("[^%w%-%._~%%]", function(ch)
    return string.format("%%%02X", ch:byte())
  end)
  -- Second pass: keep complete %XX escapes, escape stray '%' characters.
  text = text:gsub("%%(%x?%x?)", function(hex)
    if #hex == 2 then
      return "%" .. hex
    end
    return "%25" .. hex
  end)
  return text
end

--- Join pre-formatted "key=value" entries into a URL query string.
-- The callers build entries by plain concatenation, so values such as
-- visibility queries may contain spaces, `&`, `#`, `+` or `=`; left raw
-- these would corrupt the URL. The value part of each entry (everything
-- after the first `=`) is therefore percent-encoded here. Keys are left
-- as given.
-- @tparam table params sequence of "key=value" strings
-- @treturn string "" when params is empty, otherwise "?" followed by the
--   encoded entries joined with "&"
local function build_query_string(params)
  if #params == 0 then return "" end
  local encoded = {}
  for i = 1, #params do
    local entry = tostring(params[i])
    local key, value = entry:match("^([^=]*)=(.*)$")
    if key then
      encoded[i] = key .. "=" .. encode_query_value(value)
    else
      -- No '=' present: treat the whole entry as a bare component.
      encoded[i] = encode_query_value(entry)
    end
  end
  return "?" .. table.concat(encoded, "&")
end
local c = {}
--- Probe the server's /health endpoint.
-- The request is sent without the client's headers.
-- @treturn boolean true when the response status is exactly 200
function c:health()
  local status = http.get(base_url .. "/health").status
  return status == 200
end
--- Fetch /api/v1/system-info.
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c:system_info()
  local endpoint = "/api/v1/system-info"
  return api_get(endpoint)
end
c.namespaces = {}
--- Fetch /api/v1/namespaces.
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.namespaces:list()
  local endpoint = "/api/v1/namespaces"
  return api_get(endpoint)
end
--- Fetch a single namespace by name.
-- @param name namespace name, appended to the path as-is
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.namespaces:get(name)
  local endpoint = "/api/v1/namespaces/" .. name
  return api_get(endpoint)
end
c.workflows = {}
--- List workflows in a namespace.
-- @param wf_opts optional table; recognised fields:
--   `namespace` (overrides the client default), `query` (sent as `query`),
--   `page_size` (sent as `pageSize`)
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.workflows:list(wf_opts)
  local o = wf_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = {}
  if o.query then
    table.insert(query_parts, "query=" .. o.query)
  end
  if o.page_size then
    table.insert(query_parts, "pageSize=" .. o.page_size)
  end
  local suffix = build_query_string(query_parts)
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/workflows"
  return api_get(endpoint .. suffix)
end
--- Fetch a single workflow.
-- @param workflow_id workflow identifier, appended to the path as-is
-- @param run_id optional; when given it is sent as the `runId` query parameter
-- @param wf_opts optional table; `namespace` overrides the client default
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.workflows:get(workflow_id, run_id, wf_opts)
  local namespace = resolve_ns(wf_opts or {})
  local query_parts = run_id and { "runId=" .. run_id } or {}
  local suffix = build_query_string(query_parts)
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/workflows/" .. workflow_id
  return api_get(endpoint .. suffix)
end
--- Fetch the history of a workflow.
-- @param workflow_id workflow identifier, appended to the path as-is
-- @param run_id optional; when given it is sent as the `runId` query parameter
-- @param wf_opts optional table; recognised fields: `namespace` (overrides
--   the client default), `maximum_page_size` (sent as `maximumPageSize`)
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.workflows:history(workflow_id, run_id, wf_opts)
  local o = wf_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = {}
  if run_id then
    table.insert(query_parts, "runId=" .. run_id)
  end
  if o.maximum_page_size then
    table.insert(query_parts, "maximumPageSize=" .. o.maximum_page_size)
  end
  local suffix = build_query_string(query_parts)
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/workflows/" .. workflow_id .. "/history"
  return api_get(endpoint .. suffix)
end
--- Send a signal to a workflow.
-- @param workflow_id workflow identifier, appended to the path as-is
-- @param signal_name sent in the body as `signalName`
-- @param input optional; included in the body as `input` only when truthy
-- @param wf_opts optional table; recognised fields: `namespace` (overrides
--   the client default), `run_id` (sent as the `runId` query parameter)
-- @return the parsed JSON response; raises via api_post on a non-200 status
function c.workflows:signal(workflow_id, signal_name, input, wf_opts)
  local o = wf_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = o.run_id and { "runId=" .. o.run_id } or {}
  local suffix = build_query_string(query_parts)
  -- `input or nil` drops a falsy input, leaving the key absent from the body.
  local payload = { signalName = signal_name, input = input or nil }
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/workflows/" .. workflow_id .. "/signal"
  return api_post(endpoint .. suffix, payload)
end
--- Request termination of a workflow.
-- @param workflow_id workflow identifier, appended to the path as-is
-- @param reason optional; included in the body as `reason` only when truthy
-- @param wf_opts optional table; recognised fields: `namespace` (overrides
--   the client default), `run_id` (sent as the `runId` query parameter)
-- @return the parsed JSON response; raises via api_post on a non-200 status
function c.workflows:terminate(workflow_id, reason, wf_opts)
  local o = wf_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = o.run_id and { "runId=" .. o.run_id } or {}
  local suffix = build_query_string(query_parts)
  -- `reason or nil` drops a falsy reason, leaving the body empty.
  local payload = { reason = reason or nil }
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/workflows/" .. workflow_id .. "/terminate"
  return api_post(endpoint .. suffix, payload)
end
--- Request cancellation of a workflow.
-- Posts an empty table as the request body.
-- @param workflow_id workflow identifier, appended to the path as-is
-- @param wf_opts optional table; recognised fields: `namespace` (overrides
--   the client default), `run_id` (sent as the `runId` query parameter)
-- @return the parsed JSON response; raises via api_post on a non-200 status
function c.workflows:cancel(workflow_id, wf_opts)
  local o = wf_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = o.run_id and { "runId=" .. o.run_id } or {}
  local suffix = build_query_string(query_parts)
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/workflows/" .. workflow_id .. "/cancel"
  return api_post(endpoint .. suffix, {})
end
--- Search workflows in a namespace using a query string.
-- Hits the same endpoint as the workflow listing, but takes the query as a
-- positional argument.
-- @param query optional; when truthy it is sent as the `query` parameter
-- @param wf_opts optional table; recognised fields: `namespace` (overrides
--   the client default), `page_size` (sent as `pageSize`)
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.workflows:search(query, wf_opts)
  local o = wf_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = {}
  if query then
    table.insert(query_parts, "query=" .. query)
  end
  if o.page_size then
    table.insert(query_parts, "pageSize=" .. o.page_size)
  end
  local suffix = build_query_string(query_parts)
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/workflows"
  return api_get(endpoint .. suffix)
end
--- Report whether a workflow is currently running.
-- Fetches the workflow (without a run id) and inspects
-- `workflowExecutionInfo.status` in the response.
-- @param workflow_id workflow identifier
-- @param wf_opts optional table forwarded to the workflow lookup
-- @treturn boolean true only when the status equals
--   "WORKFLOW_EXECUTION_STATUS_RUNNING"; false when the status is missing
function c.workflows:is_running(workflow_id, wf_opts)
  local wf = c.workflows:get(workflow_id, nil, wf_opts or {})
  local info = wf and wf.workflowExecutionInfo
  local status = info and info.status
  -- A missing (nil/false) status never equals the string, so this yields false.
  return status == "WORKFLOW_EXECUTION_STATUS_RUNNING"
end
--- Poll a workflow until it is no longer running or a timeout elapses.
-- The workflow is fetched (without a run id) repeatedly; as soon as the
-- response carries a `workflowExecutionInfo.status` other than
-- "WORKFLOW_EXECUTION_STATUS_RUNNING", that response is returned.
-- @param workflow_id workflow identifier
-- @param timeout_secs maximum time to wait, in the units of `time()`;
--   must be a number or a numeric string
-- @param wf_opts optional table forwarded to the workflow lookup; also
--   accepts `poll_interval` (positive number, default 5) as the pause
--   between polls
-- @return the workflow response once it is no longer running
-- Raises if `timeout_secs` is not numeric, if the deadline passes first,
-- or if the underlying lookup raises.
function c.workflows:wait_complete(workflow_id, timeout_secs, wf_opts)
  wf_opts = wf_opts or {}

  -- Fail with a clear message instead of an arithmetic-on-nil error.
  local timeout = tonumber(timeout_secs)
  if not timeout then
    error("temporal: wait_complete requires a numeric timeout_secs", 2)
  end

  local interval = tonumber(wf_opts.poll_interval)
  if not interval or interval <= 0 then
    interval = 5
  end

  local deadline = time() + timeout
  while true do
    local wf = c.workflows:get(workflow_id, nil, wf_opts)
    local info = wf and wf.workflowExecutionInfo
    local status = info and info.status
    if status and status ~= "WORKFLOW_EXECUTION_STATUS_RUNNING" then
      return wf
    end

    local remaining = deadline - time()
    if remaining <= 0 then
      error("temporal: timeout waiting for workflow " .. workflow_id .. " to complete")
    end

    -- Never sleep past the deadline: a fixed pause could overshoot the
    -- requested timeout by almost a full interval plus one extra poll.
    sleep(math.min(interval, remaining))
  end
end
c.task_queues = {}
--- Fetch a task queue by name.
-- @param name task queue name, appended to the path as-is
-- @param tq_opts optional table; recognised fields: `namespace` (overrides
--   the client default), `task_queue_type` (sent as `taskQueueType`)
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.task_queues:get(name, tq_opts)
  local o = tq_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = o.task_queue_type and { "taskQueueType=" .. o.task_queue_type } or {}
  local suffix = build_query_string(query_parts)
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/task-queues/" .. name
  return api_get(endpoint .. suffix)
end
c.schedules = {}
--- List schedules in a namespace.
-- @param sched_opts optional table; recognised fields: `namespace`
--   (overrides the client default), `maximum_page_size` (sent as
--   `maximumPageSize`)
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.schedules:list(sched_opts)
  local o = sched_opts or {}
  local namespace = resolve_ns(o)
  local query_parts = o.maximum_page_size and { "maximumPageSize=" .. o.maximum_page_size } or {}
  local suffix = build_query_string(query_parts)
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/schedules"
  return api_get(endpoint .. suffix)
end
--- Fetch a single schedule.
-- @param schedule_id schedule identifier, appended to the path as-is
-- @param sched_opts optional table; `namespace` overrides the client default
-- @return the parsed JSON response; raises via api_get on a non-200 status
function c.schedules:get(schedule_id, sched_opts)
  local namespace = resolve_ns(sched_opts or {})
  local endpoint = "/api/v1/namespaces/" .. namespace .. "/schedules/" .. schedule_id
  return api_get(endpoint)
end
return c
end
return M