obeli_sk_db_sqlite/
sqlite_dao.rs

1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6    SupportedFunctionReturnValue,
7    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8    storage::{
9        AppendBatchResponse, AppendEventsToExecution, AppendRequest, AppendResponse,
10        AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest, DUMMY_CREATED,
11        DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout,
12        DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool, DbPoolCloseable,
13        ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
14        ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse,
15        JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy,
16        LockedExecution, Pagination, PendingState, PendingStateFinished,
17        PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
18        VersionType,
19    },
20};
21use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
22use hashbrown::HashMap;
23use rusqlite::{
24    CachedStatement, Connection, ErrorCode, OpenFlags, OptionalExtension, Params, ToSql,
25    Transaction, TransactionBehavior, named_params, types::ToSqlOutput,
26};
27use std::{
28    cmp::max,
29    collections::VecDeque,
30    fmt::Debug,
31    ops::DerefMut,
32    path::Path,
33    str::FromStr,
34    sync::{
35        Arc, Mutex,
36        atomic::{AtomicBool, Ordering},
37    },
38    time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42    sync::{mpsc, oneshot},
43    time::Instant,
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46
47#[derive(Debug, thiserror::Error)]
48#[error("initialization error")]
49pub struct InitializationError;
50
51#[derive(Debug, Clone)]
52struct DelayReq {
53    join_set_id: JoinSetId,
54    delay_id: DelayId,
55    expires_at: DateTime<Utc>,
56}
/*
Pragmas are applied in array order (defaults possibly replaced by `pragma_override`),
so the order below matters.

page_size = 8 KB - must be applied BEFORE `journal_mode = wal`. SQLite only applies a new
page size while the database file is still empty (or on VACUUM outside of WAL mode), and
switching a fresh file to WAL writes its header with the default page size, which would
turn a later `page_size` into a silent no-op.
https://www.sqlite.org/pragma.html#pragma_page_size

mmap_size = 128MB - Set the global memory map so all processes can share some data
https://www.sqlite.org/pragma.html#pragma_mmap_size
https://www.sqlite.org/mmap.html

journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
https://www.sqlite.org/pragma.html#pragma_journal_size_limit

integrity_check - an empty value means the pragma is queried (and its rows logged)
instead of being set.

Inspired by https://github.com/rails/rails/pull/49349
*/

const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"], // 8 KB, must precede `journal_mode`
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
80
/// Schema metadata. Rows are only ever appended; the newest row (highest `id`) is the one
/// consulted for the schema version.
const CREATE_TABLE_T_METADATA: &str = r#"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
"#;
/// Schema version this build works with; initialization rejects any other stored value.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// Execution history: one row per `(execution_id, version)`. Append only.
///
/// `history_event_type` is a stored generated column that extracts
/// `$.HistoryEvent.event.type` from `json_value`, so it can be filtered without parsing JSON.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r#"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
"#;
// Serves `fetch_created` and `get_execution_event`.
// NOTE(review): same columns as `PRIMARY KEY (execution_id, version)`, for which SQLite
// already builds an automatic unique index; a candidate for removal in a future schema.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
"#;
// Serves `lock_inner`, which filters by execution id and `variant`
// (the created event vs. history events).
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
"#;
112
/// Stores join set responses (finished child executions and finished delays) for the
/// parent execution `execution_id`. Append only.
///
/// For `JoinSetResponse::DelayFinished`, columns `delay_id`, `delay_success` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, columns `child_execution_id`,
/// `finished_version` must not be null.
///
/// NOTE(review): SQLite treats NULLs as distinct in UNIQUE constraints. Every row leaves either
/// `delay_id` or `child_execution_id` NULL, so the table-level `UNIQUE (...)` below never
/// rejects a row. Uniqueness is actually enforced by the partial unique indexes on
/// `child_execution_id` and `delay_id`.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success INTEGER,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response of an execution (ordered by `id`).
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
// A child execution may be reported at most once.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
// A delay may be reported at most once.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
147
/// Current `PendingState` of every execution, one row per `execution_id`.
///
/// Which optional columns are populated depends on `state`:
/// - `PendingAt`: only the required columns, optionally also all `Locked` columns.
/// - `Locked`: `max_retries`, `retry_exp_backoff_millis`, `last_lock_version`,
///   `executor_id`, `run_id`.
/// - `BlockedByJoinSet`: `join_set_id`, `join_set_closing`.
/// - `Finished`: `result_kind`.
///
/// NOTE(review): `CombinedState::new` only accepts `PendingAt` rows whose
/// `last_lock_version` is NULL. Confirm the writers clear it when unlocking.
const CREATE_TABLE_T_STATE: &str = r#"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
"#;

// Values stored in `t_state.state`.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";
// Value of `t_execution_log.history_event_type` for join-next events.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

// TODO: partial indexes
const IDX_T_STATE_LOCK_PENDING: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
"#;
const IDX_T_STATE_EXPIRED_TIMERS: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
"#;
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
"#;
// Lets `list_executions` filter by ffqn.
const IDX_T_STATE_FFQN: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
"#;
// Lets `list_executions` order/filter by creation date.
const IDX_T_STATE_CREATED_AT: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
"#;
205
/// Pending delays, i.e. [`ExpiredTimer::AsyncDelay`] candidates.
/// A row is deleted once its delay has been processed.
const CREATE_TABLE_T_DELAY: &str = r#"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
"#;

/// WASM backtraces covering the version range `[version_min_including, version_max_excluding)`
/// of an execution. Append only.
const CREATE_TABLE_T_BACKTRACE: &str = r#"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
"#;
// Lookup of backtraces by execution id and version range.
// NOTE(review): identical column list to the PRIMARY KEY, whose automatic index already
// covers this lookup; a candidate for removal in a future schema.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r#"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
"#;
232
233#[derive(Debug, thiserror::Error, Clone)]
234enum RusqliteError {
235    #[error("not found")]
236    NotFound,
237    #[error("generic: {0}")]
238    Generic(StrVariant),
239}
240
241mod conversions {
242
243    use super::RusqliteError;
244    use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
245    use rusqlite::{
246        ToSql,
247        types::{FromSql, FromSqlError},
248    };
249    use std::{fmt::Debug, str::FromStr};
250    use tracing::error;
251
252    impl From<rusqlite::Error> for RusqliteError {
253        fn from(err: rusqlite::Error) -> Self {
254            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
255                RusqliteError::NotFound
256            } else {
257                error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
258                RusqliteError::Generic(err.to_string().into())
259            }
260        }
261    }
262
263    impl From<RusqliteError> for DbErrorGeneric {
264        fn from(err: RusqliteError) -> DbErrorGeneric {
265            match err {
266                RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
267                RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
268            }
269        }
270    }
271    impl From<RusqliteError> for DbErrorRead {
272        fn from(err: RusqliteError) -> Self {
273            if matches!(err, RusqliteError::NotFound) {
274                Self::NotFound
275            } else {
276                Self::from(DbErrorGeneric::from(err))
277            }
278        }
279    }
280    impl From<RusqliteError> for DbErrorReadWithTimeout {
281        fn from(err: RusqliteError) -> Self {
282            Self::from(DbErrorRead::from(err))
283        }
284    }
285    impl From<RusqliteError> for DbErrorWrite {
286        fn from(err: RusqliteError) -> Self {
287            if matches!(err, RusqliteError::NotFound) {
288                Self::NotFound
289            } else {
290                Self::from(DbErrorGeneric::from(err))
291            }
292        }
293    }
294
295    pub(crate) struct JsonWrapper<T>(pub(crate) T);
296    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
297        fn column_result(
298            value: rusqlite::types::ValueRef<'_>,
299        ) -> rusqlite::types::FromSqlResult<Self> {
300            let value = match value {
301                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
302                    Ok(value)
303                }
304                other => {
305                    error!(
306                        backtrace = %std::backtrace::Backtrace::capture(),
307                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
308                    );
309                    Err(FromSqlError::InvalidType)
310                }
311            }?;
312            let value = serde_json::from_slice::<T>(value).map_err(|err| {
313                error!(
314                    backtrace = %std::backtrace::Backtrace::capture(),
315                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
316                    r#type = std::any::type_name::<T>()
317                );
318                FromSqlError::InvalidType
319            })?;
320            Ok(Self(value))
321        }
322    }
323    impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
324        fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
325            let string = serde_json::to_string(&self.0).map_err(|err| {
326                error!(
327                    "Cannot serialize {value:?} of type `{type}` - {err:?}",
328                    value = self.0,
329                    r#type = std::any::type_name::<T>()
330                );
331                rusqlite::Error::ToSqlConversionFailure(Box::new(err))
332            })?;
333            Ok(rusqlite::types::ToSqlOutput::Owned(
334                rusqlite::types::Value::Text(string),
335            ))
336        }
337    }
338
339    pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
340    impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
341        fn column_result(
342            value: rusqlite::types::ValueRef<'_>,
343        ) -> rusqlite::types::FromSqlResult<Self> {
344            let value = String::column_result(value)?;
345            let value = T::from_str(&value).map_err(|err| {
346                error!(
347                    backtrace = %std::backtrace::Backtrace::capture(),
348                    "Cannot convert string `{value}` to type:`{type}` - {err:?}",
349                    r#type = std::any::type_name::<T>()
350                );
351                FromSqlError::InvalidType
352            })?;
353            Ok(Self(value))
354        }
355    }
356
357    // Used as a wrapper for `FromSqlError::Other`
358    #[derive(Debug, thiserror::Error)]
359    #[error("{0}")]
360    pub(crate) struct OtherError(&'static str);
361    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
362        FromSqlError::other(OtherError(input)).into()
363    }
364
365    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
366        DbErrorGeneric::Uncategorized(input.into())
367    }
368}
369
370#[derive(Debug)]
371struct CombinedStateDTO {
372    state: String,
373    ffqn: String,
374    pending_expires_finished: DateTime<Utc>,
375    // Locked:
376    last_lock_version: Option<Version>,
377    executor_id: Option<ExecutorId>,
378    run_id: Option<RunId>,
379    // Blocked by join set:
380    join_set_id: Option<JoinSetId>,
381    join_set_closing: Option<bool>,
382    // Finished:
383    result_kind: Option<PendingStateFinishedResultKind>,
384}
385#[derive(Debug)]
386struct CombinedState {
387    ffqn: FunctionFqn,
388    pending_state: PendingState,
389    corresponding_version: Version,
390}
391impl CombinedState {
392    fn get_next_version_assert_not_finished(&self) -> Version {
393        assert!(!self.pending_state.is_finished());
394        self.corresponding_version.increment()
395    }
396
397    #[cfg(feature = "test")]
398    fn get_next_version_or_finished(&self) -> Version {
399        if self.pending_state.is_finished() {
400            self.corresponding_version.clone()
401        } else {
402            self.corresponding_version.increment()
403        }
404    }
405}
406
407#[derive(Debug)]
408struct NotifierPendingAt {
409    scheduled_at: DateTime<Utc>,
410    ffqn: FunctionFqn,
411}
412
413#[derive(Debug)]
414struct NotifierExecutionFinished {
415    execution_id: ExecutionId,
416    retval: SupportedFunctionReturnValue,
417}
418
419#[derive(Debug, Default)]
420struct AppendNotifier {
421    pending_at: Option<NotifierPendingAt>,
422    execution_finished: Option<NotifierExecutionFinished>,
423    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
424}
425
426impl CombinedState {
427    fn new(
428        dto: &CombinedStateDTO,
429        corresponding_version: Version,
430    ) -> Result<Self, rusqlite::Error> {
431        let pending_state = match dto {
432            // Pending - just created
433            CombinedStateDTO {
434                state,
435                ffqn: _,
436                pending_expires_finished: scheduled_at,
437                last_lock_version: None,
438                executor_id: None,
439                run_id: None,
440                join_set_id: None,
441                join_set_closing: None,
442                result_kind: None,
443            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
444                scheduled_at: *scheduled_at,
445                last_lock: None,
446            }),
447            // Pending, previously locked
448            CombinedStateDTO {
449                state,
450                ffqn: _,
451                pending_expires_finished: scheduled_at,
452                last_lock_version: None,
453                executor_id: Some(executor_id),
454                run_id: Some(run_id),
455                join_set_id: None,
456                join_set_closing: None,
457                result_kind: None,
458            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
459                scheduled_at: *scheduled_at,
460                last_lock: Some(LockedBy {
461                    executor_id: *executor_id,
462                    run_id: *run_id,
463                }),
464            }),
465            CombinedStateDTO {
466                state,
467                ffqn: _,
468                pending_expires_finished: lock_expires_at,
469                last_lock_version: Some(_),
470                executor_id: Some(executor_id),
471                run_id: Some(run_id),
472                join_set_id: None,
473                join_set_closing: None,
474                result_kind: None,
475            } if state == STATE_LOCKED => Ok(PendingState::Locked(PendingStateLocked {
476                locked_by: LockedBy {
477                    executor_id: *executor_id,
478                    run_id: *run_id,
479                },
480                lock_expires_at: *lock_expires_at,
481            })),
482            CombinedStateDTO {
483                state,
484                ffqn: _,
485                pending_expires_finished: lock_expires_at,
486                last_lock_version: None,
487                executor_id: _,
488                run_id: _,
489                join_set_id: Some(join_set_id),
490                join_set_closing: Some(join_set_closing),
491                result_kind: None,
492            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
493                join_set_id: join_set_id.clone(),
494                closing: *join_set_closing,
495                lock_expires_at: *lock_expires_at,
496            }),
497            CombinedStateDTO {
498                state,
499                ffqn: _,
500                pending_expires_finished: finished_at,
501                last_lock_version: None,
502                executor_id: None,
503                run_id: None,
504                join_set_id: None,
505                join_set_closing: None,
506                result_kind: Some(result_kind),
507            } if state == STATE_FINISHED => Ok(PendingState::Finished {
508                finished: PendingStateFinished {
509                    finished_at: *finished_at,
510                    version: corresponding_version.0,
511                    result_kind: *result_kind,
512                },
513            }),
514            _ => {
515                error!("Cannot deserialize pending state from  {dto:?}");
516                Err(consistency_rusqlite("invalid `t_state`"))
517            }
518        }?;
519        Ok(Self {
520            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
521                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
522                consistency_rusqlite("invalid ffqn value in `t_state`")
523            })?,
524            pending_state,
525            corresponding_version,
526        })
527    }
528}
529
530#[derive(derive_more::Debug)]
531struct LogicalTx {
532    #[debug(skip)]
533    func: Box<dyn FnMut(&mut Transaction) + Send>,
534    sent_at: Instant,
535    func_name: &'static str,
536    #[debug(skip)]
537    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
538}
539
540#[derive(derive_more::Debug)]
541enum ThreadCommand {
542    LogicalTx(LogicalTx),
543    Shutdown,
544}
545
546#[derive(Clone)]
547pub struct SqlitePool(SqlitePoolInner);
548
549type ResponseSubscribers =
550    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
551type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
552type ExecutionFinishedSubscribers =
553    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
554#[derive(Clone)]
555struct SqlitePoolInner {
556    shutdown_requested: Arc<AtomicBool>,
557    shutdown_finished: Arc<AtomicBool>,
558    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
559    response_subscribers: ResponseSubscribers,
560    pending_ffqn_subscribers: PendingFfqnSubscribers,
561    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
562    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
563}
564
565#[async_trait]
566impl DbPoolCloseable for SqlitePool {
567    async fn close(self) {
568        debug!("Sqlite is closing");
569        self.0.shutdown_requested.store(true, Ordering::Release);
570        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
571        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
572        while !self.0.shutdown_finished.load(Ordering::Acquire) {
573            tokio::time::sleep(Duration::from_millis(1)).await;
574        }
575        debug!("Sqlite was closed");
576    }
577}
578
579#[async_trait]
580impl DbPool for SqlitePool {
581    fn connection(&self) -> Box<dyn DbConnection> {
582        Box::new(self.clone())
583    }
584}
585impl Drop for SqlitePool {
586    fn drop(&mut self) {
587        let arc = self.0.join_handle.take().expect("join_handle was set");
588        if let Ok(join_handle) = Arc::try_unwrap(arc) {
589            // Last holder
590            if !join_handle.is_finished() {
591                if !self.0.shutdown_finished.load(Ordering::Acquire) {
592                    // Best effort to shut down the sqlite thread.
593                    let backtrace = std::backtrace::Backtrace::capture();
594                    warn!("SqlitePool was not closed properly - {backtrace}");
595                    self.0.shutdown_requested.store(true, Ordering::Release);
596                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
597                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
598                    // Not joining the thread, drop might be called from async context.
599                    // We are shutting down the server anyway.
600                } else {
601                    // The thread set `shutdown_finished` as its last operation.
602                }
603            }
604        }
605    }
606}
607
/// Tuning options for the SQLite pool.
#[derive(Clone, Debug)]
pub struct SqliteConfig {
    /// NOTE(review): presumably the capacity of the command queue feeding the SQLite thread.
    pub queue_capacity: usize,
    /// Extra or replacement PRAGMA values keyed by pragma name (`None`: defaults only).
    pub pragma_override: Option<HashMap<String, String>>,
    /// NOTE(review): presumably handed to the metrics histograms; `None` disables the threshold.
    pub metrics_threshold: Option<Duration>,
}

impl Default for SqliteConfig {
    /// Queue capacity of 100, no pragma overrides, no metrics threshold.
    fn default() -> Self {
        SqliteConfig {
            pragma_override: None,
            metrics_threshold: None,
            queue_capacity: 100,
        }
    }
}
623
624impl SqlitePool {
625    fn init_thread(
626        path: &Path,
627        mut pragma_override: HashMap<String, String>,
628    ) -> Result<Connection, InitializationError> {
629        fn conn_execute<P: Params>(
630            conn: &Connection,
631            sql: &str,
632            params: P,
633        ) -> Result<(), InitializationError> {
634            conn.execute(sql, params).map(|_| ()).map_err(|err| {
635                error!("Cannot run `{sql}` - {err:?}");
636                InitializationError
637            })
638        }
639        fn pragma_update(
640            conn: &Connection,
641            name: &str,
642            value: &str,
643        ) -> Result<(), InitializationError> {
644            if value.is_empty() {
645                debug!("Querying PRAGMA {name}");
646                conn.pragma_query(None, name, |row| {
647                    debug!("{row:?}");
648                    Ok(())
649                })
650                .map_err(|err| {
651                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
652                    InitializationError
653                })
654            } else {
655                debug!("Setting PRAGMA {name}={value}");
656                conn.pragma_update(None, name, value).map_err(|err| {
657                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
658                    InitializationError
659                })
660            }
661        }
662
663        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
664            error!("cannot open the connection - {err:?}");
665            InitializationError
666        })?;
667
668        for [pragma_name, default_value] in PRAGMA {
669            let pragma_value = pragma_override
670                .remove(pragma_name)
671                .unwrap_or_else(|| default_value.to_string());
672            pragma_update(&conn, pragma_name, &pragma_value)?;
673        }
674        // drain the rest overrides
675        for (pragma_name, pragma_value) in pragma_override.drain() {
676            pragma_update(&conn, &pragma_name, &pragma_value)?;
677        }
678
679        // t_metadata
680        conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
681        // Insert row if not exists.
682        conn_execute(
683            &conn,
684            &format!(
685                "INSERT INTO t_metadata (schema_version, created_at) VALUES
686                    ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
687            ),
688            [Utc::now()],
689        )?;
690        // Fail on unexpected `schema_version`.
691        let actual_version = conn
692            .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
693            .map_err(|err| {
694                error!("cannot select schema version - {err:?}");
695                InitializationError
696            })?
697            .query_row([], |row| row.get::<_, u32>("schema_version"));
698
699        let actual_version = actual_version.map_err(|err| {
700            error!("Cannot read the schema version - {err:?}");
701            InitializationError
702        })?;
703        if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
704            error!(
705                "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
706            );
707            return Err(InitializationError);
708        }
709
710        // t_execution_log
711        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
712        conn_execute(
713            &conn,
714            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
715            [],
716        )?;
717        conn_execute(
718            &conn,
719            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
720            [],
721        )?;
722        // t_join_set_response
723        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
724        conn_execute(
725            &conn,
726            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
727            [],
728        )?;
729        conn_execute(
730            &conn,
731            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
732            [],
733        )?;
734        conn_execute(
735            &conn,
736            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
737            [],
738        )?;
739        // t_state
740        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
741        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
742        conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
743        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
744        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
745        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
746        // t_delay
747        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
748        // t_backtrace
749        conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
750        conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
751        Ok(conn)
752    }
753
754    fn connection_rpc(
755        mut conn: Connection,
756        shutdown_requested: &AtomicBool,
757        shutdown_finished: &AtomicBool,
758        mut command_rx: mpsc::Receiver<ThreadCommand>,
759        metrics_threshold: Option<Duration>,
760    ) {
761        let mut histograms = Histograms::new(metrics_threshold);
762        while Self::tick(
763            &mut conn,
764            shutdown_requested,
765            &mut command_rx,
766            &mut histograms,
767        )
768        .is_ok()
769        {
770            // Loop until shutdown is set to true.
771        }
772        debug!("Closing command thread");
773        shutdown_finished.store(true, Ordering::Release);
774    }
775
    /// Processes one batch of logical transactions on the sqlite command thread.
    ///
    /// Blocks until the first [`ThreadCommand`] arrives, then drains every command that is
    /// already queued (without blocking). Each logical tx is applied to one shared
    /// `IMMEDIATE` physical transaction, which is committed once. The commit result is sent
    /// to every logical tx's `commit_ack_sender`.
    ///
    /// If the bulk commit fails and the batch holds more than one logical tx, each one is
    /// replayed and committed in its own physical transaction via [`Self::ltx_commit_single`].
    ///
    /// Returns `Err(())` when the thread should stop. That happens when:
    /// - a `Shutdown` command is received,
    /// - the command channel is closed,
    /// - shutdown was requested before the commit, or
    /// - a physical transaction cannot be opened.
    ///
    /// In those cases the uncommitted physical transaction is dropped (rusqlite rolls back on
    /// drop by default). Logical txs collected so far are dropped without an ack being sent.
    ///
    /// NOTE(review): the results returned by the individual closures are not inspected here.
    /// The physical transaction is committed even if a closure reported an error.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ()> {
        let mut ltx_list = Vec::new();
        // Block until the first logical tx arrives. `blocking_recv` is only valid outside
        // async context; presumably this runs on the OS thread spawned in `new`.
        let mut ltx = match command_rx.blocking_recv() {
            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
            Some(ThreadCommand::Shutdown) => {
                debug!("shutdown message received");
                return Err(());
            }
            None => {
                debug!("command_rx was closed");
                return Err(());
            }
        };
        let all_fns_start = std::time::Instant::now();
        let mut physical_tx = conn
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|begin_err| {
                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
            })?;
        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
        ltx_list.push(ltx);

        // Batch every command that is already queued. `try_recv` errors on both "empty" and
        // "disconnected", so the loop ends in either case without blocking.
        while let Ok(more) = command_rx.try_recv() {
            let mut ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("shutdown message received");
                    return Err(());
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };

            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
            ltx_list.push(ltx);
        }
        histograms.record_all_fns(all_fns_start.elapsed());

        {
            // Commit, unless shutdown was requested while the batch was being applied.
            if shutdown_requested.load(Ordering::Relaxed) {
                debug!("Recveived shutdown during processing of the batch");
                return Err(());
            }
            let commit_result = {
                let now = std::time::Instant::now();
                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
                histograms.record_commit(now.elapsed());
                commit_result
            };
            if commit_result.is_ok() || ltx_list.len() == 1 {
                // Either the commit succeeded, or the batch holds a single logical tx that
                // owns the error. Either way, forward the commit result as-is.
                for ltx in ltx_list {
                    // A failed send only means the receiver is gone, e.g. the producer stopped
                    // awaiting the ack (timed out or was cancelled).
                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
                }
            } else {
                warn!(
                    "Bulk transaction failed, committing {} transactions serially",
                    ltx_list.len()
                );
                // Bulk transaction failed. Replay each ltx in its own physical transaction so
                // one failing ltx does not fail the rest of the batch.
                // TODO: Use SAVEPOINT + RELEASE SAVEPOINT, ROLLBACK savepoint instead.
                for ltx in ltx_list {
                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
                }
            }
        }
        histograms.print_if_elapsed();
        Ok(())
    }
851
852    fn ltx_commit_single(
853        mut ltx: LogicalTx,
854        conn: &mut Connection,
855        shutdown_requested: &AtomicBool,
856        histograms: &mut Histograms,
857    ) -> Result<(), ()> {
858        let mut physical_tx = conn
859            .transaction_with_behavior(TransactionBehavior::Immediate)
860            .map_err(|begin_err| {
861                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
862            })?;
863        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
864        if shutdown_requested.load(Ordering::Relaxed) {
865            debug!("Recveived shutdown during processing of the batch");
866            return Err(());
867        }
868        let commit_result = {
869            let now = std::time::Instant::now();
870            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
871            histograms.record_commit(now.elapsed());
872            commit_result
873        };
874        // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
875        let _ = ltx.commit_ack_sender.send(commit_result);
876        Ok(())
877    }
878
879    fn ltx_apply_to_tx(
880        ltx: &mut LogicalTx,
881        physical_tx: &mut Transaction,
882        histograms: &mut Histograms,
883    ) {
884        let sent_latency = ltx.sent_at.elapsed();
885        let started_at = Instant::now();
886        (ltx.func)(physical_tx);
887        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
888    }
889
890    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
891    pub async fn new<P: AsRef<Path>>(
892        path: P,
893        config: SqliteConfig,
894    ) -> Result<Self, InitializationError> {
895        let path = path.as_ref().to_owned();
896
897        let shutdown_requested = Arc::new(AtomicBool::new(false));
898        let shutdown_finished = Arc::new(AtomicBool::new(false));
899
900        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
901        info!("Sqlite database location: {path:?}");
902        let join_handle = {
903            // Initialize the `Connection`.
904            let init_task = {
905                tokio::task::spawn_blocking(move || {
906                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
907                })
908                .await
909            };
910            let conn = match init_task {
911                Ok(res) => res?,
912                Err(join_err) => {
913                    error!("Initialization panic - {join_err:?}");
914                    return Err(InitializationError);
915                }
916            };
917            let shutdown_requested = shutdown_requested.clone();
918            let shutdown_finished = shutdown_finished.clone();
919            // Start the RPC thread.
920            std::thread::spawn(move || {
921                Self::connection_rpc(
922                    conn,
923                    &shutdown_requested,
924                    &shutdown_finished,
925                    command_rx,
926                    config.metrics_threshold,
927                );
928            })
929        };
930        Ok(SqlitePool(SqlitePoolInner {
931            shutdown_requested,
932            shutdown_finished,
933            command_tx,
934            response_subscribers: Arc::default(),
935            pending_ffqn_subscribers: Arc::default(),
936            join_handle: Some(Arc::new(join_handle)),
937            execution_finished_subscribers: Arc::default(),
938        }))
939    }
940
941    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
942    async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
943    where
944        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
945        T: Send + 'static,
946        E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
947    {
948        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
949        let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
950        let thread_command_func = {
951            let fn_res = fn_res.clone();
952            ThreadCommand::LogicalTx(LogicalTx {
953                func: Box::new(move |tx| {
954                    let func_res = func(tx);
955                    *fn_res.lock().unwrap() = Some(func_res);
956                }),
957                sent_at: Instant::now(),
958                func_name,
959                commit_ack_sender,
960            })
961        };
962        self.0
963            .command_tx
964            .send(thread_command_func)
965            .await
966            .map_err(|_send_err| DbErrorGeneric::Close)?;
967
968        // Wait for commit ack, then get the retval from the mutex.
969        match commit_ack_receiver.await {
970            Ok(Ok(())) => {
971                let mut guard = fn_res.lock().unwrap();
972                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
973            }
974            Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
975            Err(_) => Err(E::from(DbErrorGeneric::Close)),
976        }
977    }
978
979    fn fetch_created_event(
980        conn: &Connection,
981        execution_id: &ExecutionId,
982    ) -> Result<CreateRequest, DbErrorRead> {
983        let mut stmt = conn.prepare(
984            "SELECT created_at, json_value FROM t_execution_log WHERE \
985            execution_id = :execution_id AND version = 0",
986        )?;
987        let (created_at, event) = stmt.query_row(
988            named_params! {
989                ":execution_id": execution_id.to_string(),
990            },
991            |row| {
992                let created_at = row.get("created_at")?;
993                let event = row
994                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
995                    .map_err(|serde| {
996                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
997                        consistency_rusqlite("cannot deserialize `Created` event")
998                    })?;
999                Ok((created_at, event.0))
1000            },
1001        )?;
1002        if let ExecutionEventInner::Created {
1003            ffqn,
1004            params,
1005            parent,
1006            scheduled_at,
1007            component_id,
1008            metadata,
1009            scheduled_by,
1010        } = event
1011        {
1012            Ok(CreateRequest {
1013                created_at,
1014                execution_id: execution_id.clone(),
1015                ffqn,
1016                params,
1017                parent,
1018                scheduled_at,
1019                component_id,
1020                metadata,
1021                scheduled_by,
1022            })
1023        } else {
1024            error!("Row with version=0 must be a `Created` event - {event:?}");
1025            Err(consistency_db_err("expected `Created` event").into())
1026        }
1027    }
1028
1029    fn check_expected_next_and_appending_version(
1030        expected_version: &Version,
1031        appending_version: &Version,
1032    ) -> Result<(), DbErrorWrite> {
1033        if *expected_version != *appending_version {
1034            debug!(
1035                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1036            );
1037            return Err(DbErrorWrite::NonRetriable(
1038                DbErrorWriteNonRetriable::VersionConflict {
1039                    expected: expected_version.clone(),
1040                    requested: appending_version.clone(),
1041                },
1042            ));
1043        }
1044        Ok(())
1045    }
1046
1047    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1048    fn create_inner(
1049        tx: &Transaction,
1050        req: CreateRequest,
1051    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1052        debug!("create_inner");
1053
1054        let version = Version::default();
1055        let execution_id = req.execution_id.clone();
1056        let execution_id_str = execution_id.to_string();
1057        let ffqn = req.ffqn.clone();
1058        let created_at = req.created_at;
1059        let scheduled_at = req.scheduled_at;
1060        let event = ExecutionEventInner::from(req);
1061        let event_ser = serde_json::to_string(&event).map_err(|err| {
1062            error!("Cannot serialize {event:?} - {err:?}");
1063            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1064        })?;
1065        tx.prepare(
1066                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1067                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1068        ?
1069        .execute(named_params! {
1070            ":execution_id": &execution_id_str,
1071            ":created_at": created_at,
1072            ":version": version.0,
1073            ":json_value": event_ser,
1074            ":variant": event.variant(),
1075            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1076        })
1077        ?;
1078        let pending_at = {
1079            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1080            tx.prepare(
1081                r"
1082                INSERT INTO t_state (
1083                    execution_id,
1084                    is_top_level,
1085                    corresponding_version,
1086                    pending_expires_finished,
1087                    ffqn,
1088                    state,
1089                    created_at,
1090                    updated_at,
1091                    scheduled_at,
1092                    intermittent_event_count
1093                    )
1094                VALUES (
1095                    :execution_id,
1096                    :is_top_level,
1097                    :corresponding_version,
1098                    :pending_expires_finished,
1099                    :ffqn,
1100                    :state,
1101                    :created_at,
1102                    CURRENT_TIMESTAMP,
1103                    :scheduled_at,
1104                    0
1105                    )
1106                ",
1107            )?
1108            .execute(named_params! {
1109                ":execution_id": execution_id.to_string(),
1110                ":is_top_level": execution_id.is_top_level(),
1111                ":corresponding_version": version.0,
1112                ":pending_expires_finished": scheduled_at,
1113                ":ffqn": ffqn.to_string(),
1114                ":state": STATE_PENDING_AT,
1115                ":created_at": created_at,
1116                ":scheduled_at": scheduled_at,
1117            })?;
1118            AppendNotifier {
1119                pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1120                execution_finished: None,
1121                response: None,
1122            }
1123        };
1124        let next_version = Version::new(version.0 + 1);
1125        Ok((next_version, pending_at))
1126    }
1127
1128    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1129    fn update_state_pending_after_response_appended(
1130        tx: &Transaction,
1131        execution_id: &ExecutionId,
1132        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1133        corresponding_version: &Version, // t_execution_log is not be changed
1134    ) -> Result<AppendNotifier, DbErrorWrite> {
1135        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1136        let execution_id_str = execution_id.to_string();
1137        let mut stmt = tx
1138            .prepare_cached(
1139                r"
1140                UPDATE t_state
1141                SET
1142                    corresponding_version = :corresponding_version,
1143                    pending_expires_finished = :pending_expires_finished,
1144                    state = :state,
1145                    updated_at = CURRENT_TIMESTAMP,
1146
1147                    last_lock_version = NULL,
1148
1149                    join_set_id = NULL,
1150                    join_set_closing = NULL,
1151
1152                    result_kind = NULL
1153                WHERE execution_id = :execution_id
1154            ",
1155            )
1156            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1157        let updated = stmt
1158            .execute(named_params! {
1159                ":execution_id": execution_id_str,
1160                ":corresponding_version": corresponding_version.0,
1161                ":pending_expires_finished": scheduled_at,
1162                ":state": STATE_PENDING_AT,
1163            })
1164            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1165        if updated != 1 {
1166            return Err(DbErrorWrite::NotFound);
1167        }
1168        Ok(AppendNotifier {
1169            pending_at: Some(NotifierPendingAt {
1170                scheduled_at,
1171                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1172            }),
1173            execution_finished: None,
1174            response: None,
1175        })
1176    }
1177
1178    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1179    fn update_state_pending_after_event_appended(
1180        tx: &Transaction,
1181        execution_id: &ExecutionId,
1182        appending_version: &Version,
1183        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1184        intermittent_failure: bool,
1185    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1186        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1187        // If response idx is unknown, use 0. This distinguishs the two paths.
1188        // JoinNext will always send an actual idx.
1189        let mut stmt = tx
1190            .prepare_cached(
1191                r"
1192                UPDATE t_state
1193                SET
1194                    corresponding_version = :appending_version,
1195                    pending_expires_finished = :pending_expires_finished,
1196                    state = :state,
1197                    updated_at = CURRENT_TIMESTAMP,
1198                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1199
1200                    last_lock_version = NULL,
1201
1202                    join_set_id = NULL,
1203                    join_set_closing = NULL,
1204
1205                    result_kind = NULL
1206                WHERE execution_id = :execution_id;
1207            ",
1208            )
1209            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1210        let updated = stmt
1211            .execute(named_params! {
1212                ":execution_id": execution_id.to_string(),
1213                ":appending_version": appending_version.0,
1214                ":pending_expires_finished": scheduled_at,
1215                ":state": STATE_PENDING_AT,
1216                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1217            })
1218            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1219        if updated != 1 {
1220            return Err(DbErrorWrite::NotFound);
1221        }
1222        Ok((
1223            appending_version.increment(),
1224            AppendNotifier {
1225                pending_at: Some(NotifierPendingAt {
1226                    scheduled_at,
1227                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1228                }),
1229                execution_finished: None,
1230                response: None,
1231            },
1232        ))
1233    }
1234
1235    fn update_state_locked_get_intermittent_event_count(
1236        tx: &Transaction,
1237        execution_id: &ExecutionId,
1238        executor_id: ExecutorId,
1239        run_id: RunId,
1240        lock_expires_at: DateTime<Utc>,
1241        appending_version: &Version,
1242        retry_config: ComponentRetryConfig,
1243    ) -> Result<u32, DbErrorWrite> {
1244        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1245        let backoff_millis =
1246            u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1247        let execution_id_str = execution_id.to_string();
1248        let mut stmt = tx
1249            .prepare_cached(
1250                r"
1251                UPDATE t_state
1252                SET
1253                    corresponding_version = :appending_version,
1254                    pending_expires_finished = :pending_expires_finished,
1255                    state = :state,
1256                    updated_at = CURRENT_TIMESTAMP,
1257
1258                    max_retries = :max_retries,
1259                    retry_exp_backoff_millis = :retry_exp_backoff_millis,
1260                    last_lock_version = :appending_version,
1261                    executor_id = :executor_id,
1262                    run_id = :run_id,
1263
1264                    join_set_id = NULL,
1265                    join_set_closing = NULL,
1266
1267                    result_kind = NULL
1268                WHERE execution_id = :execution_id
1269            ",
1270            )
1271            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1272        let updated = stmt
1273            .execute(named_params! {
1274                ":execution_id": execution_id_str,
1275                ":appending_version": appending_version.0,
1276                ":pending_expires_finished": lock_expires_at,
1277                ":state": STATE_LOCKED,
1278                ":max_retries": retry_config.max_retries,
1279                ":retry_exp_backoff_millis": backoff_millis,
1280                ":executor_id": executor_id.to_string(),
1281                ":run_id": run_id.to_string(),
1282            })
1283            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1284        if updated != 1 {
1285            return Err(DbErrorWrite::NotFound);
1286        }
1287
1288        // fetch intermittent event count from the just-inserted row.
1289        let intermittent_event_count = tx
1290            .prepare(
1291                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1292            )?
1293            .query_row(
1294                named_params! {
1295                    ":execution_id": execution_id_str,
1296                },
1297                |row| {
1298                    let intermittent_event_count = row.get("intermittent_event_count")?;
1299                    Ok(intermittent_event_count)
1300                },
1301            )
1302            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1303
1304        Ok(intermittent_event_count)
1305    }
1306
1307    fn update_state_blocked(
1308        tx: &Transaction,
1309        execution_id: &ExecutionId,
1310        appending_version: &Version,
1311        // BlockedByJoinSet fields
1312        join_set_id: &JoinSetId,
1313        lock_expires_at: DateTime<Utc>,
1314        join_set_closing: bool,
1315    ) -> Result<
1316        AppendResponse, // next version
1317        DbErrorWrite,
1318    > {
1319        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1320        let execution_id_str = execution_id.to_string();
1321        let mut stmt = tx.prepare_cached(
1322            r"
1323                UPDATE t_state
1324                SET
1325                    corresponding_version = :appending_version,
1326                    pending_expires_finished = :pending_expires_finished,
1327                    state = :state,
1328                    updated_at = CURRENT_TIMESTAMP,
1329
1330                    last_lock_version = NULL,
1331
1332                    join_set_id = :join_set_id,
1333                    join_set_closing = :join_set_closing,
1334
1335                    result_kind = NULL
1336                WHERE execution_id = :execution_id
1337            ",
1338        )?;
1339        let updated = stmt.execute(named_params! {
1340            ":execution_id": execution_id_str,
1341            ":appending_version": appending_version.0,
1342            ":pending_expires_finished": lock_expires_at,
1343            ":state": STATE_BLOCKED_BY_JOIN_SET,
1344            ":join_set_id": join_set_id,
1345            ":join_set_closing": join_set_closing,
1346        })?;
1347        if updated != 1 {
1348            return Err(DbErrorWrite::NotFound);
1349        }
1350        Ok(appending_version.increment())
1351    }
1352
1353    fn update_state_finished(
1354        tx: &Transaction,
1355        execution_id: &ExecutionId,
1356        appending_version: &Version,
1357        // Finished fields
1358        finished_at: DateTime<Utc>,
1359        result_kind: PendingStateFinishedResultKind,
1360    ) -> Result<(), DbErrorWrite> {
1361        debug!("Setting t_state to Finished");
1362        let execution_id_str = execution_id.to_string();
1363        let mut stmt = tx.prepare_cached(
1364            r"
1365                UPDATE t_state
1366                SET
1367                    corresponding_version = :appending_version,
1368                    pending_expires_finished = :pending_expires_finished,
1369                    state = :state,
1370                    updated_at = CURRENT_TIMESTAMP,
1371
1372                    last_lock_version = NULL,
1373                    executor_id = NULL,
1374                    run_id = NULL,
1375
1376                    join_set_id = NULL,
1377                    join_set_closing = NULL,
1378
1379                    result_kind = :result_kind
1380                WHERE execution_id = :execution_id
1381            ",
1382        )?;
1383
1384        let updated = stmt.execute(named_params! {
1385            ":execution_id": execution_id_str,
1386            ":appending_version": appending_version.0,
1387            ":pending_expires_finished": finished_at,
1388            ":state": STATE_FINISHED,
1389            ":result_kind": JsonWrapper(result_kind),
1390        })?;
1391        if updated != 1 {
1392            return Err(DbErrorWrite::NotFound);
1393        }
1394        Ok(())
1395    }
1396
1397    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1398    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1399    fn bump_state_next_version(
1400        tx: &Transaction,
1401        execution_id: &ExecutionId,
1402        appending_version: &Version,
1403        delay_req: Option<DelayReq>,
1404    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1405        debug!("update_index_version");
1406        let execution_id_str = execution_id.to_string();
1407        let mut stmt = tx.prepare_cached(
1408            r"
1409                UPDATE t_state
1410                SET
1411                    corresponding_version = :appending_version,
1412                    updated_at = CURRENT_TIMESTAMP
1413                WHERE execution_id = :execution_id
1414            ",
1415        )?;
1416        let updated = stmt.execute(named_params! {
1417            ":execution_id": execution_id_str,
1418            ":appending_version": appending_version.0,
1419        })?;
1420        if updated != 1 {
1421            return Err(DbErrorWrite::NotFound);
1422        }
1423        if let Some(DelayReq {
1424            join_set_id,
1425            delay_id,
1426            expires_at,
1427        }) = delay_req
1428        {
1429            debug!("Inserting delay to `t_delay`");
1430            let mut stmt = tx.prepare_cached(
1431                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1432                VALUES \
1433                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1434            )?;
1435            stmt.execute(named_params! {
1436                ":execution_id": execution_id_str,
1437                ":join_set_id": join_set_id.to_string(),
1438                ":delay_id": delay_id.to_string(),
1439                ":expires_at": expires_at,
1440            })
1441            .map_err(|err| match err.sqlite_error().map(|err| err.code) {
1442                Some(ErrorCode::ConstraintViolation) => DbErrorWrite::NonRetriable(
1443                    DbErrorWriteNonRetriable::IllegalState("conflicting delay id".into()),
1444                ),
1445                _ => DbErrorWrite::from(err),
1446            })?;
1447        }
1448        Ok(appending_version.increment())
1449    }
1450
1451    fn get_combined_state(
1452        tx: &Transaction,
1453        execution_id: &ExecutionId,
1454    ) -> Result<CombinedState, DbErrorRead> {
1455        let mut stmt = tx.prepare(
1456            r"
1457                SELECT
1458                    state, ffqn, corresponding_version, pending_expires_finished,
1459                    last_lock_version, executor_id, run_id,
1460                    join_set_id, join_set_closing,
1461                    result_kind
1462                    FROM t_state
1463                WHERE
1464                    execution_id = :execution_id
1465                ",
1466        )?;
1467        stmt.query_row(
1468            named_params! {
1469                ":execution_id": execution_id.to_string(),
1470            },
1471            |row| {
1472                CombinedState::new(
1473                    &CombinedStateDTO {
1474                        state: row.get("state")?,
1475                        ffqn: row.get("ffqn")?,
1476                        pending_expires_finished: row
1477                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1478                        last_lock_version: row
1479                            .get::<_, Option<VersionType>>("last_lock_version")?
1480                            .map(Version::new),
1481                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1482                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1483                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1484                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1485                        result_kind: row
1486                            .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1487                                "result_kind",
1488                            )?
1489                            .map(|wrapper| wrapper.0),
1490                    },
1491                    Version::new(row.get("corresponding_version")?),
1492                )
1493            },
1494        )
1495        .map_err(DbErrorRead::from)
1496    }
1497
1498    fn list_executions(
1499        read_tx: &Transaction,
1500        ffqn: Option<&FunctionFqn>,
1501        top_level_only: bool,
1502        pagination: &ExecutionListPagination,
1503    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1504        struct StatementModifier<'a> {
1505            where_vec: Vec<String>,
1506            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1507            limit: u32,
1508            limit_desc: bool,
1509        }
1510
1511        fn paginate<'a, T: rusqlite::ToSql + 'static>(
1512            pagination: &'a Pagination<Option<T>>,
1513            column: &str,
1514            top_level_only: bool,
1515        ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1516            let mut where_vec: Vec<String> = vec![];
1517            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1518            let limit = pagination.length();
1519            let limit_desc = pagination.is_desc();
1520            let rel = pagination.rel();
1521            match pagination {
1522                Pagination::NewerThan {
1523                    cursor: Some(cursor),
1524                    ..
1525                }
1526                | Pagination::OlderThan {
1527                    cursor: Some(cursor),
1528                    ..
1529                } => {
1530                    where_vec.push(format!("{column} {rel} :cursor"));
1531                    let cursor = cursor.to_sql().map_err(|err| {
1532                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
1533                        DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1534                    })?;
1535                    params.push((":cursor", cursor));
1536                }
1537                _ => {}
1538            }
1539            if top_level_only {
1540                where_vec.push("is_top_level=true".to_string());
1541            }
1542            Ok(StatementModifier {
1543                where_vec,
1544                params,
1545                limit,
1546                limit_desc,
1547            })
1548        }
1549
1550        let mut statement_mod = match pagination {
1551            ExecutionListPagination::CreatedBy(pagination) => {
1552                paginate(pagination, "created_at", top_level_only)?
1553            }
1554            ExecutionListPagination::ExecutionId(pagination) => {
1555                paginate(pagination, "execution_id", top_level_only)?
1556            }
1557        };
1558
1559        let ffqn_temporary;
1560        if let Some(ffqn) = ffqn {
1561            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1562            ffqn_temporary = ffqn.to_string();
1563            let ffqn = ffqn_temporary
1564                .to_sql()
1565                .expect("string conversion never fails");
1566
1567            statement_mod.params.push((":ffqn", ffqn));
1568        }
1569
1570        let where_str = if statement_mod.where_vec.is_empty() {
1571            String::new()
1572        } else {
1573            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1574        };
1575        let sql = format!(
1576            r"
1577            SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1578            last_lock_version, executor_id, run_id,
1579            join_set_id, join_set_closing,
1580            result_kind
1581            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1582            ",
1583            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1584            limit = statement_mod.limit,
1585        );
1586        let mut vec: Vec<_> = read_tx
1587            .prepare(&sql)?
1588            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1589                statement_mod
1590                    .params
1591                    .into_iter()
1592                    .collect::<Vec<_>>()
1593                    .as_ref(),
1594                |row| {
1595                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1596                    let created_at = row.get("created_at")?;
1597                    let scheduled_at = row.get("scheduled_at")?;
1598                    let combined_state = CombinedState::new(
1599                        &CombinedStateDTO {
1600                            state: row.get("state")?,
1601                            ffqn: row.get("ffqn")?,
1602                            pending_expires_finished: row
1603                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1604                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1605
1606                            last_lock_version: row
1607                                .get::<_, Option<VersionType>>("last_lock_version")?
1608                                .map(Version::new),
1609                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1610                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1611                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1612                            result_kind: row
1613                                .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1614                                    "result_kind",
1615                                )?
1616                                .map(|wrapper| wrapper.0),
1617                        },
1618                        Version::new(row.get("corresponding_version")?),
1619                    )?;
1620                    Ok(ExecutionWithState {
1621                        execution_id,
1622                        ffqn: combined_state.ffqn,
1623                        pending_state: combined_state.pending_state,
1624                        created_at,
1625                        scheduled_at,
1626                    })
1627                },
1628            )?
1629            .collect::<Vec<Result<_, _>>>()
1630            .into_iter()
1631            .filter_map(|row| match row {
1632                Ok(row) => Some(row),
1633                Err(err) => {
1634                    warn!("Skipping row - {err:?}");
1635                    None
1636                }
1637            })
1638            .collect();
1639
1640        if !statement_mod.limit_desc {
1641            // the list must be sorted in descending order
1642            vec.reverse();
1643        }
1644        Ok(vec)
1645    }
1646
    /// Appends a `Locked` event for `execution_id` and assembles the [`LockedExecution`]
    /// handed to the executor.
    ///
    /// Steps, in order:
    /// 1. Loads the combined state, checks `can_append_lock` on its pending state and checks
    ///    `appending_version` against the expected next version.
    /// 2. Serializes the `Locked` event and inserts it into `t_execution_log`.
    /// 3. Loads the execution's join set responses.
    /// 4. Updates `t_state` via `update_state_locked_get_intermittent_event_count`.
    /// 5. Reads the `Created` event and all `HistoryEvent`s ordered by version; the first
    ///    selected row must be `Created`, the remaining rows must be `HistoryEvent`s.
    ///
    /// The returned `next_version` is `appending_version.increment()`.
    ///
    /// # Errors
    /// * Errors from the lock-eligibility and version checks are propagated.
    /// * `ValidationFailed` if the `Locked` event cannot be serialized.
    /// * `IllegalState("cannot lock")` for any failure of the `t_execution_log` insert.
    /// * A consistency error if an event cannot be deserialized or the selected rows do not
    ///   follow the `Created` + `HistoryEvent`* layout.
    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
    #[expect(clippy::too_many_arguments)]
    fn lock_single_execution(
        tx: &Transaction,
        created_at: DateTime<Utc>,
        component_id: &ComponentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        appending_version: &Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!("lock_single_execution");
        // Validate that a lock may be appended to the current pending state.
        let combined_state = Self::get_combined_state(tx, execution_id)?;
        combined_state.pending_state.can_append_lock(
            created_at,
            executor_id,
            run_id,
            lock_expires_at,
        )?;
        // Optimistic concurrency: the caller's version must match what the state expects next.
        let expected_version = combined_state.get_next_version_assert_not_finished();
        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;

        // Append to `execution_log` table.
        let locked_event = Locked {
            component_id: component_id.clone(),
            executor_id,
            lock_expires_at,
            run_id,
            retry_config,
        };
        // The clone is kept so that `locked_event` can be returned in `LockedExecution`.
        let event = ExecutionEventInner::Locked(locked_event.clone());
        let event_ser = serde_json::to_string(&event).map_err(|err| {
            warn!("Cannot serialize {event:?} - {err:?}");
            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
        })?;
        // A failure to prepare the statement is reported as an uncategorized generic error.
        let mut stmt = tx
            .prepare_cached(
                "INSERT INTO t_execution_log \
            (execution_id, created_at, json_value, version, variant) \
            VALUES \
            (:execution_id, :created_at, :json_value, :version, :variant)",
            )
            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": created_at,
            ":json_value": event_ser,
            ":version": appending_version.0,
            ":variant": event.variant(),
        })
        // NOTE(review): every insert failure (not only constraint violations) is reported as a
        // non-retriable `IllegalState` - confirm this is intended for transient errors.
        .map_err(|err| {
            warn!("Cannot lock execution - {err:?}");
            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
        })?;

        // Responses are returned to the executor as part of `LockedExecution`.
        let responses = Self::list_responses(tx, execution_id, None)?;
        let responses = responses.into_iter().map(|resp| resp.event).collect();
        trace!("Responses: {responses:?}");

        // Update `t_state`
        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
            tx,
            execution_id,
            executor_id,
            run_id,
            lock_expires_at,
            appending_version,
            retry_config,
        )?;
        // Fetch event_history and `Created` event to construct the response.
        let mut events = tx
            .prepare(
                "SELECT json_value, version FROM t_execution_log WHERE \
                execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
                ORDER BY version",
            )?
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":variant1": DUMMY_CREATED.variant(),
                    ":variant2": DUMMY_HISTORY_EVENT.variant(),
                },
                |row| {
                    // not used: only `event` and `version` are read from these events below
                    let created_at_fake = DateTime::from_timestamp_nanos(0); // not used
                    let event = row
                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            consistency_rusqlite("cannot deserialize event")
                        })?
                        .0;
                    let version = Version(row.get("version")?);

                    Ok(ExecutionEvent {
                        created_at: created_at_fake,
                        event,
                        backtrace_id: None,
                        version,
                    })
                },
            )?
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .collect::<VecDeque<_>>();
        // The lowest-versioned selected row must be the `Created` event.
        let Some(ExecutionEventInner::Created {
            ffqn,
            params,
            parent,
            metadata,
            ..
        }) = events.pop_front().map(|outer| outer.event)
        else {
            error!("Execution log must contain at least `Created` event");
            return Err(consistency_db_err("execution log must contain `Created` event").into());
        };

        // All remaining rows form the history as `(HistoryEvent, Version)` pairs.
        let event_history = events
            .into_iter()
            .map(|ExecutionEvent { event, version, .. }| {
                if let ExecutionEventInner::HistoryEvent { event } = event {
                    Ok((event, version))
                } else {
                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
                    Err(consistency_db_err(
                        "rows can only contain `Created` and `HistoryEvent` event kinds",
                    ))
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(LockedExecution {
            execution_id: execution_id.clone(),
            metadata,
            next_version: appending_version.increment(),
            ffqn,
            params,
            event_history,
            responses,
            parent,
            intermittent_event_count,
            locked_event,
        })
    }
1792
1793    fn count_join_next(
1794        tx: &Transaction,
1795        execution_id: &ExecutionId,
1796        join_set_id: &JoinSetId,
1797    ) -> Result<u64, DbErrorRead> {
1798        let mut stmt = tx.prepare(
1799            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1800            AND history_event_type = :join_next",
1801        )?;
1802        Ok(stmt.query_row(
1803            named_params! {
1804                ":execution_id": execution_id.to_string(),
1805                ":join_set_id": join_set_id.to_string(),
1806                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1807            },
1808            |row| row.get("count"),
1809        )?)
1810    }
1811
1812    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1813    #[expect(clippy::needless_return)]
1814    fn append(
1815        tx: &Transaction,
1816        execution_id: &ExecutionId,
1817        req: AppendRequest,
1818        appending_version: Version,
1819    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1820        if matches!(req.event, ExecutionEventInner::Created { .. }) {
1821            return Err(DbErrorWrite::NonRetriable(
1822                DbErrorWriteNonRetriable::ValidationFailed(
1823                    "cannot append `Created` event - use `create` instead".into(),
1824                ),
1825            ));
1826        }
1827        if let AppendRequest {
1828            event:
1829                ExecutionEventInner::Locked(Locked {
1830                    component_id,
1831                    executor_id,
1832                    run_id,
1833                    lock_expires_at,
1834                    retry_config,
1835                }),
1836            created_at,
1837        } = req
1838        {
1839            return Self::lock_single_execution(
1840                tx,
1841                created_at,
1842                &component_id,
1843                execution_id,
1844                run_id,
1845                &appending_version,
1846                executor_id,
1847                lock_expires_at,
1848                retry_config,
1849            )
1850            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1851        }
1852
1853        let combined_state = Self::get_combined_state(tx, execution_id)?;
1854        if combined_state.pending_state.is_finished() {
1855            debug!("Execution is already finished");
1856            return Err(DbErrorWrite::NonRetriable(
1857                DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1858            ));
1859        }
1860
1861        Self::check_expected_next_and_appending_version(
1862            &combined_state.get_next_version_assert_not_finished(),
1863            &appending_version,
1864        )?;
1865        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1866            error!("Cannot serialize {:?} - {err:?}", req.event);
1867            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1868        })?;
1869
1870        let mut stmt = tx.prepare(
1871                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1872                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1873                    ?;
1874        stmt.execute(named_params! {
1875            ":execution_id": execution_id.to_string(),
1876            ":created_at": req.created_at,
1877            ":json_value": event_ser,
1878            ":version": appending_version.0,
1879            ":variant": req.event.variant(),
1880            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1881        })?;
1882        // Calculate current pending state
1883
1884        match &req.event {
1885            ExecutionEventInner::Created { .. } => {
1886                unreachable!("handled in the caller")
1887            }
1888
1889            ExecutionEventInner::Locked { .. } => {
1890                unreachable!("handled above")
1891            }
1892
1893            ExecutionEventInner::TemporarilyFailed {
1894                backoff_expires_at, ..
1895            }
1896            | ExecutionEventInner::TemporarilyTimedOut {
1897                backoff_expires_at, ..
1898            } => {
1899                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1900                    tx,
1901                    execution_id,
1902                    &appending_version,
1903                    *backoff_expires_at,
1904                    true, // an intermittent failure
1905                )?;
1906                return Ok((next_version, notifier));
1907            }
1908
1909            ExecutionEventInner::Unlocked {
1910                backoff_expires_at, ..
1911            } => {
1912                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1913                    tx,
1914                    execution_id,
1915                    &appending_version,
1916                    *backoff_expires_at,
1917                    false, // not an intermittent failure
1918                )?;
1919                return Ok((next_version, notifier));
1920            }
1921
1922            ExecutionEventInner::Finished { result, .. } => {
1923                Self::update_state_finished(
1924                    tx,
1925                    execution_id,
1926                    &appending_version,
1927                    req.created_at,
1928                    PendingStateFinishedResultKind::from(result),
1929                )?;
1930                return Ok((
1931                    appending_version,
1932                    AppendNotifier {
1933                        pending_at: None,
1934                        execution_finished: Some(NotifierExecutionFinished {
1935                            execution_id: execution_id.clone(),
1936                            retval: result.clone(),
1937                        }),
1938                        response: None,
1939                    },
1940                ));
1941            }
1942
1943            ExecutionEventInner::HistoryEvent {
1944                event:
1945                    HistoryEvent::JoinSetCreate { .. }
1946                    | HistoryEvent::JoinSetRequest {
1947                        request: JoinSetRequest::ChildExecutionRequest { .. },
1948                        ..
1949                    }
1950                    | HistoryEvent::Persist { .. }
1951                    | HistoryEvent::Schedule { .. }
1952                    | HistoryEvent::Stub { .. }
1953                    | HistoryEvent::JoinNextTooMany { .. },
1954            } => {
1955                return Ok((
1956                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1957                    AppendNotifier::default(),
1958                ));
1959            }
1960
1961            ExecutionEventInner::HistoryEvent {
1962                event:
1963                    HistoryEvent::JoinSetRequest {
1964                        join_set_id,
1965                        request:
1966                            JoinSetRequest::DelayRequest {
1967                                delay_id,
1968                                expires_at,
1969                                ..
1970                            },
1971                    },
1972            } => {
1973                return Ok((
1974                    Self::bump_state_next_version(
1975                        tx,
1976                        execution_id,
1977                        &appending_version,
1978                        Some(DelayReq {
1979                            join_set_id: join_set_id.clone(),
1980                            delay_id: delay_id.clone(),
1981                            expires_at: *expires_at,
1982                        }),
1983                    )?,
1984                    AppendNotifier::default(),
1985                ));
1986            }
1987
1988            ExecutionEventInner::HistoryEvent {
1989                event:
1990                    HistoryEvent::JoinNext {
1991                        join_set_id,
1992                        run_expires_at,
1993                        closing,
1994                        requested_ffqn: _,
1995                    },
1996            } => {
1997                // Did the response arrive already?
1998                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1999                let nth_response =
2000                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
2001                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2002                assert!(join_next_count > 0);
2003                if let Some(ResponseWithCursor {
2004                    event:
2005                        JoinSetResponseEventOuter {
2006                            created_at: nth_created_at,
2007                            ..
2008                        },
2009                    cursor: _,
2010                }) = nth_response
2011                {
2012                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
2013                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2014                        tx,
2015                        execution_id,
2016                        &appending_version,
2017                        scheduled_at,
2018                        false, // not an intermittent failure
2019                    )?;
2020                    return Ok((next_version, notifier));
2021                }
2022                return Ok((
2023                    Self::update_state_blocked(
2024                        tx,
2025                        execution_id,
2026                        &appending_version,
2027                        join_set_id,
2028                        *run_expires_at,
2029                        *closing,
2030                    )?,
2031                    AppendNotifier::default(),
2032                ));
2033            }
2034        }
2035    }
2036
2037    fn append_response(
2038        tx: &Transaction,
2039        execution_id: &ExecutionId,
2040        response_outer: JoinSetResponseEventOuter,
2041    ) -> Result<AppendNotifier, DbErrorWrite> {
2042        let mut stmt = tx.prepare(
2043            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2044                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
2045        )?;
2046        let join_set_id = &response_outer.event.join_set_id;
2047        let (delay_id, delay_success) = match &response_outer.event.event {
2048            JoinSetResponse::DelayFinished { delay_id, result } => {
2049                (Some(delay_id.to_string()), Some(result.is_ok()))
2050            }
2051            JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2052        };
2053        let (child_execution_id, finished_version) = match &response_outer.event.event {
2054            JoinSetResponse::ChildExecutionFinished {
2055                child_execution_id,
2056                finished_version,
2057                result: _,
2058            } => (
2059                Some(child_execution_id.to_string()),
2060                Some(finished_version.0),
2061            ),
2062            JoinSetResponse::DelayFinished { .. } => (None, None),
2063        };
2064
2065        stmt.execute(named_params! {
2066            ":execution_id": execution_id.to_string(),
2067            ":created_at": response_outer.created_at,
2068            ":join_set_id": join_set_id.to_string(),
2069            ":delay_id": delay_id,
2070            ":delay_success": delay_success,
2071            ":child_execution_id": child_execution_id,
2072            ":finished_version": finished_version,
2073        })
2074        .map_err(|err| match err.sqlite_error().map(|err| err.code) {
2075            Some(ErrorCode::ConstraintViolation) => DbErrorWrite::NonRetriable(
2076                DbErrorWriteNonRetriable::IllegalState("conflicting response id".into()),
2077            ),
2078            _ => DbErrorWrite::from(err),
2079        })?;
2080
2081        // if the execution is going to be unblocked by this response...
2082        let combined_state = Self::get_combined_state(tx, execution_id)?;
2083        debug!("previous_pending_state: {combined_state:?}");
2084        let mut notifier = if let PendingState::BlockedByJoinSet {
2085            join_set_id: found_join_set_id,
2086            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2087            closing: _,
2088        } = combined_state.pending_state
2089            && *join_set_id == found_join_set_id
2090        {
2091            // PendingAt should be set to current time if called from expired_timers_watcher,
2092            // or to a future time if the execution is hot.
2093            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2094            // TODO: Add diff test
2095            // Unblock the state.
2096            Self::update_state_pending_after_response_appended(
2097                tx,
2098                execution_id,
2099                scheduled_at,
2100                &combined_state.corresponding_version, // not changing the version
2101            )?
2102        } else {
2103            AppendNotifier::default()
2104        };
2105        if let JoinSetResponseEvent {
2106            join_set_id,
2107            event:
2108                JoinSetResponse::DelayFinished {
2109                    delay_id,
2110                    result: _,
2111                },
2112        } = &response_outer.event
2113        {
2114            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2115            let mut stmt =
2116                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2117                ?;
2118            stmt.execute(named_params! {
2119                ":execution_id": execution_id.to_string(),
2120                ":join_set_id": join_set_id.to_string(),
2121                ":delay_id": delay_id.to_string(),
2122            })?;
2123        }
2124        notifier.response = Some((execution_id.clone(), response_outer));
2125        Ok(notifier)
2126    }
2127
2128    fn append_backtrace(
2129        tx: &Transaction,
2130        backtrace_info: &BacktraceInfo,
2131    ) -> Result<(), DbErrorWrite> {
2132        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2133            warn!(
2134                "Cannot serialize backtrace {:?} - {err:?}",
2135                backtrace_info.wasm_backtrace
2136            );
2137            DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2138        })?;
2139        let mut stmt = tx
2140            .prepare(
2141                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2142                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2143            )
2144            ?;
2145        stmt.execute(named_params! {
2146            ":execution_id": backtrace_info.execution_id.to_string(),
2147            ":component_id": backtrace_info.component_id.to_string(),
2148            ":version_min_including": backtrace_info.version_min_including.0,
2149            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2150            ":wasm_backtrace": backtrace,
2151        })?;
2152        Ok(())
2153    }
2154
2155    #[cfg(feature = "test")]
2156    fn get(
2157        tx: &Transaction,
2158        execution_id: &ExecutionId,
2159    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2160        let mut stmt = tx.prepare(
2161            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2162                        execution_id = :execution_id ORDER BY version",
2163        )?;
2164        let events = stmt
2165            .query_map(
2166                named_params! {
2167                    ":execution_id": execution_id.to_string(),
2168                },
2169                |row| {
2170                    let created_at = row.get("created_at")?;
2171                    let event = row
2172                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2173                        .map_err(|serde| {
2174                            error!("Cannot deserialize {row:?} - {serde:?}");
2175                            consistency_rusqlite("cannot deserialize event")
2176                        })?
2177                        .0;
2178                    let version = Version(row.get("version")?);
2179
2180                    Ok(ExecutionEvent {
2181                        created_at,
2182                        event,
2183                        backtrace_id: None,
2184                        version,
2185                    })
2186                },
2187            )?
2188            .collect::<Result<Vec<_>, _>>()?;
2189        if events.is_empty() {
2190            return Err(DbErrorRead::NotFound);
2191        }
2192        let combined_state = Self::get_combined_state(tx, execution_id)?;
2193        let responses = Self::list_responses(tx, execution_id, None)?
2194            .into_iter()
2195            .map(|resp| resp.event)
2196            .collect();
2197        Ok(concepts::storage::ExecutionLog {
2198            execution_id: execution_id.clone(),
2199            events,
2200            responses,
2201            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2202            pending_state: combined_state.pending_state,
2203        })
2204    }
2205
2206    fn list_execution_events(
2207        tx: &Transaction,
2208        execution_id: &ExecutionId,
2209        version_min: VersionType,
2210        version_max_excluding: VersionType,
2211        include_backtrace_id: bool,
2212    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2213        let select = if include_backtrace_id {
2214            "SELECT
2215                log.created_at,
2216                log.json_value,
2217                log.version as version,
2218                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2219                bt.version_min_including AS backtrace_id
2220            FROM
2221                t_execution_log AS log
2222            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2223                t_backtrace AS bt ON log.execution_id = bt.execution_id
2224                                -- Check if the log's version falls within the backtrace's range
2225                                AND log.version >= bt.version_min_including
2226                                AND log.version < bt.version_max_excluding
2227            WHERE
2228                log.execution_id = :execution_id
2229                AND log.version >= :version_min
2230                AND log.version < :version_max_excluding
2231            ORDER BY
2232                log.version;"
2233        } else {
2234            "SELECT
2235                created_at, json_value, NULL as backtrace_id, version
2236            FROM t_execution_log WHERE
2237                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2238            ORDER BY version"
2239        };
2240        tx.prepare(select)?
2241            .query_map(
2242                named_params! {
2243                    ":execution_id": execution_id.to_string(),
2244                    ":version_min": version_min,
2245                    ":version_max_excluding": version_max_excluding
2246                },
2247                |row| {
2248                    let created_at = row.get("created_at")?;
2249                    let backtrace_id = row
2250                        .get::<_, Option<VersionType>>("backtrace_id")?
2251                        .map(Version::new);
2252                    let version = Version(row.get("version")?);
2253
2254                    let event = row
2255                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2256                        .map(|event| ExecutionEvent {
2257                            created_at,
2258                            event: event.0,
2259                            backtrace_id,
2260                            version,
2261                        })
2262                        .map_err(|serde| {
2263                            error!("Cannot deserialize {row:?} - {serde:?}");
2264                            consistency_rusqlite("cannot deserialize")
2265                        })?;
2266                    Ok(event)
2267                },
2268            )?
2269            .collect::<Result<Vec<_>, _>>()
2270            .map_err(DbErrorRead::from)
2271    }
2272
2273    fn get_execution_event(
2274        tx: &Transaction,
2275        execution_id: &ExecutionId,
2276        version: VersionType,
2277    ) -> Result<ExecutionEvent, DbErrorRead> {
2278        let mut stmt = tx.prepare(
2279            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2280                        execution_id = :execution_id AND version = :version",
2281        )?;
2282        stmt.query_row(
2283            named_params! {
2284                ":execution_id": execution_id.to_string(),
2285                ":version": version,
2286            },
2287            |row| {
2288                let created_at = row.get("created_at")?;
2289                let event = row
2290                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2291                    .map_err(|serde| {
2292                        error!("Cannot deserialize {row:?} - {serde:?}");
2293                        consistency_rusqlite("cannot deserialize event")
2294                    })?;
2295                let version = Version(row.get("version")?);
2296
2297                Ok(ExecutionEvent {
2298                    created_at,
2299                    event: event.0,
2300                    backtrace_id: None,
2301                    version,
2302                })
2303            },
2304        )
2305        .map_err(DbErrorRead::from)
2306    }
2307
2308    fn get_last_execution_event(
2309        tx: &Transaction,
2310        execution_id: &ExecutionId,
2311    ) -> Result<ExecutionEvent, DbErrorRead> {
2312        let mut stmt = tx.prepare(
2313            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2314                        execution_id = :execution_id ORDER BY version DESC",
2315        )?;
2316        stmt.query_row(
2317            named_params! {
2318                ":execution_id": execution_id.to_string(),
2319            },
2320            |row| {
2321                let created_at = row.get("created_at")?;
2322                let event = row
2323                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2324                    .map_err(|serde| {
2325                        error!("Cannot deserialize {row:?} - {serde:?}");
2326                        consistency_rusqlite("cannot deserialize event")
2327                    })?;
2328                let version = Version(row.get("version")?);
2329                Ok(ExecutionEvent {
2330                    created_at,
2331                    event: event.0,
2332                    backtrace_id: None,
2333                    version: version.clone(),
2334                })
2335            },
2336        )
2337        .map_err(DbErrorRead::from)
2338    }
2339
2340    fn list_responses(
2341        tx: &Transaction,
2342        execution_id: &ExecutionId,
2343        pagination: Option<Pagination<u32>>,
2344    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2345        // TODO: Add test
2346        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2347        let mut sql = "SELECT \
2348            r.id, r.created_at, r.join_set_id,  r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
2349            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2350            WHERE \
2351            r.execution_id = :execution_id \
2352            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2353            "
2354        .to_string();
2355        let limit = match &pagination {
2356            Some(
2357                pagination @ (Pagination::NewerThan { cursor, .. }
2358                | Pagination::OlderThan { cursor, .. }),
2359            ) => {
2360                params.push((":cursor", Box::new(cursor)));
2361                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2362                Some(pagination.length())
2363            }
2364            None => None,
2365        };
2366        sql.push_str(" ORDER BY id");
2367        if pagination.as_ref().is_some_and(Pagination::is_desc) {
2368            sql.push_str(" DESC");
2369        }
2370        if let Some(limit) = limit {
2371            write!(sql, " LIMIT {limit}").unwrap();
2372        }
2373        params.push((":execution_id", Box::new(execution_id.to_string())));
2374        tx.prepare(&sql)?
2375            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2376                params
2377                    .iter()
2378                    .map(|(key, value)| (*key, value.as_ref()))
2379                    .collect::<Vec<_>>()
2380                    .as_ref(),
2381                Self::parse_response_with_cursor,
2382            )?
2383            .collect::<Result<Vec<_>, rusqlite::Error>>()
2384            .map_err(DbErrorRead::from)
2385    }
2386
2387    fn parse_response_with_cursor(
2388        row: &rusqlite::Row<'_>,
2389    ) -> Result<ResponseWithCursor, rusqlite::Error> {
2390        let id = row.get("id")?;
2391        let created_at: DateTime<Utc> = row.get("created_at")?;
2392        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2393        let event = match (
2394            row.get::<_, Option<DelayId>>("delay_id")?,
2395            row.get::<_, Option<bool>>("delay_success")?,
2396            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2397            row.get::<_, Option<VersionType>>("finished_version")?,
2398            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2399        ) {
2400            (Some(delay_id), Some(delay_success), None, None, None) => {
2401                JoinSetResponse::DelayFinished {
2402                    delay_id,
2403                    result: delay_success.then_some(()).ok_or(()),
2404                }
2405            }
2406            (
2407                None,
2408                None,
2409                Some(child_execution_id),
2410                Some(finished_version),
2411                Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2412            ) => JoinSetResponse::ChildExecutionFinished {
2413                child_execution_id,
2414                finished_version: Version(finished_version),
2415                result,
2416            },
2417            (delay, delay_success, child, finished, result) => {
2418                error!(
2419                    "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
2420                    result.map(|it| it.0)
2421                );
2422                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2423            }
2424        };
2425        Ok(ResponseWithCursor {
2426            cursor: id,
2427            event: JoinSetResponseEventOuter {
2428                event: JoinSetResponseEvent { join_set_id, event },
2429                created_at,
2430            },
2431        })
2432    }
2433
2434    fn nth_response(
2435        tx: &Transaction,
2436        execution_id: &ExecutionId,
2437        join_set_id: &JoinSetId,
2438        skip_rows: u64,
2439    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2440        // TODO: Add test
2441        tx
2442            .prepare(
2443                "SELECT r.id, r.created_at, r.join_set_id, \
2444                    r.delay_id, r.delay_success, \
2445                    r.child_execution_id, r.finished_version, l.json_value \
2446                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2447                    WHERE \
2448                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2449                    (
2450                    r.finished_version = l.version \
2451                    OR \
2452                    r.child_execution_id IS NULL \
2453                    ) \
2454                    ORDER BY id \
2455                    LIMIT 1 OFFSET :offset",
2456            )
2457            ?
2458            .query_row(
2459                named_params! {
2460                    ":execution_id": execution_id.to_string(),
2461                    ":join_set_id": join_set_id.to_string(),
2462                    ":offset": skip_rows,
2463                },
2464                Self::parse_response_with_cursor,
2465            )
2466            .optional()
2467            .map_err(DbErrorRead::from)
2468    }
2469
2470    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2471    #[instrument(level = Level::TRACE, skip_all)]
2472    fn get_responses_with_offset(
2473        tx: &Transaction,
2474        execution_id: &ExecutionId,
2475        skip_rows: usize,
2476    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2477        // TODO: Add test
2478        tx.prepare(
2479            "SELECT r.id, r.created_at, r.join_set_id, \
2480            r.delay_id, r.delay_success, \
2481            r.child_execution_id, r.finished_version, l.json_value \
2482            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2483            WHERE \
2484            r.execution_id = :execution_id AND \
2485            ( \
2486            r.finished_version = l.version \
2487            OR r.child_execution_id IS NULL \
2488            ) \
2489            ORDER BY id \
2490            LIMIT -1 OFFSET :offset",
2491        )
2492        ?
2493        .query_map(
2494            named_params! {
2495                ":execution_id": execution_id.to_string(),
2496                ":offset": skip_rows,
2497            },
2498            Self::parse_response_with_cursor,
2499        )
2500        ?
2501        .collect::<Result<Vec<_>, _>>()
2502        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2503        .map_err(DbErrorRead::from)
2504    }
2505
2506    fn get_pending_of_single_ffqn(
2507        mut stmt: CachedStatement,
2508        batch_size: usize,
2509        pending_at_or_sooner: DateTime<Utc>,
2510        ffqn: &FunctionFqn,
2511    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2512        stmt.query_map(
2513            named_params! {
2514                ":pending_expires_finished": pending_at_or_sooner,
2515                ":ffqn": ffqn.to_string(),
2516                ":batch_size": batch_size,
2517            },
2518            |row| {
2519                let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2520                let next_version =
2521                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2522                Ok(execution_id.map(|exe| (exe, next_version)))
2523            },
2524        )
2525        .map_err(|err| {
2526            warn!("Ignoring consistency error {err:?}");
2527        })?
2528        .collect::<Result<Vec<_>, _>>()
2529        .map_err(|err| {
2530            warn!("Ignoring consistency error {err:?}");
2531        })?
2532        .into_iter()
2533        .collect::<Result<Vec<_>, _>>()
2534        .map_err(|err| {
2535            warn!("Ignoring consistency error {err:?}");
2536        })
2537    }
2538
2539    /// Get executions and their next versions
2540    fn get_pending(
2541        conn: &Connection,
2542        batch_size: usize,
2543        pending_at_or_sooner: DateTime<Utc>,
2544        ffqns: &[FunctionFqn],
2545    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2546        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2547        for ffqn in ffqns {
2548            // Select executions in PendingAt.
2549            let stmt = conn
2550                .prepare_cached(&format!(
2551                    r#"
2552                    SELECT execution_id, corresponding_version FROM t_state WHERE
2553                    state = "{STATE_PENDING_AT}" AND
2554                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2555                    ORDER BY pending_expires_finished LIMIT :batch_size
2556                    "#
2557                ))
2558                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2559
2560            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2561                stmt,
2562                batch_size - execution_ids_versions.len(),
2563                pending_at_or_sooner,
2564                ffqn,
2565            ) {
2566                execution_ids_versions.extend(execs_and_versions);
2567                if execution_ids_versions.len() == batch_size {
2568                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2569                    break;
2570                }
2571                // consistency errors are ignored since we want to return at least some rows.
2572            }
2573        }
2574        Ok(execution_ids_versions)
2575    }
2576
2577    // Must be called after write transaction for a correct happens-before relationship.
2578    #[instrument(level = Level::TRACE, skip_all)]
2579    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2580        let (pending_ats, finished_execs, responses) = {
2581            let (mut pending_ats, mut finished_execs, mut responses) =
2582                (Vec::new(), Vec::new(), Vec::new());
2583            for notifier in notifiers {
2584                if let Some(pending_at) = notifier.pending_at {
2585                    pending_ats.push(pending_at);
2586                }
2587                if let Some(finished) = notifier.execution_finished {
2588                    finished_execs.push(finished);
2589                }
2590                if let Some(response) = notifier.response {
2591                    responses.push(response);
2592                }
2593            }
2594            (pending_ats, finished_execs, responses)
2595        };
2596
2597        // Notify pending_at subscribers.
2598        if !pending_ats.is_empty() {
2599            let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2600            for pending_at in pending_ats {
2601                Self::notify_pending_locked(&pending_at, current_time, &guard);
2602            }
2603        }
2604        // Notify execution finished subscribers.
2605        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2606        if !finished_execs.is_empty() {
2607            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2608            for finished in finished_execs {
2609                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2610                    for (_tag, sender) in listeners_of_exe_id {
2611                        // Sending while holding the lock but the oneshot sender does not block.
2612                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2613                        let _ = sender.send(finished.retval.clone());
2614                    }
2615                }
2616            }
2617        }
2618        // Notify response subscribers.
2619        if !responses.is_empty() {
2620            let mut guard = self.0.response_subscribers.lock().unwrap();
2621            for (execution_id, response) in responses {
2622                if let Some((sender, _)) = guard.remove(&execution_id) {
2623                    let _ = sender.send(response);
2624                }
2625            }
2626        }
2627    }
2628
2629    fn notify_pending_locked(
2630        notifier: &NotifierPendingAt,
2631        current_time: DateTime<Utc>,
2632        ffqn_to_pending_subscription: &std::sync::MutexGuard<
2633            HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2634        >,
2635    ) {
2636        // No need to remove here, cleanup is handled by the caller.
2637        if notifier.scheduled_at <= current_time
2638            && let Some((subscription, _)) = ffqn_to_pending_subscription.get(&notifier.ffqn)
2639        {
2640            debug!("Notifying pending subscriber");
2641            // Does not block
2642            let _ = subscription.try_send(());
2643        }
2644    }
2645}
2646
2647#[async_trait]
2648impl DbExecutor for SqlitePool {
2649    #[instrument(level = Level::TRACE, skip(self))]
2650    async fn lock_pending(
2651        &self,
2652        batch_size: usize,
2653        pending_at_or_sooner: DateTime<Utc>,
2654        ffqns: Arc<[FunctionFqn]>,
2655        created_at: DateTime<Utc>,
2656        component_id: ComponentId,
2657        executor_id: ExecutorId,
2658        lock_expires_at: DateTime<Utc>,
2659        run_id: RunId,
2660        retry_config: ComponentRetryConfig,
2661    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2662        let execution_ids_versions = self
2663            .transaction(
2664                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2665                "get_pending",
2666            )
2667            .await?;
2668        if execution_ids_versions.is_empty() {
2669            Ok(vec![])
2670        } else {
2671            debug!("Locking {execution_ids_versions:?}");
2672            self.transaction(
2673                move |tx| {
2674                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2675                    // Append lock
2676                    for (execution_id, version) in &execution_ids_versions {
2677                        match Self::lock_single_execution(
2678                            tx,
2679                            created_at,
2680                            &component_id,
2681                            execution_id,
2682                            run_id,
2683                            version,
2684                            executor_id,
2685                            lock_expires_at,
2686                            retry_config,
2687                        ) {
2688                            Ok(locked) => locked_execs.push(locked),
2689                            Err(err) => {
2690                                warn!("Locking row {execution_id} failed - {err:?}");
2691                            }
2692                        }
2693                    }
2694                    Ok(locked_execs)
2695                },
2696                "lock_pending",
2697            )
2698            .await
2699        }
2700    }
2701
2702    #[instrument(level = Level::DEBUG, skip(self))]
2703    async fn lock_one(
2704        &self,
2705        created_at: DateTime<Utc>,
2706        component_id: ComponentId,
2707        execution_id: &ExecutionId,
2708        run_id: RunId,
2709        version: Version,
2710        executor_id: ExecutorId,
2711        lock_expires_at: DateTime<Utc>,
2712        retry_config: ComponentRetryConfig,
2713    ) -> Result<LockedExecution, DbErrorWrite> {
2714        debug!(%execution_id, "lock_one");
2715        let execution_id = execution_id.clone();
2716        self.transaction(
2717            move |tx| {
2718                Self::lock_single_execution(
2719                    tx,
2720                    created_at,
2721                    &component_id,
2722                    &execution_id,
2723                    run_id,
2724                    &version,
2725                    executor_id,
2726                    lock_expires_at,
2727                    retry_config,
2728                )
2729            },
2730            "lock_inner",
2731        )
2732        .await
2733    }
2734
2735    #[instrument(level = Level::DEBUG, skip(self, req))]
2736    async fn append(
2737        &self,
2738        execution_id: ExecutionId,
2739        version: Version,
2740        req: AppendRequest,
2741    ) -> Result<AppendResponse, DbErrorWrite> {
2742        debug!(%req, "append");
2743        trace!(?req, "append");
2744        let created_at = req.created_at;
2745        let (version, notifier) = self
2746            .transaction(
2747                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2748                "append",
2749            )
2750            .await?;
2751        self.notify_all(vec![notifier], created_at);
2752        Ok(version)
2753    }
2754
2755    #[instrument(level = Level::DEBUG, skip_all)]
2756    async fn append_batch_respond_to_parent(
2757        &self,
2758        events: AppendEventsToExecution,
2759        response: AppendResponseToExecution,
2760        current_time: DateTime<Utc>,
2761    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2762        debug!("append_batch_respond_to_parent");
2763        if events.execution_id == response.parent_execution_id {
2764            // Pending state would be wrong.
2765            // This is not a panic because it depends on DB state.
2766            return Err(DbErrorWrite::NonRetriable(
2767                DbErrorWriteNonRetriable::ValidationFailed(
2768                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2769                ),
2770            ));
2771        }
2772        if events.batch.is_empty() {
2773            error!("Batch cannot be empty");
2774            return Err(DbErrorWrite::NonRetriable(
2775                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2776            ));
2777        }
2778        let (version, notifiers) = {
2779            self.transaction(
2780                move |tx| {
2781                    let mut version = events.version.clone();
2782                    let mut notifier_of_child = None;
2783                    for append_request in &events.batch {
2784                        let (v, n) = Self::append(
2785                            tx,
2786                            &events.execution_id,
2787                            append_request.clone(),
2788                            version,
2789                        )?;
2790                        version = v;
2791                        notifier_of_child = Some(n);
2792                    }
2793
2794                    let pending_at_parent = Self::append_response(
2795                        tx,
2796                        &response.parent_execution_id,
2797                        JoinSetResponseEventOuter {
2798                            created_at: response.created_at,
2799                            event: JoinSetResponseEvent {
2800                                join_set_id: response.join_set_id.clone(),
2801                                event: JoinSetResponse::ChildExecutionFinished {
2802                                    child_execution_id: response.child_execution_id.clone(),
2803                                    finished_version: response.finished_version.clone(),
2804                                    result: response.result.clone(),
2805                                },
2806                            },
2807                        },
2808                    )?;
2809                    Ok::<_, DbErrorWrite>((
2810                        version,
2811                        vec![
2812                            notifier_of_child.expect("checked that the batch is not empty"),
2813                            pending_at_parent,
2814                        ],
2815                    ))
2816                },
2817                "append_batch_respond_to_parent",
2818            )
2819            .await?
2820        };
2821        self.notify_all(notifiers, current_time);
2822        Ok(version)
2823    }
2824
2825    // Supports only one subscriber per ffqn.
2826    // A new subscriber replaces the old one, which will eventually time out, which is fine.
2827    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2828    async fn wait_for_pending(
2829        &self,
2830        pending_at_or_sooner: DateTime<Utc>,
2831        ffqns: Arc<[FunctionFqn]>,
2832        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2833    ) {
2834        let unique_tag: u64 = rand::random();
2835        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
2836        {
2837            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2838            for ffqn in ffqns.as_ref() {
2839                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2840            }
2841        }
2842        async {
2843            let Ok(execution_ids_versions) = self
2844                .transaction(
2845                    {
2846                        let ffqns = ffqns.clone();
2847                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2848                    },
2849                    "subscribe_to_pending",
2850                )
2851                .await
2852            else {
2853                trace!(
2854                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2855                );
2856                timeout_fut.await;
2857                return;
2858            };
2859            if !execution_ids_versions.is_empty() {
2860                trace!("Not waiting, database already contains new pending executions");
2861                return;
2862            }
2863            tokio::select! { // future's liveness: Dropping the loser immediately.
2864                _ = receiver.recv() => {
2865                    trace!("Received a notification");
2866                }
2867                () = timeout_fut => {
2868                }
2869            }
2870        }.await;
2871        // Clean up ffqn_to_pending_subscription in any case
2872        {
2873            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2874            for ffqn in ffqns.as_ref() {
2875                match ffqn_to_pending_subscription.remove(ffqn) {
2876                    Some((_, tag)) if tag == unique_tag => {
2877                        // Cleanup OK.
2878                    }
2879                    Some(other) => {
2880                        // Reinsert foreign sender.
2881                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2882                    }
2883                    None => {
2884                        // Value was replaced and cleaned up already.
2885                    }
2886                }
2887            }
2888        }
2889    }
2890}
2891
2892#[async_trait]
2893impl DbConnection for SqlitePool {
2894    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2895    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2896        debug!("create");
2897        trace!(?req, "create");
2898        let created_at = req.created_at;
2899        let (version, notifier) = self
2900            .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
2901            .await?;
2902        self.notify_all(vec![notifier], created_at);
2903        Ok(version)
2904    }
2905
2906    #[instrument(level = Level::DEBUG, skip(self, batch))]
2907    async fn append_batch(
2908        &self,
2909        current_time: DateTime<Utc>,
2910        batch: Vec<AppendRequest>,
2911        execution_id: ExecutionId,
2912        version: Version,
2913    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2914        debug!("append_batch");
2915        trace!(?batch, "append_batch");
2916        assert!(!batch.is_empty(), "Empty batch request");
2917
2918        let (version, notifier) = self
2919            .transaction(
2920                move |tx| {
2921                    let mut version = version.clone();
2922                    let mut notifier = None;
2923                    for append_request in &batch {
2924                        let (v, n) =
2925                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2926                        version = v;
2927                        notifier = Some(n);
2928                    }
2929                    Ok::<_, DbErrorWrite>((
2930                        version,
2931                        notifier.expect("checked that the batch is not empty"),
2932                    ))
2933                },
2934                "append_batch",
2935            )
2936            .await?;
2937
2938        self.notify_all(vec![notifier], current_time);
2939        Ok(version)
2940    }
2941
2942    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2943    async fn append_batch_create_new_execution(
2944        &self,
2945        current_time: DateTime<Utc>,
2946        batch: Vec<AppendRequest>,
2947        execution_id: ExecutionId,
2948        version: Version,
2949        child_req: Vec<CreateRequest>,
2950    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2951        debug!("append_batch_create_new_execution");
2952        trace!(?batch, ?child_req, "append_batch_create_new_execution");
2953        assert!(!batch.is_empty(), "Empty batch request");
2954
2955        let (version, notifiers) = self
2956            .transaction(
2957                move |tx| {
2958                    let mut notifier = None;
2959                    let mut version = version.clone();
2960                    for append_request in &batch {
2961                        let (v, n) =
2962                            Self::append(tx, &execution_id, append_request.clone(), version)?;
2963                        version = v;
2964                        notifier = Some(n);
2965                    }
2966                    let mut notifiers = Vec::new();
2967                    notifiers.push(notifier.expect("checked that the batch is not empty"));
2968
2969                    for child_req in &child_req {
2970                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
2971                        notifiers.push(notifier);
2972                    }
2973                    Ok::<_, DbErrorWrite>((version, notifiers))
2974                },
2975                "append_batch_create_new_execution_inner",
2976            )
2977            .await?;
2978        self.notify_all(notifiers, current_time);
2979        Ok(version)
2980    }
2981
2982    #[cfg(feature = "test")]
2983    #[instrument(level = Level::DEBUG, skip(self))]
2984    async fn get(
2985        &self,
2986        execution_id: &ExecutionId,
2987    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2988        trace!("get");
2989        let execution_id = execution_id.clone();
2990        self.transaction(move |tx| Self::get(tx, &execution_id), "get")
2991            .await
2992    }
2993
2994    #[instrument(level = Level::DEBUG, skip(self))]
2995    async fn list_execution_events(
2996        &self,
2997        execution_id: &ExecutionId,
2998        since: &Version,
2999        max_length: VersionType,
3000        include_backtrace_id: bool,
3001    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3002        let execution_id = execution_id.clone();
3003        let since = since.0;
3004        self.transaction(
3005            move |tx| {
3006                Self::list_execution_events(
3007                    tx,
3008                    &execution_id,
3009                    since,
3010                    since + max_length,
3011                    include_backtrace_id,
3012                )
3013            },
3014            "get",
3015        )
3016        .await
3017    }
3018
3019    // Supports only one subscriber per execution id.
3020    // A new call will overwrite the old subscriber, the old one will end
3021    // with a timeout, which is fine.
3022    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3023    async fn subscribe_to_next_responses(
3024        &self,
3025        execution_id: &ExecutionId,
3026        start_idx: usize,
3027        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3028    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3029        debug!("next_responses");
3030        let unique_tag: u64 = rand::random();
3031        let execution_id = execution_id.clone();
3032
3033        let cleanup = || {
3034            let mut guard = self.0.response_subscribers.lock().unwrap();
3035            match guard.remove(&execution_id) {
3036                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
3037                Some(other) => {
3038                    // Reinsert foreign sender.
3039                    guard.insert(execution_id.clone(), other);
3040                }
3041                None => {} // Value was replaced and cleaned up already, or notification was sent.
3042            }
3043        };
3044
3045        let response_subscribers = self.0.response_subscribers.clone();
3046        let resp_or_receiver = {
3047            let execution_id = execution_id.clone();
3048            self.transaction(
3049                move |tx| {
3050                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
3051                    if responses.is_empty() {
3052                        // cannot race as we have the transaction write lock
3053                        let (sender, receiver) = oneshot::channel();
3054                        response_subscribers
3055                            .lock()
3056                            .unwrap()
3057                            .insert(execution_id.clone(), (sender, unique_tag));
3058                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
3059                    } else {
3060                        Ok(itertools::Either::Left(responses))
3061                    }
3062                },
3063                "subscribe_to_next_responses",
3064            )
3065            .await
3066        }
3067        .inspect_err(|_| {
3068            cleanup();
3069        })?;
3070        match resp_or_receiver {
3071            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3072            itertools::Either::Right(receiver) => {
3073                let res = async move {
3074                    tokio::select! {
3075                        resp = receiver => {
3076                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
3077                            Ok(vec![resp])
3078                        }
3079                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3080                    }
3081                }
3082                .await;
3083                cleanup();
3084                res
3085            }
3086        }
3087    }
3088
3089    // Supports multiple subscribers.
3090    async fn wait_for_finished_result(
3091        &self,
3092        execution_id: &ExecutionId,
3093        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
3094    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3095        let unique_tag: u64 = rand::random();
3096        let execution_id = execution_id.clone();
3097        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3098
3099        let cleanup = || {
3100            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3101            if let Some(subscribers) = guard.get_mut(&execution_id) {
3102                subscribers.remove(&unique_tag);
3103            }
3104        };
3105
3106        let resp_or_receiver = {
3107            let execution_id = execution_id.clone();
3108            self.transaction(move |tx| {
3109                let pending_state =
3110                    Self::get_combined_state(tx, &execution_id)?.pending_state;
3111                if let PendingState::Finished { finished } = pending_state {
3112                    let event =
3113                        Self::get_execution_event(tx, &execution_id, finished.version)?;
3114                    if let ExecutionEventInner::Finished { result, ..} = event.event {
3115                        Ok(itertools::Either::Left(result))
3116                    } else {
3117                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3118                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
3119                            "cannot get finished event based on t_state version"
3120                        )))
3121                    }
3122                } else {
3123                    // Cannot race with the notifier as we have the transaction write lock:
3124                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
3125                    // or we end up here. If this tx fails, the cleanup will remove this entry.
3126                    let (sender, receiver) = oneshot::channel();
3127                    let mut guard = execution_finished_subscription.lock().unwrap();
3128                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3129                    Ok(itertools::Either::Right(receiver))
3130                }
3131            }, "wait_for_finished_result")
3132            .await
3133        }
3134        .inspect_err(|_| {
3135            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
3136            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
3137            // If cleanup wins, it simply removes the oneshot sender.
3138            cleanup();
3139        })?;
3140
3141        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3142        match resp_or_receiver {
3143            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3144            itertools::Either::Right(receiver) => {
3145                let res = async move {
3146                    tokio::select! {
3147                        resp = receiver => {
3148                            Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3149                        }
3150                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3151                    }
3152                }
3153                .await;
3154                cleanup();
3155                res
3156            }
3157        }
3158    }
3159
3160    #[cfg(feature = "test")]
3161    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3162    async fn append_response(
3163        &self,
3164        created_at: DateTime<Utc>,
3165        execution_id: ExecutionId,
3166        response_event: JoinSetResponseEvent,
3167    ) -> Result<(), DbErrorWrite> {
3168        debug!("append_response");
3169        let event = JoinSetResponseEventOuter {
3170            created_at,
3171            event: response_event,
3172        };
3173        let notifier = self
3174            .transaction(
3175                move |tx| Self::append_response(tx, &execution_id, event.clone()),
3176                "append_response",
3177            )
3178            .await?;
3179        self.notify_all(vec![notifier], created_at);
3180        Ok(())
3181    }
3182
3183    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3184    async fn append_delay_response(
3185        &self,
3186        created_at: DateTime<Utc>,
3187        execution_id: ExecutionId,
3188        join_set_id: JoinSetId,
3189        delay_id: DelayId,
3190        result: Result<(), ()>,
3191    ) -> Result<(), DbErrorWrite> {
3192        debug!("append_delay_response");
3193        let event = JoinSetResponseEventOuter {
3194            created_at,
3195            event: JoinSetResponseEvent {
3196                join_set_id,
3197                event: JoinSetResponse::DelayFinished { delay_id, result },
3198            },
3199        };
3200        let notifier = self
3201            .transaction(
3202                move |tx| Self::append_response(tx, &execution_id, event.clone()),
3203                "append_delay_response",
3204            )
3205            .await?;
3206        self.notify_all(vec![notifier], created_at);
3207        Ok(())
3208    }
3209
3210    #[instrument(level = Level::DEBUG, skip_all)]
3211    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3212        debug!("append_backtrace");
3213        self.transaction(
3214            move |tx| Self::append_backtrace(tx, &append),
3215            "append_backtrace",
3216        )
3217        .await
3218    }
3219
3220    #[instrument(level = Level::DEBUG, skip_all)]
3221    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3222        debug!("append_backtrace_batch");
3223        self.transaction(
3224            move |tx| {
3225                for append in &batch {
3226                    Self::append_backtrace(tx, append)?;
3227                }
3228                Ok(())
3229            },
3230            "append_backtrace",
3231        )
3232        .await
3233    }
3234
3235    #[instrument(level = Level::DEBUG, skip_all)]
3236    async fn get_backtrace(
3237        &self,
3238        execution_id: &ExecutionId,
3239        filter: BacktraceFilter,
3240    ) -> Result<BacktraceInfo, DbErrorRead> {
3241        debug!("get_last_backtrace");
3242        let execution_id = execution_id.clone();
3243
3244        self.transaction(
3245            move |tx| {
3246                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3247                                WHERE execution_id = :execution_id";
3248                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3249                let select = match &filter {
3250                    BacktraceFilter::Specific(version) =>{
3251                        params.push((":version", Box::new(version.0)));
3252                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3253                    },
3254                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3255                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3256                };
3257                tx
3258                    .prepare(&select)
3259                    ?
3260                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3261                        params
3262                            .iter()
3263                            .map(|(key, value)| (*key, value.as_ref()))
3264                            .collect::<Vec<_>>()
3265                            .as_ref(),
3266                    |row| {
3267                        Ok(BacktraceInfo {
3268                            execution_id: execution_id.clone(),
3269                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3270                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3271                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3272                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3273                        })
3274                    },
3275                ).map_err(DbErrorRead::from)
3276            },
3277            "get_last_backtrace",
3278        ).await
3279    }
3280
3281    /// Get currently expired delays and locks.
3282    #[instrument(level = Level::TRACE, skip(self))]
3283    async fn get_expired_timers(
3284        &self,
3285        at: DateTime<Utc>,
3286    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3287        self.transaction(
3288            move |conn| {
3289                let mut expired_timers = conn.prepare(
3290                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3291                )?
3292                .query_map(
3293                        named_params! {
3294                            ":at": at,
3295                        },
3296                        |row| {
3297                            let execution_id = row.get("execution_id")?;
3298                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3299                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3300                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3301                            Ok(ExpiredTimer::Delay(delay))
3302                        },
3303                    )?
3304                    .collect::<Result<Vec<_>, _>>()?;
3305                // Extend with expired locks
3306                let expired = conn.prepare(&format!(r#"
3307                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3308                    executor_id, run_id
3309                    FROM t_state
3310                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3311                    "#
3312                )
3313                )?
3314                .query_map(
3315                        named_params! {
3316                            ":at": at,
3317                        },
3318                        |row| {
3319                            let execution_id = row.get("execution_id")?;
3320                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3321                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3322                            let intermittent_event_count = row.get("intermittent_event_count")?;
3323                            let max_retries = row.get("max_retries")?;
3324                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3325                            let executor_id = row.get("executor_id")?;
3326                            let run_id = row.get("run_id")?;
3327                            let lock = ExpiredLock {
3328                                execution_id,
3329                                locked_at_version,
3330                                next_version,
3331                                intermittent_event_count,
3332                                max_retries,
3333                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3334                                locked_by: LockedBy { executor_id, run_id },
3335                            };
3336                            Ok(ExpiredTimer::Lock(lock))
3337                        }
3338                    )?
3339                    .collect::<Result<Vec<_>, _>>()?;
3340                expired_timers.extend(expired);
3341                if !expired_timers.is_empty() {
3342                    debug!("get_expired_timers found {expired_timers:?}");
3343                }
3344                Ok(expired_timers)
3345            }, "get_expired_timers"
3346        )
3347        .await
3348    }
3349
3350    async fn get_execution_event(
3351        &self,
3352        execution_id: &ExecutionId,
3353        version: &Version,
3354    ) -> Result<ExecutionEvent, DbErrorRead> {
3355        let version = version.0;
3356        let execution_id = execution_id.clone();
3357        self.transaction(
3358            move |tx| Self::get_execution_event(tx, &execution_id, version),
3359            "get_execution_event",
3360        )
3361        .await
3362    }
3363
3364    async fn get_last_execution_event(
3365        &self,
3366        execution_id: &ExecutionId,
3367    ) -> Result<ExecutionEvent, DbErrorRead> {
3368        let execution_id = execution_id.clone();
3369        self.transaction(
3370            move |tx| Self::get_last_execution_event(tx, &execution_id),
3371            "get_last_execution_event",
3372        )
3373        .await
3374    }
3375
3376    async fn get_pending_state(
3377        &self,
3378        execution_id: &ExecutionId,
3379    ) -> Result<PendingState, DbErrorRead> {
3380        let execution_id = execution_id.clone();
3381        Ok(self
3382            .transaction(
3383                move |tx| Self::get_combined_state(tx, &execution_id),
3384                "get_pending_state",
3385            )
3386            .await?
3387            .pending_state)
3388    }
3389
3390    async fn list_executions(
3391        &self,
3392        ffqn: Option<FunctionFqn>,
3393        top_level_only: bool,
3394        pagination: ExecutionListPagination,
3395    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3396        self.transaction(
3397            move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3398            "list_executions",
3399        )
3400        .await
3401    }
3402
3403    async fn list_responses(
3404        &self,
3405        execution_id: &ExecutionId,
3406        pagination: Pagination<u32>,
3407    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3408        let execution_id = execution_id.clone();
3409        self.transaction(
3410            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3411            "list_executions",
3412        )
3413        .await
3414    }
3415}
3416
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a [`SqlitePool`] with the default [`SqliteConfig`] for use in tests.
    ///
    /// If the `SQLITE_FILE` environment variable is set and is valid Unicode, the pool is opened
    /// at that path and `None` is returned next to it. Otherwise a fresh [`NamedTempFile`] backs
    /// the pool and is returned so the caller can keep it alive, because `NamedTempFile` deletes
    /// its file when dropped.
    ///
    /// # Panics
    /// Panics if the temporary file cannot be created or the pool fails to open.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        let (db_path, temp_file) = match std::env::var("SQLITE_FILE") {
            Ok(path) => (std::path::PathBuf::from(path), None),
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                (file.path().to_path_buf(), Some(file))
            }
        };
        let pool = SqlitePool::new(db_path, SqliteConfig::default())
            .await
            .unwrap();
        (pool, temp_file)
    }
}