obeli_sk_db_sqlite/
sqlite_dao.rs

1use crate::histograms::Histograms;
2use assert_matches::assert_matches;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use concepts::{
6    ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
7    SupportedFunctionReturnValue,
8    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
11        AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
12        DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
13        DbErrorWrite, DbErrorWritePermanent, DbExecutor, DbPool, DbPoolCloseable, ExecutionEvent,
14        ExecutionEventInner, ExecutionListPagination, ExecutionWithState, ExpiredDelay,
15        ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
16        JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, LockedExecution,
17        Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
18        ResponseWithCursor, Version, VersionType,
19    },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
25    TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28    cmp::max,
29    collections::VecDeque,
30    fmt::Debug,
31    ops::DerefMut,
32    path::Path,
33    str::FromStr,
34    sync::{
35        Arc, Mutex,
36        atomic::{AtomicBool, Ordering},
37    },
38    time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42    sync::{mpsc, oneshot},
43    time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
47#[derive(Debug, thiserror::Error)]
48#[error("initialization error")]
49pub struct InitializationError;
50
51#[derive(Debug, Clone)]
52struct DelayReq {
53    join_set_id: JoinSetId,
54    delay_id: DelayId,
55    expires_at: DateTime<Utc>,
56}
/*
Default connection PRAGMAs, applied in array order by `SqlitePool::init_thread`.
Each value can be replaced through `SqliteConfig::pragma_override`; an empty value means the
pragma is only queried (its result rows are logged) instead of being set.

page_size = 8 KB - must come BEFORE `journal_mode = wal`. Switching a brand new database to WAL
writes the database header, after which a new page size is only remembered and never applied
(VACUUM cannot change it while in WAL mode).
https://www.sqlite.org/pragma.html#pragma_page_size

mmap_size = 128 MB - use memory-mapped I/O for up to this many bytes of the database file,
letting reads be served from the OS page cache shared by all processes.
https://www.sqlite.org/pragma.html#pragma_mmap_size
https://www.sqlite.org/mmap.html

journal_size_limit = 64 MB - limit on the size of the WAL file left behind after checkpoints,
preventing unbounded growth.
https://www.sqlite.org/pragma.html#pragma_journal_size_limit

Inspired by https://github.com/rails/rails/pull/49349
*/

const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"], // 8 KB; must precede `journal_mode`, see above.
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
80
/// `t_metadata`: records the database schema version. Append only.
///
/// The row with the highest `id` is read back as the current schema version by
/// `SqlitePool::init_thread` (`ORDER BY id DESC LIMIT 1`).
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
/// Schema version this code works with; `SqlitePool::init_thread` fails with
/// `InitializationError` when the recorded version differs.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// Stores execution history. Append only.
///
/// Each event is serialized in `json_value`. `history_event_type` is a stored generated
/// column extracted from the JSON path `$.HistoryEvent.event.type`; it is NULL for rows whose
/// JSON does not contain that path.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
// NOTE(review): the PRIMARY KEY (execution_id, version) above already gets an automatic
// index on the same columns, so this index looks redundant (extra write cost and space).
// Dropping it would need a schema migration, since existing databases keep it.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
112
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, column `delay_id` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, columns `child_execution_id` and
/// `finished_version` must not be null.
///
/// NOTE(review): SQLite treats NULLs as distinct in UNIQUE constraints. Assuming each row sets
/// only one of `delay_id` / `child_execution_id` (as described above), the other is NULL and
/// the UNIQUE constraint below can never reject a duplicate response. If duplicates must be
/// prevented, partial unique indexes (one per response kind) would be needed.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response: responses of one execution ordered by
// insertion (`id`).
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
136
/// Stores executions in `PendingState`, one row per execution (`execution_id` is the key).
/// `state` to column mapping:
/// `PendingAt`:            (nothing but required columns)
/// `Locked`:               `max_retries`, `retry_exp_backoff_millis`, `last_lock_version`, `executor_id`, `run_id`
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
/// `Finished` :            `result_kind`.
///
/// `pending_expires_finished` changes meaning with `state` when read back by
/// `CombinedState::new`: scheduled-at for `PendingAt`, lock expiry for `Locked` and
/// `BlockedByJoinSet`, finished-at for `Finished`.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Values of the `t_state.state` column (matched in `CombinedState::new`).
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";
// Presumably the value of the generated `t_execution_log.history_event_type` column for
// join-next history events; its usages are outside this view.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

// TODO: partial indexes
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
// Partial index over rows with an `executor_id`, i.e. `Locked` rows per the mapping above.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
// NOTE: the SQL index name is `idx_t_state_execution_id_is_root`, not `..._is_top_level`.
// Renaming it here would create a second index on existing databases rather than rename it.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
194
/// Represents [`ExpiredTimer::AsyncDelay`]. Rows are deleted when the delay is processed.
/// A delay is identified by `(execution_id, join_set_id, delay_id)`.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

// Append only. Each row holds the WASM backtrace recorded for the half-open version range
// [`version_min_including`, `version_max_excluding`) of an execution.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
// Index for searching backtraces by execution_id and version
// NOTE(review): identical columns to the PRIMARY KEY above, whose automatic index already
// serves these lookups; this index looks redundant.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
221
222#[derive(Debug, thiserror::Error, Clone)]
223enum RusqliteError {
224    #[error("not found")]
225    NotFound,
226    #[error("generic: {0}")]
227    Generic(StrVariant),
228}
229
230mod conversions {
231
232    use super::RusqliteError;
233    use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
234    use rusqlite::types::{FromSql, FromSqlError};
235    use std::{fmt::Debug, str::FromStr};
236    use tracing::error;
237
238    impl From<rusqlite::Error> for RusqliteError {
239        fn from(err: rusqlite::Error) -> Self {
240            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
241                RusqliteError::NotFound
242            } else {
243                error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
244                RusqliteError::Generic(err.to_string().into())
245            }
246        }
247    }
248
249    impl From<RusqliteError> for DbErrorGeneric {
250        fn from(err: RusqliteError) -> DbErrorGeneric {
251            match err {
252                RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
253                RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
254            }
255        }
256    }
257    impl From<RusqliteError> for DbErrorRead {
258        fn from(err: RusqliteError) -> Self {
259            if matches!(err, RusqliteError::NotFound) {
260                Self::NotFound
261            } else {
262                Self::from(DbErrorGeneric::from(err))
263            }
264        }
265    }
266    impl From<RusqliteError> for DbErrorReadWithTimeout {
267        fn from(err: RusqliteError) -> Self {
268            Self::from(DbErrorRead::from(err))
269        }
270    }
271    impl From<RusqliteError> for DbErrorWrite {
272        fn from(err: RusqliteError) -> Self {
273            if matches!(err, RusqliteError::NotFound) {
274                Self::NotFound
275            } else {
276                Self::from(DbErrorGeneric::from(err))
277            }
278        }
279    }
280
281    pub(crate) struct JsonWrapper<T>(pub(crate) T);
282    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
283        fn column_result(
284            value: rusqlite::types::ValueRef<'_>,
285        ) -> rusqlite::types::FromSqlResult<Self> {
286            let value = match value {
287                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
288                    Ok(value)
289                }
290                other => {
291                    error!(
292                        backtrace = %std::backtrace::Backtrace::capture(),
293                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
294                    );
295                    Err(FromSqlError::InvalidType)
296                }
297            }?;
298            let value = serde_json::from_slice::<T>(value).map_err(|err| {
299                error!(
300                    backtrace = %std::backtrace::Backtrace::capture(),
301                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
302                    r#type = std::any::type_name::<T>()
303                );
304                FromSqlError::InvalidType
305            })?;
306            Ok(Self(value))
307        }
308    }
309
310    pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
311    impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
312        fn column_result(
313            value: rusqlite::types::ValueRef<'_>,
314        ) -> rusqlite::types::FromSqlResult<Self> {
315            let value = String::column_result(value)?;
316            let value = T::from_str(&value).map_err(|err| {
317                error!(
318                    backtrace = %std::backtrace::Backtrace::capture(),
319                    "Cannot convert string `{value}` to type:`{type}` - {err:?}",
320                    r#type = std::any::type_name::<T>()
321                );
322                FromSqlError::InvalidType
323            })?;
324            Ok(Self(value))
325        }
326    }
327
328    // Used as a wrapper for `FromSqlError::Other`
329    #[derive(Debug, thiserror::Error)]
330    #[error("{0}")]
331    pub(crate) struct OtherError(&'static str);
332    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
333        FromSqlError::other(OtherError(input)).into()
334    }
335
336    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
337        DbErrorGeneric::Uncategorized(input.into())
338    }
339}
340
341#[derive(Debug)]
342struct CombinedStateDTO {
343    state: String,
344    ffqn: String,
345    pending_expires_finished: DateTime<Utc>,
346    // Locked:
347    last_lock_version: Option<Version>,
348    executor_id: Option<ExecutorId>,
349    run_id: Option<RunId>,
350    // Blocked by join set:
351    join_set_id: Option<JoinSetId>,
352    join_set_closing: Option<bool>,
353    // Finished:
354    result_kind: Option<PendingStateFinishedResultKind>,
355}
356#[derive(Debug)]
357struct CombinedState {
358    ffqn: FunctionFqn,
359    pending_state: PendingState,
360    corresponding_version: Version,
361}
362impl CombinedState {
363    fn get_next_version_assert_not_finished(&self) -> Version {
364        assert!(!self.pending_state.is_finished());
365        self.corresponding_version.increment()
366    }
367
368    #[cfg(feature = "test")]
369    fn get_next_version_or_finished(&self) -> Version {
370        if self.pending_state.is_finished() {
371            self.corresponding_version.clone()
372        } else {
373            self.corresponding_version.increment()
374        }
375    }
376}
377
378#[derive(Debug)]
379struct NotifierPendingAt {
380    scheduled_at: DateTime<Utc>,
381    ffqn: FunctionFqn,
382}
383
384#[derive(Debug)]
385struct NotifierExecutionFinished {
386    execution_id: ExecutionId,
387    retval: SupportedFunctionReturnValue,
388}
389
390#[derive(Debug, Default)]
391struct AppendNotifier {
392    pending_at: Option<NotifierPendingAt>,
393    execution_finished: Option<NotifierExecutionFinished>,
394    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
395}
396
397impl CombinedState {
398    fn new(
399        dto: &CombinedStateDTO,
400        corresponding_version: Version,
401    ) -> Result<Self, rusqlite::Error> {
402        let pending_state = match dto {
403            CombinedStateDTO {
404                state,
405                ffqn: _,
406                pending_expires_finished: scheduled_at,
407                last_lock_version: None,
408                executor_id: None,
409                run_id: None,
410                join_set_id: None,
411                join_set_closing: None,
412                result_kind: None,
413            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
414                scheduled_at: *scheduled_at,
415            }),
416            CombinedStateDTO {
417                state,
418                ffqn: _,
419                pending_expires_finished: lock_expires_at,
420                last_lock_version: Some(_),
421                executor_id: Some(executor_id),
422                run_id: Some(run_id),
423                join_set_id: None,
424                join_set_closing: None,
425                result_kind: None,
426            } if state == STATE_LOCKED => Ok(PendingState::Locked {
427                executor_id: *executor_id,
428                run_id: *run_id,
429                lock_expires_at: *lock_expires_at,
430            }),
431            CombinedStateDTO {
432                state,
433                ffqn: _,
434                pending_expires_finished: lock_expires_at,
435                last_lock_version: None,
436                executor_id: None,
437                run_id: None,
438                join_set_id: Some(join_set_id),
439                join_set_closing: Some(join_set_closing),
440                result_kind: None,
441            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
442                join_set_id: join_set_id.clone(),
443                closing: *join_set_closing,
444                lock_expires_at: *lock_expires_at,
445            }),
446            CombinedStateDTO {
447                state,
448                ffqn: _,
449                pending_expires_finished: finished_at,
450                last_lock_version: None,
451                executor_id: None,
452                run_id: None,
453                join_set_id: None,
454                join_set_closing: None,
455                result_kind: Some(result_kind),
456            } if state == STATE_FINISHED => Ok(PendingState::Finished {
457                finished: PendingStateFinished {
458                    finished_at: *finished_at,
459                    version: corresponding_version.0,
460                    result_kind: *result_kind,
461                },
462            }),
463            _ => {
464                error!("Cannot deserialize pending state from  {dto:?}");
465                Err(consistency_rusqlite("invalid `t_state`"))
466            }
467        }?;
468        Ok(Self {
469            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
470                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
471                consistency_rusqlite("invalid ffqn value in `t_state`")
472            })?,
473            pending_state,
474            corresponding_version,
475        })
476    }
477}
478
479#[derive(derive_more::Debug)]
480struct LogicalTx {
481    #[debug(skip)]
482    func: Box<dyn FnMut(&mut Transaction) + Send>,
483    sent_at: Instant,
484    func_name: &'static str,
485    #[debug(skip)]
486    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
487}
488
489#[derive(derive_more::Debug)]
490enum ThreadCommand {
491    LogicalTx(LogicalTx),
492    Shutdown,
493}
494
495#[derive(Clone)]
496pub struct SqlitePool(SqlitePoolInner);
497
498type ResponseSubscribers =
499    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
500type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
501type ExecutionFinishedSubscribers =
502    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
503#[derive(Clone)]
504struct SqlitePoolInner {
505    shutdown_requested: Arc<AtomicBool>,
506    shutdown_finished: Arc<AtomicBool>,
507    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
508    response_subscribers: ResponseSubscribers,
509    pending_ffqn_subscribers: PendingFfqnSubscribers,
510    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
511    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
512}
513
514#[async_trait]
515impl DbPoolCloseable for SqlitePool {
516    async fn close(self) {
517        debug!("Sqlite is closing");
518        self.0.shutdown_requested.store(true, Ordering::Release);
519        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
520        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
521        while !self.0.shutdown_finished.load(Ordering::Acquire) {
522            tokio::time::sleep(Duration::from_millis(1)).await;
523        }
524        debug!("Sqlite was closed");
525    }
526}
527
528#[async_trait]
529impl DbPool for SqlitePool {
530    fn connection(&self) -> Box<dyn DbConnection> {
531        Box::new(self.clone())
532    }
533}
534impl Drop for SqlitePool {
535    fn drop(&mut self) {
536        let arc = self.0.join_handle.take().expect("join_handle was set");
537        if let Ok(join_handle) = Arc::try_unwrap(arc) {
538            // Last holder
539            if !join_handle.is_finished() {
540                if !self.0.shutdown_finished.load(Ordering::Acquire) {
541                    // Best effort to shut down the sqlite thread.
542                    let backtrace = std::backtrace::Backtrace::capture();
543                    warn!("SqlitePool was not closed properly - {backtrace}");
544                    self.0.shutdown_requested.store(true, Ordering::Release);
545                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
546                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
547                    // Not joining the thread, drop might be called from async context.
548                    // We are shutting down the server anyway.
549                } else {
550                    // The thread set `shutdown_finished` as its last operation.
551                }
552            }
553        }
554    }
555}
556
557#[derive(Debug, Clone)]
558pub struct SqliteConfig {
559    pub queue_capacity: usize,
560    pub pragma_override: Option<HashMap<String, String>>,
561    pub metrics_threshold: Option<Duration>,
562}
563impl Default for SqliteConfig {
564    fn default() -> Self {
565        Self {
566            queue_capacity: 100,
567            pragma_override: None,
568            metrics_threshold: None,
569        }
570    }
571}
572
573impl SqlitePool {
574    fn init_thread(
575        path: &Path,
576        mut pragma_override: HashMap<String, String>,
577    ) -> Result<Connection, InitializationError> {
578        fn conn_execute<P: Params>(
579            conn: &Connection,
580            sql: &str,
581            params: P,
582        ) -> Result<(), InitializationError> {
583            conn.execute(sql, params).map(|_| ()).map_err(|err| {
584                error!("Cannot run `{sql}` - {err:?}");
585                InitializationError
586            })
587        }
588        fn pragma_update(
589            conn: &Connection,
590            name: &str,
591            value: &str,
592        ) -> Result<(), InitializationError> {
593            if value.is_empty() {
594                debug!("Querying PRAGMA {name}");
595                conn.pragma_query(None, name, |row| {
596                    debug!("{row:?}");
597                    Ok(())
598                })
599                .map_err(|err| {
600                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
601                    InitializationError
602                })
603            } else {
604                debug!("Setting PRAGMA {name}={value}");
605                conn.pragma_update(None, name, value).map_err(|err| {
606                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
607                    InitializationError
608                })
609            }
610        }
611
612        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
613            error!("cannot open the connection - {err:?}");
614            InitializationError
615        })?;
616
617        for [pragma_name, default_value] in PRAGMA {
618            let pragma_value = pragma_override
619                .remove(pragma_name)
620                .unwrap_or_else(|| default_value.to_string());
621            pragma_update(&conn, pragma_name, &pragma_value)?;
622        }
623        // drain the rest overrides
624        for (pragma_name, pragma_value) in pragma_override.drain() {
625            pragma_update(&conn, &pragma_name, &pragma_value)?;
626        }
627
628        // t_metadata
629        conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
630        // Insert row if not exists.
631        conn_execute(
632            &conn,
633            &format!(
634                "INSERT INTO t_metadata (schema_version, created_at) VALUES
635                    ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
636            ),
637            [Utc::now()],
638        )?;
639        // Fail on unexpected `schema_version`.
640        let actual_version = conn
641            .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
642            .map_err(|err| {
643                error!("cannot select schema version - {err:?}");
644                InitializationError
645            })?
646            .query_row([], |row| row.get::<_, u32>("schema_version"));
647
648        let actual_version = actual_version.map_err(|err| {
649            error!("Cannot read the schema version - {err:?}");
650            InitializationError
651        })?;
652        if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
653            error!(
654                "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
655            );
656            return Err(InitializationError);
657        }
658
659        // t_execution_log
660        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
661        conn_execute(
662            &conn,
663            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
664            [],
665        )?;
666        conn_execute(
667            &conn,
668            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
669            [],
670        )?;
671        // t_join_set_response
672        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
673        conn_execute(
674            &conn,
675            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
676            [],
677        )?;
678        // t_state
679        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
680        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
681        conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
682        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
683        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
684        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
685        // t_delay
686        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
687        // t_backtrace
688        conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
689        conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
690        Ok(conn)
691    }
692
693    fn connection_rpc(
694        mut conn: Connection,
695        shutdown_requested: &AtomicBool,
696        shutdown_finished: &AtomicBool,
697        mut command_rx: mpsc::Receiver<ThreadCommand>,
698        metrics_threshold: Option<Duration>,
699    ) {
700        let mut histograms = Histograms::new(metrics_threshold);
701        while Self::tick(
702            &mut conn,
703            shutdown_requested,
704            &mut command_rx,
705            &mut histograms,
706        )
707        .is_ok()
708        {
709            // Loop until shutdown is set to true.
710        }
711        debug!("Closing command thread");
712        shutdown_finished.store(true, Ordering::Release);
713    }
714
715    fn tick(
716        conn: &mut Connection,
717        shutdown_requested: &AtomicBool,
718        command_rx: &mut mpsc::Receiver<ThreadCommand>,
719        histograms: &mut Histograms,
720    ) -> Result<(), ()> {
721        let mut ltx_list = Vec::new();
722        // Wait for first logical tx.
723        let mut ltx = match command_rx.blocking_recv() {
724            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
725            Some(ThreadCommand::Shutdown) => {
726                debug!("shutdown message received");
727                return Err(());
728            }
729            None => {
730                debug!("command_rx was closed");
731                return Err(());
732            }
733        };
734        let all_fns_start = std::time::Instant::now();
735        let mut physical_tx = conn
736            .transaction_with_behavior(TransactionBehavior::Immediate)
737            .map_err(|begin_err| {
738                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
739            })?;
740        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
741        ltx_list.push(ltx);
742
743        while let Ok(more) = command_rx.try_recv() {
744            let mut ltx = match more {
745                ThreadCommand::Shutdown => {
746                    debug!("shutdown message received");
747                    return Err(());
748                }
749                ThreadCommand::LogicalTx(ltx) => ltx,
750            };
751
752            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
753            ltx_list.push(ltx);
754        }
755        histograms.record_all_fns(all_fns_start.elapsed());
756
757        {
758            // Commit
759            if shutdown_requested.load(Ordering::Relaxed) {
760                debug!("Recveived shutdown during processing of the batch");
761                return Err(());
762            }
763            let commit_result = {
764                let now = std::time::Instant::now();
765                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
766                histograms.record_commit(now.elapsed());
767                commit_result
768            };
769            if commit_result.is_ok() || ltx_list.len() == 1 {
770                // Happy path - just send the commit ACK.
771                for ltx in ltx_list {
772                    // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
773                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
774                }
775            } else {
776                // Bulk transaction failed. Replay each ltx in its own physical transaction.
777                for ltx in ltx_list {
778                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
779                }
780            }
781        }
782        histograms.print_if_elapsed();
783        Ok(())
784    }
785
786    fn ltx_commit_single(
787        mut ltx: LogicalTx,
788        conn: &mut Connection,
789        shutdown_requested: &AtomicBool,
790        histograms: &mut Histograms,
791    ) -> Result<(), ()> {
792        let mut physical_tx = conn
793            .transaction_with_behavior(TransactionBehavior::Immediate)
794            .map_err(|begin_err| {
795                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
796            })?;
797        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
798        if shutdown_requested.load(Ordering::Relaxed) {
799            debug!("Recveived shutdown during processing of the batch");
800            return Err(());
801        }
802        let commit_result = {
803            let now = std::time::Instant::now();
804            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
805            histograms.record_commit(now.elapsed());
806            commit_result
807        };
808        // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
809        let _ = ltx.commit_ack_sender.send(commit_result);
810        Ok(())
811    }
812
813    fn ltx_apply_to_tx(
814        ltx: &mut LogicalTx,
815        physical_tx: &mut Transaction,
816        histograms: &mut Histograms,
817    ) {
818        let sent_latency = ltx.sent_at.elapsed();
819        let started_at = Instant::now();
820        (ltx.func)(physical_tx);
821        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
822    }
823
824    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
825    pub async fn new<P: AsRef<Path>>(
826        path: P,
827        config: SqliteConfig,
828    ) -> Result<Self, InitializationError> {
829        let path = path.as_ref().to_owned();
830
831        let shutdown_requested = Arc::new(AtomicBool::new(false));
832        let shutdown_finished = Arc::new(AtomicBool::new(false));
833
834        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
835        info!("Sqlite database location: {path:?}");
836        let join_handle = {
837            // Initialize the `Connection`.
838            let init_task = {
839                tokio::task::spawn_blocking(move || {
840                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
841                })
842                .await
843            };
844            let conn = match init_task {
845                Ok(res) => res?,
846                Err(join_err) => {
847                    error!("Initialization panic - {join_err:?}");
848                    return Err(InitializationError);
849                }
850            };
851            let shutdown_requested = shutdown_requested.clone();
852            let shutdown_finished = shutdown_finished.clone();
853            // Start the RPC thread.
854            std::thread::spawn(move || {
855                Self::connection_rpc(
856                    conn,
857                    &shutdown_requested,
858                    &shutdown_finished,
859                    command_rx,
860                    config.metrics_threshold,
861                );
862            })
863        };
864        Ok(SqlitePool(SqlitePoolInner {
865            shutdown_requested,
866            shutdown_finished,
867            command_tx,
868            response_subscribers: Arc::default(),
869            pending_ffqn_subscribers: Arc::default(),
870            join_handle: Some(Arc::new(join_handle)),
871            execution_finished_subscribers: Arc::default(),
872        }))
873    }
874
875    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
876    async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
877    where
878        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
879        T: Send + 'static,
880        E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
881    {
882        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
883        let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
884        let thread_command_func = {
885            let fn_res = fn_res.clone();
886            ThreadCommand::LogicalTx(LogicalTx {
887                func: Box::new(move |tx| {
888                    let func_res = func(tx);
889                    *fn_res.lock().unwrap() = Some(func_res);
890                }),
891                sent_at: Instant::now(),
892                func_name,
893                commit_ack_sender,
894            })
895        };
896        self.0
897            .command_tx
898            .send(thread_command_func)
899            .await
900            .map_err(|_send_err| DbErrorGeneric::Close)?;
901
902        // Wait for commit ack, then get the retval from the mutex.
903        match commit_ack_receiver.await {
904            Ok(Ok(())) => {
905                let mut guard = fn_res.lock().unwrap();
906                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
907            }
908            Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
909            Err(_) => Err(E::from(DbErrorGeneric::Close)),
910        }
911    }
912
913    fn fetch_created_event(
914        conn: &Connection,
915        execution_id: &ExecutionId,
916    ) -> Result<CreateRequest, DbErrorRead> {
917        let mut stmt = conn.prepare(
918            "SELECT created_at, json_value FROM t_execution_log WHERE \
919            execution_id = :execution_id AND version = 0",
920        )?;
921        let (created_at, event) = stmt.query_row(
922            named_params! {
923                ":execution_id": execution_id.to_string(),
924            },
925            |row| {
926                let created_at = row.get("created_at")?;
927                let event = row
928                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
929                    .map_err(|serde| {
930                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
931                        consistency_rusqlite("cannot deserialize `Created` event")
932                    })?;
933                Ok((created_at, event.0))
934            },
935        )?;
936        if let ExecutionEventInner::Created {
937            ffqn,
938            params,
939            parent,
940            scheduled_at,
941            component_id,
942            metadata,
943            scheduled_by,
944        } = event
945        {
946            Ok(CreateRequest {
947                created_at,
948                execution_id: execution_id.clone(),
949                ffqn,
950                params,
951                parent,
952                scheduled_at,
953                component_id,
954                metadata,
955                scheduled_by,
956            })
957        } else {
958            error!("Row with version=0 must be a `Created` event - {event:?}");
959            Err(consistency_db_err("expected `Created` event").into())
960        }
961    }
962
963    fn check_expected_next_and_appending_version(
964        expected_version: &Version,
965        appending_version: &Version,
966    ) -> Result<(), DbErrorWrite> {
967        if *expected_version != *appending_version {
968            debug!(
969                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
970            );
971            return Err(DbErrorWrite::Permanent(
972                DbErrorWritePermanent::CannotWrite {
973                    reason: "version conflict".into(),
974                    expected_version: Some(expected_version.clone()),
975                },
976            ));
977        }
978        Ok(())
979    }
980
981    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
982    fn create_inner(
983        tx: &Transaction,
984        req: CreateRequest,
985    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
986        debug!("create_inner");
987
988        let version = Version::default();
989        let execution_id = req.execution_id.clone();
990        let execution_id_str = execution_id.to_string();
991        let ffqn = req.ffqn.clone();
992        let created_at = req.created_at;
993        let scheduled_at = req.scheduled_at;
994        let event = ExecutionEventInner::from(req);
995        let event_ser = serde_json::to_string(&event).map_err(|err| {
996            error!("Cannot serialize {event:?} - {err:?}");
997            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
998        })?;
999        tx.prepare(
1000                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1001                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1002        ?
1003        .execute(named_params! {
1004            ":execution_id": &execution_id_str,
1005            ":created_at": created_at,
1006            ":version": version.0,
1007            ":json_value": event_ser,
1008            ":variant": event.variant(),
1009            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1010        })
1011        ?;
1012        let pending_state = PendingState::PendingAt { scheduled_at };
1013        let pending_at = {
1014            let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
1015            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1016            tx.prepare(
1017                r"
1018                INSERT INTO t_state (
1019                    execution_id,
1020                    is_top_level,
1021                    corresponding_version,
1022                    pending_expires_finished,
1023                    ffqn,
1024                    state,
1025                    created_at,
1026                    updated_at,
1027                    scheduled_at,
1028                    intermittent_event_count
1029                    )
1030                VALUES (
1031                    :execution_id,
1032                    :is_top_level,
1033                    :corresponding_version,
1034                    :pending_expires_finished,
1035                    :ffqn,
1036                    :state,
1037                    :created_at,
1038                    CURRENT_TIMESTAMP,
1039                    :scheduled_at,
1040                    0
1041                    )
1042                ",
1043            )?
1044            .execute(named_params! {
1045                ":execution_id": execution_id.to_string(),
1046                ":is_top_level": execution_id.is_top_level(),
1047                ":corresponding_version": version.0,
1048                ":pending_expires_finished": scheduled_at,
1049                ":ffqn": ffqn.to_string(),
1050                ":state": STATE_PENDING_AT,
1051                ":created_at": created_at,
1052                ":scheduled_at": scheduled_at,
1053            })?;
1054            AppendNotifier {
1055                pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1056                execution_finished: None,
1057                response: None,
1058            }
1059        };
1060        let next_version = Version::new(version.0 + 1);
1061        Ok((next_version, pending_at))
1062    }
1063
1064    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1065    fn update_state_pending_after_response_appended(
1066        tx: &Transaction,
1067        execution_id: &ExecutionId,
1068        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1069        corresponding_version: &Version, // t_execution_log is not be changed
1070    ) -> Result<AppendNotifier, DbErrorWrite> {
1071        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1072        let execution_id_str = execution_id.to_string();
1073        let mut stmt = tx
1074            .prepare_cached(
1075                r"
1076                UPDATE t_state
1077                SET
1078                    corresponding_version = :corresponding_version,
1079                    pending_expires_finished = :pending_expires_finished,
1080                    state = :state,
1081                    updated_at = CURRENT_TIMESTAMP,
1082
1083                    last_lock_version = NULL,
1084                    executor_id = NULL,
1085                    run_id = NULL,
1086
1087                    join_set_id = NULL,
1088                    join_set_closing = NULL,
1089
1090                    result_kind = NULL
1091                WHERE execution_id = :execution_id
1092            ",
1093            )
1094            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1095        let updated = stmt
1096            .execute(named_params! {
1097                ":execution_id": execution_id_str,
1098                ":corresponding_version": corresponding_version.0,
1099                ":pending_expires_finished": scheduled_at,
1100                ":state": STATE_PENDING_AT,
1101            })
1102            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1103        if updated != 1 {
1104            return Err(DbErrorWrite::NotFound);
1105        }
1106        Ok(AppendNotifier {
1107            pending_at: Some(NotifierPendingAt {
1108                scheduled_at,
1109                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1110            }),
1111            execution_finished: None,
1112            response: None,
1113        })
1114    }
1115
1116    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1117    fn update_state_pending_after_event_appended(
1118        tx: &Transaction,
1119        execution_id: &ExecutionId,
1120        appending_version: &Version,
1121        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1122        intermittent_failure: bool,
1123    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1124        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1125        // If response idx is unknown, use 0. This distinguishs the two paths.
1126        // JoinNext will always send an actual idx.
1127        let mut stmt = tx
1128            .prepare_cached(
1129                r"
1130                UPDATE t_state
1131                SET
1132                    corresponding_version = :appending_version,
1133                    pending_expires_finished = :pending_expires_finished,
1134                    state = :state,
1135                    updated_at = CURRENT_TIMESTAMP,
1136                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1137
1138                    last_lock_version = NULL,
1139                    executor_id = NULL,
1140                    run_id = NULL,
1141
1142                    join_set_id = NULL,
1143                    join_set_closing = NULL,
1144
1145                    result_kind = NULL
1146                WHERE execution_id = :execution_id;
1147            ",
1148            )
1149            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1150        let updated = stmt
1151            .execute(named_params! {
1152                ":execution_id": execution_id.to_string(),
1153                ":appending_version": appending_version.0,
1154                ":pending_expires_finished": scheduled_at,
1155                ":state": STATE_PENDING_AT,
1156                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1157            })
1158            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1159        if updated != 1 {
1160            return Err(DbErrorWrite::NotFound);
1161        }
1162        Ok((
1163            appending_version.increment(),
1164            AppendNotifier {
1165                pending_at: Some(NotifierPendingAt {
1166                    scheduled_at,
1167                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1168                }),
1169                execution_finished: None,
1170                response: None,
1171            },
1172        ))
1173    }
1174
1175    fn update_state_locked_get_intermittent_event_count(
1176        tx: &Transaction,
1177        execution_id: &ExecutionId,
1178        executor_id: ExecutorId,
1179        run_id: RunId,
1180        lock_expires_at: DateTime<Utc>,
1181        appending_version: &Version,
1182        retry_config: ComponentRetryConfig,
1183    ) -> Result<u32, DbErrorWrite> {
1184        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1185        let backoff_millis =
1186            u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1187        let execution_id_str = execution_id.to_string();
1188        let mut stmt = tx
1189            .prepare_cached(
1190                r"
1191                UPDATE t_state
1192                SET
1193                    corresponding_version = :appending_version,
1194                    pending_expires_finished = :pending_expires_finished,
1195                    state = :state,
1196                    updated_at = CURRENT_TIMESTAMP,
1197
1198                    max_retries = :max_retries,
1199                    retry_exp_backoff_millis = :retry_exp_backoff_millis,
1200                    last_lock_version = :appending_version,
1201                    executor_id = :executor_id,
1202                    run_id = :run_id,
1203
1204                    join_set_id = NULL,
1205                    join_set_closing = NULL,
1206
1207                    result_kind = NULL
1208                WHERE execution_id = :execution_id
1209            ",
1210            )
1211            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1212        let updated = stmt
1213            .execute(named_params! {
1214                ":execution_id": execution_id_str,
1215                ":appending_version": appending_version.0,
1216                ":pending_expires_finished": lock_expires_at,
1217                ":state": STATE_LOCKED,
1218                ":max_retries": retry_config.max_retries,
1219                ":retry_exp_backoff_millis": backoff_millis,
1220                ":executor_id": executor_id.to_string(),
1221                ":run_id": run_id.to_string(),
1222            })
1223            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1224        if updated != 1 {
1225            return Err(DbErrorWrite::NotFound);
1226        }
1227
1228        // fetch intermittent event count from the just-inserted row.
1229        let intermittent_event_count = tx
1230            .prepare(
1231                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1232            )?
1233            .query_row(
1234                named_params! {
1235                    ":execution_id": execution_id_str,
1236                },
1237                |row| {
1238                    let intermittent_event_count = row.get("intermittent_event_count")?;
1239                    Ok(intermittent_event_count)
1240                },
1241            )
1242            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1243
1244        Ok(intermittent_event_count)
1245    }
1246
1247    fn update_state_blocked(
1248        tx: &Transaction,
1249        execution_id: &ExecutionId,
1250        appending_version: &Version,
1251        // BlockedByJoinSet fields
1252        join_set_id: &JoinSetId,
1253        lock_expires_at: DateTime<Utc>,
1254        join_set_closing: bool,
1255    ) -> Result<
1256        AppendResponse, // next version
1257        DbErrorWrite,
1258    > {
1259        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1260        let execution_id_str = execution_id.to_string();
1261        let mut stmt = tx.prepare_cached(
1262            r"
1263                UPDATE t_state
1264                SET
1265                    corresponding_version = :appending_version,
1266                    pending_expires_finished = :pending_expires_finished,
1267                    state = :state,
1268                    updated_at = CURRENT_TIMESTAMP,
1269
1270                    last_lock_version = NULL,
1271                    executor_id = NULL,
1272                    run_id = NULL,
1273
1274                    join_set_id = :join_set_id,
1275                    join_set_closing = :join_set_closing,
1276
1277                    result_kind = NULL
1278                WHERE execution_id = :execution_id
1279            ",
1280        )?;
1281        let updated = stmt.execute(named_params! {
1282            ":execution_id": execution_id_str,
1283            ":appending_version": appending_version.0,
1284            ":pending_expires_finished": lock_expires_at,
1285            ":state": STATE_BLOCKED_BY_JOIN_SET,
1286            ":join_set_id": join_set_id,
1287            ":join_set_closing": join_set_closing,
1288        })?;
1289        if updated != 1 {
1290            return Err(DbErrorWrite::NotFound);
1291        }
1292        Ok(appending_version.increment())
1293    }
1294
1295    fn update_state_finished(
1296        tx: &Transaction,
1297        execution_id: &ExecutionId,
1298        appending_version: &Version,
1299        // Finished fields
1300        finished_at: DateTime<Utc>,
1301        result_kind: PendingStateFinishedResultKind,
1302    ) -> Result<(), DbErrorWrite> {
1303        debug!("Setting t_state to Finished");
1304        let execution_id_str = execution_id.to_string();
1305        let mut stmt = tx.prepare_cached(
1306            r"
1307                UPDATE t_state
1308                SET
1309                    corresponding_version = :appending_version,
1310                    pending_expires_finished = :pending_expires_finished,
1311                    state = :state,
1312                    updated_at = CURRENT_TIMESTAMP,
1313
1314                    last_lock_version = NULL,
1315                    executor_id = NULL,
1316                    run_id = NULL,
1317
1318                    join_set_id = NULL,
1319                    join_set_closing = NULL,
1320
1321                    result_kind = :result_kind
1322                WHERE execution_id = :execution_id
1323            ",
1324        )?;
1325        let updated = stmt.execute(named_params! {
1326            ":execution_id": execution_id_str,
1327            ":appending_version": appending_version.0,
1328            ":pending_expires_finished": finished_at,
1329            ":state": STATE_FINISHED,
1330            ":result_kind": result_kind.to_string(),
1331        })?;
1332        if updated != 1 {
1333            return Err(DbErrorWrite::NotFound);
1334        }
1335        Ok(())
1336    }
1337
1338    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1339    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1340    fn bump_state_next_version(
1341        tx: &Transaction,
1342        execution_id: &ExecutionId,
1343        appending_version: &Version,
1344        delay_req: Option<DelayReq>,
1345    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1346        debug!("update_index_version");
1347        let execution_id_str = execution_id.to_string();
1348        let mut stmt = tx.prepare_cached(
1349            r"
1350                UPDATE t_state
1351                SET
1352                    corresponding_version = :appending_version,
1353                    updated_at = CURRENT_TIMESTAMP
1354                WHERE execution_id = :execution_id
1355            ",
1356        )?;
1357        let updated = stmt.execute(named_params! {
1358            ":execution_id": execution_id_str,
1359            ":appending_version": appending_version.0,
1360        })?;
1361        if updated != 1 {
1362            return Err(DbErrorWrite::NotFound);
1363        }
1364        if let Some(DelayReq {
1365            join_set_id,
1366            delay_id,
1367            expires_at,
1368        }) = delay_req
1369        {
1370            debug!("Inserting delay to `t_delay`");
1371            let mut stmt = tx.prepare_cached(
1372                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1373                VALUES \
1374                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1375            )?;
1376            stmt.execute(named_params! {
1377                ":execution_id": execution_id_str,
1378                ":join_set_id": join_set_id.to_string(),
1379                ":delay_id": delay_id.to_string(),
1380                ":expires_at": expires_at,
1381            })?;
1382        }
1383        Ok(appending_version.increment())
1384    }
1385
1386    fn get_combined_state(
1387        tx: &Transaction,
1388        execution_id: &ExecutionId,
1389    ) -> Result<CombinedState, DbErrorRead> {
1390        let mut stmt = tx.prepare(
1391            r"
1392                SELECT
1393                    state, ffqn, corresponding_version, pending_expires_finished,
1394                    last_lock_version, executor_id, run_id,
1395                    join_set_id, join_set_closing,
1396                    result_kind
1397                    FROM t_state
1398                WHERE
1399                    execution_id = :execution_id
1400                ",
1401        )?;
1402        stmt.query_row(
1403            named_params! {
1404                ":execution_id": execution_id.to_string(),
1405            },
1406            |row| {
1407                CombinedState::new(
1408                    &CombinedStateDTO {
1409                        state: row.get("state")?,
1410                        ffqn: row.get("ffqn")?,
1411                        pending_expires_finished: row
1412                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1413                        last_lock_version: row
1414                            .get::<_, Option<VersionType>>("last_lock_version")?
1415                            .map(Version::new),
1416                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1417                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1418                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1419                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1420                        result_kind: row
1421                            .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1422                                "result_kind",
1423                            )?
1424                            .map(|wrapper| wrapper.0),
1425                    },
1426                    Version::new(row.get("corresponding_version")?),
1427                )
1428            },
1429        )
1430        .map_err(DbErrorRead::from)
1431    }
1432
1433    fn list_executions(
1434        read_tx: &Transaction,
1435        ffqn: Option<&FunctionFqn>,
1436        top_level_only: bool,
1437        pagination: &ExecutionListPagination,
1438    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1439        struct StatementModifier<'a> {
1440            where_vec: Vec<String>,
1441            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1442            limit: u32,
1443            limit_desc: bool,
1444        }
1445
1446        fn paginate<'a, T: rusqlite::ToSql + 'static>(
1447            pagination: &'a Pagination<Option<T>>,
1448            column: &str,
1449            top_level_only: bool,
1450        ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1451            let mut where_vec: Vec<String> = vec![];
1452            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1453            let limit = pagination.length();
1454            let limit_desc = pagination.is_desc();
1455            let rel = pagination.rel();
1456            match pagination {
1457                Pagination::NewerThan {
1458                    cursor: Some(cursor),
1459                    ..
1460                }
1461                | Pagination::OlderThan {
1462                    cursor: Some(cursor),
1463                    ..
1464                } => {
1465                    where_vec.push(format!("{column} {rel} :cursor"));
1466                    let cursor = cursor.to_sql().map_err(|err| {
1467                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
1468                        DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1469                    })?;
1470                    params.push((":cursor", cursor));
1471                }
1472                _ => {}
1473            }
1474            if top_level_only {
1475                where_vec.push("is_top_level=true".to_string());
1476            }
1477            Ok(StatementModifier {
1478                where_vec,
1479                params,
1480                limit,
1481                limit_desc,
1482            })
1483        }
1484
1485        let mut statement_mod = match pagination {
1486            ExecutionListPagination::CreatedBy(pagination) => {
1487                paginate(pagination, "created_at", top_level_only)?
1488            }
1489            ExecutionListPagination::ExecutionId(pagination) => {
1490                paginate(pagination, "execution_id", top_level_only)?
1491            }
1492        };
1493
1494        let ffqn_temporary;
1495        if let Some(ffqn) = ffqn {
1496            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1497            ffqn_temporary = ffqn.to_string();
1498            let ffqn = ffqn_temporary
1499                .to_sql()
1500                .expect("string conversion never fails");
1501
1502            statement_mod.params.push((":ffqn", ffqn));
1503        }
1504
1505        let where_str = if statement_mod.where_vec.is_empty() {
1506            String::new()
1507        } else {
1508            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1509        };
1510        let sql = format!(
1511            r"
1512            SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1513            last_lock_version, executor_id, run_id,
1514            join_set_id, join_set_closing,
1515            result_kind
1516            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1517            ",
1518            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1519            limit = statement_mod.limit,
1520        );
1521        let mut vec: Vec<_> = read_tx
1522            .prepare(&sql)?
1523            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1524                statement_mod
1525                    .params
1526                    .into_iter()
1527                    .collect::<Vec<_>>()
1528                    .as_ref(),
1529                |row| {
1530                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1531                    let created_at = row.get("created_at")?;
1532                    let scheduled_at = row.get("scheduled_at")?;
1533                    let combined_state = CombinedState::new(
1534                        &CombinedStateDTO {
1535                            state: row.get("state")?,
1536                            ffqn: row.get("ffqn")?,
1537                            pending_expires_finished: row
1538                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1539                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1540
1541                            last_lock_version: row
1542                                .get::<_, Option<VersionType>>("last_lock_version")?
1543                                .map(Version::new),
1544                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1545                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1546                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1547                            result_kind: row
1548                                .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1549                                    "result_kind",
1550                                )?
1551                                .map(|wrapper| wrapper.0),
1552                        },
1553                        Version::new(row.get("corresponding_version")?),
1554                    )?;
1555                    Ok(ExecutionWithState {
1556                        execution_id,
1557                        ffqn: combined_state.ffqn,
1558                        pending_state: combined_state.pending_state,
1559                        created_at,
1560                        scheduled_at,
1561                    })
1562                },
1563            )?
1564            .collect::<Vec<Result<_, _>>>()
1565            .into_iter()
1566            .filter_map(|row| match row {
1567                Ok(row) => Some(row),
1568                Err(err) => {
1569                    warn!("Skipping row - {err:?}");
1570                    None
1571                }
1572            })
1573            .collect();
1574
1575        if !statement_mod.limit_desc {
1576            // the list must be sorted in descending order
1577            vec.reverse();
1578        }
1579        Ok(vec)
1580    }
1581
1582    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1583    #[expect(clippy::too_many_arguments)]
1584    fn lock_single_execution(
1585        tx: &Transaction,
1586        created_at: DateTime<Utc>,
1587        component_id: &ComponentId,
1588        execution_id: &ExecutionId,
1589        run_id: RunId,
1590        appending_version: &Version,
1591        executor_id: ExecutorId,
1592        lock_expires_at: DateTime<Utc>,
1593        retry_config: ComponentRetryConfig,
1594    ) -> Result<LockedExecution, DbErrorWrite> {
1595        debug!("lock_single_execution");
1596        let combined_state = Self::get_combined_state(tx, execution_id)?;
1597        combined_state.pending_state.can_append_lock(
1598            created_at,
1599            executor_id,
1600            run_id,
1601            lock_expires_at,
1602        )?;
1603        let expected_version = combined_state.get_next_version_assert_not_finished();
1604        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1605
1606        // Append to `execution_log` table.
1607        let event = ExecutionEventInner::Locked {
1608            component_id: component_id.clone(),
1609            executor_id,
1610            lock_expires_at,
1611            run_id,
1612            retry_config,
1613        };
1614        let event_ser = serde_json::to_string(&event).map_err(|err| {
1615            warn!("Cannot serialize {event:?} - {err:?}");
1616            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1617        })?;
1618        let mut stmt = tx
1619            .prepare_cached(
1620                "INSERT INTO t_execution_log \
1621            (execution_id, created_at, json_value, version, variant) \
1622            VALUES \
1623            (:execution_id, :created_at, :json_value, :version, :variant)",
1624            )
1625            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1626        stmt.execute(named_params! {
1627            ":execution_id": execution_id.to_string(),
1628            ":created_at": created_at,
1629            ":json_value": event_ser,
1630            ":version": appending_version.0,
1631            ":variant": event.variant(),
1632        })
1633        .map_err(|err| {
1634            warn!("Cannot lock execution - {err:?}");
1635            DbErrorWritePermanent::CannotWrite {
1636                reason: "cannot lock execution".into(),
1637                expected_version: None,
1638            }
1639        })?;
1640
1641        // Update `t_state`
1642        let responses = Self::list_responses(tx, execution_id, None)?;
1643        let responses = responses.into_iter().map(|resp| resp.event).collect();
1644        trace!("Responses: {responses:?}");
1645
1646        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1647            tx,
1648            execution_id,
1649            executor_id,
1650            run_id,
1651            lock_expires_at,
1652            appending_version,
1653            retry_config,
1654        )?;
1655        // Fetch event_history and `Created` event to construct the response.
1656        let mut events = tx
1657            .prepare(
1658                "SELECT json_value FROM t_execution_log WHERE \
1659                execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1660                ORDER BY version",
1661            )?
1662            .query_map(
1663                named_params! {
1664                    ":execution_id": execution_id.to_string(),
1665                    ":v1": DUMMY_CREATED.variant(),
1666                    ":v2": DUMMY_HISTORY_EVENT.variant(),
1667                },
1668                |row| {
1669                    row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1670                        .map(|wrapper| wrapper.0)
1671                        .map_err(|serde| {
1672                            error!("Cannot deserialize {row:?} - {serde:?}");
1673                            consistency_rusqlite("cannot deserialize json value")
1674                        })
1675                },
1676            )?
1677            .collect::<Result<Vec<_>, _>>()?
1678            .into_iter()
1679            .collect::<VecDeque<_>>();
1680        let Some(ExecutionEventInner::Created {
1681            ffqn,
1682            params,
1683            parent,
1684            metadata,
1685            ..
1686        }) = events.pop_front()
1687        else {
1688            error!("Execution log must contain at least `Created` event");
1689            return Err(consistency_db_err("execution log must contain `Created` event").into());
1690        };
1691
1692        let event_history = events
1693            .into_iter()
1694            .map(|event| {
1695                if let ExecutionEventInner::HistoryEvent { event } = event {
1696                    Ok(event)
1697                } else {
1698                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1699                    Err(consistency_db_err(
1700                        "rows can only contain `Created` and `HistoryEvent` event kinds",
1701                    ))
1702                }
1703            })
1704            .collect::<Result<Vec<_>, _>>()?;
1705        Ok(LockedExecution {
1706            execution_id: execution_id.clone(),
1707            metadata,
1708            run_id,
1709            next_version: appending_version.increment(),
1710            ffqn,
1711            params,
1712            event_history,
1713            responses,
1714            parent,
1715            intermittent_event_count,
1716        })
1717    }
1718
1719    fn count_join_next(
1720        tx: &Transaction,
1721        execution_id: &ExecutionId,
1722        join_set_id: &JoinSetId,
1723    ) -> Result<u64, DbErrorRead> {
1724        let mut stmt = tx.prepare(
1725            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1726            AND history_event_type = :join_next",
1727        )?;
1728        Ok(stmt.query_row(
1729            named_params! {
1730                ":execution_id": execution_id.to_string(),
1731                ":join_set_id": join_set_id.to_string(),
1732                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1733            },
1734            |row| row.get("count"),
1735        )?)
1736    }
1737
1738    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1739    #[expect(clippy::needless_return)]
1740    fn append(
1741        tx: &Transaction,
1742        execution_id: &ExecutionId,
1743        req: AppendRequest,
1744        appending_version: Version,
1745    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1746        if matches!(req.event, ExecutionEventInner::Created { .. }) {
1747            return Err(DbErrorWrite::Permanent(
1748                DbErrorWritePermanent::ValidationFailed(
1749                    "cannot append `Created` event - use `create` instead".into(),
1750                ),
1751            ));
1752        }
1753        if let AppendRequest {
1754            event:
1755                ExecutionEventInner::Locked {
1756                    component_id,
1757                    executor_id,
1758                    run_id,
1759                    lock_expires_at,
1760                    retry_config,
1761                },
1762            created_at,
1763        } = req
1764        {
1765            return Self::lock_single_execution(
1766                tx,
1767                created_at,
1768                &component_id,
1769                execution_id,
1770                run_id,
1771                &appending_version,
1772                executor_id,
1773                lock_expires_at,
1774                retry_config,
1775            )
1776            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1777        }
1778
1779        let combined_state = Self::get_combined_state(tx, execution_id)?;
1780        if combined_state.pending_state.is_finished() {
1781            debug!("Execution is already finished");
1782            return Err(DbErrorWrite::Permanent(
1783                DbErrorWritePermanent::CannotWrite {
1784                    reason: "already finished".into(),
1785                    expected_version: None,
1786                },
1787            ));
1788        }
1789
1790        Self::check_expected_next_and_appending_version(
1791            &combined_state.get_next_version_assert_not_finished(),
1792            &appending_version,
1793        )?;
1794        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1795            error!("Cannot serialize {:?} - {err:?}", req.event);
1796            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1797        })?;
1798
1799        let mut stmt = tx.prepare(
1800                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1801                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1802                    ?;
1803        stmt.execute(named_params! {
1804            ":execution_id": execution_id.to_string(),
1805            ":created_at": req.created_at,
1806            ":json_value": event_ser,
1807            ":version": appending_version.0,
1808            ":variant": req.event.variant(),
1809            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1810        })?;
1811        // Calculate current pending state
1812
1813        match &req.event {
1814            ExecutionEventInner::Created { .. } => {
1815                unreachable!("handled in the caller")
1816            }
1817
1818            ExecutionEventInner::Locked { .. } => {
1819                unreachable!("handled above")
1820            }
1821
1822            ExecutionEventInner::TemporarilyFailed {
1823                backoff_expires_at, ..
1824            }
1825            | ExecutionEventInner::TemporarilyTimedOut {
1826                backoff_expires_at, ..
1827            } => {
1828                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1829                    tx,
1830                    execution_id,
1831                    &appending_version,
1832                    *backoff_expires_at,
1833                    true, // an intermittent failure
1834                )?;
1835                return Ok((next_version, notifier));
1836            }
1837
1838            ExecutionEventInner::Unlocked {
1839                backoff_expires_at, ..
1840            } => {
1841                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1842                    tx,
1843                    execution_id,
1844                    &appending_version,
1845                    *backoff_expires_at,
1846                    false, // not an intermittent failure
1847                )?;
1848                return Ok((next_version, notifier));
1849            }
1850
1851            ExecutionEventInner::Finished { result, .. } => {
1852                Self::update_state_finished(
1853                    tx,
1854                    execution_id,
1855                    &appending_version,
1856                    req.created_at,
1857                    PendingStateFinishedResultKind::from(result),
1858                )?;
1859                return Ok((
1860                    appending_version,
1861                    AppendNotifier {
1862                        pending_at: None,
1863                        execution_finished: Some(NotifierExecutionFinished {
1864                            execution_id: execution_id.clone(),
1865                            retval: result.clone(),
1866                        }),
1867                        response: None,
1868                    },
1869                ));
1870            }
1871
1872            ExecutionEventInner::HistoryEvent {
1873                event:
1874                    HistoryEvent::JoinSetCreate { .. }
1875                    | HistoryEvent::JoinSetRequest {
1876                        request: JoinSetRequest::ChildExecutionRequest { .. },
1877                        ..
1878                    }
1879                    | HistoryEvent::Persist { .. }
1880                    | HistoryEvent::Schedule { .. }
1881                    | HistoryEvent::Stub { .. }
1882                    | HistoryEvent::JoinNextTooMany { .. },
1883            } => {
1884                return Ok((
1885                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1886                    AppendNotifier::default(),
1887                ));
1888            }
1889
1890            ExecutionEventInner::HistoryEvent {
1891                event:
1892                    HistoryEvent::JoinSetRequest {
1893                        join_set_id,
1894                        request:
1895                            JoinSetRequest::DelayRequest {
1896                                delay_id,
1897                                expires_at,
1898                                ..
1899                            },
1900                    },
1901            } => {
1902                return Ok((
1903                    Self::bump_state_next_version(
1904                        tx,
1905                        execution_id,
1906                        &appending_version,
1907                        Some(DelayReq {
1908                            join_set_id: join_set_id.clone(),
1909                            delay_id: delay_id.clone(),
1910                            expires_at: *expires_at,
1911                        }),
1912                    )?,
1913                    AppendNotifier::default(),
1914                ));
1915            }
1916
1917            ExecutionEventInner::HistoryEvent {
1918                event:
1919                    HistoryEvent::JoinNext {
1920                        join_set_id,
1921                        run_expires_at,
1922                        closing,
1923                        requested_ffqn: _,
1924                    },
1925            } => {
1926                // Did the response arrive already?
1927                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1928                let nth_response =
1929                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
1930                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1931                assert!(join_next_count > 0);
1932                if let Some(ResponseWithCursor {
1933                    event:
1934                        JoinSetResponseEventOuter {
1935                            created_at: nth_created_at,
1936                            ..
1937                        },
1938                    cursor: _,
1939                }) = nth_response
1940                {
1941                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
1942                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1943                        tx,
1944                        execution_id,
1945                        &appending_version,
1946                        scheduled_at,
1947                        false, // not an intermittent failure
1948                    )?;
1949                    return Ok((next_version, notifier));
1950                }
1951                return Ok((
1952                    Self::update_state_blocked(
1953                        tx,
1954                        execution_id,
1955                        &appending_version,
1956                        join_set_id,
1957                        *run_expires_at,
1958                        *closing,
1959                    )?,
1960                    AppendNotifier::default(),
1961                ));
1962            }
1963        }
1964    }
1965
1966    fn append_response(
1967        tx: &Transaction,
1968        execution_id: &ExecutionId,
1969        response_outer: JoinSetResponseEventOuter,
1970    ) -> Result<AppendNotifier, DbErrorWrite> {
1971        let mut stmt = tx.prepare(
1972            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1973                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1974        )?;
1975        let join_set_id = &response_outer.event.join_set_id;
1976        let delay_id = match &response_outer.event.event {
1977            JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1978            JoinSetResponse::ChildExecutionFinished { .. } => None,
1979        };
1980        let (child_execution_id, finished_version) = match &response_outer.event.event {
1981            JoinSetResponse::ChildExecutionFinished {
1982                child_execution_id,
1983                finished_version,
1984                result: _,
1985            } => (
1986                Some(child_execution_id.to_string()),
1987                Some(finished_version.0),
1988            ),
1989            JoinSetResponse::DelayFinished { .. } => (None, None),
1990        };
1991
1992        stmt.execute(named_params! {
1993            ":execution_id": execution_id.to_string(),
1994            ":created_at": response_outer.created_at,
1995            ":join_set_id": join_set_id.to_string(),
1996            ":delay_id": delay_id,
1997            ":child_execution_id": child_execution_id,
1998            ":finished_version": finished_version,
1999        })?;
2000
2001        // if the execution is going to be unblocked by this response...
2002        let combined_state = Self::get_combined_state(tx, execution_id)?;
2003        debug!("previous_pending_state: {combined_state:?}");
2004        let mut notifier = if let PendingState::BlockedByJoinSet {
2005            join_set_id: found_join_set_id,
2006            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2007            closing: _,
2008        } = combined_state.pending_state
2009            && *join_set_id == found_join_set_id
2010        {
2011            // PendingAt should be set to current time if called from expired_timers_watcher,
2012            // or to a future time if the execution is hot.
2013            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2014            // TODO: Add diff test
2015            // Unblock the state.
2016            Self::update_state_pending_after_response_appended(
2017                tx,
2018                execution_id,
2019                scheduled_at,
2020                &combined_state.corresponding_version, // not changing the version
2021            )?
2022        } else {
2023            AppendNotifier::default()
2024        };
2025        if let JoinSetResponseEvent {
2026            join_set_id,
2027            event: JoinSetResponse::DelayFinished { delay_id },
2028        } = &response_outer.event
2029        {
2030            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2031            let mut stmt =
2032                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2033                ?;
2034            stmt.execute(named_params! {
2035                ":execution_id": execution_id.to_string(),
2036                ":join_set_id": join_set_id.to_string(),
2037                ":delay_id": delay_id.to_string(),
2038            })?;
2039        }
2040        notifier.response = Some((execution_id.clone(), response_outer));
2041        Ok(notifier)
2042    }
2043
2044    fn append_backtrace(
2045        tx: &Transaction,
2046        backtrace_info: &BacktraceInfo,
2047    ) -> Result<(), DbErrorWrite> {
2048        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2049            warn!(
2050                "Cannot serialize backtrace {:?} - {err:?}",
2051                backtrace_info.wasm_backtrace
2052            );
2053            DbErrorWritePermanent::ValidationFailed("cannot serialize backtrace".into())
2054        })?;
2055        let mut stmt = tx
2056            .prepare(
2057                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2058                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2059            )
2060            ?;
2061        stmt.execute(named_params! {
2062            ":execution_id": backtrace_info.execution_id.to_string(),
2063            ":component_id": backtrace_info.component_id.to_string(),
2064            ":version_min_including": backtrace_info.version_min_including.0,
2065            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2066            ":wasm_backtrace": backtrace,
2067        })?;
2068        Ok(())
2069    }
2070
2071    #[cfg(feature = "test")]
2072    fn get(
2073        tx: &Transaction,
2074        execution_id: &ExecutionId,
2075    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2076        let mut stmt = tx.prepare(
2077            "SELECT created_at, json_value FROM t_execution_log WHERE \
2078                        execution_id = :execution_id ORDER BY version",
2079        )?;
2080        let events = stmt
2081            .query_map(
2082                named_params! {
2083                    ":execution_id": execution_id.to_string(),
2084                },
2085                |row| {
2086                    let created_at = row.get("created_at")?;
2087                    let event = row
2088                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2089                        .map_err(|serde| {
2090                            error!("Cannot deserialize {row:?} - {serde:?}");
2091                            consistency_rusqlite("cannot deserialize event")
2092                        })?
2093                        .0;
2094
2095                    Ok(ExecutionEvent {
2096                        created_at,
2097                        event,
2098                        backtrace_id: None,
2099                    })
2100                },
2101            )?
2102            .collect::<Result<Vec<_>, _>>()?;
2103        if events.is_empty() {
2104            return Err(DbErrorRead::NotFound);
2105        }
2106        let combined_state = Self::get_combined_state(tx, execution_id)?;
2107        let responses = Self::list_responses(tx, execution_id, None)?
2108            .into_iter()
2109            .map(|resp| resp.event)
2110            .collect();
2111        Ok(concepts::storage::ExecutionLog {
2112            execution_id: execution_id.clone(),
2113            events,
2114            responses,
2115            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2116            pending_state: combined_state.pending_state,
2117        })
2118    }
2119
2120    fn list_execution_events(
2121        tx: &Transaction,
2122        execution_id: &ExecutionId,
2123        version_min: VersionType,
2124        version_max_excluding: VersionType,
2125        include_backtrace_id: bool,
2126    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2127        let select = if include_backtrace_id {
2128            "SELECT
2129                log.created_at,
2130                log.json_value,
2131                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2132                bt.version_min_including AS backtrace_id
2133            FROM
2134                t_execution_log AS log
2135            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2136                t_backtrace AS bt ON log.execution_id = bt.execution_id
2137                                -- Check if the log's version falls within the backtrace's range
2138                                AND log.version >= bt.version_min_including
2139                                AND log.version < bt.version_max_excluding
2140            WHERE
2141                log.execution_id = :execution_id
2142                AND log.version >= :version_min
2143                AND log.version < :version_max_excluding
2144            ORDER BY
2145                log.version;"
2146        } else {
2147            "SELECT
2148                created_at, json_value, NULL as backtrace_id
2149            FROM t_execution_log WHERE
2150                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2151            ORDER BY version"
2152        };
2153        tx.prepare(select)?
2154            .query_map(
2155                named_params! {
2156                    ":execution_id": execution_id.to_string(),
2157                    ":version_min": version_min,
2158                    ":version_max_excluding": version_max_excluding
2159                },
2160                |row| {
2161                    let created_at = row.get("created_at")?;
2162                    let backtrace_id = row
2163                        .get::<_, Option<VersionType>>("backtrace_id")?
2164                        .map(Version::new);
2165                    let event = row
2166                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2167                        .map(|event| ExecutionEvent {
2168                            created_at,
2169                            event: event.0,
2170                            backtrace_id,
2171                        })
2172                        .map_err(|serde| {
2173                            error!("Cannot deserialize {row:?} - {serde:?}");
2174                            consistency_rusqlite("cannot deserialize")
2175                        })?;
2176                    Ok(event)
2177                },
2178            )?
2179            .collect::<Result<Vec<_>, _>>()
2180            .map_err(DbErrorRead::from)
2181    }
2182
2183    fn get_execution_event(
2184        tx: &Transaction,
2185        execution_id: &ExecutionId,
2186        version: VersionType,
2187    ) -> Result<ExecutionEvent, DbErrorRead> {
2188        let mut stmt = tx.prepare(
2189            "SELECT created_at, json_value FROM t_execution_log WHERE \
2190                        execution_id = :execution_id AND version = :version",
2191        )?;
2192        stmt.query_row(
2193            named_params! {
2194                ":execution_id": execution_id.to_string(),
2195                ":version": version,
2196            },
2197            |row| {
2198                let created_at = row.get("created_at")?;
2199                let event = row
2200                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2201                    .map_err(|serde| {
2202                        error!("Cannot deserialize {row:?} - {serde:?}");
2203                        consistency_rusqlite("cannot deserialize event")
2204                    })?;
2205
2206                Ok(ExecutionEvent {
2207                    created_at,
2208                    event: event.0,
2209                    backtrace_id: None,
2210                })
2211            },
2212        )
2213        .map_err(DbErrorRead::from)
2214    }
2215
    /// Lists the join-set responses stored for `execution_id`. Each response is paired
    /// with its `t_join_set_response.id`, which serves as the pagination cursor.
    ///
    /// Child-execution responses are joined with the child's `t_execution_log` row at
    /// `finished_version` to obtain the result. Delay responses have no child and are
    /// kept by the `child_execution_id IS NULL` branch.
    ///
    /// Without `pagination`, all rows are returned ordered by id (ascending). With
    /// pagination, rows are filtered by `r.id <rel> :cursor`, where the operator comes
    /// from `Pagination::rel`. They are ordered descending when `Pagination::is_desc`
    /// holds, and limited to `Pagination::length` rows.
    fn list_responses(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pagination: Option<Pagination<u32>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
        // TODO: Add test
        // Parameters are collected dynamically because `:cursor` is bound only when paginating.
        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
        let mut sql = "SELECT \
            r.id, r.created_at, r.join_set_id,  r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
            WHERE \
            r.execution_id = :execution_id \
            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
            "
        .to_string();
        let limit = match &pagination {
            Some(
                pagination @ (Pagination::NewerThan { cursor, .. }
                | Pagination::OlderThan { cursor, .. }),
            ) => {
                params.push((":cursor", Box::new(cursor)));
                // Writing into a `String` cannot fail, hence the `unwrap`.
                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
                Some(pagination.length())
            }
            None => None,
        };
        sql.push_str(" ORDER BY id");
        if pagination.as_ref().is_some_and(Pagination::is_desc) {
            sql.push_str(" DESC");
        }
        if let Some(limit) = limit {
            write!(sql, " LIMIT {limit}").unwrap();
        }
        params.push((":execution_id", Box::new(execution_id.to_string())));
        // Borrow the boxed values as `&dyn ToSql` to form the named-parameter slice.
        tx.prepare(&sql)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                Self::parse_response_with_cursor,
            )?
            .collect::<Result<Vec<_>, rusqlite::Error>>()
            .map_err(DbErrorRead::from)
    }
2262
2263    fn parse_response_with_cursor(
2264        row: &rusqlite::Row<'_>,
2265    ) -> Result<ResponseWithCursor, rusqlite::Error> {
2266        let id = row.get("id")?;
2267        let created_at: DateTime<Utc> = row.get("created_at")?;
2268        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2269        let event = match (
2270            row.get::<_, Option<DelayId>>("delay_id")?,
2271            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2272            row.get::<_, Option<VersionType>>("finished_version")?,
2273            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2274        ) {
2275            (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2276            (
2277                None,
2278                Some(child_execution_id),
2279                Some(finished_version),
2280                Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2281            ) => JoinSetResponse::ChildExecutionFinished {
2282                child_execution_id,
2283                finished_version: Version(finished_version),
2284                result,
2285            },
2286            (delay, child, finished, result) => {
2287                error!(
2288                    "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2289                    delay,
2290                    result.map(|it| it.0)
2291                );
2292                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2293            }
2294        };
2295        Ok(ResponseWithCursor {
2296            cursor: id,
2297            event: JoinSetResponseEventOuter {
2298                event: JoinSetResponseEvent { join_set_id, event },
2299                created_at,
2300            },
2301        })
2302    }
2303
2304    fn nth_response(
2305        tx: &Transaction,
2306        execution_id: &ExecutionId,
2307        join_set_id: &JoinSetId,
2308        skip_rows: u64,
2309    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2310        // TODO: Add test
2311        tx
2312            .prepare(
2313                "SELECT r.id, r.created_at, r.join_set_id, \
2314                    r.delay_id, \
2315                    r.child_execution_id, r.finished_version, l.json_value \
2316                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2317                    WHERE \
2318                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2319                    (
2320                    r.finished_version = l.version \
2321                    OR \
2322                    r.child_execution_id IS NULL \
2323                    ) \
2324                    ORDER BY id \
2325                    LIMIT 1 OFFSET :offset",
2326            )
2327            ?
2328            .query_row(
2329                named_params! {
2330                    ":execution_id": execution_id.to_string(),
2331                    ":join_set_id": join_set_id.to_string(),
2332                    ":offset": skip_rows,
2333                },
2334                Self::parse_response_with_cursor,
2335            )
2336            .optional()
2337            .map_err(DbErrorRead::from)
2338    }
2339
2340    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2341    #[instrument(level = Level::TRACE, skip_all)]
2342    fn get_responses_with_offset(
2343        tx: &Transaction,
2344        execution_id: &ExecutionId,
2345        skip_rows: usize,
2346    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2347        // TODO: Add test
2348        tx.prepare(
2349            "SELECT r.id, r.created_at, r.join_set_id, \
2350            r.delay_id, \
2351            r.child_execution_id, r.finished_version, l.json_value \
2352            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2353            WHERE \
2354            r.execution_id = :execution_id AND \
2355            ( \
2356            r.finished_version = l.version \
2357            OR r.child_execution_id IS NULL \
2358            ) \
2359            ORDER BY id \
2360            LIMIT -1 OFFSET :offset",
2361        )
2362        ?
2363        .query_map(
2364            named_params! {
2365                ":execution_id": execution_id.to_string(),
2366                ":offset": skip_rows,
2367            },
2368            Self::parse_response_with_cursor,
2369        )
2370        ?
2371        .collect::<Result<Vec<_>, _>>()
2372        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2373        .map_err(DbErrorRead::from)
2374    }
2375
2376    fn get_pending_of_single_ffqn(
2377        mut stmt: CachedStatement,
2378        batch_size: usize,
2379        pending_at_or_sooner: DateTime<Utc>,
2380        ffqn: &FunctionFqn,
2381    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2382        stmt.query_map(
2383            named_params! {
2384                ":pending_expires_finished": pending_at_or_sooner,
2385                ":ffqn": ffqn.to_string(),
2386                ":batch_size": batch_size,
2387            },
2388            |row| {
2389                let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2390                let next_version =
2391                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2392                Ok(execution_id.map(|exe| (exe, next_version)))
2393            },
2394        )
2395        .map_err(|err| {
2396            warn!("Ignoring consistency error {err:?}");
2397        })?
2398        .collect::<Result<Vec<_>, _>>()
2399        .map_err(|err| {
2400            warn!("Ignoring consistency error {err:?}");
2401        })?
2402        .into_iter()
2403        .collect::<Result<Vec<_>, _>>()
2404        .map_err(|err| {
2405            warn!("Ignoring consistency error {err:?}");
2406        })
2407    }
2408
2409    /// Get executions and their next versions
2410    fn get_pending(
2411        conn: &Connection,
2412        batch_size: usize,
2413        pending_at_or_sooner: DateTime<Utc>,
2414        ffqns: &[FunctionFqn],
2415    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2416        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2417        for ffqn in ffqns {
2418            // Select executions in PendingAt.
2419            let stmt = conn
2420                .prepare_cached(&format!(
2421                    r#"
2422                    SELECT execution_id, corresponding_version FROM t_state WHERE
2423                    state = "{STATE_PENDING_AT}" AND
2424                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2425                    ORDER BY pending_expires_finished LIMIT :batch_size
2426                    "#
2427                ))
2428                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2429
2430            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2431                stmt,
2432                batch_size - execution_ids_versions.len(),
2433                pending_at_or_sooner,
2434                ffqn,
2435            ) {
2436                execution_ids_versions.extend(execs_and_versions);
2437                if execution_ids_versions.len() == batch_size {
2438                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2439                    break;
2440                }
2441                // consistency errors are ignored since we want to return at least some rows.
2442            }
2443        }
2444        Ok(execution_ids_versions)
2445    }
2446
2447    // Must be called after write transaction for a correct happens-before relationship.
2448    #[instrument(level = Level::TRACE, skip_all)]
2449    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2450        let (pending_ats, finished_execs, responses) = {
2451            let (mut pending_ats, mut finished_execs, mut responses) =
2452                (Vec::new(), Vec::new(), Vec::new());
2453            for notifier in notifiers {
2454                if let Some(pending_at) = notifier.pending_at {
2455                    pending_ats.push(pending_at);
2456                }
2457                if let Some(finished) = notifier.execution_finished {
2458                    finished_execs.push(finished);
2459                }
2460                if let Some(response) = notifier.response {
2461                    responses.push(response);
2462                }
2463            }
2464            (pending_ats, finished_execs, responses)
2465        };
2466
2467        // Notify pending_at subscribers.
2468        if !pending_ats.is_empty() {
2469            let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2470            for pending_at in pending_ats {
2471                Self::notify_pending_locked(&pending_at, current_time, &guard);
2472            }
2473        }
2474        // Notify execution finished subscribers.
2475        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2476        if !finished_execs.is_empty() {
2477            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2478            for finished in finished_execs {
2479                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2480                    for (_tag, sender) in listeners_of_exe_id {
2481                        // Sending while holding the lock but the oneshot sender does not block.
2482                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2483                        let _ = sender.send(finished.retval.clone());
2484                    }
2485                }
2486            }
2487        }
2488        // Notify response subscribers.
2489        if !responses.is_empty() {
2490            let mut guard = self.0.response_subscribers.lock().unwrap();
2491            for (execution_id, response) in responses {
2492                if let Some((sender, _)) = guard.remove(&execution_id) {
2493                    let _ = sender.send(response);
2494                }
2495            }
2496        }
2497    }
2498
2499    fn notify_pending_locked(
2500        notifier: &NotifierPendingAt,
2501        current_time: DateTime<Utc>,
2502        ffqn_to_pending_subscription: &std::sync::MutexGuard<
2503            HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2504        >,
2505    ) {
2506        // No need to remove here, cleanup is handled by the caller.
2507        if notifier.scheduled_at <= current_time
2508            && let Some((subscription, _)) = ffqn_to_pending_subscription.get(&notifier.ffqn)
2509        {
2510            debug!("Notifying pending subscriber");
2511            // Does not block
2512            let _ = subscription.try_send(());
2513        }
2514    }
2515}
2516
// Executor-facing operations of the SQLite pool: finding and locking pending executions,
// appending events, and waiting until new pending work may be available.
#[async_trait]
impl DbExecutor for SqlitePool {
    // Finds up to `batch_size` executions of `ffqns` that are pending at or before
    // `pending_at_or_sooner`, then locks each of them for `executor_id`/`run_id` until
    // `lock_expires_at`.
    //
    // Finding and locking run in two separate transactions. A failure to lock an
    // individual execution is only logged (`warn!`) and that execution is left out of the
    // result, so fewer executions than were found may be returned.
    // NOTE(review): presumably `lock_single_execution` rejects an execution whose version
    // changed between the two transactions — confirm.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn lock_pending(
        &self,
        batch_size: usize,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric> {
        let execution_ids_versions = self
            .transaction(
                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
                "get_pending",
            )
            .await?;
        // Nothing pending: skip the locking transaction entirely.
        if execution_ids_versions.is_empty() {
            Ok(vec![])
        } else {
            debug!("Locking {execution_ids_versions:?}");
            self.transaction(
                move |tx| {
                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
                    // Append lock
                    for (execution_id, version) in &execution_ids_versions {
                        match Self::lock_single_execution(
                            tx,
                            created_at,
                            &component_id,
                            execution_id,
                            run_id,
                            version,
                            executor_id,
                            lock_expires_at,
                            retry_config,
                        ) {
                            Ok(locked) => locked_execs.push(locked),
                            // Best effort: a single failed lock must not fail the whole batch.
                            Err(err) => {
                                warn!("Locking row {execution_id} failed - {err:?}");
                            }
                        }
                    }
                    Ok(locked_execs)
                },
                "lock_pending",
            )
            .await
        }
    }

    // Locks one specific execution expected at `version`. Unlike `lock_pending`, a
    // failure is returned to the caller. The transaction is labelled "lock_inner".
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!(%execution_id, "lock_one");
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| {
                Self::lock_single_execution(
                    tx,
                    created_at,
                    &component_id,
                    &execution_id,
                    run_id,
                    &version,
                    executor_id,
                    lock_expires_at,
                    retry_config,
                )
            },
            "lock_inner",
        )
        .await
    }

    // Appends a single event `req` to `execution_id` at `version`, then notifies
    // subscribers using `req.created_at` as the current time. Returns the version
    // produced by the append.
    #[instrument(level = Level::DEBUG, skip(self, req))]
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite> {
        debug!(%req, "append");
        trace!(?req, "append");
        let created_at = req.created_at;
        let (version, notifier) = self
            .transaction(
                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
                "append",
            )
            .await?;
        // Notify only after the write transaction has completed.
        self.notify_all(vec![notifier], created_at);
        Ok(version)
    }

    // In a single transaction: appends `events.batch` to the child execution
    // `events.execution_id`, then appends `response.parent_response_event` to the parent
    // execution. Afterwards notifies with the child's last notifier and the parent's
    // notifier. Returns the child's new version.
    #[instrument(level = Level::DEBUG, skip_all)]
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite> {
        debug!("append_batch_respond_to_parent");
        if events.execution_id == response.parent_execution_id {
            // Pending state would be wrong.
            // This is not a panic because it depends on DB state.
            return Err(DbErrorWrite::Permanent(
                DbErrorWritePermanent::ValidationFailed(
                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
                ),
            ));
        }
        // A non-empty batch guarantees `notifier_of_child` is set below.
        if events.batch.is_empty() {
            error!("Batch cannot be empty");
            return Err(DbErrorWrite::Permanent(
                DbErrorWritePermanent::ValidationFailed("batch cannot be empty".into()),
            ));
        }
        let (version, notifiers) = {
            self.transaction(
                move |tx| {
                    let mut version = events.version.clone();
                    let mut notifier_of_child = None;
                    // Each append yields the next expected version; only the last
                    // notifier of the batch is kept.
                    for append_request in &events.batch {
                        let (v, n) = Self::append(
                            tx,
                            &events.execution_id,
                            append_request.clone(),
                            version,
                        )?;
                        version = v;
                        notifier_of_child = Some(n);
                    }

                    let pending_at_parent = Self::append_response(
                        tx,
                        &response.parent_execution_id,
                        response.parent_response_event.clone(),
                    )?;
                    Ok::<_, DbErrorWrite>((
                        version,
                        vec![
                            notifier_of_child.expect("checked that the batch is not empty"),
                            pending_at_parent,
                        ],
                    ))
                },
                "append_batch_respond_to_parent",
            )
            .await?
        };
        self.notify_all(notifiers, current_time);
        Ok(version)
    }

    // Supports only one subscriber per ffqn.
    // A new subscriber replaces the old one, which will eventually time out, which is fine.
    // Returns when a pending execution of `ffqns` already exists, when notified of one,
    // or when `timeout_fut` completes.
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // Tag identifying this call's entries, so cleanup leaves other subscribers alone.
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
        // Register the subscription *before* querying the database: a pending execution
        // created concurrently is then either visible to the query below, or its
        // notification (if any) lands in the channel and wakes `receiver.recv()`.
        {
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }
        async {
            let Ok(execution_ids_versions) = self
                .transaction(
                    {
                        let ffqns = ffqns.clone();
                        // A batch size of 1 suffices: only existence matters here.
                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                    },
                    "subscribe_to_pending",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            tokio::select! { // future's liveness: Dropping the loser immediately.
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        // Clean up ffqn_to_pending_subscription in any case
        {
            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match ffqn_to_pending_subscription.remove(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {
                        // Cleanup OK.
                    }
                    Some(other) => {
                        // Reinsert foreign sender.
                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
                    }
                    None => {
                        // Value was replaced and cleaned up already.
                    }
                }
            }
        }
    }
}
2751
2752#[async_trait]
2753impl DbConnection for SqlitePool {
2754    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2755    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2756        debug!("create");
2757        trace!(?req, "create");
2758        let created_at = req.created_at;
2759        let (version, notifier) = self
2760            .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2761            .await?;
2762        self.notify_all(vec![notifier], created_at);
2763        Ok(version)
2764    }
2765
2766    #[instrument(level = Level::DEBUG, skip(self, batch))]
2767    async fn append_batch(
2768        &self,
2769        current_time: DateTime<Utc>,
2770        batch: Vec<AppendRequest>,
2771        execution_id: ExecutionId,
2772        version: Version,
2773    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2774        debug!("append_batch");
2775        trace!(?batch, "append_batch");
2776        assert!(!batch.is_empty(), "Empty batch request");
2777
2778        let (version, notifier) = self
2779            .transaction(
2780                move |tx| {
2781                    let mut version = version.clone();
2782                    let mut notifier = None;
2783                    for append_request in &batch {
2784                        let (v, n) =
2785                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2786                        version = v;
2787                        notifier = Some(n);
2788                    }
2789                    Ok::<_, DbErrorWrite>((
2790                        version,
2791                        notifier.expect("checked that the batch is not empty"),
2792                    ))
2793                },
2794                "append_batch",
2795            )
2796            .await?;
2797
2798        self.notify_all(vec![notifier], current_time);
2799        Ok(version)
2800    }
2801
2802    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2803    async fn append_batch_create_new_execution(
2804        &self,
2805        current_time: DateTime<Utc>,
2806        batch: Vec<AppendRequest>,
2807        execution_id: ExecutionId,
2808        version: Version,
2809        child_req: Vec<CreateRequest>,
2810    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2811        debug!("append_batch_create_new_execution");
2812        trace!(?batch, ?child_req, "append_batch_create_new_execution");
2813        assert!(!batch.is_empty(), "Empty batch request");
2814
2815        let (version, notifiers) = self
2816            .transaction(
2817                move |tx| {
2818                    let mut notifier = None;
2819                    let mut version = version.clone();
2820                    for append_request in &batch {
2821                        let (v, n) =
2822                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2823                        version = v;
2824                        notifier = Some(n);
2825                    }
2826                    let mut notifiers = Vec::new();
2827                    notifiers.push(notifier.expect("checked that the batch is not empty"));
2828
2829                    for child_req in &child_req {
2830                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2831                        notifiers.push(notifier);
2832                    }
2833                    Ok::<_, DbErrorWrite>((version, notifiers))
2834                },
2835                "append_batch_create_new_execution_inner",
2836            )
2837            .await?;
2838        self.notify_all(notifiers, current_time);
2839        Ok(version)
2840    }
2841
2842    #[cfg(feature = "test")]
2843    #[instrument(level = Level::DEBUG, skip(self))]
2844    async fn get(
2845        &self,
2846        execution_id: &ExecutionId,
2847    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2848        trace!("get");
2849        let execution_id = execution_id.clone();
2850        self.transaction(move |tx| Self::get(tx, &execution_id), "get")
2851            .await
2852    }
2853
2854    #[instrument(level = Level::DEBUG, skip(self))]
2855    async fn list_execution_events(
2856        &self,
2857        execution_id: &ExecutionId,
2858        since: &Version,
2859        max_length: VersionType,
2860        include_backtrace_id: bool,
2861    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2862        let execution_id = execution_id.clone();
2863        let since = since.0;
2864        self.transaction(
2865            move |tx| {
2866                Self::list_execution_events(
2867                    tx,
2868                    &execution_id,
2869                    since,
2870                    since + max_length,
2871                    include_backtrace_id,
2872                )
2873            },
2874            "get",
2875        )
2876        .await
2877    }
2878
    // Supports only one subscriber per execution id.
    // A new call will overwrite the old subscriber, the old one will end
    // with a timeout, which is fine.
    //
    // Returns the responses of `execution_id` starting at index `start_idx`. If any exist
    // they are returned immediately. Otherwise a one-shot subscriber is registered inside
    // the same transaction that observed none, and the call waits for the next response
    // (returned as a single-element vec) or for `timeout_fut` (`Timeout` error).
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
        debug!("next_responses");
        // Tag identifying this call's subscription, so cleanup never removes a newer one.
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();

        // Removes this call's subscriber if it is still registered.
        let cleanup = || {
            let mut guard = self.0.response_subscribers.lock().unwrap();
            match guard.remove(&execution_id) {
                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
                Some(other) => {
                    // Reinsert foreign sender.
                    guard.insert(execution_id.clone(), other);
                }
                None => {} // Value was replaced and cleaned up already, or notification was sent.
            }
        };

        let response_subscribers = self.0.response_subscribers.clone();
        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction(
                move |tx| {
                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
                    if responses.is_empty() {
                        // cannot race as we have the transaction write lock
                        // (`notify_all` runs only after a write transaction completes).
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id.clone(), (sender, unique_tag));
                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                "subscribe_to_next_responses",
            )
            .await
        }
        // If the transaction failed after registering, drop the registration.
        .inspect_err(|_| {
            cleanup();
        })?;
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
            itertools::Either::Right(receiver) => {
                let res = async move {
                    tokio::select! {
                        resp = receiver => {
                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
                            Ok(vec![resp])
                        }
                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
                    }
                }
                .await;
                // Runs whether notified or timed out.
                cleanup();
                res
            }
        }
    }
2948
2949    // Supports multiple subscribers.
2950    async fn wait_for_finished_result(
2951        &self,
2952        execution_id: &ExecutionId,
2953        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
2954    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
2955        let unique_tag: u64 = rand::random();
2956        let execution_id = execution_id.clone();
2957        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
2958
2959        let cleanup = || {
2960            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2961            if let Some(subscribers) = guard.get_mut(&execution_id) {
2962                subscribers.remove(&unique_tag);
2963            }
2964        };
2965
2966        let resp_or_receiver = {
2967            let execution_id = execution_id.clone();
2968            self.transaction(move |tx| {
2969                let pending_state =
2970                    Self::get_combined_state(tx, &execution_id)?.pending_state;
2971                if let PendingState::Finished { finished } = pending_state {
2972                    let event =
2973                        Self::get_execution_event(tx, &execution_id, finished.version)?;
2974                    if let ExecutionEventInner::Finished { result, ..} = event.event {
2975                        Ok(itertools::Either::Left(result))
2976                    } else {
2977                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
2978                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
2979                            "cannot get finished event based on t_state version"
2980                        )))
2981                    }
2982                } else {
2983                    // Cannot race with the notifier as we have the transaction write lock:
2984                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
2985                    // or we end up here. If this tx fails, the cleanup will remove this entry.
2986                    let (sender, receiver) = oneshot::channel();
2987                    let mut guard = execution_finished_subscription.lock().unwrap();
2988                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
2989                    Ok(itertools::Either::Right(receiver))
2990                }
2991            }, "wait_for_finished_result")
2992            .await
2993        }
2994        .inspect_err(|_| {
2995            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
2996            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
2997            // If cleanup wins, it simply removes the oneshot sender.
2998            cleanup();
2999        })?;
3000
3001        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3002        match resp_or_receiver {
3003            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3004            itertools::Either::Right(receiver) => {
3005                let res = async move {
3006                    tokio::select! {
3007                        resp = receiver => {
3008                            Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3009                        }
3010                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3011                    }
3012                }
3013                .await;
3014                cleanup();
3015                res
3016            }
3017        }
3018    }
3019
3020    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3021    async fn append_response(
3022        &self,
3023        created_at: DateTime<Utc>,
3024        execution_id: ExecutionId,
3025        response_event: JoinSetResponseEvent,
3026    ) -> Result<(), DbErrorWrite> {
3027        debug!("append_response");
3028        let event = JoinSetResponseEventOuter {
3029            created_at,
3030            event: response_event,
3031        };
3032        let notifier = self
3033            .transaction(
3034                move |tx| Self::append_response(tx, &execution_id, event.clone()),
3035                "append_response",
3036            )
3037            .await?;
3038        self.notify_all(vec![notifier], created_at);
3039        Ok(())
3040    }
3041
3042    #[instrument(level = Level::DEBUG, skip_all)]
3043    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3044        debug!("append_backtrace");
3045        self.transaction(
3046            move |tx| Self::append_backtrace(tx, &append),
3047            "append_backtrace",
3048        )
3049        .await
3050    }
3051
3052    #[instrument(level = Level::DEBUG, skip_all)]
3053    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3054        debug!("append_backtrace_batch");
3055        self.transaction(
3056            move |tx| {
3057                for append in &batch {
3058                    Self::append_backtrace(tx, append)?;
3059                }
3060                Ok(())
3061            },
3062            "append_backtrace",
3063        )
3064        .await
3065    }
3066
3067    #[instrument(level = Level::DEBUG, skip_all)]
3068    async fn get_backtrace(
3069        &self,
3070        execution_id: &ExecutionId,
3071        filter: BacktraceFilter,
3072    ) -> Result<BacktraceInfo, DbErrorRead> {
3073        debug!("get_last_backtrace");
3074        let execution_id = execution_id.clone();
3075
3076        self.transaction(
3077            move |tx| {
3078                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3079                                WHERE execution_id = :execution_id";
3080                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3081                let select = match &filter {
3082                    BacktraceFilter::Specific(version) =>{
3083                        params.push((":version", Box::new(version.0)));
3084                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3085                    },
3086                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3087                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3088                };
3089                tx
3090                    .prepare(&select)
3091                    ?
3092                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3093                        params
3094                            .iter()
3095                            .map(|(key, value)| (*key, value.as_ref()))
3096                            .collect::<Vec<_>>()
3097                            .as_ref(),
3098                    |row| {
3099                        Ok(BacktraceInfo {
3100                            execution_id: execution_id.clone(),
3101                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3102                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3103                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3104                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3105                        })
3106                    },
3107                ).map_err(DbErrorRead::from)
3108            },
3109            "get_last_backtrace",
3110        ).await
3111    }
3112
3113    /// Get currently expired delays and locks.
3114    #[instrument(level = Level::TRACE, skip(self))]
3115    async fn get_expired_timers(
3116        &self,
3117        at: DateTime<Utc>,
3118    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3119        self.transaction(
3120            move |conn| {
3121                let mut expired_timers = conn.prepare(
3122                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3123                )?
3124                .query_map(
3125                        named_params! {
3126                            ":at": at,
3127                        },
3128                        |row| {
3129                            let execution_id = row.get("execution_id")?;
3130                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3131                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3132                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3133                            Ok(ExpiredTimer::Delay(delay))
3134                        },
3135                    )?
3136                    .collect::<Result<Vec<_>, _>>()?;
3137                // Extend with expired locks
3138                let expired = conn.prepare(&format!(r#"
3139                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis
3140                    FROM t_state
3141                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3142                    "#
3143                )
3144                )?
3145                .query_map(
3146                        named_params! {
3147                            ":at": at,
3148                        },
3149                        |row| {
3150                            let execution_id = row.get("execution_id")?;
3151                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3152                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3153                            let intermittent_event_count = row.get("intermittent_event_count")?;
3154                            let max_retries = row.get("max_retries")?;
3155                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3156                            let lock = ExpiredLock {
3157                                execution_id,
3158                                locked_at_version,
3159                                next_version,
3160                                intermittent_event_count,
3161                                max_retries,
3162                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3163                            };
3164                            Ok(ExpiredTimer::Lock(lock))
3165                        }
3166                    )?
3167                    .collect::<Result<Vec<_>, _>>()?;
3168                expired_timers.extend(expired);
3169                if !expired_timers.is_empty() {
3170                    debug!("get_expired_timers found {expired_timers:?}");
3171                }
3172                Ok(expired_timers)
3173            }, "get_expired_timers"
3174        )
3175        .await
3176    }
3177
3178    async fn get_execution_event(
3179        &self,
3180        execution_id: &ExecutionId,
3181        version: &Version,
3182    ) -> Result<ExecutionEvent, DbErrorRead> {
3183        let version = version.0;
3184        let execution_id = execution_id.clone();
3185        self.transaction(
3186            move |tx| Self::get_execution_event(tx, &execution_id, version),
3187            "get_execution_event",
3188        )
3189        .await
3190    }
3191
3192    async fn get_pending_state(
3193        &self,
3194        execution_id: &ExecutionId,
3195    ) -> Result<PendingState, DbErrorRead> {
3196        let execution_id = execution_id.clone();
3197        Ok(self
3198            .transaction(
3199                move |tx| Self::get_combined_state(tx, &execution_id),
3200                "get_pending_state",
3201            )
3202            .await?
3203            .pending_state)
3204    }
3205
3206    async fn list_executions(
3207        &self,
3208        ffqn: Option<FunctionFqn>,
3209        top_level_only: bool,
3210        pagination: ExecutionListPagination,
3211    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3212        self.transaction(
3213            move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3214            "list_executions",
3215        )
3216        .await
3217    }
3218
3219    async fn list_responses(
3220        &self,
3221        execution_id: &ExecutionId,
3222        pagination: Pagination<u32>,
3223    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3224        let execution_id = execution_id.clone();
3225        self.transaction(
3226            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3227            "list_executions",
3228        )
3229        .await
3230    }
3231}
3232
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a `SqlitePool` with the default `SqliteConfig`, for tests.
    ///
    /// If the `SQLITE_FILE` environment variable is set, its value is used as the database
    /// path and `None` is returned in the second slot. Otherwise a fresh `NamedTempFile`
    /// backs the pool, and its handle is returned so the caller can keep it alive for as
    /// long as the pool is used.
    ///
    /// # Panics
    /// Panics if the temporary file cannot be created or the pool fails to open.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}