use anyhow::Result;
use sqlx::SqlitePool;
use crate::store::{ApiKeyRecord, NamespaceRecord, NamespaceStats, QueueStats, WorkflowStore};
use crate::types::*;
/// Full SQLite schema for the store.
///
/// `SqliteStore::migrate` splits this text on `;` and executes each non-empty
/// piece as its own statement, so a `;` must never appear inside a comment or
/// literal here. Every statement is written to be re-runnable
/// (`IF NOT EXISTS` / `INSERT OR IGNORE`).
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
name TEXT PRIMARY KEY,
created_at REAL NOT NULL
);
INSERT OR IGNORE INTO namespaces (name, created_at)
VALUES ('main', strftime('%s', 'now'));
CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY,
namespace TEXT NOT NULL DEFAULT 'main',
run_id TEXT NOT NULL,
workflow_type TEXT NOT NULL,
task_queue TEXT NOT NULL DEFAULT 'main',
status TEXT NOT NULL DEFAULT 'PENDING',
input TEXT,
result TEXT,
error TEXT,
parent_id TEXT,
claimed_by TEXT,
search_attributes TEXT,
archived_at REAL,
archive_uri TEXT,
-- Workflow-task dispatch (Phase 9): a workflow is "dispatchable" when
-- it has new events a worker needs to replay against. Set true on
-- start, on activity completion, on timer fire, on signal arrival.
-- Cleared when a worker claims the dispatch lease.
needs_dispatch INTEGER NOT NULL DEFAULT 0,
dispatch_claimed_by TEXT,
dispatch_last_heartbeat REAL,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
completed_at REAL
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);
CREATE TABLE IF NOT EXISTS workflow_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
seq INTEGER NOT NULL,
event_type TEXT NOT NULL,
payload TEXT,
timestamp REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);
CREATE TABLE IF NOT EXISTS workflow_activities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
seq INTEGER NOT NULL,
name TEXT NOT NULL,
task_queue TEXT NOT NULL DEFAULT 'main',
input TEXT,
status TEXT NOT NULL DEFAULT 'PENDING',
result TEXT,
error TEXT,
attempt INTEGER NOT NULL DEFAULT 1,
max_attempts INTEGER NOT NULL DEFAULT 3,
initial_interval_secs REAL NOT NULL DEFAULT 1,
backoff_coefficient REAL NOT NULL DEFAULT 2,
start_to_close_secs REAL NOT NULL DEFAULT 300,
heartbeat_timeout_secs REAL,
claimed_by TEXT,
scheduled_at REAL NOT NULL,
started_at REAL,
completed_at REAL,
last_heartbeat REAL,
UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);
CREATE TABLE IF NOT EXISTS workflow_timers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
seq INTEGER NOT NULL,
fire_at REAL NOT NULL,
fired INTEGER NOT NULL DEFAULT 0,
UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at);
CREATE TABLE IF NOT EXISTS workflow_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workflow_id TEXT NOT NULL REFERENCES workflows(id),
name TEXT NOT NULL,
payload TEXT,
consumed INTEGER NOT NULL DEFAULT 0,
received_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);
CREATE TABLE IF NOT EXISTS workflow_schedules (
name TEXT NOT NULL,
namespace TEXT NOT NULL DEFAULT 'main',
workflow_type TEXT NOT NULL,
cron_expr TEXT NOT NULL,
timezone TEXT NOT NULL DEFAULT 'UTC',
input TEXT,
task_queue TEXT NOT NULL DEFAULT 'main',
overlap_policy TEXT NOT NULL DEFAULT 'skip',
paused INTEGER NOT NULL DEFAULT 0,
last_run_at REAL,
next_run_at REAL,
last_workflow_id TEXT,
created_at REAL NOT NULL,
PRIMARY KEY (namespace, name)
);
CREATE TABLE IF NOT EXISTS workflow_workers (
id TEXT PRIMARY KEY,
namespace TEXT NOT NULL DEFAULT 'main',
identity TEXT NOT NULL,
task_queue TEXT NOT NULL,
workflows TEXT,
activities TEXT,
max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
active_tasks INTEGER NOT NULL DEFAULT 0,
last_heartbeat REAL NOT NULL,
registered_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS workflow_snapshots (
workflow_id TEXT NOT NULL REFERENCES workflows(id),
event_seq INTEGER NOT NULL,
state_json TEXT NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (workflow_id, event_seq)
);
CREATE TABLE IF NOT EXISTS api_keys (
key_hash TEXT PRIMARY KEY,
prefix TEXT NOT NULL,
label TEXT,
created_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);
CREATE TABLE IF NOT EXISTS engine_lock (
id INTEGER PRIMARY KEY CHECK (id = 1),
instance_id TEXT NOT NULL,
started_at REAL NOT NULL,
last_heartbeat REAL NOT NULL
);
"#;
/// Age in seconds (`now - last_heartbeat`) beyond which `acquire_engine_lock`
/// treats an existing engine lock row as stale and takes it over.
const LOCK_STALE_SECS: f64 = 60.0;
/// Tick period, in seconds, of the background task started by
/// `spawn_lock_heartbeat`.
const LOCK_HEARTBEAT_SECS: u64 = 15;
/// SQLite-backed store built on a `sqlx` connection pool; implements
/// `WorkflowStore` and the single-row engine lock helpers.
pub struct SqliteStore {
// Connection pool the store's queries run against.
pool: SqlitePool,
// Identifier written to `engine_lock.instance_id`; generated in `new` as
// `assay-` followed by 16 hex digits.
instance_id: String,
}
impl SqliteStore {
/// Connects to the SQLite database at `url`, generates an instance id and
/// runs the schema migration.
///
/// The instance id is `assay-` plus the 16-hex-digit hash of the current
/// system time and the process id.
///
/// # Errors
/// Fails if the pool cannot connect or any schema statement fails.
pub async fn new(url: &str) -> Result<Self> {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let pool = SqlitePool::connect(url).await?;

    let mut hasher = DefaultHasher::new();
    std::time::SystemTime::now().hash(&mut hasher);
    std::process::id().hash(&mut hasher);
    let digest = hasher.finish();

    let store = Self {
        pool,
        instance_id: format!("assay-{digest:016x}"),
    };
    store.migrate().await?;
    Ok(store)
}
pub async fn acquire_engine_lock(&self) -> Result<()> {
let now = timestamp_now();
let result = sqlx::query(
"INSERT INTO engine_lock (id, instance_id, started_at, last_heartbeat) VALUES (1, ?, ?, ?)",
)
.bind(&self.instance_id)
.bind(now)
.bind(now)
.execute(&self.pool)
.await;
match result {
Ok(_) => Ok(()),
Err(_) => {
let row: Option<(String, f64)> = sqlx::query_as(
"SELECT instance_id, last_heartbeat FROM engine_lock WHERE id = 1",
)
.fetch_optional(&self.pool)
.await?;
if let Some((existing_id, last_hb)) = row {
if now - last_hb > LOCK_STALE_SECS {
sqlx::query(
"UPDATE engine_lock SET instance_id = ?, started_at = ?, last_heartbeat = ? WHERE id = 1",
)
.bind(&self.instance_id)
.bind(now)
.bind(now)
.execute(&self.pool)
.await?;
tracing::warn!(
"Took over stale engine lock from {existing_id} (last heartbeat {:.0}s ago)",
now - last_hb
);
Ok(())
} else {
let age = now - last_hb;
anyhow::bail!(
"Another assay engine instance is already running (id: {existing_id}, \
last heartbeat {age:.0}s ago).\n\n\
SQLite only supports a single engine instance. For multi-instance \
deployment (Kubernetes, Docker Swarm), use PostgreSQL:\n\n\
\x20 assay serve --backend postgres://user:pass@host:5432/dbname"
);
}
} else {
anyhow::bail!("Unexpected engine lock state");
}
}
}
}
pub async fn refresh_engine_lock(&self) -> Result<()> {
sqlx::query("UPDATE engine_lock SET last_heartbeat = ? WHERE id = 1 AND instance_id = ?")
.bind(timestamp_now())
.bind(&self.instance_id)
.execute(&self.pool)
.await?;
Ok(())
}
/// Deletes the engine lock row, but only if it carries this instance's id.
///
/// Succeeds whether or not a row was deleted.
pub async fn release_engine_lock(&self) -> Result<()> {
    let delete = sqlx::query("DELETE FROM engine_lock WHERE id = 1 AND instance_id = ?")
        .bind(&self.instance_id);
    delete.execute(&self.pool).await?;
    Ok(())
}
pub fn spawn_lock_heartbeat(self: &std::sync::Arc<Self>) {
let store = std::sync::Arc::clone(self);
tokio::spawn(async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(LOCK_HEARTBEAT_SECS));
loop {
tick.tick().await;
if let Err(e) = store.refresh_engine_lock().await {
tracing::error!("Engine lock heartbeat failed: {e}");
}
}
});
}
/// Applies `SCHEMA` by splitting it on `;` and executing each non-empty,
/// trimmed piece in order. Stops at the first failing statement.
async fn migrate(&self) -> Result<()> {
    let statements = SCHEMA
        .split(';')
        .map(str::trim)
        .filter(|sql| !sql.is_empty());
    for sql in statements {
        sqlx::query(sql).execute(&self.pool).await?;
    }
    Ok(())
}
/// Adds `column` (with definition `type_def`) to `table` unless
/// `pragma_table_info` already lists a column of that name.
///
/// `table`, `column` and `type_def` are interpolated into the `ALTER TABLE`
/// text as-is, so they must be trusted identifiers.
#[allow(dead_code)]
async fn add_column_if_missing(
    pool: &SqlitePool,
    table: &str,
    column: &str,
    type_def: &str,
) -> Result<()> {
    let found = sqlx::query_as::<_, (String,)>(
        "SELECT name FROM pragma_table_info(?) WHERE name = ?",
    )
    .bind(table)
    .bind(column)
    .fetch_optional(pool)
    .await?;
    if found.is_some() {
        return Ok(());
    }
    let ddl = format!("ALTER TABLE {table} ADD COLUMN {column} {type_def}");
    sqlx::query(&ddl).execute(pool).await?;
    Ok(())
}
}
impl WorkflowStore for SqliteStore {
/// Inserts a `namespaces` row for `name`, stamped with the current time.
///
/// # Errors
/// Returns the database error if the insert fails (`name` is the primary
/// key, so an already-existing namespace is rejected).
async fn create_namespace(&self, name: &str) -> Result<()> {
    let created_at = timestamp_now();
    let insert = sqlx::query("INSERT INTO namespaces (name, created_at) VALUES (?, ?)")
        .bind(name)
        .bind(created_at);
    insert.execute(&self.pool).await?;
    Ok(())
}
/// Returns every namespace, ordered by name.
async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>> {
    let rows: Vec<(String, f64)> =
        sqlx::query_as("SELECT name, created_at FROM namespaces ORDER BY name")
            .fetch_all(&self.pool)
            .await?;
    let mut records = Vec::with_capacity(rows.len());
    for (name, created_at) in rows {
        records.push(NamespaceRecord { name, created_at });
    }
    Ok(records)
}
/// Deletes the namespace row named `name`.
///
/// Returns `true` if a row was removed, `false` if none matched.
async fn delete_namespace(&self, name: &str) -> Result<bool> {
    let outcome = sqlx::query("DELETE FROM namespaces WHERE name = ?")
        .bind(name)
        .execute(&self.pool)
        .await?;
    let removed = outcome.rows_affected();
    Ok(removed > 0)
}
/// Collects per-namespace counters: total workflows, workflows in each of
/// the RUNNING / PENDING / COMPLETED / FAILED states, schedules and workers.
///
/// Each counter comes from its own `COUNT(*)` query, run one after another
/// outside any transaction.
async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats> {
    let (total_workflows,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = ?")
            .bind(namespace)
            .fetch_one(&self.pool)
            .await?;
    let (running,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'RUNNING'",
    )
    .bind(namespace)
    .fetch_one(&self.pool)
    .await?;
    let (pending,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'PENDING'",
    )
    .bind(namespace)
    .fetch_one(&self.pool)
    .await?;
    let (completed,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'COMPLETED'",
    )
    .bind(namespace)
    .fetch_one(&self.pool)
    .await?;
    let (failed,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'FAILED'",
    )
    .bind(namespace)
    .fetch_one(&self.pool)
    .await?;
    let (schedules,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = ?")
            .bind(namespace)
            .fetch_one(&self.pool)
            .await?;
    let (workers,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = ?")
            .bind(namespace)
            .fetch_one(&self.pool)
            .await?;

    Ok(NamespaceStats {
        namespace: namespace.to_string(),
        total_workflows,
        running,
        pending,
        completed,
        failed,
        schedules,
        workers,
    })
}
/// Inserts `wf` into `workflows`, writing every field listed in the column
/// list. The dispatch columns are not part of the insert and take their
/// schema defaults.
async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
    let insert = sqlx::query(
        "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    );
    // Bind order must follow the column list above.
    let insert = insert
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at);
    insert.execute(&self.pool).await?;
    Ok(())
}
/// Looks up a workflow by id; `None` when no row matches.
async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
    let found: Option<SqliteWorkflowRow> = sqlx::query_as(
        "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = ?",
    )
    .bind(id)
    .fetch_optional(&self.pool)
    .await?;
    Ok(found.map(|record| record.into()))
}
/// Lists workflows in `namespace`, newest first, with optional filters.
///
/// * `status` / `workflow_type` — applied only when `Some`.
/// * `search_attrs_filter` — a JSON object; each key/value pair adds a
///   `json_extract(search_attributes, '$.' || key) = value` condition.
///   Input that is not valid JSON, or is not an object, adds no conditions.
/// * `limit` / `offset` — paging.
///
/// Filter values are bound as text (strings), integer or float (numbers),
/// `0`/`1` (booleans), or their JSON text (anything else).
async fn list_workflows(
    &self,
    namespace: &str,
    status: Option<WorkflowStatus>,
    workflow_type: Option<&str>,
    search_attrs_filter: Option<&str>,
    limit: i64,
    offset: i64,
) -> Result<Vec<WorkflowRecord>> {
    let status_text = status.map(|s| s.to_string());

    let mut attr_filters: Vec<(String, serde_json::Value)> = Vec::new();
    if let Some(raw) = search_attrs_filter {
        if let Ok(serde_json::Value::Object(map)) = serde_json::from_str::<serde_json::Value>(raw)
        {
            attr_filters.extend(map);
        }
    }

    let mut sql = String::from(
        "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
FROM workflows
WHERE namespace = ?
AND (? IS NULL OR status = ?)
AND (? IS NULL OR workflow_type = ?)",
    );
    // One extra condition per attribute pair; placeholders are bound below
    // in the same order.
    sql.push_str(&" AND json_extract(search_attributes, '$.' || ?) = ?".repeat(attr_filters.len()));
    sql.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");

    let mut query = sqlx::query_as::<_, SqliteWorkflowRow>(&sql)
        .bind(namespace)
        .bind(&status_text)
        .bind(&status_text)
        .bind(workflow_type)
        .bind(workflow_type);
    for (key, value) in &attr_filters {
        query = query.bind(key.clone());
        query = match value {
            serde_json::Value::String(text) => query.bind(text.clone()),
            serde_json::Value::Number(num) => match (num.as_i64(), num.as_f64()) {
                (Some(int), _) => query.bind(int),
                (None, Some(float)) => query.bind(float),
                (None, None) => query.bind(num.to_string()),
            },
            serde_json::Value::Bool(flag) => query.bind(*flag as i64),
            other => query.bind(other.to_string()),
        };
    }

    let rows = query
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;
    Ok(rows.into_iter().map(|record| record.into()).collect())
}
/// Sets a workflow's status and bumps `updated_at`.
///
/// `result` and `error` overwrite the stored values only when `Some`
/// (`COALESCE` keeps the old value otherwise). `completed_at` is set to now
/// when `status.is_terminal()`, and left untouched otherwise.
async fn update_workflow_status(
    &self,
    id: &str,
    status: WorkflowStatus,
    result: Option<&str>,
    error: Option<&str>,
) -> Result<()> {
    let now = timestamp_now();
    let completed_at = status.is_terminal().then_some(now);
    let update = sqlx::query(
        "UPDATE workflows SET status = ?, result = COALESCE(?, result), error = COALESCE(?, error), updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE id = ?",
    )
    .bind(status.to_string())
    .bind(result)
    .bind(error)
    .bind(now)
    .bind(completed_at)
    .bind(id);
    update.execute(&self.pool).await?;
    Ok(())
}
/// Claims workflow `id` for `worker_id` if it is currently unclaimed,
/// setting its status to `RUNNING` and refreshing `updated_at`.
///
/// Returns `true` when the claim succeeded, `false` when no unclaimed row
/// with that id exists.
async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
    let now = timestamp_now();
    let outcome = sqlx::query(
        "UPDATE workflows SET claimed_by = ?, status = 'RUNNING', updated_at = ? WHERE id = ? AND claimed_by IS NULL",
    )
    .bind(worker_id)
    .bind(now)
    .bind(id)
    .execute(&self.pool)
    .await?;
    let claimed = outcome.rows_affected() > 0;
    Ok(claimed)
}
/// Sets `needs_dispatch = 1` on the given workflow so a worker can pick up
/// its workflow task.
async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
    let update =
        sqlx::query("UPDATE workflows SET needs_dispatch = 1 WHERE id = ?").bind(workflow_id);
    update.execute(&self.pool).await?;
    Ok(())
}
/// Claims one dispatchable workflow on `task_queue` for `worker_id`.
///
/// Picks the non-terminal workflow with `needs_dispatch = 1` and no current
/// dispatch claim that has the oldest `updated_at`; in the same statement it
/// records the claimant, stamps the dispatch heartbeat and clears
/// `needs_dispatch`. Returns `None` when nothing is dispatchable.
async fn claim_workflow_task(
    &self,
    task_queue: &str,
    worker_id: &str,
) -> Result<Option<WorkflowRecord>> {
    let claimed_at = timestamp_now();
    let claimed: Option<SqliteWorkflowRow> = sqlx::query_as(
        "UPDATE workflows
SET dispatch_claimed_by = ?, dispatch_last_heartbeat = ?, needs_dispatch = 0
WHERE id = (
SELECT id FROM workflows
WHERE task_queue = ?
AND needs_dispatch = 1
AND dispatch_claimed_by IS NULL
AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
ORDER BY updated_at ASC
LIMIT 1
)
RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
    )
    .bind(worker_id)
    .bind(claimed_at)
    .bind(task_queue)
    .fetch_optional(&self.pool)
    .await?;
    Ok(claimed.map(|record| record.into()))
}
/// Clears the dispatch claim and dispatch heartbeat on `workflow_id`, but
/// only if the claim is currently held by `worker_id`.
async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
    let release = sqlx::query(
        "UPDATE workflows
SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
WHERE id = ? AND dispatch_claimed_by = ?",
    )
    .bind(workflow_id)
    .bind(worker_id);
    release.execute(&self.pool).await?;
    Ok(())
}
/// Frees dispatch leases whose heartbeat is more than `timeout_secs` older
/// than `now`, on non-terminal workflows, and flags those workflows as
/// needing dispatch again.
///
/// Returns the number of workflows updated.
async fn release_stale_dispatch_leases(
    &self,
    now: f64,
    timeout_secs: f64,
) -> Result<u64> {
    let outcome = sqlx::query(
        "UPDATE workflows
SET dispatch_claimed_by = NULL,
dispatch_last_heartbeat = NULL,
needs_dispatch = 1
WHERE dispatch_claimed_by IS NOT NULL
AND (? - dispatch_last_heartbeat) > ?
AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
    )
    .bind(now)
    .bind(timeout_secs)
    .execute(&self.pool)
    .await?;
    let released = outcome.rows_affected();
    Ok(released)
}
/// Inserts `ev` into `workflow_events` and returns the rowid of the new row.
async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
    let outcome = sqlx::query(
        "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)",
    )
    .bind(&ev.workflow_id)
    .bind(ev.seq)
    .bind(&ev.event_type)
    .bind(&ev.payload)
    .bind(ev.timestamp)
    .execute(&self.pool)
    .await?;
    let event_id = outcome.last_insert_rowid();
    Ok(event_id)
}
/// Returns all events of a workflow in ascending `seq` order.
async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
    let events: Vec<SqliteEventRow> = sqlx::query_as(
        "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = ? ORDER BY seq ASC",
    )
    .bind(workflow_id)
    .fetch_all(&self.pool)
    .await?;
    Ok(events.into_iter().map(|event| event.into()).collect())
}
/// Counts the events recorded for a workflow.
async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
    let (count,) = sqlx::query_as::<_, (i64,)>(
        "SELECT COUNT(*) FROM workflow_events WHERE workflow_id = ?",
    )
    .bind(workflow_id)
    .fetch_one(&self.pool)
    .await?;
    Ok(count)
}
/// Inserts `act` into `workflow_activities` and returns the new row's id.
///
/// Only scheduling fields are written; result, error, claim and timing
/// columns are not part of the insert.
async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
    let insert = sqlx::query(
        "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    );
    // Bind order must follow the column list above.
    let outcome = insert
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .execute(&self.pool)
        .await?;
    Ok(outcome.last_insert_rowid())
}
/// Looks up an activity by its row id; `None` when no row matches.
async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
    let found: Option<SqliteActivityRow> = sqlx::query_as(
        "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
FROM workflow_activities WHERE id = ?",
    )
    .bind(id)
    .fetch_optional(&self.pool)
    .await?;
    Ok(found.map(|activity| activity.into()))
}
/// Looks up the activity identified by `(workflow_id, seq)`; `None` when no
/// row matches.
async fn get_activity_by_workflow_seq(
    &self,
    workflow_id: &str,
    seq: i32,
) -> Result<Option<WorkflowActivity>> {
    let found: Option<SqliteActivityRow> = sqlx::query_as(
        "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
FROM workflow_activities WHERE workflow_id = ? AND seq = ?",
    )
    .bind(workflow_id)
    .bind(seq)
    .fetch_optional(&self.pool)
    .await?;
    Ok(found.map(|activity| activity.into()))
}
/// Claims the next due activity on `task_queue` for `worker_id`.
///
/// Picks the `PENDING` activity with the earliest `scheduled_at` that is not
/// in the future, and in the same statement marks it `RUNNING`, records the
/// claimant and stamps `started_at`. Returns `None` when nothing is due.
///
/// The `scheduled_at <= now` condition matters for retries:
/// `requeue_activity_for_retry` puts an activity back to `PENDING` with a
/// later `scheduled_at`, and without the condition it would be claimed again
/// immediately, skipping the backoff delay.
async fn claim_activity(
    &self,
    task_queue: &str,
    worker_id: &str,
) -> Result<Option<WorkflowActivity>> {
    let now = timestamp_now();
    let row = sqlx::query_as::<_, SqliteActivityRow>(
        "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = ?, started_at = ?
WHERE id = (
SELECT id FROM workflow_activities
WHERE task_queue = ? AND status = 'PENDING' AND scheduled_at <= ?
ORDER BY scheduled_at ASC
LIMIT 1
)
RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
    )
    .bind(worker_id)
    .bind(now)
    .bind(task_queue)
    .bind(now)
    .fetch_optional(&self.pool)
    .await?;
    Ok(row.map(Into::into))
}
/// Puts an activity back into `PENDING` for another attempt.
///
/// Stores `next_attempt` and `next_scheduled_at`, and clears the claimant,
/// start time, last heartbeat and error.
async fn requeue_activity_for_retry(
    &self,
    id: i64,
    next_attempt: i32,
    next_scheduled_at: f64,
) -> Result<()> {
    let requeue = sqlx::query(
        "UPDATE workflow_activities
SET status = 'PENDING', attempt = ?, scheduled_at = ?,
claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
error = NULL
WHERE id = ?",
    )
    .bind(next_attempt)
    .bind(next_scheduled_at)
    .bind(id);
    requeue.execute(&self.pool).await?;
    Ok(())
}
/// Finishes an activity: status becomes `FAILED` when `failed` is true and
/// `COMPLETED` otherwise; `result` and `error` are stored as given
/// (including `None`), and `completed_at` is set to now.
async fn complete_activity(
    &self,
    id: i64,
    result: Option<&str>,
    error: Option<&str>,
    failed: bool,
) -> Result<()> {
    let final_status = match failed {
        true => "FAILED",
        false => "COMPLETED",
    };
    let update = sqlx::query(
        "UPDATE workflow_activities SET status = ?, result = ?, error = ?, completed_at = ? WHERE id = ?",
    )
    .bind(final_status)
    .bind(result)
    .bind(error)
    .bind(timestamp_now())
    .bind(id);
    update.execute(&self.pool).await?;
    Ok(())
}
/// Stamps the activity's `last_heartbeat` with the current time.
///
/// `_details` is accepted but not stored by this backend.
async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
    let beat_at = timestamp_now();
    let update = sqlx::query("UPDATE workflow_activities SET last_heartbeat = ? WHERE id = ?")
        .bind(beat_at)
        .bind(id);
    update.execute(&self.pool).await?;
    Ok(())
}
/// Returns `RUNNING` activities whose last heartbeat is older than their
/// `heartbeat_timeout_secs`, measured against `now`.
///
/// Only activities that have a heartbeat timeout configured and have
/// recorded at least one heartbeat are considered; `start_to_close_secs` is
/// not consulted by this query.
async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
    let expired: Vec<SqliteActivityRow> = sqlx::query_as(
        "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
FROM workflow_activities
WHERE status = 'RUNNING'
AND heartbeat_timeout_secs IS NOT NULL
AND last_heartbeat IS NOT NULL
AND (? - last_heartbeat) > heartbeat_timeout_secs",
    )
    .bind(now)
    .fetch_all(&self.pool)
    .await?;
    Ok(expired.into_iter().map(|activity| activity.into()).collect())
}
/// Inserts an unfired timer (`fired = 0`) for the workflow and returns the
/// new row's id.
async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
    let outcome = sqlx::query(
        "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES (?, ?, ?, 0)",
    )
    .bind(&timer.workflow_id)
    .bind(timer.seq)
    .bind(timer.fire_at)
    .execute(&self.pool)
    .await?;
    let timer_id = outcome.last_insert_rowid();
    Ok(timer_id)
}
/// Marks every `PENDING` activity of the workflow as `CANCELLED`, stamping
/// `completed_at` with the current time.
///
/// Returns the number of activities cancelled.
async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
    let cancelled_at = timestamp_now();
    let outcome = sqlx::query(
        "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = ?
WHERE workflow_id = ? AND status = 'PENDING'",
    )
    .bind(cancelled_at)
    .bind(workflow_id)
    .execute(&self.pool)
    .await?;
    Ok(outcome.rows_affected())
}
/// Cancels the workflow's outstanding timers by flagging every unfired one
/// as fired (`fired = 1`), so `fire_due_timers` will no longer return them.
///
/// Returns the number of timers updated.
async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
    let outcome = sqlx::query(
        "UPDATE workflow_timers SET fired = 1
WHERE workflow_id = ? AND fired = 0",
    )
    .bind(workflow_id)
    .execute(&self.pool)
    .await?;
    let cancelled = outcome.rows_affected();
    Ok(cancelled)
}
/// Looks up the timer identified by `(workflow_id, seq)`; `None` when no row
/// matches.
async fn get_timer_by_workflow_seq(
    &self,
    workflow_id: &str,
    seq: i32,
) -> Result<Option<WorkflowTimer>> {
    let found: Option<SqliteTimerRow> = sqlx::query_as(
        "SELECT id, workflow_id, seq, fire_at, fired
FROM workflow_timers WHERE workflow_id = ? AND seq = ?",
    )
    .bind(workflow_id)
    .bind(seq)
    .fetch_optional(&self.pool)
    .await?;
    Ok(found.map(|timer| timer.into()))
}
/// Marks every unfired timer with `fire_at <= now` as fired and returns the
/// timers that were just flipped, all in one statement.
async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
    let fired: Vec<SqliteTimerRow> = sqlx::query_as(
        "UPDATE workflow_timers SET fired = 1
WHERE fired = 0 AND fire_at <= ?
RETURNING id, workflow_id, seq, fire_at, fired",
    )
    .bind(now)
    .fetch_all(&self.pool)
    .await?;
    Ok(fired.into_iter().map(|timer| timer.into()).collect())
}
/// Stores a new, unconsumed signal for a workflow and returns the new row's
/// id.
async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
    let outcome = sqlx::query(
        "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES (?, ?, ?, 0, ?)",
    )
    .bind(&sig.workflow_id)
    .bind(&sig.name)
    .bind(&sig.payload)
    .bind(sig.received_at)
    .execute(&self.pool)
    .await?;
    let signal_id = outcome.last_insert_rowid();
    Ok(signal_id)
}
/// Marks all unconsumed signals named `name` on the workflow as consumed and
/// returns them, in one statement.
async fn consume_signals(
    &self,
    workflow_id: &str,
    name: &str,
) -> Result<Vec<WorkflowSignal>> {
    let consumed: Vec<SqliteSignalRow> = sqlx::query_as(
        "UPDATE workflow_signals SET consumed = 1
WHERE workflow_id = ? AND name = ? AND consumed = 0
RETURNING id, workflow_id, name, payload, consumed, received_at",
    )
    .bind(workflow_id)
    .bind(name)
    .fetch_all(&self.pool)
    .await?;
    Ok(consumed.into_iter().map(|signal| signal.into()).collect())
}
/// Inserts `sched` into `workflow_schedules`, writing every column.
///
/// # Errors
/// Returns the database error if the insert fails (`(namespace, name)` is
/// the primary key, so a duplicate is rejected).
async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
    let insert = sqlx::query(
        "INSERT INTO workflow_schedules (name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    );
    // Bind order must follow the column list above.
    let insert = insert
        .bind(&sched.name)
        .bind(&sched.namespace)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at);
    insert.execute(&self.pool).await?;
    Ok(())
}
/// Looks up the schedule `(namespace, name)`; `None` when no row matches.
async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
    let found: Option<SqliteScheduleRow> = sqlx::query_as(
        "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
FROM workflow_schedules WHERE namespace = ? AND name = ?",
    )
    .bind(namespace)
    .bind(name)
    .fetch_optional(&self.pool)
    .await?;
    Ok(found.map(|schedule| schedule.into()))
}
/// Returns all schedules in `namespace`, ordered by name.
async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
    let schedules: Vec<SqliteScheduleRow> = sqlx::query_as(
        "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
FROM workflow_schedules WHERE namespace = ? ORDER BY name",
    )
    .bind(namespace)
    .fetch_all(&self.pool)
    .await?;
    Ok(schedules.into_iter().map(|schedule| schedule.into()).collect())
}
/// Records a schedule run: stores `last_run_at`, `next_run_at` and the id of
/// the workflow that was started on the schedule `(namespace, name)`.
async fn update_schedule_last_run(
    &self,
    namespace: &str,
    name: &str,
    last_run_at: f64,
    next_run_at: f64,
    workflow_id: &str,
) -> Result<()> {
    let update = sqlx::query(
        "UPDATE workflow_schedules SET last_run_at = ?, next_run_at = ?, last_workflow_id = ? WHERE namespace = ? AND name = ?",
    )
    .bind(last_run_at)
    .bind(next_run_at)
    .bind(workflow_id)
    .bind(namespace)
    .bind(name);
    update.execute(&self.pool).await?;
    Ok(())
}
/// Deletes the schedule `(namespace, name)`.
///
/// Returns `true` if a row was removed, `false` if none matched.
async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
    let outcome = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = ? AND name = ?")
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
    let removed = outcome.rows_affected();
    Ok(removed > 0)
}
/// Returns up to `limit` workflows eligible for archiving: terminal status,
/// `completed_at` set and earlier than `cutoff`, and not yet archived.
/// Oldest completions come first.
async fn list_archivable_workflows(
    &self,
    cutoff: f64,
    limit: i64,
) -> Result<Vec<WorkflowRecord>> {
    let candidates: Vec<SqliteWorkflowRow> = sqlx::query_as(
        "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
FROM workflows
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
AND completed_at IS NOT NULL
AND completed_at < ?
AND archived_at IS NULL
ORDER BY completed_at ASC
LIMIT ?",
    )
    .bind(cutoff)
    .bind(limit)
    .fetch_all(&self.pool)
    .await?;
    Ok(candidates.into_iter().map(|record| record.into()).collect())
}
/// Purges a workflow's dependent rows and marks the workflow as archived,
/// all inside one transaction.
///
/// Deletes the workflow's events, activities, timers, signals and snapshots
/// (in that order), then stores `archived_at` and `archive_uri` on the
/// workflow row. Nothing is committed if any step fails.
async fn mark_archived_and_purge(
    &self,
    workflow_id: &str,
    archive_uri: &str,
    archived_at: f64,
) -> Result<()> {
    // Executed in this order; each statement takes the workflow id as its
    // only parameter.
    const PURGE_STATEMENTS: [&str; 5] = [
        "DELETE FROM workflow_events WHERE workflow_id = ?",
        "DELETE FROM workflow_activities WHERE workflow_id = ?",
        "DELETE FROM workflow_timers WHERE workflow_id = ?",
        "DELETE FROM workflow_signals WHERE workflow_id = ?",
        "DELETE FROM workflow_snapshots WHERE workflow_id = ?",
    ];

    let mut tx = self.pool.begin().await?;
    for &purge_sql in PURGE_STATEMENTS.iter() {
        sqlx::query(purge_sql)
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
    }
    sqlx::query("UPDATE workflows SET archived_at = ?, archive_uri = ? WHERE id = ?")
        .bind(archived_at)
        .bind(archive_uri)
        .bind(workflow_id)
        .execute(&mut *tx)
        .await?;
    tx.commit().await?;
    Ok(())
}
/// Merges `patch_json` into the workflow's stored `search_attributes` using
/// `merge_search_attrs` and writes the merged value back.
///
/// A missing workflow row or a NULL column is passed to the merge as `None`.
/// NOTE(review): the read and the write are two separate statements with no
/// transaction around them.
async fn upsert_search_attributes(
    &self,
    workflow_id: &str,
    patch_json: &str,
) -> Result<()> {
    let stored = sqlx::query_as::<_, (Option<String>,)>(
        "SELECT search_attributes FROM workflows WHERE id = ?",
    )
    .bind(workflow_id)
    .fetch_optional(&self.pool)
    .await?;
    let existing: Option<String> = match stored {
        Some((attrs,)) => attrs,
        None => None,
    };
    let merged = merge_search_attrs(existing.as_deref(), patch_json)?;

    sqlx::query("UPDATE workflows SET search_attributes = ? WHERE id = ?")
        .bind(merged)
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
    Ok(())
}
/// Applies the fields present in `patch` to the schedule
/// `(namespace, name)` and returns the schedule as stored afterwards.
///
/// Only `cron_expr`, `timezone`, `input`, `task_queue` and `overlap_policy`
/// can be patched. An empty patch performs no write and just returns the
/// current schedule. Returns `None` when the schedule does not exist.
async fn update_schedule(
    &self,
    namespace: &str,
    name: &str,
    patch: &SchedulePatch,
) -> Result<Option<WorkflowSchedule>> {
    // Assignment clauses in a fixed order; the binds below follow the same
    // order.
    let assignments: Vec<&'static str> = [
        (patch.cron_expr.is_some(), "cron_expr = ?"),
        (patch.timezone.is_some(), "timezone = ?"),
        (patch.input.is_some(), "input = ?"),
        (patch.task_queue.is_some(), "task_queue = ?"),
        (patch.overlap_policy.is_some(), "overlap_policy = ?"),
    ]
    .iter()
    .filter(|(present, _)| *present)
    .map(|&(_, clause)| clause)
    .collect();

    if assignments.is_empty() {
        return self.get_schedule(namespace, name).await;
    }

    let sql = format!(
        "UPDATE workflow_schedules SET {} WHERE namespace = ? AND name = ?",
        assignments.join(", ")
    );
    let mut update = sqlx::query(&sql);
    if let Some(cron_expr) = &patch.cron_expr {
        update = update.bind(cron_expr);
    }
    if let Some(timezone) = &patch.timezone {
        update = update.bind(timezone);
    }
    if let Some(input) = &patch.input {
        update = update.bind(input.to_string());
    }
    if let Some(task_queue) = &patch.task_queue {
        update = update.bind(task_queue);
    }
    if let Some(overlap_policy) = &patch.overlap_policy {
        update = update.bind(overlap_policy);
    }

    let outcome = update
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
    match outcome.rows_affected() {
        0 => Ok(None),
        _ => self.get_schedule(namespace, name).await,
    }
}
/// Sets the `paused` flag on the schedule `(namespace, name)` and returns
/// the schedule as stored afterwards, or `None` when it does not exist.
async fn set_schedule_paused(
    &self,
    namespace: &str,
    name: &str,
    paused: bool,
) -> Result<Option<WorkflowSchedule>> {
    let outcome = sqlx::query(
        "UPDATE workflow_schedules SET paused = ? WHERE namespace = ? AND name = ?",
    )
    .bind(paused)
    .bind(namespace)
    .bind(name)
    .execute(&self.pool)
    .await?;
    match outcome.rows_affected() {
        0 => Ok(None),
        _ => self.get_schedule(namespace, name).await,
    }
}
/// Registers a worker, replacing any existing row with the same id
/// (`INSERT OR REPLACE`).
async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
    let upsert = sqlx::query(
        "INSERT OR REPLACE INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    );
    // Bind order must follow the column list above.
    let upsert = upsert
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at);
    upsert.execute(&self.pool).await?;
    Ok(())
}
/// Sets the worker's `last_heartbeat` to `now`. Succeeds even if no worker
/// with that id exists.
async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
    let update = sqlx::query("UPDATE workflow_workers SET last_heartbeat = ? WHERE id = ?")
        .bind(now)
        .bind(id);
    update.execute(&self.pool).await?;
    Ok(())
}
/// Returns all workers registered in `namespace`, ordered by registration
/// time.
async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
    let workers: Vec<SqliteWorkerRow> = sqlx::query_as(
        "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at
FROM workflow_workers WHERE namespace = ? ORDER BY registered_at",
    )
    .bind(namespace)
    .fetch_all(&self.pool)
    .await?;
    Ok(workers.into_iter().map(|worker| worker.into()).collect())
}
/// Deletes every worker whose `last_heartbeat` is older than `cutoff` and
/// returns the ids of the workers that were removed.
///
/// A single `DELETE ... RETURNING` is used so the returned ids are exactly
/// the rows deleted; a separate `SELECT` followed by `DELETE` could report a
/// worker that heartbeated in between and therefore was not removed.
async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
    let removed: Vec<(String,)> =
        sqlx::query_as("DELETE FROM workflow_workers WHERE last_heartbeat < ? RETURNING id")
            .bind(cutoff)
            .fetch_all(&self.pool)
            .await?;
    Ok(removed.into_iter().map(|(id,)| id).collect())
}
/// Stores a new API key record: its hash, display prefix, optional label and
/// creation time.
///
/// # Errors
/// Returns the database error if the insert fails (`key_hash` is the
/// primary key, so a duplicate hash is rejected).
async fn create_api_key(
    &self,
    key_hash: &str,
    prefix: &str,
    label: Option<&str>,
    created_at: f64,
) -> Result<()> {
    let insert = sqlx::query(
        "INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES (?, ?, ?, ?)",
    )
    .bind(key_hash)
    .bind(prefix)
    .bind(label)
    .bind(created_at);
    insert.execute(&self.pool).await?;
    Ok(())
}
/// Returns `true` when an API key with the given hash exists.
async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
    let hit = sqlx::query_as::<_, (i64,)>("SELECT 1 FROM api_keys WHERE key_hash = ?")
        .bind(key_hash)
        .fetch_optional(&self.pool)
        .await?;
    Ok(hit.is_some())
}
/// Lists all API keys (prefix, label, creation time), newest first. Key
/// hashes are not selected.
async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>> {
    let rows: Vec<(String, Option<String>, f64)> =
        sqlx::query_as("SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC")
            .fetch_all(&self.pool)
            .await?;
    let mut keys = Vec::with_capacity(rows.len());
    for (prefix, label, created_at) in rows {
        keys.push(ApiKeyRecord {
            prefix,
            label,
            created_at,
        });
    }
    Ok(keys)
}
/// Deletes every API key whose prefix equals `prefix`.
///
/// Returns `true` if at least one key was removed.
async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
    let outcome = sqlx::query("DELETE FROM api_keys WHERE prefix = ?")
        .bind(prefix)
        .execute(&self.pool)
        .await?;
    let revoked = outcome.rows_affected();
    Ok(revoked > 0)
}
/// Returns `true` when the `api_keys` table has no rows.
async fn api_keys_empty(&self) -> Result<bool> {
    let (count,) = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM api_keys")
        .fetch_one(&self.pool)
        .await?;
    Ok(count == 0)
}
/// Returns one API key record whose label equals `label`, or `None` when no
/// key has that label. If several keys share the label, which one is
/// returned is not specified by the query.
async fn get_api_key_by_label(&self, label: &str) -> Result<Option<ApiKeyRecord>> {
    let found = sqlx::query_as::<_, (String, Option<String>, f64)>(
        "SELECT prefix, label, created_at FROM api_keys WHERE label = ? LIMIT 1",
    )
    .bind(label)
    .fetch_optional(&self.pool)
    .await?;
    let record = match found {
        Some((prefix, label, created_at)) => Some(ApiKeyRecord {
            prefix,
            label,
            created_at,
        }),
        None => None,
    };
    Ok(record)
}
async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
"SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
FROM workflows WHERE parent_id = ? ORDER BY created_at ASC",
)
.bind(parent_id)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(Into::into).collect())
}
/// Stores a state snapshot for `workflow_id` taken at `event_seq`, stamped
/// with the current time.
///
/// Uses `INSERT OR REPLACE`, so a row that conflicts on the table's
/// uniqueness constraint is overwritten rather than rejected.
/// NOTE(review): the constraint is presumably on `(workflow_id, event_seq)`;
/// confirm against the `workflow_snapshots` schema.
///
/// # Errors
/// Propagates any database error raised by the insert.
async fn create_snapshot(
    &self,
    workflow_id: &str,
    event_seq: i32,
    state_json: &str,
) -> Result<()> {
    // `\n\` keeps the statement text identical while letting the source wrap.
    const UPSERT_SNAPSHOT_SQL: &str = "INSERT OR REPLACE INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)\n\
        VALUES (?, ?, ?, ?)";

    let created_at = timestamp_now();
    let upsert = sqlx::query(UPSERT_SNAPSHOT_SQL)
        .bind(workflow_id)
        .bind(event_seq)
        .bind(state_json)
        .bind(created_at);
    upsert.execute(&self.pool).await?;
    Ok(())
}
/// Fetches the snapshot with the highest `event_seq` for `workflow_id`, or
/// `None` when the workflow has no snapshots.
///
/// # Errors
/// Propagates any database error raised by the query.
async fn get_latest_snapshot(
    &self,
    workflow_id: &str,
) -> Result<Option<WorkflowSnapshot>> {
    // `\n\` keeps the statement text identical while letting the source wrap.
    const LATEST_SNAPSHOT_SQL: &str = "SELECT workflow_id, event_seq, state_json, created_at\n\
        FROM workflow_snapshots WHERE workflow_id = ?\n\
        ORDER BY event_seq DESC LIMIT 1";

    let latest: Option<(String, i32, String, f64)> = sqlx::query_as(LATEST_SNAPSHOT_SQL)
        .bind(workflow_id)
        .fetch_optional(&self.pool)
        .await?;

    let snapshot = latest.map(|(workflow_id, event_seq, state_json, created_at)| {
        WorkflowSnapshot {
            workflow_id,
            event_seq,
            state_json,
            created_at,
        }
    });
    Ok(snapshot)
}
/// Builds per-task-queue statistics for `namespace`, sorted by queue name.
///
/// Activity counts come from `workflow_activities` joined to `workflows`
/// (to filter by namespace): the number of `PENDING` and `RUNNING`
/// activities per queue. Worker counts come from `workflow_workers`. A queue
/// that has workers but no activities is still reported, with zero activity
/// counts; a queue with activities but no workers reports `workers: 0`.
///
/// # Errors
/// Propagates any database error raised by either query.
async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>> {
    // `\n\` keeps the statement text identical while letting the source wrap.
    const ACTIVITY_COUNTS_SQL: &str = "SELECT a.task_queue,\n\
        SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END),\n\
        SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END)\n\
        FROM workflow_activities a\n\
        INNER JOIN workflows w ON w.id = a.workflow_id\n\
        WHERE w.namespace = ?\n\
        GROUP BY a.task_queue";
    const WORKER_COUNTS_SQL: &str =
        "SELECT task_queue, COUNT(*) FROM workflow_workers WHERE namespace = ? GROUP BY task_queue";

    let activity_counts: Vec<(String, i64, i64)> = sqlx::query_as(ACTIVITY_COUNTS_SQL)
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;

    let mut stats: Vec<QueueStats> = Vec::with_capacity(activity_counts.len());
    for (queue, pending_activities, running_activities) in activity_counts {
        stats.push(QueueStats {
            queue,
            pending_activities,
            running_activities,
            workers: 0,
        });
    }

    let worker_counts: Vec<(String, i64)> = sqlx::query_as(WORKER_COUNTS_SQL)
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;

    // Fold the worker counts into the matching queue entry, adding a fresh
    // entry for queues that only have workers.
    for (queue, workers) in worker_counts {
        let slot = stats.iter().position(|entry| entry.queue == queue);
        match slot {
            Some(idx) => stats[idx].workers = workers,
            None => stats.push(QueueStats {
                queue,
                pending_activities: 0,
                running_activities: 0,
                workers,
            }),
        }
    }

    stats.sort_by(|lhs, rhs| lhs.queue.cmp(&rhs.queue));
    Ok(stats)
}
/// Always reports the scheduler lock as acquired.
///
/// Calls `refresh_engine_lock` first and deliberately discards its outcome,
/// so a failed refresh never prevents this method from returning `Ok(true)`.
/// NOTE(review): presumably the SQLite backend is single-node, so no real
/// leader election is needed; confirm.
async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
    // Best-effort: a refresh failure is intentionally ignored.
    let _ = self.refresh_engine_lock().await;
    Ok(true)
}
}
/// Returns the current wall-clock time as fractional seconds since the Unix
/// epoch.
///
/// If the system clock reads earlier than the epoch, the (negative) offset
/// is returned instead of panicking, so a misconfigured clock cannot crash a
/// store operation.
fn timestamp_now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // `SystemTimeError::duration` is how far *before* the epoch we are.
        Err(before_epoch) => -before_epoch.duration().as_secs_f64(),
    }
}
pub(crate) fn merge_search_attrs(current: Option<&str>, patch_json: &str) -> Result<String> {
let mut current_map: serde_json::Map<String, serde_json::Value> = current
.and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
.and_then(|v| v.as_object().cloned())
.unwrap_or_default();
let patch: serde_json::Value = serde_json::from_str(patch_json)
.map_err(|e| anyhow::anyhow!("invalid search_attributes patch: {e}"))?;
let patch_obj = patch
.as_object()
.ok_or_else(|| anyhow::anyhow!("search_attributes patch must be a JSON object"))?;
for (k, v) in patch_obj {
current_map.insert(k.clone(), v.clone());
}
Ok(serde_json::Value::Object(current_map).to_string())
}
/// Row shape used to decode `workflows` queries via `sqlx::FromRow`.
///
/// The derive maps result columns to fields by name, so every query decoded
/// into this struct must select columns with exactly these names. The
/// dispatch bookkeeping columns (`needs_dispatch`, `dispatch_claimed_by`,
/// `dispatch_last_heartbeat`) are not part of this row.
#[derive(sqlx::FromRow)]
struct SqliteWorkflowRow {
id: String,
namespace: String,
run_id: String,
workflow_type: String,
task_queue: String,
status: String,
// Nullable TEXT columns.
input: Option<String>,
result: Option<String>,
error: Option<String>,
parent_id: Option<String>,
claimed_by: Option<String>,
search_attributes: Option<String>,
archived_at: Option<f64>,
archive_uri: Option<String>,
// Timestamps are stored as REAL columns.
created_at: f64,
updated_at: f64,
completed_at: Option<f64>,
}
impl From<SqliteWorkflowRow> for WorkflowRecord {
fn from(r: SqliteWorkflowRow) -> Self {
Self {
id: r.id,
namespace: r.namespace,
run_id: r.run_id,
workflow_type: r.workflow_type,
task_queue: r.task_queue,
status: r.status,
input: r.input,
result: r.result,
error: r.error,
parent_id: r.parent_id,
claimed_by: r.claimed_by,
search_attributes: r.search_attributes,
archived_at: r.archived_at,
archive_uri: r.archive_uri,
created_at: r.created_at,
updated_at: r.updated_at,
completed_at: r.completed_at,
}
}
}
/// Row shape for decoding workflow events via `sqlx::FromRow`.
///
/// The fields mirror the columns of the `workflow_events` table declared in
/// `SCHEMA` (`id`, `workflow_id`, `seq`, `event_type`, `payload`,
/// `timestamp`); the derive matches columns to fields by name.
#[derive(sqlx::FromRow)]
struct SqliteEventRow {
id: i64,
workflow_id: String,
seq: i32,
event_type: String,
// Nullable in the schema.
payload: Option<String>,
timestamp: f64,
}
impl From<SqliteEventRow> for WorkflowEvent {
fn from(r: SqliteEventRow) -> Self {
Self {
id: Some(r.id),
workflow_id: r.workflow_id,
seq: r.seq,
event_type: r.event_type,
payload: r.payload,
timestamp: r.timestamp,
}
}
}
/// Row shape for decoding workflow activities via `sqlx::FromRow`.
///
/// The derive matches result columns to fields by name, so queries decoded
/// into this struct must select columns with exactly these names.
/// NOTE(review): expected to mirror the `workflow_activities` table; confirm
/// the full column list against `SCHEMA`.
#[derive(sqlx::FromRow)]
struct SqliteActivityRow {
id: i64,
workflow_id: String,
seq: i32,
name: String,
task_queue: String,
input: Option<String>,
status: String,
result: Option<String>,
error: Option<String>,
// Retry bookkeeping; presumably the retry policy for this activity (confirm).
attempt: i32,
max_attempts: i32,
initial_interval_secs: f64,
backoff_coefficient: f64,
// Timeouts, expressed in seconds per the field names.
start_to_close_secs: f64,
heartbeat_timeout_secs: Option<f64>,
claimed_by: Option<String>,
scheduled_at: f64,
started_at: Option<f64>,
completed_at: Option<f64>,
last_heartbeat: Option<f64>,
}
impl From<SqliteActivityRow> for WorkflowActivity {
fn from(r: SqliteActivityRow) -> Self {
Self {
id: Some(r.id),
workflow_id: r.workflow_id,
seq: r.seq,
name: r.name,
task_queue: r.task_queue,
input: r.input,
status: r.status,
result: r.result,
error: r.error,
attempt: r.attempt,
max_attempts: r.max_attempts,
initial_interval_secs: r.initial_interval_secs,
backoff_coefficient: r.backoff_coefficient,
start_to_close_secs: r.start_to_close_secs,
heartbeat_timeout_secs: r.heartbeat_timeout_secs,
claimed_by: r.claimed_by,
scheduled_at: r.scheduled_at,
started_at: r.started_at,
completed_at: r.completed_at,
last_heartbeat: r.last_heartbeat,
}
}
}
/// Row shape for decoding workflow timers via `sqlx::FromRow`.
///
/// The derive matches result columns to fields by name.
/// NOTE(review): the backing table is not visible from here; presumably a
/// workflow timers table. Confirm against `SCHEMA`.
#[derive(sqlx::FromRow)]
struct SqliteTimerRow {
id: i64,
workflow_id: String,
seq: i32,
fire_at: f64,
fired: bool,
}
impl From<SqliteTimerRow> for WorkflowTimer {
fn from(r: SqliteTimerRow) -> Self {
Self {
id: Some(r.id),
workflow_id: r.workflow_id,
seq: r.seq,
fire_at: r.fire_at,
fired: r.fired,
}
}
}
/// Row shape for decoding workflow signals via `sqlx::FromRow`.
///
/// The derive matches result columns to fields by name.
/// NOTE(review): the backing table is not visible from here; presumably a
/// workflow signals table. Confirm against `SCHEMA`.
#[derive(sqlx::FromRow)]
struct SqliteSignalRow {
id: i64,
workflow_id: String,
name: String,
payload: Option<String>,
consumed: bool,
received_at: f64,
}
impl From<SqliteSignalRow> for WorkflowSignal {
fn from(r: SqliteSignalRow) -> Self {
Self {
id: Some(r.id),
workflow_id: r.workflow_id,
name: r.name,
payload: r.payload,
consumed: r.consumed,
received_at: r.received_at,
}
}
}
/// Row shape for decoding workflow schedules via `sqlx::FromRow`.
///
/// The derive matches result columns to fields by name.
/// NOTE(review): the backing table is not visible from here; presumably a
/// workflow schedules table. Confirm against `SCHEMA`.
#[derive(sqlx::FromRow)]
struct SqliteScheduleRow {
name: String,
namespace: String,
workflow_type: String,
cron_expr: String,
timezone: String,
input: Option<String>,
task_queue: String,
overlap_policy: String,
paused: bool,
last_run_at: Option<f64>,
next_run_at: Option<f64>,
last_workflow_id: Option<String>,
created_at: f64,
}
impl From<SqliteScheduleRow> for WorkflowSchedule {
fn from(r: SqliteScheduleRow) -> Self {
Self {
name: r.name,
namespace: r.namespace,
workflow_type: r.workflow_type,
cron_expr: r.cron_expr,
timezone: r.timezone,
input: r.input,
task_queue: r.task_queue,
overlap_policy: r.overlap_policy,
paused: r.paused,
last_run_at: r.last_run_at,
next_run_at: r.next_run_at,
last_workflow_id: r.last_workflow_id,
created_at: r.created_at,
}
}
}
/// Row shape for decoding `workflow_workers` queries via `sqlx::FromRow`.
///
/// The derive matches result columns to fields by name, so queries decoded
/// into this struct must select columns with exactly these names.
#[derive(sqlx::FromRow)]
struct SqliteWorkerRow {
id: String,
namespace: String,
identity: String,
task_queue: String,
// Nullable text columns. NOTE(review): presumably serialized lists of the
// workflow/activity types the worker handles; confirm the encoding.
workflows: Option<String>,
activities: Option<String>,
max_concurrent_workflows: i32,
max_concurrent_activities: i32,
active_tasks: i32,
last_heartbeat: f64,
registered_at: f64,
}
impl From<SqliteWorkerRow> for WorkflowWorker {
fn from(r: SqliteWorkerRow) -> Self {
Self {
id: r.id,
namespace: r.namespace,
identity: r.identity,
task_queue: r.task_queue,
workflows: r.workflows,
activities: r.activities,
max_concurrent_workflows: r.max_concurrent_workflows,
max_concurrent_activities: r.max_concurrent_activities,
active_tasks: r.active_tasks,
last_heartbeat: r.last_heartbeat,
registered_at: r.registered_at,
}
}
}