use crate::config::Config;
use crate::error::Result;
use crate::store::dblock::DbLock;
use crate::store::{BackendType, ConcurrencyModel};
use crate::store::{
DbStateTable, DbTables, MessageTable, QueueTable, RunRecordTable, SerializedLock,
StepRecordTable, Store, Tables, WorkerTable, WorkflowTable,
};
use crate::Worker;
use crate::types::NewQueueRecord;
use async_trait::async_trait;
use std::sync::Arc;
use turso::{Database, Row};
pub(crate) mod dialect;
pub mod tables;
use self::tables::db_state::TursoDbState;
use self::tables::messages::TursoMessageTable;
use self::tables::queues::TursoQueueTable;
use self::tables::runs::TursoRunRecordTable;
use self::tables::steps::TursoStepRecordTable;
use self::tables::workers::TursoWorkerTable;
use self::tables::workflows::TursoWorkflowTable;
/// Turso-backed table handles plus the shared database and configuration.
///
/// Every table handle is constructed from a clone of the same `Arc<Database>`
/// (see `TursoTables::new`).
#[derive(Debug, Clone)]
pub(crate) struct TursoTables {
/// Shared database handle; per-operation connections are opened from it.
db: Arc<Database>,
/// Copy of the configuration passed to `TursoTables::new`.
config: Config,
/// Queue table handle.
queues: Arc<TursoQueueTable>,
/// Message table handle.
messages: Arc<TursoMessageTable>,
/// Worker table handle.
workers: Arc<TursoWorkerTable>,
/// Database-state table handle.
db_state: Arc<TursoDbState>,
/// Workflow definition table handle.
workflows: Arc<TursoWorkflowTable>,
/// Workflow run-record table handle.
workflow_runs: Arc<TursoRunRecordTable>,
/// Workflow step-record table handle.
workflow_steps: Arc<TursoStepRecordTable>,
}
impl TursoTables {
pub(crate) async fn new(dsn: &str, config: &Config) -> Result<Self> {
let path = BackendType::TURSO_PREFIXES
.iter()
.find_map(|prefix| dsn.strip_prefix(prefix))
.ok_or_else(|| crate::error::Error::InvalidConfig {
field: "dsn".to_string(),
message: "Unsupported DSN format: <redacted>".to_string(),
})?;
let builder = turso::Builder::new_local(path);
let db = builder
.build()
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to connect to Turso: {}", e),
})?;
let db = Arc::new(db);
let conn = db.connect().map_err(|e| crate::error::Error::Internal {
message: format!("Failed to get connection: {}", e),
})?;
let mut rows = conn
.query("PRAGMA journal_mode=WAL;", ())
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to set WAL mode: {}", e),
})?;
while rows
.next()
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to consume WAL pragma result: {}", e),
})?
.is_some()
{}
conn.execute("PRAGMA busy_timeout = 5000;", ())
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to set busy timeout: {}", e),
})?;
conn.execute("PRAGMA foreign_keys = ON;", ())
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to set foreign_keys: {}", e),
})?;
Ok(Self {
db: Arc::clone(&db),
config: config.clone(),
queues: Arc::new(TursoQueueTable::new(Arc::clone(&db))),
messages: Arc::new(TursoMessageTable::new(Arc::clone(&db))),
workers: Arc::new(TursoWorkerTable::new(Arc::clone(&db))),
db_state: Arc::new(TursoDbState::new(Arc::clone(&db))),
workflows: Arc::new(TursoWorkflowTable::new(Arc::clone(&db))),
workflow_runs: Arc::new(TursoRunRecordTable::new(Arc::clone(&db))),
workflow_steps: Arc::new(TursoStepRecordTable::new(Arc::clone(&db))),
})
}
}
/// Public Turso store: the table set behind a `SerializedLock`, plus a
/// `Tables` facade built over a clone of that same lock.
#[derive(Clone)]
pub struct TursoStore {
/// Lock-wrapped tables, used for raw SQL, bootstrap, and config access.
db: SerializedLock<TursoTables>,
/// Facade returned by the table accessors (`queues()`, `messages()`, ...).
tables: Tables<SerializedLock<TursoTables>>,
}
/// Opaque `Debug` output: prints only the type name, no fields.
impl std::fmt::Debug for TursoStore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A field-less `debug_struct(..).finish()` emits just the name, so
        // writing the name directly produces the same text.
        f.write_str("TursoStore")
    }
}
impl TursoStore {
    /// Opens the Turso database described by `dsn` and wraps its tables in a
    /// `SerializedLock`, sharing that lock with the `Tables` facade.
    ///
    /// # Errors
    /// Propagates any error from `TursoTables::new`.
    pub async fn new(dsn: &str, config: &Config) -> Result<Self> {
        let raw_tables = TursoTables::new(dsn, config).await?;
        let locked = SerializedLock::new(raw_tables);
        Ok(Self {
            tables: Tables::new(locked.clone()),
            db: locked,
        })
    }

    /// Returns a clone of this store wrapped in the `AnyStore::Turso` variant.
    fn any_store(&self) -> crate::store::AnyStore {
        let this = self.clone();
        crate::store::AnyStore::Turso(this)
    }
}
pub use crate::store::sqlite_utils::format_timestamp as format_turso_timestamp;
pub use crate::store::sqlite_utils::parse_timestamp as parse_turso_timestamp;
/// Conversion of a single column of a Turso `Row` into a Rust value.
pub trait FromTursoRow: Sized {
/// Reads the column at position `idx` of `row` and converts it to `Self`.
fn from_row(row: &Row, idx: usize) -> Result<Self>;
}
impl FromTursoRow for i64 {
fn from_row(row: &Row, idx: usize) -> Result<Self> {
row.get(idx).map_err(|e| crate::error::Error::Internal {
message: e.to_string(),
})
}
}
impl FromTursoRow for String {
fn from_row(row: &Row, idx: usize) -> Result<Self> {
row.get(idx).map_err(|e| crate::error::Error::Internal {
message: e.to_string(),
})
}
}
impl FromTursoRow for bool {
fn from_row(row: &Row, idx: usize) -> Result<Self> {
let val: i64 = row.get(idx).map_err(|e| crate::error::Error::Internal {
message: e.to_string(),
})?;
Ok(val != 0)
}
}
/// A SQL statement together with the values bound to it so far.
pub struct TursoQueryBuilder {
/// SQL text passed to the connection on execution.
sql: String,
/// Bound values, in the order `bind` was called.
params: Vec<turso::Value>,
}
pub async fn connect_db(db: &Database) -> Result<turso::Connection> {
let conn = db.connect().map_err(|e| crate::error::Error::Internal {
message: format!("Connect failed: {}", e),
})?;
conn.execute("PRAGMA busy_timeout = 5000;", ())
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to set busy timeout: {}", e),
})?;
conn.execute("PRAGMA foreign_keys = ON;", ())
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to set foreign_keys: {}", e),
})?;
Ok(conn)
}
impl TursoQueryBuilder {
pub fn new(sql: &str) -> Self {
Self {
sql: sql.to_string(),
params: Vec::new(),
}
}
pub fn bind<T>(mut self, value: T) -> Self
where
T: Into<turso::Value>,
{
self.params.push(value.into());
self
}
pub async fn execute(self, db: &Database) -> Result<u64> {
let conn = connect_db(db).await?;
self.execute_on_connection(&conn).await
}
pub async fn execute_once(self, db: &Database) -> Result<u64> {
let conn = connect_db(db).await?;
self.execute_once_on_connection(&conn).await
}
pub async fn execute_on_connection(self, conn: &turso::Connection) -> Result<u64> {
let mut retries = 0;
const MAX_RETRIES: u32 = 10;
let mut delay = 50u64; const MAX_DELAY: u64 = 5000;
loop {
let res = conn.execute(&self.sql, self.params.clone()).await;
match res {
Ok(count) => return Ok(count),
Err(e) => {
let msg = e.to_string();
let is_locked = msg.contains("database is locked")
|| msg.contains("SQLITE_BUSY")
|| msg.contains("snapshot is stale");
if is_locked && retries < MAX_RETRIES {
retries += 1;
let jitter = (std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos()
% 20) as i64
- 10;
let jittered_delay = (delay as i64 + jitter).max(1) as u64;
tracing::warn!(
"Database locked, retrying {}/{} in {}ms: {}",
retries,
MAX_RETRIES,
jittered_delay,
self.sql
);
tokio::time::sleep(tokio::time::Duration::from_millis(jittered_delay))
.await;
delay = delay.saturating_mul(2).min(MAX_DELAY);
continue;
}
return Err(crate::error::Error::QueryFailed {
query: self.sql,
source: Box::new(e),
context: if is_locked {
"Execute on conn failed (locked)".into()
} else {
"Execute on conn failed".into()
},
});
}
}
}
}
pub async fn execute_once_on_connection(self, conn: &turso::Connection) -> Result<u64> {
conn.execute(&self.sql, self.params.clone())
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: self.sql,
source: Box::new(e),
context: "Execute once failed (DML - no retry)".into(),
})
}
pub async fn fetch_all(self, db: &Database) -> Result<Vec<Row>> {
let conn = connect_db(db).await?;
self.fetch_all_on_connection(&conn).await
}
pub async fn fetch_all_once(self, db: &Database) -> Result<Vec<Row>> {
let conn = connect_db(db).await?;
self.fetch_all_once_on_connection(&conn).await
}
pub async fn fetch_all_on_connection(self, conn: &turso::Connection) -> Result<Vec<Row>> {
let mut retries = 0;
const MAX_RETRIES: u32 = 10;
let mut delay = 50u64; const MAX_DELAY: u64 = 5000;
loop {
let res = conn.query(&self.sql, self.params.clone()).await;
match res {
Ok(mut rows) => {
let mut result = Vec::new();
let mut loop_err = None;
loop {
match rows.next().await {
Ok(Some(row)) => result.push(row),
Ok(None) => break,
Err(e) => {
loop_err = Some(e);
break;
}
}
}
if let Some(e) = loop_err {
let msg = e.to_string();
let is_locked = msg.contains("database is locked")
|| msg.contains("SQLITE_BUSY")
|| msg.contains("snapshot is stale");
if is_locked && retries < MAX_RETRIES {
retries += 1;
let jitter = (std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos()
% 20) as i64
- 10;
let jittered_delay = (delay as i64 + jitter).max(1) as u64;
tracing::warn!(
"Database locked during fetch, retrying {}/{} in {}ms: {}",
retries,
MAX_RETRIES,
jittered_delay,
self.sql
);
tokio::time::sleep(tokio::time::Duration::from_millis(jittered_delay))
.await;
delay = delay.saturating_mul(2).min(MAX_DELAY);
continue;
}
return Err(crate::error::Error::QueryFailed {
query: self.sql.clone(),
source: Box::new(e),
context: "Next row failed".into(),
});
}
return Ok(result);
}
Err(e) => {
let msg = e.to_string();
let is_locked = msg.contains("database is locked")
|| msg.contains("SQLITE_BUSY")
|| msg.contains("snapshot is stale");
if is_locked && retries < MAX_RETRIES {
retries += 1;
let jitter = (std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos()
% 20) as i64
- 10;
let jittered_delay = (delay as i64 + jitter).max(1) as u64;
tracing::warn!(
"Database locked query start, retrying {}/{} in {}ms: {}",
retries,
MAX_RETRIES,
jittered_delay,
self.sql
);
tokio::time::sleep(tokio::time::Duration::from_millis(jittered_delay))
.await;
delay = delay.saturating_mul(2).min(MAX_DELAY);
continue;
}
return Err(crate::error::Error::QueryFailed {
query: self.sql.clone(),
source: Box::new(e),
context: "Query on conn failed".into(),
});
}
}
}
}
pub async fn fetch_all_once_on_connection(self, conn: &turso::Connection) -> Result<Vec<Row>> {
let mut rows = conn
.query(&self.sql, self.params.clone())
.await
.map_err(|e| crate::error::Error::QueryFailed {
query: self.sql.clone(),
source: Box::new(e),
context: "Query once failed (DML - no retry)".into(),
})?;
let mut result = Vec::new();
loop {
match rows.next().await {
Ok(Some(row)) => result.push(row),
Ok(None) => break,
Err(e) => {
return Err(crate::error::Error::QueryFailed {
query: self.sql.clone(),
source: Box::new(e),
context: "Fetch row once failed (DML - no retry)".into(),
})
}
}
}
Ok(result)
}
pub async fn fetch_one(self, db: &Database) -> Result<Row> {
let conn = connect_db(db).await?;
self.fetch_one_on_connection(&conn).await
}
pub async fn fetch_one_once(self, db: &Database) -> Result<Row> {
let conn = connect_db(db).await?;
self.fetch_one_once_on_connection(&conn).await
}
pub async fn fetch_one_on_connection(self, conn: &turso::Connection) -> Result<Row> {
let rows = self.fetch_all_on_connection(conn).await?;
if rows.is_empty() {
Err(crate::error::Error::NotFound {
entity: "Row".into(),
id: "None".into(),
})
} else {
Ok(rows.into_iter().next().unwrap())
}
}
pub async fn fetch_one_once_on_connection(self, conn: &turso::Connection) -> Result<Row> {
let rows = self.fetch_all_once_on_connection(conn).await?;
if rows.is_empty() {
Err(crate::error::Error::NotFound {
entity: "Row".into(),
id: "None".into(),
})
} else {
Ok(rows.into_iter().next().unwrap())
}
}
pub async fn fetch_optional(self, db: &Database) -> Result<Option<Row>> {
let conn = connect_db(db).await?;
self.fetch_optional_on_connection(&conn).await
}
pub async fn fetch_optional_once(self, db: &Database) -> Result<Option<Row>> {
let conn = connect_db(db).await?;
self.fetch_optional_once_on_connection(&conn).await
}
pub async fn fetch_optional_on_connection(
self,
conn: &turso::Connection,
) -> Result<Option<Row>> {
let rows = self.fetch_all_on_connection(conn).await?;
Ok(rows.into_iter().next())
}
pub async fn fetch_optional_once_on_connection(
self,
conn: &turso::Connection,
) -> Result<Option<Row>> {
let rows = self.fetch_all_once_on_connection(conn).await?;
Ok(rows.into_iter().next())
}
}
pub fn query(sql: &str) -> TursoQueryBuilder {
TursoQueryBuilder::new(sql)
}
/// Query builder whose fetch methods decode column 0 of the result row via
/// `FromTursoRow`.
pub struct GenericScalarBuilder {
/// Underlying statement builder that holds the SQL and bound parameters.
builder: TursoQueryBuilder,
}
impl GenericScalarBuilder {
    /// Appends `value` as the next positional parameter of the wrapped builder.
    pub fn bind<V: Into<turso::Value>>(self, value: V) -> Self {
        Self {
            builder: self.builder.bind(value),
        }
    }

    /// Fetches the first row on a fresh connection (with retry) and decodes column 0.
    pub async fn fetch_one<T>(self, db: &Database) -> Result<T>
    where
        T: FromTursoRow,
    {
        let first = self.builder.fetch_one(db).await?;
        T::from_row(&first, 0)
    }

    /// Fetches the first row on a fresh connection (no retry) and decodes column 0.
    pub async fn fetch_one_once<T>(self, db: &Database) -> Result<T>
    where
        T: FromTursoRow,
    {
        let first = self.builder.fetch_one_once(db).await?;
        T::from_row(&first, 0)
    }

    /// Like `fetch_one`, but yields `Ok(None)` when the query returns no rows.
    pub async fn fetch_optional<T>(self, db: &Database) -> Result<Option<T>>
    where
        T: FromTursoRow,
    {
        let maybe_row = self.builder.fetch_optional(db).await?;
        maybe_row.map(|r| T::from_row(&r, 0)).transpose()
    }

    /// Like `fetch_one_once`, but yields `Ok(None)` when the query returns no rows.
    pub async fn fetch_optional_once<T>(self, db: &Database) -> Result<Option<T>>
    where
        T: FromTursoRow,
    {
        let maybe_row = self.builder.fetch_optional_once(db).await?;
        maybe_row.map(|r| T::from_row(&r, 0)).transpose()
    }

    /// Runs the query on the given connection (with retry) and decodes column 0
    /// of the first row, or yields `Ok(None)` when there is no row.
    pub async fn fetch_optional_on_connection<T>(
        self,
        conn: &turso::Connection,
    ) -> Result<Option<T>>
    where
        T: FromTursoRow,
    {
        let maybe_row = self.builder.fetch_optional_on_connection(conn).await?;
        maybe_row.map(|r| T::from_row(&r, 0)).transpose()
    }
}
/// Starts a `GenericScalarBuilder` for `sql` with no bound parameters.
pub fn query_scalar(sql: &str) -> GenericScalarBuilder {
    let builder = TursoQueryBuilder::new(sql);
    GenericScalarBuilder { builder }
}
#[async_trait]
impl DbTables for TursoTables {
async fn execute_raw(&self, sql: &str) -> Result<()> {
query(sql).execute_once(&self.db).await?;
Ok(())
}
async fn execute_raw_with_i64(&self, sql: &str, param: i64) -> Result<()> {
query(sql).bind(param).execute_once(&self.db).await?;
Ok(())
}
async fn execute_raw_with_two_i64(&self, sql: &str, param1: i64, param2: i64) -> Result<()> {
query(sql)
.bind(param1)
.bind(param2)
.execute_once(&self.db)
.await?;
Ok(())
}
async fn query_int(&self, sql: &str) -> Result<i64> {
query_scalar(sql).fetch_one(&self.db).await
}
async fn query_string(&self, sql: &str) -> Result<String> {
query_scalar(sql).fetch_one(&self.db).await
}
async fn query_bool(&self, sql: &str) -> Result<bool> {
query_scalar(sql).fetch_one(&self.db).await
}
fn config(&self) -> &Config {
&self.config
}
fn concurrency_model(&self) -> ConcurrencyModel {
ConcurrencyModel::SingleProcess
}
fn queues(&self) -> &dyn QueueTable {
self.queues.as_ref()
}
fn messages(&self) -> &dyn MessageTable {
self.messages.as_ref()
}
fn workers(&self) -> &dyn WorkerTable {
self.workers.as_ref()
}
fn db_state(&self) -> &dyn DbStateTable {
self.db_state.as_ref()
}
fn workflows(&self) -> &dyn WorkflowTable {
self.workflows.as_ref()
}
fn workflow_runs(&self) -> &dyn RunRecordTable {
self.workflow_runs.as_ref()
}
fn workflow_steps(&self) -> &dyn StepRecordTable {
self.workflow_steps.as_ref()
}
async fn bootstrap(&self) -> Result<()> {
let conn = connect_db(&self.db).await?;
let scripts = [
(
"00_create_schema_version",
include_str!("../../../migrations/turso/00_create_schema_version.sql"),
),
(
"01_create_queues",
include_str!("../../../migrations/turso/01_create_queues.sql"),
),
(
"02_create_workers",
include_str!("../../../migrations/turso/02_create_workers.sql"),
),
(
"03_create_messages",
include_str!("../../../migrations/turso/03_create_messages.sql"),
),
(
"05_create_workflows",
include_str!("../../../migrations/turso/05_create_workflows.sql"),
),
];
for (name, script) in scripts {
if name != "00_create_schema_version" {
let applied: Option<i64> = crate::store::turso::query_scalar(
"SELECT 1 FROM pgqrs_schema_version WHERE version = ?",
)
.bind(name.to_string())
.fetch_optional_on_connection(&conn)
.await?;
if applied.is_some() {
continue;
}
}
for statement in script.split(';') {
let s = statement.trim();
if !s.is_empty() {
conn.execute(s, ())
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Bootstrap failed on {}: {}", name, e),
})?;
}
}
if name != "00_create_schema_version" {
let sql = "INSERT OR IGNORE INTO pgqrs_schema_version (version, applied_at, description) VALUES (?, datetime('now'), ?)";
conn.execute(sql, (name, format!("Applied {}", name)))
.await
.map_err(|e| crate::error::Error::Internal {
message: format!("Failed to record migration {}: {}", name, e),
})?;
}
}
Ok(())
}
}
#[async_trait]
impl Store for TursoStore {
    /// Executes `sql` through the lock's `with_write` path.
    async fn execute_raw(&self, sql: &str) -> Result<()> {
        let owned_sql = sql.to_string();
        self.db
            .with_write(|db| Box::pin(async move { db.execute_raw(&owned_sql).await }))
            .await
    }

    /// Executes `sql` with one integer parameter through `with_write`.
    async fn execute_raw_with_i64(&self, sql: &str, param: i64) -> Result<()> {
        let owned_sql = sql.to_string();
        self.db
            .with_write(|db| {
                Box::pin(async move { db.execute_raw_with_i64(&owned_sql, param).await })
            })
            .await
    }

    /// Executes `sql` with two integer parameters through `with_write`.
    async fn execute_raw_with_two_i64(&self, sql: &str, param1: i64, param2: i64) -> Result<()> {
        let owned_sql = sql.to_string();
        self.db
            .with_write(|db| {
                Box::pin(
                    async move { db.execute_raw_with_two_i64(&owned_sql, param1, param2).await },
                )
            })
            .await
    }

    /// Runs an integer-valued scalar query through the lock's `with_read` path.
    async fn query_int(&self, sql: &str) -> Result<i64> {
        let owned_sql = sql.to_string();
        self.db
            .with_read(|db| Box::pin(async move { db.query_int(&owned_sql).await }))
            .await
    }

    /// Runs a string-valued scalar query through `with_read`.
    async fn query_string(&self, sql: &str) -> Result<String> {
        let owned_sql = sql.to_string();
        self.db
            .with_read(|db| Box::pin(async move { db.query_string(&owned_sql).await }))
            .await
    }

    /// Runs a boolean-valued scalar query through `with_read`.
    async fn query_bool(&self, sql: &str) -> Result<bool> {
        let owned_sql = sql.to_string();
        self.db
            .with_read(|db| Box::pin(async move { db.query_bool(&owned_sql).await }))
            .await
    }

    /// Configuration as reported by the locked table set.
    fn config(&self) -> &Config {
        self.db.config()
    }

    /// Queue table view, served by the `Tables` facade.
    fn queues(&self) -> &dyn QueueTable {
        &self.tables
    }

    /// Message table view, served by the `Tables` facade.
    fn messages(&self) -> &dyn MessageTable {
        &self.tables
    }

    /// Worker table view, served by the `Tables` facade.
    fn workers(&self) -> &dyn WorkerTable {
        &self.tables
    }

    /// Database-state table view, served by the `Tables` facade.
    fn db_state(&self) -> &dyn DbStateTable {
        &self.tables
    }

    /// Workflow table view, served by the `Tables` facade.
    fn workflows(&self) -> &dyn WorkflowTable {
        &self.tables
    }

    /// Run-record table view, served by the `Tables` facade.
    fn workflow_runs(&self) -> &dyn RunRecordTable {
        &self.tables
    }

    /// Step-record table view, served by the `Tables` facade.
    fn workflow_steps(&self) -> &dyn StepRecordTable {
        &self.tables
    }

    /// Runs the table set's `bootstrap` through `with_write`.
    async fn bootstrap(&self) -> Result<()> {
        self.db
            .with_write(|db| Box::pin(async move { db.bootstrap().await }))
            .await
    }

    /// Creates a named admin worker on this store; `config` is not used.
    async fn admin(&self, name: &str, config: &Config) -> Result<crate::workers::Admin> {
        let _ = config;
        let store = self.any_store();
        crate::workers::Admin::new(store, name).await
    }

    /// Creates an ephemeral admin worker on this store; `config` is not used.
    async fn admin_ephemeral(&self, config: &Config) -> Result<crate::workers::Admin> {
        let _ = config;
        let store = self.any_store();
        crate::workers::Admin::new_ephemeral(store).await
    }

    /// Looks up `queue_name`, registers a worker called `name` for it, and
    /// returns a producer that uses `config.validation_config`.
    async fn producer(
        &self,
        queue_name: &str,
        name: &str,
        config: &Config,
    ) -> Result<crate::workers::Producer> {
        let queue = QueueTable::get_by_name(&self.tables, queue_name).await?;
        let registration = WorkerTable::register(&self.tables, Some(queue.id), name).await?;
        let validation = config.validation_config.clone();
        Ok(crate::workers::Producer::new(
            self.any_store(),
            queue,
            registration,
            validation,
        ))
    }

    /// Looks up `queue_name`, registers a worker called `name` for it, and
    /// returns a consumer; `config` is not used.
    async fn consumer(
        &self,
        queue_name: &str,
        name: &str,
        config: &Config,
    ) -> Result<crate::workers::Consumer> {
        let _ = config;
        let queue = QueueTable::get_by_name(&self.tables, queue_name).await?;
        let registration = WorkerTable::register(&self.tables, Some(queue.id), name).await?;
        Ok(crate::workers::Consumer::new(
            self.any_store(),
            queue,
            registration,
        ))
    }

    /// Creates the queue `name`.
    ///
    /// NOTE(review): the existence check and the insert are separate table
    /// calls; whether they are atomic with respect to other callers depends on
    /// the `Tables`/lock implementation — confirm.
    ///
    /// # Errors
    /// `Error::QueueAlreadyExists` when the existence check finds the queue.
    async fn queue(&self, name: &str) -> Result<crate::types::QueueRecord> {
        if QueueTable::exists(&self.tables, name).await? {
            return Err(crate::error::Error::QueueAlreadyExists {
                name: name.to_string(),
            });
        }
        let new_queue = NewQueueRecord {
            queue_name: name.to_string(),
        };
        QueueTable::insert(&self.tables, new_queue).await
    }

    /// Like `producer`, but registers an ephemeral worker.
    async fn producer_ephemeral(
        &self,
        queue_name: &str,
        config: &Config,
    ) -> Result<crate::workers::Producer> {
        let queue = QueueTable::get_by_name(&self.tables, queue_name).await?;
        let registration =
            WorkerTable::register_ephemeral(&self.tables, Some(queue.id)).await?;
        let validation = config.validation_config.clone();
        Ok(crate::workers::Producer::new(
            self.any_store(),
            queue,
            registration,
            validation,
        ))
    }

    /// Like `consumer`, but registers an ephemeral worker; `config` is not used.
    async fn consumer_ephemeral(
        &self,
        queue_name: &str,
        config: &Config,
    ) -> Result<crate::workers::Consumer> {
        let _ = config;
        let queue = QueueTable::get_by_name(&self.tables, queue_name).await?;
        let registration =
            WorkerTable::register_ephemeral(&self.tables, Some(queue.id)).await?;
        Ok(crate::workers::Consumer::new(
            self.any_store(),
            queue,
            registration,
        ))
    }

    /// Creates the workflow `name`, creating a queue of the same name first
    /// when none exists.
    ///
    /// # Errors
    /// `Error::WorkflowAlreadyExists` when the workflow insert fails with a
    /// message containing "constraint failed"; other errors are propagated.
    async fn workflow(&self, name: &str) -> Result<crate::types::WorkflowRecord> {
        let already_has_queue = QueueTable::exists(&self.tables, name).await?;
        if !already_has_queue {
            let new_queue = NewQueueRecord {
                queue_name: name.to_string(),
            };
            QueueTable::insert(&self.tables, new_queue).await?;
        }

        let queue = QueueTable::get_by_name(&self.tables, name).await?;
        let new_workflow = crate::types::NewWorkflowRecord {
            name: name.to_string(),
            queue_id: queue.id,
        };

        let inserted = WorkflowTable::insert(&self.tables, new_workflow)
            .await
            .map_err(|err| {
                // The insert error is classified by its message text.
                let text = err.to_string();
                let is_constraint = text.contains("UNIQUE constraint failed")
                    || text.contains("constraint failed");
                if is_constraint {
                    crate::error::Error::WorkflowAlreadyExists {
                        name: name.to_string(),
                    }
                } else {
                    err
                }
            })?;
        Ok(inserted)
    }

    /// Returns the run for `message`: the existing run record for its id when
    /// there is one, otherwise a new record.
    ///
    /// For a new record the workflow is looked up by the name of the message's
    /// queue, and the message payload becomes the run input.
    async fn run(&self, message: crate::types::QueueMessage) -> Result<crate::workers::Run> {
        let lookup = RunRecordTable::get_by_message_id(&self.tables, message.id).await;
        match lookup {
            Ok(existing) => return Ok(crate::workers::Run::new(self.any_store(), existing)),
            // No run yet for this message: fall through and create one.
            Err(crate::error::Error::NotFound { .. }) => {}
            Err(other) => return Err(other),
        }

        let queue = QueueTable::get(&self.tables, message.queue_id).await?;
        let workflow = WorkflowTable::get_by_name(&self.tables, &queue.queue_name).await?;
        let new_run = crate::types::NewRunRecord {
            workflow_id: workflow.id,
            message_id: message.id,
            input: Some(message.payload.clone()),
        };
        let created = RunRecordTable::insert(&self.tables, new_run).await?;
        Ok(crate::workers::Run::new(self.any_store(), created))
    }

    /// Loads the worker record `id` and wraps it in a `WorkerHandle`.
    async fn worker(&self, id: i64) -> Result<Box<dyn Worker>> {
        let record = WorkerTable::get(&self.tables, id).await?;
        let handle: Box<dyn Worker> =
            Box::new(crate::workers::WorkerHandle::new(self.any_store(), record));
        Ok(handle)
    }

    /// Concurrency model as reported by the locked table set.
    fn concurrency_model(&self) -> ConcurrencyModel {
        self.db.concurrency_model()
    }

    /// Static backend identifier: `"turso"`.
    fn backend_name(&self) -> &'static str {
        "turso"
    }
}