obeli_sk_db_sqlite/
sqlite_dao.rs

1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6    SupportedFunctionReturnValue,
7    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8    storage::{
9        AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10        AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11        DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12        DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool, DbPoolCloseable,
13        ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
14        ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15        JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy,
16        LockedExecution, Pagination, PendingState, PendingStateFinished,
17        PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
18        VersionType,
19    },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
25    TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28    cmp::max,
29    collections::VecDeque,
30    fmt::Debug,
31    ops::DerefMut,
32    path::Path,
33    str::FromStr,
34    sync::{
35        Arc, Mutex,
36        atomic::{AtomicBool, Ordering},
37    },
38    time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42    sync::{mpsc, oneshot},
43    time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
/// Error returned when the SQLite connection cannot be opened or its schema cannot be set up.
///
/// The type carries no detail; the failing step logs the underlying cause via `tracing`
/// before this value is returned.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
50
/// Describes a single delay: the join set it belongs to, its own ID and its expiration time.
// NOTE(review): construction and consumption of this struct happen outside this view.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
/*
Default PRAGMAs, applied in the listed order when the connection is initialized.
Each entry is `[name, value]`; an empty value means the pragma is only queried, not set.

page_size = 8 KB - must come before `journal_mode`: switching a fresh database to WAL writes the
database header, after which a different page size is only picked up by a `VACUUM` that runs
outside of WAL mode. The setting has no effect on an already existing database.
https://www.sqlite.org/pragma.html#pragma_page_size

mmap_size = 128MB - Set the global memory map so all processes can share some data
https://www.sqlite.org/pragma.html#pragma_mmap_size
https://www.sqlite.org/mmap.html

journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
https://www.sqlite.org/pragma.html#pragma_journal_size_limit

Inspired by https://github.com/rails/rails/pull/49349
*/

const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"], // 8 KB, has to precede `journal_mode`
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
80
/// Stores the schema version of the database file. Append only.
/// During initialization the row with the highest `id` is read and its `schema_version`
/// is compared with [`T_METADATA_EXPECTED_SCHEMA_VERSION`].
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
/// Schema version this build expects; initialization fails when the stored version differs.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// Stores execution history. Append only.
/// Each row is one event of an execution, identified by `(execution_id, version)`.
/// `history_event_type` is a stored generated column extracted from `json_value`
/// at the JSON path `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
// NOTE(review): covers the same columns as the table's PRIMARY KEY, so it looks redundant -
// confirm with a query plan before dropping it.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
112
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, column `delay_id` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, column `child_execution_id`,`finished_version`
/// must not be null.
// NOTE(review): the UNIQUE constraint spans the nullable columns `delay_id` and
// `child_execution_id`. SQLite treats NULLs as distinct in UNIQUE constraints, so if one of the
// two is always NULL (as the variants above suggest) duplicates would not be rejected - verify.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
136
/// Stores executions in `PendingState`
/// `state` to column mapping:
/// `PendingAt`:            (nothing but required columns), optionally contains all `Locked` columns.
/// `Locked`:               `max_retries`, `retry_exp_backoff_millis`, `last_lock_version`, `executor_id`, `run_id`
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
/// `Finished` :            `result_kind`.
///
/// `pending_expires_finished` is a single timestamp column whose meaning depends on `state`:
/// it is read as the scheduled-at, lock-expiration or finished-at time respectively
/// (see `CombinedState::new`).
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Values of the `t_state.state` column, compared against when a row is turned into a `PendingState`.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";
// NOTE(review): presumably a value of `t_execution_log.history_event_type`; usage is outside this view.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
175
// Indexes on `t_state`. All of them are created with `IF NOT EXISTS` during initialization.
// TODO: partial indexes
// Index on the state, its timestamp and the function name.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
// Partial index: only rows that have an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
// Note that the SQL index name ends with `is_root` while the indexed column is `is_top_level`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
194
/// Represents [`ExpiredTimer::AsyncDelay`] . Rows are deleted when the delay is processed.
/// A delay is identified by the triple `(execution_id, join_set_id, delay_id)`;
/// `expires_at` holds the expiration timestamp as text.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
205
/// Stores WASM backtraces of an execution. Append only.
/// Each row is keyed by a version range, named as a half-open interval:
/// `version_min_including` .. `version_max_excluding`.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
// Index for searching backtraces by execution_id and version
// NOTE(review): covers the same columns as the table's PRIMARY KEY, so it looks redundant -
// confirm with a query plan before dropping it.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
221
/// Cloneable summary of a `rusqlite::Error`.
///
/// It is `Clone` so that one commit result can be sent to every logical transaction of a batch.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows (`rusqlite::Error::QueryReturnedNoRows`).
    #[error("not found")]
    NotFound,
    /// Any other error, kept only as its string representation.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
229
230mod conversions {
231
232    use super::RusqliteError;
233    use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
234    use rusqlite::types::{FromSql, FromSqlError};
235    use std::{fmt::Debug, str::FromStr};
236    use tracing::error;
237
238    impl From<rusqlite::Error> for RusqliteError {
239        fn from(err: rusqlite::Error) -> Self {
240            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
241                RusqliteError::NotFound
242            } else {
243                error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
244                RusqliteError::Generic(err.to_string().into())
245            }
246        }
247    }
248
249    impl From<RusqliteError> for DbErrorGeneric {
250        fn from(err: RusqliteError) -> DbErrorGeneric {
251            match err {
252                RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
253                RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
254            }
255        }
256    }
257    impl From<RusqliteError> for DbErrorRead {
258        fn from(err: RusqliteError) -> Self {
259            if matches!(err, RusqliteError::NotFound) {
260                Self::NotFound
261            } else {
262                Self::from(DbErrorGeneric::from(err))
263            }
264        }
265    }
266    impl From<RusqliteError> for DbErrorReadWithTimeout {
267        fn from(err: RusqliteError) -> Self {
268            Self::from(DbErrorRead::from(err))
269        }
270    }
271    impl From<RusqliteError> for DbErrorWrite {
272        fn from(err: RusqliteError) -> Self {
273            if matches!(err, RusqliteError::NotFound) {
274                Self::NotFound
275            } else {
276                Self::from(DbErrorGeneric::from(err))
277            }
278        }
279    }
280
281    pub(crate) struct JsonWrapper<T>(pub(crate) T);
282    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
283        fn column_result(
284            value: rusqlite::types::ValueRef<'_>,
285        ) -> rusqlite::types::FromSqlResult<Self> {
286            let value = match value {
287                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
288                    Ok(value)
289                }
290                other => {
291                    error!(
292                        backtrace = %std::backtrace::Backtrace::capture(),
293                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
294                    );
295                    Err(FromSqlError::InvalidType)
296                }
297            }?;
298            let value = serde_json::from_slice::<T>(value).map_err(|err| {
299                error!(
300                    backtrace = %std::backtrace::Backtrace::capture(),
301                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
302                    r#type = std::any::type_name::<T>()
303                );
304                FromSqlError::InvalidType
305            })?;
306            Ok(Self(value))
307        }
308    }
309
310    pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
311    impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
312        fn column_result(
313            value: rusqlite::types::ValueRef<'_>,
314        ) -> rusqlite::types::FromSqlResult<Self> {
315            let value = String::column_result(value)?;
316            let value = T::from_str(&value).map_err(|err| {
317                error!(
318                    backtrace = %std::backtrace::Backtrace::capture(),
319                    "Cannot convert string `{value}` to type:`{type}` - {err:?}",
320                    r#type = std::any::type_name::<T>()
321                );
322                FromSqlError::InvalidType
323            })?;
324            Ok(Self(value))
325        }
326    }
327
328    // Used as a wrapper for `FromSqlError::Other`
329    #[derive(Debug, thiserror::Error)]
330    #[error("{0}")]
331    pub(crate) struct OtherError(&'static str);
332    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
333        FromSqlError::other(OtherError(input)).into()
334    }
335
336    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
337        DbErrorGeneric::Uncategorized(input.into())
338    }
339}
340
/// Raw, loosely typed image of a `t_state` row before it is validated.
/// `CombinedState::new` checks which optional columns are set for the given `state`
/// and turns the row into a `PendingState`.
#[derive(Debug)]
struct CombinedStateDTO {
    // One of the `STATE_*` constants.
    state: String,
    // Unparsed function name, parsed with `FunctionFqn::from_str` later.
    ffqn: String,
    // Meaning depends on `state`: scheduled-at, lock expiration or finished-at.
    pending_expires_finished: DateTime<Utc>,
    // Locked:
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    // Blocked by join set:
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    // Finished:
    result_kind: Option<PendingStateFinishedResultKind>,
}
356#[derive(Debug)]
357struct CombinedState {
358    ffqn: FunctionFqn,
359    pending_state: PendingState,
360    corresponding_version: Version,
361}
362impl CombinedState {
363    fn get_next_version_assert_not_finished(&self) -> Version {
364        assert!(!self.pending_state.is_finished());
365        self.corresponding_version.increment()
366    }
367
368    #[cfg(feature = "test")]
369    fn get_next_version_or_finished(&self) -> Version {
370        if self.pending_state.is_finished() {
371            self.corresponding_version.clone()
372        } else {
373            self.corresponding_version.increment()
374        }
375    }
376}
377
/// Notification payload: an execution of `ffqn` scheduled at `scheduled_at`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}

/// Notification payload: `execution_id` finished with `retval`.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Bundle of optional notifications; every field defaults to `None`.
// NOTE(review): presumably collected while appending events and dispatched to the pool's
// subscribers after commit - the producing and consuming code is outside this view.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
396
397impl CombinedState {
398    fn new(
399        dto: &CombinedStateDTO,
400        corresponding_version: Version,
401    ) -> Result<Self, rusqlite::Error> {
402        let pending_state = match dto {
403            // Pending - just created
404            CombinedStateDTO {
405                state,
406                ffqn: _,
407                pending_expires_finished: scheduled_at,
408                last_lock_version: None,
409                executor_id: None,
410                run_id: None,
411                join_set_id: None,
412                join_set_closing: None,
413                result_kind: None,
414            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
415                scheduled_at: *scheduled_at,
416                last_lock: None,
417            }),
418            // Pending, previously locked
419            CombinedStateDTO {
420                state,
421                ffqn: _,
422                pending_expires_finished: scheduled_at,
423                last_lock_version: None,
424                executor_id: Some(executor_id),
425                run_id: Some(run_id),
426                join_set_id: None,
427                join_set_closing: None,
428                result_kind: None,
429            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
430                scheduled_at: *scheduled_at,
431                last_lock: Some(LockedBy {
432                    executor_id: *executor_id,
433                    run_id: *run_id,
434                }),
435            }),
436            CombinedStateDTO {
437                state,
438                ffqn: _,
439                pending_expires_finished: lock_expires_at,
440                last_lock_version: Some(_),
441                executor_id: Some(executor_id),
442                run_id: Some(run_id),
443                join_set_id: None,
444                join_set_closing: None,
445                result_kind: None,
446            } if state == STATE_LOCKED => Ok(PendingState::Locked(PendingStateLocked {
447                locked_by: LockedBy {
448                    executor_id: *executor_id,
449                    run_id: *run_id,
450                },
451                lock_expires_at: *lock_expires_at,
452            })),
453            CombinedStateDTO {
454                state,
455                ffqn: _,
456                pending_expires_finished: lock_expires_at,
457                last_lock_version: None,
458                executor_id: _,
459                run_id: _,
460                join_set_id: Some(join_set_id),
461                join_set_closing: Some(join_set_closing),
462                result_kind: None,
463            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
464                join_set_id: join_set_id.clone(),
465                closing: *join_set_closing,
466                lock_expires_at: *lock_expires_at,
467            }),
468            CombinedStateDTO {
469                state,
470                ffqn: _,
471                pending_expires_finished: finished_at,
472                last_lock_version: None,
473                executor_id: None,
474                run_id: None,
475                join_set_id: None,
476                join_set_closing: None,
477                result_kind: Some(result_kind),
478            } if state == STATE_FINISHED => Ok(PendingState::Finished {
479                finished: PendingStateFinished {
480                    finished_at: *finished_at,
481                    version: corresponding_version.0,
482                    result_kind: *result_kind,
483                },
484            }),
485            _ => {
486                error!("Cannot deserialize pending state from  {dto:?}");
487                Err(consistency_rusqlite("invalid `t_state`"))
488            }
489        }?;
490        Ok(Self {
491            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
492                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
493                consistency_rusqlite("invalid ffqn value in `t_state`")
494            })?,
495            pending_state,
496            corresponding_version,
497        })
498    }
499}
500
/// Unit of work sent to the SQLite thread. Several logical transactions may be applied
/// inside one physical transaction and committed together.
#[derive(derive_more::Debug)]
struct LogicalTx {
    // The work itself. `FnMut` because it is invoked again when a failed bulk commit
    // is replayed one logical transaction at a time.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    // Creation time, used to measure how long the command waited before being applied.
    sent_at: Instant,
    // Label passed to the histograms.
    func_name: &'static str,
    // Receives the result of the commit that covered this logical transaction.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}

/// Message accepted by the SQLite thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Apply and commit the given logical transaction.
    LogicalTx(LogicalTx),
    /// Stop processing and let the thread finish.
    Shutdown,
}

/// Cloneable handle to the SQLite thread; also serves as the `DbConnection`.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
516
517#[derive(Clone)]
518pub struct SqlitePool(SqlitePoolInner);
519
520type ResponseSubscribers =
521    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
522type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
523type ExecutionFinishedSubscribers =
524    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
525#[derive(Clone)]
526struct SqlitePoolInner {
527    shutdown_requested: Arc<AtomicBool>,
528    shutdown_finished: Arc<AtomicBool>,
529    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
530    response_subscribers: ResponseSubscribers,
531    pending_ffqn_subscribers: PendingFfqnSubscribers,
532    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
533    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
534}
535
536#[async_trait]
537impl DbPoolCloseable for SqlitePool {
538    async fn close(self) {
539        debug!("Sqlite is closing");
540        self.0.shutdown_requested.store(true, Ordering::Release);
541        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
542        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
543        while !self.0.shutdown_finished.load(Ordering::Acquire) {
544            tokio::time::sleep(Duration::from_millis(1)).await;
545        }
546        debug!("Sqlite was closed");
547    }
548}
549
550#[async_trait]
551impl DbPool for SqlitePool {
552    fn connection(&self) -> Box<dyn DbConnection> {
553        Box::new(self.clone())
554    }
555}
556impl Drop for SqlitePool {
557    fn drop(&mut self) {
558        let arc = self.0.join_handle.take().expect("join_handle was set");
559        if let Ok(join_handle) = Arc::try_unwrap(arc) {
560            // Last holder
561            if !join_handle.is_finished() {
562                if !self.0.shutdown_finished.load(Ordering::Acquire) {
563                    // Best effort to shut down the sqlite thread.
564                    let backtrace = std::backtrace::Backtrace::capture();
565                    warn!("SqlitePool was not closed properly - {backtrace}");
566                    self.0.shutdown_requested.store(true, Ordering::Release);
567                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
568                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
569                    // Not joining the thread, drop might be called from async context.
570                    // We are shutting down the server anyway.
571                } else {
572                    // The thread set `shutdown_finished` as its last operation.
573                }
574            }
575        }
576    }
577}
578
/// Configuration of [`SqlitePool`].
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command queue between callers and the SQLite thread.
    pub queue_capacity: usize,
    /// PRAGMA values that replace the built-in defaults or add to them.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to the histograms of the SQLite thread; `None` by default.
    pub metrics_threshold: Option<Duration>,
}
impl Default for SqliteConfig {
    /// Queue capacity of 100, no PRAGMA overrides, no metrics threshold.
    fn default() -> Self {
        const DEFAULT_QUEUE_CAPACITY: usize = 100;
        SqliteConfig {
            queue_capacity: DEFAULT_QUEUE_CAPACITY,
            pragma_override: None,
            metrics_threshold: None,
        }
    }
}
594
595impl SqlitePool {
596    fn init_thread(
597        path: &Path,
598        mut pragma_override: HashMap<String, String>,
599    ) -> Result<Connection, InitializationError> {
600        fn conn_execute<P: Params>(
601            conn: &Connection,
602            sql: &str,
603            params: P,
604        ) -> Result<(), InitializationError> {
605            conn.execute(sql, params).map(|_| ()).map_err(|err| {
606                error!("Cannot run `{sql}` - {err:?}");
607                InitializationError
608            })
609        }
610        fn pragma_update(
611            conn: &Connection,
612            name: &str,
613            value: &str,
614        ) -> Result<(), InitializationError> {
615            if value.is_empty() {
616                debug!("Querying PRAGMA {name}");
617                conn.pragma_query(None, name, |row| {
618                    debug!("{row:?}");
619                    Ok(())
620                })
621                .map_err(|err| {
622                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
623                    InitializationError
624                })
625            } else {
626                debug!("Setting PRAGMA {name}={value}");
627                conn.pragma_update(None, name, value).map_err(|err| {
628                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
629                    InitializationError
630                })
631            }
632        }
633
634        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
635            error!("cannot open the connection - {err:?}");
636            InitializationError
637        })?;
638
639        for [pragma_name, default_value] in PRAGMA {
640            let pragma_value = pragma_override
641                .remove(pragma_name)
642                .unwrap_or_else(|| default_value.to_string());
643            pragma_update(&conn, pragma_name, &pragma_value)?;
644        }
645        // drain the rest overrides
646        for (pragma_name, pragma_value) in pragma_override.drain() {
647            pragma_update(&conn, &pragma_name, &pragma_value)?;
648        }
649
650        // t_metadata
651        conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
652        // Insert row if not exists.
653        conn_execute(
654            &conn,
655            &format!(
656                "INSERT INTO t_metadata (schema_version, created_at) VALUES
657                    ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
658            ),
659            [Utc::now()],
660        )?;
661        // Fail on unexpected `schema_version`.
662        let actual_version = conn
663            .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
664            .map_err(|err| {
665                error!("cannot select schema version - {err:?}");
666                InitializationError
667            })?
668            .query_row([], |row| row.get::<_, u32>("schema_version"));
669
670        let actual_version = actual_version.map_err(|err| {
671            error!("Cannot read the schema version - {err:?}");
672            InitializationError
673        })?;
674        if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
675            error!(
676                "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
677            );
678            return Err(InitializationError);
679        }
680
681        // t_execution_log
682        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
683        conn_execute(
684            &conn,
685            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
686            [],
687        )?;
688        conn_execute(
689            &conn,
690            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
691            [],
692        )?;
693        // t_join_set_response
694        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
695        conn_execute(
696            &conn,
697            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
698            [],
699        )?;
700        // t_state
701        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
702        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
703        conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
704        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
705        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
706        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
707        // t_delay
708        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
709        // t_backtrace
710        conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
711        conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
712        Ok(conn)
713    }
714
715    fn connection_rpc(
716        mut conn: Connection,
717        shutdown_requested: &AtomicBool,
718        shutdown_finished: &AtomicBool,
719        mut command_rx: mpsc::Receiver<ThreadCommand>,
720        metrics_threshold: Option<Duration>,
721    ) {
722        let mut histograms = Histograms::new(metrics_threshold);
723        while Self::tick(
724            &mut conn,
725            shutdown_requested,
726            &mut command_rx,
727            &mut histograms,
728        )
729        .is_ok()
730        {
731            // Loop until shutdown is set to true.
732        }
733        debug!("Closing command thread");
734        shutdown_finished.store(true, Ordering::Release);
735    }
736
    /// Processes one batch of commands inside a single physical transaction.
    ///
    /// Blocks until the first command arrives, then keeps applying every command that is
    /// already queued to the same transaction and commits them together. Each logical
    /// transaction receives the commit result through its `commit_ack_sender`.
    /// If the bulk commit fails and the batch holds more than one logical transaction,
    /// each of them is replayed in its own physical transaction.
    ///
    /// Returns `Err(())` when the thread should stop: a shutdown command or flag was observed,
    /// the command channel was closed, or a transaction could not be opened. On these early
    /// returns the open transaction is dropped without commit and no ack is sent for the
    /// batch, so the ack senders are dropped as well.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ()> {
        let mut ltx_list = Vec::new();
        // Wait for first logical tx.
        let mut ltx = match command_rx.blocking_recv() {
            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
            Some(ThreadCommand::Shutdown) => {
                debug!("shutdown message received");
                return Err(());
            }
            None => {
                debug!("command_rx was closed");
                return Err(());
            }
        };
        let all_fns_start = std::time::Instant::now();
        // `Immediate` acquires the write lock when the transaction begins.
        let mut physical_tx = conn
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|begin_err| {
                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
            })?;
        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
        ltx_list.push(ltx);

        // Drain whatever is already queued without blocking.
        while let Ok(more) = command_rx.try_recv() {
            let mut ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("shutdown message received");
                    return Err(());
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };

            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
            ltx_list.push(ltx);
        }
        histograms.record_all_fns(all_fns_start.elapsed());

        {
            // Commit
            if shutdown_requested.load(Ordering::Relaxed) {
                debug!("Recveived shutdown during processing of the batch");
                return Err(());
            }
            let commit_result = {
                let now = std::time::Instant::now();
                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
                histograms.record_commit(now.elapsed());
                commit_result
            };
            // With a single logical tx there is nothing to replay, so a failure is reported as is.
            if commit_result.is_ok() || ltx_list.len() == 1 {
                // Happy path - just send the commit ACK.
                for ltx in ltx_list {
                    // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
                }
            } else {
                warn!(
                    "Bulk transaction failed, committing {} transactions serially",
                    ltx_list.len()
                );
                // Bulk transaction failed. Replay each ltx in its own physical transaction.
                for ltx in ltx_list {
                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
                }
            }
        }
        histograms.print_if_elapsed();
        Ok(())
    }
811
812    fn ltx_commit_single(
813        mut ltx: LogicalTx,
814        conn: &mut Connection,
815        shutdown_requested: &AtomicBool,
816        histograms: &mut Histograms,
817    ) -> Result<(), ()> {
818        let mut physical_tx = conn
819            .transaction_with_behavior(TransactionBehavior::Immediate)
820            .map_err(|begin_err| {
821                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
822            })?;
823        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
824        if shutdown_requested.load(Ordering::Relaxed) {
825            debug!("Recveived shutdown during processing of the batch");
826            return Err(());
827        }
828        let commit_result = {
829            let now = std::time::Instant::now();
830            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
831            histograms.record_commit(now.elapsed());
832            commit_result
833        };
834        // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
835        let _ = ltx.commit_ack_sender.send(commit_result);
836        Ok(())
837    }
838
839    fn ltx_apply_to_tx(
840        ltx: &mut LogicalTx,
841        physical_tx: &mut Transaction,
842        histograms: &mut Histograms,
843    ) {
844        let sent_latency = ltx.sent_at.elapsed();
845        let started_at = Instant::now();
846        (ltx.func)(physical_tx);
847        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
848    }
849
850    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
851    pub async fn new<P: AsRef<Path>>(
852        path: P,
853        config: SqliteConfig,
854    ) -> Result<Self, InitializationError> {
855        let path = path.as_ref().to_owned();
856
857        let shutdown_requested = Arc::new(AtomicBool::new(false));
858        let shutdown_finished = Arc::new(AtomicBool::new(false));
859
860        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
861        info!("Sqlite database location: {path:?}");
862        let join_handle = {
863            // Initialize the `Connection`.
864            let init_task = {
865                tokio::task::spawn_blocking(move || {
866                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
867                })
868                .await
869            };
870            let conn = match init_task {
871                Ok(res) => res?,
872                Err(join_err) => {
873                    error!("Initialization panic - {join_err:?}");
874                    return Err(InitializationError);
875                }
876            };
877            let shutdown_requested = shutdown_requested.clone();
878            let shutdown_finished = shutdown_finished.clone();
879            // Start the RPC thread.
880            std::thread::spawn(move || {
881                Self::connection_rpc(
882                    conn,
883                    &shutdown_requested,
884                    &shutdown_finished,
885                    command_rx,
886                    config.metrics_threshold,
887                );
888            })
889        };
890        Ok(SqlitePool(SqlitePoolInner {
891            shutdown_requested,
892            shutdown_finished,
893            command_tx,
894            response_subscribers: Arc::default(),
895            pending_ffqn_subscribers: Arc::default(),
896            join_handle: Some(Arc::new(join_handle)),
897            execution_finished_subscribers: Arc::default(),
898        }))
899    }
900
901    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
902    async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
903    where
904        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
905        T: Send + 'static,
906        E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
907    {
908        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
909        let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
910        let thread_command_func = {
911            let fn_res = fn_res.clone();
912            ThreadCommand::LogicalTx(LogicalTx {
913                func: Box::new(move |tx| {
914                    let func_res = func(tx);
915                    *fn_res.lock().unwrap() = Some(func_res);
916                }),
917                sent_at: Instant::now(),
918                func_name,
919                commit_ack_sender,
920            })
921        };
922        self.0
923            .command_tx
924            .send(thread_command_func)
925            .await
926            .map_err(|_send_err| DbErrorGeneric::Close)?;
927
928        // Wait for commit ack, then get the retval from the mutex.
929        match commit_ack_receiver.await {
930            Ok(Ok(())) => {
931                let mut guard = fn_res.lock().unwrap();
932                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
933            }
934            Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
935            Err(_) => Err(E::from(DbErrorGeneric::Close)),
936        }
937    }
938
939    fn fetch_created_event(
940        conn: &Connection,
941        execution_id: &ExecutionId,
942    ) -> Result<CreateRequest, DbErrorRead> {
943        let mut stmt = conn.prepare(
944            "SELECT created_at, json_value FROM t_execution_log WHERE \
945            execution_id = :execution_id AND version = 0",
946        )?;
947        let (created_at, event) = stmt.query_row(
948            named_params! {
949                ":execution_id": execution_id.to_string(),
950            },
951            |row| {
952                let created_at = row.get("created_at")?;
953                let event = row
954                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
955                    .map_err(|serde| {
956                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
957                        consistency_rusqlite("cannot deserialize `Created` event")
958                    })?;
959                Ok((created_at, event.0))
960            },
961        )?;
962        if let ExecutionEventInner::Created {
963            ffqn,
964            params,
965            parent,
966            scheduled_at,
967            component_id,
968            metadata,
969            scheduled_by,
970        } = event
971        {
972            Ok(CreateRequest {
973                created_at,
974                execution_id: execution_id.clone(),
975                ffqn,
976                params,
977                parent,
978                scheduled_at,
979                component_id,
980                metadata,
981                scheduled_by,
982            })
983        } else {
984            error!("Row with version=0 must be a `Created` event - {event:?}");
985            Err(consistency_db_err("expected `Created` event").into())
986        }
987    }
988
989    fn check_expected_next_and_appending_version(
990        expected_version: &Version,
991        appending_version: &Version,
992    ) -> Result<(), DbErrorWrite> {
993        if *expected_version != *appending_version {
994            debug!(
995                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
996            );
997            return Err(DbErrorWrite::NonRetriable(
998                DbErrorWriteNonRetriable::VersionConflict(expected_version.clone()),
999            ));
1000        }
1001        Ok(())
1002    }
1003
1004    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1005    fn create_inner(
1006        tx: &Transaction,
1007        req: CreateRequest,
1008    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1009        debug!("create_inner");
1010
1011        let version = Version::default();
1012        let execution_id = req.execution_id.clone();
1013        let execution_id_str = execution_id.to_string();
1014        let ffqn = req.ffqn.clone();
1015        let created_at = req.created_at;
1016        let scheduled_at = req.scheduled_at;
1017        let event = ExecutionEventInner::from(req);
1018        let event_ser = serde_json::to_string(&event).map_err(|err| {
1019            error!("Cannot serialize {event:?} - {err:?}");
1020            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1021        })?;
1022        tx.prepare(
1023                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1024                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1025        ?
1026        .execute(named_params! {
1027            ":execution_id": &execution_id_str,
1028            ":created_at": created_at,
1029            ":version": version.0,
1030            ":json_value": event_ser,
1031            ":variant": event.variant(),
1032            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1033        })
1034        ?;
1035        let pending_at = {
1036            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1037            tx.prepare(
1038                r"
1039                INSERT INTO t_state (
1040                    execution_id,
1041                    is_top_level,
1042                    corresponding_version,
1043                    pending_expires_finished,
1044                    ffqn,
1045                    state,
1046                    created_at,
1047                    updated_at,
1048                    scheduled_at,
1049                    intermittent_event_count
1050                    )
1051                VALUES (
1052                    :execution_id,
1053                    :is_top_level,
1054                    :corresponding_version,
1055                    :pending_expires_finished,
1056                    :ffqn,
1057                    :state,
1058                    :created_at,
1059                    CURRENT_TIMESTAMP,
1060                    :scheduled_at,
1061                    0
1062                    )
1063                ",
1064            )?
1065            .execute(named_params! {
1066                ":execution_id": execution_id.to_string(),
1067                ":is_top_level": execution_id.is_top_level(),
1068                ":corresponding_version": version.0,
1069                ":pending_expires_finished": scheduled_at,
1070                ":ffqn": ffqn.to_string(),
1071                ":state": STATE_PENDING_AT,
1072                ":created_at": created_at,
1073                ":scheduled_at": scheduled_at,
1074            })?;
1075            AppendNotifier {
1076                pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1077                execution_finished: None,
1078                response: None,
1079            }
1080        };
1081        let next_version = Version::new(version.0 + 1);
1082        Ok((next_version, pending_at))
1083    }
1084
1085    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1086    fn update_state_pending_after_response_appended(
1087        tx: &Transaction,
1088        execution_id: &ExecutionId,
1089        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1090        corresponding_version: &Version, // t_execution_log is not be changed
1091    ) -> Result<AppendNotifier, DbErrorWrite> {
1092        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1093        let execution_id_str = execution_id.to_string();
1094        let mut stmt = tx
1095            .prepare_cached(
1096                r"
1097                UPDATE t_state
1098                SET
1099                    corresponding_version = :corresponding_version,
1100                    pending_expires_finished = :pending_expires_finished,
1101                    state = :state,
1102                    updated_at = CURRENT_TIMESTAMP,
1103
1104                    last_lock_version = NULL,
1105
1106                    join_set_id = NULL,
1107                    join_set_closing = NULL,
1108
1109                    result_kind = NULL
1110                WHERE execution_id = :execution_id
1111            ",
1112            )
1113            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1114        let updated = stmt
1115            .execute(named_params! {
1116                ":execution_id": execution_id_str,
1117                ":corresponding_version": corresponding_version.0,
1118                ":pending_expires_finished": scheduled_at,
1119                ":state": STATE_PENDING_AT,
1120            })
1121            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1122        if updated != 1 {
1123            return Err(DbErrorWrite::NotFound);
1124        }
1125        Ok(AppendNotifier {
1126            pending_at: Some(NotifierPendingAt {
1127                scheduled_at,
1128                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1129            }),
1130            execution_finished: None,
1131            response: None,
1132        })
1133    }
1134
1135    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1136    fn update_state_pending_after_event_appended(
1137        tx: &Transaction,
1138        execution_id: &ExecutionId,
1139        appending_version: &Version,
1140        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1141        intermittent_failure: bool,
1142    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1143        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1144        // If response idx is unknown, use 0. This distinguishs the two paths.
1145        // JoinNext will always send an actual idx.
1146        let mut stmt = tx
1147            .prepare_cached(
1148                r"
1149                UPDATE t_state
1150                SET
1151                    corresponding_version = :appending_version,
1152                    pending_expires_finished = :pending_expires_finished,
1153                    state = :state,
1154                    updated_at = CURRENT_TIMESTAMP,
1155                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1156
1157                    last_lock_version = NULL,
1158
1159                    join_set_id = NULL,
1160                    join_set_closing = NULL,
1161
1162                    result_kind = NULL
1163                WHERE execution_id = :execution_id;
1164            ",
1165            )
1166            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1167        let updated = stmt
1168            .execute(named_params! {
1169                ":execution_id": execution_id.to_string(),
1170                ":appending_version": appending_version.0,
1171                ":pending_expires_finished": scheduled_at,
1172                ":state": STATE_PENDING_AT,
1173                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1174            })
1175            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1176        if updated != 1 {
1177            return Err(DbErrorWrite::NotFound);
1178        }
1179        Ok((
1180            appending_version.increment(),
1181            AppendNotifier {
1182                pending_at: Some(NotifierPendingAt {
1183                    scheduled_at,
1184                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1185                }),
1186                execution_finished: None,
1187                response: None,
1188            },
1189        ))
1190    }
1191
1192    fn update_state_locked_get_intermittent_event_count(
1193        tx: &Transaction,
1194        execution_id: &ExecutionId,
1195        executor_id: ExecutorId,
1196        run_id: RunId,
1197        lock_expires_at: DateTime<Utc>,
1198        appending_version: &Version,
1199        retry_config: ComponentRetryConfig,
1200    ) -> Result<u32, DbErrorWrite> {
1201        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1202        let backoff_millis =
1203            u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1204        let execution_id_str = execution_id.to_string();
1205        let mut stmt = tx
1206            .prepare_cached(
1207                r"
1208                UPDATE t_state
1209                SET
1210                    corresponding_version = :appending_version,
1211                    pending_expires_finished = :pending_expires_finished,
1212                    state = :state,
1213                    updated_at = CURRENT_TIMESTAMP,
1214
1215                    max_retries = :max_retries,
1216                    retry_exp_backoff_millis = :retry_exp_backoff_millis,
1217                    last_lock_version = :appending_version,
1218                    executor_id = :executor_id,
1219                    run_id = :run_id,
1220
1221                    join_set_id = NULL,
1222                    join_set_closing = NULL,
1223
1224                    result_kind = NULL
1225                WHERE execution_id = :execution_id
1226            ",
1227            )
1228            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1229        let updated = stmt
1230            .execute(named_params! {
1231                ":execution_id": execution_id_str,
1232                ":appending_version": appending_version.0,
1233                ":pending_expires_finished": lock_expires_at,
1234                ":state": STATE_LOCKED,
1235                ":max_retries": retry_config.max_retries,
1236                ":retry_exp_backoff_millis": backoff_millis,
1237                ":executor_id": executor_id.to_string(),
1238                ":run_id": run_id.to_string(),
1239            })
1240            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1241        if updated != 1 {
1242            return Err(DbErrorWrite::NotFound);
1243        }
1244
1245        // fetch intermittent event count from the just-inserted row.
1246        let intermittent_event_count = tx
1247            .prepare(
1248                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1249            )?
1250            .query_row(
1251                named_params! {
1252                    ":execution_id": execution_id_str,
1253                },
1254                |row| {
1255                    let intermittent_event_count = row.get("intermittent_event_count")?;
1256                    Ok(intermittent_event_count)
1257                },
1258            )
1259            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1260
1261        Ok(intermittent_event_count)
1262    }
1263
1264    fn update_state_blocked(
1265        tx: &Transaction,
1266        execution_id: &ExecutionId,
1267        appending_version: &Version,
1268        // BlockedByJoinSet fields
1269        join_set_id: &JoinSetId,
1270        lock_expires_at: DateTime<Utc>,
1271        join_set_closing: bool,
1272    ) -> Result<
1273        AppendResponse, // next version
1274        DbErrorWrite,
1275    > {
1276        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1277        let execution_id_str = execution_id.to_string();
1278        let mut stmt = tx.prepare_cached(
1279            r"
1280                UPDATE t_state
1281                SET
1282                    corresponding_version = :appending_version,
1283                    pending_expires_finished = :pending_expires_finished,
1284                    state = :state,
1285                    updated_at = CURRENT_TIMESTAMP,
1286
1287                    last_lock_version = NULL,
1288
1289                    join_set_id = :join_set_id,
1290                    join_set_closing = :join_set_closing,
1291
1292                    result_kind = NULL
1293                WHERE execution_id = :execution_id
1294            ",
1295        )?;
1296        let updated = stmt.execute(named_params! {
1297            ":execution_id": execution_id_str,
1298            ":appending_version": appending_version.0,
1299            ":pending_expires_finished": lock_expires_at,
1300            ":state": STATE_BLOCKED_BY_JOIN_SET,
1301            ":join_set_id": join_set_id,
1302            ":join_set_closing": join_set_closing,
1303        })?;
1304        if updated != 1 {
1305            return Err(DbErrorWrite::NotFound);
1306        }
1307        Ok(appending_version.increment())
1308    }
1309
1310    fn update_state_finished(
1311        tx: &Transaction,
1312        execution_id: &ExecutionId,
1313        appending_version: &Version,
1314        // Finished fields
1315        finished_at: DateTime<Utc>,
1316        result_kind: PendingStateFinishedResultKind,
1317    ) -> Result<(), DbErrorWrite> {
1318        debug!("Setting t_state to Finished");
1319        let execution_id_str = execution_id.to_string();
1320        let mut stmt = tx.prepare_cached(
1321            r"
1322                UPDATE t_state
1323                SET
1324                    corresponding_version = :appending_version,
1325                    pending_expires_finished = :pending_expires_finished,
1326                    state = :state,
1327                    updated_at = CURRENT_TIMESTAMP,
1328
1329                    last_lock_version = NULL,
1330                    executor_id = NULL,
1331                    run_id = NULL,
1332
1333                    join_set_id = NULL,
1334                    join_set_closing = NULL,
1335
1336                    result_kind = :result_kind
1337                WHERE execution_id = :execution_id
1338            ",
1339        )?;
1340        let updated = stmt.execute(named_params! {
1341            ":execution_id": execution_id_str,
1342            ":appending_version": appending_version.0,
1343            ":pending_expires_finished": finished_at,
1344            ":state": STATE_FINISHED,
1345            ":result_kind": result_kind.to_string(),
1346        })?;
1347        if updated != 1 {
1348            return Err(DbErrorWrite::NotFound);
1349        }
1350        Ok(())
1351    }
1352
1353    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1354    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1355    fn bump_state_next_version(
1356        tx: &Transaction,
1357        execution_id: &ExecutionId,
1358        appending_version: &Version,
1359        delay_req: Option<DelayReq>,
1360    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1361        debug!("update_index_version");
1362        let execution_id_str = execution_id.to_string();
1363        let mut stmt = tx.prepare_cached(
1364            r"
1365                UPDATE t_state
1366                SET
1367                    corresponding_version = :appending_version,
1368                    updated_at = CURRENT_TIMESTAMP
1369                WHERE execution_id = :execution_id
1370            ",
1371        )?;
1372        let updated = stmt.execute(named_params! {
1373            ":execution_id": execution_id_str,
1374            ":appending_version": appending_version.0,
1375        })?;
1376        if updated != 1 {
1377            return Err(DbErrorWrite::NotFound);
1378        }
1379        if let Some(DelayReq {
1380            join_set_id,
1381            delay_id,
1382            expires_at,
1383        }) = delay_req
1384        {
1385            debug!("Inserting delay to `t_delay`");
1386            let mut stmt = tx.prepare_cached(
1387                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1388                VALUES \
1389                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1390            )?;
1391            stmt.execute(named_params! {
1392                ":execution_id": execution_id_str,
1393                ":join_set_id": join_set_id.to_string(),
1394                ":delay_id": delay_id.to_string(),
1395                ":expires_at": expires_at,
1396            })?;
1397        }
1398        Ok(appending_version.increment())
1399    }
1400
1401    fn get_combined_state(
1402        tx: &Transaction,
1403        execution_id: &ExecutionId,
1404    ) -> Result<CombinedState, DbErrorRead> {
1405        let mut stmt = tx.prepare(
1406            r"
1407                SELECT
1408                    state, ffqn, corresponding_version, pending_expires_finished,
1409                    last_lock_version, executor_id, run_id,
1410                    join_set_id, join_set_closing,
1411                    result_kind
1412                    FROM t_state
1413                WHERE
1414                    execution_id = :execution_id
1415                ",
1416        )?;
1417        stmt.query_row(
1418            named_params! {
1419                ":execution_id": execution_id.to_string(),
1420            },
1421            |row| {
1422                CombinedState::new(
1423                    &CombinedStateDTO {
1424                        state: row.get("state")?,
1425                        ffqn: row.get("ffqn")?,
1426                        pending_expires_finished: row
1427                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1428                        last_lock_version: row
1429                            .get::<_, Option<VersionType>>("last_lock_version")?
1430                            .map(Version::new),
1431                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1432                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1433                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1434                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1435                        result_kind: row
1436                            .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1437                                "result_kind",
1438                            )?
1439                            .map(|wrapper| wrapper.0),
1440                    },
1441                    Version::new(row.get("corresponding_version")?),
1442                )
1443            },
1444        )
1445        .map_err(DbErrorRead::from)
1446    }
1447
1448    fn list_executions(
1449        read_tx: &Transaction,
1450        ffqn: Option<&FunctionFqn>,
1451        top_level_only: bool,
1452        pagination: &ExecutionListPagination,
1453    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1454        struct StatementModifier<'a> {
1455            where_vec: Vec<String>,
1456            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1457            limit: u32,
1458            limit_desc: bool,
1459        }
1460
1461        fn paginate<'a, T: rusqlite::ToSql + 'static>(
1462            pagination: &'a Pagination<Option<T>>,
1463            column: &str,
1464            top_level_only: bool,
1465        ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1466            let mut where_vec: Vec<String> = vec![];
1467            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1468            let limit = pagination.length();
1469            let limit_desc = pagination.is_desc();
1470            let rel = pagination.rel();
1471            match pagination {
1472                Pagination::NewerThan {
1473                    cursor: Some(cursor),
1474                    ..
1475                }
1476                | Pagination::OlderThan {
1477                    cursor: Some(cursor),
1478                    ..
1479                } => {
1480                    where_vec.push(format!("{column} {rel} :cursor"));
1481                    let cursor = cursor.to_sql().map_err(|err| {
1482                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
1483                        DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1484                    })?;
1485                    params.push((":cursor", cursor));
1486                }
1487                _ => {}
1488            }
1489            if top_level_only {
1490                where_vec.push("is_top_level=true".to_string());
1491            }
1492            Ok(StatementModifier {
1493                where_vec,
1494                params,
1495                limit,
1496                limit_desc,
1497            })
1498        }
1499
1500        let mut statement_mod = match pagination {
1501            ExecutionListPagination::CreatedBy(pagination) => {
1502                paginate(pagination, "created_at", top_level_only)?
1503            }
1504            ExecutionListPagination::ExecutionId(pagination) => {
1505                paginate(pagination, "execution_id", top_level_only)?
1506            }
1507        };
1508
1509        let ffqn_temporary;
1510        if let Some(ffqn) = ffqn {
1511            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1512            ffqn_temporary = ffqn.to_string();
1513            let ffqn = ffqn_temporary
1514                .to_sql()
1515                .expect("string conversion never fails");
1516
1517            statement_mod.params.push((":ffqn", ffqn));
1518        }
1519
1520        let where_str = if statement_mod.where_vec.is_empty() {
1521            String::new()
1522        } else {
1523            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1524        };
1525        let sql = format!(
1526            r"
1527            SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1528            last_lock_version, executor_id, run_id,
1529            join_set_id, join_set_closing,
1530            result_kind
1531            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1532            ",
1533            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1534            limit = statement_mod.limit,
1535        );
1536        let mut vec: Vec<_> = read_tx
1537            .prepare(&sql)?
1538            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1539                statement_mod
1540                    .params
1541                    .into_iter()
1542                    .collect::<Vec<_>>()
1543                    .as_ref(),
1544                |row| {
1545                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1546                    let created_at = row.get("created_at")?;
1547                    let scheduled_at = row.get("scheduled_at")?;
1548                    let combined_state = CombinedState::new(
1549                        &CombinedStateDTO {
1550                            state: row.get("state")?,
1551                            ffqn: row.get("ffqn")?,
1552                            pending_expires_finished: row
1553                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1554                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1555
1556                            last_lock_version: row
1557                                .get::<_, Option<VersionType>>("last_lock_version")?
1558                                .map(Version::new),
1559                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1560                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1561                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1562                            result_kind: row
1563                                .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1564                                    "result_kind",
1565                                )?
1566                                .map(|wrapper| wrapper.0),
1567                        },
1568                        Version::new(row.get("corresponding_version")?),
1569                    )?;
1570                    Ok(ExecutionWithState {
1571                        execution_id,
1572                        ffqn: combined_state.ffqn,
1573                        pending_state: combined_state.pending_state,
1574                        created_at,
1575                        scheduled_at,
1576                    })
1577                },
1578            )?
1579            .collect::<Vec<Result<_, _>>>()
1580            .into_iter()
1581            .filter_map(|row| match row {
1582                Ok(row) => Some(row),
1583                Err(err) => {
1584                    warn!("Skipping row - {err:?}");
1585                    None
1586                }
1587            })
1588            .collect();
1589
1590        if !statement_mod.limit_desc {
1591            // the list must be sorted in descending order
1592            vec.reverse();
1593        }
1594        Ok(vec)
1595    }
1596
1597    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1598    #[expect(clippy::too_many_arguments)]
1599    fn lock_single_execution(
1600        tx: &Transaction,
1601        created_at: DateTime<Utc>,
1602        component_id: &ComponentId,
1603        execution_id: &ExecutionId,
1604        run_id: RunId,
1605        appending_version: &Version,
1606        executor_id: ExecutorId,
1607        lock_expires_at: DateTime<Utc>,
1608        retry_config: ComponentRetryConfig,
1609    ) -> Result<LockedExecution, DbErrorWrite> {
1610        debug!("lock_single_execution");
1611        let combined_state = Self::get_combined_state(tx, execution_id)?;
1612        combined_state.pending_state.can_append_lock(
1613            created_at,
1614            executor_id,
1615            run_id,
1616            lock_expires_at,
1617        )?;
1618        let expected_version = combined_state.get_next_version_assert_not_finished();
1619        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1620
1621        // Append to `execution_log` table.
1622        let locked_event = Locked {
1623            component_id: component_id.clone(),
1624            executor_id,
1625            lock_expires_at,
1626            run_id,
1627            retry_config,
1628        };
1629        let event = ExecutionEventInner::Locked(locked_event.clone());
1630        let event_ser = serde_json::to_string(&event).map_err(|err| {
1631            warn!("Cannot serialize {event:?} - {err:?}");
1632            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1633        })?;
1634        let mut stmt = tx
1635            .prepare_cached(
1636                "INSERT INTO t_execution_log \
1637            (execution_id, created_at, json_value, version, variant) \
1638            VALUES \
1639            (:execution_id, :created_at, :json_value, :version, :variant)",
1640            )
1641            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1642        stmt.execute(named_params! {
1643            ":execution_id": execution_id.to_string(),
1644            ":created_at": created_at,
1645            ":json_value": event_ser,
1646            ":version": appending_version.0,
1647            ":variant": event.variant(),
1648        })
1649        .map_err(|err| {
1650            warn!("Cannot lock execution - {err:?}");
1651            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1652        })?;
1653
1654        // Update `t_state`
1655        let responses = Self::list_responses(tx, execution_id, None)?;
1656        let responses = responses.into_iter().map(|resp| resp.event).collect();
1657        trace!("Responses: {responses:?}");
1658
1659        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1660            tx,
1661            execution_id,
1662            executor_id,
1663            run_id,
1664            lock_expires_at,
1665            appending_version,
1666            retry_config,
1667        )?;
1668        // Fetch event_history and `Created` event to construct the response.
1669        let mut events = tx
1670            .prepare(
1671                "SELECT json_value FROM t_execution_log WHERE \
1672                execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1673                ORDER BY version",
1674            )?
1675            .query_map(
1676                named_params! {
1677                    ":execution_id": execution_id.to_string(),
1678                    ":v1": DUMMY_CREATED.variant(),
1679                    ":v2": DUMMY_HISTORY_EVENT.variant(),
1680                },
1681                |row| {
1682                    row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1683                        .map(|wrapper| wrapper.0)
1684                        .map_err(|serde| {
1685                            error!("Cannot deserialize {row:?} - {serde:?}");
1686                            consistency_rusqlite("cannot deserialize json value")
1687                        })
1688                },
1689            )?
1690            .collect::<Result<Vec<_>, _>>()?
1691            .into_iter()
1692            .collect::<VecDeque<_>>();
1693        let Some(ExecutionEventInner::Created {
1694            ffqn,
1695            params,
1696            parent,
1697            metadata,
1698            ..
1699        }) = events.pop_front()
1700        else {
1701            error!("Execution log must contain at least `Created` event");
1702            return Err(consistency_db_err("execution log must contain `Created` event").into());
1703        };
1704
1705        let event_history = events
1706            .into_iter()
1707            .map(|event| {
1708                if let ExecutionEventInner::HistoryEvent { event } = event {
1709                    Ok(event)
1710                } else {
1711                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1712                    Err(consistency_db_err(
1713                        "rows can only contain `Created` and `HistoryEvent` event kinds",
1714                    ))
1715                }
1716            })
1717            .collect::<Result<Vec<_>, _>>()?;
1718
1719        Ok(LockedExecution {
1720            execution_id: execution_id.clone(),
1721            metadata,
1722            next_version: appending_version.increment(),
1723            ffqn,
1724            params,
1725            event_history,
1726            responses,
1727            parent,
1728            intermittent_event_count,
1729            locked_event,
1730        })
1731    }
1732
1733    fn count_join_next(
1734        tx: &Transaction,
1735        execution_id: &ExecutionId,
1736        join_set_id: &JoinSetId,
1737    ) -> Result<u64, DbErrorRead> {
1738        let mut stmt = tx.prepare(
1739            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1740            AND history_event_type = :join_next",
1741        )?;
1742        Ok(stmt.query_row(
1743            named_params! {
1744                ":execution_id": execution_id.to_string(),
1745                ":join_set_id": join_set_id.to_string(),
1746                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1747            },
1748            |row| row.get("count"),
1749        )?)
1750    }
1751
1752    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1753    #[expect(clippy::needless_return)]
1754    fn append(
1755        tx: &Transaction,
1756        execution_id: &ExecutionId,
1757        req: AppendRequest,
1758        appending_version: Version,
1759    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1760        if matches!(req.event, ExecutionEventInner::Created { .. }) {
1761            return Err(DbErrorWrite::NonRetriable(
1762                DbErrorWriteNonRetriable::ValidationFailed(
1763                    "cannot append `Created` event - use `create` instead".into(),
1764                ),
1765            ));
1766        }
1767        if let AppendRequest {
1768            event:
1769                ExecutionEventInner::Locked(Locked {
1770                    component_id,
1771                    executor_id,
1772                    run_id,
1773                    lock_expires_at,
1774                    retry_config,
1775                }),
1776            created_at,
1777        } = req
1778        {
1779            return Self::lock_single_execution(
1780                tx,
1781                created_at,
1782                &component_id,
1783                execution_id,
1784                run_id,
1785                &appending_version,
1786                executor_id,
1787                lock_expires_at,
1788                retry_config,
1789            )
1790            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1791        }
1792
1793        let combined_state = Self::get_combined_state(tx, execution_id)?;
1794        if combined_state.pending_state.is_finished() {
1795            debug!("Execution is already finished");
1796            return Err(DbErrorWrite::NonRetriable(
1797                DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1798            ));
1799        }
1800
1801        Self::check_expected_next_and_appending_version(
1802            &combined_state.get_next_version_assert_not_finished(),
1803            &appending_version,
1804        )?;
1805        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1806            error!("Cannot serialize {:?} - {err:?}", req.event);
1807            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1808        })?;
1809
1810        let mut stmt = tx.prepare(
1811                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1812                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1813                    ?;
1814        stmt.execute(named_params! {
1815            ":execution_id": execution_id.to_string(),
1816            ":created_at": req.created_at,
1817            ":json_value": event_ser,
1818            ":version": appending_version.0,
1819            ":variant": req.event.variant(),
1820            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1821        })?;
1822        // Calculate current pending state
1823
1824        match &req.event {
1825            ExecutionEventInner::Created { .. } => {
1826                unreachable!("handled in the caller")
1827            }
1828
1829            ExecutionEventInner::Locked { .. } => {
1830                unreachable!("handled above")
1831            }
1832
1833            ExecutionEventInner::TemporarilyFailed {
1834                backoff_expires_at, ..
1835            }
1836            | ExecutionEventInner::TemporarilyTimedOut {
1837                backoff_expires_at, ..
1838            } => {
1839                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1840                    tx,
1841                    execution_id,
1842                    &appending_version,
1843                    *backoff_expires_at,
1844                    true, // an intermittent failure
1845                )?;
1846                return Ok((next_version, notifier));
1847            }
1848
1849            ExecutionEventInner::Unlocked {
1850                backoff_expires_at, ..
1851            } => {
1852                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1853                    tx,
1854                    execution_id,
1855                    &appending_version,
1856                    *backoff_expires_at,
1857                    false, // not an intermittent failure
1858                )?;
1859                return Ok((next_version, notifier));
1860            }
1861
1862            ExecutionEventInner::Finished { result, .. } => {
1863                Self::update_state_finished(
1864                    tx,
1865                    execution_id,
1866                    &appending_version,
1867                    req.created_at,
1868                    PendingStateFinishedResultKind::from(result),
1869                )?;
1870                return Ok((
1871                    appending_version,
1872                    AppendNotifier {
1873                        pending_at: None,
1874                        execution_finished: Some(NotifierExecutionFinished {
1875                            execution_id: execution_id.clone(),
1876                            retval: result.clone(),
1877                        }),
1878                        response: None,
1879                    },
1880                ));
1881            }
1882
1883            ExecutionEventInner::HistoryEvent {
1884                event:
1885                    HistoryEvent::JoinSetCreate { .. }
1886                    | HistoryEvent::JoinSetRequest {
1887                        request: JoinSetRequest::ChildExecutionRequest { .. },
1888                        ..
1889                    }
1890                    | HistoryEvent::Persist { .. }
1891                    | HistoryEvent::Schedule { .. }
1892                    | HistoryEvent::Stub { .. }
1893                    | HistoryEvent::JoinNextTooMany { .. },
1894            } => {
1895                return Ok((
1896                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1897                    AppendNotifier::default(),
1898                ));
1899            }
1900
1901            ExecutionEventInner::HistoryEvent {
1902                event:
1903                    HistoryEvent::JoinSetRequest {
1904                        join_set_id,
1905                        request:
1906                            JoinSetRequest::DelayRequest {
1907                                delay_id,
1908                                expires_at,
1909                                ..
1910                            },
1911                    },
1912            } => {
1913                return Ok((
1914                    Self::bump_state_next_version(
1915                        tx,
1916                        execution_id,
1917                        &appending_version,
1918                        Some(DelayReq {
1919                            join_set_id: join_set_id.clone(),
1920                            delay_id: delay_id.clone(),
1921                            expires_at: *expires_at,
1922                        }),
1923                    )?,
1924                    AppendNotifier::default(),
1925                ));
1926            }
1927
1928            ExecutionEventInner::HistoryEvent {
1929                event:
1930                    HistoryEvent::JoinNext {
1931                        join_set_id,
1932                        run_expires_at,
1933                        closing,
1934                        requested_ffqn: _,
1935                    },
1936            } => {
1937                // Did the response arrive already?
1938                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1939                let nth_response =
1940                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
1941                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1942                assert!(join_next_count > 0);
1943                if let Some(ResponseWithCursor {
1944                    event:
1945                        JoinSetResponseEventOuter {
1946                            created_at: nth_created_at,
1947                            ..
1948                        },
1949                    cursor: _,
1950                }) = nth_response
1951                {
1952                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
1953                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1954                        tx,
1955                        execution_id,
1956                        &appending_version,
1957                        scheduled_at,
1958                        false, // not an intermittent failure
1959                    )?;
1960                    return Ok((next_version, notifier));
1961                }
1962                return Ok((
1963                    Self::update_state_blocked(
1964                        tx,
1965                        execution_id,
1966                        &appending_version,
1967                        join_set_id,
1968                        *run_expires_at,
1969                        *closing,
1970                    )?,
1971                    AppendNotifier::default(),
1972                ));
1973            }
1974        }
1975    }
1976
1977    fn append_response(
1978        tx: &Transaction,
1979        execution_id: &ExecutionId,
1980        response_outer: JoinSetResponseEventOuter,
1981    ) -> Result<AppendNotifier, DbErrorWrite> {
1982        let mut stmt = tx.prepare(
1983            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1984                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1985        )?;
1986        let join_set_id = &response_outer.event.join_set_id;
1987        let delay_id = match &response_outer.event.event {
1988            JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1989            JoinSetResponse::ChildExecutionFinished { .. } => None,
1990        };
1991        let (child_execution_id, finished_version) = match &response_outer.event.event {
1992            JoinSetResponse::ChildExecutionFinished {
1993                child_execution_id,
1994                finished_version,
1995                result: _,
1996            } => (
1997                Some(child_execution_id.to_string()),
1998                Some(finished_version.0),
1999            ),
2000            JoinSetResponse::DelayFinished { .. } => (None, None),
2001        };
2002
2003        stmt.execute(named_params! {
2004            ":execution_id": execution_id.to_string(),
2005            ":created_at": response_outer.created_at,
2006            ":join_set_id": join_set_id.to_string(),
2007            ":delay_id": delay_id,
2008            ":child_execution_id": child_execution_id,
2009            ":finished_version": finished_version,
2010        })?;
2011
2012        // if the execution is going to be unblocked by this response...
2013        let combined_state = Self::get_combined_state(tx, execution_id)?;
2014        debug!("previous_pending_state: {combined_state:?}");
2015        let mut notifier = if let PendingState::BlockedByJoinSet {
2016            join_set_id: found_join_set_id,
2017            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2018            closing: _,
2019        } = combined_state.pending_state
2020            && *join_set_id == found_join_set_id
2021        {
2022            // PendingAt should be set to current time if called from expired_timers_watcher,
2023            // or to a future time if the execution is hot.
2024            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2025            // TODO: Add diff test
2026            // Unblock the state.
2027            Self::update_state_pending_after_response_appended(
2028                tx,
2029                execution_id,
2030                scheduled_at,
2031                &combined_state.corresponding_version, // not changing the version
2032            )?
2033        } else {
2034            AppendNotifier::default()
2035        };
2036        if let JoinSetResponseEvent {
2037            join_set_id,
2038            event: JoinSetResponse::DelayFinished { delay_id },
2039        } = &response_outer.event
2040        {
2041            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2042            let mut stmt =
2043                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2044                ?;
2045            stmt.execute(named_params! {
2046                ":execution_id": execution_id.to_string(),
2047                ":join_set_id": join_set_id.to_string(),
2048                ":delay_id": delay_id.to_string(),
2049            })?;
2050        }
2051        notifier.response = Some((execution_id.clone(), response_outer));
2052        Ok(notifier)
2053    }
2054
2055    fn append_backtrace(
2056        tx: &Transaction,
2057        backtrace_info: &BacktraceInfo,
2058    ) -> Result<(), DbErrorWrite> {
2059        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2060            warn!(
2061                "Cannot serialize backtrace {:?} - {err:?}",
2062                backtrace_info.wasm_backtrace
2063            );
2064            DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2065        })?;
2066        let mut stmt = tx
2067            .prepare(
2068                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2069                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2070            )
2071            ?;
2072        stmt.execute(named_params! {
2073            ":execution_id": backtrace_info.execution_id.to_string(),
2074            ":component_id": backtrace_info.component_id.to_string(),
2075            ":version_min_including": backtrace_info.version_min_including.0,
2076            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2077            ":wasm_backtrace": backtrace,
2078        })?;
2079        Ok(())
2080    }
2081
2082    #[cfg(feature = "test")]
2083    fn get(
2084        tx: &Transaction,
2085        execution_id: &ExecutionId,
2086    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2087        let mut stmt = tx.prepare(
2088            "SELECT created_at, json_value FROM t_execution_log WHERE \
2089                        execution_id = :execution_id ORDER BY version",
2090        )?;
2091        let events = stmt
2092            .query_map(
2093                named_params! {
2094                    ":execution_id": execution_id.to_string(),
2095                },
2096                |row| {
2097                    let created_at = row.get("created_at")?;
2098                    let event = row
2099                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2100                        .map_err(|serde| {
2101                            error!("Cannot deserialize {row:?} - {serde:?}");
2102                            consistency_rusqlite("cannot deserialize event")
2103                        })?
2104                        .0;
2105
2106                    Ok(ExecutionEvent {
2107                        created_at,
2108                        event,
2109                        backtrace_id: None,
2110                    })
2111                },
2112            )?
2113            .collect::<Result<Vec<_>, _>>()?;
2114        if events.is_empty() {
2115            return Err(DbErrorRead::NotFound);
2116        }
2117        let combined_state = Self::get_combined_state(tx, execution_id)?;
2118        let responses = Self::list_responses(tx, execution_id, None)?
2119            .into_iter()
2120            .map(|resp| resp.event)
2121            .collect();
2122        Ok(concepts::storage::ExecutionLog {
2123            execution_id: execution_id.clone(),
2124            events,
2125            responses,
2126            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2127            pending_state: combined_state.pending_state,
2128        })
2129    }
2130
2131    fn list_execution_events(
2132        tx: &Transaction,
2133        execution_id: &ExecutionId,
2134        version_min: VersionType,
2135        version_max_excluding: VersionType,
2136        include_backtrace_id: bool,
2137    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2138        let select = if include_backtrace_id {
2139            "SELECT
2140                log.created_at,
2141                log.json_value,
2142                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2143                bt.version_min_including AS backtrace_id
2144            FROM
2145                t_execution_log AS log
2146            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2147                t_backtrace AS bt ON log.execution_id = bt.execution_id
2148                                -- Check if the log's version falls within the backtrace's range
2149                                AND log.version >= bt.version_min_including
2150                                AND log.version < bt.version_max_excluding
2151            WHERE
2152                log.execution_id = :execution_id
2153                AND log.version >= :version_min
2154                AND log.version < :version_max_excluding
2155            ORDER BY
2156                log.version;"
2157        } else {
2158            "SELECT
2159                created_at, json_value, NULL as backtrace_id
2160            FROM t_execution_log WHERE
2161                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2162            ORDER BY version"
2163        };
2164        tx.prepare(select)?
2165            .query_map(
2166                named_params! {
2167                    ":execution_id": execution_id.to_string(),
2168                    ":version_min": version_min,
2169                    ":version_max_excluding": version_max_excluding
2170                },
2171                |row| {
2172                    let created_at = row.get("created_at")?;
2173                    let backtrace_id = row
2174                        .get::<_, Option<VersionType>>("backtrace_id")?
2175                        .map(Version::new);
2176                    let event = row
2177                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2178                        .map(|event| ExecutionEvent {
2179                            created_at,
2180                            event: event.0,
2181                            backtrace_id,
2182                        })
2183                        .map_err(|serde| {
2184                            error!("Cannot deserialize {row:?} - {serde:?}");
2185                            consistency_rusqlite("cannot deserialize")
2186                        })?;
2187                    Ok(event)
2188                },
2189            )?
2190            .collect::<Result<Vec<_>, _>>()
2191            .map_err(DbErrorRead::from)
2192    }
2193
2194    fn get_execution_event(
2195        tx: &Transaction,
2196        execution_id: &ExecutionId,
2197        version: VersionType,
2198    ) -> Result<ExecutionEvent, DbErrorRead> {
2199        let mut stmt = tx.prepare(
2200            "SELECT created_at, json_value FROM t_execution_log WHERE \
2201                        execution_id = :execution_id AND version = :version",
2202        )?;
2203        stmt.query_row(
2204            named_params! {
2205                ":execution_id": execution_id.to_string(),
2206                ":version": version,
2207            },
2208            |row| {
2209                let created_at = row.get("created_at")?;
2210                let event = row
2211                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2212                    .map_err(|serde| {
2213                        error!("Cannot deserialize {row:?} - {serde:?}");
2214                        consistency_rusqlite("cannot deserialize event")
2215                    })?;
2216
2217                Ok(ExecutionEvent {
2218                    created_at,
2219                    event: event.0,
2220                    backtrace_id: None,
2221                })
2222            },
2223        )
2224        .map_err(DbErrorRead::from)
2225    }
2226
2227    fn list_responses(
2228        tx: &Transaction,
2229        execution_id: &ExecutionId,
2230        pagination: Option<Pagination<u32>>,
2231    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2232        // TODO: Add test
2233        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2234        let mut sql = "SELECT \
2235            r.id, r.created_at, r.join_set_id,  r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2236            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2237            WHERE \
2238            r.execution_id = :execution_id \
2239            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2240            "
2241        .to_string();
2242        let limit = match &pagination {
2243            Some(
2244                pagination @ (Pagination::NewerThan { cursor, .. }
2245                | Pagination::OlderThan { cursor, .. }),
2246            ) => {
2247                params.push((":cursor", Box::new(cursor)));
2248                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2249                Some(pagination.length())
2250            }
2251            None => None,
2252        };
2253        sql.push_str(" ORDER BY id");
2254        if pagination.as_ref().is_some_and(Pagination::is_desc) {
2255            sql.push_str(" DESC");
2256        }
2257        if let Some(limit) = limit {
2258            write!(sql, " LIMIT {limit}").unwrap();
2259        }
2260        params.push((":execution_id", Box::new(execution_id.to_string())));
2261        tx.prepare(&sql)?
2262            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2263                params
2264                    .iter()
2265                    .map(|(key, value)| (*key, value.as_ref()))
2266                    .collect::<Vec<_>>()
2267                    .as_ref(),
2268                Self::parse_response_with_cursor,
2269            )?
2270            .collect::<Result<Vec<_>, rusqlite::Error>>()
2271            .map_err(DbErrorRead::from)
2272    }
2273
2274    fn parse_response_with_cursor(
2275        row: &rusqlite::Row<'_>,
2276    ) -> Result<ResponseWithCursor, rusqlite::Error> {
2277        let id = row.get("id")?;
2278        let created_at: DateTime<Utc> = row.get("created_at")?;
2279        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2280        let event = match (
2281            row.get::<_, Option<DelayId>>("delay_id")?,
2282            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2283            row.get::<_, Option<VersionType>>("finished_version")?,
2284            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2285        ) {
2286            (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2287            (
2288                None,
2289                Some(child_execution_id),
2290                Some(finished_version),
2291                Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2292            ) => JoinSetResponse::ChildExecutionFinished {
2293                child_execution_id,
2294                finished_version: Version(finished_version),
2295                result,
2296            },
2297            (delay, child, finished, result) => {
2298                error!(
2299                    "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2300                    delay,
2301                    result.map(|it| it.0)
2302                );
2303                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2304            }
2305        };
2306        Ok(ResponseWithCursor {
2307            cursor: id,
2308            event: JoinSetResponseEventOuter {
2309                event: JoinSetResponseEvent { join_set_id, event },
2310                created_at,
2311            },
2312        })
2313    }
2314
2315    fn nth_response(
2316        tx: &Transaction,
2317        execution_id: &ExecutionId,
2318        join_set_id: &JoinSetId,
2319        skip_rows: u64,
2320    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2321        // TODO: Add test
2322        tx
2323            .prepare(
2324                "SELECT r.id, r.created_at, r.join_set_id, \
2325                    r.delay_id, \
2326                    r.child_execution_id, r.finished_version, l.json_value \
2327                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2328                    WHERE \
2329                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2330                    (
2331                    r.finished_version = l.version \
2332                    OR \
2333                    r.child_execution_id IS NULL \
2334                    ) \
2335                    ORDER BY id \
2336                    LIMIT 1 OFFSET :offset",
2337            )
2338            ?
2339            .query_row(
2340                named_params! {
2341                    ":execution_id": execution_id.to_string(),
2342                    ":join_set_id": join_set_id.to_string(),
2343                    ":offset": skip_rows,
2344                },
2345                Self::parse_response_with_cursor,
2346            )
2347            .optional()
2348            .map_err(DbErrorRead::from)
2349    }
2350
2351    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2352    #[instrument(level = Level::TRACE, skip_all)]
2353    fn get_responses_with_offset(
2354        tx: &Transaction,
2355        execution_id: &ExecutionId,
2356        skip_rows: usize,
2357    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2358        // TODO: Add test
2359        tx.prepare(
2360            "SELECT r.id, r.created_at, r.join_set_id, \
2361            r.delay_id, \
2362            r.child_execution_id, r.finished_version, l.json_value \
2363            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2364            WHERE \
2365            r.execution_id = :execution_id AND \
2366            ( \
2367            r.finished_version = l.version \
2368            OR r.child_execution_id IS NULL \
2369            ) \
2370            ORDER BY id \
2371            LIMIT -1 OFFSET :offset",
2372        )
2373        ?
2374        .query_map(
2375            named_params! {
2376                ":execution_id": execution_id.to_string(),
2377                ":offset": skip_rows,
2378            },
2379            Self::parse_response_with_cursor,
2380        )
2381        ?
2382        .collect::<Result<Vec<_>, _>>()
2383        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2384        .map_err(DbErrorRead::from)
2385    }
2386
2387    fn get_pending_of_single_ffqn(
2388        mut stmt: CachedStatement,
2389        batch_size: usize,
2390        pending_at_or_sooner: DateTime<Utc>,
2391        ffqn: &FunctionFqn,
2392    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2393        stmt.query_map(
2394            named_params! {
2395                ":pending_expires_finished": pending_at_or_sooner,
2396                ":ffqn": ffqn.to_string(),
2397                ":batch_size": batch_size,
2398            },
2399            |row| {
2400                let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2401                let next_version =
2402                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2403                Ok(execution_id.map(|exe| (exe, next_version)))
2404            },
2405        )
2406        .map_err(|err| {
2407            warn!("Ignoring consistency error {err:?}");
2408        })?
2409        .collect::<Result<Vec<_>, _>>()
2410        .map_err(|err| {
2411            warn!("Ignoring consistency error {err:?}");
2412        })?
2413        .into_iter()
2414        .collect::<Result<Vec<_>, _>>()
2415        .map_err(|err| {
2416            warn!("Ignoring consistency error {err:?}");
2417        })
2418    }
2419
2420    /// Get executions and their next versions
2421    fn get_pending(
2422        conn: &Connection,
2423        batch_size: usize,
2424        pending_at_or_sooner: DateTime<Utc>,
2425        ffqns: &[FunctionFqn],
2426    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2427        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2428        for ffqn in ffqns {
2429            // Select executions in PendingAt.
2430            let stmt = conn
2431                .prepare_cached(&format!(
2432                    r#"
2433                    SELECT execution_id, corresponding_version FROM t_state WHERE
2434                    state = "{STATE_PENDING_AT}" AND
2435                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2436                    ORDER BY pending_expires_finished LIMIT :batch_size
2437                    "#
2438                ))
2439                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2440
2441            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2442                stmt,
2443                batch_size - execution_ids_versions.len(),
2444                pending_at_or_sooner,
2445                ffqn,
2446            ) {
2447                execution_ids_versions.extend(execs_and_versions);
2448                if execution_ids_versions.len() == batch_size {
2449                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2450                    break;
2451                }
2452                // consistency errors are ignored since we want to return at least some rows.
2453            }
2454        }
2455        Ok(execution_ids_versions)
2456    }
2457
2458    // Must be called after write transaction for a correct happens-before relationship.
2459    #[instrument(level = Level::TRACE, skip_all)]
2460    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2461        let (pending_ats, finished_execs, responses) = {
2462            let (mut pending_ats, mut finished_execs, mut responses) =
2463                (Vec::new(), Vec::new(), Vec::new());
2464            for notifier in notifiers {
2465                if let Some(pending_at) = notifier.pending_at {
2466                    pending_ats.push(pending_at);
2467                }
2468                if let Some(finished) = notifier.execution_finished {
2469                    finished_execs.push(finished);
2470                }
2471                if let Some(response) = notifier.response {
2472                    responses.push(response);
2473                }
2474            }
2475            (pending_ats, finished_execs, responses)
2476        };
2477
2478        // Notify pending_at subscribers.
2479        if !pending_ats.is_empty() {
2480            let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2481            for pending_at in pending_ats {
2482                Self::notify_pending_locked(&pending_at, current_time, &guard);
2483            }
2484        }
2485        // Notify execution finished subscribers.
2486        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2487        if !finished_execs.is_empty() {
2488            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2489            for finished in finished_execs {
2490                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2491                    for (_tag, sender) in listeners_of_exe_id {
2492                        // Sending while holding the lock but the oneshot sender does not block.
2493                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2494                        let _ = sender.send(finished.retval.clone());
2495                    }
2496                }
2497            }
2498        }
2499        // Notify response subscribers.
2500        if !responses.is_empty() {
2501            let mut guard = self.0.response_subscribers.lock().unwrap();
2502            for (execution_id, response) in responses {
2503                if let Some((sender, _)) = guard.remove(&execution_id) {
2504                    let _ = sender.send(response);
2505                }
2506            }
2507        }
2508    }
2509
2510    fn notify_pending_locked(
2511        notifier: &NotifierPendingAt,
2512        current_time: DateTime<Utc>,
2513        ffqn_to_pending_subscription: &std::sync::MutexGuard<
2514            HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2515        >,
2516    ) {
2517        // No need to remove here, cleanup is handled by the caller.
2518        if notifier.scheduled_at <= current_time
2519            && let Some((subscription, _)) = ffqn_to_pending_subscription.get(&notifier.ffqn)
2520        {
2521            debug!("Notifying pending subscriber");
2522            // Does not block
2523            let _ = subscription.try_send(());
2524        }
2525    }
2526}
2527
2528#[async_trait]
2529impl DbExecutor for SqlitePool {
2530    #[instrument(level = Level::TRACE, skip(self))]
2531    async fn lock_pending(
2532        &self,
2533        batch_size: usize,
2534        pending_at_or_sooner: DateTime<Utc>,
2535        ffqns: Arc<[FunctionFqn]>,
2536        created_at: DateTime<Utc>,
2537        component_id: ComponentId,
2538        executor_id: ExecutorId,
2539        lock_expires_at: DateTime<Utc>,
2540        run_id: RunId,
2541        retry_config: ComponentRetryConfig,
2542    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2543        let execution_ids_versions = self
2544            .transaction(
2545                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2546                "get_pending",
2547            )
2548            .await?;
2549        if execution_ids_versions.is_empty() {
2550            Ok(vec![])
2551        } else {
2552            debug!("Locking {execution_ids_versions:?}");
2553            self.transaction(
2554                move |tx| {
2555                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2556                    // Append lock
2557                    for (execution_id, version) in &execution_ids_versions {
2558                        match Self::lock_single_execution(
2559                            tx,
2560                            created_at,
2561                            &component_id,
2562                            execution_id,
2563                            run_id,
2564                            version,
2565                            executor_id,
2566                            lock_expires_at,
2567                            retry_config,
2568                        ) {
2569                            Ok(locked) => locked_execs.push(locked),
2570                            Err(err) => {
2571                                warn!("Locking row {execution_id} failed - {err:?}");
2572                            }
2573                        }
2574                    }
2575                    Ok(locked_execs)
2576                },
2577                "lock_pending",
2578            )
2579            .await
2580        }
2581    }
2582
    /// Locks a single execution inside one transaction by delegating to
    /// `Self::lock_single_execution`; its result is returned unchanged.
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!(%execution_id, "lock_one");
        // The transaction closure is `move`, so it needs an owned copy of the borrowed id.
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| {
                Self::lock_single_execution(
                    tx,
                    created_at,
                    &component_id,
                    &execution_id,
                    run_id,
                    &version,
                    executor_id,
                    lock_expires_at,
                    retry_config,
                )
            },
            // NOTE(review): the label differs from the method name - confirm whether that is intended.
            "lock_inner",
        )
        .await
    }
2615
2616    #[instrument(level = Level::DEBUG, skip(self, req))]
2617    async fn append(
2618        &self,
2619        execution_id: ExecutionId,
2620        version: Version,
2621        req: AppendRequest,
2622    ) -> Result<AppendResponse, DbErrorWrite> {
2623        debug!(%req, "append");
2624        trace!(?req, "append");
2625        let created_at = req.created_at;
2626        let (version, notifier) = self
2627            .transaction(
2628                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2629                "append",
2630            )
2631            .await?;
2632        self.notify_all(vec![notifier], created_at);
2633        Ok(version)
2634    }
2635
2636    #[instrument(level = Level::DEBUG, skip_all)]
2637    async fn append_batch_respond_to_parent(
2638        &self,
2639        events: AppendEventsToExecution,
2640        response: AppendResponseToExecution,
2641        current_time: DateTime<Utc>,
2642    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2643        debug!("append_batch_respond_to_parent");
2644        if events.execution_id == response.parent_execution_id {
2645            // Pending state would be wrong.
2646            // This is not a panic because it depends on DB state.
2647            return Err(DbErrorWrite::NonRetriable(
2648                DbErrorWriteNonRetriable::ValidationFailed(
2649                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2650                ),
2651            ));
2652        }
2653        if events.batch.is_empty() {
2654            error!("Batch cannot be empty");
2655            return Err(DbErrorWrite::NonRetriable(
2656                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2657            ));
2658        }
2659        let (version, notifiers) = {
2660            self.transaction(
2661                move |tx| {
2662                    let mut version = events.version.clone();
2663                    let mut notifier_of_child = None;
2664                    for append_request in &events.batch {
2665                        let (v, n) = Self::append(
2666                            tx,
2667                            &events.execution_id,
2668                            append_request.clone(),
2669                            version,
2670                        )?;
2671                        version = v;
2672                        notifier_of_child = Some(n);
2673                    }
2674
2675                    let pending_at_parent = Self::append_response(
2676                        tx,
2677                        &response.parent_execution_id,
2678                        response.parent_response_event.clone(),
2679                    )?;
2680                    Ok::<_, DbErrorWrite>((
2681                        version,
2682                        vec![
2683                            notifier_of_child.expect("checked that the batch is not empty"),
2684                            pending_at_parent,
2685                        ],
2686                    ))
2687                },
2688                "append_batch_respond_to_parent",
2689            )
2690            .await?
2691        };
2692        self.notify_all(notifiers, current_time);
2693        Ok(version)
2694    }
2695
    // Supports only one subscriber per ffqn.
    // A new subscriber replaces the old one, which will eventually time out, which is fine.
    //
    // Returns when one of the following happens:
    // * the database already contains a pending execution for one of `ffqns`,
    // * a notification arrives on the channel registered below,
    // * `timeout_fut` completes (also awaited when the database lookup fails).
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // The random tag lets the cleanup below distinguish our entries from a newer subscriber's.
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
        // Subscribe first, before the database is queried.
        {
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }
        // The `return`s inside this block only leave the block, so the cleanup below always runs.
        async {
            // A batch size of 1 is enough: only the existence of a pending execution matters here.
            let Ok(execution_ids_versions) = self
                .transaction(
                    {
                        let ffqns = ffqns.clone();
                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                    },
                    "subscribe_to_pending",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            tokio::select! { // future's liveness: Dropping the loser immediately.
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        // Clean up ffqn_to_pending_subscription in any case
        {
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match ffqn_to_pending_subscription.remove(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {
                        // Cleanup OK.
                    }
                    Some(other) => {
                        // Reinsert foreign sender.
                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
                    }
                    None => {
                        // Value was replaced and cleaned up already.
                    }
                }
            }
        }
    }
2761}
2762
2763#[async_trait]
2764impl DbConnection for SqlitePool {
2765    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2766    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2767        debug!("create");
2768        trace!(?req, "create");
2769        let created_at = req.created_at;
2770        let (version, notifier) = self
2771            .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2772            .await?;
2773        self.notify_all(vec![notifier], created_at);
2774        Ok(version)
2775    }
2776
2777    #[instrument(level = Level::DEBUG, skip(self, batch))]
2778    async fn append_batch(
2779        &self,
2780        current_time: DateTime<Utc>,
2781        batch: Vec<AppendRequest>,
2782        execution_id: ExecutionId,
2783        version: Version,
2784    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2785        debug!("append_batch");
2786        trace!(?batch, "append_batch");
2787        assert!(!batch.is_empty(), "Empty batch request");
2788
2789        let (version, notifier) = self
2790            .transaction(
2791                move |tx| {
2792                    let mut version = version.clone();
2793                    let mut notifier = None;
2794                    for append_request in &batch {
2795                        let (v, n) =
2796                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2797                        version = v;
2798                        notifier = Some(n);
2799                    }
2800                    Ok::<_, DbErrorWrite>((
2801                        version,
2802                        notifier.expect("checked that the batch is not empty"),
2803                    ))
2804                },
2805                "append_batch",
2806            )
2807            .await?;
2808
2809        self.notify_all(vec![notifier], current_time);
2810        Ok(version)
2811    }
2812
2813    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2814    async fn append_batch_create_new_execution(
2815        &self,
2816        current_time: DateTime<Utc>,
2817        batch: Vec<AppendRequest>,
2818        execution_id: ExecutionId,
2819        version: Version,
2820        child_req: Vec<CreateRequest>,
2821    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2822        debug!("append_batch_create_new_execution");
2823        trace!(?batch, ?child_req, "append_batch_create_new_execution");
2824        assert!(!batch.is_empty(), "Empty batch request");
2825
2826        let (version, notifiers) = self
2827            .transaction(
2828                move |tx| {
2829                    let mut notifier = None;
2830                    let mut version = version.clone();
2831                    for append_request in &batch {
2832                        let (v, n) =
2833                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2834                        version = v;
2835                        notifier = Some(n);
2836                    }
2837                    let mut notifiers = Vec::new();
2838                    notifiers.push(notifier.expect("checked that the batch is not empty"));
2839
2840                    for child_req in &child_req {
2841                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2842                        notifiers.push(notifier);
2843                    }
2844                    Ok::<_, DbErrorWrite>((version, notifiers))
2845                },
2846                "append_batch_create_new_execution_inner",
2847            )
2848            .await?;
2849        self.notify_all(notifiers, current_time);
2850        Ok(version)
2851    }
2852
    /// Loads the execution log of `execution_id` in one transaction by delegating to `Self::get`.
    ///
    /// Only compiled when the `test` feature is enabled.
    #[cfg(feature = "test")]
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn get(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
        trace!("get");
        // The transaction closure is `move`, so it needs an owned copy of the borrowed id.
        let execution_id = execution_id.clone();
        self.transaction(move |tx| Self::get(tx, &execution_id), "get")
            .await
    }
2864
2865    #[instrument(level = Level::DEBUG, skip(self))]
2866    async fn list_execution_events(
2867        &self,
2868        execution_id: &ExecutionId,
2869        since: &Version,
2870        max_length: VersionType,
2871        include_backtrace_id: bool,
2872    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2873        let execution_id = execution_id.clone();
2874        let since = since.0;
2875        self.transaction(
2876            move |tx| {
2877                Self::list_execution_events(
2878                    tx,
2879                    &execution_id,
2880                    since,
2881                    since + max_length,
2882                    include_backtrace_id,
2883                )
2884            },
2885            "get",
2886        )
2887        .await
2888    }
2889
    // Supports only one subscriber per execution id.
    // A new call will overwrite the old subscriber, the old one will end
    // with a timeout, which is fine.
    //
    // Returns the responses of `execution_id` starting at index `start_idx` when the database
    // already contains some. Otherwise a oneshot sender is registered and the call waits for
    // either a single response delivered through it or for `timeout_fut`, in which case
    // `DbErrorReadWithTimeout::Timeout` is returned.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
        debug!("next_responses");
        // The random tag lets `cleanup` distinguish our entry from a newer subscriber's.
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();

        // Removes our own subscription; an entry carrying a different tag is put back.
        let cleanup = || {
            let mut guard = self.0.response_subscribers.lock().unwrap();
            match guard.remove(&execution_id) {
                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
                Some(other) => {
                    // Reinsert foreign sender.
                    guard.insert(execution_id.clone(), other);
                }
                None => {} // Value was replaced and cleaned up already, or notification was sent.
            }
        };

        // Cloned handle to the registry so that the `move` closure below can own it.
        let response_subscribers = self.0.response_subscribers.clone();
        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction(
                move |tx| {
                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
                    if responses.is_empty() {
                        // cannot race as we have the transaction write lock
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id.clone(), (sender, unique_tag));
                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                "subscribe_to_next_responses",
            )
            .await
        }
        // The closure may have registered the sender before the transaction failed.
        .inspect_err(|_| {
            cleanup();
        })?;
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
            itertools::Either::Right(receiver) => {
                let res = async move {
                    tokio::select! {
                        resp = receiver => {
                            // A sender dropped without sending is reported as `DbErrorGeneric::Close`.
                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
                            Ok(vec![resp])
                        }
                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
                    }
                }
                .await;
                cleanup();
                res
            }
        }
    }
2959
2960    // Supports multiple subscribers.
2961    async fn wait_for_finished_result(
2962        &self,
2963        execution_id: &ExecutionId,
2964        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2965    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2966        let unique_tag: u64 = rand::random();
2967        let execution_id = execution_id.clone();
2968        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2969
2970        let cleanup = || {
2971            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2972            if let Some(subscribers) = guard.get_mut(&execution_id) {
2973                subscribers.remove(&unique_tag);
2974            }
2975        };
2976
2977        let resp_or_receiver = {
2978            let execution_id = execution_id.clone();
2979            self.transaction(move |tx| {
2980                let pending_state =
2981                    Self::get_combined_state(tx, &execution_id)?.pending_state;
2982                if let PendingState::Finished { finished } = pending_state {
2983                    let event =
2984                        Self::get_execution_event(tx, &execution_id, finished.version)?;
2985                    if let ExecutionEventInner::Finished { result, ..} = event.event {
2986                        Ok(itertools::Either::Left(result))
2987                    } else {
2988                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2989                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
2990                            "cannot get finished event based on t_state version"
2991                        )))
2992                    }
2993                } else {
2994                    // Cannot race with the notifier as we have the transaction write lock:
2995                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
2996                    // or we end up here. If this tx fails, the cleanup will remove this entry.
2997                    let (sender, receiver) = oneshot::channel();
2998                    let mut guard = execution_finished_subscription.lock().unwrap();
2999                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3000                    Ok(itertools::Either::Right(receiver))
3001                }
3002            }, "wait_for_finished_result")
3003            .await
3004        }
3005        .inspect_err(|_| {
3006            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
3007            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
3008            // If cleanup wins, it simply removes the oneshot sender.
3009            cleanup();
3010        })?;
3011
3012        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3013        match resp_or_receiver {
3014            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3015            itertools::Either::Right(receiver) => {
3016                let res = async move {
3017                    tokio::select! {
3018                        resp = receiver => {
3019                            Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3020                        }
3021                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3022                    }
3023                }
3024                .await;
3025                cleanup();
3026                res
3027            }
3028        }
3029    }
3030
3031    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3032    async fn append_response(
3033        &self,
3034        created_at: DateTime<Utc>,
3035        execution_id: ExecutionId,
3036        response_event: JoinSetResponseEvent,
3037    ) -> Result<(), DbErrorWrite> {
3038        debug!("append_response");
3039        let event = JoinSetResponseEventOuter {
3040            created_at,
3041            event: response_event,
3042        };
3043        let notifier = self
3044            .transaction(
3045                move |tx| Self::append_response(tx, &execution_id, event.clone()),
3046                "append_response",
3047            )
3048            .await?;
3049        self.notify_all(vec![notifier], created_at);
3050        Ok(())
3051    }
3052
3053    #[instrument(level = Level::DEBUG, skip_all)]
3054    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3055        debug!("append_backtrace");
3056        self.transaction(
3057            move |tx| Self::append_backtrace(tx, &append),
3058            "append_backtrace",
3059        )
3060        .await
3061    }
3062
3063    #[instrument(level = Level::DEBUG, skip_all)]
3064    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3065        debug!("append_backtrace_batch");
3066        self.transaction(
3067            move |tx| {
3068                for append in &batch {
3069                    Self::append_backtrace(tx, append)?;
3070                }
3071                Ok(())
3072            },
3073            "append_backtrace",
3074        )
3075        .await
3076    }
3077
3078    #[instrument(level = Level::DEBUG, skip_all)]
3079    async fn get_backtrace(
3080        &self,
3081        execution_id: &ExecutionId,
3082        filter: BacktraceFilter,
3083    ) -> Result<BacktraceInfo, DbErrorRead> {
3084        debug!("get_last_backtrace");
3085        let execution_id = execution_id.clone();
3086
3087        self.transaction(
3088            move |tx| {
3089                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3090                                WHERE execution_id = :execution_id";
3091                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3092                let select = match &filter {
3093                    BacktraceFilter::Specific(version) =>{
3094                        params.push((":version", Box::new(version.0)));
3095                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3096                    },
3097                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3098                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3099                };
3100                tx
3101                    .prepare(&select)
3102                    ?
3103                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3104                        params
3105                            .iter()
3106                            .map(|(key, value)| (*key, value.as_ref()))
3107                            .collect::<Vec<_>>()
3108                            .as_ref(),
3109                    |row| {
3110                        Ok(BacktraceInfo {
3111                            execution_id: execution_id.clone(),
3112                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3113                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3114                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3115                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3116                        })
3117                    },
3118                ).map_err(DbErrorRead::from)
3119            },
3120            "get_last_backtrace",
3121        ).await
3122    }
3123
3124    /// Get currently expired delays and locks.
3125    #[instrument(level = Level::TRACE, skip(self))]
3126    async fn get_expired_timers(
3127        &self,
3128        at: DateTime<Utc>,
3129    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3130        self.transaction(
3131            move |conn| {
3132                let mut expired_timers = conn.prepare(
3133                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3134                )?
3135                .query_map(
3136                        named_params! {
3137                            ":at": at,
3138                        },
3139                        |row| {
3140                            let execution_id = row.get("execution_id")?;
3141                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3142                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3143                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3144                            Ok(ExpiredTimer::Delay(delay))
3145                        },
3146                    )?
3147                    .collect::<Result<Vec<_>, _>>()?;
3148                // Extend with expired locks
3149                let expired = conn.prepare(&format!(r#"
3150                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3151                    executor_id, run_id
3152                    FROM t_state
3153                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3154                    "#
3155                )
3156                )?
3157                .query_map(
3158                        named_params! {
3159                            ":at": at,
3160                        },
3161                        |row| {
3162                            let execution_id = row.get("execution_id")?;
3163                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3164                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3165                            let intermittent_event_count = row.get("intermittent_event_count")?;
3166                            let max_retries = row.get("max_retries")?;
3167                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3168                            let executor_id = row.get("executor_id")?;
3169                            let run_id = row.get("run_id")?;
3170                            let lock = ExpiredLock {
3171                                execution_id,
3172                                locked_at_version,
3173                                next_version,
3174                                intermittent_event_count,
3175                                max_retries,
3176                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3177                                locked_by: LockedBy { executor_id, run_id },
3178                            };
3179                            Ok(ExpiredTimer::Lock(lock))
3180                        }
3181                    )?
3182                    .collect::<Result<Vec<_>, _>>()?;
3183                expired_timers.extend(expired);
3184                if !expired_timers.is_empty() {
3185                    debug!("get_expired_timers found {expired_timers:?}");
3186                }
3187                Ok(expired_timers)
3188            }, "get_expired_timers"
3189        )
3190        .await
3191    }
3192
3193    async fn get_execution_event(
3194        &self,
3195        execution_id: &ExecutionId,
3196        version: &Version,
3197    ) -> Result<ExecutionEvent, DbErrorRead> {
3198        let version = version.0;
3199        let execution_id = execution_id.clone();
3200        self.transaction(
3201            move |tx| Self::get_execution_event(tx, &execution_id, version),
3202            "get_execution_event",
3203        )
3204        .await
3205    }
3206
3207    async fn get_pending_state(
3208        &self,
3209        execution_id: &ExecutionId,
3210    ) -> Result<PendingState, DbErrorRead> {
3211        let execution_id = execution_id.clone();
3212        Ok(self
3213            .transaction(
3214                move |tx| Self::get_combined_state(tx, &execution_id),
3215                "get_pending_state",
3216            )
3217            .await?
3218            .pending_state)
3219    }
3220
3221    async fn list_executions(
3222        &self,
3223        ffqn: Option<FunctionFqn>,
3224        top_level_only: bool,
3225        pagination: ExecutionListPagination,
3226    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3227        self.transaction(
3228            move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3229            "list_executions",
3230        )
3231        .await
3232    }
3233
3234    async fn list_responses(
3235        &self,
3236        execution_id: &ExecutionId,
3237        pagination: Pagination<u32>,
3238    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3239        let execution_id = execution_id.clone();
3240        self.transaction(
3241            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3242            "list_executions",
3243        )
3244        .await
3245    }
3246}
3247
/// Helpers for creating a `SqlitePool` in tests.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a pool with the default `SqliteConfig`.
    ///
    /// When the `SQLITE_FILE` environment variable can be read, its value is used
    /// as the database path and `None` is returned as the second element.
    /// Otherwise a fresh `NamedTempFile` backs the database and is returned
    /// alongside the pool.
    ///
    /// # Panics
    /// Panics if the temporary file cannot be created or the pool fails to open.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(db_path) => {
                let pool = SqlitePool::new(db_path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let backing_file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(backing_file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(backing_file))
            }
        }
    }
}