obeli_sk_db_sqlite/
sqlite_dao.rs

1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6    SupportedFunctionReturnValue,
7    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8    storage::{
9        AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10        AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11        DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12        DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool, DbPoolCloseable,
13        ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
14        ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15        JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy,
16        LockedExecution, Pagination, PendingState, PendingStateFinished,
17        PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
18        VersionType,
19    },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
25    TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28    cmp::max,
29    collections::VecDeque,
30    fmt::Debug,
31    ops::DerefMut,
32    path::Path,
33    str::FromStr,
34    sync::{
35        Arc, Mutex,
36        atomic::{AtomicBool, Ordering},
37    },
38    time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42    sync::{mpsc, oneshot},
43    time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
47#[derive(Debug, thiserror::Error)]
48#[error("initialization error")]
49pub struct InitializationError;
50
51#[derive(Debug, Clone)]
52struct DelayReq {
53    join_set_id: JoinSetId,
54    delay_id: DelayId,
55    expires_at: DateTime<Utc>,
56}
57/*
58mmap_size = 128MB - Set the global memory map so all processes can share some data
59https://www.sqlite.org/pragma.html#pragma_mmap_size
60https://www.sqlite.org/mmap.html
61
62journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
63https://www.sqlite.org/pragma.html#pragma_journal_size_limit
64
65Inspired by https://github.com/rails/rails/pull/49349
66*/
67
68const PRAGMA: [[&str; 2]; 10] = [
69    ["journal_mode", "wal"],
70    ["synchronous", "FULL"],
71    ["foreign_keys", "true"],
72    ["busy_timeout", "1000"],
73    ["cache_size", "10000"], // number of pages
74    ["temp_store", "MEMORY"],
75    ["page_size", "8192"], // 8 KB
76    ["mmap_size", "134217728"],
77    ["journal_size_limit", "67108864"],
78    ["integrity_check", ""],
79];
80
81// Append only
82const CREATE_TABLE_T_METADATA: &str = r"
83CREATE TABLE IF NOT EXISTS t_metadata (
84    id INTEGER PRIMARY KEY AUTOINCREMENT,
85    schema_version INTEGER NOT NULL,
86    created_at TEXT NOT NULL
87) STRICT
88";
89const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
91/// Stores execution history. Append only.
92const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
93CREATE TABLE IF NOT EXISTS t_execution_log (
94    execution_id TEXT NOT NULL,
95    created_at TEXT NOT NULL,
96    json_value TEXT NOT NULL,
97    version INTEGER NOT NULL,
98    variant TEXT NOT NULL,
99    join_set_id TEXT,
100    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
101    PRIMARY KEY (execution_id, version)
102) STRICT
103";
104// Used in `fetch_created` and `get_execution_event`
105const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
106CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
107";
108// Used in `lock_inner` to filter by execution ID and variant (created or event history)
109const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
110CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
111";
112
113/// Stores child execution return values for the parent (`execution_id`). Append only.
114/// For `JoinSetResponse::DelayFinished`, column `delay_id` must not be null.
115/// For `JoinSetResponse::ChildExecutionFinished`, column `child_execution_id`,`finished_version`
116/// must not be null.
117const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
118CREATE TABLE IF NOT EXISTS t_join_set_response (
119    id INTEGER PRIMARY KEY AUTOINCREMENT,
120    created_at TEXT NOT NULL,
121    execution_id TEXT NOT NULL,
122    join_set_id TEXT NOT NULL,
123
124    delay_id TEXT,
125
126    child_execution_id TEXT,
127    finished_version INTEGER,
128
129    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
130) STRICT
131";
132// Used when querying for the next response
133const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
134CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
135";
136
137/// Stores executions in `PendingState`
138/// `state` to column mapping:
139/// `PendingAt`:            (nothing but required columns), optionally contains all `Locked` columns.
140/// `Locked`:               `max_retries`, `retry_exp_backoff_millis`, `last_lock_version`, `executor_id`, `run_id`
141/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
142/// `Finished` :            `result_kind`.
143const CREATE_TABLE_T_STATE: &str = r"
144CREATE TABLE IF NOT EXISTS t_state (
145    execution_id TEXT NOT NULL,
146    is_top_level INTEGER NOT NULL,
147    corresponding_version INTEGER NOT NULL,
148    pending_expires_finished TEXT NOT NULL,
149    ffqn TEXT NOT NULL,
150    state TEXT NOT NULL,
151    created_at TEXT NOT NULL,
152    updated_at TEXT NOT NULL,
153    scheduled_at TEXT NOT NULL,
154    intermittent_event_count INTEGER NOT NULL,
155
156    max_retries INTEGER,
157    retry_exp_backoff_millis INTEGER,
158    last_lock_version INTEGER,
159    executor_id TEXT,
160    run_id TEXT,
161
162    join_set_id TEXT,
163    join_set_closing INTEGER,
164
165    result_kind TEXT,
166
167    PRIMARY KEY (execution_id)
168) STRICT
169";
170const STATE_PENDING_AT: &str = "PendingAt";
171const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
172const STATE_LOCKED: &str = "Locked";
173const STATE_FINISHED: &str = "Finished";
174const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
175
176// TODO: partial indexes
177const IDX_T_STATE_LOCK_PENDING: &str = r"
178CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
179";
180const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
181CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
182";
183const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
184CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
185";
186// For `list_executions` by ffqn
187const IDX_T_STATE_FFQN: &str = r"
188CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
189";
190// For `list_executions` by creation date
191const IDX_T_STATE_CREATED_AT: &str = r"
192CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
193";
194
195/// Represents [`ExpiredTimer::AsyncDelay`] . Rows are deleted when the delay is processed.
196const CREATE_TABLE_T_DELAY: &str = r"
197CREATE TABLE IF NOT EXISTS t_delay (
198    execution_id TEXT NOT NULL,
199    join_set_id TEXT NOT NULL,
200    delay_id TEXT NOT NULL,
201    expires_at TEXT NOT NULL,
202    PRIMARY KEY (execution_id, join_set_id, delay_id)
203) STRICT
204";
205
206// Append only.
207const CREATE_TABLE_T_BACKTRACE: &str = r"
208CREATE TABLE IF NOT EXISTS t_backtrace (
209    execution_id TEXT NOT NULL,
210    component_id TEXT NOT NULL,
211    version_min_including INTEGER NOT NULL,
212    version_max_excluding INTEGER NOT NULL,
213    wasm_backtrace TEXT NOT NULL,
214    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
215) STRICT
216";
217// Index for searching backtraces by execution_id and version
218const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
219CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
220";
221
222#[derive(Debug, thiserror::Error, Clone)]
223enum RusqliteError {
224    #[error("not found")]
225    NotFound,
226    #[error("generic: {0}")]
227    Generic(StrVariant),
228}
229
230mod conversions {
231
232    use super::RusqliteError;
233    use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
234    use rusqlite::types::{FromSql, FromSqlError};
235    use std::{fmt::Debug, str::FromStr};
236    use tracing::error;
237
238    impl From<rusqlite::Error> for RusqliteError {
239        fn from(err: rusqlite::Error) -> Self {
240            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
241                RusqliteError::NotFound
242            } else {
243                error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
244                RusqliteError::Generic(err.to_string().into())
245            }
246        }
247    }
248
249    impl From<RusqliteError> for DbErrorGeneric {
250        fn from(err: RusqliteError) -> DbErrorGeneric {
251            match err {
252                RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
253                RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
254            }
255        }
256    }
257    impl From<RusqliteError> for DbErrorRead {
258        fn from(err: RusqliteError) -> Self {
259            if matches!(err, RusqliteError::NotFound) {
260                Self::NotFound
261            } else {
262                Self::from(DbErrorGeneric::from(err))
263            }
264        }
265    }
266    impl From<RusqliteError> for DbErrorReadWithTimeout {
267        fn from(err: RusqliteError) -> Self {
268            Self::from(DbErrorRead::from(err))
269        }
270    }
271    impl From<RusqliteError> for DbErrorWrite {
272        fn from(err: RusqliteError) -> Self {
273            if matches!(err, RusqliteError::NotFound) {
274                Self::NotFound
275            } else {
276                Self::from(DbErrorGeneric::from(err))
277            }
278        }
279    }
280
281    pub(crate) struct JsonWrapper<T>(pub(crate) T);
282    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
283        fn column_result(
284            value: rusqlite::types::ValueRef<'_>,
285        ) -> rusqlite::types::FromSqlResult<Self> {
286            let value = match value {
287                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
288                    Ok(value)
289                }
290                other => {
291                    error!(
292                        backtrace = %std::backtrace::Backtrace::capture(),
293                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
294                    );
295                    Err(FromSqlError::InvalidType)
296                }
297            }?;
298            let value = serde_json::from_slice::<T>(value).map_err(|err| {
299                error!(
300                    backtrace = %std::backtrace::Backtrace::capture(),
301                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
302                    r#type = std::any::type_name::<T>()
303                );
304                FromSqlError::InvalidType
305            })?;
306            Ok(Self(value))
307        }
308    }
309
310    pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
311    impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
312        fn column_result(
313            value: rusqlite::types::ValueRef<'_>,
314        ) -> rusqlite::types::FromSqlResult<Self> {
315            let value = String::column_result(value)?;
316            let value = T::from_str(&value).map_err(|err| {
317                error!(
318                    backtrace = %std::backtrace::Backtrace::capture(),
319                    "Cannot convert string `{value}` to type:`{type}` - {err:?}",
320                    r#type = std::any::type_name::<T>()
321                );
322                FromSqlError::InvalidType
323            })?;
324            Ok(Self(value))
325        }
326    }
327
328    // Used as a wrapper for `FromSqlError::Other`
329    #[derive(Debug, thiserror::Error)]
330    #[error("{0}")]
331    pub(crate) struct OtherError(&'static str);
332    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
333        FromSqlError::other(OtherError(input)).into()
334    }
335
336    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
337        DbErrorGeneric::Uncategorized(input.into())
338    }
339}
340
341#[derive(Debug)]
342struct CombinedStateDTO {
343    state: String,
344    ffqn: String,
345    pending_expires_finished: DateTime<Utc>,
346    // Locked:
347    last_lock_version: Option<Version>,
348    executor_id: Option<ExecutorId>,
349    run_id: Option<RunId>,
350    // Blocked by join set:
351    join_set_id: Option<JoinSetId>,
352    join_set_closing: Option<bool>,
353    // Finished:
354    result_kind: Option<PendingStateFinishedResultKind>,
355}
356#[derive(Debug)]
357struct CombinedState {
358    ffqn: FunctionFqn,
359    pending_state: PendingState,
360    corresponding_version: Version,
361}
362impl CombinedState {
363    fn get_next_version_assert_not_finished(&self) -> Version {
364        assert!(!self.pending_state.is_finished());
365        self.corresponding_version.increment()
366    }
367
368    #[cfg(feature = "test")]
369    fn get_next_version_or_finished(&self) -> Version {
370        if self.pending_state.is_finished() {
371            self.corresponding_version.clone()
372        } else {
373            self.corresponding_version.increment()
374        }
375    }
376}
377
378#[derive(Debug)]
379struct NotifierPendingAt {
380    scheduled_at: DateTime<Utc>,
381    ffqn: FunctionFqn,
382}
383
384#[derive(Debug)]
385struct NotifierExecutionFinished {
386    execution_id: ExecutionId,
387    retval: SupportedFunctionReturnValue,
388}
389
390#[derive(Debug, Default)]
391struct AppendNotifier {
392    pending_at: Option<NotifierPendingAt>,
393    execution_finished: Option<NotifierExecutionFinished>,
394    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
395}
396
397impl CombinedState {
398    fn new(
399        dto: &CombinedStateDTO,
400        corresponding_version: Version,
401    ) -> Result<Self, rusqlite::Error> {
402        let pending_state = match dto {
403            // Pending - just created
404            CombinedStateDTO {
405                state,
406                ffqn: _,
407                pending_expires_finished: scheduled_at,
408                last_lock_version: None,
409                executor_id: None,
410                run_id: None,
411                join_set_id: None,
412                join_set_closing: None,
413                result_kind: None,
414            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
415                scheduled_at: *scheduled_at,
416                last_lock: None,
417            }),
418            // Pending, previously locked
419            CombinedStateDTO {
420                state,
421                ffqn: _,
422                pending_expires_finished: scheduled_at,
423                last_lock_version: None,
424                executor_id: Some(executor_id),
425                run_id: Some(run_id),
426                join_set_id: None,
427                join_set_closing: None,
428                result_kind: None,
429            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
430                scheduled_at: *scheduled_at,
431                last_lock: Some(LockedBy {
432                    executor_id: *executor_id,
433                    run_id: *run_id,
434                }),
435            }),
436            CombinedStateDTO {
437                state,
438                ffqn: _,
439                pending_expires_finished: lock_expires_at,
440                last_lock_version: Some(_),
441                executor_id: Some(executor_id),
442                run_id: Some(run_id),
443                join_set_id: None,
444                join_set_closing: None,
445                result_kind: None,
446            } if state == STATE_LOCKED => Ok(PendingState::Locked(PendingStateLocked {
447                locked_by: LockedBy {
448                    executor_id: *executor_id,
449                    run_id: *run_id,
450                },
451                lock_expires_at: *lock_expires_at,
452            })),
453            CombinedStateDTO {
454                state,
455                ffqn: _,
456                pending_expires_finished: lock_expires_at,
457                last_lock_version: None,
458                executor_id: _,
459                run_id: _,
460                join_set_id: Some(join_set_id),
461                join_set_closing: Some(join_set_closing),
462                result_kind: None,
463            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
464                join_set_id: join_set_id.clone(),
465                closing: *join_set_closing,
466                lock_expires_at: *lock_expires_at,
467            }),
468            CombinedStateDTO {
469                state,
470                ffqn: _,
471                pending_expires_finished: finished_at,
472                last_lock_version: None,
473                executor_id: None,
474                run_id: None,
475                join_set_id: None,
476                join_set_closing: None,
477                result_kind: Some(result_kind),
478            } if state == STATE_FINISHED => Ok(PendingState::Finished {
479                finished: PendingStateFinished {
480                    finished_at: *finished_at,
481                    version: corresponding_version.0,
482                    result_kind: *result_kind,
483                },
484            }),
485            _ => {
486                error!("Cannot deserialize pending state from  {dto:?}");
487                Err(consistency_rusqlite("invalid `t_state`"))
488            }
489        }?;
490        Ok(Self {
491            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
492                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
493                consistency_rusqlite("invalid ffqn value in `t_state`")
494            })?,
495            pending_state,
496            corresponding_version,
497        })
498    }
499}
500
501#[derive(derive_more::Debug)]
502struct LogicalTx {
503    #[debug(skip)]
504    func: Box<dyn FnMut(&mut Transaction) + Send>,
505    sent_at: Instant,
506    func_name: &'static str,
507    #[debug(skip)]
508    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
509}
510
511#[derive(derive_more::Debug)]
512enum ThreadCommand {
513    LogicalTx(LogicalTx),
514    Shutdown,
515}
516
517#[derive(Clone)]
518pub struct SqlitePool(SqlitePoolInner);
519
520type ResponseSubscribers =
521    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
522type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
523type ExecutionFinishedSubscribers =
524    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
525#[derive(Clone)]
526struct SqlitePoolInner {
527    shutdown_requested: Arc<AtomicBool>,
528    shutdown_finished: Arc<AtomicBool>,
529    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
530    response_subscribers: ResponseSubscribers,
531    pending_ffqn_subscribers: PendingFfqnSubscribers,
532    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
533    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
534}
535
536#[async_trait]
537impl DbPoolCloseable for SqlitePool {
538    async fn close(self) {
539        debug!("Sqlite is closing");
540        self.0.shutdown_requested.store(true, Ordering::Release);
541        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
542        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
543        while !self.0.shutdown_finished.load(Ordering::Acquire) {
544            tokio::time::sleep(Duration::from_millis(1)).await;
545        }
546        debug!("Sqlite was closed");
547    }
548}
549
550#[async_trait]
551impl DbPool for SqlitePool {
552    fn connection(&self) -> Box<dyn DbConnection> {
553        Box::new(self.clone())
554    }
555}
556impl Drop for SqlitePool {
557    fn drop(&mut self) {
558        let arc = self.0.join_handle.take().expect("join_handle was set");
559        if let Ok(join_handle) = Arc::try_unwrap(arc) {
560            // Last holder
561            if !join_handle.is_finished() {
562                if !self.0.shutdown_finished.load(Ordering::Acquire) {
563                    // Best effort to shut down the sqlite thread.
564                    let backtrace = std::backtrace::Backtrace::capture();
565                    warn!("SqlitePool was not closed properly - {backtrace}");
566                    self.0.shutdown_requested.store(true, Ordering::Release);
567                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
568                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
569                    // Not joining the thread, drop might be called from async context.
570                    // We are shutting down the server anyway.
571                } else {
572                    // The thread set `shutdown_finished` as its last operation.
573                }
574            }
575        }
576    }
577}
578
579#[derive(Debug, Clone)]
580pub struct SqliteConfig {
581    pub queue_capacity: usize,
582    pub pragma_override: Option<HashMap<String, String>>,
583    pub metrics_threshold: Option<Duration>,
584}
585impl Default for SqliteConfig {
586    fn default() -> Self {
587        Self {
588            queue_capacity: 100,
589            pragma_override: None,
590            metrics_threshold: None,
591        }
592    }
593}
594
595impl SqlitePool {
596    fn init_thread(
597        path: &Path,
598        mut pragma_override: HashMap<String, String>,
599    ) -> Result<Connection, InitializationError> {
600        fn conn_execute<P: Params>(
601            conn: &Connection,
602            sql: &str,
603            params: P,
604        ) -> Result<(), InitializationError> {
605            conn.execute(sql, params).map(|_| ()).map_err(|err| {
606                error!("Cannot run `{sql}` - {err:?}");
607                InitializationError
608            })
609        }
610        fn pragma_update(
611            conn: &Connection,
612            name: &str,
613            value: &str,
614        ) -> Result<(), InitializationError> {
615            if value.is_empty() {
616                debug!("Querying PRAGMA {name}");
617                conn.pragma_query(None, name, |row| {
618                    debug!("{row:?}");
619                    Ok(())
620                })
621                .map_err(|err| {
622                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
623                    InitializationError
624                })
625            } else {
626                debug!("Setting PRAGMA {name}={value}");
627                conn.pragma_update(None, name, value).map_err(|err| {
628                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
629                    InitializationError
630                })
631            }
632        }
633
634        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
635            error!("cannot open the connection - {err:?}");
636            InitializationError
637        })?;
638
639        for [pragma_name, default_value] in PRAGMA {
640            let pragma_value = pragma_override
641                .remove(pragma_name)
642                .unwrap_or_else(|| default_value.to_string());
643            pragma_update(&conn, pragma_name, &pragma_value)?;
644        }
645        // drain the rest overrides
646        for (pragma_name, pragma_value) in pragma_override.drain() {
647            pragma_update(&conn, &pragma_name, &pragma_value)?;
648        }
649
650        // t_metadata
651        conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
652        // Insert row if not exists.
653        conn_execute(
654            &conn,
655            &format!(
656                "INSERT INTO t_metadata (schema_version, created_at) VALUES
657                    ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
658            ),
659            [Utc::now()],
660        )?;
661        // Fail on unexpected `schema_version`.
662        let actual_version = conn
663            .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
664            .map_err(|err| {
665                error!("cannot select schema version - {err:?}");
666                InitializationError
667            })?
668            .query_row([], |row| row.get::<_, u32>("schema_version"));
669
670        let actual_version = actual_version.map_err(|err| {
671            error!("Cannot read the schema version - {err:?}");
672            InitializationError
673        })?;
674        if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
675            error!(
676                "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
677            );
678            return Err(InitializationError);
679        }
680
681        // t_execution_log
682        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
683        conn_execute(
684            &conn,
685            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
686            [],
687        )?;
688        conn_execute(
689            &conn,
690            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
691            [],
692        )?;
693        // t_join_set_response
694        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
695        conn_execute(
696            &conn,
697            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
698            [],
699        )?;
700        // t_state
701        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
702        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
703        conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
704        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
705        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
706        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
707        // t_delay
708        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
709        // t_backtrace
710        conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
711        conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
712        Ok(conn)
713    }
714
715    fn connection_rpc(
716        mut conn: Connection,
717        shutdown_requested: &AtomicBool,
718        shutdown_finished: &AtomicBool,
719        mut command_rx: mpsc::Receiver<ThreadCommand>,
720        metrics_threshold: Option<Duration>,
721    ) {
722        let mut histograms = Histograms::new(metrics_threshold);
723        while Self::tick(
724            &mut conn,
725            shutdown_requested,
726            &mut command_rx,
727            &mut histograms,
728        )
729        .is_ok()
730        {
731            // Loop until shutdown is set to true.
732        }
733        debug!("Closing command thread");
734        shutdown_finished.store(true, Ordering::Release);
735    }
736
737    fn tick(
738        conn: &mut Connection,
739        shutdown_requested: &AtomicBool,
740        command_rx: &mut mpsc::Receiver<ThreadCommand>,
741        histograms: &mut Histograms,
742    ) -> Result<(), ()> {
743        let mut ltx_list = Vec::new();
744        // Wait for first logical tx.
745        let mut ltx = match command_rx.blocking_recv() {
746            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
747            Some(ThreadCommand::Shutdown) => {
748                debug!("shutdown message received");
749                return Err(());
750            }
751            None => {
752                debug!("command_rx was closed");
753                return Err(());
754            }
755        };
756        let all_fns_start = std::time::Instant::now();
757        let mut physical_tx = conn
758            .transaction_with_behavior(TransactionBehavior::Immediate)
759            .map_err(|begin_err| {
760                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
761            })?;
762        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
763        ltx_list.push(ltx);
764
765        while let Ok(more) = command_rx.try_recv() {
766            let mut ltx = match more {
767                ThreadCommand::Shutdown => {
768                    debug!("shutdown message received");
769                    return Err(());
770                }
771                ThreadCommand::LogicalTx(ltx) => ltx,
772            };
773
774            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
775            ltx_list.push(ltx);
776        }
777        histograms.record_all_fns(all_fns_start.elapsed());
778
779        {
780            // Commit
781            if shutdown_requested.load(Ordering::Relaxed) {
782                debug!("Recveived shutdown during processing of the batch");
783                return Err(());
784            }
785            let commit_result = {
786                let now = std::time::Instant::now();
787                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
788                histograms.record_commit(now.elapsed());
789                commit_result
790            };
791            if commit_result.is_ok() || ltx_list.len() == 1 {
792                // Happy path - just send the commit ACK.
793                for ltx in ltx_list {
794                    // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
795                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
796                }
797            } else {
798                warn!(
799                    "Bulk transaction failed, committing {} transactions serially",
800                    ltx_list.len()
801                );
802                // Bulk transaction failed. Replay each ltx in its own physical transaction.
803                for ltx in ltx_list {
804                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
805                }
806            }
807        }
808        histograms.print_if_elapsed();
809        Ok(())
810    }
811
812    fn ltx_commit_single(
813        mut ltx: LogicalTx,
814        conn: &mut Connection,
815        shutdown_requested: &AtomicBool,
816        histograms: &mut Histograms,
817    ) -> Result<(), ()> {
818        let mut physical_tx = conn
819            .transaction_with_behavior(TransactionBehavior::Immediate)
820            .map_err(|begin_err| {
821                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
822            })?;
823        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
824        if shutdown_requested.load(Ordering::Relaxed) {
825            debug!("Recveived shutdown during processing of the batch");
826            return Err(());
827        }
828        let commit_result = {
829            let now = std::time::Instant::now();
830            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
831            histograms.record_commit(now.elapsed());
832            commit_result
833        };
834        // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
835        let _ = ltx.commit_ack_sender.send(commit_result);
836        Ok(())
837    }
838
839    fn ltx_apply_to_tx(
840        ltx: &mut LogicalTx,
841        physical_tx: &mut Transaction,
842        histograms: &mut Histograms,
843    ) {
844        let sent_latency = ltx.sent_at.elapsed();
845        let started_at = Instant::now();
846        (ltx.func)(physical_tx);
847        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
848    }
849
850    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
851    pub async fn new<P: AsRef<Path>>(
852        path: P,
853        config: SqliteConfig,
854    ) -> Result<Self, InitializationError> {
855        let path = path.as_ref().to_owned();
856
857        let shutdown_requested = Arc::new(AtomicBool::new(false));
858        let shutdown_finished = Arc::new(AtomicBool::new(false));
859
860        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
861        info!("Sqlite database location: {path:?}");
862        let join_handle = {
863            // Initialize the `Connection`.
864            let init_task = {
865                tokio::task::spawn_blocking(move || {
866                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
867                })
868                .await
869            };
870            let conn = match init_task {
871                Ok(res) => res?,
872                Err(join_err) => {
873                    error!("Initialization panic - {join_err:?}");
874                    return Err(InitializationError);
875                }
876            };
877            let shutdown_requested = shutdown_requested.clone();
878            let shutdown_finished = shutdown_finished.clone();
879            // Start the RPC thread.
880            std::thread::spawn(move || {
881                Self::connection_rpc(
882                    conn,
883                    &shutdown_requested,
884                    &shutdown_finished,
885                    command_rx,
886                    config.metrics_threshold,
887                );
888            })
889        };
890        Ok(SqlitePool(SqlitePoolInner {
891            shutdown_requested,
892            shutdown_finished,
893            command_tx,
894            response_subscribers: Arc::default(),
895            pending_ffqn_subscribers: Arc::default(),
896            join_handle: Some(Arc::new(join_handle)),
897            execution_finished_subscribers: Arc::default(),
898        }))
899    }
900
901    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
902    async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
903    where
904        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
905        T: Send + 'static,
906        E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
907    {
908        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
909        let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
910        let thread_command_func = {
911            let fn_res = fn_res.clone();
912            ThreadCommand::LogicalTx(LogicalTx {
913                func: Box::new(move |tx| {
914                    let func_res = func(tx);
915                    *fn_res.lock().unwrap() = Some(func_res);
916                }),
917                sent_at: Instant::now(),
918                func_name,
919                commit_ack_sender,
920            })
921        };
922        self.0
923            .command_tx
924            .send(thread_command_func)
925            .await
926            .map_err(|_send_err| DbErrorGeneric::Close)?;
927
928        // Wait for commit ack, then get the retval from the mutex.
929        match commit_ack_receiver.await {
930            Ok(Ok(())) => {
931                let mut guard = fn_res.lock().unwrap();
932                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
933            }
934            Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
935            Err(_) => Err(E::from(DbErrorGeneric::Close)),
936        }
937    }
938
939    fn fetch_created_event(
940        conn: &Connection,
941        execution_id: &ExecutionId,
942    ) -> Result<CreateRequest, DbErrorRead> {
943        let mut stmt = conn.prepare(
944            "SELECT created_at, json_value FROM t_execution_log WHERE \
945            execution_id = :execution_id AND version = 0",
946        )?;
947        let (created_at, event) = stmt.query_row(
948            named_params! {
949                ":execution_id": execution_id.to_string(),
950            },
951            |row| {
952                let created_at = row.get("created_at")?;
953                let event = row
954                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
955                    .map_err(|serde| {
956                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
957                        consistency_rusqlite("cannot deserialize `Created` event")
958                    })?;
959                Ok((created_at, event.0))
960            },
961        )?;
962        if let ExecutionEventInner::Created {
963            ffqn,
964            params,
965            parent,
966            scheduled_at,
967            component_id,
968            metadata,
969            scheduled_by,
970        } = event
971        {
972            Ok(CreateRequest {
973                created_at,
974                execution_id: execution_id.clone(),
975                ffqn,
976                params,
977                parent,
978                scheduled_at,
979                component_id,
980                metadata,
981                scheduled_by,
982            })
983        } else {
984            error!("Row with version=0 must be a `Created` event - {event:?}");
985            Err(consistency_db_err("expected `Created` event").into())
986        }
987    }
988
989    fn check_expected_next_and_appending_version(
990        expected_version: &Version,
991        appending_version: &Version,
992    ) -> Result<(), DbErrorWrite> {
993        if *expected_version != *appending_version {
994            debug!(
995                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
996            );
997            return Err(DbErrorWrite::NonRetriable(
998                DbErrorWriteNonRetriable::VersionConflict {
999                    expected: expected_version.clone(),
1000                    requested: appending_version.clone(),
1001                },
1002            ));
1003        }
1004        Ok(())
1005    }
1006
1007    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1008    fn create_inner(
1009        tx: &Transaction,
1010        req: CreateRequest,
1011    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1012        debug!("create_inner");
1013
1014        let version = Version::default();
1015        let execution_id = req.execution_id.clone();
1016        let execution_id_str = execution_id.to_string();
1017        let ffqn = req.ffqn.clone();
1018        let created_at = req.created_at;
1019        let scheduled_at = req.scheduled_at;
1020        let event = ExecutionEventInner::from(req);
1021        let event_ser = serde_json::to_string(&event).map_err(|err| {
1022            error!("Cannot serialize {event:?} - {err:?}");
1023            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1024        })?;
1025        tx.prepare(
1026                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1027                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1028        ?
1029        .execute(named_params! {
1030            ":execution_id": &execution_id_str,
1031            ":created_at": created_at,
1032            ":version": version.0,
1033            ":json_value": event_ser,
1034            ":variant": event.variant(),
1035            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1036        })
1037        ?;
1038        let pending_at = {
1039            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1040            tx.prepare(
1041                r"
1042                INSERT INTO t_state (
1043                    execution_id,
1044                    is_top_level,
1045                    corresponding_version,
1046                    pending_expires_finished,
1047                    ffqn,
1048                    state,
1049                    created_at,
1050                    updated_at,
1051                    scheduled_at,
1052                    intermittent_event_count
1053                    )
1054                VALUES (
1055                    :execution_id,
1056                    :is_top_level,
1057                    :corresponding_version,
1058                    :pending_expires_finished,
1059                    :ffqn,
1060                    :state,
1061                    :created_at,
1062                    CURRENT_TIMESTAMP,
1063                    :scheduled_at,
1064                    0
1065                    )
1066                ",
1067            )?
1068            .execute(named_params! {
1069                ":execution_id": execution_id.to_string(),
1070                ":is_top_level": execution_id.is_top_level(),
1071                ":corresponding_version": version.0,
1072                ":pending_expires_finished": scheduled_at,
1073                ":ffqn": ffqn.to_string(),
1074                ":state": STATE_PENDING_AT,
1075                ":created_at": created_at,
1076                ":scheduled_at": scheduled_at,
1077            })?;
1078            AppendNotifier {
1079                pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1080                execution_finished: None,
1081                response: None,
1082            }
1083        };
1084        let next_version = Version::new(version.0 + 1);
1085        Ok((next_version, pending_at))
1086    }
1087
1088    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1089    fn update_state_pending_after_response_appended(
1090        tx: &Transaction,
1091        execution_id: &ExecutionId,
1092        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1093        corresponding_version: &Version, // t_execution_log is not be changed
1094    ) -> Result<AppendNotifier, DbErrorWrite> {
1095        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1096        let execution_id_str = execution_id.to_string();
1097        let mut stmt = tx
1098            .prepare_cached(
1099                r"
1100                UPDATE t_state
1101                SET
1102                    corresponding_version = :corresponding_version,
1103                    pending_expires_finished = :pending_expires_finished,
1104                    state = :state,
1105                    updated_at = CURRENT_TIMESTAMP,
1106
1107                    last_lock_version = NULL,
1108
1109                    join_set_id = NULL,
1110                    join_set_closing = NULL,
1111
1112                    result_kind = NULL
1113                WHERE execution_id = :execution_id
1114            ",
1115            )
1116            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1117        let updated = stmt
1118            .execute(named_params! {
1119                ":execution_id": execution_id_str,
1120                ":corresponding_version": corresponding_version.0,
1121                ":pending_expires_finished": scheduled_at,
1122                ":state": STATE_PENDING_AT,
1123            })
1124            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1125        if updated != 1 {
1126            return Err(DbErrorWrite::NotFound);
1127        }
1128        Ok(AppendNotifier {
1129            pending_at: Some(NotifierPendingAt {
1130                scheduled_at,
1131                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1132            }),
1133            execution_finished: None,
1134            response: None,
1135        })
1136    }
1137
1138    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1139    fn update_state_pending_after_event_appended(
1140        tx: &Transaction,
1141        execution_id: &ExecutionId,
1142        appending_version: &Version,
1143        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1144        intermittent_failure: bool,
1145    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1146        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1147        // If response idx is unknown, use 0. This distinguishs the two paths.
1148        // JoinNext will always send an actual idx.
1149        let mut stmt = tx
1150            .prepare_cached(
1151                r"
1152                UPDATE t_state
1153                SET
1154                    corresponding_version = :appending_version,
1155                    pending_expires_finished = :pending_expires_finished,
1156                    state = :state,
1157                    updated_at = CURRENT_TIMESTAMP,
1158                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1159
1160                    last_lock_version = NULL,
1161
1162                    join_set_id = NULL,
1163                    join_set_closing = NULL,
1164
1165                    result_kind = NULL
1166                WHERE execution_id = :execution_id;
1167            ",
1168            )
1169            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1170        let updated = stmt
1171            .execute(named_params! {
1172                ":execution_id": execution_id.to_string(),
1173                ":appending_version": appending_version.0,
1174                ":pending_expires_finished": scheduled_at,
1175                ":state": STATE_PENDING_AT,
1176                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1177            })
1178            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1179        if updated != 1 {
1180            return Err(DbErrorWrite::NotFound);
1181        }
1182        Ok((
1183            appending_version.increment(),
1184            AppendNotifier {
1185                pending_at: Some(NotifierPendingAt {
1186                    scheduled_at,
1187                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1188                }),
1189                execution_finished: None,
1190                response: None,
1191            },
1192        ))
1193    }
1194
1195    fn update_state_locked_get_intermittent_event_count(
1196        tx: &Transaction,
1197        execution_id: &ExecutionId,
1198        executor_id: ExecutorId,
1199        run_id: RunId,
1200        lock_expires_at: DateTime<Utc>,
1201        appending_version: &Version,
1202        retry_config: ComponentRetryConfig,
1203    ) -> Result<u32, DbErrorWrite> {
1204        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1205        let backoff_millis =
1206            u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1207        let execution_id_str = execution_id.to_string();
1208        let mut stmt = tx
1209            .prepare_cached(
1210                r"
1211                UPDATE t_state
1212                SET
1213                    corresponding_version = :appending_version,
1214                    pending_expires_finished = :pending_expires_finished,
1215                    state = :state,
1216                    updated_at = CURRENT_TIMESTAMP,
1217
1218                    max_retries = :max_retries,
1219                    retry_exp_backoff_millis = :retry_exp_backoff_millis,
1220                    last_lock_version = :appending_version,
1221                    executor_id = :executor_id,
1222                    run_id = :run_id,
1223
1224                    join_set_id = NULL,
1225                    join_set_closing = NULL,
1226
1227                    result_kind = NULL
1228                WHERE execution_id = :execution_id
1229            ",
1230            )
1231            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1232        let updated = stmt
1233            .execute(named_params! {
1234                ":execution_id": execution_id_str,
1235                ":appending_version": appending_version.0,
1236                ":pending_expires_finished": lock_expires_at,
1237                ":state": STATE_LOCKED,
1238                ":max_retries": retry_config.max_retries,
1239                ":retry_exp_backoff_millis": backoff_millis,
1240                ":executor_id": executor_id.to_string(),
1241                ":run_id": run_id.to_string(),
1242            })
1243            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1244        if updated != 1 {
1245            return Err(DbErrorWrite::NotFound);
1246        }
1247
1248        // fetch intermittent event count from the just-inserted row.
1249        let intermittent_event_count = tx
1250            .prepare(
1251                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1252            )?
1253            .query_row(
1254                named_params! {
1255                    ":execution_id": execution_id_str,
1256                },
1257                |row| {
1258                    let intermittent_event_count = row.get("intermittent_event_count")?;
1259                    Ok(intermittent_event_count)
1260                },
1261            )
1262            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1263
1264        Ok(intermittent_event_count)
1265    }
1266
1267    fn update_state_blocked(
1268        tx: &Transaction,
1269        execution_id: &ExecutionId,
1270        appending_version: &Version,
1271        // BlockedByJoinSet fields
1272        join_set_id: &JoinSetId,
1273        lock_expires_at: DateTime<Utc>,
1274        join_set_closing: bool,
1275    ) -> Result<
1276        AppendResponse, // next version
1277        DbErrorWrite,
1278    > {
1279        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1280        let execution_id_str = execution_id.to_string();
1281        let mut stmt = tx.prepare_cached(
1282            r"
1283                UPDATE t_state
1284                SET
1285                    corresponding_version = :appending_version,
1286                    pending_expires_finished = :pending_expires_finished,
1287                    state = :state,
1288                    updated_at = CURRENT_TIMESTAMP,
1289
1290                    last_lock_version = NULL,
1291
1292                    join_set_id = :join_set_id,
1293                    join_set_closing = :join_set_closing,
1294
1295                    result_kind = NULL
1296                WHERE execution_id = :execution_id
1297            ",
1298        )?;
1299        let updated = stmt.execute(named_params! {
1300            ":execution_id": execution_id_str,
1301            ":appending_version": appending_version.0,
1302            ":pending_expires_finished": lock_expires_at,
1303            ":state": STATE_BLOCKED_BY_JOIN_SET,
1304            ":join_set_id": join_set_id,
1305            ":join_set_closing": join_set_closing,
1306        })?;
1307        if updated != 1 {
1308            return Err(DbErrorWrite::NotFound);
1309        }
1310        Ok(appending_version.increment())
1311    }
1312
1313    fn update_state_finished(
1314        tx: &Transaction,
1315        execution_id: &ExecutionId,
1316        appending_version: &Version,
1317        // Finished fields
1318        finished_at: DateTime<Utc>,
1319        result_kind: PendingStateFinishedResultKind,
1320    ) -> Result<(), DbErrorWrite> {
1321        debug!("Setting t_state to Finished");
1322        let execution_id_str = execution_id.to_string();
1323        let mut stmt = tx.prepare_cached(
1324            r"
1325                UPDATE t_state
1326                SET
1327                    corresponding_version = :appending_version,
1328                    pending_expires_finished = :pending_expires_finished,
1329                    state = :state,
1330                    updated_at = CURRENT_TIMESTAMP,
1331
1332                    last_lock_version = NULL,
1333                    executor_id = NULL,
1334                    run_id = NULL,
1335
1336                    join_set_id = NULL,
1337                    join_set_closing = NULL,
1338
1339                    result_kind = :result_kind
1340                WHERE execution_id = :execution_id
1341            ",
1342        )?;
1343        let updated = stmt.execute(named_params! {
1344            ":execution_id": execution_id_str,
1345            ":appending_version": appending_version.0,
1346            ":pending_expires_finished": finished_at,
1347            ":state": STATE_FINISHED,
1348            ":result_kind": result_kind.to_string(),
1349        })?;
1350        if updated != 1 {
1351            return Err(DbErrorWrite::NotFound);
1352        }
1353        Ok(())
1354    }
1355
1356    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1357    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1358    fn bump_state_next_version(
1359        tx: &Transaction,
1360        execution_id: &ExecutionId,
1361        appending_version: &Version,
1362        delay_req: Option<DelayReq>,
1363    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1364        debug!("update_index_version");
1365        let execution_id_str = execution_id.to_string();
1366        let mut stmt = tx.prepare_cached(
1367            r"
1368                UPDATE t_state
1369                SET
1370                    corresponding_version = :appending_version,
1371                    updated_at = CURRENT_TIMESTAMP
1372                WHERE execution_id = :execution_id
1373            ",
1374        )?;
1375        let updated = stmt.execute(named_params! {
1376            ":execution_id": execution_id_str,
1377            ":appending_version": appending_version.0,
1378        })?;
1379        if updated != 1 {
1380            return Err(DbErrorWrite::NotFound);
1381        }
1382        if let Some(DelayReq {
1383            join_set_id,
1384            delay_id,
1385            expires_at,
1386        }) = delay_req
1387        {
1388            debug!("Inserting delay to `t_delay`");
1389            let mut stmt = tx.prepare_cached(
1390                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1391                VALUES \
1392                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1393            )?;
1394            stmt.execute(named_params! {
1395                ":execution_id": execution_id_str,
1396                ":join_set_id": join_set_id.to_string(),
1397                ":delay_id": delay_id.to_string(),
1398                ":expires_at": expires_at,
1399            })?;
1400        }
1401        Ok(appending_version.increment())
1402    }
1403
1404    fn get_combined_state(
1405        tx: &Transaction,
1406        execution_id: &ExecutionId,
1407    ) -> Result<CombinedState, DbErrorRead> {
1408        let mut stmt = tx.prepare(
1409            r"
1410                SELECT
1411                    state, ffqn, corresponding_version, pending_expires_finished,
1412                    last_lock_version, executor_id, run_id,
1413                    join_set_id, join_set_closing,
1414                    result_kind
1415                    FROM t_state
1416                WHERE
1417                    execution_id = :execution_id
1418                ",
1419        )?;
1420        stmt.query_row(
1421            named_params! {
1422                ":execution_id": execution_id.to_string(),
1423            },
1424            |row| {
1425                CombinedState::new(
1426                    &CombinedStateDTO {
1427                        state: row.get("state")?,
1428                        ffqn: row.get("ffqn")?,
1429                        pending_expires_finished: row
1430                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1431                        last_lock_version: row
1432                            .get::<_, Option<VersionType>>("last_lock_version")?
1433                            .map(Version::new),
1434                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1435                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1436                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1437                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1438                        result_kind: row
1439                            .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1440                                "result_kind",
1441                            )?
1442                            .map(|wrapper| wrapper.0),
1443                    },
1444                    Version::new(row.get("corresponding_version")?),
1445                )
1446            },
1447        )
1448        .map_err(DbErrorRead::from)
1449    }
1450
1451    fn list_executions(
1452        read_tx: &Transaction,
1453        ffqn: Option<&FunctionFqn>,
1454        top_level_only: bool,
1455        pagination: &ExecutionListPagination,
1456    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1457        struct StatementModifier<'a> {
1458            where_vec: Vec<String>,
1459            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1460            limit: u32,
1461            limit_desc: bool,
1462        }
1463
1464        fn paginate<'a, T: rusqlite::ToSql + 'static>(
1465            pagination: &'a Pagination<Option<T>>,
1466            column: &str,
1467            top_level_only: bool,
1468        ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1469            let mut where_vec: Vec<String> = vec![];
1470            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1471            let limit = pagination.length();
1472            let limit_desc = pagination.is_desc();
1473            let rel = pagination.rel();
1474            match pagination {
1475                Pagination::NewerThan {
1476                    cursor: Some(cursor),
1477                    ..
1478                }
1479                | Pagination::OlderThan {
1480                    cursor: Some(cursor),
1481                    ..
1482                } => {
1483                    where_vec.push(format!("{column} {rel} :cursor"));
1484                    let cursor = cursor.to_sql().map_err(|err| {
1485                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
1486                        DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1487                    })?;
1488                    params.push((":cursor", cursor));
1489                }
1490                _ => {}
1491            }
1492            if top_level_only {
1493                where_vec.push("is_top_level=true".to_string());
1494            }
1495            Ok(StatementModifier {
1496                where_vec,
1497                params,
1498                limit,
1499                limit_desc,
1500            })
1501        }
1502
1503        let mut statement_mod = match pagination {
1504            ExecutionListPagination::CreatedBy(pagination) => {
1505                paginate(pagination, "created_at", top_level_only)?
1506            }
1507            ExecutionListPagination::ExecutionId(pagination) => {
1508                paginate(pagination, "execution_id", top_level_only)?
1509            }
1510        };
1511
1512        let ffqn_temporary;
1513        if let Some(ffqn) = ffqn {
1514            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1515            ffqn_temporary = ffqn.to_string();
1516            let ffqn = ffqn_temporary
1517                .to_sql()
1518                .expect("string conversion never fails");
1519
1520            statement_mod.params.push((":ffqn", ffqn));
1521        }
1522
1523        let where_str = if statement_mod.where_vec.is_empty() {
1524            String::new()
1525        } else {
1526            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1527        };
1528        let sql = format!(
1529            r"
1530            SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1531            last_lock_version, executor_id, run_id,
1532            join_set_id, join_set_closing,
1533            result_kind
1534            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1535            ",
1536            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1537            limit = statement_mod.limit,
1538        );
1539        let mut vec: Vec<_> = read_tx
1540            .prepare(&sql)?
1541            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1542                statement_mod
1543                    .params
1544                    .into_iter()
1545                    .collect::<Vec<_>>()
1546                    .as_ref(),
1547                |row| {
1548                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1549                    let created_at = row.get("created_at")?;
1550                    let scheduled_at = row.get("scheduled_at")?;
1551                    let combined_state = CombinedState::new(
1552                        &CombinedStateDTO {
1553                            state: row.get("state")?,
1554                            ffqn: row.get("ffqn")?,
1555                            pending_expires_finished: row
1556                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1557                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1558
1559                            last_lock_version: row
1560                                .get::<_, Option<VersionType>>("last_lock_version")?
1561                                .map(Version::new),
1562                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1563                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1564                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1565                            result_kind: row
1566                                .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1567                                    "result_kind",
1568                                )?
1569                                .map(|wrapper| wrapper.0),
1570                        },
1571                        Version::new(row.get("corresponding_version")?),
1572                    )?;
1573                    Ok(ExecutionWithState {
1574                        execution_id,
1575                        ffqn: combined_state.ffqn,
1576                        pending_state: combined_state.pending_state,
1577                        created_at,
1578                        scheduled_at,
1579                    })
1580                },
1581            )?
1582            .collect::<Vec<Result<_, _>>>()
1583            .into_iter()
1584            .filter_map(|row| match row {
1585                Ok(row) => Some(row),
1586                Err(err) => {
1587                    warn!("Skipping row - {err:?}");
1588                    None
1589                }
1590            })
1591            .collect();
1592
1593        if !statement_mod.limit_desc {
1594            // the list must be sorted in descending order
1595            vec.reverse();
1596        }
1597        Ok(vec)
1598    }
1599
1600    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1601    #[expect(clippy::too_many_arguments)]
1602    fn lock_single_execution(
1603        tx: &Transaction,
1604        created_at: DateTime<Utc>,
1605        component_id: &ComponentId,
1606        execution_id: &ExecutionId,
1607        run_id: RunId,
1608        appending_version: &Version,
1609        executor_id: ExecutorId,
1610        lock_expires_at: DateTime<Utc>,
1611        retry_config: ComponentRetryConfig,
1612    ) -> Result<LockedExecution, DbErrorWrite> {
1613        debug!("lock_single_execution");
1614        let combined_state = Self::get_combined_state(tx, execution_id)?;
1615        combined_state.pending_state.can_append_lock(
1616            created_at,
1617            executor_id,
1618            run_id,
1619            lock_expires_at,
1620        )?;
1621        let expected_version = combined_state.get_next_version_assert_not_finished();
1622        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1623
1624        // Append to `execution_log` table.
1625        let locked_event = Locked {
1626            component_id: component_id.clone(),
1627            executor_id,
1628            lock_expires_at,
1629            run_id,
1630            retry_config,
1631        };
1632        let event = ExecutionEventInner::Locked(locked_event.clone());
1633        let event_ser = serde_json::to_string(&event).map_err(|err| {
1634            warn!("Cannot serialize {event:?} - {err:?}");
1635            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1636        })?;
1637        let mut stmt = tx
1638            .prepare_cached(
1639                "INSERT INTO t_execution_log \
1640            (execution_id, created_at, json_value, version, variant) \
1641            VALUES \
1642            (:execution_id, :created_at, :json_value, :version, :variant)",
1643            )
1644            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1645        stmt.execute(named_params! {
1646            ":execution_id": execution_id.to_string(),
1647            ":created_at": created_at,
1648            ":json_value": event_ser,
1649            ":version": appending_version.0,
1650            ":variant": event.variant(),
1651        })
1652        .map_err(|err| {
1653            warn!("Cannot lock execution - {err:?}");
1654            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1655        })?;
1656
1657        // Update `t_state`
1658        let responses = Self::list_responses(tx, execution_id, None)?;
1659        let responses = responses.into_iter().map(|resp| resp.event).collect();
1660        trace!("Responses: {responses:?}");
1661
1662        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1663            tx,
1664            execution_id,
1665            executor_id,
1666            run_id,
1667            lock_expires_at,
1668            appending_version,
1669            retry_config,
1670        )?;
1671        // Fetch event_history and `Created` event to construct the response.
1672        let mut events = tx
1673            .prepare(
1674                "SELECT json_value FROM t_execution_log WHERE \
1675                execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1676                ORDER BY version",
1677            )?
1678            .query_map(
1679                named_params! {
1680                    ":execution_id": execution_id.to_string(),
1681                    ":v1": DUMMY_CREATED.variant(),
1682                    ":v2": DUMMY_HISTORY_EVENT.variant(),
1683                },
1684                |row| {
1685                    row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1686                        .map(|wrapper| wrapper.0)
1687                        .map_err(|serde| {
1688                            error!("Cannot deserialize {row:?} - {serde:?}");
1689                            consistency_rusqlite("cannot deserialize json value")
1690                        })
1691                },
1692            )?
1693            .collect::<Result<Vec<_>, _>>()?
1694            .into_iter()
1695            .collect::<VecDeque<_>>();
1696        let Some(ExecutionEventInner::Created {
1697            ffqn,
1698            params,
1699            parent,
1700            metadata,
1701            ..
1702        }) = events.pop_front()
1703        else {
1704            error!("Execution log must contain at least `Created` event");
1705            return Err(consistency_db_err("execution log must contain `Created` event").into());
1706        };
1707
1708        let event_history = events
1709            .into_iter()
1710            .map(|event| {
1711                if let ExecutionEventInner::HistoryEvent { event } = event {
1712                    Ok(event)
1713                } else {
1714                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1715                    Err(consistency_db_err(
1716                        "rows can only contain `Created` and `HistoryEvent` event kinds",
1717                    ))
1718                }
1719            })
1720            .collect::<Result<Vec<_>, _>>()?;
1721
1722        Ok(LockedExecution {
1723            execution_id: execution_id.clone(),
1724            metadata,
1725            next_version: appending_version.increment(),
1726            ffqn,
1727            params,
1728            event_history,
1729            responses,
1730            parent,
1731            intermittent_event_count,
1732            locked_event,
1733        })
1734    }
1735
1736    fn count_join_next(
1737        tx: &Transaction,
1738        execution_id: &ExecutionId,
1739        join_set_id: &JoinSetId,
1740    ) -> Result<u64, DbErrorRead> {
1741        let mut stmt = tx.prepare(
1742            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1743            AND history_event_type = :join_next",
1744        )?;
1745        Ok(stmt.query_row(
1746            named_params! {
1747                ":execution_id": execution_id.to_string(),
1748                ":join_set_id": join_set_id.to_string(),
1749                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1750            },
1751            |row| row.get("count"),
1752        )?)
1753    }
1754
1755    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1756    #[expect(clippy::needless_return)]
1757    fn append(
1758        tx: &Transaction,
1759        execution_id: &ExecutionId,
1760        req: AppendRequest,
1761        appending_version: Version,
1762    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1763        if matches!(req.event, ExecutionEventInner::Created { .. }) {
1764            return Err(DbErrorWrite::NonRetriable(
1765                DbErrorWriteNonRetriable::ValidationFailed(
1766                    "cannot append `Created` event - use `create` instead".into(),
1767                ),
1768            ));
1769        }
1770        if let AppendRequest {
1771            event:
1772                ExecutionEventInner::Locked(Locked {
1773                    component_id,
1774                    executor_id,
1775                    run_id,
1776                    lock_expires_at,
1777                    retry_config,
1778                }),
1779            created_at,
1780        } = req
1781        {
1782            return Self::lock_single_execution(
1783                tx,
1784                created_at,
1785                &component_id,
1786                execution_id,
1787                run_id,
1788                &appending_version,
1789                executor_id,
1790                lock_expires_at,
1791                retry_config,
1792            )
1793            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1794        }
1795
1796        let combined_state = Self::get_combined_state(tx, execution_id)?;
1797        if combined_state.pending_state.is_finished() {
1798            debug!("Execution is already finished");
1799            return Err(DbErrorWrite::NonRetriable(
1800                DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1801            ));
1802        }
1803
1804        Self::check_expected_next_and_appending_version(
1805            &combined_state.get_next_version_assert_not_finished(),
1806            &appending_version,
1807        )?;
1808        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1809            error!("Cannot serialize {:?} - {err:?}", req.event);
1810            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1811        })?;
1812
1813        let mut stmt = tx.prepare(
1814                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1815                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1816                    ?;
1817        stmt.execute(named_params! {
1818            ":execution_id": execution_id.to_string(),
1819            ":created_at": req.created_at,
1820            ":json_value": event_ser,
1821            ":version": appending_version.0,
1822            ":variant": req.event.variant(),
1823            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1824        })?;
1825        // Calculate current pending state
1826
1827        match &req.event {
1828            ExecutionEventInner::Created { .. } => {
1829                unreachable!("handled in the caller")
1830            }
1831
1832            ExecutionEventInner::Locked { .. } => {
1833                unreachable!("handled above")
1834            }
1835
1836            ExecutionEventInner::TemporarilyFailed {
1837                backoff_expires_at, ..
1838            }
1839            | ExecutionEventInner::TemporarilyTimedOut {
1840                backoff_expires_at, ..
1841            } => {
1842                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1843                    tx,
1844                    execution_id,
1845                    &appending_version,
1846                    *backoff_expires_at,
1847                    true, // an intermittent failure
1848                )?;
1849                return Ok((next_version, notifier));
1850            }
1851
1852            ExecutionEventInner::Unlocked {
1853                backoff_expires_at, ..
1854            } => {
1855                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1856                    tx,
1857                    execution_id,
1858                    &appending_version,
1859                    *backoff_expires_at,
1860                    false, // not an intermittent failure
1861                )?;
1862                return Ok((next_version, notifier));
1863            }
1864
1865            ExecutionEventInner::Finished { result, .. } => {
1866                Self::update_state_finished(
1867                    tx,
1868                    execution_id,
1869                    &appending_version,
1870                    req.created_at,
1871                    PendingStateFinishedResultKind::from(result),
1872                )?;
1873                return Ok((
1874                    appending_version,
1875                    AppendNotifier {
1876                        pending_at: None,
1877                        execution_finished: Some(NotifierExecutionFinished {
1878                            execution_id: execution_id.clone(),
1879                            retval: result.clone(),
1880                        }),
1881                        response: None,
1882                    },
1883                ));
1884            }
1885
1886            ExecutionEventInner::HistoryEvent {
1887                event:
1888                    HistoryEvent::JoinSetCreate { .. }
1889                    | HistoryEvent::JoinSetRequest {
1890                        request: JoinSetRequest::ChildExecutionRequest { .. },
1891                        ..
1892                    }
1893                    | HistoryEvent::Persist { .. }
1894                    | HistoryEvent::Schedule { .. }
1895                    | HistoryEvent::Stub { .. }
1896                    | HistoryEvent::JoinNextTooMany { .. },
1897            } => {
1898                return Ok((
1899                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1900                    AppendNotifier::default(),
1901                ));
1902            }
1903
1904            ExecutionEventInner::HistoryEvent {
1905                event:
1906                    HistoryEvent::JoinSetRequest {
1907                        join_set_id,
1908                        request:
1909                            JoinSetRequest::DelayRequest {
1910                                delay_id,
1911                                expires_at,
1912                                ..
1913                            },
1914                    },
1915            } => {
1916                return Ok((
1917                    Self::bump_state_next_version(
1918                        tx,
1919                        execution_id,
1920                        &appending_version,
1921                        Some(DelayReq {
1922                            join_set_id: join_set_id.clone(),
1923                            delay_id: delay_id.clone(),
1924                            expires_at: *expires_at,
1925                        }),
1926                    )?,
1927                    AppendNotifier::default(),
1928                ));
1929            }
1930
1931            ExecutionEventInner::HistoryEvent {
1932                event:
1933                    HistoryEvent::JoinNext {
1934                        join_set_id,
1935                        run_expires_at,
1936                        closing,
1937                        requested_ffqn: _,
1938                    },
1939            } => {
1940                // Did the response arrive already?
1941                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1942                let nth_response =
1943                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
1944                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1945                assert!(join_next_count > 0);
1946                if let Some(ResponseWithCursor {
1947                    event:
1948                        JoinSetResponseEventOuter {
1949                            created_at: nth_created_at,
1950                            ..
1951                        },
1952                    cursor: _,
1953                }) = nth_response
1954                {
1955                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
1956                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1957                        tx,
1958                        execution_id,
1959                        &appending_version,
1960                        scheduled_at,
1961                        false, // not an intermittent failure
1962                    )?;
1963                    return Ok((next_version, notifier));
1964                }
1965                return Ok((
1966                    Self::update_state_blocked(
1967                        tx,
1968                        execution_id,
1969                        &appending_version,
1970                        join_set_id,
1971                        *run_expires_at,
1972                        *closing,
1973                    )?,
1974                    AppendNotifier::default(),
1975                ));
1976            }
1977        }
1978    }
1979
1980    fn append_response(
1981        tx: &Transaction,
1982        execution_id: &ExecutionId,
1983        response_outer: JoinSetResponseEventOuter,
1984    ) -> Result<AppendNotifier, DbErrorWrite> {
1985        let mut stmt = tx.prepare(
1986            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1987                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1988        )?;
1989        let join_set_id = &response_outer.event.join_set_id;
1990        let delay_id = match &response_outer.event.event {
1991            JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1992            JoinSetResponse::ChildExecutionFinished { .. } => None,
1993        };
1994        let (child_execution_id, finished_version) = match &response_outer.event.event {
1995            JoinSetResponse::ChildExecutionFinished {
1996                child_execution_id,
1997                finished_version,
1998                result: _,
1999            } => (
2000                Some(child_execution_id.to_string()),
2001                Some(finished_version.0),
2002            ),
2003            JoinSetResponse::DelayFinished { .. } => (None, None),
2004        };
2005
2006        stmt.execute(named_params! {
2007            ":execution_id": execution_id.to_string(),
2008            ":created_at": response_outer.created_at,
2009            ":join_set_id": join_set_id.to_string(),
2010            ":delay_id": delay_id,
2011            ":child_execution_id": child_execution_id,
2012            ":finished_version": finished_version,
2013        })?;
2014
2015        // if the execution is going to be unblocked by this response...
2016        let combined_state = Self::get_combined_state(tx, execution_id)?;
2017        debug!("previous_pending_state: {combined_state:?}");
2018        let mut notifier = if let PendingState::BlockedByJoinSet {
2019            join_set_id: found_join_set_id,
2020            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2021            closing: _,
2022        } = combined_state.pending_state
2023            && *join_set_id == found_join_set_id
2024        {
2025            // PendingAt should be set to current time if called from expired_timers_watcher,
2026            // or to a future time if the execution is hot.
2027            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2028            // TODO: Add diff test
2029            // Unblock the state.
2030            Self::update_state_pending_after_response_appended(
2031                tx,
2032                execution_id,
2033                scheduled_at,
2034                &combined_state.corresponding_version, // not changing the version
2035            )?
2036        } else {
2037            AppendNotifier::default()
2038        };
2039        if let JoinSetResponseEvent {
2040            join_set_id,
2041            event: JoinSetResponse::DelayFinished { delay_id },
2042        } = &response_outer.event
2043        {
2044            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2045            let mut stmt =
2046                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2047                ?;
2048            stmt.execute(named_params! {
2049                ":execution_id": execution_id.to_string(),
2050                ":join_set_id": join_set_id.to_string(),
2051                ":delay_id": delay_id.to_string(),
2052            })?;
2053        }
2054        notifier.response = Some((execution_id.clone(), response_outer));
2055        Ok(notifier)
2056    }
2057
2058    fn append_backtrace(
2059        tx: &Transaction,
2060        backtrace_info: &BacktraceInfo,
2061    ) -> Result<(), DbErrorWrite> {
2062        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2063            warn!(
2064                "Cannot serialize backtrace {:?} - {err:?}",
2065                backtrace_info.wasm_backtrace
2066            );
2067            DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2068        })?;
2069        let mut stmt = tx
2070            .prepare(
2071                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2072                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2073            )
2074            ?;
2075        stmt.execute(named_params! {
2076            ":execution_id": backtrace_info.execution_id.to_string(),
2077            ":component_id": backtrace_info.component_id.to_string(),
2078            ":version_min_including": backtrace_info.version_min_including.0,
2079            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2080            ":wasm_backtrace": backtrace,
2081        })?;
2082        Ok(())
2083    }
2084
2085    #[cfg(feature = "test")]
2086    fn get(
2087        tx: &Transaction,
2088        execution_id: &ExecutionId,
2089    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2090        let mut stmt = tx.prepare(
2091            "SELECT created_at, json_value FROM t_execution_log WHERE \
2092                        execution_id = :execution_id ORDER BY version",
2093        )?;
2094        let events = stmt
2095            .query_map(
2096                named_params! {
2097                    ":execution_id": execution_id.to_string(),
2098                },
2099                |row| {
2100                    let created_at = row.get("created_at")?;
2101                    let event = row
2102                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2103                        .map_err(|serde| {
2104                            error!("Cannot deserialize {row:?} - {serde:?}");
2105                            consistency_rusqlite("cannot deserialize event")
2106                        })?
2107                        .0;
2108
2109                    Ok(ExecutionEvent {
2110                        created_at,
2111                        event,
2112                        backtrace_id: None,
2113                    })
2114                },
2115            )?
2116            .collect::<Result<Vec<_>, _>>()?;
2117        if events.is_empty() {
2118            return Err(DbErrorRead::NotFound);
2119        }
2120        let combined_state = Self::get_combined_state(tx, execution_id)?;
2121        let responses = Self::list_responses(tx, execution_id, None)?
2122            .into_iter()
2123            .map(|resp| resp.event)
2124            .collect();
2125        Ok(concepts::storage::ExecutionLog {
2126            execution_id: execution_id.clone(),
2127            events,
2128            responses,
2129            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2130            pending_state: combined_state.pending_state,
2131        })
2132    }
2133
2134    fn list_execution_events(
2135        tx: &Transaction,
2136        execution_id: &ExecutionId,
2137        version_min: VersionType,
2138        version_max_excluding: VersionType,
2139        include_backtrace_id: bool,
2140    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2141        let select = if include_backtrace_id {
2142            "SELECT
2143                log.created_at,
2144                log.json_value,
2145                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2146                bt.version_min_including AS backtrace_id
2147            FROM
2148                t_execution_log AS log
2149            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2150                t_backtrace AS bt ON log.execution_id = bt.execution_id
2151                                -- Check if the log's version falls within the backtrace's range
2152                                AND log.version >= bt.version_min_including
2153                                AND log.version < bt.version_max_excluding
2154            WHERE
2155                log.execution_id = :execution_id
2156                AND log.version >= :version_min
2157                AND log.version < :version_max_excluding
2158            ORDER BY
2159                log.version;"
2160        } else {
2161            "SELECT
2162                created_at, json_value, NULL as backtrace_id
2163            FROM t_execution_log WHERE
2164                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2165            ORDER BY version"
2166        };
2167        tx.prepare(select)?
2168            .query_map(
2169                named_params! {
2170                    ":execution_id": execution_id.to_string(),
2171                    ":version_min": version_min,
2172                    ":version_max_excluding": version_max_excluding
2173                },
2174                |row| {
2175                    let created_at = row.get("created_at")?;
2176                    let backtrace_id = row
2177                        .get::<_, Option<VersionType>>("backtrace_id")?
2178                        .map(Version::new);
2179                    let event = row
2180                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2181                        .map(|event| ExecutionEvent {
2182                            created_at,
2183                            event: event.0,
2184                            backtrace_id,
2185                        })
2186                        .map_err(|serde| {
2187                            error!("Cannot deserialize {row:?} - {serde:?}");
2188                            consistency_rusqlite("cannot deserialize")
2189                        })?;
2190                    Ok(event)
2191                },
2192            )?
2193            .collect::<Result<Vec<_>, _>>()
2194            .map_err(DbErrorRead::from)
2195    }
2196
2197    fn get_execution_event(
2198        tx: &Transaction,
2199        execution_id: &ExecutionId,
2200        version: VersionType,
2201    ) -> Result<ExecutionEvent, DbErrorRead> {
2202        let mut stmt = tx.prepare(
2203            "SELECT created_at, json_value FROM t_execution_log WHERE \
2204                        execution_id = :execution_id AND version = :version",
2205        )?;
2206        stmt.query_row(
2207            named_params! {
2208                ":execution_id": execution_id.to_string(),
2209                ":version": version,
2210            },
2211            |row| {
2212                let created_at = row.get("created_at")?;
2213                let event = row
2214                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2215                    .map_err(|serde| {
2216                        error!("Cannot deserialize {row:?} - {serde:?}");
2217                        consistency_rusqlite("cannot deserialize event")
2218                    })?;
2219
2220                Ok(ExecutionEvent {
2221                    created_at,
2222                    event: event.0,
2223                    backtrace_id: None,
2224                })
2225            },
2226        )
2227        .map_err(DbErrorRead::from)
2228    }
2229
2230    fn list_responses(
2231        tx: &Transaction,
2232        execution_id: &ExecutionId,
2233        pagination: Option<Pagination<u32>>,
2234    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2235        // TODO: Add test
2236        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2237        let mut sql = "SELECT \
2238            r.id, r.created_at, r.join_set_id,  r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2239            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2240            WHERE \
2241            r.execution_id = :execution_id \
2242            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2243            "
2244        .to_string();
2245        let limit = match &pagination {
2246            Some(
2247                pagination @ (Pagination::NewerThan { cursor, .. }
2248                | Pagination::OlderThan { cursor, .. }),
2249            ) => {
2250                params.push((":cursor", Box::new(cursor)));
2251                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2252                Some(pagination.length())
2253            }
2254            None => None,
2255        };
2256        sql.push_str(" ORDER BY id");
2257        if pagination.as_ref().is_some_and(Pagination::is_desc) {
2258            sql.push_str(" DESC");
2259        }
2260        if let Some(limit) = limit {
2261            write!(sql, " LIMIT {limit}").unwrap();
2262        }
2263        params.push((":execution_id", Box::new(execution_id.to_string())));
2264        tx.prepare(&sql)?
2265            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2266                params
2267                    .iter()
2268                    .map(|(key, value)| (*key, value.as_ref()))
2269                    .collect::<Vec<_>>()
2270                    .as_ref(),
2271                Self::parse_response_with_cursor,
2272            )?
2273            .collect::<Result<Vec<_>, rusqlite::Error>>()
2274            .map_err(DbErrorRead::from)
2275    }
2276
2277    fn parse_response_with_cursor(
2278        row: &rusqlite::Row<'_>,
2279    ) -> Result<ResponseWithCursor, rusqlite::Error> {
2280        let id = row.get("id")?;
2281        let created_at: DateTime<Utc> = row.get("created_at")?;
2282        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2283        let event = match (
2284            row.get::<_, Option<DelayId>>("delay_id")?,
2285            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2286            row.get::<_, Option<VersionType>>("finished_version")?,
2287            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2288        ) {
2289            (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2290            (
2291                None,
2292                Some(child_execution_id),
2293                Some(finished_version),
2294                Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2295            ) => JoinSetResponse::ChildExecutionFinished {
2296                child_execution_id,
2297                finished_version: Version(finished_version),
2298                result,
2299            },
2300            (delay, child, finished, result) => {
2301                error!(
2302                    "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2303                    delay,
2304                    result.map(|it| it.0)
2305                );
2306                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2307            }
2308        };
2309        Ok(ResponseWithCursor {
2310            cursor: id,
2311            event: JoinSetResponseEventOuter {
2312                event: JoinSetResponseEvent { join_set_id, event },
2313                created_at,
2314            },
2315        })
2316    }
2317
2318    fn nth_response(
2319        tx: &Transaction,
2320        execution_id: &ExecutionId,
2321        join_set_id: &JoinSetId,
2322        skip_rows: u64,
2323    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2324        // TODO: Add test
2325        tx
2326            .prepare(
2327                "SELECT r.id, r.created_at, r.join_set_id, \
2328                    r.delay_id, \
2329                    r.child_execution_id, r.finished_version, l.json_value \
2330                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2331                    WHERE \
2332                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2333                    (
2334                    r.finished_version = l.version \
2335                    OR \
2336                    r.child_execution_id IS NULL \
2337                    ) \
2338                    ORDER BY id \
2339                    LIMIT 1 OFFSET :offset",
2340            )
2341            ?
2342            .query_row(
2343                named_params! {
2344                    ":execution_id": execution_id.to_string(),
2345                    ":join_set_id": join_set_id.to_string(),
2346                    ":offset": skip_rows,
2347                },
2348                Self::parse_response_with_cursor,
2349            )
2350            .optional()
2351            .map_err(DbErrorRead::from)
2352    }
2353
2354    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2355    #[instrument(level = Level::TRACE, skip_all)]
2356    fn get_responses_with_offset(
2357        tx: &Transaction,
2358        execution_id: &ExecutionId,
2359        skip_rows: usize,
2360    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2361        // TODO: Add test
2362        tx.prepare(
2363            "SELECT r.id, r.created_at, r.join_set_id, \
2364            r.delay_id, \
2365            r.child_execution_id, r.finished_version, l.json_value \
2366            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2367            WHERE \
2368            r.execution_id = :execution_id AND \
2369            ( \
2370            r.finished_version = l.version \
2371            OR r.child_execution_id IS NULL \
2372            ) \
2373            ORDER BY id \
2374            LIMIT -1 OFFSET :offset",
2375        )
2376        ?
2377        .query_map(
2378            named_params! {
2379                ":execution_id": execution_id.to_string(),
2380                ":offset": skip_rows,
2381            },
2382            Self::parse_response_with_cursor,
2383        )
2384        ?
2385        .collect::<Result<Vec<_>, _>>()
2386        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2387        .map_err(DbErrorRead::from)
2388    }
2389
2390    fn get_pending_of_single_ffqn(
2391        mut stmt: CachedStatement,
2392        batch_size: usize,
2393        pending_at_or_sooner: DateTime<Utc>,
2394        ffqn: &FunctionFqn,
2395    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2396        stmt.query_map(
2397            named_params! {
2398                ":pending_expires_finished": pending_at_or_sooner,
2399                ":ffqn": ffqn.to_string(),
2400                ":batch_size": batch_size,
2401            },
2402            |row| {
2403                let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2404                let next_version =
2405                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2406                Ok(execution_id.map(|exe| (exe, next_version)))
2407            },
2408        )
2409        .map_err(|err| {
2410            warn!("Ignoring consistency error {err:?}");
2411        })?
2412        .collect::<Result<Vec<_>, _>>()
2413        .map_err(|err| {
2414            warn!("Ignoring consistency error {err:?}");
2415        })?
2416        .into_iter()
2417        .collect::<Result<Vec<_>, _>>()
2418        .map_err(|err| {
2419            warn!("Ignoring consistency error {err:?}");
2420        })
2421    }
2422
2423    /// Get executions and their next versions
2424    fn get_pending(
2425        conn: &Connection,
2426        batch_size: usize,
2427        pending_at_or_sooner: DateTime<Utc>,
2428        ffqns: &[FunctionFqn],
2429    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2430        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2431        for ffqn in ffqns {
2432            // Select executions in PendingAt.
2433            let stmt = conn
2434                .prepare_cached(&format!(
2435                    r#"
2436                    SELECT execution_id, corresponding_version FROM t_state WHERE
2437                    state = "{STATE_PENDING_AT}" AND
2438                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2439                    ORDER BY pending_expires_finished LIMIT :batch_size
2440                    "#
2441                ))
2442                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2443
2444            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2445                stmt,
2446                batch_size - execution_ids_versions.len(),
2447                pending_at_or_sooner,
2448                ffqn,
2449            ) {
2450                execution_ids_versions.extend(execs_and_versions);
2451                if execution_ids_versions.len() == batch_size {
2452                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2453                    break;
2454                }
2455                // consistency errors are ignored since we want to return at least some rows.
2456            }
2457        }
2458        Ok(execution_ids_versions)
2459    }
2460
2461    // Must be called after write transaction for a correct happens-before relationship.
2462    #[instrument(level = Level::TRACE, skip_all)]
2463    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2464        let (pending_ats, finished_execs, responses) = {
2465            let (mut pending_ats, mut finished_execs, mut responses) =
2466                (Vec::new(), Vec::new(), Vec::new());
2467            for notifier in notifiers {
2468                if let Some(pending_at) = notifier.pending_at {
2469                    pending_ats.push(pending_at);
2470                }
2471                if let Some(finished) = notifier.execution_finished {
2472                    finished_execs.push(finished);
2473                }
2474                if let Some(response) = notifier.response {
2475                    responses.push(response);
2476                }
2477            }
2478            (pending_ats, finished_execs, responses)
2479        };
2480
2481        // Notify pending_at subscribers.
2482        if !pending_ats.is_empty() {
2483            let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2484            for pending_at in pending_ats {
2485                Self::notify_pending_locked(&pending_at, current_time, &guard);
2486            }
2487        }
2488        // Notify execution finished subscribers.
2489        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2490        if !finished_execs.is_empty() {
2491            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2492            for finished in finished_execs {
2493                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2494                    for (_tag, sender) in listeners_of_exe_id {
2495                        // Sending while holding the lock but the oneshot sender does not block.
2496                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2497                        let _ = sender.send(finished.retval.clone());
2498                    }
2499                }
2500            }
2501        }
2502        // Notify response subscribers.
2503        if !responses.is_empty() {
2504            let mut guard = self.0.response_subscribers.lock().unwrap();
2505            for (execution_id, response) in responses {
2506                if let Some((sender, _)) = guard.remove(&execution_id) {
2507                    let _ = sender.send(response);
2508                }
2509            }
2510        }
2511    }
2512
2513    fn notify_pending_locked(
2514        notifier: &NotifierPendingAt,
2515        current_time: DateTime<Utc>,
2516        ffqn_to_pending_subscription: &std::sync::MutexGuard<
2517            HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2518        >,
2519    ) {
2520        // No need to remove here, cleanup is handled by the caller.
2521        if notifier.scheduled_at <= current_time
2522            && let Some((subscription, _)) = ffqn_to_pending_subscription.get(&notifier.ffqn)
2523        {
2524            debug!("Notifying pending subscriber");
2525            // Does not block
2526            let _ = subscription.try_send(());
2527        }
2528    }
2529}
2530
2531#[async_trait]
2532impl DbExecutor for SqlitePool {
2533    #[instrument(level = Level::TRACE, skip(self))]
2534    async fn lock_pending(
2535        &self,
2536        batch_size: usize,
2537        pending_at_or_sooner: DateTime<Utc>,
2538        ffqns: Arc<[FunctionFqn]>,
2539        created_at: DateTime<Utc>,
2540        component_id: ComponentId,
2541        executor_id: ExecutorId,
2542        lock_expires_at: DateTime<Utc>,
2543        run_id: RunId,
2544        retry_config: ComponentRetryConfig,
2545    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2546        let execution_ids_versions = self
2547            .transaction(
2548                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2549                "get_pending",
2550            )
2551            .await?;
2552        if execution_ids_versions.is_empty() {
2553            Ok(vec![])
2554        } else {
2555            debug!("Locking {execution_ids_versions:?}");
2556            self.transaction(
2557                move |tx| {
2558                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2559                    // Append lock
2560                    for (execution_id, version) in &execution_ids_versions {
2561                        match Self::lock_single_execution(
2562                            tx,
2563                            created_at,
2564                            &component_id,
2565                            execution_id,
2566                            run_id,
2567                            version,
2568                            executor_id,
2569                            lock_expires_at,
2570                            retry_config,
2571                        ) {
2572                            Ok(locked) => locked_execs.push(locked),
2573                            Err(err) => {
2574                                warn!("Locking row {execution_id} failed - {err:?}");
2575                            }
2576                        }
2577                    }
2578                    Ok(locked_execs)
2579                },
2580                "lock_pending",
2581            )
2582            .await
2583        }
2584    }
2585
2586    #[instrument(level = Level::DEBUG, skip(self))]
2587    async fn lock_one(
2588        &self,
2589        created_at: DateTime<Utc>,
2590        component_id: ComponentId,
2591        execution_id: &ExecutionId,
2592        run_id: RunId,
2593        version: Version,
2594        executor_id: ExecutorId,
2595        lock_expires_at: DateTime<Utc>,
2596        retry_config: ComponentRetryConfig,
2597    ) -> Result<LockedExecution, DbErrorWrite> {
2598        debug!(%execution_id, "lock_one");
2599        let execution_id = execution_id.clone();
2600        self.transaction(
2601            move |tx| {
2602                Self::lock_single_execution(
2603                    tx,
2604                    created_at,
2605                    &component_id,
2606                    &execution_id,
2607                    run_id,
2608                    &version,
2609                    executor_id,
2610                    lock_expires_at,
2611                    retry_config,
2612                )
2613            },
2614            "lock_inner",
2615        )
2616        .await
2617    }
2618
2619    #[instrument(level = Level::DEBUG, skip(self, req))]
2620    async fn append(
2621        &self,
2622        execution_id: ExecutionId,
2623        version: Version,
2624        req: AppendRequest,
2625    ) -> Result<AppendResponse, DbErrorWrite> {
2626        debug!(%req, "append");
2627        trace!(?req, "append");
2628        let created_at = req.created_at;
2629        let (version, notifier) = self
2630            .transaction(
2631                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2632                "append",
2633            )
2634            .await?;
2635        self.notify_all(vec![notifier], created_at);
2636        Ok(version)
2637    }
2638
2639    #[instrument(level = Level::DEBUG, skip_all)]
2640    async fn append_batch_respond_to_parent(
2641        &self,
2642        events: AppendEventsToExecution,
2643        response: AppendResponseToExecution,
2644        current_time: DateTime<Utc>,
2645    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2646        debug!("append_batch_respond_to_parent");
2647        if events.execution_id == response.parent_execution_id {
2648            // Pending state would be wrong.
2649            // This is not a panic because it depends on DB state.
2650            return Err(DbErrorWrite::NonRetriable(
2651                DbErrorWriteNonRetriable::ValidationFailed(
2652                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2653                ),
2654            ));
2655        }
2656        if events.batch.is_empty() {
2657            error!("Batch cannot be empty");
2658            return Err(DbErrorWrite::NonRetriable(
2659                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2660            ));
2661        }
2662        let (version, notifiers) = {
2663            self.transaction(
2664                move |tx| {
2665                    let mut version = events.version.clone();
2666                    let mut notifier_of_child = None;
2667                    for append_request in &events.batch {
2668                        let (v, n) = Self::append(
2669                            tx,
2670                            &events.execution_id,
2671                            append_request.clone(),
2672                            version,
2673                        )?;
2674                        version = v;
2675                        notifier_of_child = Some(n);
2676                    }
2677
2678                    let pending_at_parent = Self::append_response(
2679                        tx,
2680                        &response.parent_execution_id,
2681                        response.parent_response_event.clone(),
2682                    )?;
2683                    Ok::<_, DbErrorWrite>((
2684                        version,
2685                        vec![
2686                            notifier_of_child.expect("checked that the batch is not empty"),
2687                            pending_at_parent,
2688                        ],
2689                    ))
2690                },
2691                "append_batch_respond_to_parent",
2692            )
2693            .await?
2694        };
2695        self.notify_all(notifiers, current_time);
2696        Ok(version)
2697    }
2698
2699    // Supports only one subscriber per ffqn.
2700    // A new subscriber replaces the old one, which will eventually time out, which is fine.
2701    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2702    async fn wait_for_pending(
2703        &self,
2704        pending_at_or_sooner: DateTime<Utc>,
2705        ffqns: Arc<[FunctionFqn]>,
2706        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2707    ) {
2708        let unique_tag: u64 = rand::random();
2709        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
2710        {
2711            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2712            for ffqn in ffqns.as_ref() {
2713                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2714            }
2715        }
2716        async {
2717            let Ok(execution_ids_versions) = self
2718                .transaction(
2719                    {
2720                        let ffqns = ffqns.clone();
2721                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2722                    },
2723                    "subscribe_to_pending",
2724                )
2725                .await
2726            else {
2727                trace!(
2728                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2729                );
2730                timeout_fut.await;
2731                return;
2732            };
2733            if !execution_ids_versions.is_empty() {
2734                trace!("Not waiting, database already contains new pending executions");
2735                return;
2736            }
2737            tokio::select! { // future's liveness: Dropping the loser immediately.
2738                _ = receiver.recv() => {
2739                    trace!("Received a notification");
2740                }
2741                () = timeout_fut => {
2742                }
2743            }
2744        }.await;
2745        // Clean up ffqn_to_pending_subscription in any case
2746        {
2747            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2748            for ffqn in ffqns.as_ref() {
2749                match ffqn_to_pending_subscription.remove(ffqn) {
2750                    Some((_, tag)) if tag == unique_tag => {
2751                        // Cleanup OK.
2752                    }
2753                    Some(other) => {
2754                        // Reinsert foreign sender.
2755                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2756                    }
2757                    None => {
2758                        // Value was replaced and cleaned up already.
2759                    }
2760                }
2761            }
2762        }
2763    }
2764}
2765
2766#[async_trait]
2767impl DbConnection for SqlitePool {
2768    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2769    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2770        debug!("create");
2771        trace!(?req, "create");
2772        let created_at = req.created_at;
2773        let (version, notifier) = self
2774            .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2775            .await?;
2776        self.notify_all(vec![notifier], created_at);
2777        Ok(version)
2778    }
2779
2780    #[instrument(level = Level::DEBUG, skip(self, batch))]
2781    async fn append_batch(
2782        &self,
2783        current_time: DateTime<Utc>,
2784        batch: Vec<AppendRequest>,
2785        execution_id: ExecutionId,
2786        version: Version,
2787    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2788        debug!("append_batch");
2789        trace!(?batch, "append_batch");
2790        assert!(!batch.is_empty(), "Empty batch request");
2791
2792        let (version, notifier) = self
2793            .transaction(
2794                move |tx| {
2795                    let mut version = version.clone();
2796                    let mut notifier = None;
2797                    for append_request in &batch {
2798                        let (v, n) =
2799                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2800                        version = v;
2801                        notifier = Some(n);
2802                    }
2803                    Ok::<_, DbErrorWrite>((
2804                        version,
2805                        notifier.expect("checked that the batch is not empty"),
2806                    ))
2807                },
2808                "append_batch",
2809            )
2810            .await?;
2811
2812        self.notify_all(vec![notifier], current_time);
2813        Ok(version)
2814    }
2815
2816    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2817    async fn append_batch_create_new_execution(
2818        &self,
2819        current_time: DateTime<Utc>,
2820        batch: Vec<AppendRequest>,
2821        execution_id: ExecutionId,
2822        version: Version,
2823        child_req: Vec<CreateRequest>,
2824    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2825        debug!("append_batch_create_new_execution");
2826        trace!(?batch, ?child_req, "append_batch_create_new_execution");
2827        assert!(!batch.is_empty(), "Empty batch request");
2828
2829        let (version, notifiers) = self
2830            .transaction(
2831                move |tx| {
2832                    let mut notifier = None;
2833                    let mut version = version.clone();
2834                    for append_request in &batch {
2835                        let (v, n) =
2836                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2837                        version = v;
2838                        notifier = Some(n);
2839                    }
2840                    let mut notifiers = Vec::new();
2841                    notifiers.push(notifier.expect("checked that the batch is not empty"));
2842
2843                    for child_req in &child_req {
2844                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2845                        notifiers.push(notifier);
2846                    }
2847                    Ok::<_, DbErrorWrite>((version, notifiers))
2848                },
2849                "append_batch_create_new_execution_inner",
2850            )
2851            .await?;
2852        self.notify_all(notifiers, current_time);
2853        Ok(version)
2854    }
2855
2856    #[cfg(feature = "test")]
2857    #[instrument(level = Level::DEBUG, skip(self))]
2858    async fn get(
2859        &self,
2860        execution_id: &ExecutionId,
2861    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2862        trace!("get");
2863        let execution_id = execution_id.clone();
2864        self.transaction(move |tx| Self::get(tx, &execution_id), "get")
2865            .await
2866    }
2867
2868    #[instrument(level = Level::DEBUG, skip(self))]
2869    async fn list_execution_events(
2870        &self,
2871        execution_id: &ExecutionId,
2872        since: &Version,
2873        max_length: VersionType,
2874        include_backtrace_id: bool,
2875    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2876        let execution_id = execution_id.clone();
2877        let since = since.0;
2878        self.transaction(
2879            move |tx| {
2880                Self::list_execution_events(
2881                    tx,
2882                    &execution_id,
2883                    since,
2884                    since + max_length,
2885                    include_backtrace_id,
2886                )
2887            },
2888            "get",
2889        )
2890        .await
2891    }
2892
2893    // Supports only one subscriber per execution id.
2894    // A new call will overwrite the old subscriber, the old one will end
2895    // with a timeout, which is fine.
2896    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2897    async fn subscribe_to_next_responses(
2898        &self,
2899        execution_id: &ExecutionId,
2900        start_idx: usize,
2901        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2902    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
2903        debug!("next_responses");
2904        let unique_tag: u64 = rand::random();
2905        let execution_id = execution_id.clone();
2906
2907        let cleanup = || {
2908            let mut guard = self.0.response_subscribers.lock().unwrap();
2909            match guard.remove(&execution_id) {
2910                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
2911                Some(other) => {
2912                    // Reinsert foreign sender.
2913                    guard.insert(execution_id.clone(), other);
2914                }
2915                None => {} // Value was replaced and cleaned up already, or notification was sent.
2916            }
2917        };
2918
2919        let response_subscribers = self.0.response_subscribers.clone();
2920        let resp_or_receiver = {
2921            let execution_id = execution_id.clone();
2922            self.transaction(
2923                move |tx| {
2924                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
2925                    if responses.is_empty() {
2926                        // cannot race as we have the transaction write lock
2927                        let (sender, receiver) = oneshot::channel();
2928                        response_subscribers
2929                            .lock()
2930                            .unwrap()
2931                            .insert(execution_id.clone(), (sender, unique_tag));
2932                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
2933                    } else {
2934                        Ok(itertools::Either::Left(responses))
2935                    }
2936                },
2937                "subscribe_to_next_responses",
2938            )
2939            .await
2940        }
2941        .inspect_err(|_| {
2942            cleanup();
2943        })?;
2944        match resp_or_receiver {
2945            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
2946            itertools::Either::Right(receiver) => {
2947                let res = async move {
2948                    tokio::select! {
2949                        resp = receiver => {
2950                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
2951                            Ok(vec![resp])
2952                        }
2953                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
2954                    }
2955                }
2956                .await;
2957                cleanup();
2958                res
2959            }
2960        }
2961    }
2962
2963    // Supports multiple subscribers.
2964    async fn wait_for_finished_result(
2965        &self,
2966        execution_id: &ExecutionId,
2967        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2968    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2969        let unique_tag: u64 = rand::random();
2970        let execution_id = execution_id.clone();
2971        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2972
2973        let cleanup = || {
2974            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2975            if let Some(subscribers) = guard.get_mut(&execution_id) {
2976                subscribers.remove(&unique_tag);
2977            }
2978        };
2979
2980        let resp_or_receiver = {
2981            let execution_id = execution_id.clone();
2982            self.transaction(move |tx| {
2983                let pending_state =
2984                    Self::get_combined_state(tx, &execution_id)?.pending_state;
2985                if let PendingState::Finished { finished } = pending_state {
2986                    let event =
2987                        Self::get_execution_event(tx, &execution_id, finished.version)?;
2988                    if let ExecutionEventInner::Finished { result, ..} = event.event {
2989                        Ok(itertools::Either::Left(result))
2990                    } else {
2991                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2992                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
2993                            "cannot get finished event based on t_state version"
2994                        )))
2995                    }
2996                } else {
2997                    // Cannot race with the notifier as we have the transaction write lock:
2998                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
2999                    // or we end up here. If this tx fails, the cleanup will remove this entry.
3000                    let (sender, receiver) = oneshot::channel();
3001                    let mut guard = execution_finished_subscription.lock().unwrap();
3002                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3003                    Ok(itertools::Either::Right(receiver))
3004                }
3005            }, "wait_for_finished_result")
3006            .await
3007        }
3008        .inspect_err(|_| {
3009            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
3010            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
3011            // If cleanup wins, it simply removes the oneshot sender.
3012            cleanup();
3013        })?;
3014
3015        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3016        match resp_or_receiver {
3017            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3018            itertools::Either::Right(receiver) => {
3019                let res = async move {
3020                    tokio::select! {
3021                        resp = receiver => {
3022                            Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3023                        }
3024                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3025                    }
3026                }
3027                .await;
3028                cleanup();
3029                res
3030            }
3031        }
3032    }
3033
3034    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3035    async fn append_response(
3036        &self,
3037        created_at: DateTime<Utc>,
3038        execution_id: ExecutionId,
3039        response_event: JoinSetResponseEvent,
3040    ) -> Result<(), DbErrorWrite> {
3041        debug!("append_response");
3042        let event = JoinSetResponseEventOuter {
3043            created_at,
3044            event: response_event,
3045        };
3046        let notifier = self
3047            .transaction(
3048                move |tx| Self::append_response(tx, &execution_id, event.clone()),
3049                "append_response",
3050            )
3051            .await?;
3052        self.notify_all(vec![notifier], created_at);
3053        Ok(())
3054    }
3055
3056    #[instrument(level = Level::DEBUG, skip_all)]
3057    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3058        debug!("append_backtrace");
3059        self.transaction(
3060            move |tx| Self::append_backtrace(tx, &append),
3061            "append_backtrace",
3062        )
3063        .await
3064    }
3065
3066    #[instrument(level = Level::DEBUG, skip_all)]
3067    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3068        debug!("append_backtrace_batch");
3069        self.transaction(
3070            move |tx| {
3071                for append in &batch {
3072                    Self::append_backtrace(tx, append)?;
3073                }
3074                Ok(())
3075            },
3076            "append_backtrace",
3077        )
3078        .await
3079    }
3080
3081    #[instrument(level = Level::DEBUG, skip_all)]
3082    async fn get_backtrace(
3083        &self,
3084        execution_id: &ExecutionId,
3085        filter: BacktraceFilter,
3086    ) -> Result<BacktraceInfo, DbErrorRead> {
3087        debug!("get_last_backtrace");
3088        let execution_id = execution_id.clone();
3089
3090        self.transaction(
3091            move |tx| {
3092                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3093                                WHERE execution_id = :execution_id";
3094                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3095                let select = match &filter {
3096                    BacktraceFilter::Specific(version) =>{
3097                        params.push((":version", Box::new(version.0)));
3098                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3099                    },
3100                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3101                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3102                };
3103                tx
3104                    .prepare(&select)
3105                    ?
3106                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3107                        params
3108                            .iter()
3109                            .map(|(key, value)| (*key, value.as_ref()))
3110                            .collect::<Vec<_>>()
3111                            .as_ref(),
3112                    |row| {
3113                        Ok(BacktraceInfo {
3114                            execution_id: execution_id.clone(),
3115                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3116                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3117                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3118                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3119                        })
3120                    },
3121                ).map_err(DbErrorRead::from)
3122            },
3123            "get_last_backtrace",
3124        ).await
3125    }
3126
3127    /// Get currently expired delays and locks.
3128    #[instrument(level = Level::TRACE, skip(self))]
3129    async fn get_expired_timers(
3130        &self,
3131        at: DateTime<Utc>,
3132    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3133        self.transaction(
3134            move |conn| {
3135                let mut expired_timers = conn.prepare(
3136                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3137                )?
3138                .query_map(
3139                        named_params! {
3140                            ":at": at,
3141                        },
3142                        |row| {
3143                            let execution_id = row.get("execution_id")?;
3144                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3145                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3146                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3147                            Ok(ExpiredTimer::Delay(delay))
3148                        },
3149                    )?
3150                    .collect::<Result<Vec<_>, _>>()?;
3151                // Extend with expired locks
3152                let expired = conn.prepare(&format!(r#"
3153                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3154                    executor_id, run_id
3155                    FROM t_state
3156                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3157                    "#
3158                )
3159                )?
3160                .query_map(
3161                        named_params! {
3162                            ":at": at,
3163                        },
3164                        |row| {
3165                            let execution_id = row.get("execution_id")?;
3166                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3167                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3168                            let intermittent_event_count = row.get("intermittent_event_count")?;
3169                            let max_retries = row.get("max_retries")?;
3170                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3171                            let executor_id = row.get("executor_id")?;
3172                            let run_id = row.get("run_id")?;
3173                            let lock = ExpiredLock {
3174                                execution_id,
3175                                locked_at_version,
3176                                next_version,
3177                                intermittent_event_count,
3178                                max_retries,
3179                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3180                                locked_by: LockedBy { executor_id, run_id },
3181                            };
3182                            Ok(ExpiredTimer::Lock(lock))
3183                        }
3184                    )?
3185                    .collect::<Result<Vec<_>, _>>()?;
3186                expired_timers.extend(expired);
3187                if !expired_timers.is_empty() {
3188                    debug!("get_expired_timers found {expired_timers:?}");
3189                }
3190                Ok(expired_timers)
3191            }, "get_expired_timers"
3192        )
3193        .await
3194    }
3195
3196    async fn get_execution_event(
3197        &self,
3198        execution_id: &ExecutionId,
3199        version: &Version,
3200    ) -> Result<ExecutionEvent, DbErrorRead> {
3201        let version = version.0;
3202        let execution_id = execution_id.clone();
3203        self.transaction(
3204            move |tx| Self::get_execution_event(tx, &execution_id, version),
3205            "get_execution_event",
3206        )
3207        .await
3208    }
3209
3210    async fn get_pending_state(
3211        &self,
3212        execution_id: &ExecutionId,
3213    ) -> Result<PendingState, DbErrorRead> {
3214        let execution_id = execution_id.clone();
3215        Ok(self
3216            .transaction(
3217                move |tx| Self::get_combined_state(tx, &execution_id),
3218                "get_pending_state",
3219            )
3220            .await?
3221            .pending_state)
3222    }
3223
3224    async fn list_executions(
3225        &self,
3226        ffqn: Option<FunctionFqn>,
3227        top_level_only: bool,
3228        pagination: ExecutionListPagination,
3229    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3230        self.transaction(
3231            move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3232            "list_executions",
3233        )
3234        .await
3235    }
3236
3237    async fn list_responses(
3238        &self,
3239        execution_id: &ExecutionId,
3240        pagination: Pagination<u32>,
3241    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3242        let execution_id = execution_id.clone();
3243        self.transaction(
3244            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3245            "list_executions",
3246        )
3247        .await
3248    }
3249}
3250
3251#[cfg(any(test, feature = "tempfile"))]
3252pub mod tempfile {
3253    use super::{SqliteConfig, SqlitePool};
3254    use tempfile::NamedTempFile;
3255
3256    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
3257        if let Ok(path) = std::env::var("SQLITE_FILE") {
3258            (
3259                SqlitePool::new(path, SqliteConfig::default())
3260                    .await
3261                    .unwrap(),
3262                None,
3263            )
3264        } else {
3265            let file = NamedTempFile::new().unwrap();
3266            let path = file.path();
3267            (
3268                SqlitePool::new(path, SqliteConfig::default())
3269                    .await
3270                    .unwrap(),
3271                Some(file),
3272            )
3273        }
3274    }
3275}