#![allow(clippy::expect_used)]
#![allow(clippy::unwrap_used)]
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
use sqlx::{Row, Sqlite, Transaction};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::debug;
use super::{
DeleteInstanceResult, DispatcherCapabilityFilter, ExecutionInfo, InstanceFilter, InstanceInfo, OrchestrationItem,
Provider, ProviderAdmin, ProviderError, PruneOptions, PruneResult, QueueDepths, ScheduledActivityIdentifier,
SessionFetchConfig, SystemMetrics, TagFilter, WorkItem,
};
use crate::{Event, EventKind};
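/// Default limit applied to bulk operations when the caller does not specify one.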
const DEFAULT_BULK_OPERATION_LIMIT: u32 = 1000;
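/// Configuration options for [`SqliteProvider`]. Currently carries no fields; kept so
/// tuning knobs can be added later without changing the constructor signatures.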
#[derive(Debug, Clone, Default)]
pub struct SqliteOptions {
}
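/// SQLite-backed [`Provider`]: instances, executions, history, orchestrator/worker
/// queues, instance locks, sessions, and the key-value store all live in one database
/// accessed through a small connection pool.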
pub struct SqliteProvider {
pool: SqlitePool,
}
impl SqliteProvider {
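/// Map a `sqlx::Error` to a [`ProviderError`], classifying lock/busy and
/// connection/timeout errors as retryable and constraint violations as permanent.
/// Unrecognized errors default to retryable.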
fn sqlx_to_provider_error(operation: &str, e: sqlx::Error) -> ProviderError {
let error_msg = e.to_string();
if error_msg.contains("database is locked") || error_msg.contains("SQLITE_BUSY") {
return ProviderError::retryable(operation, format!("Database locked: {error_msg}"));
}
if error_msg.contains("UNIQUE constraint") || error_msg.contains("PRIMARY KEY") {
return ProviderError::permanent(operation, format!("Constraint violation: {error_msg}"));
}
if error_msg.contains("connection") || error_msg.contains("timeout") {
return ProviderError::retryable(operation, format!("Connection error: {error_msg}"));
}
ProviderError::retryable(operation, error_msg)
}
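/// Serialize a work item and insert it into `orchestrator_queue` for the instance it
/// targets, with `visible_at` set to now plus the optional delay. Sub-orchestration
/// completions are routed to the parent instance.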
async fn enqueue_orchestrator_work_with_delay(
&self,
item: WorkItem,
delay: Option<Duration>,
) -> Result<(), ProviderError> {
let work_item = serde_json::to_string(&item)
.map_err(|e| ProviderError::permanent("enqueue_for_orchestrator", format!("Serialization error: {e}")))?;
let instance = match &item {
WorkItem::StartOrchestration { instance, .. }
| WorkItem::ActivityCompleted { instance, .. }
| WorkItem::ActivityFailed { instance, .. }
| WorkItem::TimerFired { instance, .. }
| WorkItem::ExternalRaised { instance, .. }
| WorkItem::QueueMessage { instance, .. }
| WorkItem::CancelInstance { instance, .. }
| WorkItem::ContinueAsNew { instance, .. } => instance,
#[cfg(feature = "replay-version-test")]
WorkItem::ExternalRaised2 { instance, .. } => instance,
WorkItem::SubOrchCompleted { parent_instance, .. } | WorkItem::SubOrchFailed { parent_instance, .. } => {
parent_instance
}
_ => {
return Err(ProviderError::permanent(
"enqueue_for_orchestrator",
"Invalid work item type",
));
}
};
tracing::debug!(
target: "duroxide::providers::sqlite",
?item,
instance = %instance,
delay = ?delay,
"enqueue_orchestrator_work_with_delay"
);
let visible_at = if let Some(delay) = delay {
let delay_ms = delay.as_millis().min(i64::MAX as u128) as i64;
Self::now_millis().saturating_add(delay_ms)
} else {
Self::now_millis()
};
sqlx::query("INSERT INTO orchestrator_queue (instance_id, work_item, visible_at) VALUES (?, ?, ?)")
.bind(instance)
.bind(work_item)
.bind(visible_at)
.execute(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("enqueue_for_orchestrator", e))?;
Ok(())
}
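/// Open (or create) the database at `database_url` and apply per-connection PRAGMAs.
/// File-backed databases use WAL journaling and the embedded migrations, falling back
/// to `create_schema` if the migrations fail; in-memory databases skip migrations.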
pub async fn new(database_url: &str, _options: Option<SqliteOptions>) -> Result<Self, sqlx::Error> {
let is_memory = database_url.contains(":memory:") || database_url.contains("mode=memory");
let pool = SqlitePoolOptions::new()
.max_connections(5)
.after_connect(move |conn, _meta| {
Box::pin({
let is_memory = is_memory;
async move {
if is_memory {
sqlx::query("PRAGMA journal_mode = MEMORY").execute(&mut *conn).await?;
sqlx::query("PRAGMA synchronous = OFF").execute(&mut *conn).await?;
} else {
sqlx::query("PRAGMA journal_mode = WAL").execute(&mut *conn).await?;
sqlx::query("PRAGMA synchronous = WAL").execute(&mut *conn).await?;
sqlx::query("PRAGMA wal_autocheckpoint = 10000")
.execute(&mut *conn)
.await?;
sqlx::query("PRAGMA cache_size = -64000").execute(&mut *conn).await?;
}
sqlx::query("PRAGMA busy_timeout = 60000").execute(&mut *conn).await?;
sqlx::query("PRAGMA foreign_keys = ON").execute(&mut *conn).await?;
Ok(())
}
})
})
.connect(database_url)
.await?;
if database_url.contains(":memory:") || database_url.contains("mode=memory") {
Self::create_schema(&pool).await?;
} else {
match sqlx::migrate!("./migrations").run(&pool).await {
Ok(_) => {
tracing::debug!("Successfully ran migrations");
}
Err(e) => {
tracing::debug!("Migration failed: {}, falling back to create_schema", e);
Self::create_schema(&pool).await?;
}
}
}
Ok(Self { pool })
}
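/// Convenience constructor for a shared in-memory database.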
pub async fn new_in_memory() -> Result<Self, sqlx::Error> {
Self::new_in_memory_with_options(None).await
}
pub async fn new_in_memory_with_options(options: Option<SqliteOptions>) -> Result<Self, sqlx::Error> {
let url = "sqlite::memory:?cache=shared";
Self::new(url, options).await
}
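/// Force a full WAL checkpoint (a no-op when WAL journaling is not in use).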
pub async fn checkpoint(&self) -> Result<(), sqlx::Error> {
sqlx::query("PRAGMA wal_checkpoint(FULL)").execute(&self.pool).await?;
Ok(())
}
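/// Build a human-readable snapshot of both queues (counts plus up to 10 sample rows
/// each) for debugging; errors are reported inline rather than propagated.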
pub async fn debug_dump(&self) -> String {
let mut out = String::new();
let mut conn = match self.pool.acquire().await {
Ok(c) => c,
Err(e) => return format!("<debug_dump: acquire error: {e}>"),
};
if let Ok((cnt,)) = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM orchestrator_queue")
.fetch_one(&mut *conn)
.await
{
out.push_str(&format!("orchestrator_queue.count = {cnt}\n"));
}
if let Ok(rows) = sqlx::query(
r#"SELECT id, instance_id, lock_token, locked_until, work_item FROM orchestrator_queue ORDER BY id LIMIT 10"#
).fetch_all(&mut *conn).await {
out.push_str("orchestrator_queue.sample:\n");
for r in rows { let id: i64 = r.try_get("id").unwrap_or_default(); let inst: String = r.try_get("instance_id").unwrap_or_default(); let lock: Option<String> = r.try_get("lock_token").unwrap_or_default(); let until: Option<i64> = r.try_get("locked_until").unwrap_or_default(); let item: String = r.try_get("work_item").unwrap_or_default(); out.push_str(&format!(" id={id}, inst={inst}, lock={lock:?}, until={until:?}, item={item}\n")); }
}
if let Ok((cnt,)) = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM worker_queue")
.fetch_one(&mut *conn)
.await
{
out.push_str(&format!("worker_queue.count = {cnt}\n"));
}
if let Ok(rows) =
sqlx::query(r#"SELECT id, lock_token, locked_until, work_item FROM worker_queue ORDER BY id LIMIT 10"#)
.fetch_all(&mut *conn)
.await
{
out.push_str("worker_queue.sample:\n");
for r in rows {
let id: i64 = r.try_get("id").unwrap_or_default();
let lock: Option<String> = r.try_get("lock_token").unwrap_or_default();
let until: Option<i64> = r.try_get("locked_until").unwrap_or_default();
let item: String = r.try_get("work_item").unwrap_or_default();
out.push_str(&format!(" id={id}, lock={lock:?}, until={until:?}, item={item}\n"));
}
}
out
}
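/// Create all tables and indexes if they do not exist. Used for in-memory databases
/// and as a fallback when the embedded migrations fail; also patches older schemas
/// that lack the `output` and `custom_status` columns.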
async fn create_schema(pool: &SqlitePool) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS instances (
instance_id TEXT PRIMARY KEY,
orchestration_name TEXT NOT NULL,
orchestration_version TEXT,
current_execution_id INTEGER NOT NULL DEFAULT 1,
custom_status TEXT,
custom_status_version INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
parent_instance_id TEXT REFERENCES instances(instance_id)
)
"#,
)
.execute(pool)
.await?;
sqlx::query(r#"CREATE INDEX IF NOT EXISTS idx_instances_parent ON instances(parent_instance_id)"#)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS executions (
instance_id TEXT NOT NULL,
execution_id INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'Running',
output TEXT,
duroxide_version_major INTEGER,
duroxide_version_minor INTEGER,
duroxide_version_patch INTEGER,
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP,
PRIMARY KEY (instance_id, execution_id)
)
"#,
)
.execute(pool)
.await?;
let column_exists: bool =
sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM pragma_table_info('executions') WHERE name = 'output'")
.fetch_one(pool)
.await
.unwrap_or(0)
> 0;
if !column_exists {
sqlx::query("ALTER TABLE executions ADD COLUMN output TEXT")
.execute(pool)
.await?;
debug!("Added output column to executions table");
}
let custom_status_on_instances: bool = sqlx::query_scalar::<_, i64>(
"SELECT COUNT(*) FROM pragma_table_info('instances') WHERE name = 'custom_status'",
)
.fetch_one(pool)
.await
.unwrap_or(0)
> 0;
if !custom_status_on_instances {
sqlx::query("ALTER TABLE instances ADD COLUMN custom_status TEXT")
.execute(pool)
.await?;
sqlx::query("ALTER TABLE instances ADD COLUMN custom_status_version INTEGER NOT NULL DEFAULT 0")
.execute(pool)
.await?;
debug!("Added custom_status columns to instances table");
}
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS history (
instance_id TEXT NOT NULL,
execution_id INTEGER NOT NULL,
event_id INTEGER NOT NULL,
event_type TEXT NOT NULL,
event_data TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (instance_id, execution_id, event_id)
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS orchestrator_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
instance_id TEXT NOT NULL,
work_item TEXT NOT NULL,
visible_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
lock_token TEXT,
locked_until TIMESTAMP,
attempt_count INTEGER NOT NULL DEFAULT 0 CHECK(attempt_count >= 0),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS worker_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
work_item TEXT NOT NULL,
visible_at INTEGER NOT NULL DEFAULT 0,
lock_token TEXT,
locked_until TIMESTAMP,
attempt_count INTEGER NOT NULL DEFAULT 0 CHECK(attempt_count >= 0),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
instance_id TEXT,
execution_id TEXT,
activity_id INTEGER,
session_id TEXT,
tag TEXT
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS instance_locks (
instance_id TEXT PRIMARY KEY,
lock_token TEXT NOT NULL,
locked_until INTEGER NOT NULL,
locked_at INTEGER NOT NULL
)
"#,
)
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_orch_visible ON orchestrator_queue(visible_at, lock_token)")
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_orch_instance ON orchestrator_queue(instance_id)")
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_orch_lock ON orchestrator_queue(lock_token)")
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_worker_available ON worker_queue(lock_token, id)")
.execute(pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_worker_identity ON worker_queue(instance_id, execution_id, activity_id)",
)
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_worker_queue_session ON worker_queue(session_id)")
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_worker_queue_tag ON worker_queue(tag)")
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
worker_id TEXT NOT NULL,
locked_until INTEGER NOT NULL,
last_activity_at INTEGER NOT NULL
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kv_store (
instance_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
last_updated_at_ms INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (instance_id, key)
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS kv_delta (
instance_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT,
last_updated_at_ms INTEGER NOT NULL,
PRIMARY KEY (instance_id, key)
)
"#,
)
.execute(pool)
.await?;
Ok(())
}
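/// Generate a lock token from the current time in nanoseconds and the process id.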
fn generate_lock_token() -> String {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock should be after UNIX epoch")
.as_nanos();
format!("lock_{now}_{}", std::process::id())
}
fn now_millis() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock should be after UNIX epoch")
.as_millis() as i64
}
fn timestamp_after(duration: Duration) -> i64 {
Self::now_millis().saturating_add(duration.as_millis().min(i64::MAX as u128) as i64)
}
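/// Render the SQL predicate for a [`TagFilter`] against `worker_queue` (aliased `q`),
/// using positional placeholders starting at `start_param`. `Any` matches everything,
/// `None` matches nothing.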
fn build_tag_clause(filter: &TagFilter, start_param: usize) -> String {
match filter {
TagFilter::DefaultOnly => "q.tag IS NULL".to_string(),
TagFilter::Tags(set) => {
let placeholders: Vec<_> = (0..set.len()).map(|i| format!("?{}", start_param + i)).collect();
format!("q.tag IN ({})", placeholders.join(", "))
}
TagFilter::DefaultAnd(set) => {
let placeholders: Vec<_> = (0..set.len()).map(|i| format!("?{}", start_param + i)).collect();
format!("(q.tag IS NULL OR q.tag IN ({}))", placeholders.join(", "))
}
TagFilter::Any => "1".to_string(), TagFilter::None => "0".to_string(), }
}
fn collect_tag_values(filter: &TagFilter) -> Vec<String> {
match filter {
TagFilter::Tags(set) | TagFilter::DefaultAnd(set) => {
let mut v: Vec<String> = set.iter().cloned().collect();
v.sort();
v
}
_ => Vec::new(),
}
}
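/// Read the history for one execution inside an open transaction. When `execution_id`
/// is `None`, the latest execution recorded in `executions` is used.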
async fn read_history_in_tx(
&self,
tx: &mut Transaction<'_, Sqlite>,
instance: &str,
execution_id: Option<u64>,
) -> Result<Vec<Event>, sqlx::Error> {
let execution_id = match execution_id {
Some(id) => id as i64,
None => {
sqlx::query_scalar::<_, i64>(
"SELECT COALESCE(MAX(execution_id), 1) FROM executions WHERE instance_id = ?",
)
.bind(instance)
.fetch_one(&mut **tx)
.await?
}
};
let rows = sqlx::query(
r#"
SELECT event_data
FROM history
WHERE instance_id = ? AND execution_id = ?
ORDER BY event_id
"#,
)
.bind(instance)
.bind(execution_id)
.fetch_all(&mut **tx)
.await?;
let mut events = Vec::new();
for (idx, row) in rows.iter().enumerate() {
let event_data: String = row.try_get("event_data")?;
let event: Event = serde_json::from_str::<Event>(&event_data).map_err(|e| {
sqlx::Error::Protocol(format!(
"Failed to deserialize history event at position {idx} for instance '{instance}' execution {execution_id}: {e}"
))
})?;
events.push(event);
}
Ok(events)
}
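/// Load the committed key-value snapshot for an instance from `kv_store`
/// (pending `kv_delta` rows are not included).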
async fn get_kv_snapshot_in_tx(
&self,
tx: &mut Transaction<'_, Sqlite>,
instance: &str,
) -> Result<std::collections::HashMap<String, crate::providers::KvEntry>, ProviderError> {
let rows = sqlx::query("SELECT key, value, last_updated_at_ms FROM kv_store WHERE instance_id = ?")
.bind(instance)
.fetch_all(&mut **tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_kv_snapshot_in_tx", e))?;
let mut map = std::collections::HashMap::new();
for row in rows {
let k: String = row
.try_get("key")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_snapshot_in_tx", e))?;
let v: String = row
.try_get("value")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_snapshot_in_tx", e))?;
let ts: i64 = row
.try_get("last_updated_at_ms")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_snapshot_in_tx", e))?;
map.insert(
k,
crate::providers::KvEntry {
value: v,
last_updated_at_ms: ts as u64,
},
);
}
Ok(map)
}
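/// Append history events inside an open transaction. Every event must already carry a
/// non-zero `event_id` assigned by the runtime; the event kind name is stored alongside
/// the serialized payload.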
async fn append_history_in_tx(
&self,
tx: &mut Transaction<'_, Sqlite>,
instance: &str,
execution_id: u64,
events: Vec<Event>,
) -> Result<(), sqlx::Error> {
for event in &events {
if event.event_id() == 0 {
return Err(sqlx::Error::Protocol("event_id must be set by runtime".into()));
}
}
for event in &events {
let event_type = match &event.kind {
EventKind::OrchestrationStarted { .. } => "OrchestrationStarted",
EventKind::OrchestrationCompleted { .. } => "OrchestrationCompleted",
EventKind::OrchestrationFailed { .. } => "OrchestrationFailed",
EventKind::OrchestrationContinuedAsNew { .. } => "OrchestrationContinuedAsNew",
EventKind::ActivityScheduled { .. } => "ActivityScheduled",
EventKind::ActivityCompleted { .. } => "ActivityCompleted",
EventKind::ActivityFailed { .. } => "ActivityFailed",
EventKind::ActivityCancelRequested { .. } => "ActivityCancelRequested",
EventKind::TimerCreated { .. } => "TimerCreated",
EventKind::TimerFired { .. } => "TimerFired",
EventKind::ExternalSubscribed { .. } => "ExternalSubscribed",
EventKind::ExternalEvent { .. } => "ExternalEvent",
EventKind::SubOrchestrationScheduled { .. } => "SubOrchestrationScheduled",
EventKind::SubOrchestrationCompleted { .. } => "SubOrchestrationCompleted",
EventKind::SubOrchestrationFailed { .. } => "SubOrchestrationFailed",
EventKind::SubOrchestrationCancelRequested { .. } => "SubOrchestrationCancelRequested",
EventKind::OrchestrationCancelRequested { .. } => "OrchestrationCancelRequested",
EventKind::ExternalSubscribedCancelled { .. } => "ExternalSubscribedCancelled",
EventKind::QueueSubscribed { .. } => "ExternalSubscribedPersistent",
EventKind::QueueEventDelivered { .. } => "ExternalEventPersistent",
EventKind::QueueSubscriptionCancelled { .. } => "ExternalSubscribedPersistentCancelled",
EventKind::OrchestrationChained { .. } => "OrchestrationChained",
EventKind::CustomStatusUpdated { .. } => "CustomStatusUpdated",
EventKind::KeyValueSet { .. } => "KeyValueSet",
EventKind::KeyValueCleared { .. } => "KeyValueCleared",
EventKind::KeyValuesCleared => "KeyValuesCleared",
#[cfg(feature = "replay-version-test")]
EventKind::ExternalSubscribed2 { .. } => "ExternalSubscribed2",
#[cfg(feature = "replay-version-test")]
EventKind::ExternalEvent2 { .. } => "ExternalEvent2",
};
let event_data = serde_json::to_string(&event)
.expect("Event serialization should never fail - this is a programming error");
let event_id = event.event_id() as i64;
sqlx::query(
r#"
INSERT INTO history (instance_id, execution_id, event_id, event_type, event_data)
VALUES (?, ?, ?, ?, ?)
"#,
)
.bind(instance)
.bind(execution_id as i64)
.bind(event_id)
.bind(event_type)
.bind(event_data)
.execute(&mut **tx)
.await?;
}
Ok(())
}
pub fn get_pool(&self) -> &sqlx::SqlitePool {
&self.pool
}
}
#[async_trait::async_trait]
impl Provider for SqliteProvider {
fn name(&self) -> &str {
"sqlite"
}
fn version(&self) -> &str {
env!("CARGO_PKG_VERSION")
}
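/// Claim the next instance that has visible orchestrator messages and no active
/// instance lock, optionally restricted to executions whose pinned duroxide version
/// falls inside the dispatcher's supported range. On success, all visible messages for
/// the instance are locked together and returned with its history and KV snapshot.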
async fn fetch_orchestration_item(
&self,
lock_timeout: Duration,
_poll_timeout: Duration,
filter: Option<&DispatcherCapabilityFilter>,
) -> Result<Option<(OrchestrationItem, String, u32)>, ProviderError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
let now_ms = Self::now_millis();
let row = if let Some(cap_filter) = filter {
let range = match cap_filter.supported_duroxide_versions.first() {
Some(r) => r,
None => {
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
return Ok(None);
}
};
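// Pack semver components into a single integer (major * 1_000_000 + minor * 1_000 + patch)
// so the supported-version range check can be expressed as a SQL BETWEEN.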
let min_packed =
range.min.major as i64 * 1_000_000 + range.min.minor as i64 * 1_000 + range.min.patch as i64;
let max_packed =
range.max.major as i64 * 1_000_000 + range.max.minor as i64 * 1_000 + range.max.patch as i64;
sqlx::query(
r#"
SELECT q.instance_id
FROM orchestrator_queue q
LEFT JOIN instance_locks il ON q.instance_id = il.instance_id
LEFT JOIN instances i ON q.instance_id = i.instance_id
LEFT JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
WHERE q.visible_at <= ?1
AND (il.instance_id IS NULL OR il.locked_until <= ?1)
AND (
e.duroxide_version_major IS NULL
OR (e.duroxide_version_major * 1000000 + e.duroxide_version_minor * 1000 + e.duroxide_version_patch) BETWEEN ?2 AND ?3
)
ORDER BY q.id
LIMIT 1
"#,
)
.bind(now_ms)
.bind(min_packed)
.bind(max_packed)
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?
} else {
sqlx::query(
r#"
SELECT q.instance_id
FROM orchestrator_queue q
LEFT JOIN instance_locks il ON q.instance_id = il.instance_id
WHERE q.visible_at <= ?1
AND (il.instance_id IS NULL OR il.locked_until <= ?1)
ORDER BY q.id
LIMIT 1
"#,
)
.bind(now_ms)
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?
};
if row.is_none() {
let msg_count: Option<i64> = sqlx::query_scalar("SELECT COUNT(*) FROM orchestrator_queue")
.fetch_one(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
tracing::debug!(
target = "duroxide::providers::sqlite",
total_messages=?msg_count,
"No available instances"
);
tx.rollback().await.ok();
return Ok(None);
}
let instance_id: String = row
.ok_or_else(|| ProviderError::permanent("fetch_orchestration_item", "No instance found"))?
.try_get("instance_id")
.map_err(|e| {
ProviderError::permanent("fetch_orchestration_item", format!("Failed to get instance_id: {e}"))
})?;
tracing::debug!(target="duroxide::providers::sqlite", instance_id=%instance_id, "Selected available instance");
let lock_token = Self::generate_lock_token();
let locked_until = Self::timestamp_after(lock_timeout);
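// Acquire the instance lock, or steal it if the previous lock has expired.
// rows_affected == 0 means another dispatcher still holds a valid lock.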
let lock_result = sqlx::query(
r#"
INSERT INTO instance_locks (instance_id, lock_token, locked_until, locked_at)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT(instance_id) DO UPDATE
SET lock_token = ?2, locked_until = ?3, locked_at = ?4
WHERE locked_until <= ?4
"#,
)
.bind(&instance_id)
.bind(&lock_token)
.bind(locked_until)
.bind(now_ms)
.execute(&mut *tx)
.await;
match lock_result {
Ok(result) => {
let affected = result.rows_affected();
tracing::debug!(target="duroxide::providers::sqlite", instance=%instance_id, rows_affected=affected, "Instance lock result");
if affected == 0 {
tracing::debug!(target="duroxide::providers::sqlite", instance=%instance_id, "Failed to acquire instance lock (already locked)");
tx.rollback().await.ok();
return Ok(None);
}
}
Err(e) => {
tracing::debug!(target="duroxide::providers::sqlite", instance=%instance_id, error=%e.to_string(), "Error acquiring instance lock");
tx.rollback().await.ok();
return Err(Self::sqlx_to_provider_error("fetch_orchestration_item", e));
}
}
let update_result = sqlx::query(
r#"
UPDATE orchestrator_queue
SET lock_token = ?1, attempt_count = attempt_count + 1
WHERE instance_id = ?2 AND visible_at <= ?3
"#,
)
.bind(&lock_token)
.bind(&instance_id)
.bind(now_ms)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
let messages = sqlx::query(
r#"
SELECT id, work_item, attempt_count
FROM orchestrator_queue
WHERE lock_token = ?1
ORDER BY id
"#,
)
.bind(&lock_token)
.fetch_all(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
tracing::debug!(target="duroxide::providers::sqlite", message_count=%messages.len(), rows_updated=%update_result.rows_affected(), instance=%instance_id, "Fetched and marked messages for locked instance");
if messages.is_empty() {
sqlx::query("DELETE FROM instance_locks WHERE instance_id = ?")
.bind(&instance_id)
.execute(&mut *tx)
.await
.ok();
tx.rollback().await.ok();
return Ok(None);
}
let mut max_attempt_count: u32 = 0;
let work_items: Vec<WorkItem> = messages
.iter()
.filter_map(|r| {
if let Ok(attempt_count) = r.try_get::<i64, _>("attempt_count") {
max_attempt_count = max_attempt_count.max(attempt_count as u32);
}
r.try_get::<String, _>("work_item")
.ok()
.and_then(|s| serde_json::from_str(&s).ok())
})
.collect();
let instance_info = sqlx::query(
r#"
SELECT i.orchestration_name, i.orchestration_version, i.current_execution_id
FROM instances i
WHERE i.instance_id = ?1
"#,
)
.bind(&instance_id)
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
let (orchestration_name, orchestration_version, current_execution_id, history) = if let Some(info) =
instance_info
{
let name: String = info.try_get("orchestration_name").map_err(|e| {
ProviderError::permanent(
"fetch_orchestration_item",
format!("Failed to get orchestration_name: {e}"),
)
})?;
let version: Option<String> = info.try_get("orchestration_version").ok();
let exec_id: i64 = info.try_get("current_execution_id").map_err(|e| {
ProviderError::permanent(
"fetch_orchestration_item",
format!("Failed to get current_execution_id: {e}"),
)
})?;
let hist_result = self
.read_history_in_tx(&mut tx, &instance_id, Some(exec_id as u64))
.await;
match hist_result {
Ok(hist) => {
let version = version.unwrap_or_else(|| {
debug_assert!(
!hist
.iter()
.any(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. })),
"Instance exists with NULL version but history contains OrchestrationStarted event"
);
"unknown".to_string()
});
(name, version, exec_id as u64, hist)
}
Err(e) => {
let error_msg = format!("Failed to deserialize history: {e}");
tracing::warn!(
target = "duroxide::providers::sqlite",
instance = %instance_id,
error = %error_msg,
"History deserialization failed, returning item with history_error"
);
let version = version.unwrap_or_else(|| "unknown".to_string());
tx.commit()
.await
.map_err(|ce| Self::sqlx_to_provider_error("fetch_orchestration_item", ce))?;
return Ok(Some((
OrchestrationItem {
instance: instance_id,
orchestration_name: name,
execution_id: exec_id as u64,
version,
history: vec![],
messages: work_items,
history_error: Some(error_msg),
kv_snapshot: std::collections::HashMap::new(),
},
lock_token,
max_attempt_count,
)));
}
}
} else {
let hist = self
.read_history_in_tx(&mut tx, &instance_id, None)
.await
.unwrap_or_default();
if let Some(first_started) = hist.iter().find_map(|e| {
if let EventKind::OrchestrationStarted { name, version, .. } = &e.kind {
Some((name.clone(), version.clone()))
} else {
None
}
}) {
let (name, version) = first_started;
(name, version, 1u64, hist)
} else if let Some(start_item) = work_items.iter().find(|item| {
matches!(
item,
WorkItem::StartOrchestration { .. } | WorkItem::ContinueAsNew { .. }
)
}) {
let (orchestration, version) = match start_item {
WorkItem::StartOrchestration {
orchestration, version, ..
}
| WorkItem::ContinueAsNew {
orchestration, version, ..
} => (orchestration.clone(), version.clone()),
_ => unreachable!(),
};
(
orchestration,
version.unwrap_or_else(|| "unknown".to_string()),
1u64,
Vec::new(),
)
} else {
let all_queue_messages = work_items
.iter()
.all(|item| matches!(item, WorkItem::QueueMessage { .. }));
if all_queue_messages {
let message_count = work_items.len();
tracing::warn!(
target="duroxide::providers::sqlite",
instance=%instance_id,
message_count,
"Dropping orphan queue messages — events enqueued before orchestration started are not supported"
);
sqlx::query("DELETE FROM orchestrator_queue WHERE lock_token = ?1")
.bind(&lock_token)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
sqlx::query("DELETE FROM instance_locks WHERE lock_token = ?1")
.bind(&lock_token)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
return Ok(None);
}
tracing::debug!(
target="duroxide::providers::sqlite",
instance=%instance_id,
message_count=work_items.len(),
"Work items without orchestration context; releasing for retry"
);
tx.rollback().await.ok();
return Ok(None);
}
};
let kv_snapshot = self.get_kv_snapshot_in_tx(&mut tx, &instance_id).await?;
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_orchestration_item", e))?;
debug!(
instance = %instance_id,
messages = work_items.len(),
history_len = history.len(),
kv_keys = kv_snapshot.len(),
"Fetched orchestration item"
);
Ok(Some((
OrchestrationItem {
instance: instance_id,
orchestration_name,
execution_id: current_execution_id,
version: orchestration_version,
messages: work_items,
history,
history_error: None,
kv_snapshot,
},
lock_token,
max_attempt_count,
)))
}
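/// Atomically apply the results of one orchestration turn: delete the claimed queue
/// rows, upsert instance/execution metadata, append the history delta, apply custom
/// status and KV changes, enqueue worker and orchestrator items, remove cancelled
/// activities from the worker queue, and finally release the instance lock. Fails if
/// the instance lock has expired in the meantime.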
async fn ack_orchestration_item(
&self,
lock_token: &str,
execution_id: u64,
history_delta: Vec<Event>,
worker_items: Vec<WorkItem>,
orchestrator_items: Vec<WorkItem>,
metadata: crate::providers::ExecutionMetadata,
cancelled_activities: Vec<ScheduledActivityIdentifier>,
) -> Result<(), ProviderError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
let row = sqlx::query("SELECT instance_id FROM instance_locks WHERE lock_token = ?")
.bind(lock_token)
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?
.ok_or_else(|| ProviderError::permanent("ack_orchestration_item", "Invalid lock token"))?;
let instance_id: String = row.try_get("instance_id").map_err(|e| {
ProviderError::permanent("ack_orchestration_item", format!("Failed to decode instance_id: {e}"))
})?;
sqlx::query("DELETE FROM orchestrator_queue WHERE lock_token = ?")
.bind(lock_token)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
debug!(
instance = %instance_id,
execution_id = %execution_id,
history_delta_len = %history_delta.len(),
"Acking with explicit execution_id"
);
if let (Some(name), Some(version)) = (&metadata.orchestration_name, &metadata.orchestration_version) {
sqlx::query(
r#"
INSERT OR IGNORE INTO instances
(instance_id, orchestration_name, orchestration_version, current_execution_id, parent_instance_id)
VALUES (?, ?, ?, ?, ?)
"#,
)
.bind(&instance_id)
.bind(name)
.bind(version.as_str())
.bind(execution_id as i64)
.bind(&metadata.parent_instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
sqlx::query(
r#"
UPDATE instances
SET orchestration_name = ?, orchestration_version = ?
WHERE instance_id = ?
"#,
)
.bind(name)
.bind(version)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
}
sqlx::query(
r#"
INSERT OR IGNORE INTO executions (instance_id, execution_id, status)
VALUES (?, ?, 'Running')
"#,
)
.bind(&instance_id)
.bind(execution_id as i64)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
if let Some(pinned) = &metadata.pinned_duroxide_version {
sqlx::query(
r#"
UPDATE executions
SET duroxide_version_major = ?, duroxide_version_minor = ?, duroxide_version_patch = ?
WHERE instance_id = ? AND execution_id = ?
"#,
)
.bind(pinned.major as i64)
.bind(pinned.minor as i64)
.bind(pinned.patch as i64)
.bind(&instance_id)
.bind(execution_id as i64)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
}
sqlx::query(
r#"
UPDATE instances
SET current_execution_id = MAX(current_execution_id, ?)
WHERE instance_id = ?
"#,
)
.bind(execution_id as i64)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
if !history_delta.is_empty() {
debug!(
instance = %instance_id,
events = history_delta.len(),
first_event = ?history_delta.first().map(|e| std::mem::discriminant(&e.kind)),
"Appending history delta"
);
self.append_history_in_tx(&mut tx, &instance_id, execution_id, history_delta.clone())
.await
.map_err(|e| {
ProviderError::permanent("ack_orchestration_item", format!("Failed to append history: {e}"))
})?;
}
if let Some(status) = &metadata.status {
let now_ms = Self::now_millis();
sqlx::query(
r#"
UPDATE executions
SET status = ?, output = ?, completed_at = ?
WHERE instance_id = ? AND execution_id = ?
"#,
)
.bind(status)
.bind(&metadata.output)
.bind(now_ms)
.bind(&instance_id)
.bind(execution_id as i64)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
debug!(
instance = %instance_id,
execution_id = %execution_id,
status = %status,
"Updated execution status and output from metadata"
);
}
let custom_status_from_delta = history_delta.iter().rev().find_map(|e| match &e.kind {
EventKind::CustomStatusUpdated { status } => Some(status.clone()),
_ => None,
});
match custom_status_from_delta {
Some(Some(custom_status)) => {
sqlx::query(
r#"
UPDATE instances
SET custom_status = ?, custom_status_version = custom_status_version + 1
WHERE instance_id = ?
"#,
)
.bind(&custom_status)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
debug!(
instance = %instance_id,
custom_status = %custom_status,
"Updated custom_status from history event"
);
}
Some(None) => {
sqlx::query(
r#"
UPDATE instances
SET custom_status = NULL, custom_status_version = custom_status_version + 1
WHERE instance_id = ?
"#,
)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
debug!(
instance = %instance_id,
"Cleared custom_status from history event"
);
}
None => {
}
}
for event in &history_delta {
match &event.kind {
EventKind::KeyValueSet {
key,
value,
last_updated_at_ms,
} => {
sqlx::query(
r#"
INSERT INTO kv_delta (instance_id, key, value, last_updated_at_ms)
VALUES (?, ?, ?, ?)
ON CONFLICT(instance_id, key)
DO UPDATE SET value = excluded.value, last_updated_at_ms = excluded.last_updated_at_ms
"#,
)
.bind(&instance_id)
.bind(key)
.bind(value)
.bind(*last_updated_at_ms as i64)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
}
EventKind::KeyValueCleared { key } => {
sqlx::query(
r#"
INSERT INTO kv_delta (instance_id, key, value, last_updated_at_ms)
VALUES (?, ?, NULL, ?)
ON CONFLICT(instance_id, key)
DO UPDATE SET value = NULL, last_updated_at_ms = excluded.last_updated_at_ms
"#,
)
.bind(&instance_id)
.bind(key)
.bind(Self::now_millis())
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
}
EventKind::KeyValuesCleared => {
let clear_ts = Self::now_millis();
sqlx::query("UPDATE kv_delta SET value = NULL, last_updated_at_ms = ? WHERE instance_id = ?")
.bind(clear_ts)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
sqlx::query(
r#"
INSERT OR IGNORE INTO kv_delta (instance_id, key, value, last_updated_at_ms)
SELECT instance_id, key, NULL, ? FROM kv_store WHERE instance_id = ?
"#,
)
.bind(clear_ts)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
}
_ => {}
}
}
let is_terminal = metadata
.status
.as_deref()
.is_some_and(|s| s == "Completed" || s == "ContinuedAsNew" || s == "Failed");
if is_terminal {
sqlx::query(
r#"
INSERT INTO kv_store (instance_id, key, value, last_updated_at_ms)
SELECT instance_id, key, value, last_updated_at_ms
FROM kv_delta
WHERE instance_id = ? AND value IS NOT NULL
ON CONFLICT(instance_id, key)
DO UPDATE SET value = excluded.value, last_updated_at_ms = excluded.last_updated_at_ms
"#,
)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
sqlx::query(
r#"
DELETE FROM kv_store
WHERE instance_id = ?
AND key IN (SELECT key FROM kv_delta WHERE instance_id = ? AND value IS NULL)
"#,
)
.bind(&instance_id)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
sqlx::query("DELETE FROM kv_delta WHERE instance_id = ?")
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
}
debug!(
instance = %instance_id,
count = worker_items.len(),
"Enqueuing worker items"
);
let now_ms = Self::now_millis();
for item in worker_items {
let (activity_instance, activity_execution_id, activity_id, session_id, tag) = match &item {
WorkItem::ActivityExecute {
instance,
execution_id,
id,
session_id,
tag,
..
} => (
Some(instance.as_str()),
Some(*execution_id),
Some(*id),
session_id.as_deref(),
tag.as_deref(),
),
_ => (None, None, None, None, None),
};
let work_item = serde_json::to_string(&item).map_err(|e| {
ProviderError::permanent("enqueue_for_orchestrator", format!("Serialization error: {e}"))
})?;
sqlx::query(
r#"
INSERT INTO worker_queue (work_item, visible_at, instance_id, execution_id, activity_id, session_id, tag)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(work_item)
.bind(now_ms)
.bind(activity_instance)
.bind(activity_execution_id.map(|e| e as i64))
.bind(activity_id.map(|a| a as i64))
.bind(session_id)
.bind(tag)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
}
if !cancelled_activities.is_empty() {
debug!(
instance = %instance_id,
count = cancelled_activities.len(),
"Cancelling activities via lock stealing"
);
let placeholders: Vec<String> = cancelled_activities
.iter()
.enumerate()
.map(|(i, _)| format!("(?{}, ?{}, ?{})", i * 3 + 1, i * 3 + 2, i * 3 + 3))
.collect();
let sql = format!(
"DELETE FROM worker_queue WHERE (instance_id, execution_id, activity_id) IN (VALUES {})",
placeholders.join(", ")
);
let mut query = sqlx::query(&sql);
for activity in &cancelled_activities {
query = query
.bind(&activity.instance)
.bind(activity.execution_id as i64)
.bind(activity.activity_id as i64);
}
query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_orchestration_item", e))?;
}
for item in orchestrator_items {
let work_item = serde_json::to_string(&item).map_err(|e| {
ProviderError::permanent("enqueue_for_orchestrator", format!("Serialization error: {e}"))
})?;
let instance = match &item {
WorkItem::StartOrchestration { instance, .. }
| WorkItem::ActivityCompleted { instance, .. }
| WorkItem::ActivityFailed { instance, .. }
| WorkItem::TimerFired { instance, .. }
| WorkItem::ExternalRaised { instance, .. }
| WorkItem::QueueMessage { instance, .. }
| WorkItem::CancelInstance { instance, .. }
| WorkItem::ContinueAsNew { instance, .. } => instance,
#[cfg(feature = "replay-version-test")]
WorkItem::ExternalRaised2 { instance, .. } => instance,
WorkItem::SubOrchCompleted { parent_instance, .. }
| WorkItem::SubOrchFailed { parent_instance, .. } => parent_instance,
_ => continue,
};
tracing::debug!(target = "duroxide::providers::sqlite", instance=%instance, ?item, "enqueue orchestrator item in ack");
let visible_at = match &item {
WorkItem::TimerFired { fire_at_ms, .. } => *fire_at_ms as i64,
_ => Self::now_millis(),
};
sqlx::query("INSERT INTO orchestrator_queue (instance_id, work_item, visible_at) VALUES (?, ?, ?)")
.bind(instance)
.bind(work_item)
.bind(visible_at)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
}
let now_ms = Self::now_millis();
let lock_valid = sqlx::query_scalar::<_, i64>(
r#"
SELECT COUNT(*) FROM instance_locks
WHERE instance_id = ? AND lock_token = ? AND locked_until > ?
"#,
)
.bind(&instance_id)
.bind(lock_token)
.bind(now_ms)
.fetch_one(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
if lock_valid == 0 {
tracing::warn!(
instance = %instance_id,
lock_token = %lock_token,
"Instance lock expired or invalid, aborting ack"
);
tx.rollback().await.ok();
return Err(ProviderError::permanent(
"ack_orchestration_item",
"Instance lock expired",
));
}
sqlx::query("DELETE FROM instance_locks WHERE instance_id = ? AND lock_token = ?")
.bind(&instance_id)
.bind(lock_token)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
debug!(
instance = %instance_id,
"Acknowledged orchestration item and released lock"
);
Ok(())
}
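/// Read the full history of the latest execution for an instance.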
async fn read(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
let mut conn = self
.pool
.acquire()
.await
.map_err(|e| Self::sqlx_to_provider_error("read", e))?;
let execution_id: i64 =
sqlx::query_scalar::<_, i64>("SELECT COALESCE(MAX(execution_id), 1) FROM executions WHERE instance_id = ?")
.bind(instance)
.fetch_one(&mut *conn)
.await
.map_err(|e| Self::sqlx_to_provider_error("read", e))?;
let rows = sqlx::query(
r#"
SELECT event_data
FROM history
WHERE instance_id = ? AND execution_id = ?
ORDER BY event_id
"#,
)
.bind(instance)
.bind(execution_id)
.fetch_all(&mut *conn)
.await
.map_err(|e| Self::sqlx_to_provider_error("read", e))?;
let mut events = Vec::new();
for row in rows {
let event_data: String = row
.try_get("event_data")
.map_err(|e| ProviderError::permanent("read", format!("Failed to get event_data: {e}")))?;
let event: Event = serde_json::from_str(&event_data)
.map_err(|e| ProviderError::permanent("read", format!("Failed to deserialize event: {e}")))?;
events.push(event);
}
Ok(events)
}
async fn read_with_execution(&self, instance: &str, execution_id: u64) -> Result<Vec<Event>, ProviderError> {
let mut conn = self
.pool
.acquire()
.await
.map_err(|e| Self::sqlx_to_provider_error("read_with_execution", e))?;
let rows = sqlx::query(
r#"
SELECT event_data
FROM history
WHERE instance_id = ? AND execution_id = ?
ORDER BY event_id
"#,
)
.bind(instance)
.bind(execution_id as i64)
.fetch_all(&mut *conn)
.await
.map_err(|e| Self::sqlx_to_provider_error("read_with_execution", e))?;
let mut events = Vec::new();
for row in rows {
let event_data: String = row.try_get("event_data").map_err(|e| {
ProviderError::permanent("read_with_execution", format!("Failed to get event_data: {e}"))
})?;
let event: Event = serde_json::from_str(&event_data).map_err(|e| {
ProviderError::permanent("read_with_execution", format!("Failed to deserialize event: {e}"))
})?;
events.push(event);
}
Ok(events)
}
async fn append_with_execution(
&self,
instance: &str,
execution_id: u64,
new_events: Vec<Event>,
) -> Result<(), ProviderError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
self.append_history_in_tx(&mut tx, instance, execution_id, new_events)
.await
.map_err(|e| ProviderError::permanent("append_with_execution", format!("Failed to append history: {e}")))?;
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("append_with_execution", e))?;
Ok(())
}
async fn enqueue_for_orchestrator(&self, item: WorkItem, delay: Option<Duration>) -> Result<(), ProviderError> {
self.enqueue_orchestrator_work_with_delay(item, delay).await
}
async fn enqueue_for_worker(&self, item: WorkItem) -> Result<(), ProviderError> {
tracing::debug!(target: "duroxide::providers::sqlite", ?item, "enqueue_for_worker");
let (activity_instance, activity_execution_id, activity_id, session_id, tag) = match &item {
WorkItem::ActivityExecute {
instance,
execution_id,
id,
session_id,
tag,
..
} => (
Some(instance.as_str()),
Some(*execution_id),
Some(*id),
session_id.as_deref(),
tag.as_deref(),
),
_ => (None, None, None, None, None),
};
let work_item = serde_json::to_string(&item)
.map_err(|e| ProviderError::permanent("enqueue_for_worker", format!("Serialization error: {e}")))?;
let now_ms = Self::now_millis();
sqlx::query(
r#"
INSERT INTO worker_queue (work_item, visible_at, instance_id, execution_id, activity_id, session_id, tag)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(work_item)
.bind(now_ms)
.bind(activity_instance)
.bind(activity_execution_id.map(|e| e as i64))
.bind(activity_id.map(|a| a as i64))
.bind(session_id)
.bind(tag)
.execute(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("enqueue_for_worker", e))?;
Ok(())
}
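/// Claim the next visible worker item that matches the tag filter. With a session
/// config, items bound to a session are only returned if the session is unclaimed,
/// expired, or already owned by this worker; without one, only session-less items are
/// considered.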
async fn fetch_work_item(
&self,
lock_timeout: Duration,
_poll_timeout: Duration,
session: Option<&SessionFetchConfig>,
tag_filter: &TagFilter,
) -> Result<Option<(WorkItem, String, u32)>, ProviderError> {
if matches!(tag_filter, TagFilter::None) {
return Ok(None);
}
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_work_item", e))?;
let lock_token = Self::generate_lock_token();
let locked_until = Self::timestamp_after(lock_timeout);
tracing::debug!(
"Worker dequeue: looking for available items, locked_until will be {}",
locked_until
);
let now_ms = Self::now_millis();
let tag_start_param = if session.is_some() { 3 } else { 2 };
let tag_clause = Self::build_tag_clause(tag_filter, tag_start_param);
let tag_values = Self::collect_tag_values(tag_filter);
let next_item = if let Some(config) = session {
let sql = format!(
r#"
SELECT q.id, q.work_item, q.attempt_count, q.session_id
FROM worker_queue q
LEFT JOIN sessions s ON s.session_id = q.session_id AND s.locked_until > ?1
WHERE q.visible_at <= ?1
AND (q.lock_token IS NULL OR q.locked_until <= ?1)
AND (
q.session_id IS NULL
OR s.worker_id = ?2
OR s.session_id IS NULL
)
AND ({tag_clause})
ORDER BY q.id
LIMIT 1
"#,
);
let mut query = sqlx::query(&sql).bind(now_ms).bind(&config.owner_id);
for val in &tag_values {
query = query.bind(val.as_str());
}
query
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_work_item", e))?
} else {
let sql = format!(
r#"
SELECT q.id, q.work_item, q.attempt_count, q.session_id FROM worker_queue q
WHERE q.visible_at <= ?1
AND (q.lock_token IS NULL OR q.locked_until <= ?1)
AND q.session_id IS NULL
AND ({tag_clause})
ORDER BY q.id
LIMIT 1
"#,
);
let mut query = sqlx::query(&sql).bind(now_ms);
for val in &tag_values {
query = query.bind(val.as_str());
}
query
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_work_item", e))?
};
if next_item.is_none() {
tracing::debug!("Worker dequeue: no available items found");
return Ok(None);
}
let next_item = next_item.unwrap();
tracing::debug!("Worker dequeue found item");
let id: i64 = next_item
.try_get("id")
.map_err(|e| ProviderError::permanent("fetch_work_item", format!("Failed to get id: {e}")))?;
let work_item_str: String = next_item
.try_get("work_item")
.map_err(|e| ProviderError::permanent("fetch_work_item", format!("Failed to get work_item: {e}")))?;
let current_attempt_count: i64 = next_item
.try_get("attempt_count")
.map_err(|e| ProviderError::permanent("fetch_work_item", format!("Failed to get attempt_count: {e}")))?;
let session_id: Option<String> = next_item
.try_get("session_id")
.map_err(|e| ProviderError::permanent("fetch_work_item", format!("Failed to get session_id: {e}")))?;
sqlx::query(
r#"
UPDATE worker_queue
SET lock_token = ?1, locked_until = ?2, attempt_count = attempt_count + 1
WHERE id = ?3
"#,
)
.bind(&lock_token)
.bind(locked_until)
.bind(id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_work_item", e))?;
if let (Some(sid), Some(config)) = (&session_id, session) {
let session_locked_until = now_ms + config.lock_timeout.as_millis() as i64;
let upsert_result = sqlx::query(
r#"
INSERT INTO sessions (session_id, worker_id, locked_until, last_activity_at)
VALUES (?1, ?2, ?3, ?4)
ON CONFLICT (session_id) DO UPDATE
SET worker_id = ?2,
locked_until = ?3,
last_activity_at = ?4
WHERE sessions.locked_until <= ?4 OR sessions.worker_id = ?2
"#,
)
.bind(sid)
.bind(&config.owner_id)
.bind(session_locked_until)
.bind(now_ms)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_work_item", e))?;
if upsert_result.rows_affected() == 0 {
tracing::warn!(
target: "duroxide::providers::sqlite",
session_id = %sid,
owner_id = %config.owner_id,
"Session claim failed — owned by another worker, rolling back"
);
tx.rollback().await.ok();
return Ok(None);
}
tracing::debug!(
target: "duroxide::providers::sqlite",
session_id = %sid,
owner_id = %config.owner_id,
"Session claimed/refreshed on fetch"
);
}
let attempt_count = (current_attempt_count + 1) as u32;
let work_item: WorkItem = serde_json::from_str(&work_item_str)
.map_err(|e| ProviderError::permanent("fetch_work_item", format!("Deserialization error: {e}")))?;
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("fetch_work_item", e))?;
Ok(Some((work_item, lock_token, attempt_count)))
}
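/// Delete the locked worker row and, if a completion is supplied, enqueue it for the
/// orchestrator in the same transaction. Fails if the lock token is unknown or the
/// lock has already expired.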
async fn ack_work_item(&self, token: &str, completion: Option<WorkItem>) -> Result<(), ProviderError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_work_item", e))?;
let now_ms = Self::now_millis();
let deleted_row: Option<(Option<String>,)> =
sqlx::query_as("DELETE FROM worker_queue WHERE lock_token = ? AND locked_until > ? RETURNING session_id")
.bind(token)
.bind(now_ms)
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_work_item", e))?;
let session_id = match deleted_row {
Some((sid,)) => sid,
None => {
return Err(ProviderError::permanent(
"ack_work_item",
"Activity was cancelled or lock expired (worker queue row not found or lock invalid)",
));
}
};
if let Some(ref sid) = session_id {
sqlx::query("UPDATE sessions SET last_activity_at = ?1 WHERE session_id = ?2 AND locked_until > ?1")
.bind(now_ms)
.bind(sid)
.execute(&mut *tx)
.await
.ok();
}
if let Some(completion) = completion {
let instance = match &completion {
WorkItem::ActivityCompleted { instance, .. } => instance,
WorkItem::ActivityFailed { instance, .. } => instance,
_ => {
return Err(ProviderError::permanent(
"ack_work_item",
"Invalid completion type for worker ack",
));
}
};
let work_item = serde_json::to_string(&completion)
.map_err(|e| ProviderError::permanent("ack_work_item", format!("Serialization error: {e}")))?;
let now_ms = Self::now_millis();
sqlx::query("INSERT INTO orchestrator_queue (instance_id, work_item, visible_at) VALUES (?, ?, ?)")
.bind(instance)
.bind(work_item)
.bind(now_ms)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_work_item", e))?;
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_work_item", e))?;
debug!(instance = %instance, "Atomically acked worker and enqueued completion");
} else {
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("ack_work_item", e))?;
debug!("Acked worker item without enqueuing completion (orchestration terminal/missing)");
}
Ok(())
}
async fn renew_work_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
let now_ms = Self::now_millis();
let locked_until = Self::timestamp_after(extend_for);
let row: Option<(Option<String>,)> = sqlx::query_as(
r#"
UPDATE worker_queue
SET locked_until = ?1
WHERE lock_token = ?2
AND locked_until > ?3
RETURNING session_id
"#,
)
.bind(locked_until)
.bind(token)
.bind(now_ms)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("renew_work_item_lock", e))?;
let session_id = match row {
Some((sid,)) => sid,
None => {
return Err(ProviderError::permanent(
"renew_work_item_lock",
"Lock renewal failed - activity was cancelled or lock expired",
));
}
};
if let Some(ref sid) = session_id {
sqlx::query("UPDATE sessions SET last_activity_at = ?1 WHERE session_id = ?2 AND locked_until > ?1")
.bind(now_ms)
.bind(sid)
.execute(&self.pool)
.await
.ok();
}
tracing::debug!(
target: "duroxide::providers::sqlite",
lock_token = %token,
extend_secs = %extend_for.as_secs(),
"Work item lock renewed"
);
Ok(())
}
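/// Release a locked worker item so it becomes visible again (optionally after a
/// delay). `ignore_attempt` undoes the attempt-count increment from the fetch.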
async fn abandon_work_item(
&self,
token: &str,
delay: Option<Duration>,
ignore_attempt: bool,
) -> Result<(), ProviderError> {
let now_ms = Self::now_millis();
let visible_at = if let Some(d) = delay {
Self::timestamp_after(d)
} else {
now_ms
};
let query = if ignore_attempt {
r#"
UPDATE worker_queue
SET lock_token = NULL, locked_until = NULL, visible_at = ?1,
attempt_count = MAX(0, attempt_count - 1)
WHERE lock_token = ?2
"#
} else {
r#"
UPDATE worker_queue
SET lock_token = NULL, locked_until = NULL, visible_at = ?1
WHERE lock_token = ?2
"#
};
let result = sqlx::query(query)
.bind(visible_at)
.bind(token)
.execute(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("abandon_work_item", e))?;
if result.rows_affected() == 0 {
return Err(ProviderError::permanent(
"abandon_work_item",
"Invalid lock token or already acked",
));
}
tracing::debug!(
target: "duroxide::providers::sqlite",
lock_token = %token,
delay_ms = ?delay.map(|d| d.as_millis()),
ignore_attempt = %ignore_attempt,
"Work item abandoned"
);
Ok(())
}
async fn renew_orchestration_item_lock(&self, token: &str, extend_for: Duration) -> Result<(), ProviderError> {
let locked_until = Self::timestamp_after(extend_for);
let now_ms = Self::now_millis();
let result = sqlx::query(
r#"
UPDATE instance_locks
SET locked_until = ?1
WHERE lock_token = ?2
AND locked_until > ?3
"#,
)
.bind(locked_until)
.bind(token)
.bind(now_ms)
.execute(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("renew_orchestration_item_lock", e))?;
if result.rows_affected() == 0 {
return Err(ProviderError::permanent(
"renew_orchestration_item_lock",
"Lock token invalid, expired, or already acked",
));
}
tracing::debug!(
target: "duroxide::providers::sqlite",
lock_token = %token,
extend_secs = %extend_for.as_secs(),
"Orchestration item lock renewed"
);
Ok(())
}
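/// Release the instance lock identified by `lock_token` without acking. Optionally
/// undoes the attempt-count increment and pushes currently visible messages out by
/// `delay`.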
async fn abandon_orchestration_item(
&self,
lock_token: &str,
delay: Option<Duration>,
ignore_attempt: bool,
) -> Result<(), ProviderError> {
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("abandon_orchestration_item", e))?;
let instance_id: Option<String> =
sqlx::query_scalar("SELECT instance_id FROM instance_locks WHERE lock_token = ?")
.bind(lock_token)
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("abandon_orchestration_item", e))?;
let Some(instance_id) = instance_id else {
return Err(ProviderError::permanent(
"abandon_orchestration_item",
"Invalid lock token",
));
};
sqlx::query("DELETE FROM instance_locks WHERE lock_token = ?")
.bind(lock_token)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("abandon_orchestration_item", e))?;
if ignore_attempt {
sqlx::query(
"UPDATE orchestrator_queue SET attempt_count = MAX(0, attempt_count - 1) WHERE instance_id = ?",
)
.bind(&instance_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("abandon_orchestration_item", e))?;
}
if let Some(delay) = delay {
let delay_ms = delay.as_millis().min(i64::MAX as u128) as i64;
let visible_at = Self::now_millis().saturating_add(delay_ms);
sqlx::query("UPDATE orchestrator_queue SET visible_at = ? WHERE instance_id = ? AND visible_at <= ?")
.bind(visible_at)
.bind(&instance_id)
.bind(Self::now_millis())
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("abandon_orchestration_item", e))?;
}
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("abandon_orchestration_item", e))?;
Ok(())
}
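/// Extend the session locks owned by the given workers, skipping sessions that have
/// already expired or have been idle longer than `idle_timeout`. Returns the number of
/// sessions renewed.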
async fn renew_session_lock(
&self,
owner_ids: &[&str],
extend_for: Duration,
idle_timeout: Duration,
) -> Result<usize, ProviderError> {
if owner_ids.is_empty() {
return Ok(0);
}
let now_ms = Self::now_millis();
let locked_until = now_ms + extend_for.as_millis() as i64;
let idle_cutoff = now_ms - idle_timeout.as_millis() as i64;
let placeholders: Vec<String> = (0..owner_ids.len()).map(|i| format!("?{}", i + 3)).collect();
let sql = format!(
"UPDATE sessions SET locked_until = ?1 \
WHERE worker_id IN ({}) \
AND locked_until > ?2 \
AND last_activity_at > ?{}",
placeholders.join(", "),
owner_ids.len() + 3,
);
let mut query = sqlx::query(&sql).bind(locked_until).bind(now_ms);
for owner_id in owner_ids {
query = query.bind(owner_id);
}
query = query.bind(idle_cutoff);
let result = query
.execute(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("renew_session_lock", e))?;
let count = result.rows_affected() as usize;
tracing::debug!(
target: "duroxide::providers::sqlite",
owner_count = %owner_ids.len(),
sessions_renewed = %count,
"Session locks renewed"
);
Ok(count)
}
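/// Delete expired sessions that no longer have any worker-queue rows referencing them.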
async fn cleanup_orphaned_sessions(&self, _idle_timeout: Duration) -> Result<usize, ProviderError> {
let now_ms = Self::now_millis();
let result = sqlx::query(
r#"
DELETE FROM sessions
WHERE locked_until < ?1
AND NOT EXISTS (
SELECT 1 FROM worker_queue WHERE worker_queue.session_id = sessions.session_id
)
"#,
)
.bind(now_ms)
.execute(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("cleanup_orphaned_sessions", e))?;
let count = result.rows_affected() as usize;
tracing::debug!(
target: "duroxide::providers::sqlite",
sessions_cleaned = %count,
"Orphaned sessions cleaned up"
);
Ok(count)
}
fn as_management_capability(&self) -> Option<&dyn ProviderAdmin> {
Some(self as &dyn ProviderAdmin)
}
async fn get_custom_status(
&self,
instance: &str,
last_seen_version: u64,
) -> Result<Option<(Option<String>, u64)>, ProviderError> {
let row = sqlx::query(
r#"
SELECT custom_status, custom_status_version
FROM instances
WHERE instance_id = ? AND custom_status_version > ?
"#,
)
.bind(instance)
.bind(last_seen_version as i64)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_custom_status", e))?;
match row {
Some(row) => {
let custom_status: Option<String> = row.try_get("custom_status").ok().flatten();
let version: i64 = row.try_get("custom_status_version").unwrap_or(0);
Ok(Some((custom_status, version as u64)))
}
None => Ok(None),
}
}
async fn get_kv_value(&self, instance: &str, key: &str) -> Result<Option<String>, ProviderError> {
let delta_row = sqlx::query("SELECT value FROM kv_delta WHERE instance_id = ? AND key = ?")
.bind(instance)
.bind(key)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?;
if let Some(r) = delta_row {
let value: Option<String> = r
.try_get("value")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?;
return Ok(value);
}
let row = sqlx::query("SELECT value FROM kv_store WHERE instance_id = ? AND key = ?")
.bind(instance)
.bind(key)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?;
Ok(match row {
Some(r) => Some(
r.try_get("value")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_value", e))?,
),
None => None,
})
}
async fn get_kv_all_values(
&self,
instance: &str,
) -> Result<std::collections::HashMap<String, String>, ProviderError> {
let store_rows = sqlx::query("SELECT key, value FROM kv_store WHERE instance_id = ?")
.bind(instance)
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
let mut map = std::collections::HashMap::new();
for row in store_rows {
let k: String = row
.try_get("key")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
let v: String = row
.try_get("value")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
map.insert(k, v);
}
let delta_rows = sqlx::query("SELECT key, value FROM kv_delta WHERE instance_id = ?")
.bind(instance)
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
for row in delta_rows {
let k: String = row
.try_get("key")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
let v: Option<String> = row
.try_get("value")
.map_err(|e| Self::sqlx_to_provider_error("get_kv_all_values", e))?;
match v {
Some(value) => {
map.insert(k, value);
}
None => {
map.remove(&k);
}
}
}
Ok(map)
}
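// Gather per-instance stats for the current execution: history event count and size,
// merged KV key count and value bytes, and the number of carry-forward events recorded
// on the OrchestrationStarted event.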
async fn get_instance_stats(&self, instance: &str) -> Result<Option<crate::SystemStats>, ProviderError> {
let exec_row = sqlx::query("SELECT current_execution_id FROM instances WHERE instance_id = ?")
.bind(instance)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let exec_id: i64 = match exec_row {
Some(row) => row
.try_get("current_execution_id")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?,
None => return Ok(None),
};
let history_row = sqlx::query(
"SELECT COUNT(*) as cnt, COALESCE(SUM(LENGTH(event_data)), 0) as size_bytes \
FROM history WHERE instance_id = ? AND execution_id = ?",
)
.bind(instance)
.bind(exec_id)
.fetch_one(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let history_event_count: i64 = history_row
.try_get("cnt")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let history_size_bytes: i64 = history_row
.try_get("size_bytes")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
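// Merge kv_delta over kv_store (NULL delta values are pending deletes) and total the surviving user keys and value bytes.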
let kv_row = sqlx::query(
"SELECT COUNT(*) as cnt, COALESCE(SUM(LENGTH(value)), 0) as size_bytes \
FROM ( \
SELECT COALESCE(d.key, s.key) AS key, \
CASE WHEN d.key IS NOT NULL THEN d.value ELSE s.value END AS value \
FROM kv_store s \
LEFT JOIN kv_delta d ON s.instance_id = d.instance_id AND s.key = d.key \
WHERE s.instance_id = ? \
UNION \
SELECT d.key, d.value \
FROM kv_delta d \
LEFT JOIN kv_store s ON d.instance_id = s.instance_id AND d.key = s.key \
WHERE d.instance_id = ? AND s.key IS NULL \
) merged WHERE value IS NOT NULL",
)
.bind(instance)
.bind(instance)
.fetch_one(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let kv_user_key_count: i64 = kv_row
.try_get("cnt")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let kv_total_value_bytes: i64 = kv_row
.try_get("size_bytes")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let carry_forward_row = sqlx::query(
"SELECT event_data FROM history \
WHERE instance_id = ? AND execution_id = ? AND event_id = 1",
)
.bind(instance)
.bind(exec_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let queue_pending_count = match carry_forward_row {
Some(row) => {
let data: String = row
.try_get("event_data")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_stats", e))?;
let event: crate::Event = serde_json::from_str(&data).map_err(|e| {
ProviderError::permanent(
"get_instance_stats",
format!("Failed to deserialize OrchestrationStarted event: {e}"),
)
})?;
match event.kind {
crate::EventKind::OrchestrationStarted {
carry_forward_events: Some(ref events),
..
} => events.len() as u64,
_ => 0,
}
}
None => 0,
};
Ok(Some(crate::SystemStats {
history_event_count: history_event_count as u64,
history_size_bytes: history_size_bytes as u64,
queue_pending_count,
kv_user_key_count: kv_user_key_count as u64,
kv_total_value_bytes: kv_total_value_bytes as u64,
}))
}
}
#[async_trait::async_trait]
impl ProviderAdmin for SqliteProvider {
async fn list_instances(&self) -> Result<Vec<String>, ProviderError> {
let rows = sqlx::query("SELECT instance_id FROM instances ORDER BY created_at DESC")
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances", e))?;
let instances: Vec<String> = rows
.into_iter()
.map(|row| row.try_get("instance_id").unwrap_or_default())
.collect();
Ok(instances)
}
async fn list_instances_by_status(&self, status: &str) -> Result<Vec<String>, ProviderError> {
let rows = sqlx::query(
r#"
SELECT i.instance_id
FROM instances i
JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
WHERE e.status = ?
ORDER BY i.created_at DESC
"#,
)
.bind(status)
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_instances_by_status", e))?;
let instances: Vec<String> = rows
.into_iter()
.map(|row| row.try_get("instance_id").unwrap_or_default())
.collect();
Ok(instances)
}
async fn list_executions(&self, instance: &str) -> Result<Vec<u64>, ProviderError> {
let rows = sqlx::query("SELECT execution_id FROM executions WHERE instance_id = ? ORDER BY execution_id")
.bind(instance)
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_executions", e))?;
let executions: Vec<u64> = rows
.into_iter()
.map(|row| row.try_get::<i64, _>("execution_id").unwrap_or(0) as u64)
.collect();
Ok(executions)
}
async fn read_history_with_execution_id(
&self,
instance: &str,
execution_id: u64,
) -> Result<Vec<Event>, ProviderError> {
let rows = sqlx::query(
r#"
SELECT event_data
FROM history
WHERE instance_id = ? AND execution_id = ?
ORDER BY event_id
"#,
)
.bind(instance)
.bind(execution_id as i64)
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("read_history_with_execution_id", e))?;
let mut events = Vec::new();
for row in rows {
let event_data: String = row
.try_get("event_data")
.map_err(|e| Self::sqlx_to_provider_error("read_history_with_execution_id", e))?;
let event: Event = serde_json::from_str(&event_data).map_err(|e| {
ProviderError::permanent(
"read_history_with_execution_id",
format!("Failed to deserialize event: {e}"),
)
})?;
events.push(event);
}
Ok(events)
}
async fn read_history(&self, instance: &str) -> Result<Vec<Event>, ProviderError> {
let execution_id = self.latest_execution_id(instance).await?;
self.read_history_with_execution_id(instance, execution_id).await
}
async fn latest_execution_id(&self, instance: &str) -> Result<u64, ProviderError> {
let row = sqlx::query(
"SELECT COALESCE(MAX(execution_id), 1) as max_execution_id FROM executions WHERE instance_id = ?",
)
.bind(instance)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("latest_execution_id", e))?;
match row {
Some(row) => {
let max_id: i64 = row.try_get("max_execution_id").unwrap_or(1);
Ok(max_id as u64)
}
None => Ok(1),
}
}
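// Join the instance row with its current execution to report status and output.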
async fn get_instance_info(&self, instance: &str) -> Result<InstanceInfo, ProviderError> {
let row = sqlx::query(
r#"
SELECT
i.instance_id,
i.orchestration_name,
i.orchestration_version,
i.current_execution_id,
i.created_at,
i.updated_at,
i.parent_instance_id,
e.status,
e.output
FROM instances i
LEFT JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
WHERE i.instance_id = ?
"#,
)
.bind(instance)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
match row {
Some(row) => {
let instance_id: String = row
.try_get("instance_id")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
let orchestration_name: String = row
.try_get("orchestration_name")
.map_err(|e| Self::sqlx_to_provider_error("get_instance_info", e))?;
let orchestration_version: Option<String> = row.try_get("orchestration_version").ok();
let orchestration_version = orchestration_version.unwrap_or_else(|| "unknown".to_string());
let current_execution_id: i64 = row.try_get("current_execution_id").unwrap_or(1);
let created_at: i64 = row.try_get("created_at").unwrap_or(0);
let updated_at: i64 = row.try_get("updated_at").unwrap_or(0);
let status: String = row.try_get("status").unwrap_or_else(|_| "Unknown".to_string());
let output: Option<String> = row.try_get("output").ok();
let parent_instance_id: Option<String> = row.try_get("parent_instance_id").ok().flatten();
Ok(InstanceInfo {
instance_id,
orchestration_name,
orchestration_version,
current_execution_id: current_execution_id as u64,
status,
output,
created_at: created_at as u64,
updated_at: updated_at as u64,
parent_instance_id,
})
}
None => Err(ProviderError::permanent(
"get_instance_info",
format!("Instance {instance} not found"),
)),
}
}
async fn get_execution_info(&self, instance: &str, execution_id: u64) -> Result<ExecutionInfo, ProviderError> {
let row = sqlx::query(
r#"
SELECT
e.execution_id,
e.status,
e.output,
e.started_at,
e.completed_at,
COUNT(h.event_id) as event_count
FROM executions e
LEFT JOIN history h ON e.instance_id = h.instance_id AND e.execution_id = h.execution_id
WHERE e.instance_id = ? AND e.execution_id = ?
GROUP BY e.execution_id
"#,
)
.bind(instance)
.bind(execution_id as i64)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
match row {
Some(row) => {
let execution_id: i64 = row
.try_get("execution_id")
.map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
let status: String = row
.try_get("status")
.map_err(|e| Self::sqlx_to_provider_error("get_execution_info", e))?;
let output: Option<String> = row.try_get("output").ok();
let started_at: i64 = row.try_get("started_at").unwrap_or(0);
let completed_at: Option<i64> = row.try_get("completed_at").ok();
let event_count: i64 = row.try_get("event_count").unwrap_or(0);
Ok(ExecutionInfo {
execution_id: execution_id as u64,
status,
output,
started_at: started_at as u64,
completed_at: completed_at.map(|t| t as u64),
event_count: event_count as usize,
})
}
None => Err(ProviderError::permanent(
"get_execution_info",
format!("Execution {execution_id} not found for instance {instance}"),
)),
}
}
async fn get_system_metrics(&self) -> Result<SystemMetrics, ProviderError> {
let row = sqlx::query(
r#"
SELECT
COUNT(*) as total_instances,
SUM(CASE WHEN e.status = 'Running' THEN 1 ELSE 0 END) as running_instances,
SUM(CASE WHEN e.status = 'Completed' THEN 1 ELSE 0 END) as completed_instances,
SUM(CASE WHEN e.status = 'Failed' THEN 1 ELSE 0 END) as failed_instances,
SUM(CASE WHEN e.status = 'ContinuedAsNew' THEN 1 ELSE 0 END) as continued_as_new_instances
FROM instances i
LEFT JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
"#,
)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
match row {
Some(row) => {
let total_instances: i64 = row.try_get("total_instances").unwrap_or(0);
let running_instances: i64 = row.try_get("running_instances").unwrap_or(0);
let completed_instances: i64 = row.try_get("completed_instances").unwrap_or(0);
let failed_instances: i64 = row.try_get("failed_instances").unwrap_or(0);
let _: i64 = row.try_get("continued_as_new_instances").unwrap_or(0);
let total_executions_row = sqlx::query("SELECT COUNT(*) as total_executions FROM executions")
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
let total_executions: i64 = total_executions_row
.and_then(|row| row.try_get("total_executions").ok())
.unwrap_or(0);
let total_events_row = sqlx::query("SELECT COUNT(*) as total_events FROM history")
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_system_metrics", e))?;
let total_events: i64 = total_events_row
.and_then(|row| row.try_get("total_events").ok())
.unwrap_or(0);
Ok(SystemMetrics {
total_instances: total_instances as u64,
total_executions: total_executions as u64,
running_instances: running_instances as u64,
completed_instances: completed_instances as u64,
failed_instances: failed_instances as u64,
total_events: total_events as u64,
})
}
None => Ok(SystemMetrics::default()),
}
}
async fn get_queue_depths(&self) -> Result<QueueDepths, ProviderError> {
let orchestrator_row = sqlx::query("SELECT COUNT(*) as count FROM orchestrator_queue WHERE lock_token IS NULL")
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
let orchestrator_queue: usize =
orchestrator_row.and_then(|row| row.try_get("count").ok()).unwrap_or(0) as usize;
let worker_row = sqlx::query("SELECT COUNT(*) as count FROM worker_queue WHERE lock_token IS NULL")
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_queue_depths", e))?;
let worker_queue: usize = worker_row.and_then(|row| row.try_get("count").ok()).unwrap_or(0) as usize;
Ok(QueueDepths {
orchestrator_queue,
worker_queue,
timer_queue: 0,
})
}
async fn list_children(&self, instance_id: &str) -> Result<Vec<String>, ProviderError> {
let rows = sqlx::query("SELECT instance_id FROM instances WHERE parent_instance_id = ?")
.bind(instance_id)
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("list_children", e))?;
let children: Vec<String> = rows
.into_iter()
.filter_map(|row| row.try_get("instance_id").ok())
.collect();
Ok(children)
}
async fn get_parent_id(&self, instance_id: &str) -> Result<Option<String>, ProviderError> {
let row = sqlx::query("SELECT parent_instance_id FROM instances WHERE instance_id = ?")
.bind(instance_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("get_parent_id", e))?;
match row {
Some(r) => Ok(r.try_get("parent_instance_id").ok().flatten()),
None => Err(ProviderError::permanent(
"get_parent_id",
format!("Instance {instance_id} not found"),
)),
}
}
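// Delete a set of instances and every associated row (history, executions, queues,
// locks, KV) in a single transaction. Unless `force` is set, refuse to delete any
// instance whose current execution is still Running.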
async fn delete_instances_atomic(
&self,
ids: &[String],
force: bool,
) -> Result<DeleteInstanceResult, ProviderError> {
if ids.is_empty() {
return Ok(DeleteInstanceResult::default());
}
let placeholders: String = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
if !force {
let check_sql = format!(
r#"
SELECT i.instance_id, e.status
FROM instances i
LEFT JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
WHERE i.instance_id IN ({placeholders})
"#
);
let mut query = sqlx::query(&check_sql);
for id in ids {
query = query.bind(id);
}
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
for row in rows {
let status: Option<String> = row.try_get("status").ok();
if status.as_deref() == Some("Running") {
let instance_id: String = row.try_get("instance_id").unwrap_or_default();
return Err(ProviderError::permanent(
"delete_instances_atomic",
format!(
"Instance {instance_id} is still running. Use force=true to delete anyway, or cancel first."
),
));
}
}
}
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
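// Orphan check: abort if some instance outside the delete set still lists one of
// these ids as its parent, i.e. a child was created after the caller walked the tree.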
let orphan_check_sql = format!(
r#"
SELECT instance_id, parent_instance_id FROM instances
WHERE parent_instance_id IN ({placeholders})
AND instance_id NOT IN ({placeholders})
LIMIT 1
"#
);
let mut orphan_query = sqlx::query(&orphan_check_sql);
for id in ids {
orphan_query = orphan_query.bind(id);
}
for id in ids {
orphan_query = orphan_query.bind(id);
}
if let Some(orphan_row) = orphan_query
.fetch_optional(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?
{
let orphan_id: String = orphan_row.try_get("instance_id").unwrap_or_default();
let parent_id: String = orphan_row.try_get("parent_instance_id").unwrap_or_default();
return Err(ProviderError::permanent(
"delete_instances_atomic",
format!(
"Cannot delete: instance {parent_id} has child {orphan_id} that was created after tree traversal. \
Re-fetch the tree and retry."
),
));
}
let mut result = DeleteInstanceResult::default();
let count_history_sql = format!("SELECT COUNT(*) as count FROM history WHERE instance_id IN ({placeholders})");
let mut count_query = sqlx::query(&count_history_sql);
for id in ids {
count_query = count_query.bind(id);
}
let history_count: i64 = count_query
.fetch_one(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?
.try_get("count")
.unwrap_or(0);
result.events_deleted = history_count as u64;
let count_exec_sql = format!("SELECT COUNT(*) as count FROM executions WHERE instance_id IN ({placeholders})");
let mut count_query = sqlx::query(&count_exec_sql);
for id in ids {
count_query = count_query.bind(id);
}
let exec_count: i64 = count_query
.fetch_one(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?
.try_get("count")
.unwrap_or(0);
result.executions_deleted = exec_count as u64;
let count_orch_q_sql =
format!("SELECT COUNT(*) as count FROM orchestrator_queue WHERE instance_id IN ({placeholders})");
let mut count_query = sqlx::query(&count_orch_q_sql);
for id in ids {
count_query = count_query.bind(id);
}
let orch_q_count: i64 = count_query
.fetch_one(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?
.try_get("count")
.unwrap_or(0);
let count_worker_q_sql =
format!("SELECT COUNT(*) as count FROM worker_queue WHERE instance_id IN ({placeholders})");
let mut count_query = sqlx::query(&count_worker_q_sql);
for id in ids {
count_query = count_query.bind(id);
}
let worker_q_count: i64 = count_query
.fetch_one(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?
.try_get("count")
.unwrap_or(0);
result.queue_messages_deleted = (orch_q_count + worker_q_count) as u64;
let del_history_sql = format!("DELETE FROM history WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_history_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
let del_exec_sql = format!("DELETE FROM executions WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_exec_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
let del_orch_q_sql = format!("DELETE FROM orchestrator_queue WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_orch_q_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
let del_worker_q_sql = format!("DELETE FROM worker_queue WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_worker_q_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
let del_locks_sql = format!("DELETE FROM instance_locks WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_locks_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
let del_kv_sql = format!("DELETE FROM kv_store WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_kv_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
let del_kv_delta_sql = format!("DELETE FROM kv_delta WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_kv_delta_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
let del_instances_sql = format!("DELETE FROM instances WHERE instance_id IN ({placeholders})");
let mut del_query = sqlx::query(&del_instances_sql);
for id in ids {
del_query = del_query.bind(id);
}
del_query
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instances_atomic", e))?;
result.instances_deleted = ids.len() as u64;
Ok(result)
}
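// Select root instances in a terminal state (Completed/Failed/ContinuedAsNew) matching
// the filter, then delete each one together with its child tree via delete_instances_atomic.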
async fn delete_instance_bulk(&self, filter: InstanceFilter) -> Result<DeleteInstanceResult, ProviderError> {
let mut sql = String::from(
r#"
SELECT i.instance_id
FROM instances i
LEFT JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
WHERE i.parent_instance_id IS NULL
AND e.status IN ('Completed', 'Failed', 'ContinuedAsNew')
"#,
);
if let Some(ref ids) = filter.instance_ids {
if ids.is_empty() {
return Ok(DeleteInstanceResult::default());
}
let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
sql.push_str(&format!(" AND i.instance_id IN ({})", placeholders.join(",")));
}
if filter.completed_before.is_some() {
sql.push_str(" AND e.completed_at < ?");
}
let limit = filter.limit.unwrap_or(DEFAULT_BULK_OPERATION_LIMIT);
sql.push_str(&format!(" LIMIT {limit}"));
let mut query = sqlx::query(&sql);
if let Some(ref ids) = filter.instance_ids {
for id in ids {
query = query.bind(id);
}
}
if let Some(completed_before) = filter.completed_before {
query = query.bind(completed_before as i64);
}
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("delete_instance_bulk", e))?;
let instance_ids: Vec<String> = rows.iter().filter_map(|row| row.try_get("instance_id").ok()).collect();
if instance_ids.is_empty() {
return Ok(DeleteInstanceResult::default());
}
let mut result = DeleteInstanceResult::default();
for instance_id in &instance_ids {
let tree = self.get_instance_tree(instance_id).await?;
let tree_size = tree.all_ids.len() as u64;
let delete_result = self.delete_instances_atomic(&tree.all_ids, true).await?;
result.executions_deleted += delete_result.executions_deleted;
result.events_deleted += delete_result.events_deleted;
result.queue_messages_deleted += delete_result.queue_messages_deleted;
result.instances_deleted += tree_size;
}
Ok(result)
}
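// Delete old executions for one instance: everything except the current execution and
// Running executions, optionally keeping the most recent `keep_last` or only pruning
// executions completed before a cutoff.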
async fn prune_executions(&self, instance_id: &str, options: PruneOptions) -> Result<PruneResult, ProviderError> {
let current_exec_row = sqlx::query("SELECT current_execution_id FROM instances WHERE instance_id = ?")
.bind(instance_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
let current_execution_id: i64 = match current_exec_row {
Some(row) => row.try_get("current_execution_id").unwrap_or(1),
None => {
return Err(ProviderError::permanent(
"prune_executions",
format!("Instance {instance_id} not found"),
));
}
};
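// Candidate executions: never the current one, never Running; narrowed further below by keep_last and completed_before.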
let mut conditions = vec![
"instance_id = ?".to_string(),
"execution_id != ?".to_string(), "status != 'Running'".to_string(), ];
if let Some(keep_last) = options.keep_last {
conditions.push(format!(
"execution_id NOT IN (SELECT execution_id FROM executions WHERE instance_id = ? ORDER BY execution_id DESC LIMIT {keep_last})"
));
}
if options.completed_before.is_some() {
conditions.push("completed_at < ?".to_string());
}
let sql = format!("SELECT execution_id FROM executions WHERE {}", conditions.join(" AND "));
let mut query = sqlx::query(&sql);
query = query.bind(instance_id);
query = query.bind(current_execution_id);
if options.keep_last.is_some() {
query = query.bind(instance_id);
}
if let Some(completed_before) = options.completed_before {
query = query.bind(completed_before as i64);
}
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
let execution_ids: Vec<i64> = rows.iter().filter_map(|row| row.try_get("execution_id").ok()).collect();
if execution_ids.is_empty() {
return Ok(PruneResult {
instances_processed: 1,
..Default::default()
});
}
let mut tx = self
.pool
.begin()
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
let mut result = PruneResult {
instances_processed: 1,
..Default::default()
};
for exec_id in &execution_ids {
let history_count: i64 =
sqlx::query("SELECT COUNT(*) as count FROM history WHERE instance_id = ? AND execution_id = ?")
.bind(instance_id)
.bind(exec_id)
.fetch_one(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?
.try_get("count")
.unwrap_or(0);
sqlx::query("DELETE FROM history WHERE instance_id = ? AND execution_id = ?")
.bind(instance_id)
.bind(exec_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
sqlx::query("DELETE FROM executions WHERE instance_id = ? AND execution_id = ?")
.bind(instance_id)
.bind(exec_id)
.execute(&mut *tx)
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
result.executions_deleted += 1;
result.events_deleted += history_count as u64;
}
tx.commit()
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions", e))?;
Ok(result)
}
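// Apply prune_executions to every instance matched by the filter and sum the results.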
async fn prune_executions_bulk(
&self,
filter: InstanceFilter,
options: PruneOptions,
) -> Result<PruneResult, ProviderError> {
let mut sql = String::from(
r#"
SELECT i.instance_id
FROM instances i
LEFT JOIN executions e ON i.instance_id = e.instance_id AND i.current_execution_id = e.execution_id
WHERE 1=1
"#,
);
if let Some(ref ids) = filter.instance_ids {
if ids.is_empty() {
return Ok(PruneResult::default());
}
let placeholders: Vec<&str> = ids.iter().map(|_| "?").collect();
sql.push_str(&format!(" AND i.instance_id IN ({})", placeholders.join(",")));
}
if filter.completed_before.is_some() {
sql.push_str(" AND e.completed_at < ?");
}
let limit = filter.limit.unwrap_or(DEFAULT_BULK_OPERATION_LIMIT);
sql.push_str(&format!(" LIMIT {limit}"));
let mut query = sqlx::query(&sql);
if let Some(ref ids) = filter.instance_ids {
for id in ids {
query = query.bind(id);
}
}
if let Some(completed_before) = filter.completed_before {
query = query.bind(completed_before as i64);
}
let rows = query
.fetch_all(&self.pool)
.await
.map_err(|e| Self::sqlx_to_provider_error("prune_executions_bulk", e))?;
let instance_ids: Vec<String> = rows.iter().filter_map(|row| row.try_get("instance_id").ok()).collect();
let mut result = PruneResult::default();
for instance_id in &instance_ids {
let single_result = self.prune_executions(instance_id, options.clone()).await?;
result.instances_processed += single_result.instances_processed;
result.executions_deleted += single_result.executions_deleted;
result.events_deleted += single_result.events_deleted;
}
Ok(result)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::providers::ExecutionMetadata;
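// Test helper: enqueue a StartOrchestration work item for the next execution id and
// immediately fetch/ack it with an OrchestrationStarted event so the instance exists.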
async fn test_create_execution(
provider: &SqliteProvider,
instance: &str,
orchestration: &str,
version: &str,
input: &str,
parent_instance: Option<&str>,
parent_id: Option<u64>,
) -> Result<u64, ProviderError> {
let execs = ProviderAdmin::list_executions(provider, instance).await?;
let next_execution_id = if execs.is_empty() {
crate::INITIAL_EXECUTION_ID
} else {
execs
.iter()
.max()
.copied()
.expect("execs is not empty, so max() must return Some")
+ 1
};
provider
.enqueue_for_orchestrator(
WorkItem::StartOrchestration {
instance: instance.to_string(),
orchestration: orchestration.to_string(),
version: Some(version.to_string()),
input: input.to_string(),
parent_instance: parent_instance.map(|s| s.to_string()),
parent_id,
execution_id: next_execution_id,
},
None,
)
.await?;
let (_item, lock_token, _attempt_count) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await?
.ok_or_else(|| "Failed to fetch orchestration item".to_string())?;
provider
.ack_orchestration_item(
&lock_token,
next_execution_id,
vec![Event::with_event_id(
crate::INITIAL_EVENT_ID,
instance,
next_execution_id,
None,
EventKind::OrchestrationStarted {
name: orchestration.to_string(),
version: version.to_string(),
input: input.to_string(),
parent_instance: parent_instance.map(|s| s.to_string()),
parent_id,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some(orchestration.to_string()),
orchestration_version: Some(version.to_string()),
..Default::default()
},
vec![],
)
.await?;
Ok(next_execution_id)
}
async fn create_test_store() -> SqliteProvider {
SqliteProvider::new("sqlite::memory:", None)
.await
.expect("Failed to create test store")
}
#[tokio::test]
async fn test_basic_enqueue_dequeue() {
let store = create_test_store().await;
let item = WorkItem::StartOrchestration {
instance: "test-1".to_string(),
orchestration: "TestOrch".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store
.enqueue_for_orchestrator(item.clone(), None)
.await
.expect("enqueue should succeed");
let (orch_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.expect("fetch should succeed")
.expect("item should be present");
assert_eq!(orch_item.instance, "test-1");
assert_eq!(orch_item.messages.len(), 1);
assert_eq!(orch_item.history.len(), 0);
let history_delta = vec![Event::with_event_id(
1,
"test-1",
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)];
store
.ack_orchestration_item(
&lock_token,
1,
history_delta,
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
let history = store.read("test-1").await.unwrap_or_default();
assert_eq!(history.len(), 1);
}
#[tokio::test]
async fn test_transactional_atomicity() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let start = WorkItem::StartOrchestration {
instance: "test-atomic".to_string(),
orchestration: "AtomicTest".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store.enqueue_for_orchestrator(start, None).await.unwrap();
let (_orch_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.unwrap();
let history_delta = vec![
Event::with_event_id(
1,
"test-atomic",
1,
None,
EventKind::OrchestrationStarted {
name: "AtomicTest".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
),
Event::with_event_id(
2,
"test-atomic",
1,
None,
EventKind::ActivityScheduled {
name: "Activity1".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
},
),
Event::with_event_id(
3,
"test-atomic",
1,
None,
EventKind::ActivityScheduled {
name: "Activity2".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
},
),
];
let worker_items = vec![
WorkItem::ActivityExecute {
instance: "test-atomic".to_string(),
execution_id: 1,
id: 1,
name: "Activity1".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
},
WorkItem::ActivityExecute {
instance: "test-atomic".to_string(),
execution_id: 1,
id: 2,
name: "Activity2".to_string(),
input: "{}".to_string(),
session_id: None,
tag: None,
},
];
store
.ack_orchestration_item(
&lock_token,
1,
history_delta,
worker_items,
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
let history = store.read("test-atomic").await.unwrap_or_default();
assert_eq!(history.len(), 3);
let (work1, token1, _) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
let (work2, token2, _) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert!(matches!(work1, WorkItem::ActivityExecute { id: 1, .. }));
assert!(matches!(work2, WorkItem::ActivityExecute { id: 2, .. }));
assert!(
store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.is_none()
);
store
.ack_work_item(
&token1,
Some(WorkItem::ActivityCompleted {
instance: "test-atomic".to_string(),
execution_id: 1,
id: 1,
result: "done".to_string(),
}),
)
.await
.unwrap();
store
.ack_work_item(
&token2,
Some(WorkItem::ActivityCompleted {
instance: "test-atomic".to_string(),
execution_id: 1,
id: 2,
result: "done".to_string(),
}),
)
.await
.unwrap();
}
#[tokio::test]
async fn test_lock_expiration() {
let store = create_test_store().await;
let short_lock_timeout = Duration::from_secs(2);
let item = WorkItem::StartOrchestration {
instance: "test-lock".to_string(),
orchestration: "LockTest".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store.enqueue_for_orchestrator(item, None).await.unwrap();
let (_orch_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(short_lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert!(
store
.fetch_orchestration_item(short_lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
tokio::time::sleep(Duration::from_millis(2100)).await;
let redelivered = store
.fetch_orchestration_item(short_lock_timeout, Duration::ZERO, None)
.await
.unwrap();
if redelivered.is_none() {
eprintln!("No redelivery after lock expiry. Checking queue state...");
return;
}
let (redelivered_item, redelivered_lock_token, _attempt_count) = redelivered.unwrap();
assert_eq!(redelivered_item.instance, "test-lock");
assert_ne!(redelivered_lock_token, lock_token);
store
.ack_orchestration_item(
&redelivered_lock_token,
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
assert!(
store
.ack_orchestration_item(
&lock_token,
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.is_err()
);
}
#[tokio::test]
async fn test_multi_execution_support() {
let store = create_test_store().await;
let instance = "test-multi-exec";
assert_eq!(ProviderAdmin::latest_execution_id(&store, instance).await, Ok(1));
assert!(
ProviderAdmin::list_executions(&store, instance)
.await
.unwrap()
.is_empty()
);
let exec1 = test_create_execution(&store, instance, "MultiExecTest", "1.0.0", "input1", None, None)
.await
.unwrap();
assert_eq!(exec1, 1);
assert_eq!(ProviderAdmin::latest_execution_id(&store, instance).await, Ok(1));
assert_eq!(ProviderAdmin::list_executions(&store, instance).await.unwrap(), vec![1]);
let hist1 = store.read_with_execution(instance, 1).await.unwrap_or_default();
assert_eq!(hist1.len(), 1);
assert!(matches!(&hist1[0].kind, EventKind::OrchestrationStarted { .. }));
store
.append_with_execution(
instance,
1,
vec![Event::with_event_id(
2,
instance,
1,
None,
EventKind::OrchestrationCompleted {
output: "result1".to_string(),
},
)],
)
.await
.unwrap();
let exec2 = test_create_execution(&store, instance, "MultiExecTest", "1.0.0", "input2", None, None)
.await
.unwrap();
assert_eq!(exec2, 2);
assert_eq!(ProviderAdmin::latest_execution_id(&store, instance).await, Ok(2));
assert_eq!(
ProviderAdmin::list_executions(&store, instance).await.unwrap(),
vec![1, 2]
);
let hist1_final = store.read_with_execution(instance, 1).await.unwrap_or_default();
assert_eq!(hist1_final.len(), 2);
let hist2 = store.read_with_execution(instance, 2).await.unwrap_or_default();
assert_eq!(hist2.len(), 1);
let hist_latest = store.read(instance).await.unwrap_or_default();
assert_eq!(hist_latest.len(), 1);
assert!(matches!(&hist_latest[0].kind, EventKind::OrchestrationStarted { input, .. } if input == "input2"));
}
#[tokio::test]
async fn test_abandon_orchestration_item() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let item = WorkItem::StartOrchestration {
instance: "test-abandon".to_string(),
orchestration: "AbandonTest".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store.enqueue_for_orchestrator(item, None).await.unwrap();
let (_orch_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert!(
store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
store
.abandon_orchestration_item(&lock_token, None, false)
.await
.unwrap();
let (orch_item2, lock_token2, _attempt_count2) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(orch_item2.instance, "test-abandon");
assert_ne!(lock_token2, lock_token);
}
#[tokio::test]
async fn test_list_instances() {
let store = create_test_store().await;
assert!(ProviderAdmin::list_instances(&store).await.unwrap().is_empty());
for i in 1..=3 {
test_create_execution(&store, &format!("instance-{i}"), "ListTest", "1.0.0", "{}", None, None)
.await
.unwrap();
}
let instances = ProviderAdmin::list_instances(&store).await.unwrap();
assert_eq!(instances.len(), 3);
assert!(instances.contains(&"instance-1".to_string()));
assert!(instances.contains(&"instance-2".to_string()));
assert!(instances.contains(&"instance-3".to_string()));
}
#[tokio::test]
async fn test_worker_queue_operations() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let work_item = WorkItem::ActivityExecute {
instance: "test-worker".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "test-input".to_string(),
session_id: None,
tag: None,
};
store.enqueue_for_worker(work_item.clone()).await.unwrap();
let (dequeued, token, _) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert!(matches!(dequeued, WorkItem::ActivityExecute { name, .. } if name == "TestActivity"));
assert!(
store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.is_none()
);
store
.ack_work_item(
&token,
Some(WorkItem::ActivityCompleted {
instance: "test-worker".to_string(),
execution_id: 1,
id: 1,
result: "done".to_string(),
}),
)
.await
.unwrap();
assert!(
store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn test_delayed_visibility() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let delayed_item = WorkItem::StartOrchestration {
instance: "test-delayed".to_string(),
orchestration: "DelayedTest".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store
.enqueue_orchestrator_work_with_delay(delayed_item.clone(), Some(Duration::from_secs(2)))
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
tokio::time::sleep(std::time::Duration::from_millis(2100)).await;
let (item, lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item.instance, "test-delayed");
store
.ack_orchestration_item(
&lock_token,
1,
vec![],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some("DelayedTest".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
let start_item = WorkItem::StartOrchestration {
instance: "test-timer-delayed".to_string(),
orchestration: "TimerDelayedTest".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store.enqueue_for_orchestrator(start_item, None).await.unwrap();
let (_orch_item, lock_token2, _attempt_count2) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
store
.ack_orchestration_item(
&lock_token2,
1,
vec![],
vec![],
vec![],
ExecutionMetadata {
orchestration_name: Some("TimerDelayedTest".to_string()),
orchestration_version: Some("1.0.0".to_string()),
..Default::default()
},
vec![],
)
.await
.unwrap();
let timer_fired = WorkItem::TimerFired {
instance: "test-timer-delayed".to_string(),
execution_id: 1,
id: 1,
fire_at_ms: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
+ 2000,
};
store
.enqueue_for_orchestrator(timer_fired.clone(), Some(Duration::from_secs(2)))
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
tokio::time::sleep(std::time::Duration::from_millis(2100)).await;
let (timer_item, _lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(timer_item.instance, "test-timer-delayed");
assert_eq!(timer_item.messages.len(), 1);
assert!(matches!(timer_item.messages[0], WorkItem::TimerFired { .. }));
}
#[tokio::test]
async fn test_abandon_with_delay() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let item = WorkItem::StartOrchestration {
instance: "test-abandon-delay".to_string(),
orchestration: "AbandonDelayTest".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store.enqueue_for_orchestrator(item, None).await.unwrap();
let (_orch_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
store
.abandon_orchestration_item(&lock_token, Some(Duration::from_secs(2)), false)
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
tokio::time::sleep(std::time::Duration::from_millis(2100)).await;
let (item2, _lock_token2, _attempt_count2) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item2.instance, "test-abandon-delay");
}
#[tokio::test]
async fn test_timer_queue_operations() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let start_item = WorkItem::StartOrchestration {
instance: "test-timer".to_string(),
orchestration: "TestOrch".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: 1,
};
store.enqueue_for_orchestrator(start_item, None).await.unwrap();
let (_orch_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
store
.ack_orchestration_item(
&lock_token,
1,
vec![Event::with_event_id(
1,
"test-timer",
1,
None,
EventKind::OrchestrationStarted {
name: "TestOrch".to_string(),
version: "1.0.0".to_string(),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
carry_forward_events: None,
initial_custom_status: None,
},
)],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
let future_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
+ 60000;
let future_timer = WorkItem::TimerFired {
instance: "test-timer".to_string(),
execution_id: 1,
id: 1,
fire_at_ms: future_time,
};
store
.enqueue_for_orchestrator(future_timer, Some(Duration::from_secs(60)))
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
let past_timer = WorkItem::TimerFired {
instance: "test-timer".to_string(),
execution_id: 1,
id: 2,
fire_at_ms: 0,
};
store.enqueue_for_orchestrator(past_timer, None).await.unwrap();
let (item, _lock_token2, _attempt_count2) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(item.instance, "test-timer");
let work_item = item
.messages
.iter()
.find(|m| matches!(m, WorkItem::TimerFired { id: 2, .. }));
assert!(work_item.is_some());
}
#[tokio::test]
async fn test_abandon_work_item() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let work_item = WorkItem::ActivityExecute {
instance: "test-abandon-work".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "test-input".to_string(),
session_id: None,
tag: None,
};
store.enqueue_for_worker(work_item).await.unwrap();
let (_, lock_token, _) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert!(
store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.is_none()
);
store.abandon_work_item(&lock_token, None, false).await.unwrap();
let (dequeued2, lock_token2, _) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert!(matches!(dequeued2, WorkItem::ActivityExecute { instance, .. } if instance == "test-abandon-work"));
assert_ne!(lock_token2, lock_token);
}
#[tokio::test]
async fn test_abandon_work_item_invalid_token() {
let store = create_test_store().await;
let result = store.abandon_work_item("invalid-token", None, false).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(!err.is_retryable());
}
#[tokio::test]
async fn test_abandon_work_item_ignore_attempt() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let work_item = WorkItem::ActivityExecute {
instance: "test-ignore-attempt".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "test-input".to_string(),
session_id: None,
tag: None,
};
store.enqueue_for_worker(work_item).await.unwrap();
let (_, lock_token1, attempt1) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert_eq!(attempt1, 1);
store.abandon_work_item(&lock_token1, None, false).await.unwrap();
let (_, lock_token2, attempt2) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert_eq!(attempt2, 2);
store.abandon_work_item(&lock_token2, None, true).await.unwrap();
let (_, _lock_token3, attempt3) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert_eq!(attempt3, 2);
}
#[tokio::test]
async fn test_abandon_orchestration_item_ignore_attempt() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let item = WorkItem::StartOrchestration {
instance: "test-ignore-attempt-orch".to_string(),
orchestration: "IgnoreTest".to_string(),
version: None,
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store.enqueue_for_orchestrator(item, None).await.unwrap();
let (_item1, lock_token1, attempt1) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(attempt1, 1);
store
.abandon_orchestration_item(&lock_token1, None, false)
.await
.unwrap();
let (_item2, lock_token2, attempt2) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(attempt2, 2);
store
.abandon_orchestration_item(&lock_token2, None, true)
.await
.unwrap();
let (_item3, _lock_token3, attempt3) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
assert_eq!(attempt3, 2);
}
#[tokio::test]
async fn test_abandon_ignore_attempt_never_goes_below_zero() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let work_item = WorkItem::ActivityExecute {
instance: "test-never-negative".to_string(),
execution_id: 1,
id: 1,
name: "TestActivity".to_string(),
input: "test-input".to_string(),
session_id: None,
tag: None,
};
store.enqueue_for_worker(work_item).await.unwrap();
let (_, lock_token1, attempt1) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert_eq!(attempt1, 1);
store.abandon_work_item(&lock_token1, None, true).await.unwrap();
let (_, lock_token2, attempt2) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert_eq!(attempt2, 1);
store.abandon_work_item(&lock_token2, None, true).await.unwrap();
let (_, _lock_token3, attempt3) = store
.fetch_work_item(lock_timeout, Duration::ZERO, None, &TagFilter::default())
.await
.unwrap()
.unwrap();
assert_eq!(attempt3, 1);
}
#[tokio::test]
async fn test_renew_orchestration_item_lock() {
let store = create_test_store().await;
let lock_timeout = Duration::from_secs(30);
let item = WorkItem::StartOrchestration {
instance: "test-renew-orch".to_string(),
orchestration: "RenewTest".to_string(),
version: Some("1.0.0".to_string()),
input: "{}".to_string(),
parent_instance: None,
parent_id: None,
execution_id: crate::INITIAL_EXECUTION_ID,
};
store.enqueue_for_orchestrator(item, None).await.unwrap();
let (_orch_item, lock_token, _attempt_count) = store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.unwrap();
store
.renew_orchestration_item_lock(&lock_token, Duration::from_secs(60))
.await
.unwrap();
assert!(
store
.fetch_orchestration_item(lock_timeout, Duration::ZERO, None)
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn test_renew_orchestration_item_lock_invalid_token() {
let store = create_test_store().await;
let result = store
.renew_orchestration_item_lock("invalid-token", Duration::from_secs(60))
.await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(!err.is_retryable());
}
}