// obeli_sk_db_sqlite/sqlite_dao.rs

1use crate::{histograms::Histograms, sqlite_dao::conversions::to_generic_error};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6    SupportedFunctionReturnValue,
7    component_id::InputContentDigest,
8    prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14        DbPool, DbPoolCloseable, DeploymentState, ExecutionEvent, ExecutionListPagination,
15        ExecutionRequest, ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay,
16        ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest,
17        JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter,
18        ListExecutionEventsResponse, ListExecutionsFilter, ListLogsResponse, ListResponsesResponse,
19        LockPendingResponse, Locked, LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter,
20        LogInfoAppendRow, LogLevel, LogStreamType, Pagination, PendingState,
21        PendingStateBlockedByJoinSet, PendingStateFinishedResultKind, PendingStateMergedPause,
22        ResponseCursor, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
23        STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType,
24    },
25};
26use const_format::formatcp;
27use conversions::{JsonWrapper, consistency_db_err, consistency_rusqlite, from_generic_error};
28use db_common::{
29    AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
30    PendingFfqnSubscribersHolder,
31};
32use hashbrown::HashMap;
33use rusqlite::{
34    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, Row, ToSql, Transaction,
35    TransactionBehavior, named_params, types::ToSqlOutput,
36};
37use std::{
38    cmp::max,
39    collections::VecDeque,
40    fmt::Debug,
41    ops::DerefMut,
42    panic::Location,
43    path::Path,
44    sync::{
45        Arc, Mutex,
46        atomic::{AtomicBool, Ordering},
47    },
48    time::{Duration, Instant},
49};
50use std::{fmt::Write as _, pin::Pin};
51use strum::IntoEnumIterator as _;
52use tokio::sync::{mpsc, oneshot};
53use tracing::{Level, Span, debug, error, info, instrument, trace, warn};
54use tracing_error::SpanTrace;
55
/// Returned when opening the SQLite connection, applying PRAGMAs, or creating
/// the schema fails during start-up. Details are logged at the failure site
/// before this opaque error is returned.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
59
/// In-memory representation of a delay request (a timer belonging to a join set).
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    // Instant at which the delay fires.
    expires_at: DateTime<Utc>,
}
66/*
67mmap_size = 128MB - Set the global memory map so all processes can share some data
68https://www.sqlite.org/pragma.html#pragma_mmap_size
69https://www.sqlite.org/mmap.html
70
71journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
72https://www.sqlite.org/pragma.html#pragma_journal_size_limit
73
74Inspired by https://github.com/rails/rails/pull/49349
75*/
76
/// PRAGMA name/value pairs applied in order when the connection is opened.
/// An empty value means the pragma is *queried* and its result logged instead
/// of being set (see `pragma_update` in `init_thread`). Each entry can be
/// overridden via `SqliteConfig::pragma_override`.
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"], // ms to wait on a locked database before failing
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["page_size", "8192"], // 8 KB
    ["mmap_size", "134217728"], // 128 MB, see module comment above
    ["journal_size_limit", "67108864"], // 64 MB WAL cap, see module comment above
    ["integrity_check", ""], // empty value: run as a query and log the result
];
89
90// Append only
// Append only. Stores the schema version; the latest row must match
// `T_METADATA_EXPECTED_SCHEMA_VERSION` or initialization fails.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
// Bump on every incompatible schema change; `init_thread` refuses to start on mismatch.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 6;

/// Stores execution history. Append only.
/// `history_event_type` is a generated column extracted from the event JSON,
/// used by the partial index for `count_join_next`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.history_event.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
121
122// Used in `count_join_next`
123const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
124    "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set  ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type=\"{}\";",
125    HISTORY_EVENT_TYPE_JOIN_NEXT
126);
127
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, columns `delay_id`,`delay_success` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, columns `child_execution_id`,`finished_version`
/// must not be null.
/// The autoincrement `id` doubles as the response cursor for paging.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success INTEGER,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
// Child execution id must be unique (partial unique index skips NULL rows).
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
// Delay id must be unique (partial unique index skips NULL rows).
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
162
/// Stores executions in `PendingState`
/// `state` to column mapping:
/// `PendingAt`:            (nothing but required columns + Preserves `executor_id`, `run_id` only if locked previously.
/// `Locked`:               `max_retries`, `retry_exp_backoff_millis`, `last_lock_version`, `executor_id`, `run_id`.
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`. Preserves `executor_id`, `run_id` from `Locked` state for lock extensions.
/// `Finished` :            `result_kind`.
///
/// Column details:
/// ## `deployment_id`:
/// Which deployment created / last locked the execution. Transitioning to Blocked is issued by the same executor.
/// Remote deployment can append response and transition the execution to Pending, but that does not change the `deployment_id`.
///
/// ## `pending_expires_finished`
/// Either pending at, lock expires at or finished at based on state.
///
/// ## `max_retries` and `retry_exp_backoff_millis`
/// Only needed for selecting expired locks.
///
/// ## `last_lock_version`
/// Needed for selecting expired locks, see [`ExpiredLock::locked_at_version`], cleared unless state is `Locked`.
///
/// ## `executor_id`, `run_id`
/// Set by `Locked` event. When transitioning to `PendingAt` or `BlockedByJoinSet`, those columns should be preserved so that the workflow can extend the lock.
///
/// ## `component_id_input_digest`
/// Inserted when Created, updated on every Locked event because of locking by ffqn.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    ffqn TEXT NOT NULL,
    created_at TEXT NOT NULL,
    component_id_input_digest BLOB NOT NULL,
    component_type TEXT NOT NULL,
    first_scheduled_at TEXT NOT NULL,
    deployment_id TEXT NOT NULL,
    is_paused INTEGER NOT NULL,

    pending_expires_finished TEXT NOT NULL,
    state TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";

// For `get_pending_by_ffqns` - partial index limited to rows in the PendingAt state.
const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
    r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (pending_expires_finished, ffqn) WHERE state = '{}';
",
    STATE_PENDING_AT
);
// For `get_pending_by_component_input_digest` - partial index limited to PendingAt rows.
const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
    r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (pending_expires_finished, component_id_input_digest) WHERE state = '{}';
",
    STATE_PENDING_AT
);
// Partial index for scanning expired locks (Locked rows ordered by expiry).
const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
    "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
    STATE_LOCKED
);
// NOTE(review): const name says IS_TOP_LEVEL but the index is named
// `idx_t_state_execution_id_is_root`; renaming the index would require a
// migration that drops the old one — confirm before changing.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";

// For `list_deployment_states`
const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
";
256
/// Represents [`ExpiredTimer::AsyncDelay`] . Rows are deleted when the delay is processed.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

// Backtrace tables
// Append only. Maps a version range of an execution to a deduplicated backtrace
// stored in `t_wasm_backtrace` (referenced by content hash).
const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    backtrace_hash BLOB NOT NULL,

    PRIMARY KEY (
        execution_id,
        version_min_including,
        version_max_excluding
    ),
    FOREIGN KEY (backtrace_hash)
        REFERENCES t_wasm_backtrace(backtrace_hash)
) STRICT
";
// Index for searching backtraces by execution_id and version
const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version
ON t_execution_backtrace (
    execution_id,
    version_min_including,
    version_max_excluding
)
";
// Deduplication of backtraces: content-addressed by `backtrace_hash`.
const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
    backtrace_hash BLOB NOT NULL,
    wasm_backtrace TEXT NOT NULL,

    PRIMARY KEY (backtrace_hash)
) STRICT
";

/// Stores logs and std stream output of execution runs. Append only.
/// Logs have `level` and `message` null.
/// Std streams have `stream_type`, `payload` not null.
const CREATE_TABLE_T_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_log (
    id INTEGER PRIMARY KEY,
    execution_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    level INTEGER,
    message TEXT,
    stream_type INTEGER,
    payload BLOB
) STRICT
";
// For listing logs of a single run, ordered by time.
const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
";
// For listing logs across all runs of an execution, ordered by time.
const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
";
327
/// Internal error type produced inside logical transactions before being mapped
/// to the storage-layer `DbError*` types (see `mod conversions`).
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    // Row lookup returned no rows (`QueryReturnedNoRows`).
    #[error("not found")]
    NotFound,
    // Any other failure, with captured span trace and caller location for diagnostics.
    #[error("generic: {reason}")]
    Generic {
        reason: StrVariant,
        context: SpanTrace,
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        loc: &'static Location<'static>,
    },
    // The pool is shutting down.
    #[error("close")]
    Close,
}
342
343mod conversions {
344
345    use super::RusqliteError;
346    use concepts::{
347        StrVariant,
348        storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite},
349    };
350    use rusqlite::{
351        ToSql,
352        types::{FromSql, FromSqlError},
353    };
354    use std::{fmt::Debug, panic::Location, sync::Arc};
355    use tracing::error;
356    use tracing_error::SpanTrace;
357
358    impl From<rusqlite::Error> for RusqliteError {
359        // The LTX returns this, capture inner location.
360        #[track_caller]
361        fn from(err: rusqlite::Error) -> Self {
362            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
363                RusqliteError::NotFound
364            } else {
365                RusqliteError::Generic {
366                    reason: err.to_string().into(),
367                    context: SpanTrace::capture(),
368                    source: Some(Arc::new(err)),
369                    loc: Location::caller(),
370                }
371            }
372        }
373    }
374
375    // Manual conversion, as this function ignores the NotFound.
376    #[track_caller]
377    pub fn to_generic_error(err: RusqliteError) -> DbErrorGeneric {
378        if let RusqliteError::Close = err {
379            DbErrorGeneric::Close
380        } else {
381            DbErrorGeneric::Uncategorized {
382                reason: err.to_string().into(),
383                context: SpanTrace::capture(),
384                source: Some(Arc::new(err)),
385                loc: Location::caller(),
386            }
387        }
388    }
389
390    /// Convert a `DbErrorGeneric` to `rusqlite::Error` for use in row mapping closures.
391    #[track_caller]
392    pub fn from_generic_error(err: &DbErrorGeneric) -> rusqlite::Error {
393        FromSqlError::other(OtherError {
394            reason: err.to_string().into(),
395            loc: Location::caller(),
396        })
397        .into()
398    }
399
400    impl From<RusqliteError> for DbErrorRead {
401        fn from(err: RusqliteError) -> Self {
402            if matches!(err, RusqliteError::NotFound) {
403                Self::NotFound
404            } else {
405                to_generic_error(err).into()
406            }
407        }
408    }
409    impl From<RusqliteError> for DbErrorReadWithTimeout {
410        fn from(err: RusqliteError) -> Self {
411            Self::from(DbErrorRead::from(err))
412        }
413    }
414    impl From<RusqliteError> for DbErrorWrite {
415        fn from(err: RusqliteError) -> Self {
416            if matches!(err, RusqliteError::NotFound) {
417                Self::NotFound
418            } else {
419                to_generic_error(err).into()
420            }
421        }
422    }
423
424    pub(crate) struct JsonWrapper<T>(pub(crate) T);
425    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
426        fn column_result(
427            value: rusqlite::types::ValueRef<'_>,
428        ) -> rusqlite::types::FromSqlResult<Self> {
429            let value = match value {
430                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
431                    Ok(value)
432                }
433                other => {
434                    error!(
435                        backtrace = %std::backtrace::Backtrace::capture(),
436                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
437                    );
438                    Err(FromSqlError::InvalidType)
439                }
440            }?;
441            let value = serde_json::from_slice::<T>(value).map_err(|err| {
442                error!(
443                    backtrace = %std::backtrace::Backtrace::capture(),
444                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
445                    r#type = std::any::type_name::<T>()
446                );
447                FromSqlError::InvalidType
448            })?;
449            Ok(Self(value))
450        }
451    }
452    impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
453        fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
454            let string = serde_json::to_string(&self.0).map_err(|err| {
455                error!(
456                    "Cannot serialize {value:?} of type `{type}` - {err:?}",
457                    value = self.0,
458                    r#type = std::any::type_name::<T>()
459                );
460                rusqlite::Error::ToSqlConversionFailure(Box::new(err))
461            })?;
462            Ok(rusqlite::types::ToSqlOutput::Owned(
463                rusqlite::types::Value::Text(string),
464            ))
465        }
466    }
467
468    // Used as a wrapper for `FromSqlError::Other`
469    #[derive(Debug, thiserror::Error)]
470    #[error("{reason}")]
471    pub(crate) struct OtherError {
472        reason: StrVariant,
473        loc: &'static Location<'static>,
474    }
475
476    #[track_caller]
477    pub(crate) fn consistency_rusqlite(reason: impl Into<StrVariant>) -> rusqlite::Error {
478        FromSqlError::other(OtherError {
479            reason: reason.into(),
480            loc: Location::caller(),
481        })
482        .into()
483    }
484
485    #[track_caller]
486    pub(crate) fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
487        DbErrorGeneric::Uncategorized {
488            reason: reason.into(),
489            context: SpanTrace::capture(),
490            source: None,
491            loc: Location::caller(),
492        }
493    }
494}
495
/// How a logical transaction's failure affects the enclosing physical transaction.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum TxType {
    MultipleWrites, // PhyTx must be rolled back, other LTX restarted
    Other,          // Read only or a single write LTX. Continue the PhyTx if LTX returns error.
}
501
/// Error produced when committing / rolling back the physical transaction,
/// fanned out to every logical transaction's `phytx_flush_sender`.
#[derive(Clone)]
struct CommitError(RusqliteError);
504
/// Marker returned by a logical-transaction closure requesting a rollback.
#[derive(Debug)]
struct ShouldRollback;
507
/// A unit of work queued to the sqlite thread; several logical transactions
/// may be batched into one physical `Transaction`.
#[derive(derive_more::Debug)]
struct LogicalTx {
    #[debug(skip)]
    #[expect(clippy::type_complexity)]
    // The actual work; returning `Err(ShouldRollback)` requests a rollback.
    func: Box<dyn FnMut(&mut Transaction) -> Result<(), ShouldRollback> + Send>,
    // Enqueue timestamp, used for latency metrics.
    sent_at: Instant,
    // Name of the submitting function, for tracing/metrics.
    func_name: &'static str,
    #[debug(skip)]
    // Result of Physical Tx commit / rollback
    phytx_flush_sender: oneshot::Sender<Result<(), CommitError>>,
    priority: LtxPriority,
}
/// Batching priority: `tick` blocks until at least one `High` LTX arrives,
/// collecting `Low` ones along the way.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum LtxPriority {
    High,
    Low,
}
525
/// Messages sent from async callers to the dedicated sqlite thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    LogicalTx(LogicalTx),
    // Wakes the thread's `blocking_recv` so it can observe `shutdown_requested`.
    Shutdown,
}
531
/// Cloneable handle to the sqlite-backed pool; all database access is funneled
/// through a single background thread via `command_tx`.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);

// Waiters for the next join-set response per execution.
// NOTE(review): the `u64` is presumably the response cursor to resume from — confirm at usage site.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
type PendingSubscribers = Arc<Mutex<PendingFfqnSubscribersHolder>>;
// Per-execution map of subscriber id -> sender notified when the execution finishes.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
540
/// Shared state behind [`SqlitePool`]; cheap to clone (Arcs and a channel sender).
#[derive(Clone)]
struct SqlitePoolInner {
    // Set by `close()`/`drop()`; checked by the sqlite thread and connection getters.
    shutdown_requested: Arc<AtomicBool>,
    // Set by the sqlite thread as its last action; `close()` polls it.
    shutdown_finished: Arc<AtomicBool>,
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
}
551
#[async_trait]
impl DbPoolCloseable for SqlitePool {
    /// Requests shutdown of the background sqlite thread and waits (polling
    /// every 1 ms) until the thread reports completion via `shutdown_finished`.
    async fn close(&self) {
        debug!("Sqlite is closing");
        self.0.shutdown_requested.store(true, Ordering::Release);
        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
        while !self.0.shutdown_finished.load(Ordering::Acquire) {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        debug!("Sqlite was closed");
    }
}
565
#[async_trait]
impl DbPool for SqlitePool {
    // Every getter hands out a clone of the pool (all views share the same
    // command channel) after checking that shutdown has not been requested.
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
        if self.0.shutdown_requested.load(Ordering::Acquire) {
            return Err(DbErrorGeneric::Close);
        }
        Ok(Box::new(self.clone()))
    }

    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
        if self.0.shutdown_requested.load(Ordering::Acquire) {
            return Err(DbErrorGeneric::Close);
        }
        Ok(Box::new(self.clone()))
    }
    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
        if self.0.shutdown_requested.load(Ordering::Acquire) {
            return Err(DbErrorGeneric::Close);
        }
        Ok(Box::new(self.clone()))
    }
    // Test-only connection variant, compiled in with the `test` feature.
    #[cfg(feature = "test")]
    async fn connection_test(
        &self,
    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
        if self.0.shutdown_requested.load(Ordering::Acquire) {
            return Err(DbErrorGeneric::Close);
        }
        Ok(Box::new(self.clone()))
    }
}
597
impl Drop for SqlitePool {
    /// Best-effort shutdown when the last clone is dropped without `close()`.
    /// Only the final holder of the `JoinHandle` Arc takes action.
    fn drop(&mut self) {
        let arc = self.0.join_handle.take().expect("join_handle was set");
        if let Ok(join_handle) = Arc::try_unwrap(arc) {
            // Last holder
            if !join_handle.is_finished() {
                if !self.0.shutdown_finished.load(Ordering::Acquire) {
                    // Best effort to shut down the sqlite thread.
                    let backtrace = std::backtrace::Backtrace::capture();
                    warn!("SqlitePool was not closed properly - {backtrace}");
                    self.0.shutdown_requested.store(true, Ordering::Release);
                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
                    // Not joining the thread, drop might be called from async context.
                    // We are shutting down the server anyway.
                } else {
                    // The thread set `shutdown_finished` as its last operation.
                }
            }
        }
    }
}
620
/// Construction-time configuration for [`SqlitePool`].
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    // Capacity of the command channel feeding the sqlite thread.
    pub queue_capacity: usize,
    // Overrides/additions for the default [`PRAGMA`] list.
    pub pragma_override: Option<HashMap<String, String>>,
    // If set, latency histograms are reported above this threshold; `None` disables metrics.
    pub metrics_threshold: Option<Duration>,
}
627impl Default for SqliteConfig {
628    fn default() -> Self {
629        Self {
630            queue_capacity: 100,
631            pragma_override: None,
632            metrics_threshold: None,
633        }
634    }
635}
636
/// Marker returned by the command-loop `tick` signalling the sqlite thread should stop.
struct ShutdownRequested;
638
639impl SqlitePool {
640    fn init_thread(
641        path: &Path,
642        mut pragma_override: HashMap<String, String>,
643    ) -> Result<Connection, InitializationError> {
644        fn conn_execute<P: Params>(
645            conn: &Connection,
646            sql: &str,
647            params: P,
648        ) -> Result<(), InitializationError> {
649            conn.execute(sql, params).map(|_| ()).map_err(|err| {
650                error!("Cannot run `{sql}` - {err:?}");
651                InitializationError
652            })
653        }
654        fn pragma_update(
655            conn: &Connection,
656            name: &str,
657            value: &str,
658        ) -> Result<(), InitializationError> {
659            if value.is_empty() {
660                debug!("Querying PRAGMA {name}");
661                conn.pragma_query(None, name, |row| {
662                    debug!("{row:?}");
663                    Ok(())
664                })
665                .map_err(|err| {
666                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
667                    InitializationError
668                })
669            } else {
670                debug!("Setting PRAGMA {name}={value}");
671                conn.pragma_update(None, name, value).map_err(|err| {
672                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
673                    InitializationError
674                })
675            }
676        }
677
678        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
679            error!("cannot open the connection - {err:?}");
680            InitializationError
681        })?;
682
683        for [pragma_name, default_value] in PRAGMA {
684            let pragma_value = pragma_override
685                .remove(pragma_name)
686                .unwrap_or_else(|| default_value.to_string());
687            pragma_update(&conn, pragma_name, &pragma_value)?;
688        }
689        // drain the rest overrides
690        for (pragma_name, pragma_value) in pragma_override.drain() {
691            pragma_update(&conn, &pragma_name, &pragma_value)?;
692        }
693
694        // t_metadata
695        {
696            conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
697            // Insert row if not exists.
698
699            let actual_version = conn
700                .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
701                .map_err(|err| {
702                    error!("cannot select schema version - {err:?}");
703                    InitializationError
704                })?
705                .query_row([], |row| row.get::<_, u32>("schema_version"))
706                .optional()
707                .map_err(|err| {
708                    error!("Cannot read the schema version - {err:?}");
709                    InitializationError
710                })?;
711
712            match actual_version {
713                None => conn_execute(
714                    &conn,
715                    &format!(
716                        "INSERT INTO t_metadata (schema_version, created_at) VALUES
717                            ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
718                    ),
719                    [Utc::now()],
720                )?,
721                Some(actual_version) => {
722                    // Fail on unexpected `schema_version`.
723                    if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
724                        error!(
725                            "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
726                        );
727                        return Err(InitializationError);
728                    }
729                }
730            }
731        }
732
733        // t_execution_log
734        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
735        conn_execute(
736            &conn,
737            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
738            [],
739        )?;
740        conn_execute(
741            &conn,
742            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
743            [],
744        )?;
745        conn_execute(
746            &conn,
747            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
748            [],
749        )?;
750        // t_join_set_response
751        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
752        conn_execute(
753            &conn,
754            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
755            [],
756        )?;
757        conn_execute(
758            &conn,
759            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
760            [],
761        )?;
762        conn_execute(
763            &conn,
764            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
765            [],
766        )?;
767        // t_state
768        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
769        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING_BY_FFQN, [])?;
770        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING_BY_COMPONENT, [])?;
771        conn_execute(&conn, IDX_T_STATE_EXPIRED_LOCKS, [])?;
772        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
773        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
774        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
775        conn_execute(&conn, IDX_T_STATE_DEPLOYMENT_STATE, [])?;
776        // t_delay
777        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
778        // backtrace
779        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_BACKTRACE, [])?;
780        conn_execute(&conn, IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION, [])?;
781        conn_execute(&conn, CREATE_TABLE_T_WASM_BACKTRACE, [])?;
782        // t_log
783        conn_execute(&conn, CREATE_TABLE_T_LOG, [])?;
784        conn_execute(&conn, IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT, [])?;
785        conn_execute(&conn, IDX_T_LOG_EXECUTION_ID_CREATED_AT, [])?;
786        Ok(conn)
787    }
788
789    fn connection_rpc(
790        mut conn: Connection,
791        shutdown_requested: &AtomicBool,
792        shutdown_finished: &AtomicBool,
793        mut command_rx: mpsc::Receiver<ThreadCommand>,
794        metrics_threshold: Option<Duration>,
795    ) {
796        let mut histograms = Histograms::new(metrics_threshold);
797        while Self::tick(
798            &mut conn,
799            shutdown_requested,
800            &mut command_rx,
801            &mut histograms,
802        )
803        .is_ok()
804        {
805            // Loop until shutdown is set to true.
806        }
807        debug!("Closing command thread");
808        shutdown_finished.store(true, Ordering::Release);
809    }
810
    /// Processes one batch of logical transactions (LTXes) inside a single
    /// physical SQLite transaction.
    ///
    /// Flow:
    /// 1. Block until the first High-priority LTX arrives, queueing any
    ///    Low-priority LTXes received before it.
    /// 2. Drain whatever else is already queued (non-blocking).
    /// 3. Apply all LTXes inside one physical transaction; if an LTX fails it
    ///    is marked `Skip` and the whole physical transaction is retried
    ///    without it.
    /// 4. Send the commit result (or `Ok` for skipped LTXes) back to each
    ///    caller.
    ///
    /// Returns `Err(ShutdownRequested)` when a `Shutdown` command is received
    /// or the command channel is closed; this terminates `connection_rpc`.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ShutdownRequested> {
        // Per-LTX marker: whether it should still be applied on (re)attempts.
        #[derive(Clone, Copy, PartialEq, Eq)]
        enum ApplyOrSkip {
            Apply,
            Skip, // LTX apply failed previously
        }
        let mut ltx_list: Vec<(LogicalTx, ApplyOrSkip)> = Vec::new();
        // perf: Wait for first logical tx with high priority.
        loop {
            let ltx = match command_rx.blocking_recv() {
                Some(ThreadCommand::LogicalTx(ltx)) => ltx,
                Some(ThreadCommand::Shutdown) => {
                    debug!("Shutdown message received");
                    return Err(ShutdownRequested);
                }
                None => {
                    debug!("command_rx was closed");
                    return Err(ShutdownRequested);
                }
            };
            let prio = ltx.priority;
            ltx_list.push((ltx, ApplyOrSkip::Apply));
            if prio == LtxPriority::High {
                break;
            }
        }
        // Exactly one High prio LTX is in `ltx_list`.

        let all_fns_start = std::time::Instant::now();

        // Add all remaining LTXes that were queued after the first High-prio.
        while let Ok(more) = command_rx.try_recv() {
            let ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("Shutdown message received");
                    // ptx gets rolled back on drop
                    // phytx_flush_receiver drop is converted to `RusqliteError::Close` error
                    return Err(ShutdownRequested);
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };
            ltx_list.push((ltx, ApplyOrSkip::Apply));
        }

        // Marker: the physical tx was rolled back; retry with the failed LTX skipped.
        struct NeedsRestart;
        type CommitResult = Result<(), CommitError>;
        // Applies every still-`Apply` LTX to `ptx`. On the first failure the
        // LTX is marked `Skip` and `NeedsRestart` is returned (ptx rolls back
        // on drop). On success, commits and returns the commit result.
        fn try_apply_all(
            mut ptx: Transaction<'_>,
            ltx_list: &mut [(LogicalTx, ApplyOrSkip)],
            histograms: &mut Histograms,
            all_fns_start: Instant,
        ) -> Result<CommitResult, NeedsRestart> {
            for (ltx, former_res) in ltx_list
                .iter_mut()
                .filter(|(_, former_res)| *former_res == ApplyOrSkip::Apply)
            {
                if let Ok(()) = SqlitePool::ltx_apply_to_phytx(ltx, &mut ptx, histograms) {
                } else {
                    *former_res = ApplyOrSkip::Skip;
                    // ptx rollbacks on drop
                    return Err(NeedsRestart);
                }
            }
            // All LTXes were applied or skipped.
            histograms.record_all_fns(all_fns_start.elapsed());
            let now = std::time::Instant::now();
            let commit_result = ptx.commit().map_err(|err| {
                warn!("Cannot commit transaction - {err:?}");
                CommitError(RusqliteError::from(err))
            });
            histograms.record_commit(now.elapsed());
            Ok(commit_result)
        }

        // Retries `try_apply_all` until the whole batch either commits or
        // fails to commit; failing to even BEGIN a transaction is retried with
        // a 100ms backoff, bailing out only when shutdown was requested.
        fn apply_all(
            conn: &mut Connection,
            ltx_list: &mut [(LogicalTx, ApplyOrSkip)],
            histograms: &mut Histograms,
            all_fns_start: Instant,
            shutdown_requested: &AtomicBool,
        ) -> Result<CommitResult, ShutdownRequested> {
            // TODO: Investigate SAVEPOINT + RELEASE SAVEPOINT, ROLLBACK savepoint instead.
            loop {
                match conn.transaction_with_behavior(TransactionBehavior::Immediate) {
                    Ok(ptx) => {
                        if let Ok(commit_res) =
                            try_apply_all(ptx, ltx_list, histograms, all_fns_start)
                        {
                            return Ok(commit_res);
                        }
                    }
                    Err(begin_err) => {
                        error!("Cannot open transaction - {begin_err:?}");
                        std::thread::sleep(Duration::from_millis(100));
                        if shutdown_requested.load(Ordering::Acquire) {
                            return Err(ShutdownRequested);
                        }
                    }
                }
            }
        }
        let ok_or_commit_error = apply_all(
            conn,
            &mut ltx_list,
            histograms,
            all_fns_start,
            shutdown_requested,
        )?;

        // Fan the outcome back out to every caller that submitted an LTX.
        for (ltx, apply_or_skip) in ltx_list {
            let to_send = match apply_or_skip {
                ApplyOrSkip::Apply => ok_or_commit_error.clone(),
                ApplyOrSkip::Skip => {
                    Ok(()) // tx was aborted, caller will receive error from the LTX fn
                }
            };
            // Ignore the sending result: ThreadCommand producer timed out before awaiting the ack.
            let _ = ltx.phytx_flush_sender.send(to_send);
        }

        histograms.print_if_elapsed();
        Ok(())
    }
939
940    // Returning error means the PhyTx needs to be rolled back, implying LTX contained multiple writes.
941    fn ltx_apply_to_phytx(
942        ltx: &mut LogicalTx,
943        physical_tx: &mut Transaction,
944        histograms: &mut Histograms,
945    ) -> Result<(), ShouldRollback> {
946        let sent_latency = ltx.sent_at.elapsed();
947        let started_at = Instant::now();
948        let res = (ltx.func)(physical_tx);
949        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
950        res
951    }
952
953    #[instrument(skip_all, name = "sqlite_new")]
954    pub async fn new<P: AsRef<Path>>(
955        path: P,
956        config: SqliteConfig,
957    ) -> Result<Self, InitializationError> {
958        let path = path.as_ref().to_owned();
959
960        let shutdown_requested = Arc::new(AtomicBool::new(false));
961        let shutdown_finished = Arc::new(AtomicBool::new(false));
962
963        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
964        info!("Sqlite database location: {path:?}");
965        let join_handle = {
966            // Initialize the `Connection`.
967            let init_task = {
968                tokio::task::spawn_blocking(move || {
969                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
970                })
971                .await
972            };
973            let conn = match init_task {
974                Ok(res) => res?,
975                Err(join_err) => {
976                    error!("Initialization panic - {join_err:?}");
977                    return Err(InitializationError);
978                }
979            };
980            let shutdown_requested = shutdown_requested.clone();
981            let shutdown_finished = shutdown_finished.clone();
982            // Start the RPC thread.
983            std::thread::spawn(move || {
984                Self::connection_rpc(
985                    conn,
986                    &shutdown_requested,
987                    &shutdown_finished,
988                    command_rx,
989                    config.metrics_threshold,
990                );
991            })
992        };
993        Ok(SqlitePool(SqlitePoolInner {
994            shutdown_requested,
995            shutdown_finished,
996            command_tx,
997            response_subscribers: Arc::default(),
998            pending_subscribers: Arc::default(),
999            join_handle: Some(Arc::new(join_handle)),
1000            execution_finished_subscribers: Arc::default(),
1001        }))
1002    }
1003
1004    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
1005    async fn transaction<F, T, E>(
1006        &self,
1007        mut func: F,
1008        tx_type: TxType,
1009        func_name: &'static str,
1010    ) -> Result<T, E>
1011    where
1012        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
1013        T: Send + 'static,
1014        E: From<RusqliteError> + Send + 'static,
1015    {
1016        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
1017        let (phytx_flush_sender, phytx_flush_receiver) = oneshot::channel();
1018        let current_span = Span::current();
1019        let thread_command_func = {
1020            let fn_res = fn_res.clone();
1021            ThreadCommand::LogicalTx(LogicalTx {
1022                func: Box::new(move |tx| {
1023                    let _guard = current_span.enter();
1024                    let func_res = func(tx);
1025                    let res = if func_res.is_ok() {
1026                        Ok(())
1027                    } else {
1028                        Err(ShouldRollback)
1029                    };
1030                    // save result to be sent to the caller
1031                    *fn_res.lock().unwrap() = Some(func_res);
1032                    match tx_type {
1033                        TxType::MultipleWrites => res,
1034                        TxType::Other => Ok(()),
1035                    }
1036                }),
1037                sent_at: Instant::now(),
1038                func_name,
1039                phytx_flush_sender,
1040                priority: LtxPriority::High,
1041            })
1042        };
1043        self.0
1044            .command_tx
1045            .send(thread_command_func)
1046            .await
1047            .map_err(|_send_err| RusqliteError::Close)?;
1048
1049        // Wait for commit / rollbeck, then get the retval from the mutex.
1050        match phytx_flush_receiver.await {
1051            Ok(Ok(())) => {
1052                let mut guard = fn_res.lock().unwrap();
1053                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
1054            }
1055            Ok(Err(CommitError(rusqlite_err))) => Err(E::from(rusqlite_err)),
1056            Err(_) => Err(E::from(RusqliteError::Close)),
1057        }
1058    }
1059
1060    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
1061    async fn transaction_fire_forget<F, T, E>(&self, mut func: F, func_name: &'static str)
1062    where
1063        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
1064        T: Send + 'static + Default,
1065        E: From<RusqliteError> + Send + 'static,
1066    {
1067        let (commit_ack_sender, _commit_ack_receiver) = oneshot::channel(); // Nobody is interested in receiving the result
1068        let current_span = Span::current();
1069        let thread_command_func = {
1070            ThreadCommand::LogicalTx(LogicalTx {
1071                func: Box::new(move |tx| {
1072                    let _guard = current_span.enter();
1073                    let _ = func(tx);
1074                    Ok(()) // Never rollback
1075                }),
1076                sent_at: Instant::now(),
1077                func_name,
1078                phytx_flush_sender: commit_ack_sender,
1079                priority: LtxPriority::Low,
1080            })
1081        };
1082        let _ = self.0.command_tx.send(thread_command_func).await; // Ignore error when the channel is closed.
1083    }
1084
1085    fn fetch_created_event(
1086        conn: &Connection,
1087        execution_id: &ExecutionId,
1088    ) -> Result<CreateRequest, DbErrorRead> {
1089        let mut stmt = conn.prepare(
1090            "SELECT created_at, json_value FROM t_execution_log WHERE \
1091            execution_id = :execution_id AND version = 0",
1092        )?;
1093        let (created_at, event) = stmt.query_row(
1094            named_params! {
1095                ":execution_id": execution_id.to_string(),
1096            },
1097            |row| {
1098                let created_at = row.get("created_at")?;
1099                let event = row
1100                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1101                    .map_err(|serde| {
1102                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
1103                        consistency_rusqlite("cannot deserialize `Created` event")
1104                    })?;
1105                Ok((created_at, event.0))
1106            },
1107        )?;
1108        if let ExecutionRequest::Created {
1109            ffqn,
1110            params,
1111            parent,
1112            scheduled_at,
1113            component_id,
1114            deployment_id,
1115            metadata,
1116            scheduled_by,
1117        } = event
1118        {
1119            Ok(CreateRequest {
1120                created_at,
1121                execution_id: execution_id.clone(),
1122                ffqn,
1123                params,
1124                parent,
1125                scheduled_at,
1126                component_id,
1127                deployment_id,
1128                metadata,
1129                scheduled_by,
1130            })
1131        } else {
1132            error!("Row with version=0 must be a `Created` event - {event:?}");
1133            Err(consistency_db_err("expected `Created` event").into())
1134        }
1135    }
1136
1137    fn check_expected_next_and_appending_version(
1138        expected_version: &Version,
1139        appending_version: &Version,
1140    ) -> Result<(), DbErrorWrite> {
1141        if *expected_version != *appending_version {
1142            debug!(
1143                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1144            );
1145            return Err(DbErrorWrite::NonRetriable(
1146                DbErrorWriteNonRetriable::VersionConflict {
1147                    expected: expected_version.clone(),
1148                    requested: appending_version.clone(),
1149                },
1150            ));
1151        }
1152        Ok(())
1153    }
1154
1155    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
1156    fn create_inner(
1157        tx: &Transaction,
1158        req: CreateRequest,
1159    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1160        trace!("create_inner");
1161
1162        let version = Version::default();
1163        let execution_id = req.execution_id.clone();
1164        let execution_id_str = execution_id.to_string();
1165        let ffqn = req.ffqn.clone();
1166        let created_at = req.created_at;
1167        let scheduled_at = req.scheduled_at;
1168        let component_id = req.component_id.clone();
1169        let deployment_id = req.deployment_id;
1170        let event = ExecutionRequest::from(req);
1171        let event_ser = serde_json::to_string(&event).map_err(|err| {
1172            error!("Cannot serialize {event:?} - {err:?}");
1173            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1174        })?;
1175        tx.prepare(
1176                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1177                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1178        ?
1179        .execute(named_params! {
1180            ":execution_id": &execution_id_str,
1181            ":created_at": created_at,
1182            ":version": version.0,
1183            ":json_value": event_ser,
1184            ":variant": event.variant(),
1185            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1186        })
1187        ?;
1188        let pending_at = {
1189            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1190            tx.prepare(
1191                r"
1192                INSERT INTO t_state (
1193                    execution_id,
1194                    is_top_level,
1195                    corresponding_version,
1196                    pending_expires_finished,
1197                    ffqn,
1198                    state,
1199                    created_at,
1200                    component_id_input_digest,
1201                    component_type,
1202                    deployment_id,
1203                    updated_at,
1204                    first_scheduled_at,
1205                    intermittent_event_count,
1206                    is_paused
1207                    )
1208                VALUES (
1209                    :execution_id,
1210                    :is_top_level,
1211                    :corresponding_version,
1212                    :pending_expires_finished,
1213                    :ffqn,
1214                    :state,
1215                    :created_at,
1216                    :component_id_input_digest,
1217                    :component_type,
1218                    :deployment_id,
1219                    CURRENT_TIMESTAMP,
1220                    :first_scheduled_at,
1221                    0,
1222                    false
1223                    )
1224                ",
1225            )?
1226            .execute(named_params! {
1227                ":execution_id": execution_id.to_string(),
1228                ":is_top_level": execution_id.is_top_level(),
1229                ":corresponding_version": version.0,
1230                ":pending_expires_finished": scheduled_at,
1231                ":ffqn": ffqn.to_string(),
1232                ":state": STATE_PENDING_AT,
1233                ":created_at": created_at,
1234                ":component_id_input_digest": component_id.input_digest,
1235                ":component_type": component_id.component_type,
1236                ":deployment_id": deployment_id.to_string(),
1237                ":first_scheduled_at": scheduled_at,
1238            })?;
1239            AppendNotifier {
1240                pending_at: Some(NotifierPendingAt {
1241                    scheduled_at,
1242                    ffqn,
1243                    component_input_digest: component_id.input_digest,
1244                }),
1245                execution_finished: None,
1246                response: None,
1247            }
1248        };
1249        let next_version = Version::new(version.0 + 1);
1250        Ok((next_version, pending_at))
1251    }
1252
    /// Moves `t_state` back to `PendingAt(scheduled_at)` after a join-set
    /// response was appended, clearing retry/lock/join-set/result columns.
    ///
    /// Note: unlike the event-appended variant, this does NOT bump
    /// `corresponding_version` — only the pending state columns change.
    ///
    /// Returns an [`AppendNotifier`] carrying the `PendingAt` notification;
    /// errors with `NotFound` when no `t_state` row matched.
    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
    fn update_state_pending_after_response_appended(
        tx: &Transaction,
        execution_id: &ExecutionId,
        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
        component_input_digest: InputContentDigest,
    ) -> Result<AppendNotifier, DbErrorWrite> {
        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
        let mut stmt = tx
            .prepare_cached(
                r"
                UPDATE t_state
                SET
                    pending_expires_finished = :pending_expires_finished,
                    state = :state,
                    updated_at = CURRENT_TIMESTAMP,
                    max_retries = NULL,
                    retry_exp_backoff_millis = NULL,
                    last_lock_version = NULL,

                    join_set_id = NULL,
                    join_set_closing = NULL,

                    result_kind = NULL
                WHERE execution_id = :execution_id
            ",
            )
            // Errors are wrapped as `Uncategorized` to capture the span trace and
            // caller location at this call site (instead of the plain `From` conversion).
            .map_err(|err| DbErrorGeneric::Uncategorized {
                reason: err.to_string().into(),
                context: SpanTrace::capture(),
                source: Some(Arc::new(err)),
                loc: Location::caller(),
            })?;
        let updated = stmt
            // NOTE(review): `execution_id` is bound directly here, unlike siblings
            // which bind `.to_string()` — presumably its ToSql impl is equivalent; confirm.
            .execute(named_params! {
                ":execution_id": execution_id,
                ":pending_expires_finished": scheduled_at,
                ":state": STATE_PENDING_AT,
            })
            .map_err(|err| DbErrorGeneric::Uncategorized {
                reason: err.to_string().into(),
                context: SpanTrace::capture(),
                source: Some(Arc::new(err)),
                loc: Location::caller(),
            })?;
        if updated != 1 {
            return Err(DbErrorWrite::NotFound);
        }
        Ok(AppendNotifier {
            pending_at: Some(NotifierPendingAt {
                scheduled_at,
                // ffqn is needed by subscribers; re-read from the `Created` event.
                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
                component_input_digest,
            }),
            execution_finished: None,
            response: None,
        })
    }
1311
1312    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1313    fn update_state_pending_after_event_appended(
1314        tx: &Transaction,
1315        execution_id: &ExecutionId,
1316        appending_version: &Version,
1317        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1318        intermittent_failure: bool,
1319        component_input_digest: InputContentDigest,
1320    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1321        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1322        let mut stmt = tx.prepare_cached(
1323            r"
1324                UPDATE t_state
1325                SET
1326                    corresponding_version = :appending_version,
1327                    pending_expires_finished = :pending_expires_finished,
1328                    state = :state,
1329                    updated_at = CURRENT_TIMESTAMP,
1330                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1331
1332                    max_retries = NULL,
1333                    retry_exp_backoff_millis = NULL,
1334                    last_lock_version = NULL,
1335
1336                    join_set_id = NULL,
1337                    join_set_closing = NULL,
1338
1339                    result_kind = NULL
1340                WHERE execution_id = :execution_id;
1341            ", // `executor_id` and `run_id` are preserved for lock extension.
1342        )?;
1343        let updated = stmt
1344            .execute(named_params! {
1345                ":execution_id": execution_id.to_string(),
1346                ":appending_version": appending_version.0,
1347                ":pending_expires_finished": scheduled_at,
1348                ":state": STATE_PENDING_AT,
1349                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1350            })
1351            .map_err(DbErrorWrite::from)?;
1352        if updated != 1 {
1353            return Err(DbErrorWrite::NotFound);
1354        }
1355        Ok((
1356            appending_version.increment(),
1357            AppendNotifier {
1358                pending_at: Some(NotifierPendingAt {
1359                    scheduled_at,
1360                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1361                    component_input_digest,
1362                }),
1363                execution_finished: None,
1364                response: None,
1365            },
1366        ))
1367    }
1368
    /// Moves `t_state` to `Locked(lock_expires_at)` with executor/run/retry
    /// metadata, then returns the row's `intermittent_event_count`.
    ///
    /// The `WHERE` clause also requires `is_paused = false`, so attempting to
    /// lock a paused execution yields `NotFound`.
    #[expect(clippy::too_many_arguments)]
    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
    fn update_state_locked_get_intermittent_event_count(
        tx: &Transaction,
        execution_id: &ExecutionId,
        deployment_id: DeploymentId,
        component_digest: &InputContentDigest,
        executor_id: ExecutorId,
        run_id: RunId,
        lock_expires_at: DateTime<Utc>,
        appending_version: &Version,
        retry_config: ComponentRetryConfig,
    ) -> Result<u32, DbErrorWrite> {
        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
        let backoff_millis =
            i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
                DbErrorGeneric::Uncategorized {
                    reason: "backoff too big".into(),
                    context: SpanTrace::capture(),
                    source: Some(Arc::new(err)),
                    loc: Location::caller(),
                }
            })?; // Keep equal to Postgres' BIGINT = i64
        let execution_id_str = execution_id.to_string();
        let mut stmt = tx.prepare_cached(
            r"
                UPDATE t_state
                SET
                    corresponding_version = :appending_version,
                    pending_expires_finished = :pending_expires_finished,
                    state = :state,
                    updated_at = CURRENT_TIMESTAMP,
                    deployment_id = :deployment_id,
                    component_id_input_digest = :component_id_input_digest,

                    max_retries = :max_retries,
                    retry_exp_backoff_millis = :retry_exp_backoff_millis,
                    last_lock_version = :appending_version,
                    executor_id = :executor_id,
                    run_id = :run_id,

                    join_set_id = NULL,
                    join_set_closing = NULL,

                    result_kind = NULL
                WHERE execution_id = :execution_id
                AND is_paused = false
            ",
        )?;
        let updated = stmt.execute(named_params! {
            ":execution_id": execution_id_str,
            ":appending_version": appending_version.0,
            ":pending_expires_finished": lock_expires_at,
            ":state": STATE_LOCKED,
            ":deployment_id": deployment_id.to_string(),
            ":component_id_input_digest": component_digest,
            ":max_retries": retry_config.max_retries,
            ":retry_exp_backoff_millis": backoff_millis,
            ":executor_id": executor_id.to_string(),
            ":run_id": run_id.to_string(),
        })?;
        if updated != 1 {
            // Either the execution does not exist or it is paused.
            return Err(DbErrorWrite::NotFound);
        }

        // fetch intermittent event count from the just-updated row.
        let intermittent_event_count = tx
            .prepare(
                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
            )?
            .query_row(
                named_params! {
                    ":execution_id": execution_id_str,
                },
                |row| {
                    let intermittent_event_count = row.get("intermittent_event_count")?;
                    Ok(intermittent_event_count)
                },
            )?;

        Ok(intermittent_event_count)
    }
1451
1452    /// Appending [`HistoryEvent::JoinNext`].
1453    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1454    fn update_state_blocked(
1455        tx: &Transaction,
1456        execution_id: &ExecutionId,
1457        appending_version: &Version,
1458        // BlockedByJoinSet fields
1459        join_set_id: &JoinSetId,
1460        lock_expires_at: DateTime<Utc>,
1461        join_set_closing: bool,
1462    ) -> Result<
1463        AppendResponse, // next version
1464        DbErrorWrite,
1465    > {
1466        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1467        let execution_id_str = execution_id.to_string();
1468        let mut stmt = tx.prepare_cached(
1469            r"
1470                UPDATE t_state
1471                SET
1472                    corresponding_version = :appending_version,
1473                    pending_expires_finished = :pending_expires_finished,
1474                    state = :state,
1475                    updated_at = CURRENT_TIMESTAMP,
1476
1477                    max_retries = NULL,
1478                    retry_exp_backoff_millis = NULL,
1479                    last_lock_version = NULL,
1480
1481                    join_set_id = :join_set_id,
1482                    join_set_closing = :join_set_closing,
1483
1484                    result_kind = NULL
1485                WHERE execution_id = :execution_id
1486            ",
1487        )?;
1488        let updated = stmt.execute(named_params! {
1489            ":execution_id": execution_id_str,
1490            ":appending_version": appending_version.0,
1491            ":pending_expires_finished": lock_expires_at,
1492            ":state": STATE_BLOCKED_BY_JOIN_SET,
1493            ":join_set_id": join_set_id,
1494            ":join_set_closing": join_set_closing,
1495        })?;
1496        if updated != 1 {
1497            return Err(DbErrorWrite::NotFound);
1498        }
1499        Ok(appending_version.increment())
1500    }
1501
1502    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1503    fn update_state_finished(
1504        tx: &Transaction,
1505        execution_id: &ExecutionId,
1506        appending_version: &Version,
1507        // Finished fields
1508        finished_at: DateTime<Utc>,
1509        result_kind: PendingStateFinishedResultKind,
1510    ) -> Result<(), DbErrorWrite> {
1511        debug!("Setting t_state to Finished");
1512        let execution_id_str = execution_id.to_string();
1513        let mut stmt = tx.prepare_cached(
1514            r"
1515                UPDATE t_state
1516                SET
1517                    corresponding_version = :appending_version,
1518                    pending_expires_finished = :pending_expires_finished,
1519                    state = :state,
1520                    updated_at = CURRENT_TIMESTAMP,
1521
1522                    max_retries = NULL,
1523                    retry_exp_backoff_millis = NULL,
1524                    last_lock_version = NULL,
1525                    executor_id = NULL,
1526                    run_id = NULL,
1527
1528                    join_set_id = NULL,
1529                    join_set_closing = NULL,
1530
1531                    result_kind = :result_kind
1532                WHERE execution_id = :execution_id
1533            ",
1534        )?;
1535
1536        let updated = stmt.execute(named_params! {
1537            ":execution_id": execution_id_str,
1538            ":appending_version": appending_version.0,
1539            ":pending_expires_finished": finished_at,
1540            ":state": STATE_FINISHED,
1541            ":result_kind": JsonWrapper(result_kind),
1542        })?;
1543        if updated != 1 {
1544            return Err(DbErrorWrite::NotFound);
1545        }
1546        Ok(())
1547    }
1548
1549    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1550    fn update_state_paused(
1551        tx: &Transaction,
1552        execution_id: &ExecutionId,
1553        appending_version: &Version,
1554        is_paused: bool,
1555    ) -> Result<AppendResponse, DbErrorWrite> {
1556        debug!(
1557            "Setting t_state to {}",
1558            if is_paused { "paused" } else { "unpaused" }
1559        );
1560        let execution_id_str = execution_id.to_string();
1561        let mut stmt = tx.prepare_cached(
1562            r"
1563                UPDATE t_state
1564                SET
1565                    corresponding_version = :appending_version,
1566                    is_paused = :is_paused,
1567                    updated_at = CURRENT_TIMESTAMP
1568                WHERE execution_id = :execution_id
1569            ",
1570        )?;
1571
1572        let updated = stmt.execute(named_params! {
1573            ":execution_id": execution_id_str,
1574            ":appending_version": appending_version.0,
1575            ":is_paused": is_paused,
1576        })?;
1577        if updated != 1 {
1578            return Err(DbErrorWrite::NotFound);
1579        }
1580        Ok(appending_version.increment())
1581    }
1582
1583    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1584    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1585    fn bump_state_next_version(
1586        tx: &Transaction,
1587        execution_id: &ExecutionId,
1588        appending_version: &Version,
1589        delay_req: Option<DelayReq>,
1590    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1591        debug!("update_index_version");
1592        let execution_id_str = execution_id.to_string();
1593        let mut stmt = tx.prepare_cached(
1594            r"
1595                UPDATE t_state
1596                SET
1597                    corresponding_version = :appending_version,
1598                    updated_at = CURRENT_TIMESTAMP
1599                WHERE execution_id = :execution_id
1600            ",
1601        )?;
1602        let updated = stmt.execute(named_params! {
1603            ":execution_id": execution_id_str,
1604            ":appending_version": appending_version.0,
1605        })?;
1606        if updated != 1 {
1607            return Err(DbErrorWrite::NotFound);
1608        }
1609        if let Some(DelayReq {
1610            join_set_id,
1611            delay_id,
1612            expires_at,
1613        }) = delay_req
1614        {
1615            debug!("Inserting delay to `t_delay`");
1616            let mut stmt = tx.prepare_cached(
1617                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1618                VALUES \
1619                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1620            )?;
1621            stmt.execute(named_params! {
1622                ":execution_id": execution_id_str,
1623                ":join_set_id": join_set_id.to_string(),
1624                ":delay_id": delay_id.to_string(),
1625                ":expires_at": expires_at,
1626            })?;
1627        }
1628        Ok(appending_version.increment())
1629    }
1630
    /// Read the single `t_state` row of `execution_id` and assemble it into a
    /// [`CombinedState`] (pending state plus its corresponding version).
    ///
    /// Errors with `DbErrorRead` when the row does not exist or a column fails
    /// to decode; validation errors from `CombinedState::new` are converted via
    /// `from_generic_error`.
    fn get_combined_state(
        tx: &Transaction,
        execution_id: &ExecutionId,
    ) -> Result<CombinedState, DbErrorRead> {
        let mut stmt = tx.prepare(
            r"
                SELECT
                    created_at, first_scheduled_at,
                    state, ffqn, component_id_input_digest, component_type, deployment_id,
                    corresponding_version, pending_expires_finished,
                    last_lock_version, executor_id, run_id,
                    join_set_id, join_set_closing,
                    result_kind, is_paused
                    FROM t_state
                WHERE
                    execution_id = :execution_id
                ",
        )?;
        stmt.query_row(
            named_params! {
                ":execution_id": execution_id.to_string(),
            },
            |row| {
                // Nullable columns are read as `Option` and mapped into their
                // domain types; `result_kind` is stored as a JSON blob.
                CombinedState::new(
                    CombinedStateDTO {
                        execution_id: execution_id.clone(),
                        created_at: row.get("created_at")?,
                        first_scheduled_at: row.get("first_scheduled_at")?,
                        component_digest: row.get("component_id_input_digest")?,
                        component_type: row.get("component_type")?,
                        deployment_id: row.get("deployment_id")?,
                        state: row.get("state")?,
                        ffqn: row.get("ffqn")?,
                        pending_expires_finished: row
                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
                        last_lock_version: row
                            .get::<_, Option<VersionType>>("last_lock_version")?
                            .map(Version::new),
                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
                        run_id: row.get::<_, Option<RunId>>("run_id")?,
                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
                        result_kind: row
                            .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
                                "result_kind",
                            )?
                            .map(|wrapper| wrapper.0),
                        is_paused: row.get("is_paused")?,
                    },
                    Version::new(row.get("corresponding_version")?),
                )
                .map_err(|e| from_generic_error(&e))
            },
        )
        .map_err(DbErrorRead::from)
    }
1687
1688    fn list_executions(
1689        read_tx: &Transaction,
1690        filter: &ListExecutionsFilter,
1691        pagination: &ExecutionListPagination,
1692    ) -> Result<Vec<ExecutionWithState>, RusqliteError> {
1693        #[derive(Debug)]
1694        struct StatementModifier<'a> {
1695            where_vec: Vec<String>,
1696            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1697            limit: u32,
1698            limit_desc: bool,
1699        }
1700
1701        fn paginate<'a, T: Clone + rusqlite::ToSql + 'static>(
1702            pagination: &'a Pagination<Option<T>>,
1703            column: &str,
1704            filter: &ListExecutionsFilter,
1705        ) -> Result<StatementModifier<'a>, RusqliteError> {
1706            let mut where_vec: Vec<String> = vec![];
1707            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1708            let limit = pagination.length();
1709            let limit_desc = pagination.is_desc();
1710            match pagination {
1711                Pagination::NewerThan {
1712                    cursor: Some(cursor),
1713                    ..
1714                }
1715                | Pagination::OlderThan {
1716                    cursor: Some(cursor),
1717                    ..
1718                } => {
1719                    where_vec.push(format!("{column} {rel} :cursor", rel = pagination.rel()));
1720                    let cursor = cursor.to_sql().map_err(|err| {
1721                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
1722                        RusqliteError::Generic {
1723                            reason: "cannot convert cursor to sql".into(),
1724                            context: SpanTrace::capture(),
1725                            source: Some(Arc::new(err)),
1726                            loc: Location::caller(),
1727                        }
1728                    })?;
1729                    params.push((":cursor", cursor));
1730                }
1731                _ => {}
1732            }
1733            if !filter.show_derived {
1734                where_vec.push("is_top_level=true".to_string());
1735            }
1736            Ok(StatementModifier {
1737                where_vec,
1738                params,
1739                limit: u32::from(limit),
1740                limit_desc,
1741            })
1742        }
1743
1744        let mut statement_mod = match pagination {
1745            ExecutionListPagination::CreatedBy(pagination) => {
1746                paginate(pagination, "created_at", filter)?
1747            }
1748            ExecutionListPagination::ExecutionId(pagination) => {
1749                paginate(pagination, "execution_id", filter)?
1750            }
1751        };
1752        let like = |str| format!("{str}%");
1753
1754        let ffqn_temporary;
1755        if let Some(ffqn_prefix) = &filter.ffqn_prefix {
1756            statement_mod.where_vec.push("ffqn LIKE :ffqn".to_string());
1757            ffqn_temporary = like(ffqn_prefix);
1758            let ffqn = ffqn_temporary
1759                .to_sql()
1760                .expect("string conversion never fails");
1761            statement_mod.params.push((":ffqn", ffqn));
1762        }
1763
1764        if filter.hide_finished {
1765            statement_mod
1766                .where_vec
1767                .push(format!("state != '{STATE_FINISHED}'"));
1768        }
1769        let prefix_temporary;
1770        if let Some(prefix) = &filter.execution_id_prefix {
1771            statement_mod
1772                .where_vec
1773                .push("execution_id LIKE :prefix".to_string());
1774            prefix_temporary = like(prefix);
1775            statement_mod.params.push((
1776                ":prefix",
1777                prefix_temporary
1778                    .to_sql()
1779                    .expect("string conversion never fails"),
1780            ));
1781        }
1782
1783        let component_digest_temporary;
1784        if let Some(componnet_digest) = &filter.component_digest {
1785            statement_mod
1786                .where_vec
1787                .push("component_id_input_digest = :component_digest".to_string());
1788            component_digest_temporary = componnet_digest.clone();
1789            let component_digest_sql = component_digest_temporary
1790                .to_sql()
1791                .expect("InputContentDigest conversion never fails");
1792            statement_mod
1793                .params
1794                .push((":component_digest", component_digest_sql));
1795        }
1796
1797        let deployment_id_temporary;
1798        if let Some(deployment_id) = filter.deployment_id {
1799            statement_mod
1800                .where_vec
1801                .push("deployment_id = :deployment_id".to_string());
1802            deployment_id_temporary = deployment_id;
1803            let deployment_id = deployment_id_temporary
1804                .to_sql()
1805                .expect("DeploymentId conversion never fails");
1806            statement_mod.params.push((":deployment_id", deployment_id));
1807        }
1808
1809        let where_str = if statement_mod.where_vec.is_empty() {
1810            String::new()
1811        } else {
1812            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1813        };
1814
1815        // Inner query: fetch rows with cursor-based ordering
1816        // Outer query: always return results in descending order
1817        let (inner_order, outer_order) = if statement_mod.limit_desc {
1818            ("DESC", "")
1819        } else {
1820            ("", "DESC")
1821        };
1822
1823        let inner_sql = format!(
1824            r"SELECT created_at, first_scheduled_at, component_id_input_digest, component_type, deployment_id,
1825            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1826            last_lock_version, executor_id, run_id,
1827            join_set_id, join_set_closing,
1828            result_kind, is_paused
1829            FROM t_state {where_str} ORDER BY created_at {inner_order} LIMIT {limit}",
1830            limit = statement_mod.limit,
1831        );
1832
1833        let sql = if outer_order.is_empty() {
1834            inner_sql
1835        } else {
1836            format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY created_at {outer_order}")
1837        };
1838        let vec: Vec<_> = read_tx
1839            .prepare(&sql)?
1840            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1841                statement_mod
1842                    .params
1843                    .into_iter()
1844                    .collect::<Vec<_>>()
1845                    .as_ref(),
1846                |row| {
1847                    let combined_state = CombinedState::new(
1848                        CombinedStateDTO {
1849                            execution_id: row.get("execution_id")?,
1850                            created_at: row.get("created_at")?,
1851                            first_scheduled_at: row.get("first_scheduled_at")?,
1852                            component_digest: row.get("component_id_input_digest")?,
1853                            component_type: row.get("component_type")?,
1854                            deployment_id: row.get("deployment_id")?,
1855                            state: row.get("state")?,
1856                            ffqn: row.get("ffqn")?,
1857                            pending_expires_finished: row.get("pending_expires_finished")?,
1858                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1859
1860                            last_lock_version: row
1861                                .get::<_, Option<VersionType>>("last_lock_version")?
1862                                .map(Version::new),
1863                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1864                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1865                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1866                            result_kind: row
1867                                .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1868                                    "result_kind",
1869                                )?
1870                                .map(|wrapper| wrapper.0),
1871                            is_paused: row.get("is_paused")?,
1872                        },
1873                        Version::new(row.get("corresponding_version")?),
1874                    )
1875                    .map_err(|e| from_generic_error(&e))?;
1876                    Ok(combined_state.execution_with_state)
1877                },
1878            )?
1879            .collect::<Vec<Result<_, _>>>()
1880            .into_iter()
1881            .filter_map(|row| match row {
1882                Ok(row) => Some(row),
1883                Err(err) => {
1884                    warn!("Skipping row - {err:?}");
1885                    None
1886                }
1887            })
1888            .collect();
1889
1890        Ok(vec)
1891    }
1892
    /// List join-set responses of `execution_id`, returned oldest-to-newest,
    /// optionally restricted by cursor-based `pagination`.
    ///
    /// Each response row is either a delay outcome (`delay_id`/`delay_success`
    /// set) or a child-execution result; for the latter the event payload is
    /// joined in from `t_execution_log` at the matching `finished_version`.
    fn list_responses(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pagination: Option<Pagination<u32>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
        // TODO: Add test
        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
        let mut sql = "SELECT \
            r.id, r.created_at, r.join_set_id,  r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
            WHERE \
            r.execution_id = :execution_id \
            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
            "
        .to_string();
        // With a cursor: add the comparison against `r.id` and remember the page size.
        let limit = match &pagination {
            Some(
                pagination @ (Pagination::NewerThan { cursor, .. }
                | Pagination::OlderThan { cursor, .. }),
            ) => {
                params.push((":cursor", Box::new(cursor)));
                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
                Some(pagination.length())
            }
            None => None,
        };
        sql.push_str(" ORDER BY id");
        let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
        if is_desc {
            sql.push_str(" DESC");
        }
        if let Some(limit) = limit {
            write!(sql, " LIMIT {limit}").unwrap();
        }
        // Re-order to ascending for consistent oldest-to-newest results
        if is_desc {
            sql = format!("SELECT * FROM ({sql}) ORDER BY id ASC");
        }
        params.push((":execution_id", Box::new(execution_id.to_string())));
        tx.prepare(&sql)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                Self::parse_response_with_cursor,
            )?
            .collect::<Result<Vec<_>, rusqlite::Error>>()
            .map_err(DbErrorRead::from)
    }
1944
    /// Decode one row of the `t_join_set_response` / `t_execution_log` join
    /// into a [`ResponseWithCursor`].
    ///
    /// Exactly one of two column combinations is valid:
    /// - delay columns set, child columns NULL -> [`JoinSetResponse::DelayFinished`]
    /// - child columns set with a `Finished` event payload, delay columns NULL
    ///   -> [`JoinSetResponse::ChildExecutionFinished`]
    ///
    /// Any other combination is treated as a corrupted row and reported as a
    /// consistency error.
    fn parse_response_with_cursor(
        row: &rusqlite::Row<'_>,
    ) -> Result<ResponseWithCursor, rusqlite::Error> {
        let id = row.get("id")?;
        let created_at: DateTime<Utc> = row.get("created_at")?;
        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
        let event = match (
            row.get::<_, Option<DelayId>>("delay_id")?,
            row.get::<_, Option<bool>>("delay_success")?,
            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
            row.get::<_, Option<VersionType>>("finished_version")?,
            row.get::<_, Option<JsonWrapper<ExecutionRequest>>>("json_value")?,
        ) {
            (Some(delay_id), Some(delay_success), None, None, None) => {
                // `delay_success == false` maps to `Err(())`.
                JoinSetResponse::DelayFinished {
                    delay_id,
                    result: delay_success.then_some(()).ok_or(()),
                }
            }
            (
                None,
                None,
                Some(child_execution_id),
                Some(finished_version),
                Some(JsonWrapper(ExecutionRequest::Finished { result, .. })),
            ) => JoinSetResponse::ChildExecutionFinished {
                child_execution_id,
                finished_version: Version(finished_version),
                result,
            },
            (delay, delay_success, child, finished, result) => {
                error!(
                    "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
                    result.map(|it| it.0)
                );
                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
            }
        };
        Ok(ResponseWithCursor {
            cursor: ResponseCursor(id),
            event: JoinSetResponseEventOuter {
                event: JoinSetResponseEvent { join_set_id, event },
                created_at,
            },
        })
    }
1991
    /// Lock a single execution for `executor_id`/`run_id` until `lock_expires_at`.
    ///
    /// Steps performed inside the caller's transaction:
    /// 1. Validate that the current pending state allows locking and that
    ///    `appending_version` matches the expected next version.
    /// 2. Append a `Locked` event to `t_execution_log`.
    /// 3. Update the derived `t_state` row.
    /// 4. Re-read the `Created` event, all `HistoryEvent`s and the join-set
    ///    responses to assemble the returned [`LockedExecution`].
    #[instrument(level = Level::TRACE, skip(tx))]
    #[expect(clippy::too_many_arguments)]
    fn lock_single_execution(
        tx: &Transaction,
        created_at: DateTime<Utc>,
        component_id: &ComponentId,
        deployment_id: DeploymentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        appending_version: &Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!("lock_single_execution");
        let combined_state = Self::get_combined_state(tx, execution_id)?;
        combined_state
            .execution_with_state
            .pending_state
            .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
        let expected_version = combined_state.get_next_version_assert_not_finished();
        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;

        // Append to `execution_log` table.
        let locked_event = Locked {
            component_id: component_id.clone(),
            deployment_id,
            executor_id,
            lock_expires_at,
            run_id,
            retry_config,
        };
        let event = ExecutionRequest::Locked(locked_event.clone());
        let event_ser = serde_json::to_string(&event).map_err(|err| {
            warn!("Cannot serialize {event:?} - {err:?}");
            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
        })?;
        let mut stmt = tx
            .prepare_cached(
                "INSERT INTO t_execution_log \
            (execution_id, created_at, json_value, version, variant) \
            VALUES \
            (:execution_id, :created_at, :json_value, :version, :variant)",
            )
            .map_err(|err| DbErrorGeneric::Uncategorized {
                reason: err.to_string().into(),
                context: SpanTrace::capture(),
                source: Some(Arc::new(err)),
                loc: Location::caller(),
            })?;
        // A failed insert here is mapped to `IllegalState` (e.g. version conflict).
        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": created_at,
            ":json_value": event_ser,
            ":version": appending_version.0,
            ":variant": event.variant(),
        })
        .map_err(|err| {
            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
                reason: "cannot lock".into(),
                context: SpanTrace::capture(),
                source: Some(Arc::new(err)),
                loc: Location::caller(),
            })
        })?;

        let responses = Self::list_responses(tx, execution_id, None)?;
        trace!("Responses: {responses:?}");

        // Update `t_state`
        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
            tx,
            execution_id,
            deployment_id,
            &component_id.input_digest,
            executor_id,
            run_id,
            lock_expires_at,
            appending_version,
            retry_config,
        )?;
        // Fetch event_history and `Created` event to construct the response.
        let mut events = tx
            .prepare(
                "SELECT json_value, version FROM t_execution_log WHERE \
                execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
                ORDER BY version",
            )?
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":variant1": DUMMY_CREATED.variant(),
                    ":variant2": DUMMY_HISTORY_EVENT.variant(),
                },
                |row| {
                    let created_at_fake = DateTime::from_timestamp_nanos(0); // not used, only the inner event and version
                    let event = row
                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            consistency_rusqlite("cannot deserialize event")
                        })?
                        .0;
                    let version = Version(row.get("version")?);

                    Ok(ExecutionEvent {
                        created_at: created_at_fake,
                        event,
                        backtrace_id: None,
                        version,
                    })
                },
            )?
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .collect::<VecDeque<_>>();
        // The first event (lowest version) must be `Created`.
        let Some(ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            metadata,
            ..
        }) = events.pop_front().map(|outer| outer.event)
        else {
            return Err(consistency_db_err("execution log must contain `Created` event").into());
        };

        // All remaining rows must be `HistoryEvent`s (guaranteed by the variant filter above).
        let event_history = events
            .into_iter()
            .map(|ExecutionEvent { event, version, .. }| {
                if let ExecutionRequest::HistoryEvent { event } = event {
                    Ok((event, version))
                } else {
                    Err(consistency_db_err(
                        "rows can only contain `Created` and `HistoryEvent` event kinds",
                    ))
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(LockedExecution {
            execution_id: execution_id.clone(),
            metadata,
            next_version: appending_version.increment(),
            ffqn,
            params,
            event_history,
            responses,
            parent,
            intermittent_event_count,
            locked_event,
        })
    }
2145
2146    fn count_join_next(
2147        tx: &Transaction,
2148        execution_id: &ExecutionId,
2149        join_set_id: &JoinSetId,
2150    ) -> Result<u32, DbErrorRead> {
2151        let mut stmt = tx.prepare(
2152            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
2153            AND history_event_type = :join_next",
2154        )?;
2155        Ok(stmt.query_row(
2156            named_params! {
2157                ":execution_id": execution_id.to_string(),
2158                ":join_set_id": join_set_id.to_string(),
2159                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
2160            },
2161            |row| row.get::<_, u32>("count"),
2162        )?)
2163    }
2164
2165    fn nth_response(
2166        tx: &Transaction,
2167        execution_id: &ExecutionId,
2168        join_set_id: &JoinSetId,
2169        skip_rows: u32,
2170    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2171        // TODO: Add test
2172        tx
2173            .prepare(
2174                "SELECT r.id, r.created_at, r.join_set_id, \
2175                    r.delay_id, r.delay_success, \
2176                    r.child_execution_id, r.finished_version, l.json_value \
2177                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2178                    WHERE \
2179                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2180                    (
2181                    r.finished_version = l.version \
2182                    OR \
2183                    r.child_execution_id IS NULL \
2184                    ) \
2185                    ORDER BY id \
2186                    LIMIT 1 OFFSET :offset",
2187            )
2188            ?
2189            .query_row(
2190                named_params! {
2191                    ":execution_id": execution_id.to_string(),
2192                    ":join_set_id": join_set_id.to_string(),
2193                    ":offset": skip_rows,
2194                },
2195                Self::parse_response_with_cursor,
2196            )
2197            .optional()
2198            .map_err(DbErrorRead::from)
2199    }
2200
2201    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
2202    #[expect(clippy::needless_return)]
2203    fn append(
2204        tx: &Transaction,
2205        execution_id: &ExecutionId,
2206        req: AppendRequest,
2207        appending_version: Version,
2208    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2209        if matches!(req.event, ExecutionRequest::Created { .. }) {
2210            return Err(DbErrorWrite::NonRetriable(
2211                DbErrorWriteNonRetriable::ValidationFailed(
2212                    "cannot append `Created` event - use `create` instead".into(),
2213                ),
2214            ));
2215        }
2216        if let AppendRequest {
2217            event:
2218                ExecutionRequest::Locked(Locked {
2219                    component_id,
2220                    deployment_id,
2221                    executor_id,
2222                    run_id,
2223                    lock_expires_at,
2224                    retry_config,
2225                }),
2226            created_at,
2227        } = req
2228        {
2229            return Self::lock_single_execution(
2230                tx,
2231                created_at,
2232                &component_id,
2233                deployment_id,
2234                execution_id,
2235                run_id,
2236                &appending_version,
2237                executor_id,
2238                lock_expires_at,
2239                retry_config,
2240            )
2241            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2242        }
2243
2244        let combined_state = Self::get_combined_state(tx, execution_id)?;
2245        if combined_state
2246            .execution_with_state
2247            .pending_state
2248            .is_finished()
2249        {
2250            debug!("Execution is already finished");
2251            return Err(DbErrorWrite::NonRetriable(
2252                DbErrorWriteNonRetriable::IllegalState {
2253                    reason: "already finished".into(),
2254                    context: SpanTrace::capture(),
2255                    source: None,
2256                    loc: Location::caller(),
2257                },
2258            ));
2259        }
2260
2261        Self::check_expected_next_and_appending_version(
2262            &combined_state.get_next_version_assert_not_finished(),
2263            &appending_version,
2264        )?;
2265        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
2266            error!("Cannot serialize {:?} - {err:?}", req.event);
2267            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
2268        })?;
2269
2270        let mut stmt = tx.prepare(
2271                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2272                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
2273                    ?;
2274        stmt.execute(named_params! {
2275            ":execution_id": execution_id.to_string(),
2276            ":created_at": req.created_at,
2277            ":json_value": event_ser,
2278            ":version": appending_version.0,
2279            ":variant": req.event.variant(),
2280            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
2281        })?;
2282        // Calculate current pending state
2283
2284        match &req.event {
2285            ExecutionRequest::Created { .. } => {
2286                unreachable!("handled in the caller")
2287            }
2288
2289            ExecutionRequest::Locked { .. } => {
2290                unreachable!("handled above")
2291            }
2292
2293            ExecutionRequest::TemporarilyFailed {
2294                backoff_expires_at, ..
2295            }
2296            | ExecutionRequest::TemporarilyTimedOut {
2297                backoff_expires_at, ..
2298            } => {
2299                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2300                    tx,
2301                    execution_id,
2302                    &appending_version,
2303                    *backoff_expires_at,
2304                    true, // an intermittent failure
2305                    combined_state.execution_with_state.component_digest,
2306                )?;
2307                return Ok((next_version, notifier));
2308            }
2309
2310            ExecutionRequest::Unlocked {
2311                backoff_expires_at, ..
2312            } => {
2313                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2314                    tx,
2315                    execution_id,
2316                    &appending_version,
2317                    *backoff_expires_at,
2318                    false, // not an intermittent failure
2319                    combined_state.execution_with_state.component_digest,
2320                )?;
2321                return Ok((next_version, notifier));
2322            }
2323
2324            ExecutionRequest::Paused => {
2325                match &combined_state.execution_with_state.pending_state {
2326                    PendingState::Finished { .. } => {
2327                        unreachable!("handled above");
2328                    }
2329                    PendingState::Paused(..) => {
2330                        return Err(DbErrorWriteNonRetriable::IllegalState {
2331                            reason: "cannot pause, execution is already paused".into(),
2332                            context: SpanTrace::capture(),
2333                            source: None,
2334                            loc: Location::caller(),
2335                        }
2336                        .into());
2337                    }
2338                    _ => {}
2339                }
2340                let next_version =
2341                    Self::update_state_paused(tx, execution_id, &appending_version, true)?;
2342                return Ok((next_version, AppendNotifier::default()));
2343            }
2344
2345            ExecutionRequest::Unpaused => {
2346                if !combined_state
2347                    .execution_with_state
2348                    .pending_state
2349                    .is_paused()
2350                {
2351                    return Err(DbErrorWriteNonRetriable::IllegalState {
2352                        reason: "cannot unpause, execution is not paused".into(),
2353                        context: SpanTrace::capture(),
2354                        source: None,
2355                        loc: Location::caller(),
2356                    }
2357                    .into());
2358                }
2359                let next_version =
2360                    Self::update_state_paused(tx, execution_id, &appending_version, false)?;
2361                return Ok((next_version, AppendNotifier::default()));
2362            }
2363
2364            ExecutionRequest::Finished { result, .. } => {
2365                Self::update_state_finished(
2366                    tx,
2367                    execution_id,
2368                    &appending_version,
2369                    req.created_at,
2370                    PendingStateFinishedResultKind::from(result),
2371                )?;
2372                return Ok((
2373                    appending_version,
2374                    AppendNotifier {
2375                        pending_at: None,
2376                        execution_finished: Some(NotifierExecutionFinished {
2377                            execution_id: execution_id.clone(),
2378                            retval: result.clone(),
2379                        }),
2380                        response: None,
2381                    },
2382                ));
2383            }
2384
2385            ExecutionRequest::HistoryEvent {
2386                event:
2387                    HistoryEvent::JoinSetCreate { .. }
2388                    | HistoryEvent::JoinSetRequest {
2389                        request: JoinSetRequest::ChildExecutionRequest { .. },
2390                        ..
2391                    }
2392                    | HistoryEvent::Persist { .. }
2393                    | HistoryEvent::Schedule { .. }
2394                    | HistoryEvent::Stub { .. }
2395                    | HistoryEvent::JoinNextTooMany { .. }
2396                    | HistoryEvent::JoinNextTry { .. },
2397            } => {
2398                return Ok((
2399                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
2400                    AppendNotifier::default(),
2401                ));
2402            }
2403
2404            ExecutionRequest::HistoryEvent {
2405                event:
2406                    HistoryEvent::JoinSetRequest {
2407                        join_set_id,
2408                        request:
2409                            JoinSetRequest::DelayRequest {
2410                                delay_id,
2411                                expires_at,
2412                                ..
2413                            },
2414                    },
2415            } => {
2416                return Ok((
2417                    Self::bump_state_next_version(
2418                        tx,
2419                        execution_id,
2420                        &appending_version,
2421                        Some(DelayReq {
2422                            join_set_id: join_set_id.clone(),
2423                            delay_id: delay_id.clone(),
2424                            expires_at: *expires_at,
2425                        }),
2426                    )?,
2427                    AppendNotifier::default(),
2428                ));
2429            }
2430
2431            ExecutionRequest::HistoryEvent {
2432                event:
2433                    HistoryEvent::JoinNext {
2434                        join_set_id,
2435                        run_expires_at,
2436                        closing,
2437                        requested_ffqn: _,
2438                    },
2439            } => {
2440                // Did the response arrive already?
2441                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
2442                let nth_response =
2443                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
2444                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2445                assert!(join_next_count > 0);
2446                if let Some(ResponseWithCursor {
2447                    event:
2448                        JoinSetResponseEventOuter {
2449                            created_at: nth_created_at,
2450                            ..
2451                        },
2452                    cursor: _,
2453                }) = nth_response
2454                {
2455                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
2456                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2457                        tx,
2458                        execution_id,
2459                        &appending_version,
2460                        scheduled_at,
2461                        false, // not an intermittent failure
2462                        combined_state.execution_with_state.component_digest,
2463                    )?;
2464                    return Ok((next_version, notifier));
2465                }
2466                return Ok((
2467                    Self::update_state_blocked(
2468                        tx,
2469                        execution_id,
2470                        &appending_version,
2471                        join_set_id,
2472                        *run_expires_at,
2473                        *closing,
2474                    )?,
2475                    AppendNotifier::default(),
2476                ));
2477            }
2478        }
2479    }
2480
    /// Append a join-set response (delay finished or child execution finished)
    /// for `execution_id`, and unblock the execution if it is currently blocked
    /// waiting on that join set.
    ///
    /// The returned [`AppendNotifier`] always carries the inserted response and
    /// its cursor in `response`; its pending-at part is populated only when the
    /// response transitioned the execution back to pending.
    fn append_response(
        tx: &Transaction,
        execution_id: &ExecutionId,
        event: JoinSetResponseEventOuter,
    ) -> Result<AppendNotifier, DbErrorWrite> {
        let mut stmt = tx.prepare(
            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
        )?;
        let join_set_id = &event.event.join_set_id;
        // Delay-specific columns; both NULL for child execution responses.
        let (delay_id, delay_success) = match &event.event.event {
            JoinSetResponse::DelayFinished { delay_id, result } => {
                (Some(delay_id.to_string()), Some(result.is_ok()))
            }
            JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
        };
        // Child-execution-specific columns; both NULL for delay responses.
        let (child_execution_id, finished_version) = match &event.event.event {
            JoinSetResponse::ChildExecutionFinished {
                child_execution_id,
                finished_version,
                result: _,
            } => (
                Some(child_execution_id.to_string()),
                Some(finished_version.0),
            ),
            JoinSetResponse::DelayFinished { .. } => (None, None),
        };

        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": event.created_at,
            ":join_set_id": join_set_id.to_string(),
            ":delay_id": delay_id,
            ":delay_success": delay_success,
            ":child_execution_id": child_execution_id,
            ":finished_version": finished_version,
        })?;
        // The response cursor is the autoincremented rowid of the row just inserted.
        let cursor = ResponseCursor(
            u32::try_from(tx.last_insert_rowid())
                .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
        );

        // if the execution is going to be unblocked by this response...
        let combined_state = Self::get_combined_state(tx, execution_id)?;
        debug!("previous_pending_state: {combined_state:?}");
        // Only unblock when the execution is blocked on the *same* join set the
        // response belongs to; otherwise leave the state untouched.
        let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
            state:
                PendingStateBlockedByJoinSet {
                    join_set_id: found_join_set_id,
                    lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
                    closing: _,
                },
            paused: _,
        } =
            PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
            && *join_set_id == found_join_set_id
        {
            // PendingAt should be set to current time if called from expired_timers_watcher,
            // or to a future time if the execution is hot.
            let scheduled_at = max(lock_expires_at, event.created_at);
            // TODO: Add diff test
            // Unblock the state.
            Self::update_state_pending_after_response_appended(
                tx,
                execution_id,
                scheduled_at,
                combined_state.execution_with_state.component_digest,
            )?
        } else {
            AppendNotifier::default()
        };
        // The delay has fired; remove its scheduling row from `t_delay`.
        if let JoinSetResponseEvent {
            join_set_id,
            event:
                JoinSetResponse::DelayFinished {
                    delay_id,
                    result: _,
                },
        } = &event.event
        {
            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
            let mut stmt =
                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
                ?;
            stmt.execute(named_params! {
                ":execution_id": execution_id.to_string(),
                ":join_set_id": join_set_id.to_string(),
                ":delay_id": delay_id.to_string(),
            })?;
        }
        // Always attach the inserted response so subscribers can observe it.
        notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
        Ok(notifier)
    }
2574
2575    fn append_backtrace(
2576        tx: &Transaction,
2577        backtrace_info: &BacktraceInfo,
2578    ) -> Result<(), DbErrorWrite> {
2579        let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2580
2581        tx.prepare("INSERT OR IGNORE INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) VALUES (:backtrace_hash, :wasm_backtrace)")?
2582        .execute(named_params! {
2583            ":backtrace_hash": backtrace_hash,
2584            ":wasm_backtrace": JsonWrapper(&backtrace_info.wasm_backtrace)
2585        })?;
2586
2587        tx.prepare(
2588                "INSERT INTO t_execution_backtrace (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2589                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :backtrace_hash)",
2590        )?
2591        .execute(named_params! {
2592            ":execution_id": backtrace_info.execution_id.to_string(),
2593            ":component_id": JsonWrapper(&backtrace_info.component_id),
2594            ":version_min_including": backtrace_info.version_min_including.0,
2595            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2596            ":backtrace_hash": backtrace_hash,
2597        })?;
2598
2599        Ok(())
2600    }
2601
2602    fn append_log(tx: &Transaction, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2603        let mut stmt = tx.prepare(
2604            "INSERT INTO t_log (
2605            execution_id,
2606            run_id,
2607            created_at,
2608            level,
2609            message,
2610            stream_type,
2611            payload
2612        ) VALUES (
2613            :execution_id,
2614            :run_id,
2615            :created_at,
2616            :level,
2617            :message,
2618            :stream_type,
2619            :payload
2620        )",
2621        )?;
2622
2623        match &row.log_entry {
2624            LogEntry::Log {
2625                created_at,
2626                level,
2627                message,
2628            } => {
2629                stmt.execute(named_params! {
2630                    ":execution_id": row.execution_id,
2631                    ":run_id": row.run_id,
2632                    ":created_at": created_at,
2633                    ":level": *level as u8,
2634                    ":message": message,
2635                    ":stream_type": Option::<u8>::None,
2636                    ":payload": Option::<Vec<u8>>::None,
2637                })?;
2638            }
2639            LogEntry::Stream {
2640                created_at,
2641                payload,
2642                stream_type,
2643            } => {
2644                stmt.execute(named_params! {
2645                    ":execution_id": row.execution_id,
2646                    ":run_id": row.run_id,
2647                    ":created_at": created_at,
2648                    ":level": Option::<u8>::None,
2649                    ":message": Option::<String>::None,
2650                    ":stream_type": *stream_type as u8,
2651                    ":payload": payload,
2652                })?;
2653            }
2654        }
2655
2656        Ok(())
2657    }
2658
2659    fn get(
2660        tx: &Transaction,
2661        execution_id: &ExecutionId,
2662    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2663        let mut stmt = tx.prepare(
2664            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2665                        execution_id = :execution_id ORDER BY version",
2666        )?;
2667        let events = stmt
2668            .query_map(
2669                named_params! {
2670                    ":execution_id": execution_id.to_string(),
2671                },
2672                |row| {
2673                    let created_at = row.get("created_at")?;
2674                    let event = row
2675                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2676                        .map_err(|serde| {
2677                            error!("Cannot deserialize {row:?} - {serde:?}");
2678                            consistency_rusqlite("cannot deserialize event")
2679                        })?
2680                        .0;
2681                    let version = Version(row.get("version")?);
2682
2683                    Ok(ExecutionEvent {
2684                        created_at,
2685                        event,
2686                        backtrace_id: None,
2687                        version,
2688                    })
2689                },
2690            )?
2691            .collect::<Result<Vec<_>, _>>()?;
2692        if events.is_empty() {
2693            return Err(DbErrorRead::NotFound);
2694        }
2695        let combined_state = Self::get_combined_state(tx, execution_id)?;
2696        let responses = Self::list_responses(tx, execution_id, None)?;
2697        Ok(concepts::storage::ExecutionLog {
2698            execution_id: execution_id.clone(),
2699            events,
2700            responses,
2701            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2702            pending_state: combined_state.execution_with_state.pending_state,
2703            component_digest: combined_state.execution_with_state.component_digest,
2704            component_type: combined_state.execution_with_state.component_type,
2705            deployment_id: combined_state.execution_with_state.deployment_id,
2706        })
2707    }
2708
2709    fn get_max_version(
2710        tx: &Transaction,
2711        execution_id: &ExecutionId,
2712    ) -> Result<Version, DbErrorRead> {
2713        tx.prepare("SELECT MAX(version) FROM t_execution_log WHERE execution_id = :execution_id")?
2714            .query_row(
2715                named_params! { ":execution_id": execution_id.to_string() },
2716                |row| row.get::<_, Option<VersionType>>(0),
2717            )
2718            .map(|v| v.map(Version::new).ok_or(DbErrorRead::NotFound))
2719            .map_err(DbErrorRead::from)
2720            .flatten()
2721    }
2722
2723    fn get_max_response_cursor(
2724        tx: &Transaction,
2725        execution_id: &ExecutionId,
2726    ) -> Result<ResponseCursor, DbErrorRead> {
2727        let max_cursor = tx
2728            .prepare("SELECT MAX(id) FROM t_join_set_response WHERE execution_id = :execution_id")?
2729            .query_row(
2730                named_params! { ":execution_id": execution_id.to_string() },
2731                |row| row.get::<_, Option<u32>>(0),
2732            )?;
2733        // Assume the execution exists and has no responses
2734        let max_cursor = max_cursor.unwrap_or_default();
2735        Ok(ResponseCursor(max_cursor))
2736    }
2737
    /// List execution events for `execution_id`, paginated by event `version`.
    ///
    /// `pagination` picks the scan direction and whether the cursor itself is
    /// included; rows are always returned oldest-to-newest regardless of the
    /// direction used to fetch them. With `include_backtrace_id` set, each
    /// event additionally carries the `version_min_including` of the backtrace
    /// range covering its version (if any).
    fn list_execution_events(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
        // Parameters are boxed so both SQL branches can share one heterogeneous list.
        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
        params.push((":execution_id", Box::new(execution_id.to_string())));

        // Translate the pagination variant into a comparison operator and sort direction.
        let (cursor, length, rel, is_desc) = match &pagination {
            Pagination::NewerThan {
                cursor,
                length,
                including_cursor,
            } => (
                *cursor,
                *length,
                if *including_cursor { ">=" } else { ">" },
                false,
            ),
            Pagination::OlderThan {
                cursor,
                length,
                including_cursor,
            } => (
                *cursor,
                *length,
                if *including_cursor { "<=" } else { "<" },
                true,
            ),
        };
        params.push((":cursor", Box::new(cursor)));

        let base_select = if include_backtrace_id {
            // Join each event version against the [min, max) backtrace ranges.
            format!(
                "SELECT
                    log.created_at,
                    log.json_value,
                    log.version as version,
                    bt.version_min_including AS backtrace_id
                FROM
                    t_execution_log AS log
                LEFT OUTER JOIN
                    t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
                                    AND log.version >= bt.version_min_including
                                    AND log.version < bt.version_max_excluding
                WHERE
                    log.execution_id = :execution_id
                    AND log.version {rel} :cursor"
            )
        } else {
            format!(
                "SELECT
                    created_at, json_value, NULL as backtrace_id, version
                FROM t_execution_log WHERE
                    execution_id = :execution_id AND version {rel} :cursor"
            )
        };

        let order = if is_desc { "DESC" } else { "ASC" };
        let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {length}");

        // Re-order to ascending for consistent oldest-to-newest results
        if is_desc {
            sql = format!("SELECT * FROM ({sql}) ORDER BY version ASC");
        }

        tx.prepare(&sql)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                |row| {
                    let created_at = row.get("created_at")?;
                    let backtrace_id = row
                        .get::<_, Option<VersionType>>("backtrace_id")?
                        .map(Version::new);
                    let version = Version(row.get("version")?);

                    let event = row
                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                        .map(|event| ExecutionEvent {
                            created_at,
                            event: event.0,
                            backtrace_id,
                            version,
                        })
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            consistency_rusqlite("cannot deserialize")
                        })?;
                    Ok(event)
                },
            )?
            .collect::<Result<Vec<_>, _>>()
            .map_err(DbErrorRead::from)
    }
2837
2838    fn map_t_execution_log_row(row: &Row<'_>) -> Result<ExecutionEvent, rusqlite::Error> {
2839        let created_at = row.get("created_at")?;
2840        let event = row
2841            .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2842            .map_err(|serde| {
2843                error!("Cannot deserialize {row:?} - {serde:?}");
2844                consistency_rusqlite("cannot deserialize event")
2845            })?;
2846        let version = Version(row.get("version")?);
2847
2848        Ok(ExecutionEvent {
2849            created_at,
2850            event: event.0,
2851            backtrace_id: None,
2852            version,
2853        })
2854    }
2855
2856    fn get_execution_event(
2857        tx: &Transaction,
2858        execution_id: &ExecutionId,
2859        version: VersionType,
2860    ) -> Result<ExecutionEvent, DbErrorRead> {
2861        tx.prepare(
2862            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2863                        execution_id = :execution_id AND version = :version",
2864        )?
2865        .query_row(
2866            named_params! {
2867                ":execution_id": execution_id.to_string(),
2868                ":version": version,
2869            },
2870            SqlitePool::map_t_execution_log_row,
2871        )
2872        .map_err(DbErrorRead::from)
2873    }
2874
2875    fn get_last_execution_event(
2876        tx: &Transaction,
2877        execution_id: &ExecutionId,
2878    ) -> Result<ExecutionEvent, DbErrorRead> {
2879        tx.prepare(
2880            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2881                        execution_id = :execution_id ORDER BY version DESC LIMIT 1",
2882        )?
2883        .query_row(
2884            named_params! {
2885                ":execution_id": execution_id.to_string(),
2886            },
2887            SqlitePool::map_t_execution_log_row,
2888        )
2889        .map_err(DbErrorRead::from)
2890    }
2891
2892    fn get_delay_response(
2893        tx: &Transaction,
2894        execution_id: &ExecutionId,
2895        delay_id: &DelayId,
2896    ) -> Result<Option<bool>, DbErrorRead> {
2897        // TODO: Add test
2898        tx.prepare(
2899            "SELECT delay_success \
2900                    FROM t_join_set_response \
2901                    WHERE \
2902                    execution_id = :execution_id AND delay_id = :delay_id
2903                    ",
2904        )?
2905        .query_row(
2906            named_params! {
2907                ":execution_id": execution_id.to_string(),
2908                ":delay_id": delay_id.to_string(),
2909            },
2910            |row| {
2911                let delay_success = row.get::<_, bool>("delay_success")?;
2912                Ok(delay_success)
2913            },
2914        )
2915        .optional()
2916        .map_err(DbErrorRead::from)
2917    }
2918
2919    #[instrument(level = Level::TRACE, skip_all)]
2920    /// Find next responses for this execution
2921    fn get_responses_after(
2922        tx: &Transaction,
2923        execution_id: &ExecutionId,
2924        last_response: ResponseCursor,
2925    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2926        // TODO: Add test
2927        tx.prepare(
2928            "SELECT r.id, r.created_at, r.join_set_id, \
2929            r.delay_id, r.delay_success, \
2930            r.child_execution_id, r.finished_version, child.json_value \
2931            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2932            WHERE \
2933            r.id > :last_response_id AND \
2934            r.execution_id = :execution_id AND \
2935            ( \
2936            r.finished_version = child.version \
2937            OR r.child_execution_id IS NULL \
2938            ) \
2939            ORDER BY id",
2940        )
2941        ?
2942        .query_map(
2943            named_params! {
2944                ":last_response_id": last_response.0,
2945                ":execution_id": execution_id.to_string(),
2946            },
2947            Self::parse_response_with_cursor,
2948        )
2949        ?
2950        .collect::<Result<Vec<_>, _>>()
2951        .map_err(DbErrorRead::from)
2952    }
2953
2954    fn get_pending_of_single_ffqn(
2955        mut stmt: CachedStatement,
2956        batch_size: u32,
2957        pending_at_or_sooner: DateTime<Utc>,
2958        ffqn: &FunctionFqn,
2959    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2960        stmt.query_map(
2961            named_params! {
2962                ":pending_expires_finished": pending_at_or_sooner,
2963                ":ffqn": ffqn.to_string(),
2964                ":batch_size": batch_size,
2965            },
2966            |row| {
2967                let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2968                let next_version =
2969                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2970                Ok((execution_id, next_version))
2971            },
2972        )
2973        .map_err(|err| {
2974            warn!("Ignoring consistency error {err:?}");
2975        })?
2976        .collect::<Result<Vec<_>, _>>()
2977        .map_err(|err| {
2978            warn!("Ignoring consistency error {err:?}");
2979        })
2980    }
2981
    /// Get executions and their next versions
    ///
    /// For every requested ffqn, selects pending, unpaused executions from
    /// `t_state` that are due at `pending_at_or_sooner` or earlier (soonest
    /// first), until `batch_size` rows have been collected in total.
    /// Consistency errors of a single ffqn query are logged and skipped so
    /// other ffqns can still be served.
    fn get_pending_by_ffqns(
        conn: &Connection,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: &[FunctionFqn],
    ) -> Result<Vec<(ExecutionId, Version)>, RusqliteError> {
        let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
        let mut execution_ids_versions = Vec::with_capacity(batch_size);
        for ffqn in ffqns {
            // Select executions in PendingAt.
            let stmt = conn.prepare_cached(&format!(
                r#"
                    SELECT execution_id, corresponding_version FROM t_state WHERE
                    state = "{STATE_PENDING_AT}" AND
                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
                    AND is_paused = false
                    ORDER BY pending_expires_finished LIMIT :batch_size
                    "#
            ))?;

            // Only request the remaining capacity so the total stays capped at `batch_size`.
            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
                stmt,
                u32::try_from(batch_size - execution_ids_versions.len())
                    .expect("u32 - anything must fit to u32"),
                pending_at_or_sooner,
                ffqn,
            ) {
                execution_ids_versions.extend(execs_and_versions);
                if execution_ids_versions.len() == batch_size {
                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
                    break;
                }
                // consistency errors are ignored since we want to return at least some rows.
            }
        }
        Ok(execution_ids_versions)
    }
3020
    /// Select up to `batch_size` pending, unpaused executions whose
    /// `component_id_input_digest` matches `input_digest`, soonest due first.
    ///
    /// Each returned execution is paired with its next version (the stored
    /// `corresponding_version` incremented by one).
    fn get_pending_by_component_input_digest(
        conn: &Connection,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        input_digest: &InputContentDigest,
    ) -> Result<Vec<(ExecutionId, Version)>, RusqliteError> {
        let mut stmt = conn.prepare_cached(&format!(
            r#"
                SELECT execution_id, corresponding_version FROM t_state WHERE
                state = "{STATE_PENDING_AT}" AND
                pending_expires_finished <= :pending_expires_finished AND
                component_id_input_digest = :component_id_input_digest
                AND is_paused = false
                ORDER BY pending_expires_finished LIMIT :batch_size
                "#
        ))?;

        stmt.query_map(
            named_params! {
                ":pending_expires_finished": pending_at_or_sooner,
                ":component_id_input_digest": input_digest,
                ":batch_size": batch_size,
            },
            |row| {
                let execution_id = row.get::<_, ExecutionId>("execution_id")?;
                // Pending work resumes at the version after the stored one.
                let next_version =
                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
                Ok((execution_id, next_version))
            },
        )?
        .collect::<Result<Vec<_>, _>>()
        .map_err(RusqliteError::from)
    }
3054
3055    // Must be called after write transaction for a correct happens-before relationship.
3056    #[instrument(level = Level::TRACE, skip_all)]
3057    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3058        let (pending_ats, finished_execs, responses) = {
3059            let (mut pending_ats, mut finished_execs, mut responses) =
3060                (Vec::new(), Vec::new(), Vec::new());
3061            for notifier in notifiers {
3062                if let Some(pending_at) = notifier.pending_at {
3063                    pending_ats.push(pending_at);
3064                }
3065                if let Some(finished) = notifier.execution_finished {
3066                    finished_execs.push(finished);
3067                }
3068                if let Some(response) = notifier.response {
3069                    responses.push(response);
3070                }
3071            }
3072            (pending_ats, finished_execs, responses)
3073        };
3074
3075        // Notify pending_at subscribers.
3076        if !pending_ats.is_empty() {
3077            let guard = self.0.pending_subscribers.lock().unwrap();
3078            for pending_at in pending_ats {
3079                Self::notify_pending_locked(&pending_at, current_time, &guard);
3080            }
3081        }
3082        // Notify execution finished subscribers.
3083        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
3084        if !finished_execs.is_empty() {
3085            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3086            for finished in finished_execs {
3087                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3088                    for (_tag, sender) in listeners_of_exe_id {
3089                        // Sending while holding the lock but the oneshot sender does not block.
3090                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
3091                        let _ = sender.send(finished.retval.clone());
3092                    }
3093                }
3094            }
3095        }
3096        // Notify response subscribers.
3097        if !responses.is_empty() {
3098            let mut guard = self.0.response_subscribers.lock().unwrap();
3099            for (execution_id, response) in responses {
3100                if let Some((sender, _)) = guard.remove(&execution_id) {
3101                    let _ = sender.send(response);
3102                }
3103            }
3104        }
3105    }
3106
3107    fn notify_pending_locked(
3108        notifier: &NotifierPendingAt,
3109        current_time: DateTime<Utc>,
3110        ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3111    ) {
3112        // No need to remove here, cleanup is handled by the caller.
3113        if notifier.scheduled_at <= current_time {
3114            ffqn_to_pending_subscription.notify(notifier);
3115        }
3116    }
3117
3118    fn upgrade_execution_component_single_write(
3119        tx: &Transaction,
3120        execution_id: &ExecutionId,
3121        old: &InputContentDigest,
3122        new: &InputContentDigest,
3123    ) -> Result<(), DbErrorWrite> {
3124        debug!("Updating t_state to component {new}");
3125        let mut stmt = tx.prepare_cached(
3126            r"
3127                UPDATE t_state
3128                SET
3129                    updated_at = CURRENT_TIMESTAMP,
3130                    component_id_input_digest = :new
3131                WHERE
3132                    execution_id = :execution_id AND
3133                    component_id_input_digest = :old
3134            ",
3135        )?;
3136        let updated = stmt.execute(named_params! {
3137            ":execution_id": execution_id,
3138            ":old": old,
3139            ":new": new,
3140        })?;
3141        if updated != 1 {
3142            return Err(DbErrorWrite::NotFound);
3143        }
3144        Ok(())
3145    }
3146
    /// Lists log entries for `execution_id`, filtered by log level and/or stream
    /// type, with cursor-based pagination over the `t_log.id` column.
    ///
    /// Rows holding (level, message) become `LogEntry::Log`; rows holding
    /// (stream_type, payload) become `LogEntry::Stream`; any other column
    /// combination is reported as a consistency error.
    fn list_logs_tx(
        tx: &Transaction,
        execution_id: &ExecutionId,
        filter: &LogFilter,
        pagination: &Pagination<u32>,
    ) -> Result<ListLogsResponse, DbErrorRead> {
        let mut query = String::from(
            "SELECT id, run_id, created_at, level, message, stream_type, payload
         FROM t_log
         WHERE execution_id = :execution_id",
        );

        let length = pagination.length();
        let params = vec![
            (":execution_id", &execution_id as &dyn rusqlite::ToSql),
            (":cursor", pagination.cursor() as &dyn rusqlite::ToSql),
            (":length", &length as &dyn rusqlite::ToSql),
        ];

        // Logs and streams filter.
        // An empty `levels()` / `stream_types()` list means "all values of that kind",
        // so the full enum range is expanded into the IN clause.
        // Values are numeric enum discriminants, not user input, so inlining them
        // into the SQL text (instead of binding) is safe here.
        let level_filter = if filter.should_show_logs() {
            let levels_str = if !filter.levels().is_empty() {
                filter
                    .levels()
                    .iter()
                    .map(|lvl| (*lvl as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            } else {
                LogLevel::iter()
                    .map(|lvl| (lvl as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            };
            Some(format!(" level IN ({levels_str})"))
        } else {
            None
        };
        let stream_filter = if filter.should_show_streams() {
            let streams_str = if !filter.stream_types().is_empty() {
                filter
                    .stream_types()
                    .iter()
                    .map(|st| (*st as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            } else {
                LogStreamType::iter()
                    .map(|st| (st as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            };
            Some(format!(" stream_type IN ({streams_str})"))
        } else {
            None
        };
        // When both filters are active, a row matching either one is returned (OR).
        match (level_filter, stream_filter) {
            (Some(level_filter), Some(stream_filter)) => {
                write!(&mut query, " AND ({level_filter} OR {stream_filter})")
                    .expect("writing to string");
            }
            (Some(level_filter), None) => {
                write!(&mut query, " AND {level_filter}").expect("writing to string");
            }
            (None, Some(stream_filter)) => {
                write!(&mut query, " AND {stream_filter}").expect("writing to string");
            }
            // `LogFilter` always requests logs, streams, or both — presumably
            // enforced by its constructor, as the comment claims; verify there.
            (None, None) => unreachable!("guarded by constructor"),
        }

        // Pagination: compare `id` against the cursor using the direction-specific operator.
        write!(&mut query, " AND id {} :cursor", pagination.rel()).expect("writing to string");

        // Ordering follows the pagination direction.
        query.push_str(" ORDER BY id ");
        query.push_str(pagination.asc_or_desc());

        // Limit
        query.push_str(" LIMIT :length");

        let mut stmt = tx.prepare(&query)?;

        let items = stmt
            .query_map(params.as_slice(), |row| {
                let cursor = row.get("id")?;
                let created_at: DateTime<Utc> = row.get("created_at")?;
                let run_id = row.get("run_id")?;
                let level: Option<u8> = row.get("level")?;
                let message: Option<String> = row.get("message")?;
                let stream_type: Option<u8> = row.get("stream_type")?;
                let payload: Option<Vec<u8>> = row.get("payload")?;

                // A row must be either a log entry (level + message) or a stream
                // entry (stream_type + payload) — never a mix of both halves.
                let log_entry = match (level, message, stream_type, payload) {
                    (Some(lvl), Some(msg), None, None) => LogEntry::Log {
                        created_at,
                        level: LogLevel::try_from(lvl).map_err(|_| {
                            consistency_rusqlite(format!(
                                "cannot convert {lvl} to LogLevel , id: {cursor}"
                            ))
                        })?,
                        message: msg,
                    },
                    (None, None, Some(stype), Some(pl)) => LogEntry::Stream {
                        created_at,
                        stream_type: LogStreamType::try_from(stype).map_err(|_| {
                            consistency_rusqlite(format!(
                                "cannot convert {stype} to LogStreamType , id: {cursor}"
                            ))
                        })?,
                        payload: pl,
                    },
                    _ => {
                        return Err(consistency_rusqlite(format!(
                            "invalid t_log row id:{cursor}"
                        )));
                    }
                };
                Ok(LogEntryRow {
                    cursor,
                    run_id,
                    log_entry,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;

        Ok(ListLogsResponse {
            // next_page continues forward from the last returned row.
            next_page: items
                .last()
                .map(|item| Pagination::NewerThan {
                    length: pagination.length(),
                    cursor: item.cursor,
                    including_cursor: false,
                })
                .unwrap_or({
                    if pagination.is_asc() {
                        *pagination // no new results, keep the same cursor
                    } else {
                        // no prev results, let's start from beginning
                        Pagination::NewerThan {
                            length: pagination.length(),
                            cursor: 0,
                            including_cursor: false, // does not matter, no row has id = 0
                        }
                    }
                }),
            // prev_page goes backward from the first returned row.
            prev_page: match items.first() {
                Some(item) => Some(Pagination::OlderThan {
                    length: pagination.length(),
                    cursor: item.cursor,
                    including_cursor: false,
                }),
                None if pagination.is_asc() && *pagination.cursor() > 0 => {
                    // asked for a next page that does not exists (yet).
                    Some(pagination.invert())
                }
                None => None,
            },
            items,
        })
    }
3307
3308    fn list_deployment_states(
3309        tx: &Transaction,
3310        current_time: DateTime<Utc>,
3311        pagination: Pagination<Option<DeploymentId>>,
3312    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
3313        let mut params: Vec<(&'static str, Box<dyn ToSql>)> = vec![];
3314        let mut sql = format!(
3315            r"
3316        SELECT
3317            deployment_id,
3318            SUM(state = '{STATE_LOCKED}') AS locked,
3319            SUM(state = '{STATE_PENDING_AT}' AND pending_expires_finished <= :now) AS pending,
3320            SUM(state = '{STATE_PENDING_AT}' AND pending_expires_finished > :now) AS scheduled,
3321            SUM(state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
3322            SUM(state = '{STATE_FINISHED}') AS finished
3323        FROM t_state"
3324        );
3325
3326        params.push((":now", Box::new(current_time)));
3327
3328        if let Some(cursor) = pagination.cursor() {
3329            params.push((":cursor", Box::new(*cursor)));
3330            write!(
3331                sql,
3332                " WHERE deployment_id {rel} :cursor",
3333                rel = pagination.rel()
3334            )
3335            .expect("writing to string");
3336        }
3337
3338        // Inner query: fetch rows with cursor-based ordering
3339        // Outer query: always return results in descending order
3340        let (inner_order, outer_order) = if pagination.is_desc() {
3341            ("DESC", "")
3342        } else {
3343            ("ASC", "DESC")
3344        };
3345
3346        write!(
3347            sql,
3348            " GROUP BY deployment_id ORDER BY deployment_id {inner_order} LIMIT {limit}",
3349            limit = pagination.length()
3350        )
3351        .expect("writing to string");
3352
3353        let final_sql = if outer_order.is_empty() {
3354            sql
3355        } else {
3356            format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
3357        };
3358
3359        let result: Vec<DeploymentState> = tx
3360            .prepare(&final_sql)?
3361            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
3362                params
3363                    .iter()
3364                    .map(|(k, v)| (*k, v.as_ref()))
3365                    .collect::<Vec<_>>()
3366                    .as_ref(),
3367                |row| {
3368                    Ok(DeploymentState {
3369                        deployment_id: row.get("deployment_id")?,
3370                        locked: row.get("locked")?,
3371                        pending: row.get("pending")?,
3372                        scheduled: row.get("scheduled")?,
3373                        blocked: row.get("blocked")?,
3374                        finished: row.get("finished")?,
3375                    })
3376                },
3377            )?
3378            .collect::<Result<Vec<_>, rusqlite::Error>>()
3379            .map_err(DbErrorRead::from)?;
3380
3381        Ok(result)
3382    }
3383
3384    fn pause_execution(
3385        tx: &Transaction,
3386        execution_id: &ExecutionId,
3387        paused_at: DateTime<Utc>,
3388    ) -> Result<Version, DbErrorWrite> {
3389        let combined_state = Self::get_combined_state(tx, execution_id)?;
3390        let appending_version = combined_state.get_next_version_fail_if_finished()?;
3391        debug!("Pausing with {appending_version}");
3392        let (next_version, _) = Self::append(
3393            tx,
3394            execution_id,
3395            AppendRequest {
3396                created_at: paused_at,
3397                event: ExecutionRequest::Paused,
3398            },
3399            appending_version,
3400        )?;
3401        Ok(next_version)
3402    }
3403
3404    fn unpause_execution(
3405        tx: &Transaction,
3406        execution_id: &ExecutionId,
3407        paused_at: DateTime<Utc>,
3408    ) -> Result<Version, DbErrorWrite> {
3409        let combined_state = Self::get_combined_state(tx, execution_id)?;
3410        let appending_version = combined_state.get_next_version_fail_if_finished()?;
3411        debug!("Unpausing with {appending_version}");
3412        let (next_version, _) = Self::append(
3413            tx,
3414            execution_id,
3415            AppendRequest {
3416                created_at: paused_at,
3417                event: ExecutionRequest::Unpaused,
3418            },
3419            appending_version,
3420        )?;
3421        Ok(next_version)
3422    }
3423}
3424
#[async_trait]
impl DbExecutor for SqlitePool {
    /// Locks up to `batch_size` pending executions matching any of `ffqns`.
    ///
    /// Two-phase: a read-only transaction collects candidate
    /// `(execution_id, next_version)` pairs, then a separate write transaction
    /// appends a lock event for each candidate. A candidate grabbed by another
    /// executor in between presumably fails `lock_single_execution`'s version
    /// check — confirm against that helper (not visible here).
    #[instrument(level = Level::TRACE, skip(self))]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite> {
        // Phase 1: read-only scan for pending candidates.
        let execution_ids_versions = self
            .transaction(
                move |conn| {
                    Self::get_pending_by_ffqns(conn, batch_size, pending_at_or_sooner, &ffqns)
                },
                TxType::Other, // read only
                "lock_pending_by_ffqns_get",
            )
            .await
            .map_err(to_generic_error)?;
        if execution_ids_versions.is_empty() {
            Ok(vec![])
        } else {
            debug!("Locking {execution_ids_versions:?}");
            // Phase 2: lock every candidate inside a single write transaction.
            self.transaction(
                move |tx| {
                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
                    // Append lock
                    for (execution_id, version) in &execution_ids_versions {
                        locked_execs.push(Self::lock_single_execution(
                            tx,
                            created_at,
                            &component_id,
                            deployment_id,
                            execution_id,
                            run_id,
                            version,
                            executor_id,
                            lock_expires_at,
                            retry_config,
                        )?);
                    }
                    Ok::<_, DbErrorWrite>(locked_execs)
                },
                TxType::MultipleWrites,
                "lock_pending_by_ffqns_one",
            )
            .await
        }
    }

    /// Same two-phase locking as `lock_pending_by_ffqns`, but candidates are
    /// selected by the component's input content digest instead of ffqns.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn lock_pending_by_component_digest(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        deployment_id: DeploymentId,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite> {
        // Clone once for ownership; the read closure below takes its own clone
        // so the original remains available for the write phase.
        let component_id = component_id.clone();
        let execution_ids_versions = self
            .transaction(
                {
                    let component_id = component_id.clone();
                    move |conn| {
                        Self::get_pending_by_component_input_digest(
                            conn,
                            batch_size,
                            pending_at_or_sooner,
                            &component_id.input_digest,
                        )
                    }
                },
                TxType::Other, // read only
                "lock_pending_by_component_id_get",
            )
            .await
            .map_err(to_generic_error)?;
        if execution_ids_versions.is_empty() {
            Ok(vec![])
        } else {
            debug!("Locking {execution_ids_versions:?}");
            self.transaction(
                move |tx| {
                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
                    // Append lock
                    for (execution_id, version) in &execution_ids_versions {
                        locked_execs.push(Self::lock_single_execution(
                            tx,
                            created_at,
                            &component_id,
                            deployment_id,
                            execution_id,
                            run_id,
                            version,
                            executor_id,
                            lock_expires_at,
                            retry_config,
                        )?);
                    }
                    Ok::<_, DbErrorWrite>(locked_execs)
                },
                TxType::MultipleWrites,
                "lock_pending_by_component_id_one",
            )
            .await
        }
    }

    /// Test-only helper: locks a single, already-known execution at `version`.
    #[cfg(feature = "test")]
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!(%execution_id, "lock_one");
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| {
                Self::lock_single_execution(
                    tx,
                    created_at,
                    &component_id,
                    deployment_id,
                    &execution_id,
                    run_id,
                    &version,
                    executor_id,
                    lock_expires_at,
                    retry_config,
                )
            },
            TxType::MultipleWrites, // insert + update t_state
            "lock_inner",
        )
        .await
    }

    /// Appends a single event at `version`, then notifies subscribers.
    ///
    /// The notification deliberately happens after the transaction commits
    /// (see `notify_all`'s happens-before requirement).
    #[instrument(level = Level::DEBUG, skip(self, req))]
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite> {
        debug!(%req, "append");
        trace!(?req, "append");
        let created_at = req.created_at;
        let (version, notifier) = self
            .transaction(
                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
                TxType::MultipleWrites, // insert + update t_state
                "append",
            )
            .await?;
        self.notify_all(vec![notifier], created_at);
        Ok(version)
    }

    /// Atomically appends a batch of events to a child execution and a
    /// `ChildExecutionFinished` response to its parent's join set, in one
    /// write transaction. Subscribers for both are notified afterwards.
    ///
    /// Returns the child's version after the last appended event.
    #[instrument(level = Level::DEBUG, skip_all)]
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite> {
        debug!("append_batch_respond_to_parent");
        if events.execution_id == response.parent_execution_id {
            // Pending state would be wrong.
            // This is not a panic because it depends on DB state.
            return Err(DbErrorWrite::NonRetriable(
                DbErrorWriteNonRetriable::ValidationFailed(
                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
                ),
            ));
        }
        if events.batch.is_empty() {
            error!("Batch cannot be empty");
            return Err(DbErrorWrite::NonRetriable(
                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
            ));
        }
        let (version, notifiers) = {
            self.transaction(
                move |tx| {
                    // Append the batch to the child; only the notifier of the
                    // last event is kept (earlier ones are superseded by it).
                    let mut version = events.version.clone();
                    let mut notifier_of_child = None;
                    for append_request in &events.batch {
                        let (v, n) = Self::append(
                            tx,
                            &events.execution_id,
                            append_request.clone(),
                            version,
                        )?;
                        version = v;
                        notifier_of_child = Some(n);
                    }

                    // Deliver the child's result to the parent's join set.
                    let pending_at_parent = Self::append_response(
                        tx,
                        &response.parent_execution_id,
                        JoinSetResponseEventOuter {
                            created_at: response.created_at,
                            event: JoinSetResponseEvent {
                                join_set_id: response.join_set_id.clone(),
                                event: JoinSetResponse::ChildExecutionFinished {
                                    child_execution_id: response.child_execution_id.clone(),
                                    finished_version: response.finished_version.clone(),
                                    result: response.result.clone(),
                                },
                            },
                        },
                    )?;
                    Ok::<_, DbErrorWrite>((
                        version,
                        vec![
                            notifier_of_child.expect("checked that the batch is not empty"),
                            pending_at_parent,
                        ],
                    ))
                },
                TxType::MultipleWrites,
                "append_batch_respond_to_parent",
            )
            .await?
        };
        self.notify_all(notifiers, current_time);
        Ok(version)
    }

    // Supports only one subscriber (executor) per ffqn.
    // A new subscriber replaces the old one, which will eventually time out, which is fine.
    //
    // Flow: register tagged subscription -> re-check the DB (to close the race
    // with appends that happened before registration) -> wait for either a
    // notification or the timeout -> always clean up own subscription by tag.
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // Random tag distinguishes this subscription from a replacement
        // registered by another call for the same ffqn.
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
        {
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }
        async {
            // Re-check after subscribing: a pending execution appended before
            // the subscription existed would otherwise be missed.
            let Ok(execution_ids_versions) = self
                .transaction(
                    {
                        let ffqns = ffqns.clone();
                        move |conn| Self::get_pending_by_ffqns(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                    },
                    TxType::Other, // read only
                    "get_pending_by_ffqns",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            tokio::select! { // future's liveness: Dropping the loser immediately.
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        // Clean up ffqn_to_pending_subscription in any case
        {
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match pending_subscribers.remove_ffqn(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {
                        // Cleanup OK.
                    }
                    Some(other) => {
                        // Reinsert foreign sender.
                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
                    }
                    None => {
                        // Value was replaced and cleaned up already.
                    }
                }
            }
        }
    }

    // Supports only one subscriber (executor) per component id.
    // A new subscriber replaces the old one, which will eventually time out, which is fine.
    //
    // Mirrors `wait_for_pending_by_ffqn`, keyed by component input digest
    // instead of ffqn: subscribe -> re-check DB -> wait -> clean up by tag.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn wait_for_pending_by_component_digest(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_digest: &InputContentDigest,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
        {
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
            pending_subscribers
                .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
        }
        async {
            // Re-check after subscribing to close the race with earlier appends.
            let Ok(execution_ids_versions) = self
                .transaction(
                    {
                        let input_digest = component_digest.clone();
                        move |conn| Self::get_pending_by_component_input_digest(conn, 1, pending_at_or_sooner, &input_digest)
                    },
                    TxType::Other, // read only
                    "get_pending_by_component_input_digest",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            tokio::select! { // future's liveness: Dropping the loser immediately.
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        // Clean up ffqn_to_pending_subscription in any case
        {
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();

            match pending_subscribers.remove_by_component(component_digest) {
                Some((_, tag)) if tag == unique_tag => {
                    // Cleanup OK.
                }
                Some(other) => {
                    // Reinsert foreign sender.
                    pending_subscribers.insert_by_component(component_digest.clone(), other);
                }
                None => {
                    // Value was replaced and cleaned up already.
                }
            }
        }
    }

    /// Reads the most recent event of an execution in a read-only transaction.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead> {
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| Self::get_last_execution_event(tx, &execution_id),
            TxType::Other, // read only
            "get_last_execution_event",
        )
        .await
    }
}
3819
3820#[async_trait]
3821impl DbExternalApi for SqlitePool {
3822    #[instrument(skip(self))]
3823    async fn get_backtrace(
3824        &self,
3825        execution_id: &ExecutionId,
3826        filter: BacktraceFilter,
3827    ) -> Result<BacktraceInfo, DbErrorRead> {
3828        debug!("get_backtrace");
3829        let execution_id = execution_id.clone();
3830
3831        self.transaction(
3832            move |tx| {
3833                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_execution_backtrace e \
3834                                INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
3835                                WHERE execution_id = :execution_id";
3836                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3837                let select = match &filter {
3838                    BacktraceFilter::Specific(version) =>{
3839                        params.push((":version", Box::new(version.0)));
3840                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3841                    },
3842                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3843                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3844                };
3845                tx
3846                    .prepare(&select)
3847                    ?
3848                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3849                        params
3850                            .iter()
3851                            .map(|(key, value)| (*key, value.as_ref()))
3852                            .collect::<Vec<_>>()
3853                            .as_ref(),
3854                    |row| {
3855                        Ok(BacktraceInfo {
3856                            execution_id: execution_id.clone(),
3857                            component_id: row.get::<_, JsonWrapper<_> >("component_id")?.0,
3858                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3859                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3860                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3861                        })
3862                    },
3863                ).map_err(DbErrorRead::from)
3864            },
3865            TxType::Other, // read only
3866            "get_last_backtrace",
3867        ).await
3868    }
3869
3870    #[instrument(skip(self))]
3871    async fn list_executions(
3872        &self,
3873        filter: ListExecutionsFilter,
3874        pagination: ExecutionListPagination,
3875    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3876        self.transaction(
3877            move |tx| Self::list_executions(tx, &filter, &pagination),
3878            TxType::Other, // read only
3879            "list_executions",
3880        )
3881        .await
3882        .map_err(to_generic_error)
3883    }
3884
3885    #[instrument(skip(self))]
3886    async fn list_execution_events(
3887        &self,
3888        execution_id: &ExecutionId,
3889        pagination: Pagination<VersionType>,
3890        include_backtrace_id: bool,
3891    ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
3892        let execution_id = execution_id.clone();
3893        self.transaction(
3894            move |tx| {
3895                let events = Self::list_execution_events(
3896                    tx,
3897                    &execution_id,
3898                    pagination,
3899                    include_backtrace_id,
3900                )?;
3901                let max_version = Self::get_max_version(tx, &execution_id)?;
3902                Ok(ListExecutionEventsResponse {
3903                    events,
3904                    max_version,
3905                })
3906            },
3907            TxType::Other, // read only
3908            "get",
3909        )
3910        .await
3911    }
3912
3913    #[instrument(skip(self))]
3914    async fn list_responses(
3915        &self,
3916        execution_id: &ExecutionId,
3917        pagination: Pagination<u32>,
3918    ) -> Result<ListResponsesResponse, DbErrorRead> {
3919        let execution_id = execution_id.clone();
3920        self.transaction(
3921            move |tx| {
3922                let responses = Self::list_responses(tx, &execution_id, Some(pagination))?;
3923                let max_cursor = Self::get_max_response_cursor(tx, &execution_id)?;
3924                Ok(ListResponsesResponse {
3925                    responses,
3926                    max_cursor,
3927                })
3928            },
3929            TxType::Other, // read only
3930            "list_responses",
3931        )
3932        .await
3933    }
3934
3935    #[instrument(skip(self))]
3936    async fn list_execution_events_responses(
3937        &self,
3938        execution_id: &ExecutionId,
3939        req_since: &Version,
3940        req_max_length: VersionType,
3941        req_include_backtrace_id: bool,
3942        resp_pagination: Pagination<u32>,
3943    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
3944        let execution_id = execution_id.clone();
3945        let req_since = req_since.0;
3946        self.transaction(
3947            move |tx| {
3948                let combined_state = Self::get_combined_state(tx, &execution_id)?;
3949                let events = Self::list_execution_events(
3950                    tx,
3951                    &execution_id,
3952                    Pagination::NewerThan {
3953                        length: req_max_length
3954                            .try_into()
3955                            .expect("req_max_length fits in u16"),
3956                        cursor: req_since,
3957                        including_cursor: true,
3958                    },
3959                    req_include_backtrace_id,
3960                )?;
3961                let responses = Self::list_responses(tx, &execution_id, Some(resp_pagination))?;
3962                let max_version = Self::get_max_version(tx, &execution_id)?;
3963                let max_cursor = Self::get_max_response_cursor(tx, &execution_id)?;
3964                Ok(ExecutionWithStateRequestsResponses {
3965                    execution_with_state: combined_state.execution_with_state,
3966                    events,
3967                    responses,
3968                    max_version,
3969                    max_cursor,
3970                })
3971            },
3972            TxType::Other, // read only
3973            "list_execution_events_responses",
3974        )
3975        .await
3976    }
3977
3978    #[instrument(skip(self))]
3979    async fn upgrade_execution_component(
3980        &self,
3981        execution_id: &ExecutionId,
3982        old: &InputContentDigest,
3983        new: &InputContentDigest,
3984    ) -> Result<(), DbErrorWrite> {
3985        let execution_id = execution_id.clone();
3986        let old = old.clone();
3987        let new = new.clone();
3988        self.transaction(
3989            move |tx| Self::upgrade_execution_component_single_write(tx, &execution_id, &old, &new),
3990            TxType::Other, // single write
3991            "upgrade_execution_component",
3992        )
3993        .await
3994    }
3995
3996    #[instrument(skip(self))]
3997    async fn list_logs(
3998        &self,
3999        execution_id: &ExecutionId,
4000        filter: LogFilter,
4001        pagination: Pagination<u32>,
4002    ) -> Result<ListLogsResponse, DbErrorRead> {
4003        let execution_id = execution_id.clone();
4004        self.transaction(
4005            move |tx| Self::list_logs_tx(tx, &execution_id, &filter, &pagination),
4006            TxType::Other, // read only
4007            "list_logs",
4008        )
4009        .await
4010    }
4011
4012    #[instrument(skip(self))]
4013    async fn list_deployment_states(
4014        &self,
4015        current_time: DateTime<Utc>,
4016        pagination: Pagination<Option<DeploymentId>>,
4017    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4018        self.transaction(
4019            move |tx| Self::list_deployment_states(tx, current_time, pagination),
4020            TxType::Other, // read only
4021            "list_deployment_states",
4022        )
4023        .await
4024    }
4025
4026    #[instrument(skip(self))]
4027    async fn pause_execution(
4028        &self,
4029        execution_id: &ExecutionId,
4030        paused_at: DateTime<Utc>,
4031    ) -> Result<AppendResponse, DbErrorWrite> {
4032        let execution_id = execution_id.clone();
4033        self.transaction(
4034            move |tx| SqlitePool::pause_execution(tx, &execution_id, paused_at),
4035            TxType::MultipleWrites,
4036            "pause_execution",
4037        )
4038        .await
4039    }
4040
4041    #[instrument(skip(self))]
4042    async fn unpause_execution(
4043        &self,
4044        execution_id: &ExecutionId,
4045        unpaused_at: DateTime<Utc>,
4046    ) -> Result<AppendResponse, DbErrorWrite> {
4047        let execution_id = execution_id.clone();
4048        self.transaction(
4049            move |tx| SqlitePool::unpause_execution(tx, &execution_id, unpaused_at),
4050            TxType::MultipleWrites,
4051            "unpause_execution",
4052        )
4053        .await
4054    }
4055}
4056
4057#[async_trait]
4058impl DbConnection for SqlitePool {
4059    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
4060    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
4061        debug!("create");
4062        trace!(?req, "create");
4063        let created_at = req.created_at;
4064        let (version, notifier) = self
4065            .transaction(
4066                move |tx| Self::create_inner(tx, req.clone()),
4067                TxType::MultipleWrites,
4068                "create",
4069            )
4070            .await?;
4071        self.notify_all(vec![notifier], created_at);
4072        Ok(version)
4073    }
4074
4075    #[instrument(level = Level::DEBUG, skip(self))]
4076    async fn get(
4077        &self,
4078        execution_id: &ExecutionId,
4079    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
4080        trace!("get");
4081        let execution_id = execution_id.clone();
4082        self.transaction(
4083            move |tx| Self::get(tx, &execution_id),
4084            TxType::Other, // read only
4085            "get",
4086        )
4087        .await
4088    }
4089
4090    #[instrument(level = Level::DEBUG, skip(self, batch))]
4091    async fn append_batch(
4092        &self,
4093        current_time: DateTime<Utc>,
4094        batch: Vec<AppendRequest>,
4095        execution_id: ExecutionId,
4096        version: Version,
4097    ) -> Result<AppendBatchResponse, DbErrorWrite> {
4098        debug!("append_batch");
4099        trace!(?batch, "append_batch");
4100        assert!(!batch.is_empty(), "Empty batch request");
4101
4102        let (version, notifier) = self
4103            .transaction(
4104                move |tx| {
4105                    let mut version = version.clone();
4106                    let mut notifier = None;
4107                    for append_request in &batch {
4108                        let (v, n) =
4109                            Self::append(tx, &execution_id, append_request.clone(), version)?;
4110                        version = v;
4111                        notifier = Some(n);
4112                    }
4113                    Ok::<_, DbErrorWrite>((
4114                        version,
4115                        notifier.expect("checked that the batch is not empty"),
4116                    ))
4117                },
4118                TxType::MultipleWrites,
4119                "append_batch",
4120            )
4121            .await?;
4122
4123        self.notify_all(vec![notifier], current_time);
4124        Ok(version)
4125    }
4126
    /// Atomically appends `batch` to `execution_id` and creates the child
    /// executions in `child_req` within the same transaction; afterwards
    /// notifies subscribers and stores `backtraces` best-effort in a separate
    /// fire-and-forget transaction.
    ///
    /// # Panics
    /// Panics if `batch` is empty.
    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
        backtraces: Vec<BacktraceInfo>,
    ) -> Result<AppendBatchResponse, DbErrorWrite> {
        debug!("append_batch_create_new_execution");
        trace!(?batch, ?child_req, "append_batch_create_new_execution");
        assert!(!batch.is_empty(), "Empty batch request");

        let (version, notifiers) = self
            .transaction(
                move |tx| {
                    let mut notifier = None;
                    let mut version = version.clone();
                    // Append every request, threading the version through;
                    // only the last append's notifier is kept for the parent.
                    for append_request in &batch {
                        let (v, n) =
                            Self::append(tx, &execution_id, append_request.clone(), version)?;
                        version = v;
                        notifier = Some(n);
                    }
                    let mut notifiers = Vec::new();
                    notifiers.push(notifier.expect("checked that the batch is not empty"));

                    // Create children in the same transaction so the parent's
                    // new events and its child executions appear atomically.
                    for child_req in &child_req {
                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
                        notifiers.push(notifier);
                    }
                    Ok::<_, DbErrorWrite>((version, notifiers))
                },
                TxType::MultipleWrites,
                "append_batch_create_new_execution_inner",
            )
            .await?;
        self.notify_all(notifiers, current_time);
        // Backtraces are diagnostic data: persisted outside the main
        // transaction and any failure is swallowed (fire-and-forget).
        self.transaction_fire_forget(
            move |tx| {
                for backtrace in &backtraces {
                    Self::append_backtrace(tx, backtrace)?;
                }
                Ok::<_, DbErrorWrite>(())
            },
            "append_batch_create_new_execution_append_backtrace",
        )
        .await;
        Ok(version)
    }
4178
    // Supports only one subscriber per execution id.
    // A new call will overwrite the old subscriber, the old one will end
    // with a timeout, which is fine.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        last_response: ResponseCursor,
        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
        debug!("next_responses");
        // Random tag identifying this subscription so `cleanup` removes only
        // the sender inserted by this call, never a newer subscriber's.
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();

        // Removes this call's subscription entry; if a different (newer)
        // caller has replaced ours in the meantime, reinsert that one.
        let cleanup = || {
            let mut guard = self.0.response_subscribers.lock().unwrap();
            match guard.remove(&execution_id) {
                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
                Some(other) => {
                    // Reinsert foreign sender.
                    guard.insert(execution_id.clone(), other);
                }
                None => {} // Value was replaced and cleaned up already, or notification was sent.
            }
        };

        let response_subscribers = self.0.response_subscribers.clone();
        // Either responses already present in the database (Left), or a
        // oneshot receiver (Right) that is fulfilled on the next response.
        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction(
                move |tx| {
                    let responses = Self::get_responses_after(tx, &execution_id, last_response)?;
                    if responses.is_empty() {
                        // cannot race as we have the transaction write lock
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id.clone(), (sender, unique_tag));
                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                TxType::Other, // read only
                "subscribe_to_next_responses",
            )
            .await
        }
        .inspect_err(|_| {
            // Transaction failed: drop the subscription entry if it was added.
            cleanup();
        })?;
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
            itertools::Either::Right(receiver) => {
                let res = tokio::select! {
                    resp = receiver => {
                        match resp {
                            Ok(resp) => Ok(vec![resp]),
                            // Sender dropped without sending — the pool is closing.
                            Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
                        }
                    }
                    outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
                };
                cleanup();
                res
            }
        }
    }
4248
    // Supports multiple subscribers.
    /// Waits until `execution_id` reaches `PendingState::Finished` and returns
    /// its result, or errors with a timeout once `timeout_fut` (if provided)
    /// resolves. Without a timeout future, waits indefinitely.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
        // Tag distinguishing this subscription among the multiple subscribers
        // allowed per execution id.
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();
        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();

        // Removes only this call's oneshot sender, keyed by `unique_tag`.
        let cleanup = || {
            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
            if let Some(subscribers) = guard.get_mut(&execution_id) {
                subscribers.remove(&unique_tag);
            }
        };

        // Either the finished result (Left) or a oneshot receiver (Right).
        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction(move |tx| {
                let pending_state =
                    Self::get_combined_state(tx, &execution_id)?.execution_with_state.pending_state;
                if let PendingState::Finished(finished) = pending_state {
                    // Already finished: read the result from the event log row
                    // referenced by the t_state version.
                    let event =
                        Self::get_execution_event(tx, &execution_id, finished.version)?;
                    if let ExecutionRequest::Finished { result, ..} = event.event {
                        Ok(itertools::Either::Left(result))
                    } else {
                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
                            "cannot get finished event based on t_state version"
                        )))
                    }
                } else {
                    // Cannot race with the notifier as we have the transaction write lock:
                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
                    // or we end up here. If this tx fails, the cleanup will remove this entry.
                    let (sender, receiver) = oneshot::channel();
                    let mut guard = execution_finished_subscription.lock().unwrap();
                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
                    Ok(itertools::Either::Right(receiver))
                }
            },
            TxType::Other, // read only
            "wait_for_finished_result")
            .await
        }
        .inspect_err(|_| {
            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
            // If cleanup wins, it simply removes the oneshot sender.
            cleanup();
        })?;

        // No timeout future supplied means "wait forever".
        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
            itertools::Either::Right(receiver) => {
                let res = tokio::select! {
                    resp = receiver => {
                        match resp {
                            Ok(retval) => Ok(retval),
                            // Sender dropped without sending — the pool is closing.
                            Err(_recv_err) => Err(DbErrorGeneric::Close.into())
                        }
                    }
                    outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
                };
                cleanup();
                res
            }
        }
    }
4322
    /// Records the outcome of a delay (`DelayFinished`) for `execution_id`
    /// and notifies subscribers.
    ///
    /// On a conflict (the delay's response row already exists) the stored
    /// outcome is read back and reported as `AlreadyFinished` or
    /// `AlreadyCancelled` instead of an error.
    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        result: Result<(), ()>,
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
        debug!("append_delay_response");
        let event = JoinSetResponseEventOuter {
            created_at,
            event: JoinSetResponseEvent {
                join_set_id,
                event: JoinSetResponse::DelayFinished {
                    delay_id: delay_id.clone(),
                    result,
                },
            },
        };
        let res = self
            .transaction(
                {
                    let execution_id = execution_id.clone();
                    move |tx| Self::append_response(tx, &execution_id, event.clone())
                },
                TxType::MultipleWrites,
                "append_delay_response",
            )
            .await;
        match res {
            Ok(notifier) => {
                self.notify_all(vec![notifier], created_at);
                Ok(AppendDelayResponseOutcome::Success)
            }
            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
                // The response was already written (e.g. by a concurrent
                // writer or a cancellation) — look up what was stored.
                let delay_success = self
                    .transaction(
                        move |tx| Self::get_delay_response(tx, &execution_id, &delay_id),
                        TxType::Other, // read only
                        "get_delay_response",
                    )
                    .await?;
                match delay_success {
                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
                    // Conflict on insert but no row found on read: the
                    // database is in an inconsistent state.
                    None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized {
                        reason: "insert failed yet select did not find the response".into(),
                        context: SpanTrace::capture(),
                        source: None,
                        loc: Location::caller(),
                    })),
                }
            }
            Err(err) => Err(err),
        }
    }
4380
4381    #[instrument(level = Level::DEBUG, skip_all)]
4382    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
4383        trace!("append_backtrace");
4384        self.transaction_fire_forget(
4385            move |tx| Self::append_backtrace(tx, &append),
4386            "append_backtrace",
4387        )
4388        .await;
4389        Ok(())
4390    }
4391
4392    #[instrument(level = Level::DEBUG, skip_all)]
4393    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
4394        trace!("append_backtrace_batch");
4395        self.transaction_fire_forget(
4396            move |tx| {
4397                for append in &batch {
4398                    Self::append_backtrace(tx, append)?;
4399                }
4400                Ok::<_, DbErrorWrite>(())
4401            },
4402            "append_backtrace_batch",
4403        )
4404        .await;
4405        Ok(())
4406    }
4407
4408    #[instrument(level = Level::DEBUG, skip_all)]
4409    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
4410        trace!("append_log");
4411        self.transaction_fire_forget(move |tx| Self::append_log(tx, &row), "append_log")
4412            .await;
4413        Ok(())
4414    }
4415
4416    #[instrument(level = Level::DEBUG, skip_all)]
4417    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
4418        trace!("append_log_batch");
4419        let batch = Vec::from(batch);
4420        self.transaction_fire_forget(
4421            move |tx| {
4422                for row in &batch {
4423                    Self::append_log(tx, row)?;
4424                }
4425                Ok::<_, DbErrorWrite>(())
4426            },
4427            "append_log_batch",
4428        )
4429        .await;
4430        Ok(())
4431    }
4432
4433    /// Get currently expired delays and locks.
4434    #[instrument(level = Level::TRACE, skip(self))]
4435    async fn get_expired_timers(
4436        &self,
4437        at: DateTime<Utc>,
4438    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
4439        self.transaction(
4440            move |conn| {
4441                let mut expired_timers = conn.prepare(
4442                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
4443                )?
4444                .query_map(
4445                        named_params! {
4446                            ":at": at,
4447                        },
4448                        |row| {
4449                            let execution_id = row.get("execution_id")?;
4450                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
4451                            let delay_id = row.get::<_, DelayId>("delay_id")?;
4452                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
4453                            Ok(ExpiredTimer::Delay(delay))
4454                        },
4455                    )?
4456                    .collect::<Result<Vec<_>, _>>()?;
4457                // Extend with expired locks
4458                let expired = conn.prepare(&format!(r#"
4459                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
4460                    executor_id, run_id
4461                    FROM t_state
4462                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
4463                    "#
4464                )
4465                )?
4466                .query_map(
4467                        named_params! {
4468                            ":at": at,
4469                        },
4470                        |row| {
4471                            let execution_id = row.get("execution_id")?;
4472                            let locked_at_version = Version::new(row.get("last_lock_version")?);
4473                            let next_version = Version::new(row.get("corresponding_version")?).increment();
4474                            let intermittent_event_count = row.get("intermittent_event_count")?;
4475                            let max_retries = row.get("max_retries")?;
4476                            let retry_exp_backoff_millis = u64::from(row.get::<_, u32>("retry_exp_backoff_millis")?);
4477                            let executor_id = row.get("executor_id")?;
4478                            let run_id = row.get("run_id")?;
4479                            let lock = ExpiredLock {
4480                                execution_id,
4481                                locked_at_version,
4482                                next_version,
4483                                intermittent_event_count,
4484                                max_retries,
4485                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
4486                                locked_by: LockedBy { executor_id, run_id },
4487                            };
4488                            Ok(ExpiredTimer::Lock(lock))
4489                        }
4490                    )?
4491                    .collect::<Result<Vec<_>, _>>()?;
4492                expired_timers.extend(expired);
4493                if !expired_timers.is_empty() {
4494                    debug!("get_expired_timers found {expired_timers:?}");
4495                }
4496                Ok(expired_timers)
4497            },
4498            TxType::Other, // read only
4499            "get_expired_timers"
4500        )
4501        .await
4502        .map_err(to_generic_error)
4503    }
4504
4505    async fn get_execution_event(
4506        &self,
4507        execution_id: &ExecutionId,
4508        version: &Version,
4509    ) -> Result<ExecutionEvent, DbErrorRead> {
4510        let version = version.0;
4511        let execution_id = execution_id.clone();
4512        self.transaction(
4513            move |tx| Self::get_execution_event(tx, &execution_id, version),
4514            TxType::Other, // read only
4515            "get_execution_event",
4516        )
4517        .await
4518    }
4519
4520    async fn get_pending_state(
4521        &self,
4522        execution_id: &ExecutionId,
4523    ) -> Result<ExecutionWithState, DbErrorRead> {
4524        let execution_id = execution_id.clone();
4525        Ok(self
4526            .transaction(
4527                move |tx| Self::get_combined_state(tx, &execution_id),
4528                TxType::Other, // read only
4529                "get_pending_state",
4530            )
4531            .await?
4532            .execution_with_state)
4533    }
4534}
4535
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for SqlitePool {
    /// Test-only helper: append a join-set response to `execution_id` and
    /// notify subscribers.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let event = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };
        let notifier = self
            .transaction(
                move |tx| Self::append_response(tx, &execution_id, event.clone()),
                // Appending a response writes rows, so this must run as a write
                // transaction; it was previously mislabeled `TxType::Other`
                // with a "read only" comment (compare `append_delay_response`,
                // which uses `MultipleWrites` for the same helper).
                TxType::MultipleWrites,
                "append_response",
            )
            .await?;
        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4562
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Creates a pool for tests.
    ///
    /// If the `SQLITE_FILE` env var is set, opens a pool at that path and
    /// returns no file guard. Otherwise creates a fresh temporary file and
    /// returns its [`NamedTempFile`] guard, which must be kept alive for as
    /// long as the pool is in use (dropping it deletes the file).
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}
4588
#[cfg(test)]
mod tests {
    use crate::sqlite_dao::{SqlitePool, TxType, tempfile::sqlite_pool};
    use assert_matches::assert_matches;
    use chrono::DateTime;
    use concepts::{
        ComponentId, FunctionFqn, Params,
        prefixed_ulid::{DEPLOYMENT_ID_DUMMY, EXECUTION_ID_DUMMY},
        storage::{CreateRequest, DbErrorWrite, DbErrorWriteNonRetriable, DbPoolCloseable},
    };
    use rusqlite::named_params;

    const SOME_FFQN: FunctionFqn = FunctionFqn::new_static("ns:pkg/ifc", "fn");

    /// A transaction whose closure returns `Err` must leave no trace in
    /// `t_execution_log`: after one committed create+pause transaction and
    /// one failing pause transaction, exactly two events remain.
    #[tokio::test]
    async fn failing_ltx_should_be_rolled_back() -> Result<(), DbErrorWrite> {
        let created_at = DateTime::from_timestamp_nanos(0);
        let (pool, _guard) = sqlite_pool().await;

        // First tx: create the execution, then pause it — both succeed, so
        // the transaction commits.
        pool.transaction(
            move |tx| {
                let create_req = CreateRequest {
                    created_at,
                    execution_id: EXECUTION_ID_DUMMY,
                    ffqn: SOME_FFQN,
                    params: Params::empty(),
                    parent: None,
                    metadata: concepts::ExecutionMetadata::empty(),
                    scheduled_at: created_at,
                    component_id: ComponentId::dummy_activity(),
                    deployment_id: DEPLOYMENT_ID_DUMMY,
                    scheduled_by: None,
                };
                SqlitePool::create_inner(tx, create_req)?;
                SqlitePool::pause_execution(tx, &EXECUTION_ID_DUMMY, created_at)?;
                Ok::<_, DbErrorWrite>(())
            },
            TxType::MultipleWrites,
            "create_inner + pause_execution",
        )
        .await?;

        // Second tx: pausing an already-paused execution is illegal and
        // must fail with `IllegalState`...
        let err = pool
            .transaction(
                move |tx| SqlitePool::pause_execution(tx, &EXECUTION_ID_DUMMY, created_at),
                TxType::MultipleWrites,
                "pause_execution",
            )
            .await
            .unwrap_err();
        let reason = assert_matches!(err, DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState { reason, .. }) => reason);
        assert_eq!("cannot pause, execution is already paused", reason.as_ref());

        // ...and must be rolled back: only the two events written by the
        // first transaction may exist in the log.
        let rows = pool.transaction(
            move |tx| {
                let mut stmt = tx.prepare(
                    "SELECT created_at, json_value, version FROM t_execution_log WHERE execution_id = :execution_id",
                )?;
                let collected = stmt
                    .query_map(
                        named_params! {
                            ":execution_id": EXECUTION_ID_DUMMY.to_string(),
                        },
                        SqlitePool::map_t_execution_log_row,
                    )
                    .map_err(DbErrorWrite::from)?
                    .collect::<Result<Vec<_>, _>>()?;
                Ok::<_, DbErrorWrite>(collected)
            },
            TxType::Other, // read only
            "get_log",
        )
        .await?;
        assert_eq!(2, rows.len());
        pool.close().await;
        Ok(())
    }
}