obeli_sk_db_sqlite/
sqlite_dao.rs

1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6    SupportedFunctionReturnValue,
7    component_id::InputContentDigest,
8    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14        DbPool, DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
15        ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT,
16        HistoryEvent, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
17        JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy, LockedExecution,
18        Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
19        PendingStateLocked, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
20        STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType,
21    },
22};
23use conversions::{JsonWrapper, consistency_db_err, consistency_rusqlite};
24use hashbrown::HashMap;
25use rusqlite::{
26    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
27    TransactionBehavior, named_params, types::ToSqlOutput,
28};
29use std::{
30    cmp::max,
31    collections::VecDeque,
32    fmt::Debug,
33    ops::DerefMut,
34    path::Path,
35    str::FromStr,
36    sync::{
37        Arc, Mutex,
38        atomic::{AtomicBool, Ordering},
39    },
40    time::Duration,
41};
42use std::{fmt::Write as _, pin::Pin};
43use tokio::{
44    sync::{mpsc, oneshot},
45    time::Instant,
46};
47use tracing::{Level, debug, error, info, instrument, trace, warn};
48
/// Opaque error returned when the SQLite connection cannot be opened or prepared.
///
/// It carries no detail on purpose: `init_thread` logs the underlying cause with
/// `error!` before returning this value.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
52
/// Identifies a delay inside a join set together with its expiry time.
///
/// NOTE(review): presumably the payload written to `t_delay` (the field names match its
/// columns) — the usage is not visible in this part of the file, confirm.
#[derive(Debug, Clone)]
struct DelayReq {
    /// Join set the delay belongs to.
    join_set_id: JoinSetId,
    /// Identifier of the delay itself.
    delay_id: DelayId,
    /// Point in time at which the delay expires.
    expires_at: DateTime<Utc>,
}
59/*
60mmap_size = 128MB - Set the global memory map so all processes can share some data
61https://www.sqlite.org/pragma.html#pragma_mmap_size
62https://www.sqlite.org/mmap.html
63
64journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
65https://www.sqlite.org/pragma.html#pragma_journal_size_limit
66
67Inspired by https://github.com/rails/rails/pull/49349
68*/
69
/// Default pragmas as `[name, value]` pairs, applied by `init_thread` in this order.
///
/// * A value found under the same name in the caller's `pragma_override` map replaces
///   the default listed here.
/// * An empty value means the pragma is queried (and its rows logged at debug level)
///   instead of being set — see `integrity_check`.
///
/// NOTE(review): `page_size` is applied after `journal_mode=wal`. The SQLite documentation
/// states that the page size cannot be changed once the database is in WAL mode, so this
/// entry may have no effect — confirm, and consider moving it to the front.
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["page_size", "8192"], // 8 KB
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
82
// Append only
// Each row records a schema version; `init_thread` reads the row with the highest `id`
// to learn the version of an existing database.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
// Schema version this build expects. `init_thread` inserts it when `t_metadata` is empty
// and fails initialization when the newest stored version differs.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 5;
92
/// Stores execution history. Append only.
///
/// One row per `(execution_id, version)`. `history_event_type` is a stored generated
/// column extracted from `json_value` at the JSON path `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
// NOTE(review): indexes the same columns as the table's PRIMARY KEY (execution_id, version);
// possibly redundant — confirm before removing.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";

// Used in `count_join_next`
// Partial index: only rows whose `history_event_type` equals `HISTORY_EVENT_TYPE_JOIN_NEXT`.
// NOTE(review): the value is wrapped in double quotes, which SQL reserves for identifiers;
// SQLite treats it as a string literal only as a compatibility fallback — confirm that
// single quotes were not intended.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
    "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set  ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type=\"{}\";",
    HISTORY_EVENT_TYPE_JOIN_NEXT
);
120
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, columns `delay_id`,`delay_success` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, columns `child_execution_id`,`finished_version`
/// must not be null.
///
/// Note that the schema itself declares all four of those columns nullable; the rule above
/// is not expressed as a table constraint.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success INTEGER,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
// Child execution id must be unique.
// Partial unique index: rows with a NULL `child_execution_id` are not covered.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
// Delay id must be unique.
// Partial unique index: rows with a NULL `delay_id` are not covered.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
155
/// Stores executions in `PendingState`
/// `state` to column mapping:
/// `PendingAt`:            (nothing but required columns), optionally contains all `Locked` columns.
/// `Locked`:               `max_retries`, `retry_exp_backoff_millis`, `last_lock_version`, `executor_id`, `run_id`
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
/// `Finished` :            `result_kind`.
///
/// `pending_expires_finished` - either pending at, lock expires at or finished at based on state.
///
/// One row per execution (`execution_id` is the primary key). `CombinedState::new` is the
/// code that validates the column combinations when a row is read back.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    ffqn TEXT NOT NULL,
    created_at TEXT NOT NULL,
    component_id_input_digest BLOB NOT NULL,
    first_scheduled_at TEXT NOT NULL,

    pending_expires_finished TEXT NOT NULL,
    state TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";

// TODO: partial indexes
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
// Partial index over rows that have an `executor_id`.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
// NOTE(review): the constant says `IS_TOP_LEVEL` while the index is named `..._is_root`;
// the indexed column is `is_top_level`. Renaming the index would affect existing databases.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
213
/// Represents [`ExpiredTimer::AsyncDelay`] . Rows are deleted when the delay is processed.
///
/// A delay is identified by the triple `(execution_id, join_set_id, delay_id)`;
/// `expires_at` holds its expiry time as text.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
224
// Append only.
// Each row attaches one `wasm_backtrace` to a version range of an execution; the column
// names state that the lower bound is inclusive and the upper bound exclusive.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
// Index for searching backtraces by execution_id and version
// NOTE(review): indexes exactly the columns of the table's PRIMARY KEY; possibly
// redundant — confirm before removing.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
240
/// Internal, cloneable error that bridges `rusqlite::Error` and the `DbError*` types.
///
/// The conversions live in the `conversions` module: a "query returned no rows" error
/// becomes [`RusqliteError::NotFound`], everything else is kept as text.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// The query returned no rows.
    #[error("not found")]
    NotFound,
    /// Any other failure, carried as its rendered message.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
248
249mod conversions {
250
251    use super::RusqliteError;
252    use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
253    use rusqlite::{
254        ToSql,
255        types::{FromSql, FromSqlError},
256    };
257    use std::fmt::Debug;
258    use tracing::error;
259
260    impl From<rusqlite::Error> for RusqliteError {
261        fn from(err: rusqlite::Error) -> Self {
262            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
263                RusqliteError::NotFound
264            } else {
265                error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
266                RusqliteError::Generic(err.to_string().into())
267            }
268        }
269    }
270
271    impl From<RusqliteError> for DbErrorGeneric {
272        fn from(err: RusqliteError) -> DbErrorGeneric {
273            match err {
274                RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
275                RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
276            }
277        }
278    }
279    impl From<RusqliteError> for DbErrorRead {
280        fn from(err: RusqliteError) -> Self {
281            if matches!(err, RusqliteError::NotFound) {
282                Self::NotFound
283            } else {
284                Self::from(DbErrorGeneric::from(err))
285            }
286        }
287    }
288    impl From<RusqliteError> for DbErrorReadWithTimeout {
289        fn from(err: RusqliteError) -> Self {
290            Self::from(DbErrorRead::from(err))
291        }
292    }
293    impl From<RusqliteError> for DbErrorWrite {
294        fn from(err: RusqliteError) -> Self {
295            if matches!(err, RusqliteError::NotFound) {
296                Self::NotFound
297            } else {
298                Self::from(DbErrorGeneric::from(err))
299            }
300        }
301    }
302
303    pub(crate) struct JsonWrapper<T>(pub(crate) T);
304    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
305        fn column_result(
306            value: rusqlite::types::ValueRef<'_>,
307        ) -> rusqlite::types::FromSqlResult<Self> {
308            let value = match value {
309                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
310                    Ok(value)
311                }
312                other => {
313                    error!(
314                        backtrace = %std::backtrace::Backtrace::capture(),
315                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
316                    );
317                    Err(FromSqlError::InvalidType)
318                }
319            }?;
320            let value = serde_json::from_slice::<T>(value).map_err(|err| {
321                error!(
322                    backtrace = %std::backtrace::Backtrace::capture(),
323                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
324                    r#type = std::any::type_name::<T>()
325                );
326                FromSqlError::InvalidType
327            })?;
328            Ok(Self(value))
329        }
330    }
331    impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
332        fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
333            let string = serde_json::to_string(&self.0).map_err(|err| {
334                error!(
335                    "Cannot serialize {value:?} of type `{type}` - {err:?}",
336                    value = self.0,
337                    r#type = std::any::type_name::<T>()
338                );
339                rusqlite::Error::ToSqlConversionFailure(Box::new(err))
340            })?;
341            Ok(rusqlite::types::ToSqlOutput::Owned(
342                rusqlite::types::Value::Text(string),
343            ))
344        }
345    }
346
347    // Used as a wrapper for `FromSqlError::Other`
348    #[derive(Debug, thiserror::Error)]
349    #[error("{0}")]
350    pub(crate) struct OtherError(&'static str);
351    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
352        FromSqlError::other(OtherError(input)).into()
353    }
354
355    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
356        DbErrorGeneric::Uncategorized(input.into())
357    }
358}
359
/// Raw, loosely typed view of the state-related columns of a `t_state` row
/// (the field names mirror the column names).
///
/// `CombinedState::new` validates which optional fields are set for the given `state`
/// and converts the value into a `PendingState`.
#[derive(Debug)]
struct CombinedStateDTO {
    // Compared against the `STATE_*` constants in `CombinedState::new`.
    state: String,
    // Parsed into a `FunctionFqn` in `CombinedState::new`.
    ffqn: String,
    component_id_input_digest: InputContentDigest,
    // Scheduled at, lock expiry or finished at — depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    // Locked:
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    // Blocked by join set:
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    // Finished:
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated state of an execution: its function name, its `PendingState` and the
/// version that state corresponds to. Built by `CombinedState::new`.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    // Version the pending state was derived from; the helpers below compute the next one.
    corresponding_version: Version,
}
382impl CombinedState {
383    fn get_next_version_assert_not_finished(&self) -> Version {
384        assert!(!self.pending_state.is_finished());
385        self.corresponding_version.increment()
386    }
387
388    #[cfg(feature = "test")]
389    fn get_next_version_or_finished(&self) -> Version {
390        if self.pending_state.is_finished() {
391            self.corresponding_version.clone()
392        } else {
393            self.corresponding_version.increment()
394        }
395    }
396}
397
/// Describes an execution that became pending; `PendingFfqnSubscribersHolder::notify`
/// uses `ffqn` and `component_input_digest` to look up the subscribers to wake.
#[derive(Debug)]
struct NotifierPendingAt {
    // Not read by `notify`; NOTE(review): presumably used by callers outside this view.
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
    component_input_digest: InputContentDigest,
}
404
/// Pairs a finished execution with its return value.
///
/// NOTE(review): presumably delivered to `ExecutionFinishedSubscribers` (whose channel
/// payload has the same type as `retval`) — the delivery code is not visible here.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}
410
/// Collects up to three kinds of pending notifications; `Default` yields all `None`.
///
/// NOTE(review): presumably filled while appending inside a transaction and dispatched
/// after commit — the producing and consuming code is not visible here.
#[derive(Debug, Default)]
struct AppendNotifier {
    // An execution became pending.
    pending_at: Option<NotifierPendingAt>,
    // An execution finished with a return value.
    execution_finished: Option<NotifierExecutionFinished>,
    // A join set response for the given execution.
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
417
418impl CombinedState {
419    fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, rusqlite::Error> {
420        let ffqn = FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
421            error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
422            consistency_rusqlite("invalid ffqn value in `t_state`")
423        })?;
424        let pending_state = match dto {
425            // Pending - just created
426            CombinedStateDTO {
427                state,
428                ffqn: _,
429                component_id_input_digest,
430                pending_expires_finished: scheduled_at,
431                last_lock_version: None,
432                executor_id: None,
433                run_id: None,
434                join_set_id: None,
435                join_set_closing: None,
436                result_kind: None,
437            } if state == STATE_PENDING_AT => PendingState::PendingAt {
438                scheduled_at,
439                last_lock: None,
440                component_id_input_digest,
441            },
442            // Pending, previously locked
443            CombinedStateDTO {
444                state,
445                ffqn: _,
446                component_id_input_digest,
447                pending_expires_finished: scheduled_at,
448                last_lock_version: None,
449                executor_id: Some(executor_id),
450                run_id: Some(run_id),
451                join_set_id: None,
452                join_set_closing: None,
453                result_kind: None,
454            } if state == STATE_PENDING_AT => PendingState::PendingAt {
455                scheduled_at,
456                last_lock: Some(LockedBy {
457                    executor_id,
458                    run_id,
459                }),
460                component_id_input_digest,
461            },
462            CombinedStateDTO {
463                state,
464                ffqn: _,
465                component_id_input_digest,
466                pending_expires_finished: lock_expires_at,
467                last_lock_version: Some(_),
468                executor_id: Some(executor_id),
469                run_id: Some(run_id),
470                join_set_id: None,
471                join_set_closing: None,
472                result_kind: None,
473            } if state == STATE_LOCKED => PendingState::Locked(PendingStateLocked {
474                locked_by: LockedBy {
475                    executor_id,
476                    run_id,
477                },
478                lock_expires_at,
479                component_id_input_digest,
480            }),
481            CombinedStateDTO {
482                state,
483                ffqn: _,
484                component_id_input_digest,
485                pending_expires_finished: lock_expires_at,
486                last_lock_version: None,
487                executor_id: _,
488                run_id: _,
489                join_set_id: Some(join_set_id),
490                join_set_closing: Some(join_set_closing),
491                result_kind: None,
492            } if state == STATE_BLOCKED_BY_JOIN_SET => PendingState::BlockedByJoinSet {
493                join_set_id: join_set_id.clone(),
494                closing: join_set_closing,
495                lock_expires_at,
496                component_id_input_digest,
497            },
498            CombinedStateDTO {
499                state,
500                ffqn: _,
501                component_id_input_digest,
502                pending_expires_finished: finished_at,
503                last_lock_version: None,
504                executor_id: None,
505                run_id: None,
506                join_set_id: None,
507                join_set_closing: None,
508                result_kind: Some(result_kind),
509            } if state == STATE_FINISHED => PendingState::Finished {
510                finished: PendingStateFinished {
511                    finished_at,
512                    version: corresponding_version.0,
513                    result_kind,
514                },
515                component_id_input_digest,
516            },
517
518            _ => {
519                error!("Cannot deserialize pending state from  {dto:?}");
520                return Err(consistency_rusqlite("invalid `t_state`"));
521            }
522        };
523        Ok(Self {
524            ffqn,
525            pending_state,
526            corresponding_version,
527        })
528    }
529}
530
/// One unit of work sent to the SQLite thread as `ThreadCommand::LogicalTx`.
///
/// NOTE(review): how the fields are consumed is decided by `tick`, which is only
/// partially visible here; the field comments below are based on names and types.
#[derive(derive_more::Debug)]
struct LogicalTx {
    // Closure invoked with a mutable reference to a rusqlite `Transaction`.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    // Presumably the moment the request was queued.
    sent_at: Instant,
    // Static label of the operation.
    func_name: &'static str,
    // One-shot channel carrying `Ok(())` or a `RusqliteError` back to the requester.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}
540
/// Messages accepted by the SQLite thread over its command channel.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// Work to execute against the database.
    LogicalTx(LogicalTx),
    /// Makes `tick` return `Err(())`, which ends the loop in `connection_rpc`.
    Shutdown,
}
546
/// Cloneable handle to the SQLite thread.
///
/// Implements `DbPool` (every "connection" it hands out is a clone of this handle)
/// and `DbPoolCloseable`. When the last clone is dropped without `close` having
/// completed, `Drop` logs a warning and requests a best-effort shutdown.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
549
// At most one response subscriber per execution: a one-shot sender plus a `u64`.
// NOTE(review): the `u64` is presumably a subscription token — its use is not visible here.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
// Shared, mutex-protected registry of subscribers waiting for pending executions.
type PendingSubscribers = Arc<Mutex<PendingFfqnSubscribersHolder>>;
// Per execution: one-shot senders for the return value, keyed by a `u64`
// (presumably a subscription token — confirm).
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
555
/// Registry of subscribers that want to be woken when an execution becomes pending,
/// looked up either by function name or by component input digest.
///
/// Each map holds at most one subscriber per key (a later insert replaces the entry).
/// The `u64` stored next to the sender is not read by `notify`.
/// NOTE(review): it is presumably a subscription token — confirm with the callers.
#[derive(Default)]
struct PendingFfqnSubscribersHolder {
    by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
    by_component: HashMap<InputContentDigest /* input digest */, (mpsc::Sender<()>, u64)>,
}
561impl PendingFfqnSubscribersHolder {
562    fn notify(&self, notifier: &NotifierPendingAt) {
563        if let Some((subscription, _)) = self.by_ffqns.get(&notifier.ffqn) {
564            debug!("Notifying pending subscriber by ffqn");
565            // Does not block
566            let _ = subscription.try_send(());
567        }
568        if let Some((subscription, _)) = self.by_component.get(&notifier.component_input_digest) {
569            debug!("Notifying pending subscriber by component");
570            // Does not block
571            let _ = subscription.try_send(());
572        }
573    }
574
575    fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
576        self.by_ffqns.insert(ffqn, value);
577    }
578
579    fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
580        self.by_ffqns.remove(ffqn)
581    }
582
583    fn insert_by_component(
584        &mut self,
585        input_content_digest: InputContentDigest,
586        value: (mpsc::Sender<()>, u64),
587    ) {
588        self.by_component.insert(input_content_digest, value);
589    }
590
591    fn remove_by_component(
592        &mut self,
593        input_content_digest: &InputContentDigest,
594    ) -> Option<(mpsc::Sender<()>, u64)> {
595        self.by_component.remove(input_content_digest)
596    }
597}
598
/// State shared by all clones of `SqlitePool`.
#[derive(Clone)]
struct SqlitePoolInner {
    // Set by `close` and `Drop`; once set, `DbPool` methods return `DbErrorGeneric::Close`.
    shutdown_requested: Arc<AtomicBool>,
    // Set by `connection_rpc` right before it returns; `close` polls it.
    shutdown_finished: Arc<AtomicBool>,
    // Sending side of the command channel read by `tick`.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
}
609
610#[async_trait]
611impl DbPoolCloseable for SqlitePool {
612    async fn close(&self) {
613        debug!("Sqlite is closing");
614        self.0.shutdown_requested.store(true, Ordering::Release);
615        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
616        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
617        while !self.0.shutdown_finished.load(Ordering::Acquire) {
618            tokio::time::sleep(Duration::from_millis(1)).await;
619        }
620        debug!("Sqlite was closed");
621    }
622}
623
624#[async_trait]
625impl DbPool for SqlitePool {
626    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
627        if self.0.shutdown_requested.load(Ordering::Acquire) {
628            return Err(DbErrorGeneric::Close);
629        }
630        Ok(Box::new(self.clone()))
631    }
632
633    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
634        if self.0.shutdown_requested.load(Ordering::Acquire) {
635            return Err(DbErrorGeneric::Close);
636        }
637        Ok(Box::new(self.clone()))
638    }
639    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
640        if self.0.shutdown_requested.load(Ordering::Acquire) {
641            return Err(DbErrorGeneric::Close);
642        }
643        Ok(Box::new(self.clone()))
644    }
645    #[cfg(feature = "test")]
646    async fn connection_test(
647        &self,
648    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
649        if self.0.shutdown_requested.load(Ordering::Acquire) {
650            return Err(DbErrorGeneric::Close);
651        }
652        Ok(Box::new(self.clone()))
653    }
654}
655
656impl Drop for SqlitePool {
657    fn drop(&mut self) {
658        let arc = self.0.join_handle.take().expect("join_handle was set");
659        if let Ok(join_handle) = Arc::try_unwrap(arc) {
660            // Last holder
661            if !join_handle.is_finished() {
662                if !self.0.shutdown_finished.load(Ordering::Acquire) {
663                    // Best effort to shut down the sqlite thread.
664                    let backtrace = std::backtrace::Backtrace::capture();
665                    warn!("SqlitePool was not closed properly - {backtrace}");
666                    self.0.shutdown_requested.store(true, Ordering::Release);
667                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
668                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
669                    // Not joining the thread, drop might be called from async context.
670                    // We are shutting down the server anyway.
671                } else {
672                    // The thread set `shutdown_finished` as its last operation.
673                }
674            }
675        }
676    }
677}
678
/// Configuration of the SQLite pool.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// NOTE(review): presumably the capacity of the command channel to the SQLite
    /// thread — the constructor is not visible here, confirm.
    pub queue_capacity: usize,
    /// Pragma name to value overrides. NOTE(review): presumably forwarded to
    /// `init_thread`, where an override replaces the default from `PRAGMA` and
    /// unknown names are applied as additional pragmas — confirm the call site.
    pub pragma_override: Option<HashMap<String, String>>,
    /// NOTE(review): presumably forwarded to `connection_rpc`, which passes it to
    /// `Histograms::new` — confirm the call site.
    pub metrics_threshold: Option<Duration>,
}
impl Default for SqliteConfig {
    /// Defaults: queue capacity of 100, no pragma overrides, no metrics threshold.
    fn default() -> Self {
        Self {
            queue_capacity: 100,
            pragma_override: None,
            metrics_threshold: None,
        }
    }
}
694
695impl SqlitePool {
696    fn init_thread(
697        path: &Path,
698        mut pragma_override: HashMap<String, String>,
699    ) -> Result<Connection, InitializationError> {
700        fn conn_execute<P: Params>(
701            conn: &Connection,
702            sql: &str,
703            params: P,
704        ) -> Result<(), InitializationError> {
705            conn.execute(sql, params).map(|_| ()).map_err(|err| {
706                error!("Cannot run `{sql}` - {err:?}");
707                InitializationError
708            })
709        }
710        fn pragma_update(
711            conn: &Connection,
712            name: &str,
713            value: &str,
714        ) -> Result<(), InitializationError> {
715            if value.is_empty() {
716                debug!("Querying PRAGMA {name}");
717                conn.pragma_query(None, name, |row| {
718                    debug!("{row:?}");
719                    Ok(())
720                })
721                .map_err(|err| {
722                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
723                    InitializationError
724                })
725            } else {
726                debug!("Setting PRAGMA {name}={value}");
727                conn.pragma_update(None, name, value).map_err(|err| {
728                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
729                    InitializationError
730                })
731            }
732        }
733
734        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
735            error!("cannot open the connection - {err:?}");
736            InitializationError
737        })?;
738
739        for [pragma_name, default_value] in PRAGMA {
740            let pragma_value = pragma_override
741                .remove(pragma_name)
742                .unwrap_or_else(|| default_value.to_string());
743            pragma_update(&conn, pragma_name, &pragma_value)?;
744        }
745        // drain the rest overrides
746        for (pragma_name, pragma_value) in pragma_override.drain() {
747            pragma_update(&conn, &pragma_name, &pragma_value)?;
748        }
749
750        // t_metadata
751        {
752            conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
753            // Insert row if not exists.
754
755            let actual_version = conn
756                .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
757                .map_err(|err| {
758                    error!("cannot select schema version - {err:?}");
759                    InitializationError
760                })?
761                .query_row([], |row| row.get::<_, u32>("schema_version"))
762                .optional()
763                .map_err(|err| {
764                    error!("Cannot read the schema version - {err:?}");
765                    InitializationError
766                })?;
767
768            match actual_version {
769                None => conn_execute(
770                    &conn,
771                    &format!(
772                        "INSERT INTO t_metadata (schema_version, created_at) VALUES
773                            ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
774                    ),
775                    [Utc::now()],
776                )?,
777                Some(actual_version) => {
778                    // Fail on unexpected `schema_version`.
779                    if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
780                        error!(
781                            "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
782                        );
783                        return Err(InitializationError);
784                    }
785                }
786            }
787        }
788
789        // t_execution_log
790        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
791        conn_execute(
792            &conn,
793            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
794            [],
795        )?;
796        conn_execute(
797            &conn,
798            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
799            [],
800        )?;
801        conn_execute(
802            &conn,
803            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
804            [],
805        )?;
806        // t_join_set_response
807        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
808        conn_execute(
809            &conn,
810            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
811            [],
812        )?;
813        conn_execute(
814            &conn,
815            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
816            [],
817        )?;
818        conn_execute(
819            &conn,
820            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
821            [],
822        )?;
823        // t_state
824        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
825        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
826        conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
827        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
828        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
829        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
830        // t_delay
831        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
832        // t_backtrace
833        conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
834        conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
835        Ok(conn)
836    }
837
838    fn connection_rpc(
839        mut conn: Connection,
840        shutdown_requested: &AtomicBool,
841        shutdown_finished: &AtomicBool,
842        mut command_rx: mpsc::Receiver<ThreadCommand>,
843        metrics_threshold: Option<Duration>,
844    ) {
845        let mut histograms = Histograms::new(metrics_threshold);
846        while Self::tick(
847            &mut conn,
848            shutdown_requested,
849            &mut command_rx,
850            &mut histograms,
851        )
852        .is_ok()
853        {
854            // Loop until shutdown is set to true.
855        }
856        debug!("Closing command thread");
857        shutdown_finished.store(true, Ordering::Release);
858    }
859
    /// Processes one batch of commands inside a single physical transaction.
    ///
    /// The function blocks until the first command arrives, opens an `Immediate`
    /// transaction, applies the first logical transaction and then drains every command
    /// that is already queued (without blocking), applying each one to the same physical
    /// transaction. Finally the physical transaction is committed and the commit result is
    /// sent to every logical transaction of the batch.
    ///
    /// If the commit of a batch with more than one logical transaction fails, each logical
    /// transaction is replayed in its own physical transaction via
    /// [`Self::ltx_commit_single`].
    ///
    /// Returns `Ok(())` when the caller should call `tick` again. Returns `Err(())` when the
    /// command thread must stop: a `Shutdown` command was received, the channel was closed,
    /// `shutdown_requested` was set, or a transaction could not be opened. On these early
    /// returns no commit is attempted and no commit ack is sent for the current batch.
    ///
    /// NOTE(review): the drain loop has no upper bound on the batch size; it runs for as
    /// long as `try_recv` keeps returning commands.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ()> {
        // Logical transactions applied to the current physical transaction, in arrival order.
        let mut ltx_list = Vec::new();
        // Wait for first logical tx.
        let mut ltx = match command_rx.blocking_recv() {
            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
            Some(ThreadCommand::Shutdown) => {
                debug!("Shutdown message received");
                return Err(());
            }
            None => {
                debug!("command_rx was closed");
                return Err(());
            }
        };
        // Start of the measurement covering all closures of this batch.
        let all_fns_start = std::time::Instant::now();
        let mut physical_tx = conn
            .transaction_with_behavior(TransactionBehavior::Immediate)
            .map_err(|begin_err| {
                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
            })?;
        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
        ltx_list.push(ltx);

        // Drain commands that are already queued; `try_recv` does not wait for new ones.
        while let Ok(more) = command_rx.try_recv() {
            let mut ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("Shutdown message received");
                    return Err(());
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };

            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
            ltx_list.push(ltx);
        }
        histograms.record_all_fns(all_fns_start.elapsed());

        {
            // Commit
            // Bail out without committing when a shutdown was requested in the meantime.
            if shutdown_requested.load(Ordering::Relaxed) {
                debug!("Recveived shutdown during processing of the batch");
                return Err(());
            }
            let commit_result = {
                let now = std::time::Instant::now();
                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
                histograms.record_commit(now.elapsed());
                commit_result
            };
            if commit_result.is_ok() || ltx_list.len() == 1 {
                // Either the commit succeeded, or it failed for a batch of exactly one logical
                // transaction (no serial replay is attempted in that case). Forward the commit
                // result to every sender.
                for ltx in ltx_list {
                    // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
                }
            } else {
                warn!(
                    "Bulk transaction failed, committing {} transactions serially",
                    ltx_list.len()
                );
                // Bulk transaction failed. Replay each ltx in its own physical transaction.
                // TODO: Use SAVEPOINT + RELEASE SAVEPOINT, ROLLBACK savepoint instead.
                for ltx in ltx_list {
                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
                }
            }
        }
        histograms.print_if_elapsed();
        Ok(())
    }
935
936    fn ltx_commit_single(
937        mut ltx: LogicalTx,
938        conn: &mut Connection,
939        shutdown_requested: &AtomicBool,
940        histograms: &mut Histograms,
941    ) -> Result<(), ()> {
942        let mut physical_tx = conn
943            .transaction_with_behavior(TransactionBehavior::Immediate)
944            .map_err(|begin_err| {
945                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
946            })?;
947        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
948        if shutdown_requested.load(Ordering::Relaxed) {
949            debug!("Recveived shutdown during processing of the batch");
950            return Err(());
951        }
952        let commit_result = {
953            let now = std::time::Instant::now();
954            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
955            histograms.record_commit(now.elapsed());
956            commit_result
957        };
958        // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
959        let _ = ltx.commit_ack_sender.send(commit_result);
960        Ok(())
961    }
962
963    fn ltx_apply_to_tx(
964        ltx: &mut LogicalTx,
965        physical_tx: &mut Transaction,
966        histograms: &mut Histograms,
967    ) {
968        let sent_latency = ltx.sent_at.elapsed();
969        let started_at = Instant::now();
970        (ltx.func)(physical_tx);
971        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
972    }
973
974    #[instrument(skip_all, name = "sqlite_new")]
975    pub async fn new<P: AsRef<Path>>(
976        path: P,
977        config: SqliteConfig,
978    ) -> Result<Self, InitializationError> {
979        let path = path.as_ref().to_owned();
980
981        let shutdown_requested = Arc::new(AtomicBool::new(false));
982        let shutdown_finished = Arc::new(AtomicBool::new(false));
983
984        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
985        info!("Sqlite database location: {path:?}");
986        let join_handle = {
987            // Initialize the `Connection`.
988            let init_task = {
989                tokio::task::spawn_blocking(move || {
990                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
991                })
992                .await
993            };
994            let conn = match init_task {
995                Ok(res) => res?,
996                Err(join_err) => {
997                    error!("Initialization panic - {join_err:?}");
998                    return Err(InitializationError);
999                }
1000            };
1001            let shutdown_requested = shutdown_requested.clone();
1002            let shutdown_finished = shutdown_finished.clone();
1003            // Start the RPC thread.
1004            std::thread::spawn(move || {
1005                Self::connection_rpc(
1006                    conn,
1007                    &shutdown_requested,
1008                    &shutdown_finished,
1009                    command_rx,
1010                    config.metrics_threshold,
1011                );
1012            })
1013        };
1014        Ok(SqlitePool(SqlitePoolInner {
1015            shutdown_requested,
1016            shutdown_finished,
1017            command_tx,
1018            response_subscribers: Arc::default(),
1019            pending_subscribers: Arc::default(),
1020            join_handle: Some(Arc::new(join_handle)),
1021            execution_finished_subscribers: Arc::default(),
1022        }))
1023    }
1024
1025    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
1026    async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
1027    where
1028        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
1029        T: Send + 'static,
1030        E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
1031    {
1032        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
1033        let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
1034        let thread_command_func = {
1035            let fn_res = fn_res.clone();
1036            ThreadCommand::LogicalTx(LogicalTx {
1037                func: Box::new(move |tx| {
1038                    let func_res = func(tx);
1039                    *fn_res.lock().unwrap() = Some(func_res);
1040                }),
1041                sent_at: Instant::now(),
1042                func_name,
1043                commit_ack_sender,
1044            })
1045        };
1046        self.0
1047            .command_tx
1048            .send(thread_command_func)
1049            .await
1050            .map_err(|_send_err| DbErrorGeneric::Close)?;
1051
1052        // Wait for commit ack, then get the retval from the mutex.
1053        match commit_ack_receiver.await {
1054            Ok(Ok(())) => {
1055                let mut guard = fn_res.lock().unwrap();
1056                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
1057            }
1058            Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
1059            Err(_) => Err(E::from(DbErrorGeneric::Close)),
1060        }
1061    }
1062
1063    fn fetch_created_event(
1064        conn: &Connection,
1065        execution_id: &ExecutionId,
1066    ) -> Result<CreateRequest, DbErrorRead> {
1067        let mut stmt = conn.prepare(
1068            "SELECT created_at, json_value FROM t_execution_log WHERE \
1069            execution_id = :execution_id AND version = 0",
1070        )?;
1071        let (created_at, event) = stmt.query_row(
1072            named_params! {
1073                ":execution_id": execution_id.to_string(),
1074            },
1075            |row| {
1076                let created_at = row.get("created_at")?;
1077                let event = row
1078                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1079                    .map_err(|serde| {
1080                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
1081                        consistency_rusqlite("cannot deserialize `Created` event")
1082                    })?;
1083                Ok((created_at, event.0))
1084            },
1085        )?;
1086        if let ExecutionRequest::Created {
1087            ffqn,
1088            params,
1089            parent,
1090            scheduled_at,
1091            component_id,
1092            metadata,
1093            scheduled_by,
1094        } = event
1095        {
1096            Ok(CreateRequest {
1097                created_at,
1098                execution_id: execution_id.clone(),
1099                ffqn,
1100                params,
1101                parent,
1102                scheduled_at,
1103                component_id,
1104                metadata,
1105                scheduled_by,
1106            })
1107        } else {
1108            error!("Row with version=0 must be a `Created` event - {event:?}");
1109            Err(consistency_db_err("expected `Created` event").into())
1110        }
1111    }
1112
1113    fn check_expected_next_and_appending_version(
1114        expected_version: &Version,
1115        appending_version: &Version,
1116    ) -> Result<(), DbErrorWrite> {
1117        if *expected_version != *appending_version {
1118            debug!(
1119                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1120            );
1121            return Err(DbErrorWrite::NonRetriable(
1122                DbErrorWriteNonRetriable::VersionConflict {
1123                    expected: expected_version.clone(),
1124                    requested: appending_version.clone(),
1125                },
1126            ));
1127        }
1128        Ok(())
1129    }
1130
1131    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1132    fn create_inner(
1133        tx: &Transaction,
1134        req: CreateRequest,
1135    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1136        trace!("create_inner");
1137
1138        let version = Version::default();
1139        let execution_id = req.execution_id.clone();
1140        let execution_id_str = execution_id.to_string();
1141        let ffqn = req.ffqn.clone();
1142        let created_at = req.created_at;
1143        let scheduled_at = req.scheduled_at;
1144        let component_id = req.component_id.clone();
1145        let event = ExecutionRequest::from(req);
1146        let event_ser = serde_json::to_string(&event).map_err(|err| {
1147            error!("Cannot serialize {event:?} - {err:?}");
1148            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1149        })?;
1150        tx.prepare(
1151                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1152                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1153        ?
1154        .execute(named_params! {
1155            ":execution_id": &execution_id_str,
1156            ":created_at": created_at,
1157            ":version": version.0,
1158            ":json_value": event_ser,
1159            ":variant": event.variant(),
1160            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1161        })
1162        ?;
1163        let pending_at = {
1164            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1165            tx.prepare(
1166                r"
1167                INSERT INTO t_state (
1168                    execution_id,
1169                    is_top_level,
1170                    corresponding_version,
1171                    pending_expires_finished,
1172                    ffqn,
1173                    state,
1174                    created_at,
1175                    component_id_input_digest,
1176                    updated_at,
1177                    first_scheduled_at,
1178                    intermittent_event_count
1179                    )
1180                VALUES (
1181                    :execution_id,
1182                    :is_top_level,
1183                    :corresponding_version,
1184                    :pending_expires_finished,
1185                    :ffqn,
1186                    :state,
1187                    :created_at,
1188                    :component_id_input_digest,
1189                    CURRENT_TIMESTAMP,
1190                    :first_scheduled_at,
1191                    0
1192                    )
1193                ",
1194            )?
1195            .execute(named_params! {
1196                ":execution_id": execution_id.to_string(),
1197                ":is_top_level": execution_id.is_top_level(),
1198                ":corresponding_version": version.0,
1199                ":pending_expires_finished": scheduled_at,
1200                ":ffqn": ffqn.to_string(),
1201                ":state": STATE_PENDING_AT,
1202                ":created_at": created_at,
1203                ":component_id_input_digest": component_id.input_digest,
1204                ":first_scheduled_at": scheduled_at,
1205            })?;
1206            AppendNotifier {
1207                pending_at: Some(NotifierPendingAt {
1208                    scheduled_at,
1209                    ffqn,
1210                    component_input_digest: component_id.input_digest,
1211                }),
1212                execution_finished: None,
1213                response: None,
1214            }
1215        };
1216        let next_version = Version::new(version.0 + 1);
1217        Ok((next_version, pending_at))
1218    }
1219
1220    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1221    fn update_state_pending_after_response_appended(
1222        tx: &Transaction,
1223        execution_id: &ExecutionId,
1224        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1225        corresponding_version: &Version, // t_execution_log is not changed
1226        component_input_digest: InputContentDigest,
1227    ) -> Result<AppendNotifier, DbErrorWrite> {
1228        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1229        let mut stmt = tx
1230            .prepare_cached(
1231                r"
1232                UPDATE t_state
1233                SET
1234                    corresponding_version = :corresponding_version,
1235                    pending_expires_finished = :pending_expires_finished,
1236                    state = :state,
1237                    updated_at = CURRENT_TIMESTAMP,
1238
1239                    last_lock_version = NULL,
1240
1241                    join_set_id = NULL,
1242                    join_set_closing = NULL,
1243
1244                    result_kind = NULL
1245                WHERE execution_id = :execution_id
1246            ",
1247            )
1248            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1249        let updated = stmt
1250            .execute(named_params! {
1251                ":execution_id": execution_id,
1252                ":corresponding_version": corresponding_version.0,
1253                ":pending_expires_finished": scheduled_at,
1254                ":state": STATE_PENDING_AT,
1255            })
1256            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1257        if updated != 1 {
1258            return Err(DbErrorWrite::NotFound);
1259        }
1260        Ok(AppendNotifier {
1261            pending_at: Some(NotifierPendingAt {
1262                scheduled_at,
1263                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1264                component_input_digest,
1265            }),
1266            execution_finished: None,
1267            response: None,
1268        })
1269    }
1270
1271    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1272    fn update_state_pending_after_event_appended(
1273        tx: &Transaction,
1274        execution_id: &ExecutionId,
1275        appending_version: &Version,
1276        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1277        intermittent_failure: bool,
1278        component_input_digest: InputContentDigest,
1279    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1280        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1281        // If response idx is unknown, use 0. This distinguishes the two paths.
1282        // JoinNext will always send an actual idx.
1283        let mut stmt = tx
1284            .prepare_cached(
1285                r"
1286                UPDATE t_state
1287                SET
1288                    corresponding_version = :appending_version,
1289                    pending_expires_finished = :pending_expires_finished,
1290                    state = :state,
1291                    updated_at = CURRENT_TIMESTAMP,
1292                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1293
1294                    last_lock_version = NULL,
1295
1296                    join_set_id = NULL,
1297                    join_set_closing = NULL,
1298
1299                    result_kind = NULL
1300                WHERE execution_id = :execution_id;
1301            ",
1302            )
1303            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1304        let updated = stmt
1305            .execute(named_params! {
1306                ":execution_id": execution_id.to_string(),
1307                ":appending_version": appending_version.0,
1308                ":pending_expires_finished": scheduled_at,
1309                ":state": STATE_PENDING_AT,
1310                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1311            })
1312            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1313        if updated != 1 {
1314            return Err(DbErrorWrite::NotFound);
1315        }
1316        Ok((
1317            appending_version.increment(),
1318            AppendNotifier {
1319                pending_at: Some(NotifierPendingAt {
1320                    scheduled_at,
1321                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1322                    component_input_digest,
1323                }),
1324                execution_finished: None,
1325                response: None,
1326            },
1327        ))
1328    }
1329
1330    fn update_state_locked_get_intermittent_event_count(
1331        tx: &Transaction,
1332        execution_id: &ExecutionId,
1333        executor_id: ExecutorId,
1334        run_id: RunId,
1335        lock_expires_at: DateTime<Utc>,
1336        appending_version: &Version,
1337        retry_config: ComponentRetryConfig,
1338    ) -> Result<u32, DbErrorWrite> {
1339        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1340        let backoff_millis = i64::try_from(retry_config.retry_exp_backoff.as_millis())
1341            .map_err(|_| DbErrorGeneric::Uncategorized("backoff too big".into()))?; // Keep equal to Postgres' BIGINT = i64
1342        let execution_id_str = execution_id.to_string();
1343        let mut stmt = tx
1344            .prepare_cached(
1345                r"
1346                UPDATE t_state
1347                SET
1348                    corresponding_version = :appending_version,
1349                    pending_expires_finished = :pending_expires_finished,
1350                    state = :state,
1351                    updated_at = CURRENT_TIMESTAMP,
1352
1353                    max_retries = :max_retries,
1354                    retry_exp_backoff_millis = :retry_exp_backoff_millis,
1355                    last_lock_version = :appending_version,
1356                    executor_id = :executor_id,
1357                    run_id = :run_id,
1358
1359                    join_set_id = NULL,
1360                    join_set_closing = NULL,
1361
1362                    result_kind = NULL
1363                WHERE execution_id = :execution_id
1364            ",
1365            )
1366            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1367        let updated = stmt
1368            .execute(named_params! {
1369                ":execution_id": execution_id_str,
1370                ":appending_version": appending_version.0,
1371                ":pending_expires_finished": lock_expires_at,
1372                ":state": STATE_LOCKED,
1373                ":max_retries": retry_config.max_retries,
1374                ":retry_exp_backoff_millis": backoff_millis,
1375                ":executor_id": executor_id.to_string(),
1376                ":run_id": run_id.to_string(),
1377            })
1378            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1379        if updated != 1 {
1380            return Err(DbErrorWrite::NotFound);
1381        }
1382
1383        // fetch intermittent event count from the just-inserted row.
1384        let intermittent_event_count = tx
1385            .prepare(
1386                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1387            )?
1388            .query_row(
1389                named_params! {
1390                    ":execution_id": execution_id_str,
1391                },
1392                |row| {
1393                    let intermittent_event_count = row.get("intermittent_event_count")?;
1394                    Ok(intermittent_event_count)
1395                },
1396            )
1397            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1398
1399        Ok(intermittent_event_count)
1400    }
1401
1402    fn update_state_blocked(
1403        tx: &Transaction,
1404        execution_id: &ExecutionId,
1405        appending_version: &Version,
1406        // BlockedByJoinSet fields
1407        join_set_id: &JoinSetId,
1408        lock_expires_at: DateTime<Utc>,
1409        join_set_closing: bool,
1410    ) -> Result<
1411        AppendResponse, // next version
1412        DbErrorWrite,
1413    > {
1414        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1415        let execution_id_str = execution_id.to_string();
1416        let mut stmt = tx.prepare_cached(
1417            r"
1418                UPDATE t_state
1419                SET
1420                    corresponding_version = :appending_version,
1421                    pending_expires_finished = :pending_expires_finished,
1422                    state = :state,
1423                    updated_at = CURRENT_TIMESTAMP,
1424
1425                    last_lock_version = NULL,
1426
1427                    join_set_id = :join_set_id,
1428                    join_set_closing = :join_set_closing,
1429
1430                    result_kind = NULL
1431                WHERE execution_id = :execution_id
1432            ",
1433        )?;
1434        let updated = stmt.execute(named_params! {
1435            ":execution_id": execution_id_str,
1436            ":appending_version": appending_version.0,
1437            ":pending_expires_finished": lock_expires_at,
1438            ":state": STATE_BLOCKED_BY_JOIN_SET,
1439            ":join_set_id": join_set_id,
1440            ":join_set_closing": join_set_closing,
1441        })?;
1442        if updated != 1 {
1443            return Err(DbErrorWrite::NotFound);
1444        }
1445        Ok(appending_version.increment())
1446    }
1447
1448    fn update_state_finished(
1449        tx: &Transaction,
1450        execution_id: &ExecutionId,
1451        appending_version: &Version,
1452        // Finished fields
1453        finished_at: DateTime<Utc>,
1454        result_kind: PendingStateFinishedResultKind,
1455    ) -> Result<(), DbErrorWrite> {
1456        debug!("Setting t_state to Finished");
1457        let execution_id_str = execution_id.to_string();
1458        let mut stmt = tx.prepare_cached(
1459            r"
1460                UPDATE t_state
1461                SET
1462                    corresponding_version = :appending_version,
1463                    pending_expires_finished = :pending_expires_finished,
1464                    state = :state,
1465                    updated_at = CURRENT_TIMESTAMP,
1466
1467                    last_lock_version = NULL,
1468                    executor_id = NULL,
1469                    run_id = NULL,
1470
1471                    join_set_id = NULL,
1472                    join_set_closing = NULL,
1473
1474                    result_kind = :result_kind
1475                WHERE execution_id = :execution_id
1476            ",
1477        )?;
1478
1479        let updated = stmt.execute(named_params! {
1480            ":execution_id": execution_id_str,
1481            ":appending_version": appending_version.0,
1482            ":pending_expires_finished": finished_at,
1483            ":state": STATE_FINISHED,
1484            ":result_kind": JsonWrapper(result_kind),
1485        })?;
1486        if updated != 1 {
1487            return Err(DbErrorWrite::NotFound);
1488        }
1489        Ok(())
1490    }
1491
1492    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1493    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1494    fn bump_state_next_version(
1495        tx: &Transaction,
1496        execution_id: &ExecutionId,
1497        appending_version: &Version,
1498        delay_req: Option<DelayReq>,
1499    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1500        debug!("update_index_version");
1501        let execution_id_str = execution_id.to_string();
1502        let mut stmt = tx.prepare_cached(
1503            r"
1504                UPDATE t_state
1505                SET
1506                    corresponding_version = :appending_version,
1507                    updated_at = CURRENT_TIMESTAMP
1508                WHERE execution_id = :execution_id
1509            ",
1510        )?;
1511        let updated = stmt.execute(named_params! {
1512            ":execution_id": execution_id_str,
1513            ":appending_version": appending_version.0,
1514        })?;
1515        if updated != 1 {
1516            return Err(DbErrorWrite::NotFound);
1517        }
1518        if let Some(DelayReq {
1519            join_set_id,
1520            delay_id,
1521            expires_at,
1522        }) = delay_req
1523        {
1524            debug!("Inserting delay to `t_delay`");
1525            let mut stmt = tx.prepare_cached(
1526                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1527                VALUES \
1528                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1529            )?;
1530            stmt.execute(named_params! {
1531                ":execution_id": execution_id_str,
1532                ":join_set_id": join_set_id.to_string(),
1533                ":delay_id": delay_id.to_string(),
1534                ":expires_at": expires_at,
1535            })?;
1536        }
1537        Ok(appending_version.increment())
1538    }
1539
1540    fn get_combined_state(
1541        tx: &Transaction,
1542        execution_id: &ExecutionId,
1543    ) -> Result<CombinedState, DbErrorRead> {
1544        let mut stmt = tx.prepare(
1545            r"
1546                SELECT
1547                    state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
1548                    last_lock_version, executor_id, run_id,
1549                    join_set_id, join_set_closing,
1550                    result_kind
1551                    FROM t_state
1552                WHERE
1553                    execution_id = :execution_id
1554                ",
1555        )?;
1556        stmt.query_row(
1557            named_params! {
1558                ":execution_id": execution_id.to_string(),
1559            },
1560            |row| {
1561                CombinedState::new(
1562                    CombinedStateDTO {
1563                        component_id_input_digest: row.get("component_id_input_digest")?,
1564                        state: row.get("state")?,
1565                        ffqn: row.get("ffqn")?,
1566                        pending_expires_finished: row
1567                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1568                        last_lock_version: row
1569                            .get::<_, Option<VersionType>>("last_lock_version")?
1570                            .map(Version::new),
1571                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1572                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1573                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1574                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1575                        result_kind: row
1576                            .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1577                                "result_kind",
1578                            )?
1579                            .map(|wrapper| wrapper.0),
1580                    },
1581                    Version::new(row.get("corresponding_version")?),
1582                )
1583            },
1584        )
1585        .map_err(DbErrorRead::from)
1586    }
1587
1588    fn list_executions(
1589        read_tx: &Transaction,
1590        ffqn: Option<&FunctionFqn>,
1591        top_level_only: bool,
1592        pagination: &ExecutionListPagination,
1593    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1594        #[derive(Debug)]
1595        struct StatementModifier<'a> {
1596            where_vec: Vec<String>,
1597            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1598            limit: u32,
1599            limit_desc: bool,
1600        }
1601
1602        fn paginate<'a, T: rusqlite::ToSql + 'static>(
1603            pagination: &'a Pagination<Option<T>>,
1604            column: &str,
1605            top_level_only: bool,
1606        ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1607            let mut where_vec: Vec<String> = vec![];
1608            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1609            let limit = pagination.length();
1610            let limit_desc = pagination.is_desc();
1611            match pagination {
1612                Pagination::NewerThan {
1613                    cursor: Some(cursor),
1614                    ..
1615                }
1616                | Pagination::OlderThan {
1617                    cursor: Some(cursor),
1618                    ..
1619                } => {
1620                    where_vec.push(format!("{column} {rel} :cursor", rel = pagination.rel()));
1621                    let cursor = cursor.to_sql().map_err(|err| {
1622                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
1623                        DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1624                    })?;
1625                    params.push((":cursor", cursor));
1626                }
1627                _ => {}
1628            }
1629            if top_level_only {
1630                where_vec.push("is_top_level=true".to_string());
1631            }
1632            Ok(StatementModifier {
1633                where_vec,
1634                params,
1635                limit: u32::from(limit),
1636                limit_desc,
1637            })
1638        }
1639
1640        let mut statement_mod = match pagination {
1641            ExecutionListPagination::CreatedBy(pagination) => {
1642                paginate(pagination, "created_at", top_level_only)?
1643            }
1644            ExecutionListPagination::ExecutionId(pagination) => {
1645                paginate(pagination, "execution_id", top_level_only)?
1646            }
1647        };
1648
1649        let ffqn_temporary;
1650        if let Some(ffqn) = ffqn {
1651            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1652            ffqn_temporary = ffqn.to_string();
1653            let ffqn = ffqn_temporary
1654                .to_sql()
1655                .expect("string conversion never fails");
1656
1657            statement_mod.params.push((":ffqn", ffqn));
1658        }
1659
1660        let where_str = if statement_mod.where_vec.is_empty() {
1661            String::new()
1662        } else {
1663            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1664        };
1665        let sql = format!(
1666            r"
1667            SELECT created_at, first_scheduled_at, component_id_input_digest,
1668            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1669            last_lock_version, executor_id, run_id,
1670            join_set_id, join_set_closing,
1671            result_kind
1672            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1673            ",
1674            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1675            limit = statement_mod.limit,
1676        );
1677        let mut vec: Vec<_> = read_tx
1678            .prepare(&sql)?
1679            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1680                statement_mod
1681                    .params
1682                    .into_iter()
1683                    .collect::<Vec<_>>()
1684                    .as_ref(),
1685                |row| {
1686                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1687                    let component_id_input_digest: InputContentDigest =
1688                        row.get("component_id_input_digest")?;
1689                    let created_at = row.get("created_at")?;
1690                    let first_scheduled_at = row.get("first_scheduled_at")?;
1691                    let combined_state = CombinedState::new(
1692                        CombinedStateDTO {
1693                            component_id_input_digest: component_id_input_digest.clone(),
1694                            state: row.get("state")?,
1695                            ffqn: row.get("ffqn")?,
1696                            pending_expires_finished: row
1697                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1698                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1699
1700                            last_lock_version: row
1701                                .get::<_, Option<VersionType>>("last_lock_version")?
1702                                .map(Version::new),
1703                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1704                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1705                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1706                            result_kind: row
1707                                .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1708                                    "result_kind",
1709                                )?
1710                                .map(|wrapper| wrapper.0),
1711                        },
1712                        Version::new(row.get("corresponding_version")?),
1713                    )?;
1714                    Ok(ExecutionWithState {
1715                        execution_id,
1716                        ffqn: combined_state.ffqn,
1717                        pending_state: combined_state.pending_state,
1718                        created_at,
1719                        first_scheduled_at,
1720                        component_digest: component_id_input_digest,
1721                    })
1722                },
1723            )?
1724            .collect::<Vec<Result<_, _>>>()
1725            .into_iter()
1726            .filter_map(|row| match row {
1727                Ok(row) => Some(row),
1728                Err(err) => {
1729                    warn!("Skipping row - {err:?}");
1730                    None
1731                }
1732            })
1733            .collect();
1734
1735        if !statement_mod.limit_desc {
1736            // the list must be sorted in descending order
1737            vec.reverse();
1738        }
1739        Ok(vec)
1740    }
1741
1742    fn list_responses(
1743        tx: &Transaction,
1744        execution_id: &ExecutionId,
1745        pagination: Option<Pagination<u32>>,
1746    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1747        // TODO: Add test
1748        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
1749        let mut sql = "SELECT \
1750            r.id, r.created_at, r.join_set_id,  r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1751            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1752            WHERE \
1753            r.execution_id = :execution_id \
1754            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
1755            "
1756        .to_string();
1757        let limit = match &pagination {
1758            Some(
1759                pagination @ (Pagination::NewerThan { cursor, .. }
1760                | Pagination::OlderThan { cursor, .. }),
1761            ) => {
1762                params.push((":cursor", Box::new(cursor)));
1763                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
1764                Some(pagination.length())
1765            }
1766            None => None,
1767        };
1768        sql.push_str(" ORDER BY id");
1769        if pagination.as_ref().is_some_and(Pagination::is_desc) {
1770            sql.push_str(" DESC");
1771        }
1772        if let Some(limit) = limit {
1773            write!(sql, " LIMIT {limit}").unwrap();
1774        }
1775        params.push((":execution_id", Box::new(execution_id.to_string())));
1776        tx.prepare(&sql)?
1777            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
1778                params
1779                    .iter()
1780                    .map(|(key, value)| (*key, value.as_ref()))
1781                    .collect::<Vec<_>>()
1782                    .as_ref(),
1783                Self::parse_response_with_cursor,
1784            )?
1785            .collect::<Result<Vec<_>, rusqlite::Error>>()
1786            .map_err(DbErrorRead::from)
1787    }
1788
1789    fn parse_response_with_cursor(
1790        row: &rusqlite::Row<'_>,
1791    ) -> Result<ResponseWithCursor, rusqlite::Error> {
1792        let id = row.get("id")?;
1793        let created_at: DateTime<Utc> = row.get("created_at")?;
1794        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
1795        let event = match (
1796            row.get::<_, Option<DelayId>>("delay_id")?,
1797            row.get::<_, Option<bool>>("delay_success")?,
1798            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
1799            row.get::<_, Option<VersionType>>("finished_version")?,
1800            row.get::<_, Option<JsonWrapper<ExecutionRequest>>>("json_value")?,
1801        ) {
1802            (Some(delay_id), Some(delay_success), None, None, None) => {
1803                JoinSetResponse::DelayFinished {
1804                    delay_id,
1805                    result: delay_success.then_some(()).ok_or(()),
1806                }
1807            }
1808            (
1809                None,
1810                None,
1811                Some(child_execution_id),
1812                Some(finished_version),
1813                Some(JsonWrapper(ExecutionRequest::Finished { result, .. })),
1814            ) => JoinSetResponse::ChildExecutionFinished {
1815                child_execution_id,
1816                finished_version: Version(finished_version),
1817                result,
1818            },
1819            (delay, delay_success, child, finished, result) => {
1820                error!(
1821                    "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
1822                    result.map(|it| it.0)
1823                );
1824                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
1825            }
1826        };
1827        Ok(ResponseWithCursor {
1828            cursor: id,
1829            event: JoinSetResponseEventOuter {
1830                event: JoinSetResponseEvent { join_set_id, event },
1831                created_at,
1832            },
1833        })
1834    }
1835
1836    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1837    #[expect(clippy::too_many_arguments)]
1838    fn lock_single_execution(
1839        tx: &Transaction,
1840        created_at: DateTime<Utc>,
1841        component_id: ComponentId,
1842        execution_id: &ExecutionId,
1843        run_id: RunId,
1844        appending_version: &Version,
1845        executor_id: ExecutorId,
1846        lock_expires_at: DateTime<Utc>,
1847        retry_config: ComponentRetryConfig,
1848    ) -> Result<LockedExecution, DbErrorWrite> {
1849        debug!("lock_single_execution");
1850        let combined_state = Self::get_combined_state(tx, execution_id)?;
1851        combined_state.pending_state.can_append_lock(
1852            created_at,
1853            executor_id,
1854            run_id,
1855            lock_expires_at,
1856        )?;
1857        let expected_version = combined_state.get_next_version_assert_not_finished();
1858        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1859
1860        // Append to `execution_log` table.
1861        let locked_event = Locked {
1862            component_id,
1863            executor_id,
1864            lock_expires_at,
1865            run_id,
1866            retry_config,
1867        };
1868        let event = ExecutionRequest::Locked(locked_event.clone());
1869        let event_ser = serde_json::to_string(&event).map_err(|err| {
1870            warn!("Cannot serialize {event:?} - {err:?}");
1871            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1872        })?;
1873        let mut stmt = tx
1874            .prepare_cached(
1875                "INSERT INTO t_execution_log \
1876            (execution_id, created_at, json_value, version, variant) \
1877            VALUES \
1878            (:execution_id, :created_at, :json_value, :version, :variant)",
1879            )
1880            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1881        stmt.execute(named_params! {
1882            ":execution_id": execution_id.to_string(),
1883            ":created_at": created_at,
1884            ":json_value": event_ser,
1885            ":version": appending_version.0,
1886            ":variant": event.variant(),
1887        })
1888        .map_err(|err| {
1889            warn!("Cannot lock execution - {err:?}");
1890            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1891        })?;
1892
1893        // Update `t_state`
1894        let responses = Self::list_responses(tx, execution_id, None)?;
1895        let responses = responses.into_iter().map(|resp| resp.event).collect();
1896        trace!("Responses: {responses:?}");
1897
1898        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1899            tx,
1900            execution_id,
1901            executor_id,
1902            run_id,
1903            lock_expires_at,
1904            appending_version,
1905            retry_config,
1906        )?;
1907        // Fetch event_history and `Created` event to construct the response.
1908        let mut events = tx
1909            .prepare(
1910                "SELECT json_value, version FROM t_execution_log WHERE \
1911                execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
1912                ORDER BY version",
1913            )?
1914            .query_map(
1915                named_params! {
1916                    ":execution_id": execution_id.to_string(),
1917                    ":variant1": DUMMY_CREATED.variant(),
1918                    ":variant2": DUMMY_HISTORY_EVENT.variant(),
1919                },
1920                |row| {
1921                    let created_at_fake = DateTime::from_timestamp_nanos(0); // not used, only the inner event and version
1922                    let event = row
1923                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1924                        .map_err(|serde| {
1925                            error!("Cannot deserialize {row:?} - {serde:?}");
1926                            consistency_rusqlite("cannot deserialize event")
1927                        })?
1928                        .0;
1929                    let version = Version(row.get("version")?);
1930
1931                    Ok(ExecutionEvent {
1932                        created_at: created_at_fake,
1933                        event,
1934                        backtrace_id: None,
1935                        version,
1936                    })
1937                },
1938            )?
1939            .collect::<Result<Vec<_>, _>>()?
1940            .into_iter()
1941            .collect::<VecDeque<_>>();
1942        let Some(ExecutionRequest::Created {
1943            ffqn,
1944            params,
1945            parent,
1946            metadata,
1947            ..
1948        }) = events.pop_front().map(|outer| outer.event)
1949        else {
1950            error!("Execution log must contain at least `Created` event");
1951            return Err(consistency_db_err("execution log must contain `Created` event").into());
1952        };
1953
1954        let event_history = events
1955            .into_iter()
1956            .map(|ExecutionEvent { event, version, .. }| {
1957                if let ExecutionRequest::HistoryEvent { event } = event {
1958                    Ok((event, version))
1959                } else {
1960                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1961                    Err(consistency_db_err(
1962                        "rows can only contain `Created` and `HistoryEvent` event kinds",
1963                    ))
1964                }
1965            })
1966            .collect::<Result<Vec<_>, _>>()?;
1967
1968        Ok(LockedExecution {
1969            execution_id: execution_id.clone(),
1970            metadata,
1971            next_version: appending_version.increment(),
1972            ffqn,
1973            params,
1974            event_history,
1975            responses,
1976            parent,
1977            intermittent_event_count,
1978            locked_event,
1979        })
1980    }
1981
1982    fn count_join_next(
1983        tx: &Transaction,
1984        execution_id: &ExecutionId,
1985        join_set_id: &JoinSetId,
1986    ) -> Result<u32, DbErrorRead> {
1987        let mut stmt = tx.prepare(
1988            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1989            AND history_event_type = :join_next",
1990        )?;
1991        Ok(stmt.query_row(
1992            named_params! {
1993                ":execution_id": execution_id.to_string(),
1994                ":join_set_id": join_set_id.to_string(),
1995                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1996            },
1997            |row| row.get::<_, u32>("count"),
1998        )?)
1999    }
2000
2001    fn nth_response(
2002        tx: &Transaction,
2003        execution_id: &ExecutionId,
2004        join_set_id: &JoinSetId,
2005        skip_rows: u32,
2006    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2007        // TODO: Add test
2008        tx
2009            .prepare(
2010                "SELECT r.id, r.created_at, r.join_set_id, \
2011                    r.delay_id, r.delay_success, \
2012                    r.child_execution_id, r.finished_version, l.json_value \
2013                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2014                    WHERE \
2015                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2016                    (
2017                    r.finished_version = l.version \
2018                    OR \
2019                    r.child_execution_id IS NULL \
2020                    ) \
2021                    ORDER BY id \
2022                    LIMIT 1 OFFSET :offset",
2023            )
2024            ?
2025            .query_row(
2026                named_params! {
2027                    ":execution_id": execution_id.to_string(),
2028                    ":join_set_id": join_set_id.to_string(),
2029                    ":offset": skip_rows,
2030                },
2031                Self::parse_response_with_cursor,
2032            )
2033            .optional()
2034            .map_err(DbErrorRead::from)
2035    }
2036
2037    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2038    #[expect(clippy::needless_return)]
2039    fn append(
2040        tx: &Transaction,
2041        execution_id: &ExecutionId,
2042        req: AppendRequest,
2043        appending_version: Version,
2044    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2045        if matches!(req.event, ExecutionRequest::Created { .. }) {
2046            return Err(DbErrorWrite::NonRetriable(
2047                DbErrorWriteNonRetriable::ValidationFailed(
2048                    "cannot append `Created` event - use `create` instead".into(),
2049                ),
2050            ));
2051        }
2052        if let AppendRequest {
2053            event:
2054                ExecutionRequest::Locked(Locked {
2055                    component_id,
2056                    executor_id,
2057                    run_id,
2058                    lock_expires_at,
2059                    retry_config,
2060                }),
2061            created_at,
2062        } = req
2063        {
2064            return Self::lock_single_execution(
2065                tx,
2066                created_at,
2067                component_id,
2068                execution_id,
2069                run_id,
2070                &appending_version,
2071                executor_id,
2072                lock_expires_at,
2073                retry_config,
2074            )
2075            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2076        }
2077
2078        let combined_state = Self::get_combined_state(tx, execution_id)?;
2079        if combined_state.pending_state.is_finished() {
2080            debug!("Execution is already finished");
2081            return Err(DbErrorWrite::NonRetriable(
2082                DbErrorWriteNonRetriable::IllegalState("already finished".into()),
2083            ));
2084        }
2085
2086        Self::check_expected_next_and_appending_version(
2087            &combined_state.get_next_version_assert_not_finished(),
2088            &appending_version,
2089        )?;
2090        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
2091            error!("Cannot serialize {:?} - {err:?}", req.event);
2092            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
2093        })?;
2094
2095        let mut stmt = tx.prepare(
2096                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2097                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
2098                    ?;
2099        stmt.execute(named_params! {
2100            ":execution_id": execution_id.to_string(),
2101            ":created_at": req.created_at,
2102            ":json_value": event_ser,
2103            ":version": appending_version.0,
2104            ":variant": req.event.variant(),
2105            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
2106        })?;
2107        // Calculate current pending state
2108
2109        match &req.event {
2110            ExecutionRequest::Created { .. } => {
2111                unreachable!("handled in the caller")
2112            }
2113
2114            ExecutionRequest::Locked { .. } => {
2115                unreachable!("handled above")
2116            }
2117
2118            ExecutionRequest::TemporarilyFailed {
2119                backoff_expires_at, ..
2120            }
2121            | ExecutionRequest::TemporarilyTimedOut {
2122                backoff_expires_at, ..
2123            } => {
2124                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2125                    tx,
2126                    execution_id,
2127                    &appending_version,
2128                    *backoff_expires_at,
2129                    true, // an intermittent failure
2130                    combined_state
2131                        .pending_state
2132                        .get_component_id_input_digest()
2133                        .clone(),
2134                )?;
2135                return Ok((next_version, notifier));
2136            }
2137
2138            ExecutionRequest::Unlocked {
2139                backoff_expires_at, ..
2140            } => {
2141                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2142                    tx,
2143                    execution_id,
2144                    &appending_version,
2145                    *backoff_expires_at,
2146                    false, // not an intermittent failure
2147                    combined_state
2148                        .pending_state
2149                        .get_component_id_input_digest()
2150                        .clone(),
2151                )?;
2152                return Ok((next_version, notifier));
2153            }
2154
2155            ExecutionRequest::Finished { result, .. } => {
2156                Self::update_state_finished(
2157                    tx,
2158                    execution_id,
2159                    &appending_version,
2160                    req.created_at,
2161                    PendingStateFinishedResultKind::from(result),
2162                )?;
2163                return Ok((
2164                    appending_version,
2165                    AppendNotifier {
2166                        pending_at: None,
2167                        execution_finished: Some(NotifierExecutionFinished {
2168                            execution_id: execution_id.clone(),
2169                            retval: result.clone(),
2170                        }),
2171                        response: None,
2172                    },
2173                ));
2174            }
2175
2176            ExecutionRequest::HistoryEvent {
2177                event:
2178                    HistoryEvent::JoinSetCreate { .. }
2179                    | HistoryEvent::JoinSetRequest {
2180                        request: JoinSetRequest::ChildExecutionRequest { .. },
2181                        ..
2182                    }
2183                    | HistoryEvent::Persist { .. }
2184                    | HistoryEvent::Schedule { .. }
2185                    | HistoryEvent::Stub { .. }
2186                    | HistoryEvent::JoinNextTooMany { .. },
2187            } => {
2188                return Ok((
2189                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
2190                    AppendNotifier::default(),
2191                ));
2192            }
2193
2194            ExecutionRequest::HistoryEvent {
2195                event:
2196                    HistoryEvent::JoinSetRequest {
2197                        join_set_id,
2198                        request:
2199                            JoinSetRequest::DelayRequest {
2200                                delay_id,
2201                                expires_at,
2202                                ..
2203                            },
2204                    },
2205            } => {
2206                return Ok((
2207                    Self::bump_state_next_version(
2208                        tx,
2209                        execution_id,
2210                        &appending_version,
2211                        Some(DelayReq {
2212                            join_set_id: join_set_id.clone(),
2213                            delay_id: delay_id.clone(),
2214                            expires_at: *expires_at,
2215                        }),
2216                    )?,
2217                    AppendNotifier::default(),
2218                ));
2219            }
2220
2221            ExecutionRequest::HistoryEvent {
2222                event:
2223                    HistoryEvent::JoinNext {
2224                        join_set_id,
2225                        run_expires_at,
2226                        closing,
2227                        requested_ffqn: _,
2228                    },
2229            } => {
2230                // Did the response arrive already?
2231                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
2232                let nth_response =
2233                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
2234                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2235                assert!(join_next_count > 0);
2236                if let Some(ResponseWithCursor {
2237                    event:
2238                        JoinSetResponseEventOuter {
2239                            created_at: nth_created_at,
2240                            ..
2241                        },
2242                    cursor: _,
2243                }) = nth_response
2244                {
2245                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
2246                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2247                        tx,
2248                        execution_id,
2249                        &appending_version,
2250                        scheduled_at,
2251                        false, // not an intermittent failure
2252                        combined_state
2253                            .pending_state
2254                            .get_component_id_input_digest()
2255                            .clone(),
2256                    )?;
2257                    return Ok((next_version, notifier));
2258                }
2259                return Ok((
2260                    Self::update_state_blocked(
2261                        tx,
2262                        execution_id,
2263                        &appending_version,
2264                        join_set_id,
2265                        *run_expires_at,
2266                        *closing,
2267                    )?,
2268                    AppendNotifier::default(),
2269                ));
2270            }
2271        }
2272    }
2273
2274    fn append_response(
2275        tx: &Transaction,
2276        execution_id: &ExecutionId,
2277        response_outer: JoinSetResponseEventOuter,
2278    ) -> Result<AppendNotifier, DbErrorWrite> {
2279        let mut stmt = tx.prepare(
2280            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2281                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
2282        )?;
2283        let join_set_id = &response_outer.event.join_set_id;
2284        let (delay_id, delay_success) = match &response_outer.event.event {
2285            JoinSetResponse::DelayFinished { delay_id, result } => {
2286                (Some(delay_id.to_string()), Some(result.is_ok()))
2287            }
2288            JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2289        };
2290        let (child_execution_id, finished_version) = match &response_outer.event.event {
2291            JoinSetResponse::ChildExecutionFinished {
2292                child_execution_id,
2293                finished_version,
2294                result: _,
2295            } => (
2296                Some(child_execution_id.to_string()),
2297                Some(finished_version.0),
2298            ),
2299            JoinSetResponse::DelayFinished { .. } => (None, None),
2300        };
2301
2302        stmt.execute(named_params! {
2303            ":execution_id": execution_id.to_string(),
2304            ":created_at": response_outer.created_at,
2305            ":join_set_id": join_set_id.to_string(),
2306            ":delay_id": delay_id,
2307            ":delay_success": delay_success,
2308            ":child_execution_id": child_execution_id,
2309            ":finished_version": finished_version,
2310        })?;
2311
2312        // if the execution is going to be unblocked by this response...
2313        let combined_state = Self::get_combined_state(tx, execution_id)?;
2314        debug!("previous_pending_state: {combined_state:?}");
2315        let mut notifier = if let PendingState::BlockedByJoinSet {
2316            join_set_id: found_join_set_id,
2317            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2318            closing: _,
2319            component_id_input_digest,
2320        } = combined_state.pending_state
2321            && *join_set_id == found_join_set_id
2322        {
2323            // PendingAt should be set to current time if called from expired_timers_watcher,
2324            // or to a future time if the execution is hot.
2325            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2326            // TODO: Add diff test
2327            // Unblock the state.
2328            Self::update_state_pending_after_response_appended(
2329                tx,
2330                execution_id,
2331                scheduled_at,
2332                &combined_state.corresponding_version, // not changing the version
2333                component_id_input_digest,
2334            )?
2335        } else {
2336            AppendNotifier::default()
2337        };
2338        if let JoinSetResponseEvent {
2339            join_set_id,
2340            event:
2341                JoinSetResponse::DelayFinished {
2342                    delay_id,
2343                    result: _,
2344                },
2345        } = &response_outer.event
2346        {
2347            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2348            let mut stmt =
2349                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2350                ?;
2351            stmt.execute(named_params! {
2352                ":execution_id": execution_id.to_string(),
2353                ":join_set_id": join_set_id.to_string(),
2354                ":delay_id": delay_id.to_string(),
2355            })?;
2356        }
2357        notifier.response = Some((execution_id.clone(), response_outer));
2358        Ok(notifier)
2359    }
2360
2361    fn append_backtrace(
2362        tx: &Transaction,
2363        backtrace_info: &BacktraceInfo,
2364    ) -> Result<(), DbErrorWrite> {
2365        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2366            warn!(
2367                "Cannot serialize backtrace {:?} - {err:?}",
2368                backtrace_info.wasm_backtrace
2369            );
2370            DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2371        })?;
2372        let mut stmt = tx
2373            .prepare(
2374                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2375                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2376            )
2377            ?;
2378        stmt.execute(named_params! {
2379            ":execution_id": backtrace_info.execution_id.to_string(),
2380            ":component_id": backtrace_info.component_id.to_string(),
2381            ":version_min_including": backtrace_info.version_min_including.0,
2382            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2383            ":wasm_backtrace": backtrace,
2384        })?;
2385        Ok(())
2386    }
2387
2388    #[cfg(feature = "test")]
2389    fn get(
2390        tx: &Transaction,
2391        execution_id: &ExecutionId,
2392    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2393        let mut stmt = tx.prepare(
2394            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2395                        execution_id = :execution_id ORDER BY version",
2396        )?;
2397        let events = stmt
2398            .query_map(
2399                named_params! {
2400                    ":execution_id": execution_id.to_string(),
2401                },
2402                |row| {
2403                    let created_at = row.get("created_at")?;
2404                    let event = row
2405                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2406                        .map_err(|serde| {
2407                            error!("Cannot deserialize {row:?} - {serde:?}");
2408                            consistency_rusqlite("cannot deserialize event")
2409                        })?
2410                        .0;
2411                    let version = Version(row.get("version")?);
2412
2413                    Ok(ExecutionEvent {
2414                        created_at,
2415                        event,
2416                        backtrace_id: None,
2417                        version,
2418                    })
2419                },
2420            )?
2421            .collect::<Result<Vec<_>, _>>()?;
2422        if events.is_empty() {
2423            return Err(DbErrorRead::NotFound);
2424        }
2425        let combined_state = Self::get_combined_state(tx, execution_id)?;
2426        let responses = Self::list_responses(tx, execution_id, None)?
2427            .into_iter()
2428            .map(|resp| resp.event)
2429            .collect();
2430        Ok(concepts::storage::ExecutionLog {
2431            execution_id: execution_id.clone(),
2432            events,
2433            responses,
2434            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2435            pending_state: combined_state.pending_state,
2436        })
2437    }
2438
2439    fn list_execution_events(
2440        tx: &Transaction,
2441        execution_id: &ExecutionId,
2442        version_min: VersionType,
2443        version_max_excluding: VersionType,
2444        include_backtrace_id: bool,
2445    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2446        let select = if include_backtrace_id {
2447            "SELECT
2448                log.created_at,
2449                log.json_value,
2450                log.version as version,
2451                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2452                bt.version_min_including AS backtrace_id
2453            FROM
2454                t_execution_log AS log
2455            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2456                t_backtrace AS bt ON log.execution_id = bt.execution_id
2457                                -- Check if the log's version falls within the backtrace's range
2458                                AND log.version >= bt.version_min_including
2459                                AND log.version < bt.version_max_excluding
2460            WHERE
2461                log.execution_id = :execution_id
2462                AND log.version >= :version_min
2463                AND log.version < :version_max_excluding
2464            ORDER BY
2465                log.version;"
2466        } else {
2467            "SELECT
2468                created_at, json_value, NULL as backtrace_id, version
2469            FROM t_execution_log WHERE
2470                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2471            ORDER BY version"
2472        };
2473        tx.prepare(select)?
2474            .query_map(
2475                named_params! {
2476                    ":execution_id": execution_id.to_string(),
2477                    ":version_min": version_min,
2478                    ":version_max_excluding": version_max_excluding
2479                },
2480                |row| {
2481                    let created_at = row.get("created_at")?;
2482                    let backtrace_id = row
2483                        .get::<_, Option<VersionType>>("backtrace_id")?
2484                        .map(Version::new);
2485                    let version = Version(row.get("version")?);
2486
2487                    let event = row
2488                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2489                        .map(|event| ExecutionEvent {
2490                            created_at,
2491                            event: event.0,
2492                            backtrace_id,
2493                            version,
2494                        })
2495                        .map_err(|serde| {
2496                            error!("Cannot deserialize {row:?} - {serde:?}");
2497                            consistency_rusqlite("cannot deserialize")
2498                        })?;
2499                    Ok(event)
2500                },
2501            )?
2502            .collect::<Result<Vec<_>, _>>()
2503            .map_err(DbErrorRead::from)
2504    }
2505
2506    fn get_execution_event(
2507        tx: &Transaction,
2508        execution_id: &ExecutionId,
2509        version: VersionType,
2510    ) -> Result<ExecutionEvent, DbErrorRead> {
2511        let mut stmt = tx.prepare(
2512            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2513                        execution_id = :execution_id AND version = :version",
2514        )?;
2515        stmt.query_row(
2516            named_params! {
2517                ":execution_id": execution_id.to_string(),
2518                ":version": version,
2519            },
2520            |row| {
2521                let created_at = row.get("created_at")?;
2522                let event = row
2523                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2524                    .map_err(|serde| {
2525                        error!("Cannot deserialize {row:?} - {serde:?}");
2526                        consistency_rusqlite("cannot deserialize event")
2527                    })?;
2528                let version = Version(row.get("version")?);
2529
2530                Ok(ExecutionEvent {
2531                    created_at,
2532                    event: event.0,
2533                    backtrace_id: None,
2534                    version,
2535                })
2536            },
2537        )
2538        .map_err(DbErrorRead::from)
2539    }
2540
2541    fn get_last_execution_event(
2542        tx: &Transaction,
2543        execution_id: &ExecutionId,
2544    ) -> Result<ExecutionEvent, DbErrorRead> {
2545        let mut stmt = tx.prepare(
2546            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2547                        execution_id = :execution_id ORDER BY version DESC",
2548        )?;
2549        stmt.query_row(
2550            named_params! {
2551                ":execution_id": execution_id.to_string(),
2552            },
2553            |row| {
2554                let created_at = row.get("created_at")?;
2555                let event = row
2556                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2557                    .map_err(|serde| {
2558                        error!("Cannot deserialize {row:?} - {serde:?}");
2559                        consistency_rusqlite("cannot deserialize event")
2560                    })?;
2561                let version = Version(row.get("version")?);
2562                Ok(ExecutionEvent {
2563                    created_at,
2564                    event: event.0,
2565                    backtrace_id: None,
2566                    version: version.clone(),
2567                })
2568            },
2569        )
2570        .map_err(DbErrorRead::from)
2571    }
2572
2573    fn delay_response(
2574        tx: &Transaction,
2575        execution_id: &ExecutionId,
2576        delay_id: &DelayId,
2577    ) -> Result<Option<bool>, DbErrorRead> {
2578        // TODO: Add test
2579        tx.prepare(
2580            "SELECT delay_success \
2581                    FROM t_join_set_response \
2582                    WHERE \
2583                    execution_id = :execution_id AND delay_id = :delay_id
2584                    ",
2585        )?
2586        .query_row(
2587            named_params! {
2588                ":execution_id": execution_id.to_string(),
2589                ":delay_id": delay_id.to_string(),
2590            },
2591            |row| {
2592                let delay_success = row.get::<_, bool>("delay_success")?;
2593                Ok(delay_success)
2594            },
2595        )
2596        .optional()
2597        .map_err(DbErrorRead::from)
2598    }
2599
2600    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2601    #[instrument(level = Level::TRACE, skip_all)]
2602    fn get_responses_with_offset(
2603        tx: &Transaction,
2604        execution_id: &ExecutionId,
2605        skip_rows: u32,
2606    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2607        // TODO: Add test
2608        tx.prepare(
2609            "SELECT r.id, r.created_at, r.join_set_id, \
2610            r.delay_id, r.delay_success, \
2611            r.child_execution_id, r.finished_version, l.json_value \
2612            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2613            WHERE \
2614            r.execution_id = :execution_id AND \
2615            ( \
2616            r.finished_version = l.version \
2617            OR r.child_execution_id IS NULL \
2618            ) \
2619            ORDER BY id \
2620            LIMIT -1 OFFSET :offset",
2621        )
2622        ?
2623        .query_map(
2624            named_params! {
2625                ":execution_id": execution_id.to_string(),
2626                ":offset": skip_rows,
2627            },
2628            Self::parse_response_with_cursor,
2629        )
2630        ?
2631        .collect::<Result<Vec<_>, _>>()
2632        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2633        .map_err(DbErrorRead::from)
2634    }
2635
2636    fn get_pending_of_single_ffqn(
2637        mut stmt: CachedStatement,
2638        batch_size: u32,
2639        pending_at_or_sooner: DateTime<Utc>,
2640        ffqn: &FunctionFqn,
2641    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2642        stmt.query_map(
2643            named_params! {
2644                ":pending_expires_finished": pending_at_or_sooner,
2645                ":ffqn": ffqn.to_string(),
2646                ":batch_size": batch_size,
2647            },
2648            |row| {
2649                let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2650                let next_version =
2651                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2652                Ok((execution_id, next_version))
2653            },
2654        )
2655        .map_err(|err| {
2656            warn!("Ignoring consistency error {err:?}");
2657        })?
2658        .collect::<Result<Vec<_>, _>>()
2659        .map_err(|err| {
2660            warn!("Ignoring consistency error {err:?}");
2661        })
2662    }
2663
2664    /// Get executions and their next versions
2665    fn get_pending_by_ffqns(
2666        conn: &Connection,
2667        batch_size: u32,
2668        pending_at_or_sooner: DateTime<Utc>,
2669        ffqns: &[FunctionFqn],
2670    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2671        let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2672        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2673        for ffqn in ffqns {
2674            // Select executions in PendingAt.
2675            let stmt = conn
2676                .prepare_cached(&format!(
2677                    r#"
2678                    SELECT execution_id, corresponding_version FROM t_state WHERE
2679                    state = "{STATE_PENDING_AT}" AND
2680                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2681                    ORDER BY pending_expires_finished LIMIT :batch_size
2682                    "#
2683                ))
2684                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2685
2686            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2687                stmt,
2688                u32::try_from(batch_size - execution_ids_versions.len())
2689                    .expect("u32 - anything must fit to u32"),
2690                pending_at_or_sooner,
2691                ffqn,
2692            ) {
2693                execution_ids_versions.extend(execs_and_versions);
2694                if execution_ids_versions.len() == batch_size {
2695                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2696                    break;
2697                }
2698                // consistency errors are ignored since we want to return at least some rows.
2699            }
2700        }
2701        Ok(execution_ids_versions)
2702    }
2703
2704    fn get_pending_by_component_input_digest(
2705        conn: &Connection,
2706        batch_size: u32,
2707        pending_at_or_sooner: DateTime<Utc>,
2708        input_digest: &InputContentDigest,
2709    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2710        let mut stmt = conn
2711            .prepare_cached(&format!(
2712                r#"
2713                SELECT execution_id, corresponding_version FROM t_state WHERE
2714                state = "{STATE_PENDING_AT}" AND
2715                pending_expires_finished <= :pending_expires_finished AND
2716                component_id_input_digest = :component_id_input_digest
2717                ORDER BY pending_expires_finished LIMIT :batch_size
2718                "#
2719            ))
2720            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2721
2722        stmt.query_map(
2723            named_params! {
2724                ":pending_expires_finished": pending_at_or_sooner,
2725                ":component_id_input_digest": input_digest,
2726                ":batch_size": batch_size,
2727            },
2728            |row| {
2729                let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2730                let next_version =
2731                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2732                Ok((execution_id, next_version))
2733            },
2734        )?
2735        .collect::<Result<Vec<_>, _>>()
2736        .map_err(DbErrorGeneric::from)
2737    }
2738
2739    // Must be called after write transaction for a correct happens-before relationship.
2740    #[instrument(level = Level::TRACE, skip_all)]
2741    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2742        let (pending_ats, finished_execs, responses) = {
2743            let (mut pending_ats, mut finished_execs, mut responses) =
2744                (Vec::new(), Vec::new(), Vec::new());
2745            for notifier in notifiers {
2746                if let Some(pending_at) = notifier.pending_at {
2747                    pending_ats.push(pending_at);
2748                }
2749                if let Some(finished) = notifier.execution_finished {
2750                    finished_execs.push(finished);
2751                }
2752                if let Some(response) = notifier.response {
2753                    responses.push(response);
2754                }
2755            }
2756            (pending_ats, finished_execs, responses)
2757        };
2758
2759        // Notify pending_at subscribers.
2760        if !pending_ats.is_empty() {
2761            let guard = self.0.pending_subscribers.lock().unwrap();
2762            for pending_at in pending_ats {
2763                Self::notify_pending_locked(&pending_at, current_time, &guard);
2764            }
2765        }
2766        // Notify execution finished subscribers.
2767        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2768        if !finished_execs.is_empty() {
2769            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2770            for finished in finished_execs {
2771                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2772                    for (_tag, sender) in listeners_of_exe_id {
2773                        // Sending while holding the lock but the oneshot sender does not block.
2774                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2775                        let _ = sender.send(finished.retval.clone());
2776                    }
2777                }
2778            }
2779        }
2780        // Notify response subscribers.
2781        if !responses.is_empty() {
2782            let mut guard = self.0.response_subscribers.lock().unwrap();
2783            for (execution_id, response) in responses {
2784                if let Some((sender, _)) = guard.remove(&execution_id) {
2785                    let _ = sender.send(response);
2786                }
2787            }
2788        }
2789    }
2790
2791    fn notify_pending_locked(
2792        notifier: &NotifierPendingAt,
2793        current_time: DateTime<Utc>,
2794        ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2795    ) {
2796        // No need to remove here, cleanup is handled by the caller.
2797        if notifier.scheduled_at <= current_time {
2798            ffqn_to_pending_subscription.notify(notifier);
2799        }
2800    }
2801
2802    fn upgrade_execution_component(
2803        tx: &Transaction,
2804        execution_id: &ExecutionId,
2805        old: &InputContentDigest,
2806        new: &InputContentDigest,
2807    ) -> Result<(), DbErrorWrite> {
2808        debug!("Updating t_state to component {new}");
2809        let mut stmt = tx
2810            .prepare_cached(
2811                r"
2812                UPDATE t_state
2813                SET
2814                    updated_at = CURRENT_TIMESTAMP,
2815                    component_id_input_digest = :new
2816                WHERE
2817                    execution_id = :execution_id AND
2818                    component_id_input_digest = :old
2819            ",
2820            )
2821            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2822        let updated = stmt
2823            .execute(named_params! {
2824                ":execution_id": execution_id,
2825                ":old": old,
2826                ":new": new,
2827            })
2828            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2829        if updated != 1 {
2830            return Err(DbErrorWrite::NotFound);
2831        }
2832        Ok(())
2833    }
2834}
2835
2836#[async_trait]
2837impl DbExecutor for SqlitePool {
2838    #[instrument(level = Level::TRACE, skip(self))]
2839    async fn lock_pending_by_ffqns(
2840        &self,
2841        batch_size: u32,
2842        pending_at_or_sooner: DateTime<Utc>,
2843        ffqns: Arc<[FunctionFqn]>,
2844        created_at: DateTime<Utc>,
2845        component_id: ComponentId,
2846        executor_id: ExecutorId,
2847        lock_expires_at: DateTime<Utc>,
2848        run_id: RunId,
2849        retry_config: ComponentRetryConfig,
2850    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2851        let execution_ids_versions = self
2852            .transaction(
2853                move |conn| {
2854                    Self::get_pending_by_ffqns(conn, batch_size, pending_at_or_sooner, &ffqns)
2855                },
2856                "lock_pending_by_ffqns_get",
2857            )
2858            .await?;
2859        if execution_ids_versions.is_empty() {
2860            Ok(vec![])
2861        } else {
2862            debug!("Locking {execution_ids_versions:?}");
2863            self.transaction(
2864                move |tx| {
2865                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2866                    // Append lock
2867                    for (execution_id, version) in &execution_ids_versions {
2868                        match Self::lock_single_execution(
2869                            tx,
2870                            created_at,
2871                            component_id.clone(),
2872                            execution_id,
2873                            run_id,
2874                            version,
2875                            executor_id,
2876                            lock_expires_at,
2877                            retry_config,
2878                        ) {
2879                            Ok(locked) => locked_execs.push(locked),
2880                            Err(err) => {
2881                                warn!("Locking row {execution_id} failed - {err:?}");
2882                            }
2883                        }
2884                    }
2885                    Ok(locked_execs)
2886                },
2887                "lock_pending_by_ffqns_one",
2888            )
2889            .await
2890        }
2891    }
2892
2893    #[instrument(level = Level::TRACE, skip(self))]
2894    async fn lock_pending_by_component_id(
2895        &self,
2896        batch_size: u32,
2897        pending_at_or_sooner: DateTime<Utc>,
2898        component_id: &ComponentId,
2899        created_at: DateTime<Utc>,
2900        executor_id: ExecutorId,
2901        lock_expires_at: DateTime<Utc>,
2902        run_id: RunId,
2903        retry_config: ComponentRetryConfig,
2904    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2905        let component_id = component_id.clone();
2906        let execution_ids_versions = self
2907            .transaction(
2908                {
2909                    let component_id = component_id.clone();
2910                    move |conn| {
2911                        Self::get_pending_by_component_input_digest(
2912                            conn,
2913                            batch_size,
2914                            pending_at_or_sooner,
2915                            &component_id.input_digest,
2916                        )
2917                    }
2918                },
2919                "lock_pending_by_component_id_get",
2920            )
2921            .await?;
2922        if execution_ids_versions.is_empty() {
2923            Ok(vec![])
2924        } else {
2925            debug!("Locking {execution_ids_versions:?}");
2926            self.transaction(
2927                move |tx| {
2928                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2929                    // Append lock
2930                    for (execution_id, version) in &execution_ids_versions {
2931                        match Self::lock_single_execution(
2932                            tx,
2933                            created_at,
2934                            component_id.clone(),
2935                            execution_id,
2936                            run_id,
2937                            version,
2938                            executor_id,
2939                            lock_expires_at,
2940                            retry_config,
2941                        ) {
2942                            Ok(locked) => locked_execs.push(locked),
2943                            Err(err) => {
2944                                warn!("Locking row {execution_id} failed - {err:?}");
2945                            }
2946                        }
2947                    }
2948                    Ok(locked_execs)
2949                },
2950                "lock_pending_by_component_id_one",
2951            )
2952            .await
2953        }
2954    }
2955
2956    #[instrument(level = Level::DEBUG, skip(self))]
2957    async fn lock_one(
2958        &self,
2959        created_at: DateTime<Utc>,
2960        component_id: ComponentId,
2961        execution_id: &ExecutionId,
2962        run_id: RunId,
2963        version: Version,
2964        executor_id: ExecutorId,
2965        lock_expires_at: DateTime<Utc>,
2966        retry_config: ComponentRetryConfig,
2967    ) -> Result<LockedExecution, DbErrorWrite> {
2968        debug!(%execution_id, "lock_one");
2969        let execution_id = execution_id.clone();
2970        self.transaction(
2971            move |tx| {
2972                Self::lock_single_execution(
2973                    tx,
2974                    created_at,
2975                    component_id.clone(),
2976                    &execution_id,
2977                    run_id,
2978                    &version,
2979                    executor_id,
2980                    lock_expires_at,
2981                    retry_config,
2982                )
2983            },
2984            "lock_inner",
2985        )
2986        .await
2987    }
2988
2989    #[instrument(level = Level::DEBUG, skip(self, req))]
2990    async fn append(
2991        &self,
2992        execution_id: ExecutionId,
2993        version: Version,
2994        req: AppendRequest,
2995    ) -> Result<AppendResponse, DbErrorWrite> {
2996        debug!(%req, "append");
2997        trace!(?req, "append");
2998        let created_at = req.created_at;
2999        let (version, notifier) = self
3000            .transaction(
3001                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
3002                "append",
3003            )
3004            .await?;
3005        self.notify_all(vec![notifier], created_at);
3006        Ok(version)
3007    }
3008
3009    #[instrument(level = Level::DEBUG, skip_all)]
3010    async fn append_batch_respond_to_parent(
3011        &self,
3012        events: AppendEventsToExecution,
3013        response: AppendResponseToExecution,
3014        current_time: DateTime<Utc>,
3015    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3016        debug!("append_batch_respond_to_parent");
3017        if events.execution_id == response.parent_execution_id {
3018            // Pending state would be wrong.
3019            // This is not a panic because it depends on DB state.
3020            return Err(DbErrorWrite::NonRetriable(
3021                DbErrorWriteNonRetriable::ValidationFailed(
3022                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3023                ),
3024            ));
3025        }
3026        if events.batch.is_empty() {
3027            error!("Batch cannot be empty");
3028            return Err(DbErrorWrite::NonRetriable(
3029                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3030            ));
3031        }
3032        let (version, notifiers) = {
3033            self.transaction(
3034                move |tx| {
3035                    let mut version = events.version.clone();
3036                    let mut notifier_of_child = None;
3037                    for append_request in &events.batch {
3038                        let (v, n) = Self::append(
3039                            tx,
3040                            &events.execution_id,
3041                            append_request.clone(),
3042                            version,
3043                        )?;
3044                        version = v;
3045                        notifier_of_child = Some(n);
3046                    }
3047
3048                    let pending_at_parent = Self::append_response(
3049                        tx,
3050                        &response.parent_execution_id,
3051                        JoinSetResponseEventOuter {
3052                            created_at: response.created_at,
3053                            event: JoinSetResponseEvent {
3054                                join_set_id: response.join_set_id.clone(),
3055                                event: JoinSetResponse::ChildExecutionFinished {
3056                                    child_execution_id: response.child_execution_id.clone(),
3057                                    finished_version: response.finished_version.clone(),
3058                                    result: response.result.clone(),
3059                                },
3060                            },
3061                        },
3062                    )?;
3063                    Ok::<_, DbErrorWrite>((
3064                        version,
3065                        vec![
3066                            notifier_of_child.expect("checked that the batch is not empty"),
3067                            pending_at_parent,
3068                        ],
3069                    ))
3070                },
3071                "append_batch_respond_to_parent",
3072            )
3073            .await?
3074        };
3075        self.notify_all(notifiers, current_time);
3076        Ok(version)
3077    }
3078
3079    // Supports only one subscriber (executor) per ffqn.
3080    // A new subscriber replaces the old one, which will eventually time out, which is fine.
3081    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3082    async fn wait_for_pending_by_ffqn(
3083        &self,
3084        pending_at_or_sooner: DateTime<Utc>,
3085        ffqns: Arc<[FunctionFqn]>,
3086        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3087    ) {
3088        let unique_tag: u64 = rand::random();
3089        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
3090        {
3091            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3092            for ffqn in ffqns.as_ref() {
3093                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3094            }
3095        }
3096        async {
3097            let Ok(execution_ids_versions) = self
3098                .transaction(
3099                    {
3100                        let ffqns = ffqns.clone();
3101                        move |conn| Self::get_pending_by_ffqns(conn, 1, pending_at_or_sooner, ffqns.as_ref())
3102                    },
3103                    "get_pending_by_ffqns",
3104                )
3105                .await
3106            else {
3107                trace!(
3108                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3109                );
3110                timeout_fut.await;
3111                return;
3112            };
3113            if !execution_ids_versions.is_empty() {
3114                trace!("Not waiting, database already contains new pending executions");
3115                return;
3116            }
3117            tokio::select! { // future's liveness: Dropping the loser immediately.
3118                _ = receiver.recv() => {
3119                    trace!("Received a notification");
3120                }
3121                () = timeout_fut => {
3122                }
3123            }
3124        }.await;
3125        // Clean up ffqn_to_pending_subscription in any case
3126        {
3127            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3128            for ffqn in ffqns.as_ref() {
3129                match pending_subscribers.remove_ffqn(ffqn) {
3130                    Some((_, tag)) if tag == unique_tag => {
3131                        // Cleanup OK.
3132                    }
3133                    Some(other) => {
3134                        // Reinsert foreign sender.
3135                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
3136                    }
3137                    None => {
3138                        // Value was replaced and cleaned up already.
3139                    }
3140                }
3141            }
3142        }
3143    }
3144
3145    // Supports only one subscriber (executor) per component id.
3146    // A new subscriber replaces the old one, which will eventually time out, which is fine.
3147    async fn wait_for_pending_by_component_id(
3148        &self,
3149        pending_at_or_sooner: DateTime<Utc>,
3150        component_id: &ComponentId,
3151        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3152    ) {
3153        let unique_tag: u64 = rand::random();
3154        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
3155        {
3156            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3157            pending_subscribers.insert_by_component(
3158                component_id.input_digest.clone(),
3159                (sender.clone(), unique_tag),
3160            );
3161        }
3162        async {
3163            let Ok(execution_ids_versions) = self
3164                .transaction(
3165                    {
3166                        let input_digest =component_id.input_digest.clone();
3167                        move |conn| Self::get_pending_by_component_input_digest(conn, 1, pending_at_or_sooner, &input_digest)
3168                    },
3169                    "get_pending_by_component_input_digest",
3170                )
3171                .await
3172            else {
3173                trace!(
3174                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3175                );
3176                timeout_fut.await;
3177                return;
3178            };
3179            if !execution_ids_versions.is_empty() {
3180                trace!("Not waiting, database already contains new pending executions");
3181                return;
3182            }
3183            tokio::select! { // future's liveness: Dropping the loser immediately.
3184                _ = receiver.recv() => {
3185                    trace!("Received a notification");
3186                }
3187                () = timeout_fut => {
3188                }
3189            }
3190        }.await;
3191        // Clean up ffqn_to_pending_subscription in any case
3192        {
3193            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3194
3195            match pending_subscribers.remove_by_component(&component_id.input_digest) {
3196                Some((_, tag)) if tag == unique_tag => {
3197                    // Cleanup OK.
3198                }
3199                Some(other) => {
3200                    // Reinsert foreign sender.
3201                    pending_subscribers
3202                        .insert_by_component(component_id.input_digest.clone(), other);
3203                }
3204                None => {
3205                    // Value was replaced and cleaned up already.
3206                }
3207            }
3208        }
3209    }
3210
3211    async fn get_last_execution_event(
3212        &self,
3213        execution_id: &ExecutionId,
3214    ) -> Result<ExecutionEvent, DbErrorRead> {
3215        let execution_id = execution_id.clone();
3216        self.transaction(
3217            move |tx| Self::get_last_execution_event(tx, &execution_id),
3218            "get_last_execution_event",
3219        )
3220        .await
3221    }
3222}
3223
3224#[async_trait]
3225impl DbExternalApi for SqlitePool {
3226    #[instrument(level = Level::DEBUG, skip_all)]
3227    async fn get_backtrace(
3228        &self,
3229        execution_id: &ExecutionId,
3230        filter: BacktraceFilter,
3231    ) -> Result<BacktraceInfo, DbErrorRead> {
3232        debug!("get_backtrace");
3233        let execution_id = execution_id.clone();
3234
3235        self.transaction(
3236            move |tx| {
3237                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3238                                WHERE execution_id = :execution_id";
3239                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3240                let select = match &filter {
3241                    BacktraceFilter::Specific(version) =>{
3242                        params.push((":version", Box::new(version.0)));
3243                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3244                    },
3245                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3246                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3247                };
3248                tx
3249                    .prepare(&select)
3250                    ?
3251                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3252                        params
3253                            .iter()
3254                            .map(|(key, value)| (*key, value.as_ref()))
3255                            .collect::<Vec<_>>()
3256                            .as_ref(),
3257                    |row| {
3258                        Ok(BacktraceInfo {
3259                            execution_id: execution_id.clone(),
3260                            component_id: row.get::<_, JsonWrapper<_> >("component_id")?.0,
3261                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3262                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3263                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3264                        })
3265                    },
3266                ).map_err(DbErrorRead::from)
3267            },
3268            "get_last_backtrace",
3269        ).await
3270    }
3271
3272    async fn list_executions(
3273        &self,
3274        ffqn: Option<FunctionFqn>,
3275        top_level_only: bool,
3276        pagination: ExecutionListPagination,
3277    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3278        self.transaction(
3279            move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3280            "list_executions",
3281        )
3282        .await
3283    }
3284
3285    #[instrument(level = Level::DEBUG, skip(self))]
3286    async fn list_execution_events(
3287        &self,
3288        execution_id: &ExecutionId,
3289        since: &Version,
3290        max_length: VersionType,
3291        include_backtrace_id: bool,
3292    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3293        let execution_id = execution_id.clone();
3294        let since = since.0;
3295        self.transaction(
3296            move |tx| {
3297                Self::list_execution_events(
3298                    tx,
3299                    &execution_id,
3300                    since,
3301                    since + max_length,
3302                    include_backtrace_id,
3303                )
3304            },
3305            "get",
3306        )
3307        .await
3308    }
3309
3310    async fn list_responses(
3311        &self,
3312        execution_id: &ExecutionId,
3313        pagination: Pagination<u32>,
3314    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3315        let execution_id = execution_id.clone();
3316        self.transaction(
3317            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3318            "list_responses",
3319        )
3320        .await
3321    }
3322
3323    async fn upgrade_execution_component(
3324        &self,
3325        execution_id: &ExecutionId,
3326        old: &InputContentDigest,
3327        new: &InputContentDigest,
3328    ) -> Result<(), DbErrorWrite> {
3329        let execution_id = execution_id.clone();
3330        let old = old.clone();
3331        let new = new.clone();
3332        self.transaction(
3333            move |tx| Self::upgrade_execution_component(tx, &execution_id, &old, &new),
3334            "upgrade_execution_component",
3335        )
3336        .await
3337    }
3338}
3339
3340#[async_trait]
3341impl DbConnection for SqlitePool {
3342    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3343    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3344        debug!("create");
3345        trace!(?req, "create");
3346        let created_at = req.created_at;
3347        let (version, notifier) = self
3348            .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
3349            .await?;
3350        self.notify_all(vec![notifier], created_at);
3351        Ok(version)
3352    }
3353
3354    #[instrument(level = Level::DEBUG, skip(self, batch))]
3355    async fn append_batch(
3356        &self,
3357        current_time: DateTime<Utc>,
3358        batch: Vec<AppendRequest>,
3359        execution_id: ExecutionId,
3360        version: Version,
3361    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3362        debug!("append_batch");
3363        trace!(?batch, "append_batch");
3364        assert!(!batch.is_empty(), "Empty batch request");
3365
3366        let (version, notifier) = self
3367            .transaction(
3368                move |tx| {
3369                    let mut version = version.clone();
3370                    let mut notifier = None;
3371                    for append_request in &batch {
3372                        let (v, n) =
3373                            Self::append(tx, &execution_id, append_request.clone(), version)?;
3374                        version = v;
3375                        notifier = Some(n);
3376                    }
3377                    Ok::<_, DbErrorWrite>((
3378                        version,
3379                        notifier.expect("checked that the batch is not empty"),
3380                    ))
3381                },
3382                "append_batch",
3383            )
3384            .await?;
3385
3386        self.notify_all(vec![notifier], current_time);
3387        Ok(version)
3388    }
3389
3390    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3391    async fn append_batch_create_new_execution(
3392        &self,
3393        current_time: DateTime<Utc>,
3394        batch: Vec<AppendRequest>,
3395        execution_id: ExecutionId,
3396        version: Version,
3397        child_req: Vec<CreateRequest>,
3398    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3399        debug!("append_batch_create_new_execution");
3400        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3401        assert!(!batch.is_empty(), "Empty batch request");
3402
3403        let (version, notifiers) = self
3404            .transaction(
3405                move |tx| {
3406                    let mut notifier = None;
3407                    let mut version = version.clone();
3408                    for append_request in &batch {
3409                        let (v, n) =
3410                            Self::append(tx, &execution_id, append_request.clone(), version)?;
3411                        version = v;
3412                        notifier = Some(n);
3413                    }
3414                    let mut notifiers = Vec::new();
3415                    notifiers.push(notifier.expect("checked that the batch is not empty"));
3416
3417                    for child_req in &child_req {
3418                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
3419                        notifiers.push(notifier);
3420                    }
3421                    Ok::<_, DbErrorWrite>((version, notifiers))
3422                },
3423                "append_batch_create_new_execution_inner",
3424            )
3425            .await?;
3426        self.notify_all(notifiers, current_time);
3427        Ok(version)
3428    }
3429
3430    // Supports only one subscriber per execution id.
3431    // A new call will overwrite the old subscriber, the old one will end
3432    // with a timeout, which is fine.
3433    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3434    async fn subscribe_to_next_responses(
3435        &self,
3436        execution_id: &ExecutionId,
3437        start_idx: u32,
3438        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3439    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3440        debug!("next_responses");
3441        let unique_tag: u64 = rand::random();
3442        let execution_id = execution_id.clone();
3443
3444        let cleanup = || {
3445            let mut guard = self.0.response_subscribers.lock().unwrap();
3446            match guard.remove(&execution_id) {
3447                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
3448                Some(other) => {
3449                    // Reinsert foreign sender.
3450                    guard.insert(execution_id.clone(), other);
3451                }
3452                None => {} // Value was replaced and cleaned up already, or notification was sent.
3453            }
3454        };
3455
3456        let response_subscribers = self.0.response_subscribers.clone();
3457        let resp_or_receiver = {
3458            let execution_id = execution_id.clone();
3459            self.transaction(
3460                move |tx| {
3461                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
3462                    if responses.is_empty() {
3463                        // cannot race as we have the transaction write lock
3464                        let (sender, receiver) = oneshot::channel();
3465                        response_subscribers
3466                            .lock()
3467                            .unwrap()
3468                            .insert(execution_id.clone(), (sender, unique_tag));
3469                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
3470                    } else {
3471                        Ok(itertools::Either::Left(responses))
3472                    }
3473                },
3474                "subscribe_to_next_responses",
3475            )
3476            .await
3477        }
3478        .inspect_err(|_| {
3479            cleanup();
3480        })?;
3481        match resp_or_receiver {
3482            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3483            itertools::Either::Right(receiver) => {
3484                let res = tokio::select! {
3485                    resp = receiver => {
3486                        match resp {
3487                            Ok(resp) => Ok(vec![resp]),
3488                            Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3489                        }
3490                    }
3491                    outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3492                };
3493                cleanup();
3494                res
3495            }
3496        }
3497    }
3498
3499    // Supports multiple subscribers.
3500    async fn wait_for_finished_result(
3501        &self,
3502        execution_id: &ExecutionId,
3503        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3504    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3505        let unique_tag: u64 = rand::random();
3506        let execution_id = execution_id.clone();
3507        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3508
3509        let cleanup = || {
3510            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3511            if let Some(subscribers) = guard.get_mut(&execution_id) {
3512                subscribers.remove(&unique_tag);
3513            }
3514        };
3515
3516        let resp_or_receiver = {
3517            let execution_id = execution_id.clone();
3518            self.transaction(move |tx| {
3519                let pending_state =
3520                    Self::get_combined_state(tx, &execution_id)?.pending_state;
3521                if let PendingState::Finished { finished, .. } = pending_state {
3522                    let event =
3523                        Self::get_execution_event(tx, &execution_id, finished.version)?;
3524                    if let ExecutionRequest::Finished { result, ..} = event.event {
3525                        Ok(itertools::Either::Left(result))
3526                    } else {
3527                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3528                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
3529                            "cannot get finished event based on t_state version"
3530                        )))
3531                    }
3532                } else {
3533                    // Cannot race with the notifier as we have the transaction write lock:
3534                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
3535                    // or we end up here. If this tx fails, the cleanup will remove this entry.
3536                    let (sender, receiver) = oneshot::channel();
3537                    let mut guard = execution_finished_subscription.lock().unwrap();
3538                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3539                    Ok(itertools::Either::Right(receiver))
3540                }
3541            }, "wait_for_finished_result")
3542            .await
3543        }
3544        .inspect_err(|_| {
3545            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
3546            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
3547            // If cleanup wins, it simply removes the oneshot sender.
3548            cleanup();
3549        })?;
3550
3551        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3552        match resp_or_receiver {
3553            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3554            itertools::Either::Right(receiver) => {
3555                let res = tokio::select! {
3556                    resp = receiver => {
3557                        match resp {
3558                            Ok(retval) => Ok(retval),
3559                            Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3560                        }
3561                    }
3562                    outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3563                };
3564                cleanup();
3565                res
3566            }
3567        }
3568    }
3569
3570    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3571    async fn append_delay_response(
3572        &self,
3573        created_at: DateTime<Utc>,
3574        execution_id: ExecutionId,
3575        join_set_id: JoinSetId,
3576        delay_id: DelayId,
3577        result: Result<(), ()>,
3578    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3579        debug!("append_delay_response");
3580        let event = JoinSetResponseEventOuter {
3581            created_at,
3582            event: JoinSetResponseEvent {
3583                join_set_id,
3584                event: JoinSetResponse::DelayFinished {
3585                    delay_id: delay_id.clone(),
3586                    result,
3587                },
3588            },
3589        };
3590        let res = self
3591            .transaction(
3592                {
3593                    let execution_id = execution_id.clone();
3594                    move |tx| Self::append_response(tx, &execution_id, event.clone())
3595                },
3596                "append_delay_response",
3597            )
3598            .await;
3599        match res {
3600            Ok(notifier) => {
3601                self.notify_all(vec![notifier], created_at);
3602                Ok(AppendDelayResponseOutcome::Success)
3603            }
3604            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3605                let delay_success = self
3606                    .transaction(
3607                        move |tx| Self::delay_response(tx, &execution_id, &delay_id),
3608                        "delay_response",
3609                    )
3610                    .await?;
3611                match delay_success {
3612                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3613                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3614                    None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized(
3615                        "insert failed yet select did not find the response".into(),
3616                    ))),
3617                }
3618            }
3619            Err(err) => Err(err),
3620        }
3621    }
3622
3623    #[instrument(level = Level::DEBUG, skip_all)]
3624    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3625        debug!("append_backtrace");
3626        self.transaction(
3627            move |tx| Self::append_backtrace(tx, &append),
3628            "append_backtrace",
3629        )
3630        .await
3631    }
3632
3633    #[instrument(level = Level::DEBUG, skip_all)]
3634    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3635        debug!("append_backtrace_batch");
3636        self.transaction(
3637            move |tx| {
3638                for append in &batch {
3639                    Self::append_backtrace(tx, append)?;
3640                }
3641                Ok(())
3642            },
3643            "append_backtrace",
3644        )
3645        .await
3646    }
3647
3648    /// Get currently expired delays and locks.
3649    #[instrument(level = Level::TRACE, skip(self))]
3650    async fn get_expired_timers(
3651        &self,
3652        at: DateTime<Utc>,
3653    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3654        self.transaction(
3655            move |conn| {
3656                let mut expired_timers = conn.prepare(
3657                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3658                )?
3659                .query_map(
3660                        named_params! {
3661                            ":at": at,
3662                        },
3663                        |row| {
3664                            let execution_id = row.get("execution_id")?;
3665                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3666                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3667                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3668                            Ok(ExpiredTimer::Delay(delay))
3669                        },
3670                    )?
3671                    .collect::<Result<Vec<_>, _>>()?;
3672                // Extend with expired locks
3673                let expired = conn.prepare(&format!(r#"
3674                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3675                    executor_id, run_id
3676                    FROM t_state
3677                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3678                    "#
3679                )
3680                )?
3681                .query_map(
3682                        named_params! {
3683                            ":at": at,
3684                        },
3685                        |row| {
3686                            let execution_id = row.get("execution_id")?;
3687                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3688                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3689                            let intermittent_event_count = row.get("intermittent_event_count")?;
3690                            let max_retries = row.get("max_retries")?;
3691                            let retry_exp_backoff_millis = u64::from(row.get::<_, u32>("retry_exp_backoff_millis")?);
3692                            let executor_id = row.get("executor_id")?;
3693                            let run_id = row.get("run_id")?;
3694                            let lock = ExpiredLock {
3695                                execution_id,
3696                                locked_at_version,
3697                                next_version,
3698                                intermittent_event_count,
3699                                max_retries,
3700                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3701                                locked_by: LockedBy { executor_id, run_id },
3702                            };
3703                            Ok(ExpiredTimer::Lock(lock))
3704                        }
3705                    )?
3706                    .collect::<Result<Vec<_>, _>>()?;
3707                expired_timers.extend(expired);
3708                if !expired_timers.is_empty() {
3709                    debug!("get_expired_timers found {expired_timers:?}");
3710                }
3711                Ok(expired_timers)
3712            }, "get_expired_timers"
3713        )
3714        .await
3715    }
3716
3717    async fn get_execution_event(
3718        &self,
3719        execution_id: &ExecutionId,
3720        version: &Version,
3721    ) -> Result<ExecutionEvent, DbErrorRead> {
3722        let version = version.0;
3723        let execution_id = execution_id.clone();
3724        self.transaction(
3725            move |tx| Self::get_execution_event(tx, &execution_id, version),
3726            "get_execution_event",
3727        )
3728        .await
3729    }
3730
3731    async fn get_pending_state(
3732        &self,
3733        execution_id: &ExecutionId,
3734    ) -> Result<PendingState, DbErrorRead> {
3735        let execution_id = execution_id.clone();
3736        Ok(self
3737            .transaction(
3738                move |tx| Self::get_combined_state(tx, &execution_id),
3739                "get_pending_state",
3740            )
3741            .await?
3742            .pending_state)
3743    }
3744}
3745
/// Test-only database operations, compiled with the `test` feature.
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for SqlitePool {
    /// Appends `response_event`, stamped with `created_at`, to `execution_id` and
    /// passes the resulting notifier to `notify_all` once the transaction succeeded.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let outer_event = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };
        let response_notifier = self
            .transaction(
                move |tx| {
                    let event = outer_event.clone();
                    Self::append_response(tx, &execution_id, event)
                },
                "append_response",
            )
            .await?;
        self.notify_all(vec![response_notifier], created_at);
        Ok(())
    }

    /// Loads the execution log of `execution_id` by running the inherent `Self::get`
    /// inside a transaction.
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn get(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
        trace!("get");
        let owned_id = execution_id.clone();
        let execution_log = self
            .transaction(move |tx| Self::get(tx, &owned_id), "get")
            .await;
        execution_log
    }
}
3782
/// Helpers for creating a `SqlitePool` in tests.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Creates a pool with the default `SqliteConfig`.
    ///
    /// When the `SQLITE_FILE` environment variable is set, the pool is opened at that
    /// path and no temporary file is returned. Otherwise a new `NamedTempFile` backs
    /// the pool and is returned alongside it.
    ///
    /// # Panics
    ///
    /// Panics when the temporary file or the pool cannot be created.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(configured_path) => {
                let pool = SqlitePool::new(configured_path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let temp_file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(temp_file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(temp_file))
            }
        }
    }
}