obeli_sk_db_sqlite/
sqlite_dao.rs

1use crate::histograms::Histograms;
2use assert_matches::assert_matches;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use concepts::{
6    ComponentId, ExecutionId, FunctionFqn, JoinSetId, StrVariant, SupportedFunctionReturnValue,
7    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8    storage::{
9        AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10        AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11        DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12        DbErrorWrite, DbErrorWritePermanent, DbExecutor, DbPool, DbPoolCloseable, ExecutionEvent,
13        ExecutionEventInner, ExecutionListPagination, ExecutionWithState, ExpiredDelay,
14        ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15        JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, LockedExecution,
16        Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
17        ResponseWithCursor, Version, VersionType,
18    },
19};
20use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
21use hashbrown::HashMap;
22use rusqlite::{
23    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
24    TransactionBehavior, named_params, types::ToSqlOutput,
25};
26use std::{
27    cmp::max,
28    collections::VecDeque,
29    fmt::Debug,
30    ops::DerefMut,
31    path::Path,
32    str::FromStr,
33    sync::{
34        Arc, Mutex,
35        atomic::{AtomicBool, Ordering},
36    },
37    time::Duration,
38};
39use std::{fmt::Write as _, pin::Pin};
40use tokio::{
41    sync::{mpsc, oneshot},
42    time::Instant,
43};
44use tracing::{Level, debug, error, info, instrument, trace, warn};
45
/// Opaque error signalling that the SQLite database could not be initialized.
///
/// It carries no payload; the visible call sites log the underlying cause with
/// `error!` before returning it.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
49
/// Plain data holder describing a single delay request.
// NOTE(review): presumably mirrors a row of `t_delay` (minus `execution_id`) — confirm against the users of this struct.
#[derive(Debug, Clone)]
struct DelayReq {
    /// Join set the delay belongs to.
    join_set_id: JoinSetId,
    /// Identifier of the delay.
    delay_id: DelayId,
    /// Point in time at which the delay expires.
    expires_at: DateTime<Utc>,
}
/*
mmap_size = 128MB - Set the global memory map so all processes can share some data
https://www.sqlite.org/pragma.html#pragma_mmap_size
https://www.sqlite.org/mmap.html

journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
https://www.sqlite.org/pragma.html#pragma_journal_size_limit

Inspired by https://github.com/rails/rails/pull/49349
*/

/// Default `[name, value]` PRAGMA pairs applied by `init_thread`, in array order.
///
/// An empty value means the pragma is queried instead of being set (see `pragma_update`).
/// Every default can be replaced through `SqliteConfig::pragma_override`.
// NOTE(review): SQLite documents that `page_size` cannot be changed once a database is in
// WAL mode; `journal_mode` is applied before `page_size` here — confirm that the page size
// actually takes effect on newly created databases.
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["page_size", "8192"], // 8 KB
    ["mmap_size", "134217728"], // 128 MiB
    ["journal_size_limit", "67108864"], // 64 MiB
    ["integrity_check", ""], // empty value: queried, not set
];
79
// Append only
/// DDL for `t_metadata`, which records the schema version of the database file.
/// `init_thread` reads the row with the highest `id` to determine the current version.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
/// Schema version this build expects; `init_thread` fails when the stored version differs.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
89
/// Stores execution history. Append only.
/// Each row is one event identified by (`execution_id`, `version`); the event payload is
/// kept as JSON text in `json_value`, and `history_event_type` is a stored generated column
/// extracted from that JSON.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
// NOTE(review): the table's primary key already covers (execution_id, version), so this
// explicit index looks redundant — confirm before removing.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
111
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, column `delay_id` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, column `child_execution_id`,`finished_version`
/// must not be null.
// NOTE(review): the UNIQUE constraint spans the nullable columns `delay_id` and
// `child_execution_id`. SQLite treats NULLs as distinct in UNIQUE constraints, so if one of
// the two is always NULL the constraint may never reject duplicates — confirm.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
135
/// Stores executions in `PendingState`
/// `state` to column mapping:
/// `PendingAt`:            (nothing but required columns)
/// `Locked`:               `last_lock_version`, `executor_id`, `run_id`
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
/// `Finished` :            `result_kind`.
///
/// The `pending_expires_finished` column is overloaded: `CombinedState::new` reads it as
/// the scheduled time (`PendingAt`), the lock expiry (`Locked`, `BlockedByJoinSet`) or the
/// finish time (`Finished`), depending on `state`.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,
    max_retries INTEGER NOT NULL,
    retry_exp_backoff_millis INTEGER NOT NULL,

    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Values of the `t_state.state` column; `CombinedState::new` compares against them.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";
// NOTE(review): presumably matched against `t_execution_log.history_event_type` — the usage
// is outside this part of the file, confirm.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";
174
// TODO: partial indexes
/// Index on (`state`, `pending_expires_finished`, `ffqn`).
// NOTE(review): judging by the name, intended for locking pending executions — confirm.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
/// Partial index on `pending_expires_finished`, restricted to rows with a non-null `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
/// Index on (`execution_id`, `is_top_level`).
/// Note that the SQL index name ends with `is_root` while the column is `is_top_level`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
193
/// Represents [`ExpiredTimer::AsyncDelay`]. Rows are deleted when the delay is processed.
/// A delay is identified by the triple (`execution_id`, `join_set_id`, `delay_id`).
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
204
// Append only.
/// DDL for `t_backtrace`: one WASM backtrace per execution and version range
/// (`version_min_including` inclusive, `version_max_excluding` exclusive, as the names say).
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
// Index for searching backtraces by execution_id and version
// NOTE(review): covers the same columns as the table's primary key, so it looks
// redundant — confirm before removing.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
220
/// Cloneable, crate-internal summary of a `rusqlite::Error`.
///
/// The `conversions` module builds it from `rusqlite::Error` and converts it further into
/// the `DbError*` types of the storage layer.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows (`rusqlite::Error::QueryReturnedNoRows`).
    #[error("not found")]
    NotFound,
    /// Any other error, kept only as its string representation.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
228
229mod conversions {
230
231    use super::RusqliteError;
232    use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
233    use rusqlite::types::{FromSql, FromSqlError};
234    use std::{fmt::Debug, str::FromStr};
235    use tracing::error;
236
237    impl From<rusqlite::Error> for RusqliteError {
238        fn from(err: rusqlite::Error) -> Self {
239            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
240                RusqliteError::NotFound
241            } else {
242                error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
243                RusqliteError::Generic(err.to_string().into())
244            }
245        }
246    }
247
248    impl From<RusqliteError> for DbErrorGeneric {
249        fn from(err: RusqliteError) -> DbErrorGeneric {
250            match err {
251                RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
252                RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
253            }
254        }
255    }
256    impl From<RusqliteError> for DbErrorRead {
257        fn from(err: RusqliteError) -> Self {
258            if matches!(err, RusqliteError::NotFound) {
259                Self::NotFound
260            } else {
261                Self::from(DbErrorGeneric::from(err))
262            }
263        }
264    }
265    impl From<RusqliteError> for DbErrorReadWithTimeout {
266        fn from(err: RusqliteError) -> Self {
267            Self::from(DbErrorRead::from(err))
268        }
269    }
270    impl From<RusqliteError> for DbErrorWrite {
271        fn from(err: RusqliteError) -> Self {
272            if matches!(err, RusqliteError::NotFound) {
273                Self::NotFound
274            } else {
275                Self::from(DbErrorGeneric::from(err))
276            }
277        }
278    }
279
280    pub(crate) struct JsonWrapper<T>(pub(crate) T);
281    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
282        fn column_result(
283            value: rusqlite::types::ValueRef<'_>,
284        ) -> rusqlite::types::FromSqlResult<Self> {
285            let value = match value {
286                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
287                    Ok(value)
288                }
289                other => {
290                    error!(
291                        backtrace = %std::backtrace::Backtrace::capture(),
292                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
293                    );
294                    Err(FromSqlError::InvalidType)
295                }
296            }?;
297            let value = serde_json::from_slice::<T>(value).map_err(|err| {
298                error!(
299                    backtrace = %std::backtrace::Backtrace::capture(),
300                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
301                    r#type = std::any::type_name::<T>()
302                );
303                FromSqlError::InvalidType
304            })?;
305            Ok(Self(value))
306        }
307    }
308
309    pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
310    impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
311        fn column_result(
312            value: rusqlite::types::ValueRef<'_>,
313        ) -> rusqlite::types::FromSqlResult<Self> {
314            let value = String::column_result(value)?;
315            let value = T::from_str(&value).map_err(|err| {
316                error!(
317                    backtrace = %std::backtrace::Backtrace::capture(),
318                    "Cannot convert string `{value}` to type:`{type}` - {err:?}",
319                    r#type = std::any::type_name::<T>()
320                );
321                FromSqlError::InvalidType
322            })?;
323            Ok(Self(value))
324        }
325    }
326
327    // Used as a wrapper for `FromSqlError::Other`
328    #[derive(Debug, thiserror::Error)]
329    #[error("{0}")]
330    pub(crate) struct OtherError(&'static str);
331    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
332        FromSqlError::other(OtherError(input)).into()
333    }
334
335    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
336        DbErrorGeneric::Uncategorized(input.into())
337    }
338}
339
/// Raw, loosely typed view of the state-related columns of `t_state`.
/// `CombinedState::new` validates it and turns it into a `PendingState`.
#[derive(Debug)]
struct CombinedStateDTO {
    /// One of the `STATE_*` strings.
    state: String,
    /// Unparsed function name; parsed with `FunctionFqn::from_str` in `CombinedState::new`.
    ffqn: String,
    /// Meaning depends on `state`: scheduled time, lock expiry or finish time.
    pending_expires_finished: DateTime<Utc>,
    // Locked:
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    // Blocked by join set:
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    // Finished:
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated execution state: the parsed function name, the pending state and the version
/// the state corresponds to.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    /// Version passed to `CombinedState::new` together with the DTO.
    corresponding_version: Version,
}
361impl CombinedState {
362    fn get_next_version_assert_not_finished(&self) -> Version {
363        assert!(!self.pending_state.is_finished());
364        self.corresponding_version.increment()
365    }
366
367    #[cfg(feature = "test")]
368    fn get_next_version_or_finished(&self) -> Version {
369        if self.pending_state.is_finished() {
370            self.corresponding_version.clone()
371        } else {
372            self.corresponding_version.increment()
373        }
374    }
375}
376
/// Data for a "pending at" notification: the scheduled time and the function concerned.
// NOTE(review): presumably consumed by the `pending_ffqn_subscribers` machinery — the
// consumer is outside this part of the file, confirm.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}
382
/// Data for an "execution finished" notification: the execution and its return value.
// NOTE(review): presumably delivered to `execution_finished_subscribers` — confirm.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}
388
/// Bundle of optional notifications; every field defaults to `None`.
// NOTE(review): presumably filled while appending events and dispatched after commit — the
// producer and consumer are outside this part of the file, confirm.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    /// Execution the response is addressed to, together with the response itself.
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
395
396impl CombinedState {
397    fn new(
398        dto: &CombinedStateDTO,
399        corresponding_version: Version,
400    ) -> Result<Self, rusqlite::Error> {
401        let pending_state = match dto {
402            CombinedStateDTO {
403                state,
404                ffqn: _,
405                pending_expires_finished: scheduled_at,
406                last_lock_version: None,
407                executor_id: None,
408                run_id: None,
409                join_set_id: None,
410                join_set_closing: None,
411                result_kind: None,
412            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
413                scheduled_at: *scheduled_at,
414            }),
415            CombinedStateDTO {
416                state,
417                ffqn: _,
418                pending_expires_finished: lock_expires_at,
419                last_lock_version: Some(_),
420                executor_id: Some(executor_id),
421                run_id: Some(run_id),
422                join_set_id: None,
423                join_set_closing: None,
424                result_kind: None,
425            } if state == STATE_LOCKED => Ok(PendingState::Locked {
426                executor_id: *executor_id,
427                run_id: *run_id,
428                lock_expires_at: *lock_expires_at,
429            }),
430            CombinedStateDTO {
431                state,
432                ffqn: _,
433                pending_expires_finished: lock_expires_at,
434                last_lock_version: None,
435                executor_id: None,
436                run_id: None,
437                join_set_id: Some(join_set_id),
438                join_set_closing: Some(join_set_closing),
439                result_kind: None,
440            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
441                join_set_id: join_set_id.clone(),
442                closing: *join_set_closing,
443                lock_expires_at: *lock_expires_at,
444            }),
445            CombinedStateDTO {
446                state,
447                ffqn: _,
448                pending_expires_finished: finished_at,
449                last_lock_version: None,
450                executor_id: None,
451                run_id: None,
452                join_set_id: None,
453                join_set_closing: None,
454                result_kind: Some(result_kind),
455            } if state == STATE_FINISHED => Ok(PendingState::Finished {
456                finished: PendingStateFinished {
457                    finished_at: *finished_at,
458                    version: corresponding_version.0,
459                    result_kind: *result_kind,
460                },
461            }),
462            _ => {
463                error!("Cannot deserialize pending state from  {dto:?}");
464                Err(consistency_rusqlite("invalid `t_state`"))
465            }
466        }?;
467        Ok(Self {
468            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
469                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
470                consistency_rusqlite("invalid ffqn value in `t_state`")
471            })?,
472            pending_state,
473            corresponding_version,
474        })
475    }
476}
477
/// A unit of work sent to the SQLite thread. Several logical transactions may be executed
/// inside one physical transaction (see `tick`).
#[derive(derive_more::Debug)]
struct LogicalTx {
    /// The work itself. It is `FnMut` because `tick` may run it a second time when a batched
    /// commit fails and the transactions are replayed one by one.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    /// Creation time; used to measure how long the command waited before being executed.
    sent_at: Instant,
    /// Label passed to the histograms.
    func_name: &'static str,
    /// Receives the result of the physical commit.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}
487
/// Message consumed by the SQLite thread in `tick`.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Execute the contained logical transaction.
    LogicalTx(LogicalTx),
    /// Stop processing; `tick` returns `Err(())` when it receives this.
    Shutdown,
}
493
/// Cloneable handle to the SQLite thread. `DbPool::connection` returns a boxed clone of it.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
496
/// Shared map from an execution to a one-shot sender of a join set response.
// NOTE(review): the `u64` is presumably a subscription token — confirm against the users.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Shared map from a function name to a sender of unit notifications.
// NOTE(review): the `u64` is presumably a subscription token — confirm against the users.
type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
/// Map from an execution to senders of its return value, keyed by a `u64`.
/// Unlike the aliases above, this one is not wrapped in `Arc`; the owner adds it.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
/// State shared by all clones of `SqlitePool`.
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set by `close` / `drop` to ask the SQLite thread to stop.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the SQLite thread as the last step of `connection_rpc`.
    shutdown_finished: Arc<AtomicBool>,
    /// Sending side of the command queue consumed by the SQLite thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_ffqn_subscribers: PendingFfqnSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
}
512
513#[async_trait]
514impl DbPoolCloseable for SqlitePool {
515    async fn close(self) {
516        debug!("Sqlite is closing");
517        self.0.shutdown_requested.store(true, Ordering::Release);
518        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
519        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
520        while !self.0.shutdown_finished.load(Ordering::Acquire) {
521            tokio::time::sleep(Duration::from_millis(1)).await;
522        }
523        debug!("Sqlite was closed");
524    }
525}
526
527#[async_trait]
528impl DbPool for SqlitePool {
529    fn connection(&self) -> Box<dyn DbConnection> {
530        Box::new(self.clone())
531    }
532}
533impl Drop for SqlitePool {
534    fn drop(&mut self) {
535        let arc = self.0.join_handle.take().expect("join_handle was set");
536        if let Ok(join_handle) = Arc::try_unwrap(arc) {
537            // Last holder
538            if !join_handle.is_finished() {
539                if !self.0.shutdown_finished.load(Ordering::Acquire) {
540                    // Best effort to shut down the sqlite thread.
541                    let backtrace = std::backtrace::Backtrace::capture();
542                    warn!("SqlitePool was not closed properly - {backtrace}");
543                    self.0.shutdown_requested.store(true, Ordering::Release);
544                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
545                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
546                    // Not joining the thread, drop might be called from async context.
547                    // We are shutting down the server anyway.
548                } else {
549                    // The thread set `shutdown_finished` as its last operation.
550                }
551            }
552        }
553    }
554}
555
/// Configuration accepted by `SqlitePool::new`.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command channel to the SQLite thread.
    pub queue_capacity: usize,
    /// PRAGMA name/value pairs that replace or extend the defaults in `PRAGMA`.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to `Histograms::new`.
    pub metrics_threshold: Option<Duration>,
}
562impl Default for SqliteConfig {
563    fn default() -> Self {
564        Self {
565            queue_capacity: 100,
566            pragma_override: None,
567            metrics_threshold: None,
568        }
569    }
570}
571
572impl SqlitePool {
573    fn init_thread(
574        path: &Path,
575        mut pragma_override: HashMap<String, String>,
576    ) -> Result<Connection, InitializationError> {
577        fn conn_execute<P: Params>(
578            conn: &Connection,
579            sql: &str,
580            params: P,
581        ) -> Result<(), InitializationError> {
582            conn.execute(sql, params).map(|_| ()).map_err(|err| {
583                error!("Cannot run `{sql}` - {err:?}");
584                InitializationError
585            })
586        }
587        fn pragma_update(
588            conn: &Connection,
589            name: &str,
590            value: &str,
591        ) -> Result<(), InitializationError> {
592            if value.is_empty() {
593                debug!("Querying PRAGMA {name}");
594                conn.pragma_query(None, name, |row| {
595                    debug!("{row:?}");
596                    Ok(())
597                })
598                .map_err(|err| {
599                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
600                    InitializationError
601                })
602            } else {
603                debug!("Setting PRAGMA {name}={value}");
604                conn.pragma_update(None, name, value).map_err(|err| {
605                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
606                    InitializationError
607                })
608            }
609        }
610
611        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
612            error!("cannot open the connection - {err:?}");
613            InitializationError
614        })?;
615
616        for [pragma_name, default_value] in PRAGMA {
617            let pragma_value = pragma_override
618                .remove(pragma_name)
619                .unwrap_or_else(|| default_value.to_string());
620            pragma_update(&conn, pragma_name, &pragma_value)?;
621        }
622        // drain the rest overrides
623        for (pragma_name, pragma_value) in pragma_override.drain() {
624            pragma_update(&conn, &pragma_name, &pragma_value)?;
625        }
626
627        // t_metadata
628        conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
629        // Insert row if not exists.
630        conn_execute(
631            &conn,
632            &format!(
633                "INSERT INTO t_metadata (schema_version, created_at) VALUES
634                    ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
635            ),
636            [Utc::now()],
637        )?;
638        // Fail on unexpected `schema_version`.
639        let actual_version = conn
640            .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
641            .map_err(|err| {
642                error!("cannot select schema version - {err:?}");
643                InitializationError
644            })?
645            .query_row([], |row| row.get::<_, u32>("schema_version"));
646
647        let actual_version = actual_version.map_err(|err| {
648            error!("Cannot read the schema version - {err:?}");
649            InitializationError
650        })?;
651        if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
652            error!(
653                "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
654            );
655            return Err(InitializationError);
656        }
657
658        // t_execution_log
659        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
660        conn_execute(
661            &conn,
662            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
663            [],
664        )?;
665        conn_execute(
666            &conn,
667            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
668            [],
669        )?;
670        // t_join_set_response
671        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
672        conn_execute(
673            &conn,
674            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
675            [],
676        )?;
677        // t_state
678        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
679        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
680        conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
681        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
682        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
683        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
684        // t_delay
685        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
686        // t_backtrace
687        conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
688        conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
689        Ok(conn)
690    }
691
692    fn connection_rpc(
693        mut conn: Connection,
694        shutdown_requested: &AtomicBool,
695        shutdown_finished: &AtomicBool,
696        mut command_rx: mpsc::Receiver<ThreadCommand>,
697        metrics_threshold: Option<Duration>,
698    ) {
699        let mut histograms = Histograms::new(metrics_threshold);
700        while Self::tick(
701            &mut conn,
702            shutdown_requested,
703            &mut command_rx,
704            &mut histograms,
705        )
706        .is_ok()
707        {
708            // Loop until shutdown is set to true.
709        }
710        debug!("Closing command thread");
711        shutdown_finished.store(true, Ordering::Release);
712    }
713
    /// Processes one batch of commands.
    ///
    /// Blocks until the first logical transaction arrives, opens one physical transaction,
    /// applies that logical transaction plus everything else already queued, and commits.
    /// When the batched commit fails and the batch holds more than one logical transaction,
    /// each of them is replayed in its own physical transaction.
    ///
    /// Returns `Err(())` when the loop in `connection_rpc` should stop: a `Shutdown`
    /// command was received, the channel was closed, shutdown was requested before the
    /// commit, or a physical transaction could not be opened. In those cases the open
    /// transaction is dropped without being committed and no commit ACK is sent.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ()> {
        // Logical transactions applied in this batch, kept for the ACK / replay phase.
        let mut ltx_list = Vec::new();
        // Wait for first logical tx.
        let mut ltx = match command_rx.blocking_recv() {
            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
            Some(ThreadCommand::Shutdown) => {
                debug!("shutdown message received");
                return Err(());
            }
            None => {
                debug!("command_rx was closed");
                return Err(());
            }
        };
        let all_fns_start = std::time::Instant::now();
        let mut physical_tx = conn
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|begin_err| {
                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
            })?;
        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
        ltx_list.push(ltx);

        // Without blocking, apply everything that is already waiting in the queue.
        while let Ok(more) = command_rx.try_recv() {
            let mut ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("shutdown message received");
                    return Err(());
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };

            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
            ltx_list.push(ltx);
        }
        histograms.record_all_fns(all_fns_start.elapsed());

        {
            // Commit
            if shutdown_requested.load(Ordering::Relaxed) {
                debug!("Recveived shutdown during processing of the batch");
                return Err(());
            }
            let commit_result = {
                let now = std::time::Instant::now();
                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
                histograms.record_commit(now.elapsed());
                commit_result
            };
            // A failed commit of a single-element batch is reported directly: replaying it
            // alone would repeat the same work.
            if commit_result.is_ok() || ltx_list.len() == 1 {
                // Happy path - just send the commit ACK.
                for ltx in ltx_list {
                    // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
                }
            } else {
                // Bulk transaction failed. Replay each ltx in its own physical transaction.
                for ltx in ltx_list {
                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
                }
            }
        }
        histograms.print_if_elapsed();
        Ok(())
    }
784
785    fn ltx_commit_single(
786        mut ltx: LogicalTx,
787        conn: &mut Connection,
788        shutdown_requested: &AtomicBool,
789        histograms: &mut Histograms,
790    ) -> Result<(), ()> {
791        let mut physical_tx = conn
792            .transaction_with_behavior(TransactionBehavior::Immediate)
793            .map_err(|begin_err| {
794                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
795            })?;
796        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
797        if shutdown_requested.load(Ordering::Relaxed) {
798            debug!("Recveived shutdown during processing of the batch");
799            return Err(());
800        }
801        let commit_result = {
802            let now = std::time::Instant::now();
803            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
804            histograms.record_commit(now.elapsed());
805            commit_result
806        };
807        // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
808        let _ = ltx.commit_ack_sender.send(commit_result);
809        Ok(())
810    }
811
812    fn ltx_apply_to_tx(
813        ltx: &mut LogicalTx,
814        physical_tx: &mut Transaction,
815        histograms: &mut Histograms,
816    ) {
817        let sent_latency = ltx.sent_at.elapsed();
818        let started_at = Instant::now();
819        (ltx.func)(physical_tx);
820        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
821    }
822
823    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
824    pub async fn new<P: AsRef<Path>>(
825        path: P,
826        config: SqliteConfig,
827    ) -> Result<Self, InitializationError> {
828        let path = path.as_ref().to_owned();
829
830        let shutdown_requested = Arc::new(AtomicBool::new(false));
831        let shutdown_finished = Arc::new(AtomicBool::new(false));
832
833        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
834        info!("Sqlite database location: {path:?}");
835        let join_handle = {
836            // Initialize the `Connection`.
837            let init_task = {
838                tokio::task::spawn_blocking(move || {
839                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
840                })
841                .await
842            };
843            let conn = match init_task {
844                Ok(res) => res?,
845                Err(join_err) => {
846                    error!("Initialization panic - {join_err:?}");
847                    return Err(InitializationError);
848                }
849            };
850            let shutdown_requested = shutdown_requested.clone();
851            let shutdown_finished = shutdown_finished.clone();
852            // Start the RPC thread.
853            std::thread::spawn(move || {
854                Self::connection_rpc(
855                    conn,
856                    &shutdown_requested,
857                    &shutdown_finished,
858                    command_rx,
859                    config.metrics_threshold,
860                );
861            })
862        };
863        Ok(SqlitePool(SqlitePoolInner {
864            shutdown_requested,
865            shutdown_finished,
866            command_tx,
867            response_subscribers: Arc::default(),
868            pending_ffqn_subscribers: Arc::default(),
869            join_handle: Some(Arc::new(join_handle)),
870            execution_finished_subscribers: Arc::default(),
871        }))
872    }
873
874    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
875    async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
876    where
877        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
878        T: Send + 'static,
879        E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
880    {
881        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
882        let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
883        let thread_command_func = {
884            let fn_res = fn_res.clone();
885            ThreadCommand::LogicalTx(LogicalTx {
886                func: Box::new(move |tx| {
887                    let func_res = func(tx);
888                    *fn_res.lock().unwrap() = Some(func_res);
889                }),
890                sent_at: Instant::now(),
891                func_name,
892                commit_ack_sender,
893            })
894        };
895        self.0
896            .command_tx
897            .send(thread_command_func)
898            .await
899            .map_err(|_send_err| DbErrorGeneric::Close)?;
900
901        // Wait for commit ack, then get the retval from the mutex.
902        match commit_ack_receiver.await {
903            Ok(Ok(())) => {
904                let mut guard = fn_res.lock().unwrap();
905                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
906            }
907            Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
908            Err(_) => Err(E::from(DbErrorGeneric::Close)),
909        }
910    }
911
912    fn fetch_created_event(
913        conn: &Connection,
914        execution_id: &ExecutionId,
915    ) -> Result<CreateRequest, DbErrorRead> {
916        let mut stmt = conn.prepare(
917            "SELECT created_at, json_value FROM t_execution_log WHERE \
918            execution_id = :execution_id AND version = 0",
919        )?;
920        let (created_at, event) = stmt.query_row(
921            named_params! {
922                ":execution_id": execution_id.to_string(),
923            },
924            |row| {
925                let created_at = row.get("created_at")?;
926                let event = row
927                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
928                    .map_err(|serde| {
929                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
930                        consistency_rusqlite("cannot deserialize `Created` event")
931                    })?;
932                Ok((created_at, event.0))
933            },
934        )?;
935        if let ExecutionEventInner::Created {
936            ffqn,
937            params,
938            parent,
939            scheduled_at,
940            retry_exp_backoff,
941            max_retries,
942            component_id,
943            metadata,
944            scheduled_by,
945        } = event
946        {
947            Ok(CreateRequest {
948                created_at,
949                execution_id: execution_id.clone(),
950                ffqn,
951                params,
952                parent,
953                scheduled_at,
954                retry_exp_backoff,
955                max_retries,
956                component_id,
957                metadata,
958                scheduled_by,
959            })
960        } else {
961            error!("Row with version=0 must be a `Created` event - {event:?}");
962            Err(consistency_db_err("expected `Created` event").into())
963        }
964    }
965
966    fn check_expected_next_and_appending_version(
967        expected_version: &Version,
968        appending_version: &Version,
969    ) -> Result<(), DbErrorWrite> {
970        if *expected_version != *appending_version {
971            debug!(
972                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
973            );
974            return Err(DbErrorWrite::Permanent(
975                DbErrorWritePermanent::CannotWrite {
976                    reason: "version conflict".into(),
977                    expected_version: Some(expected_version.clone()),
978                },
979            ));
980        }
981        Ok(())
982    }
983
984    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
985    fn create_inner(
986        tx: &Transaction,
987        req: CreateRequest,
988    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
989        debug!("create_inner");
990
991        let version = Version::default();
992        let execution_id = req.execution_id.clone();
993        let execution_id_str = execution_id.to_string();
994        let ffqn = req.ffqn.clone();
995        let created_at = req.created_at;
996        let scheduled_at = req.scheduled_at;
997        let max_retries = req.max_retries;
998        let backoff_millis =
999            u64::try_from(req.retry_exp_backoff.as_millis()).expect("backoff too big");
1000        let event = ExecutionEventInner::from(req);
1001        let event_ser = serde_json::to_string(&event).map_err(|err| {
1002            error!("Cannot serialize {event:?} - {err:?}");
1003            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1004        })?;
1005        tx.prepare(
1006                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1007                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1008        ?
1009        .execute(named_params! {
1010            ":execution_id": &execution_id_str,
1011            ":created_at": created_at,
1012            ":version": version.0,
1013            ":json_value": event_ser,
1014            ":variant": event.variant(),
1015            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1016        })
1017        ?;
1018        let pending_state = PendingState::PendingAt { scheduled_at };
1019        let pending_at = {
1020            let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
1021            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1022            tx.prepare(
1023                r"
1024                INSERT INTO t_state (
1025                    execution_id,
1026                    is_top_level,
1027                    corresponding_version,
1028                    pending_expires_finished,
1029                    ffqn,
1030                    state,
1031                    created_at,
1032                    updated_at,
1033                    scheduled_at,
1034                    intermittent_event_count,
1035                    max_retries,
1036                    retry_exp_backoff_millis
1037                    )
1038                VALUES (
1039                    :execution_id,
1040                    :is_top_level,
1041                    :corresponding_version,
1042                    :pending_expires_finished,
1043                    :ffqn,
1044                    :state,
1045                    :created_at,
1046                    CURRENT_TIMESTAMP,
1047                    :scheduled_at,
1048                    0,
1049                    :max_retries,
1050                    :retry_exp_backoff_millis
1051                    )
1052                ",
1053            )?
1054            .execute(named_params! {
1055                ":execution_id": execution_id.to_string(),
1056                ":is_top_level": execution_id.is_top_level(),
1057                ":corresponding_version": version.0,
1058                ":pending_expires_finished": scheduled_at,
1059                ":ffqn": ffqn.to_string(),
1060                ":state": STATE_PENDING_AT,
1061                ":created_at": created_at,
1062                ":scheduled_at": scheduled_at,
1063                ":max_retries": max_retries,
1064                ":retry_exp_backoff_millis": backoff_millis,
1065            })?;
1066            AppendNotifier {
1067                pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1068                execution_finished: None,
1069                response: None,
1070            }
1071        };
1072        let next_version = Version::new(version.0 + 1);
1073        Ok((next_version, pending_at))
1074    }
1075
1076    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1077    fn update_state_pending_after_response_appended(
1078        tx: &Transaction,
1079        execution_id: &ExecutionId,
1080        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1081        corresponding_version: &Version, // t_execution_log is not be changed
1082    ) -> Result<AppendNotifier, DbErrorWrite> {
1083        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1084        let execution_id_str = execution_id.to_string();
1085        let mut stmt = tx
1086            .prepare_cached(
1087                r"
1088                UPDATE t_state
1089                SET
1090                    corresponding_version = :corresponding_version,
1091                    pending_expires_finished = :pending_expires_finished,
1092                    state = :state,
1093                    updated_at = CURRENT_TIMESTAMP,
1094
1095                    last_lock_version = NULL,
1096                    executor_id = NULL,
1097                    run_id = NULL,
1098
1099                    join_set_id = NULL,
1100                    join_set_closing = NULL,
1101
1102                    result_kind = NULL
1103                WHERE execution_id = :execution_id
1104            ",
1105            )
1106            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1107        let updated = stmt
1108            .execute(named_params! {
1109                ":execution_id": execution_id_str,
1110                ":corresponding_version": corresponding_version.0,
1111                ":pending_expires_finished": scheduled_at,
1112                ":state": STATE_PENDING_AT,
1113            })
1114            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1115        if updated != 1 {
1116            return Err(DbErrorWrite::NotFound);
1117        }
1118        Ok(AppendNotifier {
1119            pending_at: Some(NotifierPendingAt {
1120                scheduled_at,
1121                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1122            }),
1123            execution_finished: None,
1124            response: None,
1125        })
1126    }
1127
1128    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1129    fn update_state_pending_after_event_appended(
1130        tx: &Transaction,
1131        execution_id: &ExecutionId,
1132        appending_version: &Version,
1133        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1134        intermittent_failure: bool,
1135    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1136        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1137        // If response idx is unknown, use 0. This distinguishs the two paths.
1138        // JoinNext will always send an actual idx.
1139        let mut stmt = tx
1140            .prepare_cached(
1141                r"
1142                UPDATE t_state
1143                SET
1144                    corresponding_version = :appending_version,
1145                    pending_expires_finished = :pending_expires_finished,
1146                    state = :state,
1147                    updated_at = CURRENT_TIMESTAMP,
1148                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1149
1150                    last_lock_version = NULL,
1151                    executor_id = NULL,
1152                    run_id = NULL,
1153
1154                    join_set_id = NULL,
1155                    join_set_closing = NULL,
1156
1157                    result_kind = NULL
1158                WHERE execution_id = :execution_id;
1159            ",
1160            )
1161            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1162        let updated = stmt
1163            .execute(named_params! {
1164                ":execution_id": execution_id.to_string(),
1165                ":appending_version": appending_version.0,
1166                ":pending_expires_finished": scheduled_at,
1167                ":state": STATE_PENDING_AT,
1168                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1169            })
1170            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1171        if updated != 1 {
1172            return Err(DbErrorWrite::NotFound);
1173        }
1174        Ok((
1175            appending_version.increment(),
1176            AppendNotifier {
1177                pending_at: Some(NotifierPendingAt {
1178                    scheduled_at,
1179                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1180                }),
1181                execution_finished: None,
1182                response: None,
1183            },
1184        ))
1185    }
1186
1187    fn update_state_locked_get_intermittent_event_count(
1188        tx: &Transaction,
1189        execution_id: &ExecutionId,
1190        executor_id: ExecutorId,
1191        run_id: RunId,
1192        lock_expires_at: DateTime<Utc>,
1193        appending_version: &Version,
1194    ) -> Result<u32, DbErrorWrite> {
1195        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1196        let execution_id_str = execution_id.to_string();
1197        let mut stmt = tx
1198            .prepare_cached(
1199                r"
1200                UPDATE t_state
1201                SET
1202                    corresponding_version = :appending_version,
1203                    pending_expires_finished = :pending_expires_finished,
1204                    state = :state,
1205                    updated_at = CURRENT_TIMESTAMP,
1206
1207                    last_lock_version = :appending_version,
1208                    executor_id = :executor_id,
1209                    run_id = :run_id,
1210
1211                    join_set_id = NULL,
1212                    join_set_closing = NULL,
1213
1214                    result_kind = NULL
1215                WHERE execution_id = :execution_id
1216            ",
1217            )
1218            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1219        let updated = stmt
1220            .execute(named_params! {
1221                ":execution_id": execution_id_str,
1222                ":appending_version": appending_version.0,
1223                ":pending_expires_finished": lock_expires_at,
1224                ":state": STATE_LOCKED,
1225                ":executor_id": executor_id.to_string(),
1226                ":run_id": run_id.to_string(),
1227            })
1228            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1229        if updated != 1 {
1230            return Err(DbErrorWrite::NotFound);
1231        }
1232
1233        // fetch intermittent event count from the just-inserted row.
1234        let intermittent_event_count = tx
1235            .prepare(
1236                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1237            )?
1238            .query_row(
1239                named_params! {
1240                    ":execution_id": execution_id_str,
1241                },
1242                |row| {
1243                    let intermittent_event_count = row.get("intermittent_event_count")?;
1244                    Ok(intermittent_event_count)
1245                },
1246            )
1247            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1248
1249        Ok(intermittent_event_count)
1250    }
1251
1252    fn update_state_blocked(
1253        tx: &Transaction,
1254        execution_id: &ExecutionId,
1255        appending_version: &Version,
1256        // BlockedByJoinSet fields
1257        join_set_id: &JoinSetId,
1258        lock_expires_at: DateTime<Utc>,
1259        join_set_closing: bool,
1260    ) -> Result<
1261        AppendResponse, // next version
1262        DbErrorWrite,
1263    > {
1264        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1265        let execution_id_str = execution_id.to_string();
1266        let mut stmt = tx.prepare_cached(
1267            r"
1268                UPDATE t_state
1269                SET
1270                    corresponding_version = :appending_version,
1271                    pending_expires_finished = :pending_expires_finished,
1272                    state = :state,
1273                    updated_at = CURRENT_TIMESTAMP,
1274
1275                    last_lock_version = NULL,
1276                    executor_id = NULL,
1277                    run_id = NULL,
1278
1279                    join_set_id = :join_set_id,
1280                    join_set_closing = :join_set_closing,
1281
1282                    result_kind = NULL
1283                WHERE execution_id = :execution_id
1284            ",
1285        )?;
1286        let updated = stmt.execute(named_params! {
1287            ":execution_id": execution_id_str,
1288            ":appending_version": appending_version.0,
1289            ":pending_expires_finished": lock_expires_at,
1290            ":state": STATE_BLOCKED_BY_JOIN_SET,
1291            ":join_set_id": join_set_id,
1292            ":join_set_closing": join_set_closing,
1293        })?;
1294        if updated != 1 {
1295            return Err(DbErrorWrite::NotFound);
1296        }
1297        Ok(appending_version.increment())
1298    }
1299
1300    fn update_state_finished(
1301        tx: &Transaction,
1302        execution_id: &ExecutionId,
1303        appending_version: &Version,
1304        // Finished fields
1305        finished_at: DateTime<Utc>,
1306        result_kind: PendingStateFinishedResultKind,
1307    ) -> Result<(), DbErrorWrite> {
1308        debug!("Setting t_state to Finished");
1309        let execution_id_str = execution_id.to_string();
1310        let mut stmt = tx.prepare_cached(
1311            r"
1312                UPDATE t_state
1313                SET
1314                    corresponding_version = :appending_version,
1315                    pending_expires_finished = :pending_expires_finished,
1316                    state = :state,
1317                    updated_at = CURRENT_TIMESTAMP,
1318
1319                    last_lock_version = NULL,
1320                    executor_id = NULL,
1321                    run_id = NULL,
1322
1323                    join_set_id = NULL,
1324                    join_set_closing = NULL,
1325
1326                    result_kind = :result_kind
1327                WHERE execution_id = :execution_id
1328            ",
1329        )?;
1330        let updated = stmt.execute(named_params! {
1331            ":execution_id": execution_id_str,
1332            ":appending_version": appending_version.0,
1333            ":pending_expires_finished": finished_at,
1334            ":state": STATE_FINISHED,
1335            ":result_kind": result_kind.to_string(),
1336        })?;
1337        if updated != 1 {
1338            return Err(DbErrorWrite::NotFound);
1339        }
1340        Ok(())
1341    }
1342
1343    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1344    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1345    fn bump_state_next_version(
1346        tx: &Transaction,
1347        execution_id: &ExecutionId,
1348        appending_version: &Version,
1349        delay_req: Option<DelayReq>,
1350    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1351        debug!("update_index_version");
1352        let execution_id_str = execution_id.to_string();
1353        let mut stmt = tx.prepare_cached(
1354            r"
1355                UPDATE t_state
1356                SET
1357                    corresponding_version = :appending_version,
1358                    updated_at = CURRENT_TIMESTAMP
1359                WHERE execution_id = :execution_id
1360            ",
1361        )?;
1362        let updated = stmt.execute(named_params! {
1363            ":execution_id": execution_id_str,
1364            ":appending_version": appending_version.0,
1365        })?;
1366        if updated != 1 {
1367            return Err(DbErrorWrite::NotFound);
1368        }
1369        if let Some(DelayReq {
1370            join_set_id,
1371            delay_id,
1372            expires_at,
1373        }) = delay_req
1374        {
1375            debug!("Inserting delay to `t_delay`");
1376            let mut stmt = tx.prepare_cached(
1377                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1378                VALUES \
1379                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1380            )?;
1381            stmt.execute(named_params! {
1382                ":execution_id": execution_id_str,
1383                ":join_set_id": join_set_id.to_string(),
1384                ":delay_id": delay_id.to_string(),
1385                ":expires_at": expires_at,
1386            })?;
1387        }
1388        Ok(appending_version.increment())
1389    }
1390
1391    fn get_combined_state(
1392        tx: &Transaction,
1393        execution_id: &ExecutionId,
1394    ) -> Result<CombinedState, DbErrorRead> {
1395        let mut stmt = tx.prepare(
1396            r"
1397                SELECT
1398                    state, ffqn, corresponding_version, pending_expires_finished,
1399                    last_lock_version, executor_id, run_id,
1400                    join_set_id, join_set_closing,
1401                    result_kind
1402                    FROM t_state
1403                WHERE
1404                    execution_id = :execution_id
1405                ",
1406        )?;
1407        stmt.query_row(
1408            named_params! {
1409                ":execution_id": execution_id.to_string(),
1410            },
1411            |row| {
1412                CombinedState::new(
1413                    &CombinedStateDTO {
1414                        state: row.get("state")?,
1415                        ffqn: row.get("ffqn")?,
1416                        pending_expires_finished: row
1417                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1418                        last_lock_version: row
1419                            .get::<_, Option<VersionType>>("last_lock_version")?
1420                            .map(Version::new),
1421                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1422                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1423                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1424                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1425                        result_kind: row
1426                            .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1427                                "result_kind",
1428                            )?
1429                            .map(|wrapper| wrapper.0),
1430                    },
1431                    Version::new(row.get("corresponding_version")?),
1432                )
1433            },
1434        )
1435        .map_err(DbErrorRead::from)
1436    }
1437
1438    fn list_executions(
1439        read_tx: &Transaction,
1440        ffqn: Option<&FunctionFqn>,
1441        top_level_only: bool,
1442        pagination: &ExecutionListPagination,
1443    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1444        struct StatementModifier<'a> {
1445            where_vec: Vec<String>,
1446            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1447            limit: u32,
1448            limit_desc: bool,
1449        }
1450
1451        fn paginate<'a, T: rusqlite::ToSql + 'static>(
1452            pagination: &'a Pagination<Option<T>>,
1453            column: &str,
1454            top_level_only: bool,
1455        ) -> StatementModifier<'a> {
1456            let mut where_vec: Vec<String> = vec![];
1457            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1458            let limit = pagination.length();
1459            let limit_desc = pagination.is_desc();
1460            let rel = pagination.rel();
1461            match pagination {
1462                Pagination::NewerThan {
1463                    cursor: Some(cursor),
1464                    ..
1465                }
1466                | Pagination::OlderThan {
1467                    cursor: Some(cursor),
1468                    ..
1469                } => {
1470                    where_vec.push(format!("{column} {rel} :cursor"));
1471                    params.push((":cursor", cursor.to_sql().expect("FIXME")));
1472                }
1473                _ => {}
1474            }
1475            if top_level_only {
1476                where_vec.push("is_top_level=true".to_string());
1477            }
1478            StatementModifier {
1479                where_vec,
1480                params,
1481                limit,
1482                limit_desc,
1483            }
1484        }
1485
1486        let mut statement_mod = match pagination {
1487            ExecutionListPagination::CreatedBy(pagination) => {
1488                paginate(pagination, "created_at", top_level_only)
1489            }
1490            ExecutionListPagination::ExecutionId(pagination) => {
1491                paginate(pagination, "execution_id", top_level_only)
1492            }
1493        };
1494
1495        let ffqn_temporary;
1496        if let Some(ffqn) = ffqn {
1497            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1498            ffqn_temporary = ffqn.to_string();
1499            let ffqn = ffqn_temporary
1500                .to_sql()
1501                .expect("string conversion never fails");
1502
1503            statement_mod.params.push((":ffqn", ffqn));
1504        }
1505
1506        let where_str = if statement_mod.where_vec.is_empty() {
1507            String::new()
1508        } else {
1509            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1510        };
1511        let sql = format!(
1512            r"
1513            SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1514            last_lock_version, executor_id, run_id,
1515            join_set_id, join_set_closing,
1516            result_kind
1517            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1518            ",
1519            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1520            limit = statement_mod.limit,
1521        );
1522        let mut vec: Vec<_> = read_tx
1523            .prepare(&sql)?
1524            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1525                statement_mod
1526                    .params
1527                    .into_iter()
1528                    .collect::<Vec<_>>()
1529                    .as_ref(),
1530                |row| {
1531                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1532                    let created_at = row.get("created_at")?;
1533                    let scheduled_at = row.get("scheduled_at")?;
1534                    let combined_state = CombinedState::new(
1535                        &CombinedStateDTO {
1536                            state: row.get("state")?,
1537                            ffqn: row.get("ffqn")?,
1538                            pending_expires_finished: row
1539                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1540                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1541
1542                            last_lock_version: row
1543                                .get::<_, Option<VersionType>>("last_lock_version")?
1544                                .map(Version::new),
1545                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1546                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1547                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1548                            result_kind: row
1549                                .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1550                                    "result_kind",
1551                                )?
1552                                .map(|wrapper| wrapper.0),
1553                        },
1554                        Version::new(row.get("corresponding_version")?),
1555                    )?;
1556                    Ok(ExecutionWithState {
1557                        execution_id,
1558                        ffqn: combined_state.ffqn,
1559                        pending_state: combined_state.pending_state,
1560                        created_at,
1561                        scheduled_at,
1562                    })
1563                },
1564            )?
1565            .collect::<Vec<Result<_, _>>>()
1566            .into_iter()
1567            .filter_map(|row| match row {
1568                Ok(row) => Some(row),
1569                Err(err) => {
1570                    warn!("Skipping row - {err:?}");
1571                    None
1572                }
1573            })
1574            .collect();
1575
1576        if !statement_mod.limit_desc {
1577            // the list must be sorted in descending order
1578            vec.reverse();
1579        }
1580        Ok(vec)
1581    }
1582
1583    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1584    #[expect(clippy::too_many_arguments)]
1585    fn lock_single_execution(
1586        tx: &Transaction,
1587        created_at: DateTime<Utc>,
1588        component_id: &ComponentId,
1589        execution_id: &ExecutionId,
1590        run_id: RunId,
1591        appending_version: &Version,
1592        executor_id: ExecutorId,
1593        lock_expires_at: DateTime<Utc>,
1594    ) -> Result<LockedExecution, DbErrorWrite> {
1595        debug!("lock_inner");
1596        let combined_state = Self::get_combined_state(tx, execution_id)?;
1597        combined_state.pending_state.can_append_lock(
1598            created_at,
1599            executor_id,
1600            run_id,
1601            lock_expires_at,
1602        )?;
1603        let expected_version = combined_state.get_next_version_assert_not_finished();
1604        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1605
1606        // Append to `execution_log` table.
1607        let event = ExecutionEventInner::Locked {
1608            component_id: component_id.clone(),
1609            executor_id,
1610            lock_expires_at,
1611            run_id,
1612        };
1613        let event_ser = serde_json::to_string(&event).map_err(|err| {
1614            warn!("Cannot serialize {event:?} - {err:?}");
1615            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1616        })?;
1617        let mut stmt = tx
1618            .prepare_cached(
1619                "INSERT INTO t_execution_log \
1620            (execution_id, created_at, json_value, version, variant) \
1621            VALUES \
1622            (:execution_id, :created_at, :json_value, :version, :variant)",
1623            )
1624            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1625        stmt.execute(named_params! {
1626            ":execution_id": execution_id.to_string(),
1627            ":created_at": created_at,
1628            ":json_value": event_ser,
1629            ":version": appending_version.0,
1630            ":variant": event.variant(),
1631        })
1632        .map_err(|err| {
1633            warn!("Cannot lock execution - {err:?}");
1634            DbErrorWritePermanent::CannotWrite {
1635                reason: "cannot lock execution".into(),
1636                expected_version: None,
1637            }
1638        })?;
1639
1640        // Update `t_state`
1641        let responses = Self::list_responses(tx, execution_id, None)?;
1642        let responses = responses.into_iter().map(|resp| resp.event).collect();
1643        trace!("Responses: {responses:?}");
1644
1645        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1646            tx,
1647            execution_id,
1648            executor_id,
1649            run_id,
1650            lock_expires_at,
1651            appending_version,
1652        )?;
1653        // Fetch event_history and `Created` event to construct the response.
1654        let mut events = tx
1655            .prepare(
1656                "SELECT json_value FROM t_execution_log WHERE \
1657                execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1658                ORDER BY version",
1659            )?
1660            .query_map(
1661                named_params! {
1662                    ":execution_id": execution_id.to_string(),
1663                    ":v1": DUMMY_CREATED.variant(),
1664                    ":v2": DUMMY_HISTORY_EVENT.variant(),
1665                },
1666                |row| {
1667                    row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1668                        .map(|wrapper| wrapper.0)
1669                        .map_err(|serde| {
1670                            error!("Cannot deserialize {row:?} - {serde:?}");
1671                            consistency_rusqlite("cannot deserialize json value")
1672                        })
1673                },
1674            )?
1675            .collect::<Result<Vec<_>, _>>()?
1676            .into_iter()
1677            .collect::<VecDeque<_>>();
1678        let Some(ExecutionEventInner::Created {
1679            ffqn,
1680            params,
1681            retry_exp_backoff,
1682            max_retries,
1683            parent,
1684            metadata,
1685            ..
1686        }) = events.pop_front()
1687        else {
1688            error!("Execution log must contain at least `Created` event");
1689            return Err(consistency_db_err("execution log must contain `Created` event").into());
1690        };
1691
1692        let event_history = events
1693            .into_iter()
1694            .map(|event| {
1695                if let ExecutionEventInner::HistoryEvent { event } = event {
1696                    Ok(event)
1697                } else {
1698                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1699                    Err(consistency_db_err(
1700                        "rows can only contain `Created` and `HistoryEvent` event kinds",
1701                    ))
1702                }
1703            })
1704            .collect::<Result<Vec<_>, _>>()?;
1705        Ok(LockedExecution {
1706            execution_id: execution_id.clone(),
1707            metadata,
1708            run_id,
1709            next_version: appending_version.increment(),
1710            ffqn,
1711            params,
1712            event_history,
1713            responses,
1714            retry_exp_backoff,
1715            max_retries,
1716            parent,
1717            intermittent_event_count,
1718        })
1719    }
1720
1721    fn count_join_next(
1722        tx: &Transaction,
1723        execution_id: &ExecutionId,
1724        join_set_id: &JoinSetId,
1725    ) -> Result<u64, DbErrorRead> {
1726        let mut stmt = tx.prepare(
1727            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1728            AND history_event_type = :join_next",
1729        )?;
1730        Ok(stmt.query_row(
1731            named_params! {
1732                ":execution_id": execution_id.to_string(),
1733                ":join_set_id": join_set_id.to_string(),
1734                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1735            },
1736            |row| row.get("count"),
1737        )?)
1738    }
1739
1740    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1741    #[expect(clippy::needless_return)]
1742    fn append(
1743        tx: &Transaction,
1744        execution_id: &ExecutionId,
1745        req: AppendRequest,
1746        appending_version: Version,
1747    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1748        if matches!(req.event, ExecutionEventInner::Created { .. }) {
1749            return Err(DbErrorWrite::Permanent(
1750                DbErrorWritePermanent::ValidationFailed(
1751                    "cannot append `Created` event - use `create` instead".into(),
1752                ),
1753            ));
1754        }
1755        if let AppendRequest {
1756            event:
1757                ExecutionEventInner::Locked {
1758                    component_id,
1759                    executor_id,
1760                    run_id,
1761                    lock_expires_at,
1762                },
1763            created_at,
1764        } = req
1765        {
1766            return Self::lock_single_execution(
1767                tx,
1768                created_at,
1769                &component_id,
1770                execution_id,
1771                run_id,
1772                &appending_version,
1773                executor_id,
1774                lock_expires_at,
1775            )
1776            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1777        }
1778
1779        let combined_state = Self::get_combined_state(tx, execution_id)?;
1780        if combined_state.pending_state.is_finished() {
1781            debug!("Execution is already finished");
1782            return Err(DbErrorWrite::Permanent(
1783                DbErrorWritePermanent::CannotWrite {
1784                    reason: "already finished".into(),
1785                    expected_version: None,
1786                },
1787            ));
1788        }
1789
1790        Self::check_expected_next_and_appending_version(
1791            &combined_state.get_next_version_assert_not_finished(),
1792            &appending_version,
1793        )?;
1794        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1795            error!("Cannot serialize {:?} - {err:?}", req.event);
1796            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1797        })?;
1798
1799        let mut stmt = tx.prepare(
1800                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1801                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1802                    ?;
1803        stmt.execute(named_params! {
1804            ":execution_id": execution_id.to_string(),
1805            ":created_at": req.created_at,
1806            ":json_value": event_ser,
1807            ":version": appending_version.0,
1808            ":variant": req.event.variant(),
1809            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1810        })?;
1811        // Calculate current pending state
1812
1813        match &req.event {
1814            ExecutionEventInner::Created { .. } => {
1815                unreachable!("handled in the caller")
1816            }
1817
1818            ExecutionEventInner::Locked { .. } => {
1819                unreachable!("handled above")
1820            }
1821
1822            ExecutionEventInner::TemporarilyFailed {
1823                backoff_expires_at, ..
1824            }
1825            | ExecutionEventInner::TemporarilyTimedOut {
1826                backoff_expires_at, ..
1827            } => {
1828                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1829                    tx,
1830                    execution_id,
1831                    &appending_version,
1832                    *backoff_expires_at,
1833                    true, // an intermittent failure
1834                )?;
1835                return Ok((next_version, notifier));
1836            }
1837
1838            ExecutionEventInner::Unlocked {
1839                backoff_expires_at, ..
1840            } => {
1841                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1842                    tx,
1843                    execution_id,
1844                    &appending_version,
1845                    *backoff_expires_at,
1846                    false, // not an intermittent failure
1847                )?;
1848                return Ok((next_version, notifier));
1849            }
1850
1851            ExecutionEventInner::Finished { result, .. } => {
1852                Self::update_state_finished(
1853                    tx,
1854                    execution_id,
1855                    &appending_version,
1856                    req.created_at,
1857                    PendingStateFinishedResultKind::from(result),
1858                )?;
1859                return Ok((
1860                    appending_version,
1861                    AppendNotifier {
1862                        pending_at: None,
1863                        execution_finished: Some(NotifierExecutionFinished {
1864                            execution_id: execution_id.clone(),
1865                            retval: result.clone(),
1866                        }),
1867                        response: None,
1868                    },
1869                ));
1870            }
1871
1872            ExecutionEventInner::HistoryEvent {
1873                event:
1874                    HistoryEvent::JoinSetCreate { .. }
1875                    | HistoryEvent::JoinSetRequest {
1876                        request: JoinSetRequest::ChildExecutionRequest { .. },
1877                        ..
1878                    }
1879                    | HistoryEvent::Persist { .. }
1880                    | HistoryEvent::Schedule { .. }
1881                    | HistoryEvent::Stub { .. }
1882                    | HistoryEvent::JoinNextTooMany { .. },
1883            } => {
1884                return Ok((
1885                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1886                    AppendNotifier::default(),
1887                ));
1888            }
1889
1890            ExecutionEventInner::HistoryEvent {
1891                event:
1892                    HistoryEvent::JoinSetRequest {
1893                        join_set_id,
1894                        request:
1895                            JoinSetRequest::DelayRequest {
1896                                delay_id,
1897                                expires_at,
1898                                ..
1899                            },
1900                    },
1901            } => {
1902                return Ok((
1903                    Self::bump_state_next_version(
1904                        tx,
1905                        execution_id,
1906                        &appending_version,
1907                        Some(DelayReq {
1908                            join_set_id: join_set_id.clone(),
1909                            delay_id: delay_id.clone(),
1910                            expires_at: *expires_at,
1911                        }),
1912                    )?,
1913                    AppendNotifier::default(),
1914                ));
1915            }
1916
1917            ExecutionEventInner::HistoryEvent {
1918                event:
1919                    HistoryEvent::JoinNext {
1920                        join_set_id,
1921                        run_expires_at,
1922                        closing,
1923                        requested_ffqn: _,
1924                    },
1925            } => {
1926                // Did the response arrive already?
1927                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1928                let nth_response =
1929                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
1930                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1931                assert!(join_next_count > 0);
1932                if let Some(ResponseWithCursor {
1933                    event:
1934                        JoinSetResponseEventOuter {
1935                            created_at: nth_created_at,
1936                            ..
1937                        },
1938                    cursor: _,
1939                }) = nth_response
1940                {
1941                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
1942                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1943                        tx,
1944                        execution_id,
1945                        &appending_version,
1946                        scheduled_at,
1947                        false, // not an intermittent failure
1948                    )?;
1949                    return Ok((next_version, notifier));
1950                }
1951                return Ok((
1952                    Self::update_state_blocked(
1953                        tx,
1954                        execution_id,
1955                        &appending_version,
1956                        join_set_id,
1957                        *run_expires_at,
1958                        *closing,
1959                    )?,
1960                    AppendNotifier::default(),
1961                ));
1962            }
1963        }
1964    }
1965
1966    fn append_response(
1967        tx: &Transaction,
1968        execution_id: &ExecutionId,
1969        response_outer: JoinSetResponseEventOuter,
1970    ) -> Result<AppendNotifier, DbErrorWrite> {
1971        let mut stmt = tx.prepare(
1972            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1973                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1974        )?;
1975        let join_set_id = &response_outer.event.join_set_id;
1976        let delay_id = match &response_outer.event.event {
1977            JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1978            JoinSetResponse::ChildExecutionFinished { .. } => None,
1979        };
1980        let (child_execution_id, finished_version) = match &response_outer.event.event {
1981            JoinSetResponse::ChildExecutionFinished {
1982                child_execution_id,
1983                finished_version,
1984                result: _,
1985            } => (
1986                Some(child_execution_id.to_string()),
1987                Some(finished_version.0),
1988            ),
1989            JoinSetResponse::DelayFinished { .. } => (None, None),
1990        };
1991
1992        stmt.execute(named_params! {
1993            ":execution_id": execution_id.to_string(),
1994            ":created_at": response_outer.created_at,
1995            ":join_set_id": join_set_id.to_string(),
1996            ":delay_id": delay_id,
1997            ":child_execution_id": child_execution_id,
1998            ":finished_version": finished_version,
1999        })?;
2000
2001        // if the execution is going to be unblocked by this response...
2002        let combined_state = Self::get_combined_state(tx, execution_id)?;
2003        debug!("previous_pending_state: {combined_state:?}");
2004        let mut notifier = if let PendingState::BlockedByJoinSet {
2005            join_set_id: found_join_set_id,
2006            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2007            closing: _,
2008        } = combined_state.pending_state
2009            && *join_set_id == found_join_set_id
2010        {
2011            // PendingAt should be set to current time if called from expired_timers_watcher,
2012            // or to a future time if the execution is hot.
2013            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2014            // TODO: Add diff test
2015            // Unblock the state.
2016            Self::update_state_pending_after_response_appended(
2017                tx,
2018                execution_id,
2019                scheduled_at,
2020                &combined_state.corresponding_version, // not changing the version
2021            )?
2022        } else {
2023            AppendNotifier::default()
2024        };
2025        if let JoinSetResponseEvent {
2026            join_set_id,
2027            event: JoinSetResponse::DelayFinished { delay_id },
2028        } = &response_outer.event
2029        {
2030            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2031            let mut stmt =
2032                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2033                ?;
2034            stmt.execute(named_params! {
2035                ":execution_id": execution_id.to_string(),
2036                ":join_set_id": join_set_id.to_string(),
2037                ":delay_id": delay_id.to_string(),
2038            })?;
2039        }
2040        notifier.response = Some((execution_id.clone(), response_outer));
2041        Ok(notifier)
2042    }
2043
2044    fn append_backtrace(
2045        tx: &Transaction,
2046        backtrace_info: &BacktraceInfo,
2047    ) -> Result<(), DbErrorWrite> {
2048        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2049            warn!(
2050                "Cannot serialize backtrace {:?} - {err:?}",
2051                backtrace_info.wasm_backtrace
2052            );
2053            DbErrorWritePermanent::ValidationFailed("cannot serialize backtrace".into())
2054        })?;
2055        let mut stmt = tx
2056            .prepare(
2057                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2058                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2059            )
2060            ?;
2061        stmt.execute(named_params! {
2062            ":execution_id": backtrace_info.execution_id.to_string(),
2063            ":component_id": backtrace_info.component_id.to_string(),
2064            ":version_min_including": backtrace_info.version_min_including.0,
2065            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2066            ":wasm_backtrace": backtrace,
2067        })?;
2068        Ok(())
2069    }
2070
2071    #[cfg(feature = "test")]
2072    fn get(
2073        tx: &Transaction,
2074        execution_id: &ExecutionId,
2075    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2076        let mut stmt = tx.prepare(
2077            "SELECT created_at, json_value FROM t_execution_log WHERE \
2078                        execution_id = :execution_id ORDER BY version",
2079        )?;
2080        let events = stmt
2081            .query_map(
2082                named_params! {
2083                    ":execution_id": execution_id.to_string(),
2084                },
2085                |row| {
2086                    let created_at = row.get("created_at")?;
2087                    let event = row
2088                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2089                        .map_err(|serde| {
2090                            error!("Cannot deserialize {row:?} - {serde:?}");
2091                            consistency_rusqlite("cannot deserialize event")
2092                        })?
2093                        .0;
2094
2095                    Ok(ExecutionEvent {
2096                        created_at,
2097                        event,
2098                        backtrace_id: None,
2099                    })
2100                },
2101            )?
2102            .collect::<Result<Vec<_>, _>>()?;
2103        if events.is_empty() {
2104            return Err(DbErrorRead::NotFound);
2105        }
2106        let combined_state = Self::get_combined_state(tx, execution_id)?;
2107        let responses = Self::list_responses(tx, execution_id, None)?
2108            .into_iter()
2109            .map(|resp| resp.event)
2110            .collect();
2111        Ok(concepts::storage::ExecutionLog {
2112            execution_id: execution_id.clone(),
2113            events,
2114            responses,
2115            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2116            pending_state: combined_state.pending_state,
2117        })
2118    }
2119
2120    fn list_execution_events(
2121        tx: &Transaction,
2122        execution_id: &ExecutionId,
2123        version_min: VersionType,
2124        version_max_excluding: VersionType,
2125        include_backtrace_id: bool,
2126    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2127        let select = if include_backtrace_id {
2128            "SELECT
2129                log.created_at,
2130                log.json_value,
2131                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2132                bt.version_min_including AS backtrace_id
2133            FROM
2134                t_execution_log AS log
2135            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2136                t_backtrace AS bt ON log.execution_id = bt.execution_id
2137                                -- Check if the log's version falls within the backtrace's range
2138                                AND log.version >= bt.version_min_including
2139                                AND log.version < bt.version_max_excluding
2140            WHERE
2141                log.execution_id = :execution_id
2142                AND log.version >= :version_min
2143                AND log.version < :version_max_excluding
2144            ORDER BY
2145                log.version;"
2146        } else {
2147            "SELECT
2148                created_at, json_value, NULL as backtrace_id
2149            FROM t_execution_log WHERE
2150                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2151            ORDER BY version"
2152        };
2153        tx.prepare(select)?
2154            .query_map(
2155                named_params! {
2156                    ":execution_id": execution_id.to_string(),
2157                    ":version_min": version_min,
2158                    ":version_max_excluding": version_max_excluding
2159                },
2160                |row| {
2161                    let created_at = row.get("created_at")?;
2162                    let backtrace_id = row
2163                        .get::<_, Option<VersionType>>("backtrace_id")?
2164                        .map(Version::new);
2165                    let event = row
2166                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2167                        .map(|event| ExecutionEvent {
2168                            created_at,
2169                            event: event.0,
2170                            backtrace_id,
2171                        })
2172                        .map_err(|serde| {
2173                            error!("Cannot deserialize {row:?} - {serde:?}");
2174                            consistency_rusqlite("cannot deserialize")
2175                        })?;
2176                    Ok(event)
2177                },
2178            )?
2179            .collect::<Result<Vec<_>, _>>()
2180            .map_err(DbErrorRead::from)
2181    }
2182
2183    fn get_execution_event(
2184        tx: &Transaction,
2185        execution_id: &ExecutionId,
2186        version: VersionType,
2187    ) -> Result<ExecutionEvent, DbErrorRead> {
2188        let mut stmt = tx.prepare(
2189            "SELECT created_at, json_value FROM t_execution_log WHERE \
2190                        execution_id = :execution_id AND version = :version",
2191        )?;
2192        stmt.query_row(
2193            named_params! {
2194                ":execution_id": execution_id.to_string(),
2195                ":version": version,
2196            },
2197            |row| {
2198                let created_at = row.get("created_at")?;
2199                let event = row
2200                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2201                    .map_err(|serde| {
2202                        error!("Cannot deserialize {row:?} - {serde:?}");
2203                        consistency_rusqlite("cannot deserialize event")
2204                    })?;
2205
2206                Ok(ExecutionEvent {
2207                    created_at,
2208                    event: event.0,
2209                    backtrace_id: None,
2210                })
2211            },
2212        )
2213        .map_err(DbErrorRead::from)
2214    }
2215
2216    fn list_responses(
2217        tx: &Transaction,
2218        execution_id: &ExecutionId,
2219        pagination: Option<Pagination<u32>>,
2220    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2221        // TODO: Add test
2222        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2223        let mut sql = "SELECT \
2224            r.id, r.created_at, r.join_set_id,  r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2225            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2226            WHERE \
2227            r.execution_id = :execution_id \
2228            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2229            "
2230        .to_string();
2231        let limit = match &pagination {
2232            Some(
2233                pagination @ (Pagination::NewerThan { cursor, .. }
2234                | Pagination::OlderThan { cursor, .. }),
2235            ) => {
2236                params.push((":cursor", Box::new(cursor)));
2237                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2238                Some(pagination.length())
2239            }
2240            None => None,
2241        };
2242        sql.push_str(" ORDER BY id");
2243        if pagination.as_ref().is_some_and(Pagination::is_desc) {
2244            sql.push_str(" DESC");
2245        }
2246        if let Some(limit) = limit {
2247            write!(sql, " LIMIT {limit}").unwrap();
2248        }
2249        params.push((":execution_id", Box::new(execution_id.to_string())));
2250        tx.prepare(&sql)?
2251            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2252                params
2253                    .iter()
2254                    .map(|(key, value)| (*key, value.as_ref()))
2255                    .collect::<Vec<_>>()
2256                    .as_ref(),
2257                Self::parse_response_with_cursor,
2258            )?
2259            .collect::<Result<Vec<_>, rusqlite::Error>>()
2260            .map_err(DbErrorRead::from)
2261    }
2262
2263    fn parse_response_with_cursor(
2264        row: &rusqlite::Row<'_>,
2265    ) -> Result<ResponseWithCursor, rusqlite::Error> {
2266        let id = row.get("id")?;
2267        let created_at: DateTime<Utc> = row.get("created_at")?;
2268        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2269        let event = match (
2270            row.get::<_, Option<DelayId>>("delay_id")?,
2271            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2272            row.get::<_, Option<VersionType>>("finished_version")?,
2273            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2274        ) {
2275            (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2276            (
2277                None,
2278                Some(child_execution_id),
2279                Some(finished_version),
2280                Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2281            ) => JoinSetResponse::ChildExecutionFinished {
2282                child_execution_id,
2283                finished_version: Version(finished_version),
2284                result,
2285            },
2286            (delay, child, finished, result) => {
2287                error!(
2288                    "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2289                    delay,
2290                    result.map(|it| it.0)
2291                );
2292                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2293            }
2294        };
2295        Ok(ResponseWithCursor {
2296            cursor: id,
2297            event: JoinSetResponseEventOuter {
2298                event: JoinSetResponseEvent { join_set_id, event },
2299                created_at,
2300            },
2301        })
2302    }
2303
2304    fn nth_response(
2305        tx: &Transaction,
2306        execution_id: &ExecutionId,
2307        join_set_id: &JoinSetId,
2308        skip_rows: u64,
2309    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2310        // TODO: Add test
2311        tx
2312            .prepare(
2313                "SELECT r.id, r.created_at, r.join_set_id, \
2314                    r.delay_id, \
2315                    r.child_execution_id, r.finished_version, l.json_value \
2316                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2317                    WHERE \
2318                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2319                    (
2320                    r.finished_version = l.version \
2321                    OR \
2322                    r.child_execution_id IS NULL \
2323                    ) \
2324                    ORDER BY id \
2325                    LIMIT 1 OFFSET :offset",
2326            )
2327            ?
2328            .query_row(
2329                named_params! {
2330                    ":execution_id": execution_id.to_string(),
2331                    ":join_set_id": join_set_id.to_string(),
2332                    ":offset": skip_rows,
2333                },
2334                Self::parse_response_with_cursor,
2335            )
2336            .optional()
2337            .map_err(DbErrorRead::from)
2338    }
2339
2340    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2341    #[instrument(level = Level::TRACE, skip_all)]
2342    fn get_responses_with_offset(
2343        tx: &Transaction,
2344        execution_id: &ExecutionId,
2345        skip_rows: usize,
2346    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2347        // TODO: Add test
2348        tx.prepare(
2349            "SELECT r.id, r.created_at, r.join_set_id, \
2350            r.delay_id, \
2351            r.child_execution_id, r.finished_version, l.json_value \
2352            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2353            WHERE \
2354            r.execution_id = :execution_id AND \
2355            ( \
2356            r.finished_version = l.version \
2357            OR r.child_execution_id IS NULL \
2358            ) \
2359            ORDER BY id \
2360            LIMIT -1 OFFSET :offset",
2361        )
2362        ?
2363        .query_map(
2364            named_params! {
2365                ":execution_id": execution_id.to_string(),
2366                ":offset": skip_rows,
2367            },
2368            Self::parse_response_with_cursor,
2369        )
2370        ?
2371        .collect::<Result<Vec<_>, _>>()
2372        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2373        .map_err(DbErrorRead::from)
2374    }
2375
2376    fn get_pending_of_single_ffqn(
2377        mut stmt: CachedStatement,
2378        batch_size: usize,
2379        pending_at_or_sooner: DateTime<Utc>,
2380        ffqn: &FunctionFqn,
2381    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2382        stmt.query_map(
2383            named_params! {
2384                ":pending_expires_finished": pending_at_or_sooner,
2385                ":ffqn": ffqn.to_string(),
2386                ":batch_size": batch_size,
2387            },
2388            |row| {
2389                let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2390                let next_version =
2391                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2392                Ok(execution_id.map(|exe| (exe, next_version)))
2393            },
2394        )
2395        .map_err(|err| {
2396            warn!("Ignoring consistency error {err:?}");
2397        })?
2398        .collect::<Result<Vec<_>, _>>()
2399        .map_err(|err| {
2400            warn!("Ignoring consistency error {err:?}");
2401        })?
2402        .into_iter()
2403        .collect::<Result<Vec<_>, _>>()
2404        .map_err(|err| {
2405            warn!("Ignoring consistency error {err:?}");
2406        })
2407    }
2408
2409    /// Get executions and their next versions
2410    fn get_pending(
2411        conn: &Connection,
2412        batch_size: usize,
2413        pending_at_or_sooner: DateTime<Utc>,
2414        ffqns: &[FunctionFqn],
2415    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2416        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2417        for ffqn in ffqns {
2418            // Select executions in PendingAt.
2419            let stmt = conn
2420                .prepare_cached(&format!(
2421                    r#"
2422                    SELECT execution_id, corresponding_version FROM t_state WHERE
2423                    state = "{STATE_PENDING_AT}" AND
2424                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2425                    ORDER BY pending_expires_finished LIMIT :batch_size
2426                    "#
2427                ))
2428                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2429
2430            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2431                stmt,
2432                batch_size - execution_ids_versions.len(),
2433                pending_at_or_sooner,
2434                ffqn,
2435            ) {
2436                execution_ids_versions.extend(execs_and_versions);
2437                if execution_ids_versions.len() == batch_size {
2438                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2439                    break;
2440                }
2441                // consistency errors are ignored since we want to return at least some rows.
2442            }
2443        }
2444        Ok(execution_ids_versions)
2445    }
2446
2447    // Must be called after write transaction for a correct happens-before relationship.
2448    #[instrument(level = Level::TRACE, skip_all)]
2449    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2450        let (pending_ats, finished_execs, responses) = {
2451            let (mut pending_ats, mut finished_execs, mut responses) =
2452                (Vec::new(), Vec::new(), Vec::new());
2453            for notifier in notifiers {
2454                if let Some(pending_at) = notifier.pending_at {
2455                    pending_ats.push(pending_at);
2456                }
2457                if let Some(finished) = notifier.execution_finished {
2458                    finished_execs.push(finished);
2459                }
2460                if let Some(response) = notifier.response {
2461                    responses.push(response);
2462                }
2463            }
2464            (pending_ats, finished_execs, responses)
2465        };
2466
2467        // Notify pending_at subscribers.
2468        if !pending_ats.is_empty() {
2469            let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2470            for pending_at in pending_ats {
2471                Self::notify_pending_locked(&pending_at, current_time, &guard);
2472            }
2473        }
2474        // Notify execution finished subscribers.
2475        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2476        if !finished_execs.is_empty() {
2477            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2478            for finished in finished_execs {
2479                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2480                    for (_tag, sender) in listeners_of_exe_id {
2481                        // Sending while holding the lock but the oneshot sender does not block.
2482                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2483                        let _ = sender.send(finished.retval.clone());
2484                    }
2485                }
2486            }
2487        }
2488        // Notify response subscribers.
2489        if !responses.is_empty() {
2490            let mut guard = self.0.response_subscribers.lock().unwrap();
2491            for (execution_id, response) in responses {
2492                if let Some((sender, _)) = guard.remove(&execution_id) {
2493                    let _ = sender.send(response);
2494                }
2495            }
2496        }
2497    }
2498
2499    fn notify_pending_locked(
2500        notifier: &NotifierPendingAt,
2501        current_time: DateTime<Utc>,
2502        ffqn_to_pending_subscription: &std::sync::MutexGuard<
2503            HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2504        >,
2505    ) {
2506        // No need to remove here, cleanup is handled by the caller.
2507        if notifier.scheduled_at <= current_time
2508            && let Some((subscription, _)) = ffqn_to_pending_subscription.get(&notifier.ffqn)
2509        {
2510            debug!("Notifying pending subscriber");
2511            // Does not block
2512            let _ = subscription.try_send(());
2513        }
2514    }
2515}
2516
2517#[async_trait]
2518impl DbExecutor for SqlitePool {
2519    #[instrument(level = Level::TRACE, skip(self))]
2520    async fn lock_pending(
2521        &self,
2522        batch_size: usize,
2523        pending_at_or_sooner: DateTime<Utc>,
2524        ffqns: Arc<[FunctionFqn]>,
2525        created_at: DateTime<Utc>,
2526        component_id: ComponentId,
2527        executor_id: ExecutorId,
2528        lock_expires_at: DateTime<Utc>,
2529        run_id: RunId,
2530    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2531        let execution_ids_versions = self
2532            .transaction(
2533                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2534                "get_pending",
2535            )
2536            .await?;
2537        if execution_ids_versions.is_empty() {
2538            Ok(vec![])
2539        } else {
2540            debug!("Locking {execution_ids_versions:?}");
2541            self.transaction(
2542                move |tx| {
2543                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2544                    // Append lock
2545                    for (execution_id, version) in &execution_ids_versions {
2546                        match Self::lock_single_execution(
2547                            tx,
2548                            created_at,
2549                            &component_id,
2550                            execution_id,
2551                            run_id,
2552                            version,
2553                            executor_id,
2554                            lock_expires_at,
2555                        ) {
2556                            Ok(locked) => locked_execs.push(locked),
2557                            Err(err) => {
2558                                warn!("Locking row {execution_id} failed - {err:?}");
2559                            }
2560                        }
2561                    }
2562                    Ok(locked_execs)
2563                },
2564                "lock_pending",
2565            )
2566            .await
2567        }
2568    }
2569
2570    #[instrument(level = Level::DEBUG, skip(self))]
2571    async fn lock_one(
2572        &self,
2573        created_at: DateTime<Utc>,
2574        component_id: ComponentId,
2575        execution_id: &ExecutionId,
2576        run_id: RunId,
2577        version: Version,
2578        executor_id: ExecutorId,
2579        lock_expires_at: DateTime<Utc>,
2580    ) -> Result<LockedExecution, DbErrorWrite> {
2581        debug!(%execution_id, "lock_extend");
2582        let execution_id = execution_id.clone();
2583        self.transaction(
2584            move |tx| {
2585                Self::lock_single_execution(
2586                    tx,
2587                    created_at,
2588                    &component_id,
2589                    &execution_id,
2590                    run_id,
2591                    &version,
2592                    executor_id,
2593                    lock_expires_at,
2594                )
2595            },
2596            "lock_inner",
2597        )
2598        .await
2599    }
2600
2601    #[instrument(level = Level::DEBUG, skip(self, req))]
2602    async fn append(
2603        &self,
2604        execution_id: ExecutionId,
2605        version: Version,
2606        req: AppendRequest,
2607    ) -> Result<AppendResponse, DbErrorWrite> {
2608        debug!(%req, "append");
2609        trace!(?req, "append");
2610        let created_at = req.created_at;
2611        let (version, notifier) = self
2612            .transaction(
2613                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2614                "append",
2615            )
2616            .await?;
2617        self.notify_all(vec![notifier], created_at);
2618        Ok(version)
2619    }
2620
2621    #[instrument(level = Level::DEBUG, skip_all)]
2622    async fn append_batch_respond_to_parent(
2623        &self,
2624        events: AppendEventsToExecution,
2625        response: AppendResponseToExecution,
2626        current_time: DateTime<Utc>,
2627    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2628        debug!("append_batch_respond_to_parent");
2629        if events.execution_id == response.parent_execution_id {
2630            // Pending state would be wrong.
2631            // This is not a panic because it depends on DB state.
2632            return Err(DbErrorWrite::Permanent(
2633                DbErrorWritePermanent::ValidationFailed(
2634                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2635                ),
2636            ));
2637        }
2638        assert!(!events.batch.is_empty(), "Empty batch request"); // FIXME
2639        let (version, notifiers) = {
2640            self.transaction(
2641                move |tx| {
2642                    let mut version = events.version.clone();
2643                    let mut notifier_of_child = None;
2644                    for append_request in &events.batch {
2645                        let (v, n) = Self::append(
2646                            tx,
2647                            &events.execution_id,
2648                            append_request.clone(),
2649                            version,
2650                        )?;
2651                        version = v;
2652                        notifier_of_child = Some(n);
2653                    }
2654
2655                    let pending_at_parent = Self::append_response(
2656                        tx,
2657                        &response.parent_execution_id,
2658                        response.parent_response_event.clone(),
2659                    )?;
2660                    Ok::<_, DbErrorWrite>((
2661                        version,
2662                        vec![
2663                            notifier_of_child.expect("checked that the batch is not empty"),
2664                            pending_at_parent,
2665                        ],
2666                    ))
2667                },
2668                "append_batch_respond_to_parent",
2669            )
2670            .await?
2671        };
2672        self.notify_all(notifiers, current_time);
2673        Ok(version)
2674    }
2675
2676    // Supports only one subscriber per ffqn.
2677    // A new subscriber replaces the old one, which will eventually time out, which is fine.
2678    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2679    async fn wait_for_pending(
2680        &self,
2681        pending_at_or_sooner: DateTime<Utc>,
2682        ffqns: Arc<[FunctionFqn]>,
2683        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2684    ) {
2685        let unique_tag: u64 = rand::random();
2686        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
2687        {
2688            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2689            for ffqn in ffqns.as_ref() {
2690                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2691            }
2692        }
2693        async {
2694            let Ok(execution_ids_versions) = self
2695                .transaction(
2696                    {
2697                        let ffqns = ffqns.clone();
2698                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2699                    },
2700                    "subscribe_to_pending",
2701                )
2702                .await
2703            else {
2704                trace!(
2705                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2706                );
2707                timeout_fut.await;
2708                return;
2709            };
2710            if !execution_ids_versions.is_empty() {
2711                trace!("Not waiting, database already contains new pending executions");
2712                return;
2713            }
2714            tokio::select! { // future's liveness: Dropping the loser immediately.
2715                _ = receiver.recv() => {
2716                    trace!("Received a notification");
2717                }
2718                () = timeout_fut => {
2719                }
2720            }
2721        }.await;
2722        // Clean up ffqn_to_pending_subscription in any case
2723        {
2724            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2725            for ffqn in ffqns.as_ref() {
2726                match ffqn_to_pending_subscription.remove(ffqn) {
2727                    Some((_, tag)) if tag == unique_tag => {
2728                        // Cleanup OK.
2729                    }
2730                    Some(other) => {
2731                        // Reinsert foreign sender.
2732                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2733                    }
2734                    None => {
2735                        // Value was replaced and cleaned up already.
2736                    }
2737                }
2738            }
2739        }
2740    }
2741}
2742
2743#[async_trait]
2744impl DbConnection for SqlitePool {
2745    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2746    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2747        debug!("create");
2748        trace!(?req, "create");
2749        let created_at = req.created_at;
2750        let (version, notifier) = self
2751            .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2752            .await?;
2753        self.notify_all(vec![notifier], created_at);
2754        Ok(version)
2755    }
2756
2757    #[instrument(level = Level::DEBUG, skip(self, batch))]
2758    async fn append_batch(
2759        &self,
2760        current_time: DateTime<Utc>,
2761        batch: Vec<AppendRequest>,
2762        execution_id: ExecutionId,
2763        version: Version,
2764    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2765        debug!("append_batch");
2766        trace!(?batch, "append_batch");
2767        assert!(!batch.is_empty(), "Empty batch request");
2768
2769        let (version, notifier) = self
2770            .transaction(
2771                move |tx| {
2772                    let mut version = version.clone();
2773                    let mut notifier = None;
2774                    for append_request in &batch {
2775                        let (v, n) =
2776                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2777                        version = v;
2778                        notifier = Some(n);
2779                    }
2780                    Ok::<_, DbErrorWrite>((
2781                        version,
2782                        notifier.expect("checked that the batch is not empty"),
2783                    ))
2784                },
2785                "append_batch",
2786            )
2787            .await?;
2788
2789        self.notify_all(vec![notifier], current_time);
2790        Ok(version)
2791    }
2792
2793    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2794    async fn append_batch_create_new_execution(
2795        &self,
2796        current_time: DateTime<Utc>,
2797        batch: Vec<AppendRequest>,
2798        execution_id: ExecutionId,
2799        version: Version,
2800        child_req: Vec<CreateRequest>,
2801    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2802        debug!("append_batch_create_new_execution");
2803        trace!(?batch, ?child_req, "append_batch_create_new_execution");
2804        assert!(!batch.is_empty(), "Empty batch request");
2805
2806        let (version, notifiers) = self
2807            .transaction(
2808                move |tx| {
2809                    let mut notifier = None;
2810                    let mut version = version.clone();
2811                    for append_request in &batch {
2812                        let (v, n) =
2813                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2814                        version = v;
2815                        notifier = Some(n);
2816                    }
2817                    let mut notifiers = Vec::new();
2818                    notifiers.push(notifier.expect("checked that the batch is not empty"));
2819
2820                    for child_req in &child_req {
2821                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2822                        notifiers.push(notifier);
2823                    }
2824                    Ok::<_, DbErrorWrite>((version, notifiers))
2825                },
2826                "append_batch_create_new_execution_inner",
2827            )
2828            .await?;
2829        self.notify_all(notifiers, current_time);
2830        Ok(version)
2831    }
2832
2833    #[cfg(feature = "test")]
2834    #[instrument(level = Level::DEBUG, skip(self))]
2835    async fn get(
2836        &self,
2837        execution_id: &ExecutionId,
2838    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2839        trace!("get");
2840        let execution_id = execution_id.clone();
2841        self.transaction(move |tx| Self::get(tx, &execution_id), "get")
2842            .await
2843    }
2844
2845    #[instrument(level = Level::DEBUG, skip(self))]
2846    async fn list_execution_events(
2847        &self,
2848        execution_id: &ExecutionId,
2849        since: &Version,
2850        max_length: VersionType,
2851        include_backtrace_id: bool,
2852    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2853        let execution_id = execution_id.clone();
2854        let since = since.0;
2855        self.transaction(
2856            move |tx| {
2857                Self::list_execution_events(
2858                    tx,
2859                    &execution_id,
2860                    since,
2861                    since + max_length,
2862                    include_backtrace_id,
2863                )
2864            },
2865            "get",
2866        )
2867        .await
2868    }
2869
2870    // Supports only one subscriber per execution id.
2871    // A new call will overwrite the old subscriber, the old one will end
2872    // with a timeout, which is fine.
2873    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2874    async fn subscribe_to_next_responses(
2875        &self,
2876        execution_id: &ExecutionId,
2877        start_idx: usize,
2878        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2879    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
2880        debug!("next_responses");
2881        let unique_tag: u64 = rand::random();
2882        let execution_id = execution_id.clone();
2883
2884        let cleanup = || {
2885            let mut guard = self.0.response_subscribers.lock().unwrap();
2886            match guard.remove(&execution_id) {
2887                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
2888                Some(other) => {
2889                    // Reinsert foreign sender.
2890                    guard.insert(execution_id.clone(), other);
2891                }
2892                None => {} // Value was replaced and cleaned up already, or notification was sent.
2893            }
2894        };
2895
2896        let response_subscribers = self.0.response_subscribers.clone();
2897        let resp_or_receiver = {
2898            let execution_id = execution_id.clone();
2899            self.transaction(
2900                move |tx| {
2901                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
2902                    if responses.is_empty() {
2903                        // cannot race as we have the transaction write lock
2904                        let (sender, receiver) = oneshot::channel();
2905                        response_subscribers
2906                            .lock()
2907                            .unwrap()
2908                            .insert(execution_id.clone(), (sender, unique_tag));
2909                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
2910                    } else {
2911                        Ok(itertools::Either::Left(responses))
2912                    }
2913                },
2914                "subscribe_to_next_responses",
2915            )
2916            .await
2917        }
2918        .inspect_err(|_| {
2919            cleanup();
2920        })?;
2921        match resp_or_receiver {
2922            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
2923            itertools::Either::Right(receiver) => {
2924                let res = async move {
2925                    tokio::select! {
2926                        resp = receiver => {
2927                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
2928                            Ok(vec![resp])
2929                        }
2930                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
2931                    }
2932                }
2933                .await;
2934                cleanup();
2935                res
2936            }
2937        }
2938    }
2939
2940    // Supports multiple subscribers.
2941    async fn wait_for_finished_result(
2942        &self,
2943        execution_id: &ExecutionId,
2944        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2945    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2946        let unique_tag: u64 = rand::random();
2947        let execution_id = execution_id.clone();
2948        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2949
2950        let cleanup = || {
2951            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2952            if let Some(subscribers) = guard.get_mut(&execution_id) {
2953                subscribers.remove(&unique_tag);
2954            }
2955        };
2956
2957        let resp_or_receiver = {
2958            let execution_id = execution_id.clone();
2959            self.transaction(move |tx| {
2960                let pending_state =
2961                    Self::get_combined_state(tx, &execution_id)?.pending_state;
2962                if let PendingState::Finished { finished } = pending_state {
2963                    let event =
2964                        Self::get_execution_event(tx, &execution_id, finished.version)?;
2965                    if let ExecutionEventInner::Finished { result, ..} = event.event {
2966                        Ok(itertools::Either::Left(result))
2967                    } else {
2968                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2969                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
2970                            "cannot get finished event based on t_state version"
2971                        )))
2972                    }
2973                } else {
2974                    // Cannot race with the notifier as we have the transaction write lock:
2975                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
2976                    // or we end up here. If this tx fails, the cleanup will remove this entry.
2977                    let (sender, receiver) = oneshot::channel();
2978                    let mut guard = execution_finished_subscription.lock().unwrap();
2979                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
2980                    Ok(itertools::Either::Right(receiver))
2981                }
2982            }, "wait_for_finished_result")
2983            .await
2984        }
2985        .inspect_err(|_| {
2986            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
2987            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
2988            // If cleanup wins, it simply removes the oneshot sender.
2989            cleanup();
2990        })?;
2991
2992        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
2993        match resp_or_receiver {
2994            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
2995            itertools::Either::Right(receiver) => {
2996                let res = async move {
2997                    tokio::select! {
2998                        resp = receiver => {
2999                            Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3000                        }
3001                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3002                    }
3003                }
3004                .await;
3005                cleanup();
3006                res
3007            }
3008        }
3009    }
3010
3011    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3012    async fn append_response(
3013        &self,
3014        created_at: DateTime<Utc>,
3015        execution_id: ExecutionId,
3016        response_event: JoinSetResponseEvent,
3017    ) -> Result<(), DbErrorWrite> {
3018        debug!("append_response");
3019        let event = JoinSetResponseEventOuter {
3020            created_at,
3021            event: response_event,
3022        };
3023        let notifier = self
3024            .transaction(
3025                move |tx| Self::append_response(tx, &execution_id, event.clone()),
3026                "append_response",
3027            )
3028            .await?;
3029        self.notify_all(vec![notifier], created_at);
3030        Ok(())
3031    }
3032
3033    #[instrument(level = Level::DEBUG, skip_all)]
3034    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3035        debug!("append_backtrace");
3036        self.transaction(
3037            move |tx| Self::append_backtrace(tx, &append),
3038            "append_backtrace",
3039        )
3040        .await
3041    }
3042
3043    #[instrument(level = Level::DEBUG, skip_all)]
3044    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3045        debug!("append_backtrace_batch");
3046        self.transaction(
3047            move |tx| {
3048                for append in &batch {
3049                    Self::append_backtrace(tx, append)?;
3050                }
3051                Ok(())
3052            },
3053            "append_backtrace",
3054        )
3055        .await
3056    }
3057
3058    #[instrument(level = Level::DEBUG, skip_all)]
3059    async fn get_backtrace(
3060        &self,
3061        execution_id: &ExecutionId,
3062        filter: BacktraceFilter,
3063    ) -> Result<BacktraceInfo, DbErrorRead> {
3064        debug!("get_last_backtrace");
3065        let execution_id = execution_id.clone();
3066
3067        self.transaction(
3068            move |tx| {
3069                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3070                                WHERE execution_id = :execution_id";
3071                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3072                let select = match &filter {
3073                    BacktraceFilter::Specific(version) =>{
3074                        params.push((":version", Box::new(version.0)));
3075                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3076                    },
3077                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3078                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3079                };
3080                tx
3081                    .prepare(&select)
3082                    ?
3083                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3084                        params
3085                            .iter()
3086                            .map(|(key, value)| (*key, value.as_ref()))
3087                            .collect::<Vec<_>>()
3088                            .as_ref(),
3089                    |row| {
3090                        Ok(BacktraceInfo {
3091                            execution_id: execution_id.clone(),
3092                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3093                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3094                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3095                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3096                        })
3097                    },
3098                ).map_err(DbErrorRead::from)
3099            },
3100            "get_last_backtrace",
3101        ).await
3102    }
3103
3104    /// Get currently expired delays and locks.
3105    #[instrument(level = Level::TRACE, skip(self))]
3106    async fn get_expired_timers(
3107        &self,
3108        at: DateTime<Utc>,
3109    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3110        self.transaction(
3111            move |conn| {
3112                let mut expired_timers = conn.prepare(
3113                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3114                )?
3115                .query_map(
3116                        named_params! {
3117                            ":at": at,
3118                        },
3119                        |row| {
3120                            let execution_id = row.get("execution_id")?;
3121                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3122                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3123                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3124                            Ok(ExpiredTimer::Delay(delay))
3125                        },
3126                    )?
3127                    .collect::<Result<Vec<_>, _>>()?;
3128                // Extend with expired locks
3129                let expired = conn.prepare(&format!(r#"
3130                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis
3131                    FROM t_state
3132                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3133                    "#
3134                )
3135                )?
3136                .query_map(
3137                        named_params! {
3138                            ":at": at,
3139                        },
3140                        |row| {
3141                            let execution_id = row.get("execution_id")?;
3142                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3143                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3144                            let intermittent_event_count = row.get("intermittent_event_count")?;
3145                            let max_retries = row.get("max_retries")?;
3146                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3147                            let parent = if let ExecutionId::Derived(derived) = &execution_id {
3148                                derived.split_to_parts().inspect_err(|err| error!("cannot split execution {execution_id} to parts: {err:?}")).ok()
3149                            } else {
3150                                None
3151                            };
3152                            let lock = ExpiredLock {
3153                                execution_id,
3154                                locked_at_version,
3155                                next_version,
3156                                intermittent_event_count,
3157                                max_retries,
3158                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3159                                parent
3160                            };
3161                            Ok(ExpiredTimer::Lock(lock))
3162                        }
3163                    )?
3164                    .collect::<Result<Vec<_>, _>>()?;
3165                expired_timers.extend(expired);
3166                if !expired_timers.is_empty() {
3167                    debug!("get_expired_timers found {expired_timers:?}");
3168                }
3169                Ok(expired_timers)
3170            }, "get_expired_timers"
3171        )
3172        .await
3173    }
3174
3175    async fn get_execution_event(
3176        &self,
3177        execution_id: &ExecutionId,
3178        version: &Version,
3179    ) -> Result<ExecutionEvent, DbErrorRead> {
3180        let version = version.0;
3181        let execution_id = execution_id.clone();
3182        self.transaction(
3183            move |tx| Self::get_execution_event(tx, &execution_id, version),
3184            "get_execution_event",
3185        )
3186        .await
3187    }
3188
3189    async fn get_pending_state(
3190        &self,
3191        execution_id: &ExecutionId,
3192    ) -> Result<PendingState, DbErrorRead> {
3193        let execution_id = execution_id.clone();
3194        Ok(self
3195            .transaction(
3196                move |tx| Self::get_combined_state(tx, &execution_id),
3197                "get_pending_state",
3198            )
3199            .await?
3200            .pending_state)
3201    }
3202
3203    async fn list_executions(
3204        &self,
3205        ffqn: Option<FunctionFqn>,
3206        top_level_only: bool,
3207        pagination: ExecutionListPagination,
3208    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3209        self.transaction(
3210            move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3211            "list_executions",
3212        )
3213        .await
3214    }
3215
3216    async fn list_responses(
3217        &self,
3218        execution_id: &ExecutionId,
3219        pagination: Pagination<u32>,
3220    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3221        let execution_id = execution_id.clone();
3222        self.transaction(
3223            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3224            "list_executions",
3225        )
3226        .await
3227    }
3228}
3229
#[cfg(any(test, feature = "tempfile"))]
/// Helper for creating a [`SqlitePool`], compiled for tests or when the `tempfile` feature
/// is enabled.
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a pool using the default [`SqliteConfig`].
    ///
    /// When the `SQLITE_FILE` environment variable can be read, its value is used as the
    /// database path and no temporary file is returned. Otherwise a fresh [`NamedTempFile`]
    /// backs the database and is returned next to the pool.
    ///
    /// # Panics
    /// Panics if `NamedTempFile::new` or `SqlitePool::new` returns an error.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}