// obeli_sk_db_sqlite/sqlite_dao.rs

1use assert_matches::assert_matches;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ExecutionId, FunctionFqn, JoinSetId, StrVariant, SupportedFunctionReturnValue,
6    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, PrefixedUlid, RunId},
7    storage::{
8        AppendBatchResponse, AppendRequest, AppendResponse, BacktraceFilter, BacktraceInfo,
9        ClientError, CreateRequest, DUMMY_CREATED, DUMMY_HISTORY_EVENT, DUMMY_TEMPORARILY_FAILED,
10        DUMMY_TEMPORARILY_TIMED_OUT, DbConnection, DbConnectionError, DbError, DbPool,
11        ExecutionEvent, ExecutionEventInner, ExecutionListPagination, ExecutionWithState,
12        ExpiredTimer, HistoryEvent, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
13        JoinSetResponseEventOuter, LockPendingResponse, LockResponse, LockedExecution, Pagination,
14        PendingState, PendingStateFinished, PendingStateFinishedResultKind, ResponseWithCursor,
15        SpecificError, SubscribeError, Version, VersionType,
16    },
17    time::Sleep,
18};
19use hdrhistogram::{Counter, Histogram};
20use rusqlite::{
21    Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction, named_params,
22    types::{FromSql, FromSqlError},
23};
24use std::{
25    cmp::max,
26    collections::VecDeque,
27    error::Error,
28    fmt::Debug,
29    path::Path,
30    str::FromStr,
31    sync::{
32        Arc, Mutex,
33        atomic::{AtomicBool, Ordering},
34    },
35    time::Duration,
36};
37use std::{fmt::Write as _, pin::Pin};
38use tokio::{
39    sync::{mpsc, oneshot},
40    time::Instant,
41};
42use tracing::{Level, Span, debug, debug_span, error, info, instrument, trace, warn};
43
/// In-memory representation of an async delay request (mirrors a row of
/// `t_delay`): which join set it belongs to, its ID and when it expires.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
50/*
51mmap_size = 128MB - Set the global memory map so all processes can share some data
52https://www.sqlite.org/pragma.html#pragma_mmap_size
53https://www.sqlite.org/mmap.html
54
55journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
56https://www.sqlite.org/pragma.html#pragma_journal_size_limit
57
58Inspired by https://github.com/rails/rails/pull/49349
59*/
60
/// Default connection PRAGMAs applied at startup.
/// Each entry is `[name, value]`; any of them can be overridden (and more can
/// be added) via `SqliteConfig::pragma_override`.
const PRAGMA: [[&str; 2]; 9] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"], // milliseconds
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["page_size", "8192"], // 8 KB
    ["mmap_size", "134217728"], // 128 MB, see the comment block above
    ["journal_size_limit", "67108864"], // 64 MB cap on the WAL file
];
72
/// Single-row table holding the schema version; `CHECK (id = 1)` enforces
/// the singleton.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
// Fixed `id` of the only row in `t_metadata`.
const T_METADATA_SINGLETON: u32 = 1;
// Expected schema version; `init_thread` asserts the stored value matches.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 1;
82
/// Stores execution history. Append only.
/// `history_event_type` is a generated column extracted from the JSON payload
/// for cheap filtering on history-event rows.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
// NOTE(review): this index has the same columns as the table's PRIMARY KEY
// (execution_id, version), which already provides an index in SQLite -
// confirm it is not redundant before relying on it.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
104
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, column `delay_id` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, column `child_execution_id`,`finished_version`
/// must not be null.
///
/// NOTE(review): SQLite treats NULLs as distinct in UNIQUE constraints, so the
/// UNIQUE below only deduplicates rows where the nullable columns are set -
/// presumably intentional, confirm against the append path.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
128
/// Stores executions in `PendingState`
/// `state` to column mapping:
/// `PendingAt`:
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
/// `Locked`:               `executor_id`, `run_id`, `temporary_event_count`, `max_retries`, `retry_exp_backoff_millis`, Option<`parent_execution_id`>, Option<`parent_join_set_id`>, `locked_at_version`
/// `Finished` :            `result_kind`, `next_version` is not bumped, points to the finished version.
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    next_version INTEGER NOT NULL,
    pending_expires_finished TEXT NOT NULL,
    ffqn TEXT NOT NULL,
    state TEXT NOT NULL,
    created_at TEXT NOT NULL,
    scheduled_at TEXT NOT NULL,

    join_set_id TEXT,
    join_set_closing INTEGER,

    executor_id TEXT,
    run_id TEXT,
    temporary_event_count INTEGER,
    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    parent_execution_id TEXT,
    parent_join_set_id TEXT,
    locked_at_version INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Discriminator values of the `state` column.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";

// Serves pending-execution scans filtered by state, expiry timestamp and ffqn.
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
// Partial index for expired-timer scans; only locked rows (executor set) qualify.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
// NOTE(review): constant name says `is_top_level` but the index is named
// `is_root` - naming drift; renaming the index would change the on-disk
// schema, so it is only flagged here.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
185
/// Represents [`ExpiredTimer::AsyncDelay`] . Rows are deleted when the delay is processed.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

/// Stores WASM backtraces keyed by execution and the half-open version range
/// `[version_min_including, version_max_excluding)` they cover.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
// Index for searching backtraces by execution_id and version
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
211
212#[expect(clippy::needless_pass_by_value)]
213fn convert_err(err: rusqlite::Error) -> DbError {
214    if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
215        DbError::Specific(SpecificError::NotFound)
216    } else {
217        error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
218        DbError::Specific(SpecificError::GenericError(StrVariant::Static(
219            "sqlite error",
220        )))
221    }
222}
223
224fn parsing_err(err: impl Into<Box<dyn Error>>) -> DbError {
225    let err = err.into();
226    error!(backtrace = %std::backtrace::Backtrace::capture(), "Validation failed - {err:?}");
227    DbError::Specific(SpecificError::ValidationFailed(StrVariant::Arc(Arc::from(
228        err.to_string(),
229    ))))
230}
231
/// Map from execution ID to the oneshot sender that will receive the next
/// join-set response for that execution.
type ResponseSubscribers = Arc<
    std::sync::Mutex<hashbrown::HashMap<ExecutionId, oneshot::Sender<JoinSetResponseEventOuter>>>,
>;

/// Newtype so the foreign `PrefixedUlid` type can get a `FromSql` impl
/// (orphan rule workaround).
struct PrefixedUlidWrapper<T: 'static>(PrefixedUlid<T>);
type ExecutorIdW = PrefixedUlidWrapper<concepts::prefixed_ulid::prefix::Exr>;
type RunIdW = PrefixedUlidWrapper<concepts::prefixed_ulid::prefix::Run>;
239
240impl<T: 'static> FromSql for PrefixedUlidWrapper<T> {
241    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
242        let str = value.as_str()?;
243        let str = str.parse::<PrefixedUlid<T>>().map_err(|err| {
244            error!(
245                backtrace = %std::backtrace::Backtrace::capture(),
246                "Cannot convert identifier of type:`{type}`, value:`{str}` - {err:?}",
247                r#type = std::any::type_name::<T>()
248            );
249            FromSqlError::InvalidType
250        })?;
251        Ok(Self(str))
252    }
253}
254
255struct JsonWrapper<T>(T);
256impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
257    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
258        let value = match value {
259            rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
260                Ok(value)
261            }
262            other => {
263                error!(
264                    backtrace = %std::backtrace::Backtrace::capture(),
265                    "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
266                );
267                Err(FromSqlError::InvalidType)
268            }
269        }?;
270        let value = serde_json::from_slice::<T>(value).map_err(|err| {
271            error!(
272                backtrace = %std::backtrace::Backtrace::capture(),
273                "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
274                r#type = std::any::type_name::<T>()
275            );
276            FromSqlError::InvalidType
277        })?;
278        Ok(Self(value))
279    }
280}
281
282struct FromStrWrapper<T: FromStr>(T);
283impl<T: FromStr<Err = D>, D: Debug> FromSql for FromStrWrapper<T> {
284    fn column_result(value: rusqlite::types::ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
285        let value = String::column_result(value)?;
286        let value = T::from_str(&value).map_err(|err| {
287            error!(
288                backtrace = %std::backtrace::Backtrace::capture(),
289                "Cannot convert string `{value}` to type:`{type}` - {err:?}",
290                r#type = std::any::type_name::<T>()
291            );
292            FromSqlError::InvalidType
293        })?;
294        Ok(Self(value))
295    }
296}
297
/// Raw column values read from a `t_state` row, before validation.
/// Which optional columns are set depends on the `state` discriminator;
/// `CombinedState::new` enforces the valid combinations.
#[derive(Debug)]
struct CombinedStateDTO {
    state: String,
    ffqn: String,
    pending_expires_finished: DateTime<Utc>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    result_kind: Option<PendingStateFinishedResultKind>,
}
309
/// Validated view of a `t_state` row: the decoded pending state plus the
/// parsed ffqn and next version.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    next_version: Version,
}

/// Scheduled-at time and ffqn of an execution that is (or became) pending.
struct PendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
}

/// Outcome of a `t_state` index update.
/// NOTE(review): field semantics inferred from names - confirm at call sites.
#[derive(Default)]
struct IndexUpdated {
    temporary_event_count: Option<u32>,
    pending_at: Option<PendingAt>,
}
327
impl CombinedState {
    /// Reconstructs the [`PendingState`] from a raw `t_state` row.
    ///
    /// Exactly one combination of optional columns is valid for each `state`
    /// discriminator value; any other combination (or an unparsable ffqn)
    /// is a `ConsistencyError`.
    fn new(dto: &CombinedStateDTO, next_version: Version) -> Result<Self, DbError> {
        let pending_state = match dto {
            // PendingAt: all optional columns must be NULL;
            // `pending_expires_finished` holds the scheduled-at time.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: scheduled_at,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_PENDING_AT => Ok(PendingState::PendingAt {
                scheduled_at: *scheduled_at,
            }),
            // Locked: executor and run must be set;
            // `pending_expires_finished` holds the lock expiry.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                executor_id: Some(executor_id),
                run_id: Some(run_id),
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_LOCKED => Ok(PendingState::Locked {
                executor_id: *executor_id,
                run_id: *run_id,
                lock_expires_at: *lock_expires_at,
            }),
            // BlockedByJoinSet: join-set columns must be set;
            // `pending_expires_finished` holds the lock expiry.
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: lock_expires_at,
                executor_id: None,
                run_id: None,
                join_set_id: Some(join_set_id),
                join_set_closing: Some(join_set_closing),
                result_kind: None,
            } if state == STATE_BLOCKED_BY_JOIN_SET => Ok(PendingState::BlockedByJoinSet {
                join_set_id: join_set_id.clone(),
                closing: *join_set_closing,
                lock_expires_at: *lock_expires_at,
            }),
            // Finished: only `result_kind` is set; `next_version` is not
            // bumped and points at the finished version (see table docs).
            CombinedStateDTO {
                state,
                ffqn: _,
                pending_expires_finished: finished_at,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: Some(result_kind),
            } if state == STATE_FINISHED => Ok(PendingState::Finished {
                finished: PendingStateFinished {
                    finished_at: *finished_at,
                    version: next_version.0,
                    result_kind: *result_kind,
                },
            }),
            _ => {
                error!("Cannot deserialize pending state from  {dto:?}");
                Err(DbError::Specific(SpecificError::ConsistencyError(
                    StrVariant::Static("invalid `t_state`"),
                )))
            }
        }?;
        Ok(Self {
            ffqn: FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
                error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
                DbError::Specific(SpecificError::ConsistencyError(StrVariant::Static(
                    "invalid ffqn value in `t_state`",
                )))
            })?,
            pending_state,
            next_version,
        })
    }
}
406
/// Execution priority of a [`ThreadCommand`] within one drained batch;
/// `Low` commands may be skipped entirely under load (see `connection_rpc`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CommandPriority {
    // Write, Shutdown
    High,
    // Read
    Medium,
    // Lock scan
    Low,
}
416
/// Message sent to the dedicated sqlite thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    Func {
        #[debug(skip)]
        func: Box<dyn FnOnce(&mut Connection) + Send>,
        priority: CommandPriority,
        // Enqueue timestamp, used for send-latency histograms.
        sent_at: Instant,
        // Static label used as the per-function histogram key.
        name: &'static str,
    },
    Shutdown,
    // Placeholder swapped in when a `Func` is taken out of the batch.
    Dummy,
}
429
/// Cloneable handle to the sqlite worker; all clones share one RPC thread
/// that owns the single [`Connection`].
#[derive(Clone)]
pub struct SqlitePool<S: Sleep>(SqlitePoolInner<S>);

#[derive(Clone)]
struct SqlitePoolInner<S: Sleep> {
    // Set to request the RPC thread to stop.
    shutdown_requested: Arc<AtomicBool>,
    // Set by the RPC thread once it has fully stopped.
    shutdown_finished: Arc<AtomicBool>,
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    ffqn_to_pending_subscription: Arc<Mutex<hashbrown::HashMap<FunctionFqn, mpsc::Sender<()>>>>,
    sleep: S,
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
}
443
#[async_trait]
impl<S: Sleep + 'static> DbPool for SqlitePool<S> {
    /// Returns a new boxed connection handle sharing the same RPC thread.
    fn connection(&self) -> Box<dyn DbConnection> {
        Box::new(self.clone())
    }

    fn is_closing(&self) -> bool {
        self.0.shutdown_requested.load(Ordering::Acquire)
    }

    /// Requests shutdown of the RPC thread and polls (1 ms sleeps) until the
    /// thread confirms by setting `shutdown_finished`.
    async fn close(&self) -> Result<(), DbError> {
        self.0.shutdown_requested.store(true, Ordering::Release);
        // Best effort: the thread also observes `shutdown_requested` on its own.
        let _ = self.0.command_tx.send(ThreadCommand::Shutdown).await;
        while !self.0.shutdown_finished.load(Ordering::Acquire) {
            self.0.sleep.sleep(Duration::from_millis(1)).await;
        }
        Ok(())
    }
}
impl<S: Sleep> Drop for SqlitePool<S> {
    /// Best-effort shutdown when the last pool clone is dropped without
    /// `close` having been awaited.
    fn drop(&mut self) {
        // `join_handle` is `Some` until drop; taking it lets `Arc::try_unwrap`
        // detect whether this is the last clone holding the handle.
        let arc = self.0.join_handle.take().expect("join_handle was set");
        if let Ok(join_handle) = Arc::try_unwrap(arc) {
            // Last holder
            if !join_handle.is_finished() {
                if !self.0.shutdown_finished.load(Ordering::Acquire) {
                    // Best effort to shut down the sqlite thread.
                    let backtrace = std::backtrace::Backtrace::capture();
                    warn!("SqlitePool was not closed properly - {backtrace}");
                    self.0.shutdown_requested.store(true, Ordering::Release);
                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
                    // Not joining the thread, drop might be called from async context.
                    // We are shutting down the server anyway.
                } else {
                    // The thread set `shutdown_finished`, it does not matter if it is still alive.
                }
            }
        }
    }
}
484
/// Tuning knobs for [`SqlitePool`].
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    /// Capacity of the command channel feeding the sqlite thread.
    pub queue_capacity: usize,
    /// Low-priority commands run only when fewer than this many
    /// high/medium-priority commands were processed in the current batch.
    pub low_prio_threshold: usize,
    /// Overrides (or additions) for the built-in `PRAGMA` defaults.
    pub pragma_override: Option<hashbrown::HashMap<String, String>>,
}
impl Default for SqliteConfig {
    fn default() -> Self {
        Self {
            queue_capacity: 100,
            low_prio_threshold: 100,
            pragma_override: None,
        }
    }
}
500
501impl<S: Sleep> SqlitePool<S> {
    /// Opens the sqlite file, applies PRAGMAs, verifies the schema version and
    /// creates all tables and indexes.
    ///
    /// Panics on any failure: this runs on the blocking thread pool during
    /// initialization, so the panic surfaces as a `JoinError` in [`Self::new`].
    fn init_thread(
        path: &Path,
        mut pragma_override: hashbrown::HashMap<String, String>,
    ) -> Connection {
        // No need to log errors - propagate error messages via panic.
        // It is OK to panic here - spawned on blocking thread pool, panic will error the initialization.
        fn execute<P: Params>(conn: &Connection, sql: &str, params: P) {
            conn.execute(sql, params)
                .unwrap_or_else(|err| panic!("cannot run `{sql}` - {err:?}"));
        }
        fn pragma_update(conn: &Connection, name: &str, value: &str) {
            debug!("Setting PRAGMA {name}={value}");
            conn.pragma_update(None, name, value)
                .unwrap_or_else(|err| panic!("cannot update pragma `{name}`=`{value}` - {err:?}"));
        }

        let conn = Connection::open_with_flags(path, OpenFlags::default())
            .unwrap_or_else(|err| panic!("cannot open the connection - {err:?}"));

        // Apply the defaults, letting the caller override any of them.
        for [pragma_name, pragma_value] in PRAGMA {
            let pragma_value = pragma_override
                .remove(pragma_name)
                .unwrap_or_else(|| pragma_value.to_string());
            pragma_update(&conn, pragma_name, &pragma_value);
        }
        // drain the rest overrides
        for (pragma_name, pragma_value) in pragma_override.drain() {
            pragma_update(&conn, &pragma_name, &pragma_value);
        }

        // t_metadata
        execute(&conn, CREATE_TABLE_T_METADATA, []);
        // Insert row if not exists.
        execute(
            &conn,
            &format!(
                "INSERT INTO t_metadata (id, schema_version, created_at) VALUES
                    ({T_METADATA_SINGLETON}, {T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
            ),
            [Utc::now()],
        );
        // Fail on unexpected `schema_version`.
        let actual_version = conn
            .prepare("SELECT schema_version FROM t_metadata")
            .unwrap_or_else(|err| panic!("cannot select schema version - {err:?}"))
            .query_row([], |row| row.get::<_, u32>("schema_version"));

        let actual_version = actual_version.unwrap_or_else(|err| {
            panic!("Cannot read the schema version - {err:?}");
        });
        assert!(
            actual_version == T_METADATA_EXPECTED_SCHEMA_VERSION,
            "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
        );

        // t_execution_log
        execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, []);
        execute(
            &conn,
            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
            [],
        );
        execute(
            &conn,
            CREATE_INDEX_IDX_T_EXECUTION_ID_EXECUTION_ID_VARIANT,
            [],
        );
        // t_join_set_response
        execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, []);
        execute(
            &conn,
            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
            [],
        );
        // t_state
        execute(&conn, CREATE_TABLE_T_STATE, []);
        execute(&conn, IDX_T_STATE_LOCK_PENDING, []);
        execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, []);
        execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, []);
        execute(&conn, IDX_T_STATE_FFQN, []);
        execute(&conn, IDX_T_STATE_CREATED_AT, []);
        // t_delay
        execute(&conn, CREATE_TABLE_T_DELAY, []);
        // t_backtrace
        execute(&conn, CREATE_TABLE_T_BACKTRACE, []);
        execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, []);
        conn
    }
590
591    fn connection_rpc(
592        mut conn: Connection,
593        shutdown_requested: &AtomicBool,
594        shutdown_finished: &AtomicBool,
595        mut command_rx: mpsc::Receiver<ThreadCommand>,
596        queue_capacity: usize,
597        low_prio_threshold: usize,
598    ) {
599        const METRIC_DUMPING_TRESHOLD: usize = 0; // 0 to disable histogram dumping
600        let mut vec: Vec<ThreadCommand> = Vec::with_capacity(queue_capacity);
601        // measure how long it takes to receive the `ThreadCommand`. 1us-1s
602        let mut send_hist = Histogram::<u32>::new_with_bounds(1, 1_000_000, 3).unwrap();
603        let mut func_histograms = hashbrown::HashMap::new();
604        let mut metric_dumping_counter = 0;
605        let print_histogram = |name, histogram: &Histogram<u32>, trailing_coma| {
606            print!(
607                "\"{name}\": {mean}, \"{name}_len\": {len}, \"{name}_meanlen\": {meanlen} {coma}",
608                mean = histogram.mean(),
609                len = histogram.len(),
610                meanlen = histogram.mean() * histogram.len().as_f64(),
611                coma = if trailing_coma { "," } else { "" }
612            );
613        };
614        loop {
615            vec.clear();
616            metric_dumping_counter += 1;
617            if let Some(item) = command_rx.blocking_recv() {
618                vec.push(item);
619            } else {
620                debug!("command_rx was closed");
621                break;
622            }
623            while let Ok(more) = command_rx.try_recv() {
624                vec.push(more);
625            }
626            // Did we receive Shutdown in the batch?
627            let shutdown_found = vec
628                .iter()
629                .any(|item| matches!(item, ThreadCommand::Shutdown));
630            if shutdown_found || shutdown_requested.load(Ordering::Acquire) {
631                debug!("Recveived shutdown before processing the batch");
632                break;
633            }
634
635            let mut execute = |expected: CommandPriority| {
636                let mut processed = 0;
637                for item in &mut vec {
638                    if matches!(item, ThreadCommand::Func { priority, .. } if *priority == expected )
639                    {
640                        let item = std::mem::replace(item, ThreadCommand::Dummy); // get owned item
641                        let (func, sent_at, name) = assert_matches!(item, ThreadCommand::Func{func, sent_at, name, ..} => (func, sent_at, name));
642                        let sent_at = sent_at.elapsed();
643                        let started_at = Instant::now();
644                        func(&mut conn);
645                        let started_at = started_at.elapsed();
646                        processed += 1;
647                        // update hdr metrics
648                        send_hist
649                            .record(u64::from(sent_at.subsec_micros()))
650                            .unwrap();
651                        func_histograms
652                            .entry(name)
653                            .or_insert_with(|| {
654                                Histogram::<u32>::new_with_bounds(1, 1_000_000, 3).unwrap()
655                            })
656                            .record(u64::from(started_at.subsec_micros()))
657                            .unwrap();
658                    }
659                    if shutdown_requested.load(Ordering::Acquire) {
660                        // recheck after every function
661                        debug!("Recveived shutdown during processing of the batch");
662                        break;
663                    }
664                }
665                processed
666            };
667            let processed = execute(CommandPriority::High) + execute(CommandPriority::Medium);
668            if processed < low_prio_threshold {
669                execute(CommandPriority::Low);
670            }
671
672            if metric_dumping_counter == METRIC_DUMPING_TRESHOLD {
673                print!("{{");
674                metric_dumping_counter = 0;
675                func_histograms.iter_mut().for_each(|(name, h)| {
676                    print_histogram(*name, h, true);
677                    h.clear();
678                });
679                print_histogram("send", &send_hist, false);
680                send_hist.clear();
681                println!("}}");
682            }
683        } // Loop until shutdown is set to true.
684        debug!("Closing command thread");
685        shutdown_finished.store(true, Ordering::Release);
686    }
687
688    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
689    pub async fn new<P: AsRef<Path>>(
690        path: P,
691        config: SqliteConfig,
692        sleep: S,
693    ) -> Result<Self, DbError> {
694        let path = path.as_ref().to_owned();
695
696        let shutdown_requested = Arc::new(AtomicBool::new(false));
697        let shutdown_finished = Arc::new(AtomicBool::new(false));
698
699        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
700        info!("Sqlite database location: {path:?}");
701        let join_handle = {
702            // Initialize the `Connection`.
703            let init_task = {
704                tokio::task::spawn_blocking(move || {
705                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
706                })
707                .await
708            };
709            let conn = match init_task {
710                Ok(conn) => conn,
711                Err(err) => {
712                    error!("Initialization error - {err:?}");
713                    return Err(DbError::Specific(SpecificError::GenericError(
714                        StrVariant::Static("initialization error"),
715                    )));
716                }
717            };
718            let shutdown_requested = shutdown_requested.clone();
719            let shutdown_finished = shutdown_finished.clone();
720            // Start the RPC thread.
721            std::thread::spawn(move || {
722                Self::connection_rpc(
723                    conn,
724                    &shutdown_requested,
725                    &shutdown_finished,
726                    command_rx,
727                    config.queue_capacity,
728                    config.low_prio_threshold,
729                );
730            })
731        };
732        Ok(SqlitePool(SqlitePoolInner {
733            shutdown_requested,
734            shutdown_finished,
735            command_tx,
736            response_subscribers: Arc::default(),
737            ffqn_to_pending_subscription: Arc::default(),
738            sleep,
739            join_handle: Some(Arc::new(join_handle)),
740        }))
741    }
742
    /// Runs `func` inside a write (`IMMEDIATE`) transaction at `High` priority.
    ///
    /// # Errors
    /// Propagates `DbError` from `func`, transaction begin/commit, or a closed
    /// command/response channel.
    #[instrument(level = Level::TRACE, skip_all, fields(name))]
    pub async fn transaction_write<F, T>(&self, func: F, name: &'static str) -> Result<T, DbError>
    where
        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, DbError> + Send + 'static,
        T: Send + 'static,
    {
        self.transaction(func, true, name).await
    }
751
    /// Runs `func` inside a read (`Deferred`) transaction at `Medium` priority.
    ///
    /// # Errors
    /// Propagates `DbError` from `func`, transaction begin/commit, or a closed
    /// command/response channel.
    #[instrument(level = Level::TRACE, skip_all, fields(name))]
    pub async fn transaction_read<F, T>(&self, func: F, name: &'static str) -> Result<T, DbError>
    where
        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, DbError> + Send + 'static,
        T: Send + 'static,
    {
        self.transaction(func, false, name).await
    }
760
    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
    ///
    /// Writes use `IMMEDIATE` behavior at `High` priority; reads use
    /// `Deferred` behavior at `Medium` priority. The closure runs on the
    /// dedicated sqlite thread; the result is sent back over a oneshot channel.
    ///
    /// # Errors
    /// `DbError::Connection` when the command or response channel is closed;
    /// otherwise whatever `func`, transaction begin, or commit returned.
    async fn transaction<F, T>(
        &self,
        func: F,
        write: bool,
        name: &'static str,
    ) -> Result<T, DbError>
    where
        F: FnOnce(&mut rusqlite::Transaction) -> Result<T, DbError> + Send + 'static,
        T: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let parent_span = Span::current();

        self.0
            .command_tx
            .send(ThreadCommand::Func {
                priority: if write {
                    CommandPriority::High
                } else {
                    CommandPriority::Medium
                },
                func: Box::new(move |conn| {
                    // Begin the transaction on the sqlite thread.
                    let res = debug_span!(parent: &parent_span, "tx_begin", name).in_scope(|| {
                        conn.transaction_with_behavior(if write {
                            rusqlite::TransactionBehavior::Immediate
                        } else {
                            rusqlite::TransactionBehavior::Deferred
                        })
                        .map_err(convert_err)
                    });
                    // Run the caller's closure inside the caller's span.
                    let res = res.and_then(|mut transaction| {
                        parent_span.in_scope(|| func(&mut transaction).map(|ok| (ok, transaction)))
                    });
                    // Commit only if `func` succeeded.
                    let res = res.and_then(|(ok, transaction)| {
                        debug_span!(parent: &parent_span, "tx_commit", name)
                            .in_scope(|| transaction.commit().map(|()| ok).map_err(convert_err))
                    });
                    // Receiver may be gone if the caller was cancelled; ignore.
                    _ = tx.send(res);
                }),
                sent_at: Instant::now(),
                name,
            })
            .await
            .map_err(|_send_err| DbError::Connection(DbConnectionError::SendError))?;
        rx.await
            .map_err(|_recv_err| DbError::Connection(DbConnectionError::RecvError))?
    }
809
810    #[instrument(level = Level::TRACE, skip_all, fields(name))]
811    pub async fn conn_low_prio<F, T>(&self, func: F, name: &'static str) -> Result<T, DbError>
812    where
813        F: FnOnce(&Connection) -> Result<T, DbError> + Send + 'static,
814        T: Send + 'static + Default,
815    {
816        let (tx, rx) = oneshot::channel();
817        let span = tracing::trace_span!("tx_function");
818        self.0
819            .command_tx
820            .send(ThreadCommand::Func {
821                priority: CommandPriority::Low,
822                func: Box::new(move |conn| {
823                    _ = tx.send(span.in_scope(|| func(conn)));
824                }),
825                sent_at: Instant::now(),
826                name,
827            })
828            .await
829            .map_err(|_send_err| DbError::Connection(DbConnectionError::SendError))?;
830        match rx.await {
831            Ok(res) => res,
832            Err(_recv_err) => Ok(T::default()), // Dropped computation because of other priorities..
833        }
834    }
835
836    fn fetch_created_event(
837        conn: &Connection,
838        execution_id: &ExecutionId,
839    ) -> Result<CreateRequest, DbError> {
840        let mut stmt = conn
841            .prepare(
842                "SELECT created_at, json_value FROM t_execution_log WHERE \
843            execution_id = :execution_id AND version = 0",
844            )
845            .map_err(convert_err)?;
846        let (created_at, event) = stmt
847            .query_row(
848                named_params! {
849                    ":execution_id": execution_id.to_string(),
850                },
851                |row| {
852                    let created_at = row.get("created_at")?;
853                    let event = row
854                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
855                        .map(|event| (created_at, event.0))
856                        .map_err(|serde| {
857                            error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
858                            parsing_err(serde)
859                        });
860                    Ok(event)
861                },
862            )
863            .map_err(convert_err)??;
864        if let ExecutionEventInner::Created {
865            ffqn,
866            params,
867            parent,
868            scheduled_at,
869            retry_exp_backoff,
870            max_retries,
871            component_id,
872            metadata,
873            scheduled_by,
874        } = event
875        {
876            Ok(CreateRequest {
877                created_at,
878                execution_id: execution_id.clone(),
879                ffqn,
880                params,
881                parent,
882                scheduled_at,
883                retry_exp_backoff,
884                max_retries,
885                component_id,
886                metadata,
887                scheduled_by,
888            })
889        } else {
890            error!("Cannt match `Created` event - {event:?}");
891            Err(DbError::Specific(SpecificError::ConsistencyError(
892                StrVariant::Static("Cannot deserialize `Created` event"),
893            )))
894        }
895    }
896
897    fn count_temporary_events(
898        conn: &Connection,
899        execution_id: &ExecutionId,
900    ) -> Result<u32, DbError> {
901        let mut stmt = conn.prepare(
902            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND (variant = :v1 OR variant = :v2)",
903        ).map_err(convert_err)?;
904        stmt.query_row(
905            named_params! {
906                ":execution_id": execution_id.to_string(),
907                ":v1": DUMMY_TEMPORARILY_TIMED_OUT.variant(),
908                ":v2": DUMMY_TEMPORARILY_FAILED.variant(),
909            },
910            |row| row.get("count"),
911        )
912        .map_err(convert_err)
913    }
914
915    fn get_current_version(
916        tx: &Transaction,
917        execution_id: &ExecutionId,
918    ) -> Result<Version, DbError> {
919        let mut stmt = tx.prepare(
920            "SELECT version FROM t_execution_log WHERE execution_id = :execution_id ORDER BY version DESC LIMIT 1",
921        ).map_err(convert_err)?;
922        stmt.query_row(
923            named_params! {
924                ":execution_id": execution_id.to_string(),
925            },
926            |row| {
927                let version: VersionType = row.get("version")?;
928                Ok(Version::new(version))
929            },
930        )
931        .map_err(convert_err)
932    }
933
934    fn get_next_version(tx: &Transaction, execution_id: &ExecutionId) -> Result<Version, DbError> {
935        Self::get_current_version(tx, execution_id).map(|ver| Version::new(ver.0 + 1))
936    }
937
938    fn check_expected_next_and_appending_version(
939        expected_version: &Version,
940        appending_version: &Version,
941    ) -> Result<(), DbError> {
942        if *expected_version != *appending_version {
943            debug!(
944                "Version mismatch - expected: {expected_version:?}, appending: {appending_version:?}"
945            );
946            return Err(DbError::Specific(SpecificError::VersionMismatch {
947                appending_version: appending_version.clone(),
948                expected_version: expected_version.clone(),
949            }));
950        }
951        Ok(())
952    }
953
954    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
955    fn create_inner(
956        tx: &Transaction,
957        req: CreateRequest,
958    ) -> Result<(AppendResponse, PendingAt), DbError> {
959        debug!("create_inner");
960
961        let version = Version::new(0);
962        let execution_id = req.execution_id.clone();
963        let execution_id_str = execution_id.to_string();
964        let ffqn = req.ffqn.clone();
965        let created_at = req.created_at;
966        let scheduled_at = req.scheduled_at;
967        let event = ExecutionEventInner::from(req);
968        let event_ser = serde_json::to_string(&event)
969            .map_err(|err| {
970                error!("Cannot serialize {event:?} - {err:?}");
971                rusqlite::Error::ToSqlConversionFailure(err.into())
972            })
973            .map_err(convert_err)?;
974        tx.prepare(
975                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
976                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
977        .map_err(convert_err)?
978        .execute(named_params! {
979            ":execution_id": &execution_id_str,
980            ":created_at": created_at,
981            ":version": version.0,
982            ":json_value": event_ser,
983            ":variant": event.variant(),
984            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
985        })
986        .map_err(convert_err)?;
987        let next_version = Version::new(version.0 + 1);
988        let pending_state = PendingState::PendingAt { scheduled_at };
989        let pending_at = {
990            let scheduled_at = assert_matches!(pending_state, PendingState::PendingAt { scheduled_at } => scheduled_at);
991            debug!("Creating with `Pending(`{scheduled_at:?}`)");
992            tx.prepare(
993                "INSERT INTO t_state (created_at, scheduled_at, state, execution_id, is_top_level, next_version, pending_expires_finished, ffqn) \
994                VALUES (:created_at, :scheduled_at, :state, :execution_id, :is_top_level, :next_version, :pending_expires_finished, :ffqn)",
995            )
996            .map_err(convert_err)?
997            .execute(named_params! {
998                ":created_at": created_at,
999                ":scheduled_at": scheduled_at,
1000                ":state": STATE_PENDING_AT,
1001                ":execution_id": execution_id.to_string(),
1002                ":is_top_level": execution_id.is_top_level(),
1003                ":next_version": next_version.0,
1004                ":pending_expires_finished": scheduled_at,
1005                ":ffqn": ffqn.to_string(),
1006            })
1007            .map_err(convert_err)?;
1008            PendingAt { scheduled_at, ffqn }
1009        };
1010        Ok((next_version, pending_at))
1011    }
1012
    /// Rewrites the single `t_state` row of `execution_id` to match `pending_state`.
    ///
    /// Every branch issues an UPDATE guarded by
    /// `next_version = :expected_current_version`; if no row matches (a concurrent
    /// writer advanced the version), `updated != 1` and a `ConsistencyError` is
    /// returned instead of clobbering newer state.
    ///
    /// `ffqn_when_creating` lets the caller skip re-reading the `Created` event
    /// when it already knows the FFQN; only the `PendingAt` branch needs it.
    ///
    /// Returns an [`IndexUpdated`]: `pending_at` is set for `PendingAt`,
    /// `temporary_event_count` for `Locked`; both are empty otherwise.
    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %pending_state, %next_version, purge, ffqn = ?ffqn_when_creating))]
    fn update_state(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pending_state: &PendingState,
        expected_current_version: &Version,
        next_version: &Version,
        ffqn_when_creating: Option<FunctionFqn>, // or fetched lazily
    ) -> Result<IndexUpdated, DbError> {
        debug!("update_index");

        match &pending_state {
            // Execution becomes runnable at `scheduled_at`; clear all lock/join-set
            // columns and store the schedule time in `pending_expires_finished`.
            PendingState::PendingAt { scheduled_at } => {
                debug!("Setting state `Pending(`{scheduled_at:?}`)");
                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = NULL,join_set_closing=NULL, \
                        executor_id = NULL,run_id = NULL,temporary_event_count = NULL, \
                        max_retries = NULL,retry_exp_backoff_millis = NULL,parent_execution_id = NULL,parent_join_set_id = NULL, locked_at_version = NULL, \
                        result_kind = NULL \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": next_version.0,
                        ":pending_expires_finished": scheduled_at,
                        ":state": STATE_PENDING_AT,

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }
                // FFQN is needed for the PendingAt notification; fetch it from the
                // `Created` event only if the caller did not supply it.
                let ffqn = if let Some(ffqn) = ffqn_when_creating {
                    ffqn
                } else {
                    Self::fetch_created_event(tx, execution_id)?.ffqn
                };
                Ok(IndexUpdated {
                    temporary_event_count: None,
                    pending_at: Some(PendingAt {
                        scheduled_at: *scheduled_at,
                        ffqn,
                    }),
                })
            }
            // Execution is locked by an executor; denormalize retry settings and
            // parent linkage from the `Created` event into `t_state`.
            PendingState::Locked {
                lock_expires_at,
                executor_id,
                run_id,
            } => {
                debug!(
                    %executor_id,
                    %run_id, "Setting state `Locked({next_version}, {lock_expires_at})`"
                );
                let temporary_event_count = Self::count_temporary_events(tx, execution_id)?;
                let create_req = Self::fetch_created_event(tx, execution_id)?;

                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = NULL,join_set_closing=NULL, \
                        executor_id = :executor_id, run_id = :run_id, temporary_event_count = :temporary_event_count, \
                        max_retries = :max_retries, retry_exp_backoff_millis = :retry_exp_backoff_millis, \
                        parent_execution_id = :parent_execution_id, parent_join_set_id = :parent_join_set_id, locked_at_version = :locked_at_version, \
                        result_kind = NULL \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": next_version.0,
                        ":pending_expires_finished": lock_expires_at,
                        ":state": STATE_LOCKED,

                        ":executor_id": executor_id.to_string(),
                        ":run_id": run_id.to_string(),
                        ":temporary_event_count": temporary_event_count,
                        ":max_retries": create_req.max_retries,
                        ":retry_exp_backoff_millis": u64::try_from(create_req.retry_exp_backoff.as_millis()).unwrap(),
                        ":parent_execution_id": create_req.parent.as_ref().map(|(pid, _) | pid.to_string()),
                        ":parent_join_set_id": create_req.parent.as_ref().map(|(_, join_set_id)| join_set_id.to_string()),
                        ":locked_at_version": expected_current_version.0,

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }

                Ok(IndexUpdated {
                    temporary_event_count: Some(temporary_event_count),
                    pending_at: None,
                })
            }
            // Execution waits on a join set until `lock_expires_at`; store the
            // join-set columns, clear executor/lock columns.
            PendingState::BlockedByJoinSet {
                join_set_id,
                lock_expires_at,
                closing: join_set_closing,
            } => {
                debug!(%join_set_id, "Setting state `BlockedByJoinSet({next_version},{lock_expires_at})`");
                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = :join_set_id, join_set_closing=:join_set_closing, \
                        executor_id = NULL,run_id = NULL,temporary_event_count = NULL, \
                        max_retries = NULL,retry_exp_backoff_millis = NULL,parent_execution_id = NULL,parent_join_set_id = NULL, locked_at_version = NULL, \
                        result_kind = NULL \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": next_version.0,
                        ":pending_expires_finished": lock_expires_at,
                        ":state": STATE_BLOCKED_BY_JOIN_SET,

                        ":join_set_id": join_set_id.to_string(),
                        ":join_set_closing": join_set_closing,

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }

                Ok(IndexUpdated::default())
            }
            // Terminal state: record finish time and result kind. The version is
            // not bumped past `finished_version` (asserted below).
            PendingState::Finished {
                finished:
                    PendingStateFinished {
                        version: finished_version,
                        finished_at,
                        result_kind,
                    },
            } => {
                debug!("Setting state `Finished`");
                assert_eq!(*finished_version, expected_current_version.0);
                let updated = tx
                    .prepare(
                        "UPDATE t_state SET \
                        state=:state, next_version = :next_version, pending_expires_finished = :pending_expires_finished, \
                        join_set_id = NULL,join_set_closing=NULL, \
                        executor_id = NULL,run_id = NULL,temporary_event_count = NULL, \
                        max_retries = NULL,retry_exp_backoff_millis = NULL,parent_execution_id = NULL,parent_join_set_id = NULL, locked_at_version = NULL, \
                        result_kind = :result_kind \
                        WHERE execution_id = :execution_id AND next_version = :expected_current_version",
                    )
                    .map_err(convert_err)?
                    .execute(named_params! {
                        ":next_version": finished_version,
                        ":pending_expires_finished": finished_at,
                        ":state": STATE_FINISHED,

                        ":result_kind": result_kind.to_string(),

                        ":execution_id": execution_id.to_string(),
                        ":expected_current_version": expected_current_version.0,

                    })
                    .map_err(convert_err)?;
                if updated != 1 {
                    error!(backtrace = %std::backtrace::Backtrace::capture(), "Failed updating state to {pending_state:?}");
                    return Err(DbError::Specific(SpecificError::ConsistencyError(
                        "updating state failed".into(),
                    )));
                }

                Ok(IndexUpdated::default())
            }
        }
    }
1205
1206    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %next_version, purge))]
1207    fn bump_state_next_version(
1208        tx: &Transaction,
1209        execution_id: &ExecutionId,
1210        next_version: &Version,
1211        delay_req: Option<DelayReq>,
1212    ) -> Result<(), DbError> {
1213        debug!("update_index_version");
1214        let execution_id_str = execution_id.to_string();
1215        let mut stmt = tx.prepare_cached(
1216            "UPDATE t_state SET next_version = :next_version WHERE execution_id = :execution_id AND next_version = :next_version - 1",
1217        ).map_err(convert_err)?;
1218        let updated = stmt
1219            .execute(named_params! {
1220                ":execution_id": execution_id_str,
1221                ":next_version": next_version.0,
1222            })
1223            .map_err(convert_err)?;
1224        if updated != 1 {
1225            return Err(DbError::Specific(SpecificError::NotFound));
1226        }
1227        if let Some(DelayReq {
1228            join_set_id,
1229            delay_id,
1230            expires_at,
1231        }) = delay_req
1232        {
1233            debug!("Inserting delay to `t_delay`");
1234            let mut stmt = tx
1235                .prepare_cached(
1236                    "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1237                VALUES \
1238                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1239                )
1240                .map_err(convert_err)?;
1241            stmt.execute(named_params! {
1242                ":execution_id": execution_id_str,
1243                ":join_set_id": join_set_id.to_string(),
1244                ":delay_id": delay_id.to_string(),
1245                ":expires_at": expires_at,
1246            })
1247            .map_err(convert_err)?;
1248        }
1249        Ok(())
1250    }
1251
1252    fn get_combined_state(
1253        tx: &Transaction,
1254        execution_id: &ExecutionId,
1255    ) -> Result<CombinedState, DbError> {
1256        let mut stmt = tx.prepare(
1257            "SELECT state, ffqn, next_version, pending_expires_finished, executor_id, run_id, join_set_id, join_set_closing, result_kind FROM t_state WHERE \
1258            execution_id = :execution_id",
1259        ).map_err(convert_err)?;
1260        stmt.query_row(
1261            named_params! {
1262                ":execution_id": execution_id.to_string(),
1263            },
1264            |row| {
1265                Ok(CombinedState::new(
1266                    &CombinedStateDTO {
1267                        state: row.get("state")?,
1268                        ffqn: row.get("ffqn")?,
1269                        pending_expires_finished: row
1270                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1271                        executor_id: row
1272                            .get::<_, Option<ExecutorIdW>>("executor_id")?
1273                            .map(|w| w.0),
1274                        run_id: row.get::<_, Option<RunIdW>>("run_id")?.map(|w| w.0),
1275                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1276                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1277                        result_kind: row
1278                            .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
1279                                "result_kind",
1280                            )?
1281                            .map(|wrapper| wrapper.0),
1282                    },
1283                    Version::new(row.get("next_version")?),
1284                ))
1285            },
1286        )
1287        .map_err(convert_err)?
1288    }
1289
    /// Lists executions from `t_state`, optionally filtered by FFQN and/or
    /// restricted to top-level executions, using cursor-based pagination.
    ///
    /// The SQL is assembled dynamically: WHERE clauses are built from the cursor,
    /// the `top_level_only` flag and the optional FFQN; all values are bound as
    /// named parameters. The result is always returned in descending
    /// `created_at` order (ascending query results are reversed at the end).
    fn list_executions(
        read_tx: &Transaction,
        ffqn: Option<FunctionFqn>,
        top_level_only: bool,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbError> {
        // Accumulated WHERE fragments, bound parameters and LIMIT/ordering info.
        struct StatementModifier {
            where_vec: Vec<String>,
            params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)>,
            limit: u32,
            limit_desc: bool,
        }
        // Translates one cursor pagination variant into WHERE/params; `column` is
        // an internal column name, never user input.
        fn paginate<T: rusqlite::ToSql + 'static>(
            pagination: Pagination<Option<T>>,
            column: &str,
            top_level_only: bool,
        ) -> StatementModifier {
            let mut where_vec: Vec<String> = vec![];
            let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
            let limit = pagination.length();
            let limit_desc = pagination.is_desc();
            // `rel` is the SQL comparison operator matching the cursor direction.
            let rel = pagination.rel();
            match pagination {
                Pagination::NewerThan {
                    cursor: Some(cursor),
                    ..
                }
                | Pagination::OlderThan {
                    cursor: Some(cursor),
                    ..
                } => {
                    where_vec.push(format!("{column} {rel} :cursor"));
                    params.push((":cursor", Box::new(cursor)));
                }
                _ => {}
            }
            if top_level_only {
                where_vec.push("is_top_level=true".to_string());
            }
            StatementModifier {
                where_vec,
                params,
                limit,
                limit_desc,
            }
        }
        let mut statement_mod = match pagination {
            ExecutionListPagination::CreatedBy(pagination) => {
                paginate(pagination, "created_at", top_level_only)
            }
            ExecutionListPagination::ExecutionId(pagination) => {
                paginate(pagination, "execution_id", top_level_only)
            }
        };

        if let Some(ffqn) = ffqn {
            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
            statement_mod
                .params
                .push((":ffqn", Box::new(ffqn.to_string())));
        }

        let where_str = if statement_mod.where_vec.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
        };
        let sql = format!(
            "SELECT created_at, scheduled_at, state, execution_id, ffqn, next_version, \
            pending_expires_finished, executor_id, run_id, join_set_id, join_set_closing, result_kind \
            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}",
            desc = if statement_mod.limit_desc { "DESC" } else { "" },
            limit = statement_mod.limit
        );
        let mut vec: Vec<_> = read_tx
            .prepare(&sql)
            .map_err(convert_err)?
            .query_map::<_, &[(&'static str, &dyn rusqlite::ToSql)], _>(
                // Re-borrow the boxed params as the slice form rusqlite expects.
                statement_mod
                    .params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                |row| {
                    let execution_id_res = row
                        .get::<_, String>("execution_id")?
                        .parse::<ExecutionId>()
                        .map_err(parsing_err);
                    let created_at = row.get("created_at")?;
                    let scheduled_at = row.get("scheduled_at")?;
                    let combined_state_res = CombinedState::new(
                        &CombinedStateDTO {
                            state: row.get("state")?,
                            ffqn: row.get("ffqn")?,
                            pending_expires_finished: row
                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
                            executor_id: row
                                .get::<_, Option<ExecutorIdW>>("executor_id")?
                                .map(|w| w.0),
                            run_id: row.get::<_, Option<RunIdW>>("run_id")?.map(|w| w.0),
                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
                            result_kind: row
                                .get::<_, Option<FromStrWrapper<PendingStateFinishedResultKind>>>(
                                    "result_kind",
                                )?
                                .map(|wrapper| wrapper.0),
                        },
                        Version::new(row.get("next_version")?),
                    );
                    // Keep rusqlite errors and domain parsing errors separate;
                    // both layers are unwrapped by the two collects below.
                    Ok(combined_state_res.and_then(|combined_state| {
                        execution_id_res.map(|execution_id| ExecutionWithState {
                            execution_id,
                            ffqn: combined_state.ffqn,
                            pending_state: combined_state.pending_state,
                            created_at,
                            scheduled_at,
                        })
                    }))
                },
            )
            .map_err(convert_err)?
            .collect::<Result<Vec<_>, _>>()
            .map_err(convert_err)?
            .into_iter()
            .collect::<Result<_, _>>()?;

        if !statement_mod.limit_desc {
            // the list must be sorted in descending order
            vec.reverse();
        }
        Ok(vec)
    }
1424
1425    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1426    #[expect(clippy::too_many_arguments)]
1427    fn lock_inner(
1428        tx: &Transaction,
1429        created_at: DateTime<Utc>,
1430        component_id: ComponentId,
1431        execution_id: &ExecutionId,
1432        run_id: RunId,
1433        appending_version: &Version,
1434        executor_id: ExecutorId,
1435        lock_expires_at: DateTime<Utc>,
1436    ) -> Result<LockedExecution, DbError> {
1437        debug!("lock_inner");
1438        let CombinedState {
1439            ffqn,
1440            pending_state,
1441            next_version: expected_version,
1442        } = Self::get_combined_state(tx, execution_id)?;
1443        pending_state
1444            .can_append_lock(created_at, executor_id, run_id, lock_expires_at)
1445            .map_err(DbError::Specific)?;
1446        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1447
1448        // Append to `execution_log` table.
1449        let event = ExecutionEventInner::Locked {
1450            component_id,
1451            executor_id,
1452            lock_expires_at,
1453            run_id,
1454        };
1455        let event_ser = serde_json::to_string(&event)
1456            .map_err(|err| {
1457                error!("Cannot serialize {event:?} - {err:?}");
1458                rusqlite::Error::ToSqlConversionFailure(err.into())
1459            })
1460            .map_err(convert_err)?;
1461        let mut stmt = tx
1462            .prepare_cached(
1463                "INSERT INTO t_execution_log \
1464            (execution_id, created_at, json_value, version, variant) \
1465            VALUES \
1466            (:execution_id, :created_at, :json_value, :version, :variant)",
1467            )
1468            .map_err(convert_err)?;
1469        stmt.execute(named_params! {
1470            ":execution_id": execution_id.to_string(),
1471            ":created_at": created_at,
1472            ":json_value": event_ser,
1473            ":version": appending_version.0,
1474            ":variant": event.variant(),
1475        })
1476        .map_err(convert_err)?;
1477        // Update `t_state`
1478        let pending_state = PendingState::Locked {
1479            executor_id,
1480            run_id,
1481            lock_expires_at,
1482        };
1483
1484        let next_version = Version::new(appending_version.0 + 1);
1485        let temporary_event_count = Self::update_state(
1486            tx,
1487            execution_id,
1488            &pending_state,
1489            &Version(next_version.0 - 1),
1490            &next_version,
1491            Some(ffqn),
1492        )?
1493        .temporary_event_count
1494        .expect("temporary_event_count must be set");
1495        // Fetch event_history and `Created` event to construct the response.
1496        let mut events = tx
1497            .prepare(
1498                "SELECT json_value FROM t_execution_log WHERE \
1499                execution_id = :execution_id AND (variant = :v1 OR variant = :v2) \
1500                ORDER BY version",
1501            )
1502            .map_err(convert_err)?
1503            .query_map(
1504                named_params! {
1505                    ":execution_id": execution_id.to_string(),
1506                    ":v1": DUMMY_CREATED.variant(),
1507                    ":v2": DUMMY_HISTORY_EVENT.variant(),
1508                },
1509                |row| {
1510                    let event = row
1511                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1512                        .map(|wrapper| wrapper.0)
1513                        .map_err(|serde| {
1514                            error!("Cannot deserialize {row:?} - {serde:?}");
1515                            parsing_err(serde)
1516                        });
1517                    Ok(event)
1518                },
1519            )
1520            .map_err(convert_err)?
1521            .collect::<Result<Vec<_>, _>>()
1522            .map_err(convert_err)?
1523            .into_iter()
1524            .collect::<Result<VecDeque<_>, _>>()?;
1525        let Some(ExecutionEventInner::Created {
1526            ffqn,
1527            params,
1528            scheduled_at,
1529            retry_exp_backoff,
1530            max_retries,
1531            parent,
1532            metadata,
1533            ..
1534        }) = events.pop_front()
1535        else {
1536            error!("Execution log must contain at least `Created` event");
1537            return Err(DbError::Specific(SpecificError::ConsistencyError(
1538                StrVariant::Static("execution log must contain `Created` event"),
1539            )));
1540        };
1541        let responses = Self::list_responses(tx, execution_id, None)?
1542            .into_iter()
1543            .map(|resp| resp.event)
1544            .collect();
1545        trace!("Responses: {responses:?}");
1546        let event_history = events
1547            .into_iter()
1548            .map(|event| {
1549                if let ExecutionEventInner::HistoryEvent { event } = event {
1550                    Ok(event)
1551                } else {
1552                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1553                    Err(DbError::Specific(SpecificError::ConsistencyError(
1554                        StrVariant::Static(
1555                            "Rows can only contain `Created` and `HistoryEvent` event kinds",
1556                        ),
1557                    )))
1558                }
1559            })
1560            .collect::<Result<Vec<_>, _>>()?;
1561        Ok(LockedExecution {
1562            execution_id: execution_id.clone(),
1563            metadata,
1564            run_id,
1565            version: next_version,
1566            ffqn,
1567            params,
1568            event_history,
1569            responses,
1570            scheduled_at,
1571            retry_exp_backoff,
1572            max_retries,
1573            parent,
1574            temporary_event_count,
1575        })
1576    }
1577
1578    fn count_join_next(
1579        tx: &Transaction,
1580        execution_id: &ExecutionId,
1581        join_set_id: &JoinSetId,
1582    ) -> Result<u64, DbError> {
1583        let mut stmt = tx.prepare(
1584            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1585            AND history_event_type = :join_next",
1586        ).map_err(convert_err)?;
1587        stmt.query_row(
1588            named_params! {
1589                ":execution_id": execution_id.to_string(),
1590                ":join_set_id": join_set_id.to_string(),
1591                ":join_next": "JoinNext",
1592            },
1593            |row| row.get("count"),
1594        )
1595        .map_err(convert_err)
1596    }
1597
1598    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1599    fn append(
1600        tx: &Transaction,
1601        execution_id: &ExecutionId,
1602        req: &AppendRequest,
1603        appending_version: &Version,
1604    ) -> Result<(AppendResponse, Option<PendingAt>), DbError> {
1605        assert!(!matches!(req.event, ExecutionEventInner::Created { .. }));
1606        if let ExecutionEventInner::Locked {
1607            component_id,
1608            executor_id,
1609            run_id,
1610            lock_expires_at,
1611        } = &req.event
1612        {
1613            let locked = Self::lock_inner(
1614                tx,
1615                req.created_at,
1616                component_id.clone(),
1617                execution_id,
1618                *run_id,
1619                appending_version,
1620                *executor_id,
1621                *lock_expires_at,
1622            )?;
1623            return Ok((locked.version, None));
1624        }
1625
1626        let combined_state = Self::get_combined_state(tx, execution_id)?;
1627        if combined_state.pending_state.is_finished() {
1628            debug!("Execution is already finished");
1629            return Err(DbError::Specific(SpecificError::ValidationFailed(
1630                StrVariant::Static("execution is already finished"),
1631            )));
1632        }
1633
1634        Self::check_expected_next_and_appending_version(
1635            &combined_state.next_version,
1636            appending_version,
1637        )?;
1638        let event_ser = serde_json::to_string(&req.event)
1639            .map_err(|err| {
1640                error!("Cannot serialize {:?} - {err:?}", req.event);
1641                rusqlite::Error::ToSqlConversionFailure(err.into())
1642            })
1643            .map_err(convert_err)?;
1644
1645        let mut stmt = tx.prepare(
1646                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1647                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)").map_err(convert_err)?;
1648        stmt.execute(named_params! {
1649            ":execution_id": execution_id.to_string(),
1650            ":created_at": req.created_at,
1651            ":json_value": event_ser,
1652            ":version": appending_version.0,
1653            ":variant": req.event.variant(),
1654            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1655        })
1656        .map_err(convert_err)?;
1657        // Calculate current pending state
1658        let mut next_version = Version(appending_version.0 + 1);
1659        let index_action = match &req.event {
1660            ExecutionEventInner::Created { .. } => {
1661                unreachable!("handled in the caller")
1662            }
1663
1664            ExecutionEventInner::Locked { .. } => {
1665                unreachable!("handled above")
1666            }
1667            ExecutionEventInner::TemporarilyFailed {
1668                backoff_expires_at, ..
1669            }
1670            | ExecutionEventInner::TemporarilyTimedOut {
1671                backoff_expires_at, ..
1672            }
1673            | ExecutionEventInner::Unlocked {
1674                backoff_expires_at, ..
1675            } => IndexAction::PendingStateChanged(PendingState::PendingAt {
1676                scheduled_at: *backoff_expires_at,
1677            }),
1678            ExecutionEventInner::Finished { result, .. } => {
1679                next_version = appending_version.clone();
1680                IndexAction::PendingStateChanged(PendingState::Finished {
1681                    finished: PendingStateFinished {
1682                        version: appending_version.0,
1683                        finished_at: req.created_at,
1684                        result_kind: PendingStateFinishedResultKind::from(result),
1685                    },
1686                })
1687            }
1688            ExecutionEventInner::HistoryEvent {
1689                event:
1690                    HistoryEvent::JoinSetCreate { .. }
1691                    | HistoryEvent::JoinSetRequest {
1692                        request: JoinSetRequest::ChildExecutionRequest { .. },
1693                        ..
1694                    }
1695                    | HistoryEvent::Persist { .. }
1696                    | HistoryEvent::Schedule { .. }
1697                    | HistoryEvent::Stub { .. }
1698                    | HistoryEvent::JoinNextTooMany { .. },
1699            } => IndexAction::NoPendingStateChange(None),
1700
1701            ExecutionEventInner::HistoryEvent {
1702                event:
1703                    HistoryEvent::JoinSetRequest {
1704                        join_set_id,
1705                        request:
1706                            JoinSetRequest::DelayRequest {
1707                                delay_id,
1708                                expires_at,
1709                                ..
1710                            },
1711                    },
1712            } => IndexAction::NoPendingStateChange(Some(DelayReq {
1713                join_set_id: join_set_id.clone(),
1714                delay_id: delay_id.clone(),
1715                expires_at: *expires_at,
1716            })),
1717
1718            ExecutionEventInner::HistoryEvent {
1719                event:
1720                    HistoryEvent::JoinNext {
1721                        join_set_id,
1722                        run_expires_at,
1723                        closing,
1724                        requested_ffqn: _,
1725                    },
1726            } => {
1727                // Did the response arrive already?
1728                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
1729                let nth_response =
1730                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
1731                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
1732                assert!(join_next_count > 0);
1733                if let Some(JoinSetResponseEventOuter {
1734                    created_at: nth_created_at,
1735                    ..
1736                }) = nth_response
1737                {
1738                    // No need to block
1739                    let scheduled_at = max(*run_expires_at, nth_created_at);
1740                    IndexAction::PendingStateChanged(PendingState::PendingAt { scheduled_at })
1741                } else {
1742                    IndexAction::PendingStateChanged(PendingState::BlockedByJoinSet {
1743                        join_set_id: join_set_id.clone(),
1744                        lock_expires_at: *run_expires_at,
1745                        closing: *closing,
1746                    })
1747                }
1748            }
1749        };
1750
1751        let pending_at = match index_action {
1752            IndexAction::PendingStateChanged(pending_state) => {
1753                let expected_current_version = Version(
1754                    if let PendingState::Finished {
1755                        finished:
1756                            PendingStateFinished {
1757                                version: finished_version,
1758                                ..
1759                            },
1760                    } = &pending_state
1761                    {
1762                        assert_eq!(*finished_version, next_version.0); // `next_state` must not be bumped.
1763                        next_version.0
1764                    } else {
1765                        next_version.0 - 1
1766                    },
1767                );
1768
1769                Self::update_state(
1770                    tx,
1771                    execution_id,
1772                    &pending_state,
1773                    &expected_current_version,
1774                    &next_version,
1775                    None,
1776                )?
1777                .pending_at
1778            }
1779            IndexAction::NoPendingStateChange(delay_req) => {
1780                Self::bump_state_next_version(tx, execution_id, &next_version, delay_req)?;
1781                None
1782            }
1783        };
1784        Ok((next_version, pending_at))
1785    }
1786
1787    fn append_response(
1788        tx: &Transaction,
1789        execution_id: &ExecutionId,
1790        req: &JoinSetResponseEventOuter,
1791        response_subscribers: &ResponseSubscribers,
1792    ) -> Result<
1793        (
1794            Option<oneshot::Sender<JoinSetResponseEventOuter>>,
1795            Option<PendingAt>,
1796        ),
1797        DbError,
1798    > {
1799        let mut stmt = tx.prepare(
1800            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, child_execution_id, finished_version) \
1801                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :child_execution_id, :finished_version)",
1802        ).map_err(convert_err)?;
1803        let join_set_id = &req.event.join_set_id;
1804        let delay_id = match &req.event.event {
1805            JoinSetResponse::DelayFinished { delay_id } => Some(delay_id.to_string()),
1806            JoinSetResponse::ChildExecutionFinished { .. } => None,
1807        };
1808        let (child_execution_id, finished_version) = match &req.event.event {
1809            JoinSetResponse::ChildExecutionFinished {
1810                child_execution_id,
1811                finished_version,
1812                result: _,
1813            } => (
1814                Some(child_execution_id.to_string()),
1815                Some(finished_version.0),
1816            ),
1817            JoinSetResponse::DelayFinished { .. } => (None, None),
1818        };
1819
1820        stmt.execute(named_params! {
1821            ":execution_id": execution_id.to_string(),
1822            ":created_at": req.created_at,
1823            ":join_set_id": join_set_id.to_string(),
1824            ":delay_id": delay_id,
1825            ":child_execution_id": child_execution_id,
1826            ":finished_version": finished_version,
1827        })
1828        .map_err(convert_err)?;
1829
1830        // if the execution is going to be unblocked by this response...
1831        let previous_pending_state = Self::get_combined_state(tx, execution_id)?.pending_state;
1832        debug!("previous_pending_state: {previous_pending_state:?}");
1833        let pendnig_at = match previous_pending_state {
1834            PendingState::BlockedByJoinSet {
1835                join_set_id: found_join_set_id,
1836                lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
1837                closing: _,
1838            } if *join_set_id == found_join_set_id => {
1839                // PendingAt should be set to current time if called from expired_timers_watcher,
1840                // or to a future time if the execution is hot.
1841                let scheduled_at = max(lock_expires_at, req.created_at);
1842                // TODO: Add diff test
1843                // update the pending state.
1844                let pending_state = PendingState::PendingAt { scheduled_at };
1845                let next_version = Self::get_next_version(tx, execution_id)?;
1846                Self::update_state(
1847                    tx,
1848                    execution_id,
1849                    &pending_state,
1850                    &next_version, // not changing the version
1851                    &next_version,
1852                    None,
1853                )?
1854                .pending_at
1855            }
1856            _ => None,
1857        };
1858        if let JoinSetResponseEvent {
1859            join_set_id,
1860            event: JoinSetResponse::DelayFinished { delay_id },
1861        } = &req.event
1862        {
1863            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
1864            let mut stmt =
1865                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id").map_err(convert_err)?;
1866            stmt.execute(named_params! {
1867                ":execution_id": execution_id.to_string(),
1868                ":join_set_id": join_set_id.to_string(),
1869                ":delay_id": delay_id.to_string(),
1870            })
1871            .map_err(convert_err)?;
1872        }
1873        Ok((
1874            response_subscribers.lock().unwrap().remove(execution_id),
1875            pendnig_at,
1876        ))
1877    }
1878
1879    fn append_backtrace(tx: &Transaction, backtrace_info: &BacktraceInfo) -> Result<(), DbError> {
1880        let mut stmt = tx
1881            .prepare(
1882                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
1883                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
1884            )
1885            .map_err(convert_err)?;
1886        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace)
1887            .map_err(|err| {
1888                error!(
1889                    "Cannot serialize backtrace {:?} - {err:?}",
1890                    backtrace_info.wasm_backtrace
1891                );
1892                rusqlite::Error::ToSqlConversionFailure(err.into())
1893            })
1894            .map_err(convert_err)?;
1895        stmt.execute(named_params! {
1896            ":execution_id": backtrace_info.execution_id.to_string(),
1897            ":component_id": backtrace_info.component_id.to_string(),
1898            ":version_min_including": backtrace_info.version_min_including.0,
1899            ":version_max_excluding": backtrace_info.version_max_excluding.0,
1900            ":wasm_backtrace": backtrace,
1901        })
1902        .map_err(convert_err)?;
1903        Ok(())
1904    }
1905
1906    #[cfg(feature = "test")]
1907    fn get(
1908        tx: &Transaction,
1909        execution_id: &ExecutionId,
1910    ) -> Result<concepts::storage::ExecutionLog, DbError> {
1911        let mut stmt = tx
1912            .prepare(
1913                "SELECT created_at, json_value FROM t_execution_log WHERE \
1914                        execution_id = :execution_id ORDER BY version",
1915            )
1916            .map_err(convert_err)?;
1917        let events = stmt
1918            .query_map(
1919                named_params! {
1920                    ":execution_id": execution_id.to_string(),
1921                },
1922                |row| {
1923                    let created_at = row.get("created_at")?;
1924                    let event = row
1925                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
1926                        .map(|event| ExecutionEvent {
1927                            created_at,
1928                            event: event.0,
1929                            backtrace_id: None,
1930                        })
1931                        .map_err(|serde| {
1932                            error!("Cannot deserialize {row:?} - {serde:?}");
1933                            parsing_err(serde)
1934                        });
1935                    Ok(event)
1936                },
1937            )
1938            .map_err(convert_err)?
1939            .collect::<Result<Vec<_>, _>>()
1940            .map_err(convert_err)?
1941            .into_iter()
1942            .collect::<Result<Vec<_>, _>>()?;
1943        if events.is_empty() {
1944            return Err(DbError::Specific(SpecificError::NotFound));
1945        }
1946        let combined_state = Self::get_combined_state(tx, execution_id)?;
1947        let responses = Self::list_responses(tx, execution_id, None)?
1948            .into_iter()
1949            .map(|resp| resp.event)
1950            .collect();
1951        Ok(concepts::storage::ExecutionLog {
1952            execution_id: execution_id.clone(),
1953            events,
1954            responses,
1955            next_version: combined_state.next_version, // In case of finished, this will be the already last version
1956            pending_state: combined_state.pending_state,
1957        })
1958    }
1959
1960    fn list_execution_events(
1961        tx: &Transaction,
1962        execution_id: &ExecutionId,
1963        version_min: VersionType,
1964        version_max_excluding: VersionType,
1965        include_backtrace_id: bool,
1966    ) -> Result<Vec<ExecutionEvent>, DbError> {
1967        let select = if include_backtrace_id {
1968            "SELECT
1969                log.created_at,
1970                log.json_value,
1971                -- Select version_min_including from backtrace if a match is found, otherwise NULL
1972                bt.version_min_including AS backtrace_id
1973            FROM
1974                t_execution_log AS log
1975            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
1976                t_backtrace AS bt ON log.execution_id = bt.execution_id
1977                                -- Check if the log's version falls within the backtrace's range
1978                                AND log.version >= bt.version_min_including
1979                                AND log.version < bt.version_max_excluding
1980            WHERE
1981                log.execution_id = :execution_id
1982                AND log.version >= :version_min
1983                AND log.version < :version_max_excluding
1984            ORDER BY
1985                log.version;"
1986        } else {
1987            "SELECT
1988                created_at, json_value, NULL as backtrace_id
1989            FROM t_execution_log WHERE
1990                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
1991            ORDER BY version"
1992        };
1993        tx.prepare(select)
1994            .map_err(convert_err)?
1995            .query_map(
1996                named_params! {
1997                    ":execution_id": execution_id.to_string(),
1998                    ":version_min": version_min,
1999                    ":version_max_excluding": version_max_excluding
2000                },
2001                |row| {
2002                    let created_at = row.get("created_at")?;
2003                    let backtrace_id = row
2004                        .get::<_, Option<VersionType>>("backtrace_id")?
2005                        .map(Version::new);
2006                    let event = row
2007                        .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2008                        .map(|event| ExecutionEvent {
2009                            created_at,
2010                            event: event.0,
2011                            backtrace_id,
2012                        })
2013                        .map_err(|serde| {
2014                            error!("Cannot deserialize {row:?} - {serde:?}");
2015                            parsing_err(serde)
2016                        });
2017                    Ok(event)
2018                },
2019            )
2020            .map_err(convert_err)?
2021            .collect::<Result<Vec<_>, _>>()
2022            .map_err(convert_err)?
2023            .into_iter()
2024            .collect::<Result<Vec<_>, _>>()
2025    }
2026
2027    fn get_execution_event(
2028        tx: &Transaction,
2029        execution_id: &ExecutionId,
2030        version: VersionType,
2031    ) -> Result<ExecutionEvent, DbError> {
2032        let mut stmt = tx
2033            .prepare(
2034                "SELECT created_at, json_value FROM t_execution_log WHERE \
2035                        execution_id = :execution_id AND version = :version",
2036            )
2037            .map_err(convert_err)?;
2038        stmt.query_row(
2039            named_params! {
2040                ":execution_id": execution_id.to_string(),
2041                ":version": version,
2042            },
2043            |row| {
2044                let created_at = row.get("created_at")?;
2045                let event = row
2046                    .get::<_, JsonWrapper<ExecutionEventInner>>("json_value")
2047                    .map(|event| ExecutionEvent {
2048                        created_at,
2049                        event: event.0,
2050                        backtrace_id: None,
2051                    })
2052                    .map_err(|serde| {
2053                        error!("Cannot deserialize {row:?} - {serde:?}");
2054                        parsing_err(serde)
2055                    });
2056                Ok(event)
2057            },
2058        )
2059        .map_err(convert_err)?
2060    }
2061
    /// Lists the join-set responses of an execution, optionally paginated by
    /// the `r.id` cursor (ascending or descending depending on `pagination`).
    ///
    /// Child-execution responses are joined with their `Finished` row in
    /// `t_execution_log` via `finished_version`; delay responses (which have a
    /// NULL `child_execution_id`) are kept by the OR branch of the filter.
    fn list_responses(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pagination: Option<Pagination<u32>>,
    ) -> Result<Vec<ResponseWithCursor>, DbError> {
        // TODO: Add test
        // Parameters are collected alongside the SQL string because the cursor
        // clause is appended conditionally.
        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
        let mut sql = "SELECT r.id, r.created_at, r.join_set_id, \
            r.delay_id, \
            r.child_execution_id, r.finished_version, l.json_value \
            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
            WHERE \
            r.execution_id = :execution_id AND \
            ( \
            r.finished_version = l.version \
            OR r.child_execution_id IS NULL \
            )"
        .to_string();
        // Apply the cursor comparison and remember the page size when paginating.
        let limit = match &pagination {
            Some(
                pagination @ (Pagination::NewerThan { cursor, .. }
                | Pagination::OlderThan { cursor, .. }),
            ) => {
                params.push((":cursor", Box::new(cursor)));
                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
                Some(pagination.length())
            }
            None => None,
        };
        sql.push_str(" ORDER BY id");
        if pagination.as_ref().is_some_and(Pagination::is_desc) {
            sql.push_str(" DESC");
        }
        if let Some(limit) = limit {
            write!(sql, " LIMIT {limit}").unwrap();
        }
        params.push((":execution_id", Box::new(execution_id.to_string())));
        tx.prepare(&sql)
            .map_err(convert_err)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                // Re-borrow the boxed parameters as the `&dyn ToSql` slice rusqlite expects.
                params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                Self::parse_response_with_cursor,
            )
            .map_err(convert_err)?
            .collect::<Result<Result<Vec<_>, DbError>, rusqlite::Error>>()
            .map_err(convert_err)?
    }
2113
    /// Parses one row of the `t_join_set_response` / `t_execution_log` join
    /// into a `ResponseWithCursor`.
    ///
    /// The outer `Result` carries column-access errors (`rusqlite::Error`) so
    /// they propagate through `query_map`/`query_row`; the inner `Result`
    /// carries consistency violations as `DbError`.
    fn parse_response_with_cursor(
        row: &rusqlite::Row<'_>,
    ) -> Result<Result<ResponseWithCursor, DbError>, rusqlite::Error> {
        let id = row.get("id")?;
        let created_at: DateTime<Utc> = row.get("created_at")?;
        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
        // Exactly one shape is valid per row:
        // - delay response: only `delay_id` set;
        // - child response: `child_execution_id`, `finished_version` and the
        //   joined `Finished` event's `json_value` all set.
        let inner_res = match (
            row.get::<_, Option<DelayId>>("delay_id")?,
            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
            row.get::<_, Option<VersionType>>("finished_version")?,
            row.get::<_, Option<JsonWrapper<ExecutionEventInner>>>("json_value")?,
        ) {
            (Some(delay_id), None, None, None) => Ok(JoinSetResponse::DelayFinished {
                delay_id,
            }),
            (None, Some(child_execution_id), Some(finished_version), Some(result)) => {
                match result.0 {
                    ExecutionEventInner::Finished { result, .. } => {
                        Ok(JoinSetResponse::ChildExecutionFinished {
                            child_execution_id,
                            finished_version: Version(finished_version),
                            result,
                        })
                    }
                    // The log row joined at `finished_version` must be a `Finished` event.
                    _ => Err(DbError::Specific(SpecificError::ConsistencyError(
                        StrVariant::from(format!("invalid row t_join_set_response.{id} - Finished event not found at finished_version")),
                    ))),
                }
            }
            // Any other NULL combination indicates a corrupted row.
            (delay, child, finished, result) => {
                error!("Invalid row in t_join_set_response {id} - {:?} {child:?} {finished:?} {:?}", delay, result.map(|it| it.0));
                Err(DbError::Specific(SpecificError::ConsistencyError(
                StrVariant::Static("invalid row in t_join_set_response"),
            )))},
        }
        .map(|event| ResponseWithCursor {
            cursor: id,
            event: JoinSetResponseEventOuter {
                event: JoinSetResponseEvent { join_set_id, event },
                created_at,
            },
        });
        Ok(inner_res)
    }
2158
    /// Returns the `skip_rows`-th response (0-based, ordered by insertion id)
    /// of the given join set, or `None` if fewer responses exist.
    ///
    /// Uses `LIMIT 1 OFFSET :offset`; child-execution responses are joined
    /// with their `Finished` log row, delay responses are kept via the
    /// `child_execution_id IS NULL` branch.
    fn nth_response(
        tx: &Transaction,
        execution_id: &ExecutionId,
        join_set_id: &JoinSetId,
        skip_rows: u64,
    ) -> Result<Option<JoinSetResponseEventOuter>, DbError> {
        // TODO: Add test
        Ok(tx
            .prepare(
                "SELECT r.id, r.created_at, r.join_set_id, \
                    r.delay_id, \
                    r.child_execution_id, r.finished_version, l.json_value \
                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
                    WHERE \
                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
                    (
                    r.finished_version = l.version \
                    OR \
                    r.child_execution_id IS NULL \
                    ) \
                    ORDER BY id \
                    LIMIT 1 OFFSET :offset",
            )
            .map_err(convert_err)?
            .query_row(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":join_set_id": join_set_id.to_string(),
                    ":offset": skip_rows,
                },
                Self::parse_response_with_cursor,
            )
            // `query_row` with no matching row becomes `Ok(None)`.
            .optional()
            .map_err(convert_err)?
            // Surface the inner parse/consistency error, then drop the cursor.
            .transpose()?
            .map(|resp| resp.event))
    }
2196
    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
    /// Returns every join-set response of `execution_id` after skipping the
    /// first `skip_rows` rows, ordered by insertion id.
    ///
    /// Child-execution responses are joined with `t_execution_log` on the
    /// child's finished version to pull in the result JSON; delay responses
    /// (`child_execution_id IS NULL`) survive the LEFT OUTER JOIN unmatched.
    #[instrument(level = Level::TRACE, skip_all)]
    fn get_responses_with_offset(
        tx: &Transaction,
        execution_id: &ExecutionId,
        skip_rows: usize,
    ) -> Result<Vec<JoinSetResponseEventOuter>, DbError> {
        // TODO: Add test
        tx.prepare(
            // `LIMIT -1` means "no limit" in SQLite; OFFSET requires a LIMIT clause.
            "SELECT r.id, r.created_at, r.join_set_id, \
            r.delay_id, \
            r.child_execution_id, r.finished_version, l.json_value \
            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
            WHERE \
            r.execution_id = :execution_id AND \
            ( \
            r.finished_version = l.version \
            OR r.child_execution_id IS NULL \
            ) \
            ORDER BY id \
            LIMIT -1 OFFSET :offset",
        )
        .map_err(convert_err)?
        .query_map(
            named_params! {
                ":execution_id": execution_id.to_string(),
                ":offset": skip_rows,
            },
            Self::parse_response_with_cursor,
        )
        .map_err(convert_err)?
        // The row mapper yields `rusqlite::Result<Result<_, DbError>>`: collect
        // the outer SQLite layer first, then surface the inner parsing error.
        .collect::<Result<Result<Vec<_>, DbError>, _>>()
        .map_err(convert_err)?
        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
    }
2232
2233    fn get_pending(
2234        conn: &Connection,
2235        batch_size: usize,
2236        pending_at_or_sooner: DateTime<Utc>,
2237        ffqns: &[FunctionFqn],
2238    ) -> Result<Vec<(ExecutionId, Version)>, DbError> {
2239        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2240
2241        for ffqn in ffqns {
2242            // Select executions in PendingAt.
2243            let mut stmt = conn
2244                .prepare(&format!(
2245                    "SELECT execution_id, next_version FROM t_state WHERE \
2246                            state = \"{STATE_PENDING_AT}\" AND \
2247                            pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn \
2248                            ORDER BY pending_expires_finished LIMIT :batch_size"
2249                ))
2250                .map_err(convert_err)?;
2251            let execs_and_versions = stmt
2252                .query_map(
2253                    named_params! {
2254                        ":pending_expires_finished": pending_at_or_sooner,
2255                        ":ffqn": ffqn.to_string(),
2256                        ":batch_size": batch_size - execution_ids_versions.len(),
2257                    },
2258                    |row| {
2259                        let execution_id =
2260                            row.get::<_, String>("execution_id")?.parse::<ExecutionId>();
2261                        let version = Version::new(row.get::<_, VersionType>("next_version")?);
2262                        Ok(execution_id.map(|e| (e, version)))
2263                    },
2264                )
2265                .map_err(convert_err)?
2266                .collect::<Result<Vec<_>, _>>()
2267                .map_err(convert_err)?
2268                .into_iter()
2269                .collect::<Result<Vec<_>, _>>()
2270                .map_err(parsing_err)?;
2271            execution_ids_versions.extend(execs_and_versions);
2272            if execution_ids_versions.len() == batch_size {
2273                // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2274                break;
2275            }
2276        }
2277        Ok(execution_ids_versions)
2278    }
2279
2280    #[instrument(level = Level::TRACE, skip_all)]
2281    fn notify_pending(&self, pending_at: PendingAt, current_time: DateTime<Utc>) {
2282        Self::notify_pending_locked(
2283            pending_at,
2284            current_time,
2285            &self.0.ffqn_to_pending_subscription.lock().unwrap(),
2286        );
2287    }
2288
2289    #[instrument(level = Level::TRACE, skip_all)]
2290    fn notify_pending_all(
2291        &self,
2292        pending_ats: impl Iterator<Item = PendingAt>,
2293        current_time: DateTime<Utc>,
2294    ) {
2295        let ffqn_to_pending_subscription = self.0.ffqn_to_pending_subscription.lock().unwrap();
2296        for pending_at in pending_ats {
2297            Self::notify_pending_locked(pending_at, current_time, &ffqn_to_pending_subscription);
2298        }
2299    }
2300
2301    fn notify_pending_locked(
2302        pending_at: PendingAt,
2303        current_time: DateTime<Utc>,
2304        ffqn_to_pending_subscription: &std::sync::MutexGuard<
2305            hashbrown::HashMap<FunctionFqn, mpsc::Sender<()>>,
2306        >,
2307    ) {
2308        match pending_at {
2309            PendingAt { scheduled_at, ffqn } if scheduled_at <= current_time => {
2310                if let Some(subscription) = ffqn_to_pending_subscription.get(&ffqn) {
2311                    debug!("Notifying pending subscriber");
2312                    let _ = subscription.try_send(());
2313                }
2314            }
2315            _ => {}
2316        }
2317    }
2318
2319    #[instrument(level = Level::TRACE, skip_all)]
2320    fn append_batch_create_new_execution_inner(
2321        tx: &mut rusqlite::Transaction,
2322        batch: Vec<AppendRequest>,
2323        execution_id: &ExecutionId,
2324        mut version: Version,
2325        child_req: Vec<CreateRequest>,
2326    ) -> Result<(Version, Vec<PendingAt>), DbError> {
2327        let mut pending_at = None;
2328        for append_request in batch {
2329            (version, pending_at) = Self::append(tx, execution_id, &append_request, &version)?;
2330        }
2331        let mut pending_ats = Vec::new();
2332        if let Some(pending_at) = pending_at {
2333            pending_ats.push(pending_at);
2334        }
2335        for child_req in child_req {
2336            let (_, pending_at) = Self::create_inner(tx, child_req)?;
2337            pending_ats.push(pending_at);
2338        }
2339        Ok((version, pending_ats))
2340    }
2341}
2342
/// Outcome of indexing an appended execution event into the derived tables.
enum IndexAction {
    /// The event changed the execution's pending state to the carried value.
    PendingStateChanged(PendingState),
    /// The pending state is unchanged; optionally carries a delay request
    /// (join set, delay id, expiry) to record.
    NoPendingStateChange(Option<DelayReq>),
}
2347
2348#[async_trait]
2349impl<S: Sleep> DbConnection for SqlitePool<S> {
2350    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
2351    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbError> {
2352        debug!("create");
2353        trace!(?req, "create");
2354        let created_at = req.created_at;
2355        let (version, pending_at) = self
2356            .transaction_write(move |tx| Self::create_inner(tx, req), "create")
2357            .await?;
2358        self.notify_pending(pending_at, created_at);
2359        Ok(version)
2360    }
2361
2362    #[instrument(level = Level::TRACE, skip(self))]
2363    async fn lock_pending(
2364        &self,
2365        batch_size: usize,
2366        pending_at_or_sooner: DateTime<Utc>,
2367        ffqns: Arc<[FunctionFqn]>,
2368        created_at: DateTime<Utc>,
2369        component_id: ComponentId,
2370        executor_id: ExecutorId,
2371        lock_expires_at: DateTime<Utc>,
2372        run_id: RunId,
2373    ) -> Result<LockPendingResponse, DbError> {
2374        let execution_ids_versions = self
2375            .conn_low_prio(
2376                move |conn| Self::get_pending(conn, batch_size, pending_at_or_sooner, &ffqns),
2377                "get_pending",
2378            )
2379            .await?;
2380        if execution_ids_versions.is_empty() {
2381            Ok(vec![])
2382        } else {
2383            debug!("Locking {execution_ids_versions:?}");
2384            self.transaction_write(
2385                move |tx| {
2386                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2387                    // Append lock
2388                    for (execution_id, version) in execution_ids_versions {
2389                        match Self::lock_inner(
2390                            tx,
2391                            created_at,
2392                            component_id.clone(),
2393                            &execution_id,
2394                            run_id,
2395                            &version,
2396                            executor_id,
2397                            lock_expires_at,
2398                        ) {
2399                            Ok(locked) => locked_execs.push(locked),
2400                            Err(err) => {
2401                                warn!("Locking row {execution_id} failed - {err:?}");
2402                            }
2403                        }
2404                    }
2405                    Ok(locked_execs)
2406                },
2407                "lock_pending",
2408            )
2409            .await
2410        }
2411    }
2412
2413    /// Specialized `append` which returns the event history.
2414    #[instrument(level = Level::DEBUG, skip(self))]
2415    async fn lock(
2416        &self,
2417        created_at: DateTime<Utc>,
2418        component_id: ComponentId,
2419        execution_id: &ExecutionId,
2420        run_id: RunId,
2421        version: Version,
2422        executor_id: ExecutorId,
2423        lock_expires_at: DateTime<Utc>,
2424    ) -> Result<LockResponse, DbError> {
2425        debug!("lock");
2426        let execution_id = execution_id.clone();
2427        self.transaction_write(
2428            move |tx| {
2429                let locked = Self::lock_inner(
2430                    tx,
2431                    created_at,
2432                    component_id,
2433                    &execution_id,
2434                    run_id,
2435                    &version,
2436                    executor_id,
2437                    lock_expires_at,
2438                )?;
2439                Ok((locked.event_history, locked.version))
2440            },
2441            "lock_inner",
2442        )
2443        .await
2444    }
2445
2446    #[instrument(level = Level::DEBUG, skip(self, req))]
2447    async fn append(
2448        &self,
2449        execution_id: ExecutionId,
2450        version: Version,
2451        req: AppendRequest,
2452    ) -> Result<AppendResponse, DbError> {
2453        debug!(%req, "append");
2454        trace!(?req, "append");
2455        // Disallow `Created` event
2456        let created_at = req.created_at;
2457        if let ExecutionEventInner::Created { .. } = req.event {
2458            panic!("Cannot append `Created` event - use `create` instead");
2459        }
2460        let (version, pending_at) = self
2461            .transaction_write(
2462                move |tx| Self::append(tx, &execution_id, &req, &version),
2463                "append",
2464            )
2465            .await?;
2466        if let Some(pending_at) = pending_at {
2467            self.notify_pending(pending_at, created_at);
2468        }
2469        Ok(version)
2470    }
2471
2472    #[instrument(level = Level::DEBUG, skip(self, batch))]
2473    async fn append_batch(
2474        &self,
2475        current_time: DateTime<Utc>,
2476        batch: Vec<AppendRequest>,
2477        execution_id: ExecutionId,
2478        version: Version,
2479    ) -> Result<AppendBatchResponse, DbError> {
2480        debug!("append_batch");
2481        trace!(?batch, "append_batch");
2482        assert!(!batch.is_empty(), "Empty batch request");
2483        if batch.iter().any(|append_request| {
2484            matches!(append_request.event, ExecutionEventInner::Created { .. })
2485        }) {
2486            panic!("Cannot append `Created` event - use `create` instead");
2487        }
2488        let (version, pending_at) = self
2489            .transaction_write(
2490                move |tx| {
2491                    let mut version = version;
2492                    let mut pending_at = None;
2493                    for append_request in batch {
2494                        (version, pending_at) =
2495                            Self::append(tx, &execution_id, &append_request, &version)?;
2496                    }
2497                    Ok((version, pending_at))
2498                },
2499                "append_batch",
2500            )
2501            .await?;
2502        if let Some(pending_at) = pending_at {
2503            self.notify_pending(pending_at, current_time);
2504        }
2505        Ok(version)
2506    }
2507
2508    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
2509    async fn append_batch_create_new_execution(
2510        &self,
2511        current_time: DateTime<Utc>,
2512        batch: Vec<AppendRequest>,
2513        execution_id: ExecutionId,
2514        version: Version,
2515        child_req: Vec<CreateRequest>,
2516    ) -> Result<AppendBatchResponse, DbError> {
2517        debug!("append_batch_create_new_execution");
2518        trace!(?batch, ?child_req, "append_batch_create_new_execution");
2519        assert!(!batch.is_empty(), "Empty batch request");
2520        if batch.iter().any(|append_request| {
2521            matches!(append_request.event, ExecutionEventInner::Created { .. })
2522        }) {
2523            panic!("Cannot append `Created` event - use `create` instead");
2524        }
2525
2526        let (version, pending_ats) = self
2527            .transaction_write(
2528                move |tx| {
2529                    Self::append_batch_create_new_execution_inner(
2530                        tx,
2531                        batch,
2532                        &execution_id,
2533                        version,
2534                        child_req,
2535                    )
2536                },
2537                "append_batch_create_new_execution_inner",
2538            )
2539            .await?;
2540        self.notify_pending_all(pending_ats.into_iter(), current_time);
2541        Ok(version)
2542    }
2543
    /// Appends `batch` to the (child) execution and, in the same transaction,
    /// records `parent_response_event` as a join-set response on the parent.
    ///
    /// After the transaction commits, notifies the parent's registered
    /// response subscriber (if any) and the pending-state subscribers of both
    /// executions.
    #[instrument(level = Level::DEBUG, skip(self, batch, parent_response_event))]
    async fn append_batch_respond_to_parent(
        &self,
        execution_id: ExecutionIdDerived,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        version: Version,
        parent_execution_id: ExecutionId,
        parent_response_event: JoinSetResponseEventOuter,
    ) -> Result<AppendBatchResponse, DbError> {
        debug!("append_batch_respond_to_parent");
        let execution_id = ExecutionId::Derived(execution_id);
        if execution_id == parent_execution_id {
            // Pending state would be wrong.
            // This is not a panic because it depends on DB state.
            return Err(DbError::Specific(SpecificError::ValidationFailed(
                StrVariant::Static(
                    "Parameters `execution_id` and `parent_execution_id` cannot be the same",
                ),
            )));
        }
        trace!(
            ?batch,
            ?parent_response_event,
            "append_batch_respond_to_parent"
        );
        assert!(!batch.is_empty(), "Empty batch request");
        if batch.iter().any(|append_request| {
            matches!(append_request.event, ExecutionEventInner::Created { .. })
        }) {
            panic!("Cannot append `Created` event - use `create` instead");
        }
        let response_subscribers = self.0.response_subscribers.clone();
        let (version, response_subscriber, pending_ats) = {
            let event = parent_response_event.clone();
            self.transaction_write(
                move |tx| {
                    // Append the child's events, threading the version through.
                    let mut version = version;
                    let mut pending_at_child = None;
                    for append_request in batch {
                        (version, pending_at_child) =
                            Self::append(tx, &execution_id, &append_request, &version)?;
                    }

                    // Record the response on the parent; returns the registered
                    // subscriber (if any) so it can be notified after commit.
                    let (response_subscriber, pending_at_parent) = Self::append_response(
                        tx,
                        &parent_execution_id,
                        &event,
                        &response_subscribers,
                    )?;
                    Ok((
                        version,
                        response_subscriber,
                        vec![pending_at_child, pending_at_parent],
                    ))
                },
                "append_batch_respond_to_parent",
            )
            .await?
        };
        if let Some(response_subscriber) = response_subscriber {
            // Notify only after the transaction committed.
            let notified = response_subscriber.send(parent_response_event);
            debug!("Notifying response subscriber: {notified:?}");
        }
        self.notify_pending_all(pending_ats.into_iter().flatten(), current_time);
        Ok(version)
    }
2611
2612    #[cfg(feature = "test")]
2613    #[instrument(level = Level::DEBUG, skip(self))]
2614    async fn get(
2615        &self,
2616        execution_id: &ExecutionId,
2617    ) -> Result<concepts::storage::ExecutionLog, DbError> {
2618        trace!("get");
2619        let execution_id = execution_id.clone();
2620        self.transaction_read(move |tx| Self::get(tx, &execution_id), "get")
2621            .await
2622    }
2623
2624    #[instrument(level = Level::DEBUG, skip(self))]
2625    async fn list_execution_events(
2626        &self,
2627        execution_id: &ExecutionId,
2628        since: &Version,
2629        max_length: VersionType,
2630        include_backtrace_id: bool,
2631    ) -> Result<Vec<ExecutionEvent>, DbError> {
2632        let execution_id = execution_id.clone();
2633        let since = since.0;
2634        self.transaction_read(
2635            move |tx| {
2636                Self::list_execution_events(
2637                    tx,
2638                    &execution_id,
2639                    since,
2640                    since + max_length,
2641                    include_backtrace_id,
2642                )
2643            },
2644            "get",
2645        )
2646        .await
2647    }
2648
    /// Returns the responses of `execution_id` starting at `start_idx`, or
    /// waits until the next response arrives (or `interrupt_after` completes).
    ///
    /// The subscriber is registered inside the write transaction, so a
    /// response appended concurrently cannot be missed.
    #[instrument(level = Level::DEBUG, skip(self, interrupt_after))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        start_idx: usize,
        interrupt_after: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) -> Result<Vec<JoinSetResponseEventOuter>, SubscribeError> {
        debug!("next_responses");
        let execution_id = execution_id.clone();
        let response_subscribers = self.0.response_subscribers.clone();
        let resp_or_receiver = self
            .transaction_write(
                move |tx| {
                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
                    if responses.is_empty() {
                        // cannot race as we have the transaction write lock
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id, sender);
                        Ok(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                "subscribe_to_next_responses",
            )
            .await
            .map_err(SubscribeError::DbError)?;
        match resp_or_receiver {
            // Responses were already in the database.
            itertools::Either::Left(resp) => Ok(resp),
            // Wait for the writer side to deliver the next response.
            itertools::Either::Right(receiver) => {
                tokio::select! {
                    resp = receiver => resp.map(|resp| vec![resp]).map_err(|_| SubscribeError::DbError(DbError::Connection(DbConnectionError::RecvError))),
                    () = interrupt_after => Err(SubscribeError::Interrupted),
                }
            }
        }
    }
2689
2690    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
2691    async fn append_response(
2692        &self,
2693        created_at: DateTime<Utc>,
2694        execution_id: ExecutionId,
2695        response_event: JoinSetResponseEvent,
2696    ) -> Result<(), DbError> {
2697        debug!("append_response");
2698        let response_subscribers = self.0.response_subscribers.clone();
2699        let event = JoinSetResponseEventOuter {
2700            created_at,
2701            event: response_event,
2702        };
2703        let (response_subscriber, pending_at) = {
2704            let event = event.clone();
2705            self.transaction_write(
2706                move |tx| Self::append_response(tx, &execution_id, &event, &response_subscribers),
2707                "append_response",
2708            )
2709            .await?
2710        };
2711        if let Some(response_subscriber) = response_subscriber {
2712            debug!("Notifying response subscriber");
2713            let _ = response_subscriber.send(event);
2714        }
2715        if let Some(pending_at) = pending_at {
2716            self.notify_pending(pending_at, created_at);
2717        }
2718        Ok(())
2719    }
2720
2721    #[instrument(level = Level::DEBUG, skip_all)]
2722    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbError> {
2723        debug!("append_backtrace");
2724        self.transaction_write(
2725            move |tx| Self::append_backtrace(tx, &append),
2726            "append_backtrace",
2727        )
2728        .await
2729    }
2730
2731    #[instrument(level = Level::DEBUG, skip_all)]
2732    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbError> {
2733        debug!("append_backtrace_batch");
2734        self.transaction_write(
2735            move |tx| {
2736                for append in batch {
2737                    Self::append_backtrace(tx, &append)?;
2738                }
2739                Ok(())
2740            },
2741            "append_backtrace",
2742        )
2743        .await
2744    }
2745
    /// Fetches one backtrace row of `execution_id` selected by `filter`:
    /// a specific version (row whose version range contains it), the first
    /// (lowest `version_min_including`), or the last (highest).
    ///
    /// NOTE(review): log/tx labels say "get_last_backtrace" although the
    /// method also serves `First`/`Specific` filters — likely a legacy name;
    /// confirm before renaming since dashboards may key on it.
    #[instrument(level = Level::DEBUG, skip_all)]
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbError> {
        debug!("get_last_backtrace");
        let execution_id = execution_id.clone();

        self.transaction_read(
            move |tx| {
                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
                                WHERE execution_id = :execution_id";
                // Params are boxed because the `Specific` variant adds a second,
                // differently-typed bind value.
                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
                let select = match filter {
                    BacktraceFilter::Specific(version) =>{
                        params.push((":version", Box::new(version.0)));
                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
                    },
                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
               };
                let mut stmt = tx
                    .prepare(
                         &select
                    )
                    .map_err(convert_err)?;

                // `query_row` returns `QueryReturnedNoRows` (mapped by
                // `convert_err`) when no backtrace matches the filter.
                stmt.query_row::<_, &[(&'static str, &dyn ToSql)], _>(
                        params
                            .iter()
                            .map(|(key, value)| (*key, value.as_ref()))
                            .collect::<Vec<_>>()
                            .as_ref(),
                    |row| {
                        Ok(BacktraceInfo {
                            execution_id: execution_id.clone(),
                            component_id: row.get::<_, FromStrWrapper<_> >("component_id")?.0,
                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
                        })
                    },
                )
                .map_err(convert_err)
            },
            "get_last_backtrace",
        ).await
    }
2795
2796    /// Get currently expired delays and locks.
2797    #[instrument(level = Level::TRACE, skip(self))]
2798    async fn get_expired_timers(&self, at: DateTime<Utc>) -> Result<Vec<ExpiredTimer>, DbError> {
2799        self.conn_low_prio(
2800            move |conn| {
2801                let mut expired_timers = conn.prepare(
2802                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
2803                ).map_err(convert_err)?.query_map(
2804                        named_params! {
2805                            ":at": at,
2806                        },
2807                        |row| {
2808                            let execution_id = row.get("execution_id")?;
2809                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2810                            let delay_id = row.get::<_, DelayId>("delay_id")?;
2811                            Ok(ExpiredTimer::Delay { execution_id, join_set_id, delay_id })
2812                        },
2813                    ).map_err(convert_err)?
2814                    .collect::<Result<Vec<_>, _>>().map_err(convert_err)?;
2815                    // Extend with expired locks
2816                    expired_timers.extend(conn.prepare(&format!(
2817                        "SELECT execution_id, next_version, temporary_event_count, max_retries, retry_exp_backoff_millis, parent_execution_id, parent_join_set_id, locked_at_version FROM t_state \
2818                        WHERE pending_expires_finished <= :at AND state = \"{STATE_LOCKED}\"")
2819                    ).map_err(convert_err)?
2820                    .query_map(
2821                            named_params! {
2822                                ":at": at,
2823                            },
2824                            |row| {
2825                                let execution_id = row.get("execution_id")?;
2826                                let version = Version::new(row.get::<_, VersionType>("next_version")?);
2827                                let locked_at_version = Version::new(row.get::<_, VersionType>("locked_at_version")?);
2828
2829                                let temporary_event_count = row.get::<_, u32>("temporary_event_count")?;
2830                                let max_retries = row.get::<_, u32>("max_retries")?;
2831                                let retry_exp_backoff = Duration::from_millis(row.get::<_, u64>("retry_exp_backoff_millis")?);
2832                                let parent_execution_id = row.get::<_, Option<ExecutionId>>("parent_execution_id")?;
2833                                let parent_join_set_id = row.get::<_, Option<JoinSetId>>("parent_join_set_id")?;
2834
2835                                Ok(ExpiredTimer::Lock { execution_id, locked_at_version, version, temporary_event_count, max_retries,
2836                                    retry_exp_backoff, parent: parent_execution_id.and_then(|pexe| parent_join_set_id.map(|pjs| (pexe, pjs)))})
2837                            },
2838                        ).map_err(convert_err)?
2839                        .collect::<Result<Vec<_>, _>>().map_err(convert_err)?
2840                    );
2841
2842                if !expired_timers.is_empty() {
2843                    debug!("get_expired_timers found {expired_timers:?}");
2844                }
2845                Ok(expired_timers)
2846            }, "get_expired_timers"
2847        )
2848        .await
2849    }
2850
    /// Waits until an execution matching one of `ffqns` may be pending at or
    /// before `pending_at_or_sooner`, or until `max_wait` elapses.
    ///
    /// Best-effort: may return spuriously; callers are expected to re-query.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn wait_for_pending(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        max_wait: Duration,
    ) {
        let sleep_fut = self.0.sleep.sleep(max_wait);
        let (sender, mut receiver) = mpsc::channel(1);
        // Register the subscription BEFORE querying the database, so a
        // notification arriving in between is buffered rather than lost.
        {
            let mut ffqn_to_pending_subscription =
                self.0.ffqn_to_pending_subscription.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                ffqn_to_pending_subscription.insert(ffqn.clone(), sender.clone());
            }
        }
        let execution_ids_versions = match self
            .conn_low_prio(
                {
                    let ffqns = ffqns.clone();
                    // batch_size 1: existence check only.
                    move |conn| Self::get_pending(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                },
                "subscribe_to_pending",
            )
            .await
        {
            Ok(ok) => ok,
            Err(err) => {
                // Degrade to a plain timeout on DB errors.
                trace!("Ignoring error and waiting in for timeout - {err:?}");
                sleep_fut.await;
                return;
            }
        };
        if !execution_ids_versions.is_empty() {
            trace!("Not waiting, database already contains new pending executions");
            return;
        }
        tokio::select! { // future's liveness: Dropping the loser immediately.
            _ = receiver.recv() => {
                trace!("Received a notification");
            }
            () = sleep_fut => {
            }
        }
    }
2896
    /// Blocks until `execution_id` reaches the `Finished` state and returns
    /// its result; with `timeout` set, fails with `ClientError::Timeout` once
    /// the timeout elapses.
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout: Option<Duration>,
    ) -> Result<SupportedFunctionReturnValue, ClientError> {
        let execution_result = {
            let fut = async move {
                // Poll the combined state until it reports `Finished`.
                loop {
                    let execution_id = execution_id.clone();
                    if let Some(execution_result) = self
                        .transaction_read(move |tx| {
                            let pending_state =
                                Self::get_combined_state(tx, &execution_id)?.pending_state;
                            if let PendingState::Finished { finished } = pending_state {
                                // t_state stores the version of the Finished event.
                                let event =
                                    Self::get_execution_event(tx, &execution_id, finished.version)?;
                                if let ExecutionEventInner::Finished { result, ..} = event.event {
                                    Ok(Some(result))
                                } else {
                                    // t_state and t_execution_log disagree.
                                    error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
                                    Err(DbError::Specific(SpecificError::ConsistencyError(
                                        StrVariant::Static(
                                            "cannot get finished event based on t_state version",
                                        ),
                                    )))
                                }
                            } else {
                                // Not finished yet; keep polling.
                                Ok(None)
                            }
                        }, "wait_for_finished_result")
                        .await?
                    {
                        return Ok(execution_result);
                    }
                    // FIXME: change to subscription based approach
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            };

            if let Some(timeout) = timeout {
                tokio::select! { // future's liveness: Dropping the loser immediately.
                    res = fut => res,
                    () = tokio::time::sleep(timeout) => Err(ClientError::Timeout)
                }
            } else {
                fut.await
            }
        }?;
        Ok(execution_result)
    }
2947
2948    async fn get_execution_event(
2949        &self,
2950        execution_id: &ExecutionId,
2951        version: &Version,
2952    ) -> Result<ExecutionEvent, DbError> {
2953        let version = version.0;
2954        let execution_id = execution_id.clone();
2955        self.transaction_read(
2956            move |tx| Self::get_execution_event(tx, &execution_id, version),
2957            "get_execution_event",
2958        )
2959        .await
2960    }
2961
2962    async fn get_pending_state(&self, execution_id: &ExecutionId) -> Result<PendingState, DbError> {
2963        let execution_id = execution_id.clone();
2964        Ok(self
2965            .transaction_read(
2966                move |tx| Self::get_combined_state(tx, &execution_id),
2967                "get_pending_state",
2968            )
2969            .await?
2970            .pending_state)
2971    }
2972
2973    async fn list_executions(
2974        &self,
2975        ffqn: Option<FunctionFqn>,
2976        top_level_only: bool,
2977        pagination: ExecutionListPagination,
2978    ) -> Result<Vec<ExecutionWithState>, DbError> {
2979        Ok(self
2980            .transaction_read(
2981                move |tx| Self::list_executions(tx, ffqn, top_level_only, pagination),
2982                "list_executions",
2983            )
2984            .await?)
2985    }
2986
2987    async fn list_responses(
2988        &self,
2989        execution_id: &ExecutionId,
2990        pagination: Pagination<u32>,
2991    ) -> Result<Vec<ResponseWithCursor>, DbError> {
2992        let execution_id = execution_id.clone();
2993        Ok(self
2994            .transaction_read(
2995                move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
2996                "list_executions",
2997            )
2998            .await?)
2999    }
3000}
3001
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use concepts::time::TokioSleep;
    use tempfile::NamedTempFile;

    /// Test helper that opens a sqlite pool with default config.
    ///
    /// When the `SQLITE_FILE` env var is set, the pool is backed by that path
    /// and no temp-file guard is returned. Otherwise a fresh `NamedTempFile`
    /// backs the database; its guard is returned so the file stays alive for
    /// as long as the caller holds it.
    pub async fn sqlite_pool() -> (SqlitePool<TokioSleep>, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default(), TokioSleep)
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default(), TokioSleep)
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}