// obeli_sk_db_sqlite/sqlite_dao.rs

1use crate::histograms::Histograms;
2use assert_matches::assert_matches;
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use concepts::{
6    ComponentId, ExecutionId, FunctionFqn, JoinSetId, SupportedFunctionReturnValue,
7    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
8    storage::{
9        AppendBatchResponse, AppendRequest, AppendResponse, BacktraceFilter, BacktraceInfo,
10        CreateRequest, DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric,
11        DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite, DbErrorWritePermanent, DbExecutor,
12        DbPool, DbPoolCloseable, ExecutionEvent, ExecutionEventInner, ExecutionListPagination,
13        ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest,
14        JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse,
15        LockedExecution, Pagination, PendingState, PendingStateFinished,
16        PendingStateFinishedResultKind, ResponseWithCursor, Version, VersionType,
17    },
18};
19use conversions::{FromStrWrapper, JsonWrapper, consistency_db_err, consistency_rusqlite};
20use conversions::{
21    intermittent_err_generic, prepare_err_generic, result_err_generic, result_err_read,
22};
23use hashbrown::HashMap;
24use rusqlite::{
25    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
26    named_params,
27};
28use std::{
29    cmp::max,
30    collections::VecDeque,
31    fmt::Debug,
32    path::Path,
33    str::FromStr,
34    sync::{
35        Arc, Mutex,
36        atomic::{AtomicBool, Ordering},
37    },
38    time::Duration,
39};
40use std::{fmt::Write as _, pin::Pin};
41use tokio::{
42    sync::{mpsc, oneshot},
43    time::Instant,
44};
45use tracing::{Level, Span, debug, debug_span, error, info, instrument, trace, warn};
46
/// Returned when the sqlite database cannot be opened or initialized.
///
/// Carries no details: the cause is logged at `error` level at the place where
/// this value is created.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
50
/// A delay (`delay_id`) belonging to join set `join_set_id` that expires at `expires_at`.
///
/// NOTE(review): the fields mirror the columns of `t_delay`; where this type is built
/// is not visible in this part of the file.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
/// Default pragmas applied, in this order, to every connection opened by `init_thread`.
///
/// Each entry is `[name, value]`. An empty value means the pragma is only queried and
/// its result is logged at debug level (used for `integrity_check`).
///
/// `page_size` must come before `journal_mode = wal`: SQLite cannot change the page size
/// once a database is in WAL mode (not even an empty one), so setting it afterwards has
/// no effect on newly created databases.
///
/// - `mmap_size` = 128 MB - use memory-mapped I/O for up to this many bytes of the
///   database file, see <https://www.sqlite.org/pragma.html#pragma_mmap_size> and
///   <https://www.sqlite.org/mmap.html>
/// - `journal_size_limit` = 64 MB - truncate the WAL file to at most this size after a
///   checkpoint so it does not stay large on disk, see
///   <https://www.sqlite.org/pragma.html#pragma_journal_size_limit>
/// - `integrity_check` runs a full database check on every start, which may be slow
///   for large databases.
///
/// Inspired by <https://github.com/rails/rails/pull/49349>
const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"], // 8 KB, must be set before switching to WAL
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
80
/// Schema metadata. Append only.
///
/// `init_thread` reads the `schema_version` of the row with the highest `id` and
/// compares it with [`T_METADATA_EXPECTED_SCHEMA_VERSION`].
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
/// Schema version this code works with; `init_thread` fails initialization when the
/// latest `t_metadata` row reports a different value.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 2;
90
/// Stores execution history. Append only.
///
/// One row per event, keyed by `(execution_id, version)`. `history_event_type` is a
/// stored generated column extracted from `json_value` at `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
// NOTE(review): this index has exactly the columns of the table's PRIMARY KEY, which
// SQLite already backs with an automatic index, so it looks redundant (extra disk space
// and write cost). Confirm before removing it.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
112
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, column `delay_id` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, columns `child_execution_id` and
/// `finished_version` must not be null.
///
/// NOTE(review): SQLite treats NULLs as distinct from each other in UNIQUE constraints.
/// If each row fills only one of `delay_id` / `child_execution_id` (as described above),
/// the UNIQUE clause never rejects a duplicate response. Confirm whether duplicates are
/// prevented elsewhere before relying on it.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
136
/// Stores executions in `PendingState`, one row per execution (`execution_id` is the key).
/// `state` to column mapping:
/// `PendingAt`:            (nothing but required columns)
/// `Locked`:               `last_lock_version`, `executor_id`, `run_id`
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
/// `Finished`:             `result_kind`.
///
/// `pending_expires_finished` holds `scheduled_at` for `PendingAt`, `lock_expires_at`
/// for `Locked` and `BlockedByJoinSet`, and `finished_at` for `Finished`, matching the
/// decoding in `CombinedState::new`.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,
    max_retries INTEGER NOT NULL,
    retry_exp_backoff_millis INTEGER NOT NULL,

    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Values stored in `t_state.state`.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";
// Presumably compared with `t_execution_log.history_event_type` - TODO confirm at the use site.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

// TODO: partial indexes
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
// Partial index over rows with an `executor_id`, which per the mapping above are `Locked` rows.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
// NOTE(review): the index is named `..._is_root` while the constant says `IS_TOP_LEVEL`.
// Renaming the index would create a second index in existing databases, so keep it.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
194
/// Represents [`ExpiredTimer::AsyncDelay`]. Rows are deleted when the delay is processed.
///
/// NOTE(review): there is no index on `expires_at`; if expired delays are searched by
/// time, such a query scans the table - confirm against the queries using it.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";
205
/// WASM backtraces of an execution, each row covering the event versions
/// `version_min_including..version_max_excluding` (per the column names). Append only.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
// Index for searching backtraces by execution_id and version
// NOTE(review): same columns, in the same order, as the PRIMARY KEY above, which SQLite
// already indexes automatically - this index looks redundant; confirm before removing.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
221mod conversions {
222
223    use concepts::storage::{DbErrorGeneric, DbErrorRead};
224    use rusqlite::types::{FromSql, FromSqlError};
225    use std::{fmt::Debug, str::FromStr};
226    use tracing::error;
227
228    #[expect(clippy::needless_pass_by_value)]
229    pub(crate) fn prepare_err_generic(err: rusqlite::Error) -> DbErrorGeneric {
230        error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
231        DbErrorGeneric::Uncategorized(err.to_string().into())
232    }
233    #[expect(clippy::needless_pass_by_value)]
234    pub(crate) fn intermittent_err_generic(err: rusqlite::Error) -> DbErrorGeneric {
235        error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
236        DbErrorGeneric::Uncategorized(err.to_string().into())
237    }
238    #[expect(clippy::needless_pass_by_value)]
239    pub(crate) fn result_err_generic(err: rusqlite::Error) -> DbErrorGeneric {
240        error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
241        DbErrorGeneric::Uncategorized(err.to_string().into())
242    }
243    pub(crate) fn result_err_read(err: rusqlite::Error) -> DbErrorRead {
244        if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
245            DbErrorRead::NotFound
246        } else {
247            result_err_generic(err).into()
248        }
249    }
250
251    pub(crate) struct JsonWrapper<T>(pub(crate) T);
252    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
253        fn column_result(
254            value: rusqlite::types::ValueRef<'_>,
255        ) -> rusqlite::types::FromSqlResult<Self> {
256            let value = match value {
257                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
258                    Ok(value)
259                }
260                other => {
261                    error!(
262                        backtrace = %std::backtrace::Backtrace::capture(),
263                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
264                    );
265                    Err(FromSqlError::InvalidType)
266                }
267            }?;
268            let value = serde_json::from_slice::<T>(value).map_err(|err| {
269                error!(
270                    backtrace = %std::backtrace::Backtrace::capture(),
271                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
272                    r#type = std::any::type_name::<T>()
273                );
274                FromSqlError::InvalidType
275            })?;
276            Ok(Self(value))
277        }
278    }
279
280    pub(crate) struct FromStrWrapper<T: FromStr>(pub(crate) T);
281    impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
282        fn column_result(
283            value: rusqlite::types::ValueRef<'_>,
284        ) -> rusqlite::types::FromSqlResult<Self> {
285            let value = String::column_result(value)?;
286            let value = T::from_str(&value).map_err(|err| {
287                error!(
288                    backtrace = %std::backtrace::Backtrace::capture(),
289                    "Cannot convert string `{value}` to type:`{type}` - {err:?}",
290                    r#type = std::any::type_name::<T>()
291                );
292                FromSqlError::InvalidType
293            })?;
294            Ok(Self(value))
295        }
296    }
297
298    // Used as a wrapper for `FromSqlError::Other`
299    #[derive(Debug, thiserror::Error)]
300    #[error("{0}")]
301    pub(crate) struct OtherError(&'static str);
302    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
303        FromSqlError::other(OtherError(input)).into()
304    }
305
306    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
307        DbErrorGeneric::Uncategorized(input.into())
308    }
309}
/// Raw `t_state` columns needed to rebuild a [`CombinedState`].
///
/// Which optional columns must be set depends on `state`; `CombinedState::new` accepts
/// only the combinations listed in the column mapping of `CREATE_TABLE_T_STATE`.
#[derive(Debug)]
struct CombinedStateDTO {
    state: String,
    ffqn: String,
    pending_expires_finished: DateTime<Utc>,
    // Locked:
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    // Blocked by join set:
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    // Finished:
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Decoded `t_state` row: the function, its pending state and the execution log
/// version that this state corresponds to.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    corresponding_version: Version,
}
331impl CombinedState {
332    fn get_next_version_assert_not_finished(&self) -> Version {
333        assert!(!self.pending_state.is_finished());
334        self.corresponding_version.increment()
335    }
336
337    #[cfg(feature = "test")]
338    fn get_next_version_or_finished(&self) -> Version {
339        if self.pending_state.is_finished() {
340            self.corresponding_version.clone()
341        } else {
342            self.corresponding_version.increment()
343        }
344    }
345}
346
/// Collected in [`AppendNotifier`]: an execution of `ffqn` is pending from `scheduled_at`.
/// Presumably used to wake subscribers waiting for pending work of that function - TODO confirm.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}
352
/// Collected in [`AppendNotifier`]: execution `execution_id` finished with `retval`.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}
358
/// Notifications gathered while appending; each field is `None` when that kind of
/// notification is not needed. Presumably delivered to subscribers after the
/// transaction - TODO confirm at the use site.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
365
impl CombinedState {
    /// Rebuilds the typed [`PendingState`] from a raw `t_state` row.
    ///
    /// Each state requires exactly its own optional columns to be set and all others to
    /// be `None`. Any other combination, an unknown `state`, or an unparsable `ffqn` is
    /// logged and returned as a consistency error.
    ///
    /// `pending_expires_finished` is interpreted per state: `scheduled_at` for
    /// `PendingAt`, `lock_expires_at` for `Locked` and `BlockedByJoinSet`, and
    /// `finished_at` for `Finished`. For `Finished`, `corresponding_version` is also
    /// used as the finished version.
    fn new(
        dto: &CombinedStateDTO,
        corresponding_version: Version,
    ) -> Result<Self, rusqlite::Error> {
        let pending_state = match dto {
            // `PendingAt`: no optional column may be set.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: scheduled_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
                scheduled_at: *scheduled_at,
            }),
            // `Locked`: all lock columns set. `last_lock_version` must be present but is
            // not carried into `PendingState::Locked`.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                last_lock_version: Some(_),
                executor_id: Some(executor_id),
                run_id: Some(run_id),
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_LOCKED => Ok(PendingState::Locked {
                executor_id: *executor_id,
                run_id: *run_id,
                lock_expires_at: *lock_expires_at,
            }),
            // `BlockedByJoinSet`: only the join set columns set.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: Some(join_set_id),
                join_set_closing: Some(join_set_closing),
                result_kind: None,
            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
                join_set_id: join_set_id.clone(),
                closing: *join_set_closing,
                lock_expires_at: *lock_expires_at,
            }),
            // `Finished`: only `result_kind` set; the row's version is the finished version.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: finished_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: Some(result_kind),
            } if state == STATE_FINISHED => Ok(PendingState::Finished {
                finished: PendingStateFinished {
                    finished_at: *finished_at,
                    version: corresponding_version.0,
                    result_kind: *result_kind,
                },
            }),
            // Unknown state, or a column combination that does not match the state.
            _ => {
                error!("Cannot deserialize pending state from  {dto:?}");
                Err(consistency_rusqlite("invalid `t_state`"))
            }
        }?;
        Ok(Self {
            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
                consistency_rusqlite("invalid ffqn value in `t_state`")
            })?,
            pending_state,
            corresponding_version,
        })
    }
}
447
/// Scheduling class of a [`ThreadCommand::Func`].
///
/// Within one batch, `SqlitePool::tick` runs all `High` commands, then all `Medium`
/// ones, and runs `Low` ones only if fewer than `low_prio_threshold` commands ran before.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandPriority {
    /// Write, Shutdown
    High,
    /// Read
    Medium,
    /// Lock scan
    Low,
}
457
/// Message sent over the command channel to the sqlite thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    /// A closure executed on the thread's `Connection`.
    Func {
        #[debug(skip)]
        func: Box<dyn FnOnce(&mut Connection) + Send>,
        /// Selects the pass of the batch in which the closure runs.
        priority: CommandPriority,
        /// Enqueue time; the delay until execution is recorded in the histograms.
        sent_at: Instant,
        /// Label under which latency and duration are recorded.
        name: &'static str,
    },
    /// Stops the thread before it processes the batch containing this command.
    Shutdown,
    /// Placeholder left in the batch buffer after a `Func` was taken out and executed.
    Dummy,
}
470
/// Handle to a sqlite database whose single connection is owned by a dedicated thread.
///
/// Clones share the thread and the subscriber maps. Call `DbPoolCloseable::close`
/// before dropping the last clone; otherwise `Drop` logs a warning and only requests
/// shutdown without waiting for the thread.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);
473
/// Per execution: a one-shot sender for a join set response, paired with a `u64`
/// (presumably a subscription tag - TODO confirm at the use site).
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Per function: a channel signalled with `()`, paired with a `u64` (presumably a
/// subscription tag - TODO confirm).
type PendingFfqnSubscribers = Arc<Mutex<HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>>>;
/// Per execution: any number of one-shot senders for its return value, keyed by a `u64`
/// (presumably a subscriber id - TODO confirm). Unlike the aliases above, the `Arc` is
/// added where this type is used (`SqlitePoolInner`).
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
/// Shared state behind [`SqlitePool`]; all fields are `Arc`s or channel senders, so
/// cloning is cheap.
#[derive(Clone)]
struct SqlitePoolInner {
    /// Set by `close` / `Drop` to ask the sqlite thread to stop.
    shutdown_requested: Arc<AtomicBool>,
    /// Set by the sqlite thread when its command loop has ended.
    shutdown_finished: Arc<AtomicBool>,
    /// Sends work (and `Shutdown`) to the sqlite thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_ffqn_subscribers: PendingFfqnSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
}
489
490#[async_trait]
491impl DbPoolCloseable for SqlitePool {
492    async fn close(self) {
493        self.0.shutdown_requested.store(true, Ordering::Release);
494        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
495        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
496        while !self.0.shutdown_finished.load(Ordering::Acquire) {
497            tokio::time::sleep(Duration::from_millis(1)).await;
498        }
499    }
500}
501
502#[async_trait]
503impl DbPool for SqlitePool {
504    fn connection(&self) -> Box<dyn DbConnection> {
505        Box::new(self.clone())
506    }
507}
508impl Drop for SqlitePool {
509    fn drop(&mut self) {
510        let arc = self.0.join_handle.take().expect("join_handle was set");
511        if let Ok(join_handle) = Arc::try_unwrap(arc) {
512            // Last holder
513            if !join_handle.is_finished() {
514                if !self.0.shutdown_finished.load(Ordering::Acquire) {
515                    // Best effort to shut down the sqlite thread.
516                    let backtrace = std::backtrace::Backtrace::capture();
517                    warn!("SqlitePool was not closed properly - {backtrace}");
518                    self.0.shutdown_requested.store(true, Ordering::Release);
519                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
520                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
521                    // Not joining the thread, drop might be called from async context.
522                    // We are shutting down the server anyway.
523                } else {
524                    // The thread set `shutdown_finished` as its last operation.
525                }
526            }
527        }
528    }
529}
530
/// Tuning options for `SqlitePool::new`.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the bounded command channel to the sqlite thread; also the initial
    /// capacity of the per-batch buffer.
    pub queue_capacity: usize,
    /// Low priority commands of a batch run only if fewer than this many high and
    /// medium priority commands were processed in that batch.
    pub low_prio_threshold: usize,
    /// Pragma values replacing the built-in defaults of the same name; unknown names
    /// are applied after the defaults.
    pub pragma_override: Option<HashMap<String, String>>,
    /// Passed to `Histograms::new` (presumably controls latency reporting - TODO confirm).
    pub metrics_threshold: Option<Duration>,
}
impl Default for SqliteConfig {
    fn default() -> Self {
        const DEFAULT_QUEUE_CAPACITY: usize = 100;
        const DEFAULT_LOW_PRIO_THRESHOLD: usize = 100;
        SqliteConfig {
            queue_capacity: DEFAULT_QUEUE_CAPACITY,
            low_prio_threshold: DEFAULT_LOW_PRIO_THRESHOLD,
            pragma_override: None,
            metrics_threshold: None,
        }
    }
}
548
549impl SqlitePool {
550    fn init_thread(
551        path: &Path,
552        mut pragma_override: HashMap<String, String>,
553    ) -> Result<Connection, InitializationError> {
554        fn execute<P: Params>(
555            conn: &Connection,
556            sql: &str,
557            params: P,
558        ) -> Result<(), InitializationError> {
559            conn.execute(sql, params).map(|_| ()).map_err(|err| {
560                error!("Cannot run `{sql}` - {err:?}");
561                InitializationError
562            })
563        }
564        fn pragma_update(
565            conn: &Connection,
566            name: &str,
567            value: &str,
568        ) -> Result<(), InitializationError> {
569            if value.is_empty() {
570                debug!("Querying PRAGMA {name}");
571                conn.pragma_query(None, name, |row| {
572                    debug!("{row:?}");
573                    Ok(())
574                })
575                .map_err(|err| {
576                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
577                    InitializationError
578                })
579            } else {
580                debug!("Setting PRAGMA {name}={value}");
581                conn.pragma_update(None, name, value).map_err(|err| {
582                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
583                    InitializationError
584                })
585            }
586        }
587
588        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
589            error!("cannot open the connection - {err:?}");
590            InitializationError
591        })?;
592
593        for [pragma_name, default_value] in PRAGMA {
594            let pragma_value = pragma_override
595                .remove(pragma_name)
596                .unwrap_or_else(|| default_value.to_string());
597            pragma_update(&conn, pragma_name, &pragma_value)?;
598        }
599        // drain the rest overrides
600        for (pragma_name, pragma_value) in pragma_override.drain() {
601            pragma_update(&conn, &pragma_name, &pragma_value)?;
602        }
603
604        // t_metadata
605        execute(&conn, CREATE_TABLE_T_METADATA, [])?;
606        // Insert row if not exists.
607        execute(
608            &conn,
609            &format!(
610                "INSERT INTO t_metadata (schema_version, created_at) VALUES
611                    ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
612            ),
613            [Utc::now()],
614        )?;
615        // Fail on unexpected `schema_version`.
616        let actual_version = conn
617            .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
618            .map_err(|err| {
619                error!("cannot select schema version - {err:?}");
620                InitializationError
621            })?
622            .query_row([], |row| row.get::<_, u32>("schema_version"));
623
624        let actual_version = actual_version.map_err(|err| {
625            error!("Cannot read the schema version - {err:?}");
626            InitializationError
627        })?;
628        if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
629            error!(
630                "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
631            );
632            return Err(InitializationError);
633        }
634
635        // t_execution_log
636        execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
637        execute(
638            &conn,
639            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
640            [],
641        )?;
642        execute(
643            &conn,
644            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
645            [],
646        )?;
647        // t_join_set_response
648        execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
649        execute(
650            &conn,
651            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
652            [],
653        )?;
654        // t_state
655        execute(&conn, CREATE_TABLE_T_STATE, [])?;
656        execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
657        execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
658        execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
659        execute(&conn, IDX_T_STATE_FFQN, [])?;
660        execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
661        // t_delay
662        execute(&conn, CREATE_TABLE_T_DELAY, [])?;
663        // t_backtrace
664        execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
665        execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
666        Ok(conn)
667    }
668
669    fn execute(
670        priority_filter: CommandPriority,
671        vec: &mut Vec<ThreadCommand>,
672        conn: &mut Connection,
673        shutdown_requested: &AtomicBool,
674        histograms: &mut Histograms,
675    ) -> Result<usize, ()> {
676        let mut processed = 0;
677        for item in vec {
678            if matches!(item, ThreadCommand::Func { priority, .. } if *priority == priority_filter )
679            {
680                let item = std::mem::replace(item, ThreadCommand::Dummy); // get owned item
681                let (func, sent_at, func_name) = assert_matches!(item, ThreadCommand::Func{func, sent_at, name, ..} => (func, sent_at, name));
682                let sent_latency = sent_at.elapsed();
683                let started_at = Instant::now();
684                func(conn);
685                let func_duration = started_at.elapsed();
686                processed += 1;
687                histograms.record(sent_latency, func_name, func_duration);
688            }
689            if shutdown_requested.load(Ordering::Acquire) {
690                // recheck after every function
691                debug!("Recveived shutdown during processing of the batch");
692                return Err(());
693            }
694        }
695        Ok(processed)
696    }
697
698    fn connection_rpc(
699        mut conn: Connection,
700        shutdown_requested: &AtomicBool,
701        shutdown_finished: &AtomicBool,
702        mut command_rx: mpsc::Receiver<ThreadCommand>,
703        queue_capacity: usize,
704        low_prio_threshold: usize,
705        metrics_threshold: Option<Duration>,
706    ) {
707        let mut vec: Vec<ThreadCommand> = Vec::with_capacity(queue_capacity);
708
709        let mut histograms = Histograms::new(metrics_threshold);
710
711        let mut metrics_instant = std::time::Instant::now();
712        while Self::tick(
713            &mut vec,
714            &mut conn,
715            shutdown_requested,
716            &mut command_rx,
717            low_prio_threshold,
718            &mut histograms,
719            &mut metrics_instant,
720        )
721        .is_ok()
722        {
723            // Loop until shutdown is set to true.
724        }
725        debug!("Closing command thread");
726        shutdown_finished.store(true, Ordering::Release);
727    }
728
729    fn tick(
730        vec: &mut Vec<ThreadCommand>,
731        conn: &mut Connection,
732        shutdown_requested: &AtomicBool,
733        command_rx: &mut mpsc::Receiver<ThreadCommand>,
734        low_prio_threshold: usize,
735        histograms: &mut Histograms,
736        metrics_instant: &mut std::time::Instant,
737    ) -> Result<(), ()> {
738        vec.clear();
739        if let Some(item) = command_rx.blocking_recv() {
740            vec.push(item);
741        } else {
742            debug!("command_rx was closed");
743            return Err(());
744        }
745        while let Ok(more) = command_rx.try_recv() {
746            vec.push(more);
747        }
748        // Did we receive Shutdown in the batch?
749        let shutdown_found = vec
750            .iter()
751            .any(|item| matches!(item, ThreadCommand::Shutdown));
752        if shutdown_found || shutdown_requested.load(Ordering::Acquire) {
753            debug!("Recveived shutdown before processing the batch");
754            return Err(());
755        }
756
757        let processed = Self::execute(
758            CommandPriority::High,
759            vec,
760            conn,
761            shutdown_requested,
762            histograms,
763        )? + Self::execute(
764            CommandPriority::Medium,
765            vec,
766            conn,
767            shutdown_requested,
768            histograms,
769        )?;
770        if processed < low_prio_threshold {
771            Self::execute(
772                CommandPriority::Low,
773                vec,
774                conn,
775                shutdown_requested,
776                histograms,
777            )?;
778        }
779
780        histograms.print_if_elapsed(metrics_instant);
781        Ok(())
782    }
783
784    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
785    pub async fn new<P: AsRef<Path>>(
786        path: P,
787        config: SqliteConfig,
788    ) -> Result<Self, InitializationError> {
789        let path = path.as_ref().to_owned();
790
791        let shutdown_requested = Arc::new(AtomicBool::new(false));
792        let shutdown_finished = Arc::new(AtomicBool::new(false));
793
794        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
795        info!("Sqlite database location: {path:?}");
796        let join_handle = {
797            // Initialize the `Connection`.
798            let init_task = {
799                tokio::task::spawn_blocking(move || {
800                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
801                })
802                .await
803            };
804            let conn = match init_task {
805                Ok(res) => res?,
806                Err(join_err) => {
807                    error!("Initialization panic - {join_err:?}");
808                    return Err(InitializationError);
809                }
810            };
811            let shutdown_requested = shutdown_requested.clone();
812            let shutdown_finished = shutdown_finished.clone();
813            // Start the RPC thread.
814            std::thread::spawn(move || {
815                Self::connection_rpc(
816                    conn,
817                    &shutdown_requested,
818                    &shutdown_finished,
819                    command_rx,
820                    config.queue_capacity,
821                    config.low_prio_threshold,
822                    config.metrics_threshold,
823                );
824            })
825        };
826        Ok(SqlitePool(SqlitePoolInner {
827            shutdown_requested,
828            shutdown_finished,
829            command_tx,
830            response_subscribers: Arc::default(),
831            pending_ffqn_subscribers: Arc::default(),
832            join_handle: Some(Arc::new(join_handle)),
833            execution_finished_subscribers: Arc::default(),
834        }))
835    }
836
837    #[instrument(level = Level::TRACE, skip_all, fields(name))]
838    pub async fn transaction_write<F, T, E>(
839        &self,
840        func: F,
841        name: &'static str,
842    ) -> Result<Result<T, E>, DbErrorGeneric /* only close variant */>
843    where
844        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
845        T: Send + 'static,
846        E: From<DbErrorGeneric> + Send + 'static,
847    {
848        self.transaction(func, true, name).await
849    }
850
851    #[instrument(level = Level::TRACE, skip_all, fields(name))]
852    pub async fn transaction_read<F, T, E>(
853        &self,
854        func: F,
855        name: &'static str,
856    ) -> Result<Result<T, E>, DbErrorGeneric /* only close variant */>
857    where
858        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
859        T: Send + 'static,
860        E: From<DbErrorGeneric> + Send + 'static,
861    {
862        self.transaction(func, false, name).await
863    }
864
865    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
866    async fn transaction<F, T, E>(
867        &self,
868        func: F,
869        write: bool,
870        name: &'static str,
871    ) -> Result<Result<T, E>, DbErrorGeneric /* only close variant */>
872    where
873        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
874        T: Send + 'static,
875        E: From<DbErrorGeneric> + Send + 'static,
876    {
877        let (tx, rx) = oneshot::channel();
878        let parent_span = Span::current();
879
880        self.0
881            .command_tx
882            .send(ThreadCommand::Func {
883                priority: if write {
884                    CommandPriority::High
885                } else {
886                    CommandPriority::Medium
887                },
888                func: Box::new(move |conn| {
889                    let tx_begin =
890                        debug_span!(parent: &parent_span, "tx_begin", name).in_scope(|| {
891                            conn.transaction_with_behavior(if write {
892                                rusqlite::TransactionBehavior::Immediate
893                            } else {
894                                rusqlite::TransactionBehavior::Deferred
895                            })
896                            .map_err(result_err_generic)
897                            .map_err(E::from)
898                        });
899                    let tx_apply = tx_begin.and_then(|mut transaction| {
900                        parent_span.in_scope(|| func(&mut transaction).map(|ok| (ok, transaction)))
901                    });
902                    let tx_commit = tx_apply.and_then(|(ok, transaction)| {
903                        debug_span!(parent: &parent_span, "tx_commit", name).in_scope(|| {
904                            transaction
905                                .commit()
906                                .map(|()| ok)
907                                .map_err(result_err_generic)
908                                .map_err(E::from)
909                        })
910                    });
911                    tx.send(tx_commit)
912                        .unwrap_or_else(|_| unreachable!("outer fn waits forever for the result"));
913                }),
914                sent_at: Instant::now(),
915                name,
916            })
917            .await
918            .map_err(|_send_err| DbErrorGeneric::Close)?;
919        rx.await.map_err(|_recv_err| DbErrorGeneric::Close)
920    }
921
922    #[instrument(level = Level::TRACE, skip_all, fields(name))]
923    pub async fn conn_low_prio<F, T>(
924        &self,
925        func: F,
926        name: &'static str,
927    ) -> Result<T, DbErrorGeneric>
928    where
929        F: FnOnce(&Connection) -> Result<T, DbErrorGeneric> + Send + 'static,
930        T: Send + 'static + Default,
931    {
932        let (tx, rx) = oneshot::channel();
933        let span = tracing::trace_span!("tx_function");
934        self.0
935            .command_tx
936            .send(ThreadCommand::Func {
937                priority: CommandPriority::Low,
938                func: Box::new(move |conn| {
939                    tx.send(span.in_scope(|| func(conn)))
940                        .unwrap_or_else(|_| unreachable!("outer fn waits forever for the result"));
941                }),
942                sent_at: Instant::now(),
943                name,
944            })
945            .await
946            .map_err(|_send_err| DbErrorGeneric::Close)?;
947        match rx.await {
948            Ok(res) => res,
949            Err(_recv_err) => Ok(T::default()), // Dropped computation because of other priorities..
950        }
951    }
952
953    fn fetch_created_event(
954        conn: &Connection,
955        execution_id: &ExecutionId,
956    ) -> Result<CreateRequest, DbErrorRead> {
957        let mut stmt = conn
958            .prepare(
959                "SELECT created_at, json_value FROM t_execution_log WHERE \
960            execution_id = :execution_id AND version = 0",
961            )
962            .map_err(prepare_err_generic)?;
963        let (created_at, event) = stmt
964            .query_row(
965                named_params! {
966                    ":execution_id": execution_id.to_string(),
967                },
968                |row| {
969                    let created_at = row.get("created_at")?;
970                    let event = row
971                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
972                        .map_err(|serde| {
973                            error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
974                            consistency_rusqlite("cannot deserialize `Created` event")
975                        })?;
976                    Ok((created_at, event.0))
977                },
978            )
979            .map_err(result_err_generic)?;
980        if let ExecutionEventInner::Created {
981            ffqn,
982            params,
983            parent,
984            scheduled_at,
985            retry_exp_backoff,
986            max_retries,
987            component_id,
988            metadata,
989            scheduled_by,
990        } = event
991        {
992            Ok(CreateRequest {
993                created_at,
994                execution_id: execution_id.clone(),
995                ffqn,
996                params,
997                parent,
998                scheduled_at,
999                retry_exp_backoff,
1000                max_retries,
1001                component_id,
1002                metadata,
1003                scheduled_by,
1004            })
1005        } else {
1006            error!("Row with version=0 must be a `Created` event - {event:?}");
1007            Err(consistency_db_err("expected `Created` event").into())
1008        }
1009    }
1010
1011    fn check_expected_next_and_appending_version(
1012        expected_version: &Version,
1013        appending_version: &Version,
1014    ) -> Result<(), DbErrorWrite> {
1015        if *expected_version != *appending_version {
1016            debug!(
1017                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1018            );
1019            return Err(DbErrorWrite::Permanent(
1020                DbErrorWritePermanent::CannotWrite {
1021                    reason: "version conflict".into(),
1022                    expected_version: Some(expected_version.clone()),
1023                },
1024            ));
1025        }
1026        Ok(())
1027    }
1028
1029    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1030    fn create_inner(
1031        tx: &Transaction,
1032        req: CreateRequest,
1033    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1034        debug!("create_inner");
1035
1036        let version = Version::default();
1037        let execution_id = req.execution_id.clone();
1038        let execution_id_str = execution_id.to_string();
1039        let ffqn = req.ffqn.clone();
1040        let created_at = req.created_at;
1041        let scheduled_at = req.scheduled_at;
1042        let max_retries = req.max_retries;
1043        let backoff_millis =
1044            u64::try_from(req.retry_exp_backoff.as_millis()).expect("backoff too big");
1045        let event = ExecutionEventInner::from(req);
1046        let event_ser = serde_json::to_string(&event).map_err(|err| {
1047            error!("Cannot serialize {event:?} - {err:?}");
1048            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1049        })?;
1050        tx.prepare(
1051                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1052                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1053        .map_err(prepare_err_generic)?
1054        .execute(named_params! {
1055            ":execution_id": &execution_id_str,
1056            ":created_at": created_at,
1057            ":version": version.0,
1058            ":json_value": event_ser,
1059            ":variant": event.variant(),
1060            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1061        })
1062        .map_err(result_err_generic)?;
1063        let pending_state = PendingState::PendingAt { scheduled_at };
1064        let pending_at = {
1065            let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
1066            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1067            tx.prepare(
1068                r"
1069                INSERT INTO t_state (
1070                    execution_id,
1071                    is_top_level,
1072                    corresponding_version,
1073                    pending_expires_finished,
1074                    ffqn,
1075                    state,
1076                    created_at,
1077                    updated_at,
1078                    scheduled_at,
1079                    intermittent_event_count,
1080                    max_retries,
1081                    retry_exp_backoff_millis
1082                    )
1083                VALUES (
1084                    :execution_id,
1085                    :is_top_level,
1086                    :corresponding_version,
1087                    :pending_expires_finished,
1088                    :ffqn,
1089                    :state,
1090                    :created_at,
1091                    CURRENT_TIMESTAMP,
1092                    :scheduled_at,
1093                    0,
1094                    :max_retries,
1095                    :retry_exp_backoff_millis
1096                    )
1097                ",
1098            )
1099            .map_err(prepare_err_generic)?
1100            .execute(named_params! {
1101                ":execution_id": execution_id.to_string(),
1102                ":is_top_level": execution_id.is_top_level(),
1103                ":corresponding_version": version.0,
1104                ":pending_expires_finished": scheduled_at,
1105                ":ffqn": ffqn.to_string(),
1106                ":state": STATE_PENDING_AT,
1107                ":created_at": created_at,
1108                ":scheduled_at": scheduled_at,
1109                ":max_retries": max_retries,
1110                ":retry_exp_backoff_millis": backoff_millis,
1111            })
1112            .map_err(result_err_generic)?;
1113            AppendNotifier {
1114                pending_at: Some(NotifierPendingAt { scheduled_at, ffqn }),
1115                execution_finished: None,
1116                response: None,
1117            }
1118        };
1119        let next_version = Version::new(version.0 + 1);
1120        Ok((next_version, pending_at))
1121    }
1122
1123    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1124    fn update_state_pending_after_response_appended(
1125        tx: &Transaction,
1126        execution_id: &ExecutionId,
1127        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1128        corresponding_version: &Version, // t_execution_log is not be changed
1129    ) -> Result<AppendNotifier, DbErrorWrite> {
1130        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1131        let execution_id_str = execution_id.to_string();
1132        let mut stmt = tx
1133            .prepare_cached(
1134                r"
1135                UPDATE t_state
1136                SET
1137                    corresponding_version = :corresponding_version,
1138                    pending_expires_finished = :pending_expires_finished,
1139                    state = :state,
1140                    updated_at = CURRENT_TIMESTAMP,
1141
1142                    last_lock_version = NULL,
1143                    executor_id = NULL,
1144                    run_id = NULL,
1145
1146                    join_set_id = NULL,
1147                    join_set_closing = NULL,
1148
1149                    result_kind = NULL
1150                WHERE execution_id = :execution_id
1151            ",
1152            )
1153            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1154        let updated = stmt
1155            .execute(named_params! {
1156                ":execution_id": execution_id_str,
1157                ":corresponding_version": corresponding_version.0,
1158                ":pending_expires_finished": scheduled_at,
1159                ":state": STATE_PENDING_AT,
1160            })
1161            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1162        if updated != 1 {
1163            return Err(DbErrorWrite::NotFound);
1164        }
1165        Ok(AppendNotifier {
1166            pending_at: Some(NotifierPendingAt {
1167                scheduled_at,
1168                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1169            }),
1170            execution_finished: None,
1171            response: None,
1172        })
1173    }
1174
1175    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1176    fn update_state_pending_after_event_appended(
1177        tx: &Transaction,
1178        execution_id: &ExecutionId,
1179        appending_version: &Version,
1180        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1181        intermittent_failure: bool,
1182    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1183        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1184        // If response idx is unknown, use 0. This distinguishs the two paths.
1185        // JoinNext will always send an actual idx.
1186        let mut stmt = tx
1187            .prepare_cached(
1188                r"
1189                UPDATE t_state
1190                SET
1191                    corresponding_version = :appending_version,
1192                    pending_expires_finished = :pending_expires_finished,
1193                    state = :state,
1194                    updated_at = CURRENT_TIMESTAMP,
1195                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1196
1197                    last_lock_version = NULL,
1198                    executor_id = NULL,
1199                    run_id = NULL,
1200
1201                    join_set_id = NULL,
1202                    join_set_closing = NULL,
1203
1204                    result_kind = NULL
1205                WHERE execution_id = :execution_id;
1206            ",
1207            )
1208            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1209        let updated = stmt
1210            .execute(named_params! {
1211                ":execution_id": execution_id.to_string(),
1212                ":appending_version": appending_version.0,
1213                ":pending_expires_finished": scheduled_at,
1214                ":state": STATE_PENDING_AT,
1215                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1216            })
1217            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1218        if updated != 1 {
1219            return Err(DbErrorWrite::NotFound);
1220        }
1221        Ok((
1222            appending_version.increment(),
1223            AppendNotifier {
1224                pending_at: Some(NotifierPendingAt {
1225                    scheduled_at,
1226                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1227                }),
1228                execution_finished: None,
1229                response: None,
1230            },
1231        ))
1232    }
1233
1234    fn update_state_locked_get_intermittent_event_count(
1235        tx: &Transaction,
1236        execution_id: &ExecutionId,
1237        executor_id: ExecutorId,
1238        run_id: RunId,
1239        lock_expires_at: DateTime<Utc>,
1240        appending_version: &Version,
1241    ) -> Result<u32, DbErrorWrite> {
1242        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1243        let execution_id_str = execution_id.to_string();
1244        let mut stmt = tx
1245            .prepare_cached(
1246                r"
1247                UPDATE t_state
1248                SET
1249                    corresponding_version = :appending_version,
1250                    pending_expires_finished = :pending_expires_finished,
1251                    state = :state,
1252                    updated_at = CURRENT_TIMESTAMP,
1253
1254                    last_lock_version = :appending_version,
1255                    executor_id = :executor_id,
1256                    run_id = :run_id,
1257
1258                    join_set_id = NULL,
1259                    join_set_closing = NULL,
1260
1261                    result_kind = NULL
1262                WHERE execution_id = :execution_id
1263            ",
1264            )
1265            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1266        let updated = stmt
1267            .execute(named_params! {
1268                ":execution_id": execution_id_str,
1269                ":appending_version": appending_version.0,
1270                ":pending_expires_finished": lock_expires_at,
1271                ":state": STATE_LOCKED,
1272                ":executor_id": executor_id.to_string(),
1273                ":run_id": run_id.to_string(),
1274            })
1275            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1276        if updated != 1 {
1277            return Err(DbErrorWrite::NotFound);
1278        }
1279
1280        // fetch intermittent event count from the just-inserted row.
1281        let intermittent_event_count = tx
1282            .prepare(
1283                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1284            )
1285            .map_err(prepare_err_generic)?
1286            .query_row(
1287                named_params! {
1288                    ":execution_id": execution_id_str,
1289                },
1290                |row| {
1291                    let intermittent_event_count = row.get("intermittent_event_count")?;
1292                    Ok(intermittent_event_count)
1293                },
1294            )
1295            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1296
1297        Ok(intermittent_event_count)
1298    }
1299
1300    fn update_state_blocked(
1301        tx: &Transaction,
1302        execution_id: &ExecutionId,
1303        appending_version: &Version,
1304        // BlockedByJoinSet fields
1305        join_set_id: &JoinSetId,
1306        lock_expires_at: DateTime<Utc>,
1307        join_set_closing: bool,
1308    ) -> Result<
1309        AppendResponse, // next version
1310        DbErrorWrite,
1311    > {
1312        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1313        let execution_id_str = execution_id.to_string();
1314        let mut stmt = tx
1315            .prepare_cached(
1316                r"
1317                UPDATE t_state
1318                SET
1319                    corresponding_version = :appending_version,
1320                    pending_expires_finished = :pending_expires_finished,
1321                    state = :state,
1322                    updated_at = CURRENT_TIMESTAMP,
1323
1324                    last_lock_version = NULL,
1325                    executor_id = NULL,
1326                    run_id = NULL,
1327
1328                    join_set_id = :join_set_id,
1329                    join_set_closing = :join_set_closing,
1330
1331                    result_kind = NULL
1332                WHERE execution_id = :execution_id
1333            ",
1334            )
1335            .map_err(prepare_err_generic)?;
1336        let updated = stmt
1337            .execute(named_params! {
1338                ":execution_id": execution_id_str,
1339                ":appending_version": appending_version.0,
1340                ":pending_expires_finished": lock_expires_at,
1341                ":state": STATE_BLOCKED_BY_JOIN_SET,
1342                ":join_set_id": join_set_id,
1343                ":join_set_closing": join_set_closing,
1344            })
1345            .map_err(result_err_generic)?;
1346        if updated != 1 {
1347            return Err(DbErrorWrite::NotFound);
1348        }
1349        Ok(appending_version.increment())
1350    }
1351
1352    fn update_state_finished(
1353        tx: &Transaction,
1354        execution_id: &ExecutionId,
1355        appending_version: &Version,
1356        // Finished fields
1357        finished_at: DateTime<Utc>,
1358        result_kind: PendingStateFinishedResultKind,
1359    ) -> Result<(), DbErrorWrite> {
1360        debug!("Setting t_state to Finished");
1361        let execution_id_str = execution_id.to_string();
1362        let mut stmt = tx
1363            .prepare_cached(
1364                r"
1365                UPDATE t_state
1366                SET
1367                    corresponding_version = :appending_version,
1368                    pending_expires_finished = :pending_expires_finished,
1369                    state = :state,
1370                    updated_at = CURRENT_TIMESTAMP,
1371
1372                    last_lock_version = NULL,
1373                    executor_id = NULL,
1374                    run_id = NULL,
1375
1376                    join_set_id = NULL,
1377                    join_set_closing = NULL,
1378
1379                    result_kind = :result_kind
1380                WHERE execution_id = :execution_id
1381            ",
1382            )
1383            .map_err(prepare_err_generic)?;
1384        let updated = stmt
1385            .execute(named_params! {
1386                ":execution_id": execution_id_str,
1387                ":appending_version": appending_version.0,
1388                ":pending_expires_finished": finished_at,
1389                ":state": STATE_FINISHED,
1390                ":result_kind": result_kind.to_string(),
1391            })
1392            .map_err(result_err_generic)?;
1393        if updated != 1 {
1394            return Err(DbErrorWrite::NotFound);
1395        }
1396        Ok(())
1397    }
1398
1399    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1400    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1401    fn bump_state_next_version(
1402        tx: &Transaction,
1403        execution_id: &ExecutionId,
1404        appending_version: &Version,
1405        delay_req: Option<DelayReq>,
1406    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1407        debug!("update_index_version");
1408        let execution_id_str = execution_id.to_string();
1409        let mut stmt = tx
1410            .prepare_cached(
1411                r"
1412                UPDATE t_state
1413                SET
1414                    corresponding_version = :appending_version,
1415                    updated_at = CURRENT_TIMESTAMP
1416                WHERE execution_id = :execution_id
1417            ",
1418            )
1419            .map_err(prepare_err_generic)?;
1420        let updated = stmt
1421            .execute(named_params! {
1422                ":execution_id": execution_id_str,
1423                ":appending_version": appending_version.0,
1424            })
1425            .map_err(result_err_generic)?;
1426        if updated != 1 {
1427            return Err(DbErrorWrite::NotFound);
1428        }
1429        if let Some(DelayReq {
1430            join_set_id,
1431            delay_id,
1432            expires_at,
1433        }) = delay_req
1434        {
1435            debug!("Inserting delay to `t_delay`");
1436            let mut stmt = tx
1437                .prepare_cached(
1438                    "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1439                VALUES \
1440                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1441                )
1442                .map_err(prepare_err_generic)?;
1443            stmt.execute(named_params! {
1444                ":execution_id": execution_id_str,
1445                ":join_set_id": join_set_id.to_string(),
1446                ":delay_id": delay_id.to_string(),
1447                ":expires_at": expires_at,
1448            })
1449            .map_err(result_err_generic)?;
1450        }
1451        Ok(appending_version.increment())
1452    }
1453
1454    fn get_combined_state(
1455        tx: &Transaction,
1456        execution_id: &ExecutionId,
1457    ) -> Result<CombinedState, DbErrorRead> {
1458        let mut stmt = tx
1459            .prepare(
1460                r"
1461                SELECT
1462                    state, ffqn, corresponding_version, pending_expires_finished,
1463                    last_lock_version, executor_id, run_id,
1464                    join_set_id, join_set_closing,
1465                    result_kind
1466                    FROM t_state
1467                WHERE
1468                    execution_id = :execution_id
1469                ",
1470            )
1471            .map_err(prepare_err_generic)?;
1472        stmt.query_row(
1473            named_params! {
1474                ":execution_id": execution_id.to_string(),
1475            },
1476            |row| {
1477                CombinedState::new(
1478                    &CombinedStateDTO {
1479                        state: row.get("state")?,
1480                        ffqn: row.get("ffqn")?,
1481                        pending_expires_finished: row
1482                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1483                        last_lock_version: row
1484                            .get::<_, Option<VersionType>>("last_lock_version")?
1485                            .map(Version::new),
1486                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1487                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1488                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1489                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1490                        result_kind: row
1491                            .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1492                                "result_kind",
1493                            )?
1494                            .map(|wrapper| wrapper.0),
1495                    },
1496                    Version::new(row.get("corresponding_version")?),
1497                )
1498            },
1499        )
1500        .map_err(result_err_read)
1501    }
1502
1503    fn list_executions(
1504        read_tx: &Transaction,
1505        ffqn: Option<FunctionFqn>,
1506        top_level_only: bool,
1507        pagination: ExecutionListPagination,
1508    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1509        struct StatementModifier {
1510            where_vec: Vec<String>,
1511            params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)>,
1512            limit: u32,
1513            limit_desc: bool,
1514        }
1515        fn paginate<T: rusqlite::ToSql + 'static>(
1516            pagination: Pagination<Option<T>>,
1517            column: &str,
1518            top_level_only: bool,
1519        ) -> StatementModifier {
1520            let mut where_vec: Vec<String> = vec![];
1521            let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
1522            let limit = pagination.length();
1523            let limit_desc = pagination.is_desc();
1524            let rel = pagination.rel();
1525            match pagination {
1526                Pagination::NewerThan {
1527                    cursor: Some(cursor),
1528                    ..
1529                }
1530                | Pagination::OlderThan {
1531                    cursor: Some(cursor),
1532                    ..
1533                } => {
1534                    where_vec.push(format!("{column} {rel} :cursor"));
1535                    params.push((":cursor", Box::new(cursor)));
1536                }
1537                _ => {}
1538            }
1539            if top_level_only {
1540                where_vec.push("is_top_level=true".to_string());
1541            }
1542            StatementModifier {
1543                where_vec,
1544                params,
1545                limit,
1546                limit_desc,
1547            }
1548        }
1549        let mut statement_mod = match pagination {
1550            ExecutionListPagination::CreatedBy(pagination) => {
1551                paginate(pagination, "created_at", top_level_only)
1552            }
1553            ExecutionListPagination::ExecutionId(pagination) => {
1554                paginate(pagination, "execution_id", top_level_only)
1555            }
1556        };
1557
1558        if let Some(ffqn) = ffqn {
1559            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1560            statement_mod
1561                .params
1562                .push((":ffqn", Box::new(ffqn.to_string())));
1563        }
1564
1565        let where_str = if statement_mod.where_vec.is_empty() {
1566            String::new()
1567        } else {
1568            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1569        };
1570        let sql = format!(
1571            r"
1572            SELECT created_at, scheduled_at, state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1573            last_lock_version, executor_id, run_id,
1574            join_set_id, join_set_closing,
1575            result_kind
1576            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1577            ",
1578            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1579            limit = statement_mod.limit
1580        );
1581        let mut vec: Vec<_> = read_tx
1582            .prepare(&sql)
1583            .map_err(prepare_err_generic)?
1584            .query_map::<_, &[(&'static str, &dyn rusqlite::ToSql)], _>(
1585                statement_mod
1586                    .params
1587                    .iter()
1588                    .map(|(key, value)| (*key, value.as_ref()))
1589                    .collect::<Vec<_>>()
1590                    .as_ref(),
1591                |row| {
1592                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1593                    let created_at = row.get("created_at")?;
1594                    let scheduled_at = row.get("scheduled_at")?;
1595                    let combined_state = CombinedState::new(
1596                        &CombinedStateDTO {
1597                            state: row.get("state")?,
1598                            ffqn: row.get("ffqn")?,
1599                            pending_expires_finished: row
1600                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1601                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1602
1603                            last_lock_version: row
1604                                .get::<_, Option<VersionType>>("last_lock_version")?
1605                                .map(Version::new),
1606                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1607                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1608                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1609                            result_kind: row
1610                                .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1611                                    "result_kind",
1612                                )?
1613                                .map(|wrapper| wrapper.0),
1614                        },
1615                        Version::new(row.get("corresponding_version")?),
1616                    )?;
1617                    Ok(ExecutionWithState {
1618                        execution_id,
1619                        ffqn: combined_state.ffqn,
1620                        pending_state: combined_state.pending_state,
1621                        created_at,
1622                        scheduled_at,
1623                    })
1624                },
1625            )
1626            .map_err(intermittent_err_generic)?
1627            .collect::<Vec<Result<_, _>>>()
1628            .into_iter()
1629            .filter_map(|row| match row {
1630                Ok(row) => Some(row),
1631                Err(err) => {
1632                    warn!("Skipping row - {err:?}");
1633                    None
1634                }
1635            })
1636            .collect();
1637
1638        if !statement_mod.limit_desc {
1639            // the list must be sorted in descending order
1640            vec.reverse();
1641        }
1642        Ok(vec)
1643    }
1644
1645    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1646    #[expect(clippy::too_many_arguments)]
1647    fn lock_single_execution(
1648        tx: &Transaction,
1649        created_at: DateTime<Utc>,
1650        component_id: ComponentId,
1651        execution_id: &ExecutionId,
1652        run_id: RunId,
1653        appending_version: &Version,
1654        executor_id: ExecutorId,
1655        lock_expires_at: DateTime<Utc>,
1656    ) -> Result<LockedExecution, DbErrorWrite> {
1657        debug!("lock_inner");
1658        let combined_state = Self::get_combined_state(tx, execution_id)?;
1659        combined_state.pending_state.can_append_lock(
1660            created_at,
1661            executor_id,
1662            run_id,
1663            lock_expires_at,
1664        )?;
1665        let expected_version = combined_state.get_next_version_assert_not_finished();
1666        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1667
1668        // Append to `execution_log` table.
1669        let event = ExecutionEventInner::Locked {
1670            component_id,
1671            executor_id,
1672            lock_expires_at,
1673            run_id,
1674        };
1675        let event_ser = serde_json::to_string(&event).map_err(|err| {
1676            warn!("Cannot serialize {event:?} - {err:?}");
1677            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1678        })?;
1679        let mut stmt = tx
1680            .prepare_cached(
1681                "INSERT INTO t_execution_log \
1682            (execution_id, created_at, json_value, version, variant) \
1683            VALUES \
1684            (:execution_id, :created_at, :json_value, :version, :variant)",
1685            )
1686            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1687        stmt.execute(named_params! {
1688            ":execution_id": execution_id.to_string(),
1689            ":created_at": created_at,
1690            ":json_value": event_ser,
1691            ":version": appending_version.0,
1692            ":variant": event.variant(),
1693        })
1694        .map_err(|err| {
1695            warn!("Cannot lock execution - {err:?}");
1696            DbErrorWritePermanent::CannotWrite {
1697                reason: "cannot lock execution".into(),
1698                expected_version: None,
1699            }
1700        })?;
1701
1702        // Update `t_state`
1703        let responses = Self::list_responses(tx, execution_id, None)?;
1704        let responses = responses.into_iter().map(|resp| resp.event).collect();
1705        trace!("Responses: {responses:?}");
1706
1707        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1708            tx,
1709            execution_id,
1710            executor_id,
1711            run_id,
1712            lock_expires_at,
1713            appending_version,
1714        )?;
1715        // Fetch event_history and `Created` event to construct the response.
1716        let mut events = tx
1717            .prepare(
1718                "SELECT json_value FROM t_execution_log WHERE \
1719                execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1720                ORDER BY version",
1721            )
1722            .map_err(prepare_err_generic)?
1723            .query_map(
1724                named_params! {
1725                    ":execution_id": execution_id.to_string(),
1726                    ":v1": DUMMY_CREATED.variant(),
1727                    ":v2": DUMMY_HISTORY_EVENT.variant(),
1728                },
1729                |row| {
1730                    row.get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1731                        .map(|wrapper| wrapper.0)
1732                        .map_err(|serde| {
1733                            error!("Cannot deserialize {row:?} - {serde:?}");
1734                            consistency_rusqlite("cannot deserialize json value")
1735                        })
1736                },
1737            )
1738            .map_err(result_err_generic)?
1739            .collect::<Result<Vec<_>, _>>()
1740            .map_err(result_err_generic)?
1741            .into_iter()
1742            .collect::<VecDeque<_>>();
1743        let Some(ExecutionEventInner::Created {
1744            ffqn,
1745            params,
1746            retry_exp_backoff,
1747            max_retries,
1748            parent,
1749            metadata,
1750            ..
1751        }) = events.pop_front()
1752        else {
1753            error!("Execution log must contain at least `Created` event");
1754            return Err(consistency_db_err("execution log must contain `Created` event").into());
1755        };
1756
1757        let event_history = events
1758            .into_iter()
1759            .map(|event| {
1760                if let ExecutionEventInner::HistoryEvent { event } = event {
1761                    Ok(event)
1762                } else {
1763                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1764                    Err(consistency_db_err(
1765                        "rows can only contain `Created` and `HistoryEvent` event kinds",
1766                    ))
1767                }
1768            })
1769            .collect::<Result<Vec<_>, _>>()?;
1770        Ok(LockedExecution {
1771            execution_id: execution_id.clone(),
1772            metadata,
1773            run_id,
1774            next_version: appending_version.increment(),
1775            ffqn,
1776            params,
1777            event_history,
1778            responses,
1779            retry_exp_backoff,
1780            max_retries,
1781            parent,
1782            intermittent_event_count,
1783        })
1784    }
1785
1786    fn count_join_next(
1787        tx: &Transaction,
1788        execution_id: &ExecutionId,
1789        join_set_id: &JoinSetId,
1790    ) -> Result<u64, DbErrorRead> {
1791        let mut stmt = tx.prepare(
1792            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1793            AND history_event_type = :join_next",
1794        ).map_err(prepare_err_generic)?;
1795        Ok(stmt
1796            .query_row(
1797                named_params! {
1798                    ":execution_id": execution_id.to_string(),
1799                    ":join_set_id": join_set_id.to_string(),
1800                    ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1801                },
1802                |row| row.get("count"),
1803            )
1804            .map_err(result_err_generic)?)
1805    }
1806
1807    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1808    #[expect(clippy::needless_return)]
1809    fn append(
1810        tx: &Transaction,
1811        execution_id: &ExecutionId,
1812        req: AppendRequest,
1813        appending_version: Version,
1814    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1815        if matches!(req.event, ExecutionEventInner::Created { .. }) {
1816            return Err(DbErrorWrite::Permanent(
1817                DbErrorWritePermanent::ValidationFailed(
1818                    "cannot append `Created` event - use `create` instead".into(),
1819                ),
1820            ));
1821        }
1822        if let AppendRequest {
1823            event:
1824                ExecutionEventInner::Locked {
1825                    component_id,
1826                    executor_id,
1827                    run_id,
1828                    lock_expires_at,
1829                },
1830            created_at,
1831        } = req
1832        {
1833            return Self::lock_single_execution(
1834                tx,
1835                created_at,
1836                component_id,
1837                execution_id,
1838                run_id,
1839                &appending_version,
1840                executor_id,
1841                lock_expires_at,
1842            )
1843            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1844        }
1845
1846        let combined_state = Self::get_combined_state(tx, execution_id)?;
1847        if combined_state.pending_state.is_finished() {
1848            debug!("Execution is already finished");
1849            return Err(DbErrorWrite::Permanent(
1850                DbErrorWritePermanent::CannotWrite {
1851                    reason: "already finished".into(),
1852                    expected_version: None,
1853                },
1854            ));
1855        }
1856
1857        Self::check_expected_next_and_appending_version(
1858            &combined_state.get_next_version_assert_not_finished(),
1859            &appending_version,
1860        )?;
1861        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1862            error!("Cannot serialize {:?} - {err:?}", req.event);
1863            DbErrorWritePermanent::ValidationFailed("parameter serialization error".into())
1864        })?;
1865
1866        let mut stmt = tx.prepare(
1867                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1868                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1869                    .map_err(prepare_err_generic)?;
1870        stmt.execute(named_params! {
1871            ":execution_id": execution_id.to_string(),
1872            ":created_at": req.created_at,
1873            ":json_value": event_ser,
1874            ":version": appending_version.0,
1875            ":variant": req.event.variant(),
1876            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1877        })
1878        .map_err(result_err_generic)?;
1879        // Calculate current pending state
1880
1881        match &req.event {
1882            ExecutionEventInner::Created { .. } => {
1883                unreachable!("handled in the caller")
1884            }
1885
1886            ExecutionEventInner::Locked { .. } => {
1887                unreachable!("handled above")
1888            }
1889
1890            ExecutionEventInner::TemporarilyFailed {
1891                backoff_expires_at, ..
1892            }
1893            | ExecutionEventInner::TemporarilyTimedOut {
1894                backoff_expires_at, ..
1895            } => {
1896                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1897                    tx,
1898                    execution_id,
1899                    &appending_version,
1900                    *backoff_expires_at,
1901                    true, // an intermittent failure
1902                )?;
1903                return Ok((next_version, notifier));
1904            }
1905
1906            ExecutionEventInner::Unlocked {
1907                backoff_expires_at, ..
1908            } => {
1909                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1910                    tx,
1911                    execution_id,
1912                    &appending_version,
1913                    *backoff_expires_at,
1914                    false, // not an intermittent failure
1915                )?;
1916                return Ok((next_version, notifier));
1917            }
1918
1919            ExecutionEventInner::Finished { result, .. } => {
1920                Self::update_state_finished(
1921                    tx,
1922                    execution_id,
1923                    &appending_version,
1924                    req.created_at,
1925                    PendingStateFinishedResultKind::from(result),
1926                )?;
1927                return Ok((
1928                    appending_version,
1929                    AppendNotifier {
1930                        pending_at: None,
1931                        execution_finished: Some(NotifierExecutionFinished {
1932                            execution_id: execution_id.clone(),
1933                            retval: result.clone(),
1934                        }),
1935                        response: None,
1936                    },
1937                ));
1938            }
1939
1940            ExecutionEventInner::HistoryEvent {
1941                event:
1942                    HistoryEvent::JoinSetCreate { .. }
1943                    | HistoryEvent::JoinSetRequest {
1944                        request: JoinSetRequest::ChildExecutionRequest { .. },
1945                        ..
1946                    }
1947                    | HistoryEvent::Persist { .. }
1948                    | HistoryEvent::Schedule { .. }
1949                    | HistoryEvent::Stub { .. }
1950                    | HistoryEvent::JoinNextTooMany { .. },
1951            } => {
1952                return Ok((
1953                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
1954                    AppendNotifier::default(),
1955                ));
1956            }
1957
1958            ExecutionEventInner::HistoryEvent {
1959                event:
1960                    HistoryEvent::JoinSetRequest {
1961                        join_set_id,
1962                        request:
1963                            JoinSetRequest::DelayRequest {
1964                                delay_id,
1965                                expires_at,
1966                                ..
1967                            },
1968                    },
1969            } => {
1970                return Ok((
1971                    Self::bump_state_next_version(
1972                        tx,
1973                        execution_id,
1974                        &appending_version,
1975                        Some(DelayReq {
1976                            join_set_id: join_set_id.clone(),
1977                            delay_id: delay_id.clone(),
1978                            expires_at: *expires_at,
1979                        }),
1980                    )?,
1981                    AppendNotifier::default(),
1982                ));
1983            }
1984
1985            ExecutionEventInner::HistoryEvent {
1986                event:
1987                    HistoryEvent::JoinNext {
1988                        join_set_id,
1989                        run_expires_at,
1990                        closing,
1991                        requested_ffqn: _,
1992                    },
1993            } => {
1994                // Did the response arrive already?
1995                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1996                let nth_response =
1997                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
1998                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1999                assert!(join_next_count > 0);
2000                if let Some(ResponseWithCursor {
2001                    event:
2002                        JoinSetResponseEventOuter {
2003                            created_at: nth_created_at,
2004                            ..
2005                        },
2006                    cursor: _,
2007                }) = nth_response
2008                {
2009                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
2010                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2011                        tx,
2012                        execution_id,
2013                        &appending_version,
2014                        scheduled_at,
2015                        false, // not an intermittent failure
2016                    )?;
2017                    return Ok((next_version, notifier));
2018                }
2019                return Ok((
2020                    Self::update_state_blocked(
2021                        tx,
2022                        execution_id,
2023                        &appending_version,
2024                        join_set_id,
2025                        *run_expires_at,
2026                        *closing,
2027                    )?,
2028                    AppendNotifier::default(),
2029                ));
2030            }
2031        }
2032    }
2033
2034    fn append_response(
2035        tx: &Transaction,
2036        execution_id: &ExecutionId,
2037        response_outer: JoinSetResponseEventOuter,
2038    ) -> Result<AppendNotifier, DbErrorWrite> {
2039        let mut stmt = tx.prepare(
2040            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
2041                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
2042        ).map_err(prepare_err_generic)?;
2043        let join_set_id = &response_outer.event.join_set_id;
2044        let delay_id = match &response_outer.event.event {
2045            JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
2046            JoinSetResponse::ChildExecutionFinished { .. } => None,
2047        };
2048        let (child_execution_id, finished_version) = match &response_outer.event.event {
2049            JoinSetResponse::ChildExecutionFinished {
2050                child_execution_id,
2051                finished_version,
2052                result: _,
2053            } => (
2054                Some(child_execution_id.to_string()),
2055                Some(finished_version.0),
2056            ),
2057            JoinSetResponse::DelayFinished { .. } => (None, None),
2058        };
2059
2060        stmt.execute(named_params! {
2061            ":execution_id": execution_id.to_string(),
2062            ":created_at": response_outer.created_at,
2063            ":join_set_id": join_set_id.to_string(),
2064            ":delay_id": delay_id,
2065            ":child_execution_id": child_execution_id,
2066            ":finished_version": finished_version,
2067        })
2068        .map_err(result_err_generic)?;
2069
2070        // if the execution is going to be unblocked by this response...
2071        let combined_state = Self::get_combined_state(tx, execution_id)?;
2072        debug!("previous_pending_state: {combined_state:?}");
2073        let mut notifier = if let PendingState::BlockedByJoinSet {
2074            join_set_id: found_join_set_id,
2075            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2076            closing: _,
2077        } = combined_state.pending_state
2078            && *join_set_id == found_join_set_id
2079        {
2080            // PendingAt should be set to current time if called from expired_timers_watcher,
2081            // or to a future time if the execution is hot.
2082            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2083            // TODO: Add diff test
2084            // Unblock the state.
2085            Self::update_state_pending_after_response_appended(
2086                tx,
2087                execution_id,
2088                scheduled_at,
2089                &combined_state.corresponding_version, // not changing the version
2090            )?
2091        } else {
2092            AppendNotifier::default()
2093        };
2094        if let JoinSetResponseEvent {
2095            join_set_id,
2096            event: JoinSetResponse::DelayFinished { delay_id },
2097        } = &response_outer.event
2098        {
2099            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2100            let mut stmt =
2101                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2102                .map_err(prepare_err_generic)?;
2103            stmt.execute(named_params! {
2104                ":execution_id": execution_id.to_string(),
2105                ":join_set_id": join_set_id.to_string(),
2106                ":delay_id": delay_id.to_string(),
2107            })
2108            .map_err(result_err_generic)?;
2109        }
2110        notifier.response = Some((execution_id.clone(), response_outer));
2111        Ok(notifier)
2112    }
2113
2114    fn append_backtrace(
2115        tx: &Transaction,
2116        backtrace_info: &BacktraceInfo,
2117    ) -> Result<(), DbErrorWrite> {
2118        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2119            warn!(
2120                "Cannot serialize backtrace {:?} - {err:?}",
2121                backtrace_info.wasm_backtrace
2122            );
2123            DbErrorWritePermanent::ValidationFailed("cannot serialize backtrace".into())
2124        })?;
2125        let mut stmt = tx
2126            .prepare(
2127                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2128                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2129            )
2130            .map_err(prepare_err_generic)?;
2131        stmt.execute(named_params! {
2132            ":execution_id": backtrace_info.execution_id.to_string(),
2133            ":component_id": backtrace_info.component_id.to_string(),
2134            ":version_min_including": backtrace_info.version_min_including.0,
2135            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2136            ":wasm_backtrace": backtrace,
2137        })
2138        .map_err(result_err_generic)?;
2139        Ok(())
2140    }
2141
2142    #[cfg(feature = "test")]
2143    fn get(
2144        tx: &Transaction,
2145        execution_id: &ExecutionId,
2146    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2147        let mut stmt = tx
2148            .prepare(
2149                "SELECT created_at, json_value FROM t_execution_log WHERE \
2150                        execution_id = :execution_id ORDER BY version",
2151            )
2152            .map_err(prepare_err_generic)?;
2153        let events = stmt
2154            .query_map(
2155                named_params! {
2156                    ":execution_id": execution_id.to_string(),
2157                },
2158                |row| {
2159                    let created_at = row.get("created_at")?;
2160                    let event = row
2161                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2162                        .map_err(|serde| {
2163                            error!("Cannot deserialize {row:?} - {serde:?}");
2164                            consistency_rusqlite("cannot deserialize event")
2165                        })?
2166                        .0;
2167
2168                    Ok(ExecutionEvent {
2169                        created_at,
2170                        event,
2171                        backtrace_id: None,
2172                    })
2173                },
2174            )
2175            .map_err(result_err_read)?
2176            .collect::<Result<Vec<_>, _>>()
2177            .map_err(result_err_read)?;
2178        if events.is_empty() {
2179            return Err(DbErrorRead::NotFound);
2180        }
2181        let combined_state = Self::get_combined_state(tx, execution_id)?;
2182        let responses = Self::list_responses(tx, execution_id, None)?
2183            .into_iter()
2184            .map(|resp| resp.event)
2185            .collect();
2186        Ok(concepts::storage::ExecutionLog {
2187            execution_id: execution_id.clone(),
2188            events,
2189            responses,
2190            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2191            pending_state: combined_state.pending_state,
2192        })
2193    }
2194
2195    fn list_execution_events(
2196        tx: &Transaction,
2197        execution_id: &ExecutionId,
2198        version_min: VersionType,
2199        version_max_excluding: VersionType,
2200        include_backtrace_id: bool,
2201    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2202        let select = if include_backtrace_id {
2203            "SELECT
2204                log.created_at,
2205                log.json_value,
2206                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2207                bt.version_min_including AS backtrace_id
2208            FROM
2209                t_execution_log AS log
2210            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2211                t_backtrace AS bt ON log.execution_id = bt.execution_id
2212                                -- Check if the log's version falls within the backtrace's range
2213                                AND log.version >= bt.version_min_including
2214                                AND log.version < bt.version_max_excluding
2215            WHERE
2216                log.execution_id = :execution_id
2217                AND log.version >= :version_min
2218                AND log.version < :version_max_excluding
2219            ORDER BY
2220                log.version;"
2221        } else {
2222            "SELECT
2223                created_at, json_value, NULL as backtrace_id
2224            FROM t_execution_log WHERE
2225                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2226            ORDER BY version"
2227        };
2228        tx.prepare(select)
2229            .map_err(prepare_err_generic)?
2230            .query_map(
2231                named_params! {
2232                    ":execution_id": execution_id.to_string(),
2233                    ":version_min": version_min,
2234                    ":version_max_excluding": version_max_excluding
2235                },
2236                |row| {
2237                    let created_at = row.get("created_at")?;
2238                    let backtrace_id = row
2239                        .get::<_, Option<VersionType>>("backtrace_id")?
2240                        .map(Version::new);
2241                    let event = row
2242                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2243                        .map(|event| ExecutionEvent {
2244                            created_at,
2245                            event: event.0,
2246                            backtrace_id,
2247                        })
2248                        .map_err(|serde| {
2249                            error!("Cannot deserialize {row:?} - {serde:?}");
2250                            consistency_rusqlite("cannot deserialize")
2251                        })?;
2252                    Ok(event)
2253                },
2254            )
2255            .map_err(result_err_read)?
2256            .collect::<Result<Vec<_>, _>>()
2257            .map_err(result_err_read)
2258    }
2259
2260    fn get_execution_event(
2261        tx: &Transaction,
2262        execution_id: &ExecutionId,
2263        version: VersionType,
2264    ) -> Result<ExecutionEvent, DbErrorRead> {
2265        let mut stmt = tx
2266            .prepare(
2267                "SELECT created_at, json_value FROM t_execution_log WHERE \
2268                        execution_id = :execution_id AND version = :version",
2269            )
2270            .map_err(prepare_err_generic)?;
2271        stmt.query_row(
2272            named_params! {
2273                ":execution_id": execution_id.to_string(),
2274                ":version": version,
2275            },
2276            |row| {
2277                let created_at = row.get("created_at")?;
2278                let event = row
2279                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2280                    .map_err(|serde| {
2281                        error!("Cannot deserialize {row:?} - {serde:?}");
2282                        consistency_rusqlite("cannot deserialize event")
2283                    })?;
2284
2285                Ok(ExecutionEvent {
2286                    created_at,
2287                    event: event.0,
2288                    backtrace_id: None,
2289                })
2290            },
2291        )
2292        .map_err(result_err_read)
2293    }
2294
2295    fn list_responses(
2296        tx: &Transaction,
2297        execution_id: &ExecutionId,
2298        pagination: Option<Pagination<u32>>,
2299    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2300        // TODO: Add test
2301        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2302        let mut sql = "SELECT \
2303            r.id, r.created_at, r.join_set_id,  r.delay_id, r.child_execution_id, r.finished_version, l.json_value \
2304            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2305            WHERE \
2306            r.execution_id = :execution_id \
2307            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2308            "
2309        .to_string();
2310        let limit = match &pagination {
2311            Some(
2312                pagination @ (Pagination::NewerThan { cursor, .. }
2313                | Pagination::OlderThan { cursor, .. }),
2314            ) => {
2315                params.push((":cursor", Box::new(cursor)));
2316                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2317                Some(pagination.length())
2318            }
2319            None => None,
2320        };
2321        sql.push_str(" ORDER BY id");
2322        if pagination.as_ref().is_some_and(Pagination::is_desc) {
2323            sql.push_str(" DESC");
2324        }
2325        if let Some(limit) = limit {
2326            write!(sql, " LIMIT {limit}").unwrap();
2327        }
2328        params.push((":execution_id", Box::new(execution_id.to_string())));
2329        tx.prepare(&sql)
2330            .map_err(prepare_err_generic)?
2331            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2332                params
2333                    .iter()
2334                    .map(|(key, value)| (*key, value.as_ref()))
2335                    .collect::<Vec<_>>()
2336                    .as_ref(),
2337                Self::parse_response_with_cursor,
2338            )
2339            .map_err(intermittent_err_generic)?
2340            .collect::<Result<Vec<_>, rusqlite::Error>>()
2341            .map_err(result_err_read)
2342    }
2343
2344    fn parse_response_with_cursor(
2345        row: &rusqlite::Row<'_>,
2346    ) -> Result<ResponseWithCursor, rusqlite::Error> {
2347        let id = row.get("id")?;
2348        let created_at: DateTime<Utc> = row.get("created_at")?;
2349        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2350        let event = match (
2351            row.get::<_, Option<DelayId>>("delay_id")?,
2352            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2353            row.get::<_, Option<VersionType>>("finished_version")?,
2354            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
2355        ) {
2356            (Some(delay_id), None, None, None) => JoinSetResponse::DelayFinished { delay_id },
2357            (
2358                None,
2359                Some(child_execution_id),
2360                Some(finished_version),
2361                Some(JsonWrapper(ExecutionEventInner::Finished { result, .. })),
2362            ) => JoinSetResponse::ChildExecutionFinished {
2363                child_execution_id,
2364                finished_version: Version(finished_version),
2365                result,
2366            },
2367            (delay, child, finished, result) => {
2368                error!(
2369                    "Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}",
2370                    delay,
2371                    result.map(|it| it.0)
2372                );
2373                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2374            }
2375        };
2376        Ok(ResponseWithCursor {
2377            cursor: id,
2378            event: JoinSetResponseEventOuter {
2379                event: JoinSetResponseEvent { join_set_id, event },
2380                created_at,
2381            },
2382        })
2383    }
2384
2385    fn nth_response(
2386        tx: &Transaction,
2387        execution_id: &ExecutionId,
2388        join_set_id: &JoinSetId,
2389        skip_rows: u64,
2390    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2391        // TODO: Add test
2392        tx
2393            .prepare(
2394                "SELECT r.id, r.created_at, r.join_set_id, \
2395                    r.delay_id, \
2396                    r.child_execution_id, r.finished_version, l.json_value \
2397                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2398                    WHERE \
2399                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2400                    (
2401                    r.finished_version = l.version \
2402                    OR \
2403                    r.child_execution_id IS NULL \
2404                    ) \
2405                    ORDER BY id \
2406                    LIMIT 1 OFFSET :offset",
2407            )
2408            .map_err(prepare_err_generic)?
2409            .query_row(
2410                named_params! {
2411                    ":execution_id": execution_id.to_string(),
2412                    ":join_set_id": join_set_id.to_string(),
2413                    ":offset": skip_rows,
2414                },
2415                Self::parse_response_with_cursor,
2416            )
2417            .optional()
2418            .map_err(result_err_read)
2419    }
2420
2421    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2422    #[instrument(level = Level::TRACE, skip_all)]
2423    fn get_responses_with_offset(
2424        tx: &Transaction,
2425        execution_id: &ExecutionId,
2426        skip_rows: usize,
2427    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2428        // TODO: Add test
2429        tx.prepare(
2430            "SELECT r.id, r.created_at, r.join_set_id, \
2431            r.delay_id, \
2432            r.child_execution_id, r.finished_version, l.json_value \
2433            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2434            WHERE \
2435            r.execution_id = :execution_id AND \
2436            ( \
2437            r.finished_version = l.version \
2438            OR r.child_execution_id IS NULL \
2439            ) \
2440            ORDER BY id \
2441            LIMIT -1 OFFSET :offset",
2442        )
2443        .map_err(prepare_err_generic)?
2444        .query_map(
2445            named_params! {
2446                ":execution_id": execution_id.to_string(),
2447                ":offset": skip_rows,
2448            },
2449            Self::parse_response_with_cursor,
2450        )
2451        .map_err(result_err_read)?
2452        .collect::<Result<Vec<_>, _>>()
2453        .map_err(result_err_read)
2454        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2455    }
2456
2457    fn get_pending_of_single_ffqn(
2458        mut stmt: CachedStatement,
2459        batch_size: usize,
2460        pending_at_or_sooner: DateTime<Utc>,
2461        ffqn: &FunctionFqn,
2462    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2463        stmt.query_map(
2464            named_params! {
2465                ":pending_expires_finished": pending_at_or_sooner,
2466                ":ffqn": ffqn.to_string(),
2467                ":batch_size": batch_size,
2468            },
2469            |row| {
2470                let execution_id = row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2471                let next_version =
2472                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2473                Ok(execution_id.map(|exe| (exe, next_version)))
2474            },
2475        )
2476        .map_err(|err| {
2477            warn!("Ignoring consistency error {err:?}");
2478        })?
2479        .collect::<Result<Vec<_>, _>>()
2480        .map_err(|err| {
2481            warn!("Ignoring consistency error {err:?}");
2482        })?
2483        .into_iter()
2484        .collect::<Result<Vec<_>, _>>()
2485        .map_err(|err| {
2486            warn!("Ignoring consistency error {err:?}");
2487        })
2488    }
2489
2490    /// Get executions and their next versions
2491    fn get_pending(
2492        conn: &Connection,
2493        batch_size: usize,
2494        pending_at_or_sooner: DateTime<Utc>,
2495        ffqns: &[FunctionFqn],
2496    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2497        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2498        for ffqn in ffqns {
2499            // Select executions in PendingAt.
2500            let stmt = conn
2501                .prepare_cached(&format!(
2502                    r#"
2503                    SELECT execution_id, corresponding_version FROM t_state WHERE
2504                    state = "{STATE_PENDING_AT}" AND
2505                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2506                    ORDER BY pending_expires_finished LIMIT :batch_size
2507                    "#
2508                ))
2509                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2510
2511            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2512                stmt,
2513                batch_size - execution_ids_versions.len(),
2514                pending_at_or_sooner,
2515                ffqn,
2516            ) {
2517                execution_ids_versions.extend(execs_and_versions);
2518                if execution_ids_versions.len() == batch_size {
2519                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2520                    break;
2521                }
2522                // consistency errors are ignored since we want to return at least some rows.
2523            }
2524        }
2525        Ok(execution_ids_versions)
2526    }
2527
2528    // Must be called after write transaction for a correct happens-before relationship.
2529    #[instrument(level = Level::TRACE, skip_all)]
2530    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2531        let (pending_ats, finished_execs, responses) = {
2532            let (mut pending_ats, mut finished_execs, mut responses) =
2533                (Vec::new(), Vec::new(), Vec::new());
2534            for notifier in notifiers {
2535                if let Some(pending_at) = notifier.pending_at {
2536                    pending_ats.push(pending_at);
2537                }
2538                if let Some(finished) = notifier.execution_finished {
2539                    finished_execs.push(finished);
2540                }
2541                if let Some(response) = notifier.response {
2542                    responses.push(response);
2543                }
2544            }
2545            (pending_ats, finished_execs, responses)
2546        };
2547
2548        // Notify pending_at subscribers.
2549        if !pending_ats.is_empty() {
2550            let guard = self.0.pending_ffqn_subscribers.lock().unwrap();
2551            for pending_at in pending_ats {
2552                Self::notify_pending_locked(&pending_at, current_time, &guard);
2553            }
2554        }
2555        // Notify execution finished subscribers.
2556        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2557        if !finished_execs.is_empty() {
2558            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2559            for finished in finished_execs {
2560                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2561                    for (_tag, sender) in listeners_of_exe_id {
2562                        // Sending while holding the lock but the oneshot sender does not block.
2563                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2564                        let _ = sender.send(finished.retval.clone());
2565                    }
2566                }
2567            }
2568        }
2569        // Notify response subscribers.
2570        if !responses.is_empty() {
2571            let mut guard = self.0.response_subscribers.lock().unwrap();
2572            for (execution_id, response) in responses {
2573                if let Some((sender, _)) = guard.remove(&execution_id) {
2574                    let _ = sender.send(response);
2575                }
2576            }
2577        }
2578    }
2579
2580    fn notify_pending_locked(
2581        notifier: &NotifierPendingAt,
2582        current_time: DateTime<Utc>,
2583        ffqn_to_pending_subscription: &std::sync::MutexGuard<
2584            HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
2585        >,
2586    ) {
2587        // No need to remove here, cleanup is handled by the caller.
2588        if notifier.scheduled_at <= current_time
2589            && let Some((subscription, _)) = ffqn_to_pending_subscription.get(&notifier.ffqn)
2590        {
2591            debug!("Notifying pending subscriber");
2592            // Does not block
2593            let _ = subscription.try_send(());
2594        }
2595    }
2596}
2597
2598#[async_trait]
2599impl DbExecutor for SqlitePool {
2600    #[instrument(level = Level::TRACE, skip(self))]
2601    async fn lock_pending(
2602        &self,
2603        batch_size: usize,
2604        pending_at_or_sooner: DateTime<Utc>,
2605        ffqns: Arc<[FunctionFqn]>,
2606        created_at: DateTime<Utc>,
2607        component_id: ComponentId,
2608        executor_id: ExecutorId,
2609        lock_expires_at: DateTime<Utc>,
2610        run_id: RunId,
2611    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2612        let execution_ids_versions = self
2613            .conn_low_prio(
2614                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2615                "get_pending",
2616            )
2617            .await?;
2618        if execution_ids_versions.is_empty() {
2619            Ok(vec![])
2620        } else {
2621            debug!("Locking {execution_ids_versions:?}");
2622            self.transaction_write(
2623                move |tx| {
2624                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2625                    // Append lock
2626                    for (execution_id, version) in execution_ids_versions {
2627                        match Self::lock_single_execution(
2628                            tx,
2629                            created_at,
2630                            component_id.clone(),
2631                            &execution_id,
2632                            run_id,
2633                            &version,
2634                            executor_id,
2635                            lock_expires_at,
2636                        ) {
2637                            Ok(locked) => locked_execs.push(locked),
2638                            Err(err) => {
2639                                warn!("Locking row {execution_id} failed - {err:?}");
2640                            }
2641                        }
2642                    }
2643                    Ok(locked_execs)
2644                },
2645                "lock_pending",
2646            )
2647            .await?
2648        }
2649    }
2650
2651    #[instrument(level = Level::DEBUG, skip(self))]
2652    async fn lock_one(
2653        &self,
2654        created_at: DateTime<Utc>,
2655        component_id: ComponentId,
2656        execution_id: &ExecutionId,
2657        run_id: RunId,
2658        version: Version,
2659        executor_id: ExecutorId,
2660        lock_expires_at: DateTime<Utc>,
2661    ) -> Result<LockedExecution, DbErrorWrite> {
2662        debug!(%execution_id, "lock_extend");
2663        let execution_id = execution_id.clone();
2664        self.transaction_write(
2665            move |tx| {
2666                Self::lock_single_execution(
2667                    tx,
2668                    created_at,
2669                    component_id,
2670                    &execution_id,
2671                    run_id,
2672                    &version,
2673                    executor_id,
2674                    lock_expires_at,
2675                )
2676            },
2677            "lock_inner",
2678        )
2679        .await?
2680    }
2681
2682    #[instrument(level = Level::DEBUG, skip(self, req))]
2683    async fn append(
2684        &self,
2685        execution_id: ExecutionId,
2686        version: Version,
2687        req: AppendRequest,
2688    ) -> Result<AppendResponse, DbErrorWrite> {
2689        debug!(%req, "append");
2690        trace!(?req, "append");
2691        let created_at = req.created_at;
2692        let (version, notifier) = self
2693            .transaction_write(
2694                move |tx| Self::append(tx, &execution_id, req, version),
2695                "append",
2696            )
2697            .await??;
2698        self.notify_all(vec![notifier], created_at);
2699        Ok(version)
2700    }
2701
2702    #[instrument(level = Level::DEBUG, skip(self, batch, parent_response_event))]
2703    async fn append_batch_respond_to_parent(
2704        &self,
2705        execution_id: ExecutionIdDerived,
2706        current_time: DateTime<Utc>,
2707        batch: Vec<AppendRequest>,
2708        version: Version,
2709        parent_execution_id: ExecutionId,
2710        parent_response_event: JoinSetResponseEventOuter,
2711    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2712        debug!("append_batch_respond_to_parent");
2713        let execution_id = ExecutionId::Derived(execution_id);
2714        if execution_id == parent_execution_id {
2715            // Pending state would be wrong.
2716            // This is not a panic because it depends on DB state.
2717            return Err(DbErrorWrite::Permanent(
2718                DbErrorWritePermanent::ValidationFailed(
2719                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2720                ),
2721            ));
2722        }
2723        trace!(
2724            ?batch,
2725            ?parent_response_event,
2726            "append_batch_respond_to_parent"
2727        );
2728        assert!(!batch.is_empty(), "Empty batch request");
2729        let (version, notifiers) = {
2730            self.transaction_write(
2731                move |tx| {
2732                    let mut version = version;
2733                    let mut notifier_of_child = None;
2734                    for append_request in batch {
2735                        let (v, n) = Self::append(tx, &execution_id, append_request, version)?;
2736                        version = v;
2737                        notifier_of_child = Some(n);
2738                    }
2739
2740                    let pending_at_parent =
2741                        Self::append_response(tx, &parent_execution_id, parent_response_event)?;
2742                    Ok::<_, DbErrorWrite>((
2743                        version,
2744                        vec![
2745                            notifier_of_child.expect("checked that the batch is not empty"),
2746                            pending_at_parent,
2747                        ],
2748                    ))
2749                },
2750                "append_batch_respond_to_parent",
2751            )
2752            .await??
2753        };
2754        self.notify_all(notifiers, current_time);
2755        Ok(version)
2756    }
2757
2758    // Supports only one subscriber per ffqn.
2759    // A new subscriber replaces the old one, which will eventually time out, which is fine.
2760    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2761    async fn wait_for_pending(
2762        &self,
2763        pending_at_or_sooner: DateTime<Utc>,
2764        ffqns: Arc<[FunctionFqn]>,
2765        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2766    ) {
2767        let unique_tag: u64 = rand::random();
2768        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
2769        {
2770            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2771            for ffqn in ffqns.as_ref() {
2772                ffqn_to_pending_subscription.insert(ffqn.clone(), (sender.clone(), unique_tag));
2773            }
2774        }
2775        async {
2776            let Ok(execution_ids_versions) = self
2777                .conn_low_prio(
2778                    {
2779                        let ffqns = ffqns.clone();
2780                        move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
2781                    },
2782                    "subscribe_to_pending",
2783                )
2784                .await
2785            else {
2786                trace!(
2787                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
2788                );
2789                timeout_fut.await;
2790                return;
2791            };
2792            if !execution_ids_versions.is_empty() {
2793                trace!("Not waiting, database already contains new pending executions");
2794                return;
2795            }
2796            tokio::select! { // future's liveness: Dropping the loser immediately.
2797                _ = receiver.recv() => {
2798                    trace!("Received a notification");
2799                }
2800                () = timeout_fut => {
2801                }
2802            }
2803        }.await;
2804        // Clean up ffqn_to_pending_subscription in any case
2805        {
2806            let mut ffqn_to_pending_subscription = self.0.pending_ffqn_subscribers.lock().unwrap();
2807            for ffqn in ffqns.as_ref() {
2808                match ffqn_to_pending_subscription.remove(ffqn) {
2809                    Some((_, tag)) if tag == unique_tag => {
2810                        // Cleanup OK.
2811                    }
2812                    Some(other) => {
2813                        // Reinsert foreign sender.
2814                        ffqn_to_pending_subscription.insert(ffqn.clone(), other);
2815                    }
2816                    None => {
2817                        // Value was replaced and cleaned up already.
2818                    }
2819                }
2820            }
2821        }
2822    }
2823}
2824
2825#[async_trait]
2826impl DbConnection for SqlitePool {
2827    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2828    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
2829        debug!("create");
2830        trace!(?req, "create");
2831        let created_at = req.created_at;
2832        let (version, notifier) = self
2833            .transaction_write(move |tx| Self::create_inner(tx, req), "create")
2834            .await??;
2835        self.notify_all(vec![notifier], created_at);
2836        Ok(version)
2837    }
2838
2839    #[instrument(level = Level::DEBUG, skip(self, batch))]
2840    async fn append_batch(
2841        &self,
2842        current_time: DateTime<Utc>,
2843        batch: Vec<AppendRequest>,
2844        execution_id: ExecutionId,
2845        version: Version,
2846    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2847        debug!("append_batch");
2848        trace!(?batch, "append_batch");
2849        assert!(!batch.is_empty(), "Empty batch request");
2850
2851        let (version, notifier) = self
2852            .transaction_write(
2853                move |tx| {
2854                    let mut version = version;
2855                    let mut notifier = None;
2856                    for append_request in batch {
2857                        let (v, n) = Self::append(tx, &execution_id, append_request, version)?;
2858                        version = v;
2859                        notifier = Some(n);
2860                    }
2861                    Ok::<_, DbErrorWrite>((
2862                        version,
2863                        notifier.expect("checked that the batch is not empty"),
2864                    ))
2865                },
2866                "append_batch",
2867            )
2868            .await??;
2869
2870        self.notify_all(vec![notifier], current_time);
2871        Ok(version)
2872    }
2873
2874    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2875    async fn append_batch_create_new_execution(
2876        &self,
2877        current_time: DateTime<Utc>,
2878        batch: Vec<AppendRequest>,
2879        execution_id: ExecutionId,
2880        version: Version,
2881        child_req: Vec<CreateRequest>,
2882    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2883        debug!("append_batch_create_new_execution");
2884        trace!(?batch, ?child_req, "append_batch_create_new_execution");
2885        assert!(!batch.is_empty(), "Empty batch request");
2886
2887        let (version, notifiers) = self
2888            .transaction_write(
2889                move |tx| {
2890                    let mut notifier = None;
2891                    let mut version = version;
2892                    for append_request in batch {
2893                        let (v, n) = Self::append(tx, &execution_id, append_request, version)?;
2894                        version = v;
2895                        notifier = Some(n);
2896                    }
2897                    let mut notifiers = Vec::new();
2898                    notifiers.push(notifier.expect("checked that the batch is not empty"));
2899
2900                    for child_req in child_req {
2901                        let (_, notifier) = Self::create_inner(tx, child_req)?;
2902                        notifiers.push(notifier);
2903                    }
2904                    Ok::<_, DbErrorWrite>((version, notifiers))
2905                },
2906                "append_batch_create_new_execution_inner",
2907            )
2908            .await??;
2909        self.notify_all(notifiers, current_time);
2910        Ok(version)
2911    }
2912
2913    #[cfg(feature = "test")]
2914    #[instrument(level = Level::DEBUG, skip(self))]
2915    async fn get(
2916        &self,
2917        execution_id: &ExecutionId,
2918    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2919        trace!("get");
2920        let execution_id = execution_id.clone();
2921        self.transaction_read(move |tx| Self::get(tx, &execution_id), "get")
2922            .await?
2923    }
2924
2925    #[instrument(level = Level::DEBUG, skip(self))]
2926    async fn list_execution_events(
2927        &self,
2928        execution_id: &ExecutionId,
2929        since: &Version,
2930        max_length: VersionType,
2931        include_backtrace_id: bool,
2932    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2933        let execution_id = execution_id.clone();
2934        let since = since.0;
2935        self.transaction_read(
2936            move |tx| {
2937                Self::list_execution_events(
2938                    tx,
2939                    &execution_id,
2940                    since,
2941                    since + max_length,
2942                    include_backtrace_id,
2943                )
2944            },
2945            "get",
2946        )
2947        .await?
2948    }
2949
2950    // Supports only one subscriber per execution id.
2951    // A new call will overwrite the old subscriber, the old one will end
2952    // with a timeout, which is fine.
2953    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2954    async fn subscribe_to_next_responses(
2955        &self,
2956        execution_id: &ExecutionId,
2957        start_idx: usize,
2958        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2959    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
2960        debug!("next_responses");
2961        let unique_tag: u64 = rand::random();
2962        let execution_id = execution_id.clone();
2963
2964        let cleanup = || {
2965            let mut guard = self.0.response_subscribers.lock().unwrap();
2966            match guard.remove(&execution_id) {
2967                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
2968                Some(other) => {
2969                    // Reinsert foreign sender.
2970                    guard.insert(execution_id.clone(), other);
2971                }
2972                None => {} // Value was replaced and cleaned up already, or notification was sent.
2973            }
2974        };
2975
2976        let response_subscribers = self.0.response_subscribers.clone();
2977        let resp_or_receiver = {
2978            let execution_id = execution_id.clone();
2979            self.transaction_write(
2980                move |tx| {
2981                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
2982                    if responses.is_empty() {
2983                        // cannot race as we have the transaction write lock
2984                        let (sender, receiver) = oneshot::channel();
2985                        response_subscribers
2986                            .lock()
2987                            .unwrap()
2988                            .insert(execution_id, (sender, unique_tag));
2989                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
2990                    } else {
2991                        Ok(itertools::Either::Left(responses))
2992                    }
2993                },
2994                "subscribe_to_next_responses",
2995            )
2996            .await
2997        }
2998        .map_err(DbErrorReadWithTimeout::from)
2999        .flatten()
3000        .inspect_err(|_| {
3001            cleanup();
3002        })?;
3003        match resp_or_receiver {
3004            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3005            itertools::Either::Right(receiver) => {
3006                let res = async move {
3007                    tokio::select! {
3008                        resp = receiver => {
3009                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
3010                            Ok(vec![resp])
3011                        }
3012                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3013                    }
3014                }
3015                .await;
3016                cleanup();
3017                res
3018            }
3019        }
3020    }
3021
3022    // Supports multiple subscribers.
3023    async fn wait_for_finished_result(
3024        &self,
3025        execution_id: &ExecutionId,
3026        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
3027    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3028        let unique_tag: u64 = rand::random();
3029        let execution_id = execution_id.clone();
3030        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3031
3032        let cleanup = || {
3033            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3034            if let Some(subscribers) = guard.get_mut(&execution_id) {
3035                subscribers.remove(&unique_tag);
3036            }
3037        };
3038
3039        let resp_or_receiver = {
3040            let execution_id = execution_id.clone();
3041            self.transaction_read(move |tx| {
3042                let pending_state =
3043                    Self::get_combined_state(tx, &execution_id)?.pending_state;
3044                if let PendingState::Finished { finished } = pending_state {
3045                    let event =
3046                        Self::get_execution_event(tx, &execution_id, finished.version)?;
3047                    if let ExecutionEventInner::Finished { result, ..} = event.event {
3048                        Ok(itertools::Either::Left(result))
3049                    } else {
3050                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3051                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
3052                            "cannot get finished event based on t_state version"
3053                        )))
3054                    }
3055                } else {
3056                    // Cannot race with the notifier as we have the transaction write lock:
3057                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
3058                    // or we end up here. If this tx fails, the cleanup will remove this entry.
3059                    let (sender, receiver) = oneshot::channel();
3060                    let mut guard = execution_finished_subscription.lock().unwrap();
3061                    guard.entry(execution_id).or_default().insert(unique_tag, sender);
3062                    Ok(itertools::Either::Right(receiver))
3063                }
3064            }, "wait_for_finished_result")
3065            .await
3066        }.map_err(DbErrorReadWithTimeout::from)
3067        .flatten()
3068        .inspect_err(|_| {
3069            // This cleanup can race with the notification sender. If the
3070            // sender already
3071            cleanup();
3072        })?;
3073
3074        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3075        match resp_or_receiver {
3076            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3077            itertools::Either::Right(receiver) => {
3078                let res = async move {
3079                    tokio::select! {
3080                        resp = receiver => {
3081                            Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3082                        }
3083                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3084                    }
3085                }
3086                .await;
3087                cleanup();
3088                res
3089            }
3090        }
3091    }
3092
3093    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3094    async fn append_response(
3095        &self,
3096        created_at: DateTime<Utc>,
3097        execution_id: ExecutionId,
3098        response_event: JoinSetResponseEvent,
3099    ) -> Result<(), DbErrorWrite> {
3100        debug!("append_response");
3101        let event = JoinSetResponseEventOuter {
3102            created_at,
3103            event: response_event,
3104        };
3105        let notifier = self
3106            .transaction_write(
3107                move |tx| Self::append_response(tx, &execution_id, event),
3108                "append_response",
3109            )
3110            .await??;
3111        self.notify_all(vec![notifier], created_at);
3112        Ok(())
3113    }
3114
3115    #[instrument(level = Level::DEBUG, skip_all)]
3116    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3117        debug!("append_backtrace");
3118        self.transaction_write(
3119            move |tx| Self::append_backtrace(tx, &append),
3120            "append_backtrace",
3121        )
3122        .await?
3123    }
3124
3125    #[instrument(level = Level::DEBUG, skip_all)]
3126    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3127        debug!("append_backtrace_batch");
3128        self.transaction_write(
3129            move |tx| {
3130                for append in batch {
3131                    Self::append_backtrace(tx, &append)?;
3132                }
3133                Ok(())
3134            },
3135            "append_backtrace",
3136        )
3137        .await?
3138    }
3139
3140    #[instrument(level = Level::DEBUG, skip_all)]
3141    async fn get_backtrace(
3142        &self,
3143        execution_id: &ExecutionId,
3144        filter: BacktraceFilter,
3145    ) -> Result<BacktraceInfo, DbErrorRead> {
3146        debug!("get_last_backtrace");
3147        let execution_id = execution_id.clone();
3148
3149        self.transaction_read(
3150            move |tx| {
3151                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3152                                WHERE execution_id = :execution_id";
3153                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3154                let select = match filter {
3155                    BacktraceFilter::Specific(version) =>{
3156                        params.push((":version", Box::new(version.0)));
3157                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3158                    },
3159                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3160                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3161               };
3162                tx
3163                    .prepare(&select)
3164                    .map_err(prepare_err_generic)?
3165                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3166                        params
3167                            .iter()
3168                            .map(|(key, value)| (*key, value.as_ref()))
3169                            .collect::<Vec<_>>()
3170                            .as_ref(),
3171                    |row| {
3172                        Ok(BacktraceInfo {
3173                            execution_id: execution_id.clone(),
3174                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
3175                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3176                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3177                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3178                        })
3179                    },
3180                )
3181                .map_err(result_err_read)
3182            },
3183            "get_last_backtrace",
3184        ).await?
3185    }
3186
3187    /// Get currently expired delays and locks.
3188    #[instrument(level = Level::TRACE, skip(self))]
3189    async fn get_expired_timers(
3190        &self,
3191        at: DateTime<Utc>,
3192    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3193        self.conn_low_prio(
3194            move |conn| {
3195                let mut expired_timers = conn.prepare(
3196                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3197                ).map_err(prepare_err_generic)?
3198                .query_map(
3199                        named_params! {
3200                            ":at": at,
3201                        },
3202                        |row| {
3203                            let execution_id = row.get("execution_id")?;
3204                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3205                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3206                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3207                            Ok(ExpiredTimer::Delay(delay))
3208                        },
3209                    ).map_err(result_err_generic)?
3210                    .collect::<Result<Vec<_>, _>>().map_err(result_err_generic)?;
3211                // Extend with expired locks
3212                let expired = conn.prepare(&format!(r#"
3213                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis
3214                    FROM t_state
3215                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3216                    "#
3217                )
3218                ).map_err(prepare_err_generic)?
3219                .query_map(
3220                        named_params! {
3221                            ":at": at,
3222                        },
3223                        |row| {
3224                            let execution_id = row.get("execution_id")?;
3225                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3226                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3227                            let intermittent_event_count = row.get("intermittent_event_count")?;
3228                            let max_retries = row.get("max_retries")?;
3229                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3230                            let parent = if let ExecutionId::Derived(derived) = &execution_id {
3231                                derived.split_to_parts().inspect_err(|err| error!("cannot split execution {execution_id} to parts: {err:?}")).ok()
3232                            } else {
3233                                None
3234                            };
3235                            let lock = ExpiredLock {
3236                                execution_id,
3237                                locked_at_version,
3238                                next_version,
3239                                intermittent_event_count,
3240                                max_retries,
3241                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3242                                parent
3243                            };
3244                            Ok(ExpiredTimer::Lock(lock))
3245                        }
3246                    ).map_err(result_err_generic)?
3247                    .collect::<Result<Vec<_>, _>>().map_err(result_err_generic)?;
3248                expired_timers.extend(expired);
3249                if !expired_timers.is_empty() {
3250                    debug!("get_expired_timers found {expired_timers:?}");
3251                }
3252                Ok(expired_timers)
3253            }, "get_expired_timers"
3254        )
3255        .await
3256    }
3257
3258    async fn get_execution_event(
3259        &self,
3260        execution_id: &ExecutionId,
3261        version: &Version,
3262    ) -> Result<ExecutionEvent, DbErrorRead> {
3263        let version = version.0;
3264        let execution_id = execution_id.clone();
3265        self.transaction_read(
3266            move |tx| Self::get_execution_event(tx, &execution_id, version),
3267            "get_execution_event",
3268        )
3269        .await?
3270    }
3271
3272    async fn get_pending_state(
3273        &self,
3274        execution_id: &ExecutionId,
3275    ) -> Result<PendingState, DbErrorRead> {
3276        let execution_id = execution_id.clone();
3277        Ok(self
3278            .transaction_read(
3279                move |tx| Self::get_combined_state(tx, &execution_id),
3280                "get_pending_state",
3281            )
3282            .await??
3283            .pending_state)
3284    }
3285
3286    async fn list_executions(
3287        &self,
3288        ffqn: Option<FunctionFqn>,
3289        top_level_only: bool,
3290        pagination: ExecutionListPagination,
3291    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3292        self.transaction_read(
3293            move |tx| Self::list_executions(tx, ffqn, top_level_only, pagination),
3294            "list_executions",
3295        )
3296        .await?
3297    }
3298
3299    async fn list_responses(
3300        &self,
3301        execution_id: &ExecutionId,
3302        pagination: Pagination<u32>,
3303    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3304        let execution_id = execution_id.clone();
3305        self.transaction_read(
3306            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3307            "list_executions",
3308        )
3309        .await?
3310    }
3311}
3312
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a [`SqlitePool`] for tests.
    ///
    /// If the `SQLITE_FILE` environment variable is set, the database at that path is used and
    /// no temporary file is returned. Otherwise a fresh [`NamedTempFile`] backs the database.
    /// It is returned alongside the pool so the caller keeps it alive, because the file is
    /// deleted when the `NamedTempFile` is dropped.
    ///
    /// # Panics
    /// Panics if the temporary file cannot be created or the pool cannot be opened.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let temp_file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(temp_file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(temp_file))
            }
        }
    }
}