obeli_sk_db_sqlite/
sqlite_dao.rs

1use crate::histograms::Histograms;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6    SupportedFunctionReturnValue,
7    component_id::InputContentDigest,
8    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbPool,
14        DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
15        ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HistoryEvent, JoinSetRequest,
16        JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter, LockPendingResponse,
17        Locked, LockedBy, LockedExecution, Pagination, PendingState, PendingStateFinished,
18        PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor, Version,
19        VersionType,
20    },
21};
22use conversions::{JsonWrapper, consistency_db_err, consistency_rusqlite};
23use hashbrown::HashMap;
24use rusqlite::{
25    CachedStatement, Connection, OpenFlags, OptionalExtension, Params, ToSql, Transaction,
26    TransactionBehavior, named_params, types::ToSqlOutput,
27};
28use std::{
29    cmp::max,
30    collections::VecDeque,
31    fmt::Debug,
32    ops::DerefMut,
33    path::Path,
34    str::FromStr,
35    sync::{
36        Arc, Mutex,
37        atomic::{AtomicBool, Ordering},
38    },
39    time::Duration,
40};
41use std::{fmt::Write as _, pin::Pin};
42use tokio::{
43    sync::{mpsc, oneshot},
44    time::Instant,
45};
46use tracing::{Level, debug, error, info, instrument, trace, warn};
47
/// Error returned when the SQLite connection cannot be opened or the schema cannot be set up.
///
/// The error itself carries no details; the failure sites log the underlying cause
/// before returning it.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
51
/// A delay request: the delay, the join set it belongs to and the time it expires.
#[derive(Debug, Clone)]
struct DelayReq {
    // Join set the delay belongs to.
    join_set_id: JoinSetId,
    // Identifier of the delay.
    delay_id: DelayId,
    // Expiry timestamp of the delay.
    expires_at: DateTime<Utc>,
}
58/*
59mmap_size = 128MB - Set the global memory map so all processes can share some data
60https://www.sqlite.org/pragma.html#pragma_mmap_size
61https://www.sqlite.org/mmap.html
62
63journal_size_limit = 64 MB - limit on the WAL file to prevent unlimited growth
64https://www.sqlite.org/pragma.html#pragma_journal_size_limit
65
66Inspired by https://github.com/rails/rails/pull/49349
67*/
68
/// Default PRAGMA settings as `[name, value]` pairs, applied in array order when the
/// connection is initialized. An empty value means the pragma is queried instead of set.
///
/// `page_size` must come first: it only takes effect before the database file is
/// initialized, and switching a fresh database to WAL mode already writes the header
/// (with the default page size). Existing databases keep their page size either way.
const PRAGMA: [[&str; 2]; 10] = [
    ["page_size", "8192"], // 8 KB, must precede `journal_mode`
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"], // number of pages
    ["temp_store", "MEMORY"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
81
// Schema metadata. Append only: the row with the highest `id` carries the schema version
// that is checked during initialization.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
// Schema version expected by this code. It is inserted into an empty `t_metadata`;
// initialization fails when the stored version differs.
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 4;
91
/// Stores execution history. Append only.
///
/// One row per `(execution_id, version)`. `history_event_type` is a stored generated column
/// extracted from `json_value` at the JSON path `$.HistoryEvent.event.type`.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.HistoryEvent.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// Used in `fetch_created` and `get_execution_event`
// NOTE(review): covers the same columns as the table's primary key - confirm it is still needed.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version  ON t_execution_log (execution_id, version);
";
// Used in `lock_inner` to filter by execution ID and variant (created or event history)
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant  ON t_execution_log (execution_id, variant);
";
113
114// Used in `count_join_next`
115const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
116    "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set  ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type=\"{}\";",
117    HISTORY_EVENT_TYPE_JOIN_NEXT
118);
119
/// Stores child execution return values for the parent (`execution_id`). Append only.
/// For `JoinSetResponse::DelayFinished`, columns `delay_id`,`delay_success` must not be null.
/// For `JoinSetResponse::ChildExecutionFinished`, columns `child_execution_id`,`finished_version`
/// must not be null.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success INTEGER,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Used when querying for the next response
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
// Child execution id must be unique.
// Partial unique index: rows without a child execution id are not constrained by it.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
// Delay id must be unique.
// Partial unique index: rows without a delay id are not constrained by it.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
154
/// Stores executions in `PendingState`, one row per execution.
/// `state` to column mapping:
/// `PendingAt`:            (nothing but required columns), optionally contains all `Locked` columns.
/// `Locked`:               `max_retries`, `retry_exp_backoff_millis`, `last_lock_version`, `executor_id`, `run_id`
/// `BlockedByJoinSet`:     `join_set_id`, `join_set_closing`
/// `Finished` :            `result_kind`.
///
/// `pending_expires_finished` - either pending at, lock expires at or finished at based on state.
///
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    ffqn TEXT NOT NULL,
    created_at TEXT NOT NULL,
    component_id_input_digest BLOB NOT NULL,
    first_scheduled_at TEXT NOT NULL,

    pending_expires_finished TEXT NOT NULL,
    state TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";
// Values stored in the `t_state.state` column.
const STATE_PENDING_AT: &str = "PendingAt";
const STATE_BLOCKED_BY_JOIN_SET: &str = "BlockedByJoinSet";
const STATE_LOCKED: &str = "Locked";
const STATE_FINISHED: &str = "Finished";
// Value of `t_execution_log.history_event_type` selected by the join-set partial index.
const HISTORY_EVENT_TYPE_JOIN_NEXT: &str = "JoinNext";

// TODO: partial indexes
const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";
// Partial index: only rows that have an `executor_id` are indexed.
const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";
// Note: the SQL index name ends with `is_root` although the indexed column is `is_top_level`.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
// For `list_executions` by ffqn
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
// For `list_executions` by creation date
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
217
/// Represents [`ExpiredTimer::AsyncDelay`] . Rows are deleted when the delay is processed.
/// A delay is identified by `(execution_id, join_set_id, delay_id)`.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

// Append only.
// Each row holds one backtrace for the version range
// `[version_min_including, version_max_excluding)` of an execution.
const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    wasm_backtrace TEXT NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
) STRICT
";
// Index for searching backtraces by execution_id and version
// NOTE(review): covers the same columns as the table's primary key - confirm it is still needed.
const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
244
/// Cloneable, crate-internal form of a `rusqlite::Error`.
///
/// Only the "no rows" case is kept as a distinct variant; every other error is reduced
/// to its message.
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    // Produced from `rusqlite::Error::QueryReturnedNoRows`.
    #[error("not found")]
    NotFound,
    // Any other error, carrying its message.
    #[error("generic: {0}")]
    Generic(StrVariant),
}
252
253mod conversions {
254
255    use super::RusqliteError;
256    use concepts::storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite};
257    use rusqlite::{
258        ToSql,
259        types::{FromSql, FromSqlError},
260    };
261    use std::fmt::Debug;
262    use tracing::error;
263
264    impl From<rusqlite::Error> for RusqliteError {
265        fn from(err: rusqlite::Error) -> Self {
266            if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
267                RusqliteError::NotFound
268            } else {
269                error!(backtrace = %std::backtrace::Backtrace::capture(), "Sqlite error {err:?}");
270                RusqliteError::Generic(err.to_string().into())
271            }
272        }
273    }
274
275    impl From<RusqliteError> for DbErrorGeneric {
276        fn from(err: RusqliteError) -> DbErrorGeneric {
277            match err {
278                RusqliteError::NotFound => DbErrorGeneric::Uncategorized("not found".into()),
279                RusqliteError::Generic(str) => DbErrorGeneric::Uncategorized(str),
280            }
281        }
282    }
283    impl From<RusqliteError> for DbErrorRead {
284        fn from(err: RusqliteError) -> Self {
285            if matches!(err, RusqliteError::NotFound) {
286                Self::NotFound
287            } else {
288                Self::from(DbErrorGeneric::from(err))
289            }
290        }
291    }
292    impl From<RusqliteError> for DbErrorReadWithTimeout {
293        fn from(err: RusqliteError) -> Self {
294            Self::from(DbErrorRead::from(err))
295        }
296    }
297    impl From<RusqliteError> for DbErrorWrite {
298        fn from(err: RusqliteError) -> Self {
299            if matches!(err, RusqliteError::NotFound) {
300                Self::NotFound
301            } else {
302                Self::from(DbErrorGeneric::from(err))
303            }
304        }
305    }
306
307    pub(crate) struct JsonWrapper<T>(pub(crate) T);
308    impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
309        fn column_result(
310            value: rusqlite::types::ValueRef<'_>,
311        ) -> rusqlite::types::FromSqlResult<Self> {
312            let value = match value {
313                rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
314                    Ok(value)
315                }
316                other => {
317                    error!(
318                        backtrace = %std::backtrace::Backtrace::capture(),
319                        "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
320                    );
321                    Err(FromSqlError::InvalidType)
322                }
323            }?;
324            let value = serde_json::from_slice::<T>(value).map_err(|err| {
325                error!(
326                    backtrace = %std::backtrace::Backtrace::capture(),
327                    "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
328                    r#type = std::any::type_name::<T>()
329                );
330                FromSqlError::InvalidType
331            })?;
332            Ok(Self(value))
333        }
334    }
335    impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
336        fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
337            let string = serde_json::to_string(&self.0).map_err(|err| {
338                error!(
339                    "Cannot serialize {value:?} of type `{type}` - {err:?}",
340                    value = self.0,
341                    r#type = std::any::type_name::<T>()
342                );
343                rusqlite::Error::ToSqlConversionFailure(Box::new(err))
344            })?;
345            Ok(rusqlite::types::ToSqlOutput::Owned(
346                rusqlite::types::Value::Text(string),
347            ))
348        }
349    }
350
351    // Used as a wrapper for `FromSqlError::Other`
352    #[derive(Debug, thiserror::Error)]
353    #[error("{0}")]
354    pub(crate) struct OtherError(&'static str);
355    pub(crate) fn consistency_rusqlite(input: &'static str) -> rusqlite::Error {
356        FromSqlError::other(OtherError(input)).into()
357    }
358
359    pub(crate) fn consistency_db_err(input: &'static str) -> DbErrorGeneric {
360        DbErrorGeneric::Uncategorized(input.into())
361    }
362}
363
/// Raw, partially parsed contents of a `t_state` row.
///
/// `state` and `ffqn` are still plain strings; `CombinedState::new` validates them and
/// checks that the combination of optional columns matches the state.
#[derive(Debug)]
struct CombinedStateDTO {
    // Compared against the `STATE_*` constants.
    state: String,
    // Parsed into `FunctionFqn` by `CombinedState::new`.
    ffqn: String,
    component_id_input_digest: InputContentDigest,
    // Scheduled at / lock expires at / finished at, depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    // Locked:
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    // Blocked by join set:
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    // Finished:
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated contents of a `t_state` row: the function name, the pending state and the
/// version that state corresponds to.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    // Version passed to `CombinedState::new` together with the row data.
    corresponding_version: Version,
}
386impl CombinedState {
387    fn get_next_version_assert_not_finished(&self) -> Version {
388        assert!(!self.pending_state.is_finished());
389        self.corresponding_version.increment()
390    }
391
392    #[cfg(feature = "test")]
393    fn get_next_version_or_finished(&self) -> Version {
394        if self.pending_state.is_finished() {
395            self.corresponding_version.clone()
396        } else {
397            self.corresponding_version.increment()
398        }
399    }
400}
401
/// Data used to notify pending subscribers: see `PendingFfqnSubscribersHolder::notify`,
/// which looks up subscribers by `ffqn` and by `component_input_digest`.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
    component_input_digest: InputContentDigest,
}

/// An execution together with its return value.
// NOTE(review): presumably delivered to `ExecutionFinishedSubscribers` - confirm against the caller.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Collection of optional notifications; each field is `None` when there is nothing to
/// notify about.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    // Execution id paired with a join set response event.
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
421
422impl CombinedState {
423    fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, rusqlite::Error> {
424        let ffqn = FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
425            error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
426            consistency_rusqlite("invalid ffqn value in `t_state`")
427        })?;
428        let pending_state = match dto {
429            // Pending - just created
430            CombinedStateDTO {
431                state,
432                ffqn: _,
433                component_id_input_digest,
434                pending_expires_finished: scheduled_at,
435                last_lock_version: None,
436                executor_id: None,
437                run_id: None,
438                join_set_id: None,
439                join_set_closing: None,
440                result_kind: None,
441            } if state == STATE_PENDING_AT => PendingState::PendingAt {
442                scheduled_at,
443                last_lock: None,
444                component_id_input_digest,
445            },
446            // Pending, previously locked
447            CombinedStateDTO {
448                state,
449                ffqn: _,
450                component_id_input_digest,
451                pending_expires_finished: scheduled_at,
452                last_lock_version: None,
453                executor_id: Some(executor_id),
454                run_id: Some(run_id),
455                join_set_id: None,
456                join_set_closing: None,
457                result_kind: None,
458            } if state == STATE_PENDING_AT => PendingState::PendingAt {
459                scheduled_at,
460                last_lock: Some(LockedBy {
461                    executor_id,
462                    run_id,
463                }),
464                component_id_input_digest,
465            },
466            CombinedStateDTO {
467                state,
468                ffqn: _,
469                component_id_input_digest,
470                pending_expires_finished: lock_expires_at,
471                last_lock_version: Some(_),
472                executor_id: Some(executor_id),
473                run_id: Some(run_id),
474                join_set_id: None,
475                join_set_closing: None,
476                result_kind: None,
477            } if state == STATE_LOCKED => PendingState::Locked(PendingStateLocked {
478                locked_by: LockedBy {
479                    executor_id,
480                    run_id,
481                },
482                lock_expires_at,
483                component_id_input_digest,
484            }),
485            CombinedStateDTO {
486                state,
487                ffqn: _,
488                component_id_input_digest,
489                pending_expires_finished: lock_expires_at,
490                last_lock_version: None,
491                executor_id: _,
492                run_id: _,
493                join_set_id: Some(join_set_id),
494                join_set_closing: Some(join_set_closing),
495                result_kind: None,
496            } if state == STATE_BLOCKED_BY_JOIN_SET => PendingState::BlockedByJoinSet {
497                join_set_id: join_set_id.clone(),
498                closing: join_set_closing,
499                lock_expires_at,
500                component_id_input_digest,
501            },
502            CombinedStateDTO {
503                state,
504                ffqn: _,
505                component_id_input_digest,
506                pending_expires_finished: finished_at,
507                last_lock_version: None,
508                executor_id: None,
509                run_id: None,
510                join_set_id: None,
511                join_set_closing: None,
512                result_kind: Some(result_kind),
513            } if state == STATE_FINISHED => PendingState::Finished {
514                finished: PendingStateFinished {
515                    finished_at,
516                    version: corresponding_version.0,
517                    result_kind,
518                },
519                component_id_input_digest,
520            },
521
522            _ => {
523                error!("Cannot deserialize pending state from  {dto:?}");
524                return Err(consistency_rusqlite("invalid `t_state`"));
525            }
526        };
527        Ok(Self {
528            ffqn,
529            pending_state,
530            corresponding_version,
531        })
532    }
533}
534
/// A unit of work sent to the SQLite thread; it is applied to the physical transaction
/// opened in `tick`.
#[derive(derive_more::Debug)]
struct LogicalTx {
    // Closure that receives the physical transaction.
    #[debug(skip)]
    func: Box<dyn FnMut(&mut Transaction) + Send>,
    // NOTE(review): presumably the time the command was enqueued, used for metrics - confirm.
    sent_at: Instant,
    // Static label of the originating function.
    func_name: &'static str,
    // NOTE(review): presumably reports the commit outcome back to the sender - confirm.
    #[debug(skip)]
    commit_ack_sender: oneshot::Sender<Result<(), RusqliteError>>,
}

/// Message processed by the SQLite thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    // Work to apply to the current physical transaction.
    LogicalTx(LogicalTx),
    // Makes `tick` return `Err`, which ends the thread's loop.
    Shutdown,
}
550
/// Cloneable handle to the SQLite thread. Cloning shares the same command channel and
/// subscriber maps; `connection()` hands out such a clone.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);

// One sender per execution waiting for a join set response.
// NOTE(review): the `u64` is presumably a subscription tag used on cleanup - confirm.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
// Subscribers notified through `PendingFfqnSubscribersHolder::notify`.
type PendingSubscribers = Arc<Mutex<PendingFfqnSubscribersHolder>>;
// Per execution, a map from a `u64` key to the sender receiving the return value.
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;
559
560#[derive(Default)]
561struct PendingFfqnSubscribersHolder {
562    by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
563    by_component: HashMap<InputContentDigest /* input digest */, (mpsc::Sender<()>, u64)>,
564}
565impl PendingFfqnSubscribersHolder {
566    fn notify(&self, notifier: &NotifierPendingAt) {
567        if let Some((subscription, _)) = self.by_ffqns.get(&notifier.ffqn) {
568            debug!("Notifying pending subscriber by ffqn");
569            // Does not block
570            let _ = subscription.try_send(());
571        }
572        if let Some((subscription, _)) = self.by_component.get(&notifier.component_input_digest) {
573            debug!("Notifying pending subscriber by component");
574            // Does not block
575            let _ = subscription.try_send(());
576        }
577    }
578
579    fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
580        self.by_ffqns.insert(ffqn, value);
581    }
582
583    fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
584        self.by_ffqns.remove(ffqn)
585    }
586
587    fn insert_by_component(
588        &mut self,
589        input_content_digest: InputContentDigest,
590        value: (mpsc::Sender<()>, u64),
591    ) {
592        self.by_component.insert(input_content_digest, value);
593    }
594
595    fn remove_by_component(
596        &mut self,
597        input_content_digest: &InputContentDigest,
598    ) -> Option<(mpsc::Sender<()>, u64)> {
599        self.by_component.remove(input_content_digest)
600    }
601}
602
/// Shared state behind `SqlitePool`; every field is cheaply cloneable.
#[derive(Clone)]
struct SqlitePoolInner {
    // Set by `close` and by `Drop` before the `Shutdown` command is sent.
    shutdown_requested: Arc<AtomicBool>,
    // Set by the SQLite thread as its last operation (see `connection_rpc`).
    shutdown_finished: Arc<AtomicBool>,
    // Channel carrying commands to the SQLite thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>, // always Some, Optional for swapping in drop.
}
613
614#[async_trait]
615impl DbPoolCloseable for SqlitePool {
616    async fn close(self) {
617        debug!("Sqlite is closing");
618        self.0.shutdown_requested.store(true, Ordering::Release);
619        // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
620        let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
621        while !self.0.shutdown_finished.load(Ordering::Acquire) {
622            tokio::time::sleep(Duration::from_millis(1)).await;
623        }
624        debug!("Sqlite was closed");
625    }
626}
627
628#[async_trait]
629impl DbPool for SqlitePool {
630    fn connection(&self) -> Box<dyn DbConnection> {
631        Box::new(self.clone())
632    }
633}
634impl Drop for SqlitePool {
635    fn drop(&mut self) {
636        let arc = self.0.join_handle.take().expect("join_handle was set");
637        if let Ok(join_handle) = Arc::try_unwrap(arc) {
638            // Last holder
639            if !join_handle.is_finished() {
640                if !self.0.shutdown_finished.load(Ordering::Acquire) {
641                    // Best effort to shut down the sqlite thread.
642                    let backtrace = std::backtrace::Backtrace::capture();
643                    warn!("SqlitePool was not closed properly - {backtrace}");
644                    self.0.shutdown_requested.store(true, Ordering::Release);
645                    // Unblock the thread's blocking_recv. If the capacity is reached, the next processed message will trigger shutdown.
646                    let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
647                    // Not joining the thread, drop might be called from async context.
648                    // We are shutting down the server anyway.
649                } else {
650                    // The thread set `shutdown_finished` as its last operation.
651                }
652            }
653        }
654    }
655}
656
/// Configuration of the SQLite pool.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    // NOTE(review): presumably the capacity of the command channel - confirm at the construction site.
    pub queue_capacity: usize,
    // Optional PRAGMA name -> value overrides.
    // NOTE(review): presumably forwarded to `init_thread`, where an override replaces the
    // default from `PRAGMA` and unknown names are applied afterwards - confirm.
    pub pragma_override: Option<HashMap<String, String>>,
    // NOTE(review): presumably forwarded to `Histograms::new` via `connection_rpc` - confirm.
    pub metrics_threshold: Option<Duration>,
}
impl Default for SqliteConfig {
    /// Queue capacity of 100, no pragma overrides, no metrics threshold.
    fn default() -> Self {
        Self {
            queue_capacity: 100,
            pragma_override: None,
            metrics_threshold: None,
        }
    }
}
672
673impl SqlitePool {
674    fn init_thread(
675        path: &Path,
676        mut pragma_override: HashMap<String, String>,
677    ) -> Result<Connection, InitializationError> {
678        fn conn_execute<P: Params>(
679            conn: &Connection,
680            sql: &str,
681            params: P,
682        ) -> Result<(), InitializationError> {
683            conn.execute(sql, params).map(|_| ()).map_err(|err| {
684                error!("Cannot run `{sql}` - {err:?}");
685                InitializationError
686            })
687        }
688        fn pragma_update(
689            conn: &Connection,
690            name: &str,
691            value: &str,
692        ) -> Result<(), InitializationError> {
693            if value.is_empty() {
694                debug!("Querying PRAGMA {name}");
695                conn.pragma_query(None, name, |row| {
696                    debug!("{row:?}");
697                    Ok(())
698                })
699                .map_err(|err| {
700                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
701                    InitializationError
702                })
703            } else {
704                debug!("Setting PRAGMA {name}={value}");
705                conn.pragma_update(None, name, value).map_err(|err| {
706                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
707                    InitializationError
708                })
709            }
710        }
711
712        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
713            error!("cannot open the connection - {err:?}");
714            InitializationError
715        })?;
716
717        for [pragma_name, default_value] in PRAGMA {
718            let pragma_value = pragma_override
719                .remove(pragma_name)
720                .unwrap_or_else(|| default_value.to_string());
721            pragma_update(&conn, pragma_name, &pragma_value)?;
722        }
723        // drain the rest overrides
724        for (pragma_name, pragma_value) in pragma_override.drain() {
725            pragma_update(&conn, &pragma_name, &pragma_value)?;
726        }
727
728        // t_metadata
729        {
730            conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
731            // Insert row if not exists.
732
733            let actual_version = conn
734                .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
735                .map_err(|err| {
736                    error!("cannot select schema version - {err:?}");
737                    InitializationError
738                })?
739                .query_row([], |row| row.get::<_, u32>("schema_version"))
740                .optional()
741                .map_err(|err| {
742                    error!("Cannot read the schema version - {err:?}");
743                    InitializationError
744                })?;
745
746            match actual_version {
747                None => conn_execute(
748                    &conn,
749                    &format!(
750                        "INSERT INTO t_metadata (schema_version, created_at) VALUES
751                            ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
752                    ),
753                    [Utc::now()],
754                )?,
755                Some(actual_version) => {
756                    // Fail on unexpected `schema_version`.
757                    if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
758                        error!(
759                            "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
760                        );
761                        return Err(InitializationError);
762                    }
763                }
764            }
765        }
766
767        // t_execution_log
768        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
769        conn_execute(
770            &conn,
771            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
772            [],
773        )?;
774        conn_execute(
775            &conn,
776            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
777            [],
778        )?;
779        conn_execute(
780            &conn,
781            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
782            [],
783        )?;
784        // t_join_set_response
785        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
786        conn_execute(
787            &conn,
788            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
789            [],
790        )?;
791        conn_execute(
792            &conn,
793            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
794            [],
795        )?;
796        conn_execute(
797            &conn,
798            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
799            [],
800        )?;
801        // t_state
802        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
803        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING, [])?;
804        conn_execute(&conn, IDX_T_STATE_EXPIRED_TIMERS, [])?;
805        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
806        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
807        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
808        // t_delay
809        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
810        // t_backtrace
811        conn_execute(&conn, CREATE_TABLE_T_BACKTRACE, [])?;
812        conn_execute(&conn, IDX_T_BACKTRACE_EXECUTION_ID_VERSION, [])?;
813        Ok(conn)
814    }
815
816    fn connection_rpc(
817        mut conn: Connection,
818        shutdown_requested: &AtomicBool,
819        shutdown_finished: &AtomicBool,
820        mut command_rx: mpsc::Receiver<ThreadCommand>,
821        metrics_threshold: Option<Duration>,
822    ) {
823        let mut histograms = Histograms::new(metrics_threshold);
824        while Self::tick(
825            &mut conn,
826            shutdown_requested,
827            &mut command_rx,
828            &mut histograms,
829        )
830        .is_ok()
831        {
832            // Loop until shutdown is set to true.
833        }
834        debug!("Closing command thread");
835        shutdown_finished.store(true, Ordering::Release);
836    }
837
838    fn tick(
839        conn: &mut Connection,
840        shutdown_requested: &AtomicBool,
841        command_rx: &mut mpsc::Receiver<ThreadCommand>,
842        histograms: &mut Histograms,
843    ) -> Result<(), ()> {
844        let mut ltx_list = Vec::new();
845        // Wait for first logical tx.
846        let mut ltx = match command_rx.blocking_recv() {
847            Some(ThreadCommand::LogicalTx(ltx)) => ltx,
848            Some(ThreadCommand::Shutdown) => {
849                debug!("Shutdown message received");
850                return Err(());
851            }
852            None => {
853                debug!("command_rx was closed");
854                return Err(());
855            }
856        };
857        let all_fns_start = std::time::Instant::now();
858        let mut physical_tx = conn
859            .transaction_with_behavior(TransactionBehavior::Immediate)
860            .map_err(|begin_err| {
861                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
862            })?;
863        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
864        ltx_list.push(ltx);
865
866        while let Ok(more) = command_rx.try_recv() {
867            let mut ltx = match more {
868                ThreadCommand::Shutdown => {
869                    debug!("Shutdown message received");
870                    return Err(());
871                }
872                ThreadCommand::LogicalTx(ltx) => ltx,
873            };
874
875            Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
876            ltx_list.push(ltx);
877        }
878        histograms.record_all_fns(all_fns_start.elapsed());
879
880        {
881            // Commit
882            if shutdown_requested.load(Ordering::Relaxed) {
883                debug!("Recveived shutdown during processing of the batch");
884                return Err(());
885            }
886            let commit_result = {
887                let now = std::time::Instant::now();
888                let commit_result = physical_tx.commit().map_err(RusqliteError::from);
889                histograms.record_commit(now.elapsed());
890                commit_result
891            };
892            if commit_result.is_ok() || ltx_list.len() == 1 {
893                // Happy path - just send the commit ACK.
894                for ltx in ltx_list {
895                    // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
896                    let _ = ltx.commit_ack_sender.send(commit_result.clone());
897                }
898            } else {
899                warn!(
900                    "Bulk transaction failed, committing {} transactions serially",
901                    ltx_list.len()
902                );
903                // Bulk transaction failed. Replay each ltx in its own physical transaction.
904                // TODO: Use SAVEPOINT + RELEASE SAVEPOINT, ROLLBACK savepoint instead.
905                for ltx in ltx_list {
906                    Self::ltx_commit_single(ltx, conn, shutdown_requested, histograms)?;
907                }
908            }
909        }
910        histograms.print_if_elapsed();
911        Ok(())
912    }
913
914    fn ltx_commit_single(
915        mut ltx: LogicalTx,
916        conn: &mut Connection,
917        shutdown_requested: &AtomicBool,
918        histograms: &mut Histograms,
919    ) -> Result<(), ()> {
920        let mut physical_tx = conn
921            .transaction_with_behavior(TransactionBehavior::Immediate)
922            .map_err(|begin_err| {
923                error!("Cannot open transaction, closing sqlite - {begin_err:?}");
924            })?;
925        Self::ltx_apply_to_tx(&mut ltx, &mut physical_tx, histograms);
926        if shutdown_requested.load(Ordering::Relaxed) {
927            debug!("Recveived shutdown during processing of the batch");
928            return Err(());
929        }
930        let commit_result = {
931            let now = std::time::Instant::now();
932            let commit_result = physical_tx.commit().map_err(RusqliteError::from);
933            histograms.record_commit(now.elapsed());
934            commit_result
935        };
936        // Ignore the result: ThreadCommand producer timed out before awaiting the ack.
937        let _ = ltx.commit_ack_sender.send(commit_result);
938        Ok(())
939    }
940
941    fn ltx_apply_to_tx(
942        ltx: &mut LogicalTx,
943        physical_tx: &mut Transaction,
944        histograms: &mut Histograms,
945    ) {
946        let sent_latency = ltx.sent_at.elapsed();
947        let started_at = Instant::now();
948        (ltx.func)(physical_tx);
949        histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
950    }
951
952    #[instrument(level = Level::DEBUG, skip_all, name = "sqlite_new")]
953    pub async fn new<P: AsRef<Path>>(
954        path: P,
955        config: SqliteConfig,
956    ) -> Result<Self, InitializationError> {
957        let path = path.as_ref().to_owned();
958
959        let shutdown_requested = Arc::new(AtomicBool::new(false));
960        let shutdown_finished = Arc::new(AtomicBool::new(false));
961
962        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
963        info!("Sqlite database location: {path:?}");
964        let join_handle = {
965            // Initialize the `Connection`.
966            let init_task = {
967                tokio::task::spawn_blocking(move || {
968                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
969                })
970                .await
971            };
972            let conn = match init_task {
973                Ok(res) => res?,
974                Err(join_err) => {
975                    error!("Initialization panic - {join_err:?}");
976                    return Err(InitializationError);
977                }
978            };
979            let shutdown_requested = shutdown_requested.clone();
980            let shutdown_finished = shutdown_finished.clone();
981            // Start the RPC thread.
982            std::thread::spawn(move || {
983                Self::connection_rpc(
984                    conn,
985                    &shutdown_requested,
986                    &shutdown_finished,
987                    command_rx,
988                    config.metrics_threshold,
989                );
990            })
991        };
992        Ok(SqlitePool(SqlitePoolInner {
993            shutdown_requested,
994            shutdown_finished,
995            command_tx,
996            response_subscribers: Arc::default(),
997            pending_subscribers: Arc::default(),
998            join_handle: Some(Arc::new(join_handle)),
999            execution_finished_subscribers: Arc::default(),
1000        }))
1001    }
1002
1003    /// Invokes the provided function wrapping a new [`rusqlite::Transaction`] that is committed automatically.
1004    async fn transaction<F, T, E>(&self, mut func: F, func_name: &'static str) -> Result<T, E>
1005    where
1006        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
1007        T: Send + 'static,
1008        E: From<DbErrorGeneric> + From<RusqliteError> + Send + 'static,
1009    {
1010        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
1011        let (commit_ack_sender, commit_ack_receiver) = oneshot::channel();
1012        let thread_command_func = {
1013            let fn_res = fn_res.clone();
1014            ThreadCommand::LogicalTx(LogicalTx {
1015                func: Box::new(move |tx| {
1016                    let func_res = func(tx);
1017                    *fn_res.lock().unwrap() = Some(func_res);
1018                }),
1019                sent_at: Instant::now(),
1020                func_name,
1021                commit_ack_sender,
1022            })
1023        };
1024        self.0
1025            .command_tx
1026            .send(thread_command_func)
1027            .await
1028            .map_err(|_send_err| DbErrorGeneric::Close)?;
1029
1030        // Wait for commit ack, then get the retval from the mutex.
1031        match commit_ack_receiver.await {
1032            Ok(Ok(())) => {
1033                let mut guard = fn_res.lock().unwrap();
1034                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
1035            }
1036            Ok(Err(rusqlite_err)) => Err(E::from(rusqlite_err)),
1037            Err(_) => Err(E::from(DbErrorGeneric::Close)),
1038        }
1039    }
1040
1041    fn fetch_created_event(
1042        conn: &Connection,
1043        execution_id: &ExecutionId,
1044    ) -> Result<CreateRequest, DbErrorRead> {
1045        let mut stmt = conn.prepare(
1046            "SELECT created_at, json_value FROM t_execution_log WHERE \
1047            execution_id = :execution_id AND version = 0",
1048        )?;
1049        let (created_at, event) = stmt.query_row(
1050            named_params! {
1051                ":execution_id": execution_id.to_string(),
1052            },
1053            |row| {
1054                let created_at = row.get("created_at")?;
1055                let event = row
1056                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1057                    .map_err(|serde| {
1058                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
1059                        consistency_rusqlite("cannot deserialize `Created` event")
1060                    })?;
1061                Ok((created_at, event.0))
1062            },
1063        )?;
1064        if let ExecutionRequest::Created {
1065            ffqn,
1066            params,
1067            parent,
1068            scheduled_at,
1069            component_id,
1070            metadata,
1071            scheduled_by,
1072        } = event
1073        {
1074            Ok(CreateRequest {
1075                created_at,
1076                execution_id: execution_id.clone(),
1077                ffqn,
1078                params,
1079                parent,
1080                scheduled_at,
1081                component_id,
1082                metadata,
1083                scheduled_by,
1084            })
1085        } else {
1086            error!("Row with version=0 must be a `Created` event - {event:?}");
1087            Err(consistency_db_err("expected `Created` event").into())
1088        }
1089    }
1090
1091    fn check_expected_next_and_appending_version(
1092        expected_version: &Version,
1093        appending_version: &Version,
1094    ) -> Result<(), DbErrorWrite> {
1095        if *expected_version != *appending_version {
1096            debug!(
1097                "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1098            );
1099            return Err(DbErrorWrite::NonRetriable(
1100                DbErrorWriteNonRetriable::VersionConflict {
1101                    expected: expected_version.clone(),
1102                    requested: appending_version.clone(),
1103                },
1104            ));
1105        }
1106        Ok(())
1107    }
1108
1109    #[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
1110    fn create_inner(
1111        tx: &Transaction,
1112        req: CreateRequest,
1113    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1114        debug!("create_inner");
1115
1116        let version = Version::default();
1117        let execution_id = req.execution_id.clone();
1118        let execution_id_str = execution_id.to_string();
1119        let ffqn = req.ffqn.clone();
1120        let created_at = req.created_at;
1121        let scheduled_at = req.scheduled_at;
1122        let component_id = req.component_id.clone();
1123        let event = ExecutionRequest::from(req);
1124        let event_ser = serde_json::to_string(&event).map_err(|err| {
1125            error!("Cannot serialize {event:?} - {err:?}");
1126            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1127        })?;
1128        tx.prepare(
1129                "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1130                VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1131        ?
1132        .execute(named_params! {
1133            ":execution_id": &execution_id_str,
1134            ":created_at": created_at,
1135            ":version": version.0,
1136            ":json_value": event_ser,
1137            ":variant": event.variant(),
1138            ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1139        })
1140        ?;
1141        let pending_at = {
1142            debug!("Creating with `Pending(`{scheduled_at:?}`)");
1143            tx.prepare(
1144                r"
1145                INSERT INTO t_state (
1146                    execution_id,
1147                    is_top_level,
1148                    corresponding_version,
1149                    pending_expires_finished,
1150                    ffqn,
1151                    state,
1152                    created_at,
1153                    component_id_input_digest,
1154                    updated_at,
1155                    first_scheduled_at,
1156                    intermittent_event_count
1157                    )
1158                VALUES (
1159                    :execution_id,
1160                    :is_top_level,
1161                    :corresponding_version,
1162                    :pending_expires_finished,
1163                    :ffqn,
1164                    :state,
1165                    :created_at,
1166                    :component_id_input_digest,
1167                    CURRENT_TIMESTAMP,
1168                    :first_scheduled_at,
1169                    0
1170                    )
1171                ",
1172            )?
1173            .execute(named_params! {
1174                ":execution_id": execution_id.to_string(),
1175                ":is_top_level": execution_id.is_top_level(),
1176                ":corresponding_version": version.0,
1177                ":pending_expires_finished": scheduled_at,
1178                ":ffqn": ffqn.to_string(),
1179                ":state": STATE_PENDING_AT,
1180                ":created_at": created_at,
1181                ":component_id_input_digest": component_id.input_digest,
1182                ":first_scheduled_at": scheduled_at,
1183            })?;
1184            AppendNotifier {
1185                pending_at: Some(NotifierPendingAt {
1186                    scheduled_at,
1187                    ffqn,
1188                    component_input_digest: component_id.input_digest,
1189                }),
1190                execution_finished: None,
1191                response: None,
1192            }
1193        };
1194        let next_version = Version::new(version.0 + 1);
1195        Ok((next_version, pending_at))
1196    }
1197
1198    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
1199    fn update_state_pending_after_response_appended(
1200        tx: &Transaction,
1201        execution_id: &ExecutionId,
1202        scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
1203        corresponding_version: &Version, // t_execution_log is not be changed
1204        component_input_digest: InputContentDigest,
1205    ) -> Result<AppendNotifier, DbErrorWrite> {
1206        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1207        let mut stmt = tx
1208            .prepare_cached(
1209                r"
1210                UPDATE t_state
1211                SET
1212                    corresponding_version = :corresponding_version,
1213                    pending_expires_finished = :pending_expires_finished,
1214                    state = :state,
1215                    updated_at = CURRENT_TIMESTAMP,
1216
1217                    last_lock_version = NULL,
1218
1219                    join_set_id = NULL,
1220                    join_set_closing = NULL,
1221
1222                    result_kind = NULL
1223                WHERE execution_id = :execution_id
1224            ",
1225            )
1226            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1227        let updated = stmt
1228            .execute(named_params! {
1229                ":execution_id": execution_id,
1230                ":corresponding_version": corresponding_version.0,
1231                ":pending_expires_finished": scheduled_at,
1232                ":state": STATE_PENDING_AT,
1233            })
1234            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1235        if updated != 1 {
1236            return Err(DbErrorWrite::NotFound);
1237        }
1238        Ok(AppendNotifier {
1239            pending_at: Some(NotifierPendingAt {
1240                scheduled_at,
1241                ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1242                component_input_digest,
1243            }),
1244            execution_finished: None,
1245            response: None,
1246        })
1247    }
1248
1249    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1250    fn update_state_pending_after_event_appended(
1251        tx: &Transaction,
1252        execution_id: &ExecutionId,
1253        appending_version: &Version,
1254        scheduled_at: DateTime<Utc>, // Changing to state PendingAt
1255        intermittent_failure: bool,
1256        component_input_digest: InputContentDigest,
1257    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1258        debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1259        // If response idx is unknown, use 0. This distinguishs the two paths.
1260        // JoinNext will always send an actual idx.
1261        let mut stmt = tx
1262            .prepare_cached(
1263                r"
1264                UPDATE t_state
1265                SET
1266                    corresponding_version = :appending_version,
1267                    pending_expires_finished = :pending_expires_finished,
1268                    state = :state,
1269                    updated_at = CURRENT_TIMESTAMP,
1270                    intermittent_event_count = intermittent_event_count + :intermittent_delta,
1271
1272                    last_lock_version = NULL,
1273
1274                    join_set_id = NULL,
1275                    join_set_closing = NULL,
1276
1277                    result_kind = NULL
1278                WHERE execution_id = :execution_id;
1279            ",
1280            )
1281            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1282        let updated = stmt
1283            .execute(named_params! {
1284                ":execution_id": execution_id.to_string(),
1285                ":appending_version": appending_version.0,
1286                ":pending_expires_finished": scheduled_at,
1287                ":state": STATE_PENDING_AT,
1288                ":intermittent_delta": i32::from(intermittent_failure) // 0 or 1
1289            })
1290            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1291        if updated != 1 {
1292            return Err(DbErrorWrite::NotFound);
1293        }
1294        Ok((
1295            appending_version.increment(),
1296            AppendNotifier {
1297                pending_at: Some(NotifierPendingAt {
1298                    scheduled_at,
1299                    ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1300                    component_input_digest,
1301                }),
1302                execution_finished: None,
1303                response: None,
1304            },
1305        ))
1306    }
1307
1308    fn update_state_locked_get_intermittent_event_count(
1309        tx: &Transaction,
1310        execution_id: &ExecutionId,
1311        executor_id: ExecutorId,
1312        run_id: RunId,
1313        lock_expires_at: DateTime<Utc>,
1314        appending_version: &Version,
1315        retry_config: ComponentRetryConfig,
1316    ) -> Result<u32, DbErrorWrite> {
1317        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1318        let backoff_millis =
1319            u64::try_from(retry_config.retry_exp_backoff.as_millis()).expect("backoff too big");
1320        let execution_id_str = execution_id.to_string();
1321        let mut stmt = tx
1322            .prepare_cached(
1323                r"
1324                UPDATE t_state
1325                SET
1326                    corresponding_version = :appending_version,
1327                    pending_expires_finished = :pending_expires_finished,
1328                    state = :state,
1329                    updated_at = CURRENT_TIMESTAMP,
1330
1331                    max_retries = :max_retries,
1332                    retry_exp_backoff_millis = :retry_exp_backoff_millis,
1333                    last_lock_version = :appending_version,
1334                    executor_id = :executor_id,
1335                    run_id = :run_id,
1336
1337                    join_set_id = NULL,
1338                    join_set_closing = NULL,
1339
1340                    result_kind = NULL
1341                WHERE execution_id = :execution_id
1342            ",
1343            )
1344            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1345        let updated = stmt
1346            .execute(named_params! {
1347                ":execution_id": execution_id_str,
1348                ":appending_version": appending_version.0,
1349                ":pending_expires_finished": lock_expires_at,
1350                ":state": STATE_LOCKED,
1351                ":max_retries": retry_config.max_retries,
1352                ":retry_exp_backoff_millis": backoff_millis,
1353                ":executor_id": executor_id.to_string(),
1354                ":run_id": run_id.to_string(),
1355            })
1356            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1357        if updated != 1 {
1358            return Err(DbErrorWrite::NotFound);
1359        }
1360
1361        // fetch intermittent event count from the just-inserted row.
1362        let intermittent_event_count = tx
1363            .prepare(
1364                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
1365            )?
1366            .query_row(
1367                named_params! {
1368                    ":execution_id": execution_id_str,
1369                },
1370                |row| {
1371                    let intermittent_event_count = row.get("intermittent_event_count")?;
1372                    Ok(intermittent_event_count)
1373                },
1374            )
1375            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1376
1377        Ok(intermittent_event_count)
1378    }
1379
1380    fn update_state_blocked(
1381        tx: &Transaction,
1382        execution_id: &ExecutionId,
1383        appending_version: &Version,
1384        // BlockedByJoinSet fields
1385        join_set_id: &JoinSetId,
1386        lock_expires_at: DateTime<Utc>,
1387        join_set_closing: bool,
1388    ) -> Result<
1389        AppendResponse, // next version
1390        DbErrorWrite,
1391    > {
1392        debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1393        let execution_id_str = execution_id.to_string();
1394        let mut stmt = tx.prepare_cached(
1395            r"
1396                UPDATE t_state
1397                SET
1398                    corresponding_version = :appending_version,
1399                    pending_expires_finished = :pending_expires_finished,
1400                    state = :state,
1401                    updated_at = CURRENT_TIMESTAMP,
1402
1403                    last_lock_version = NULL,
1404
1405                    join_set_id = :join_set_id,
1406                    join_set_closing = :join_set_closing,
1407
1408                    result_kind = NULL
1409                WHERE execution_id = :execution_id
1410            ",
1411        )?;
1412        let updated = stmt.execute(named_params! {
1413            ":execution_id": execution_id_str,
1414            ":appending_version": appending_version.0,
1415            ":pending_expires_finished": lock_expires_at,
1416            ":state": STATE_BLOCKED_BY_JOIN_SET,
1417            ":join_set_id": join_set_id,
1418            ":join_set_closing": join_set_closing,
1419        })?;
1420        if updated != 1 {
1421            return Err(DbErrorWrite::NotFound);
1422        }
1423        Ok(appending_version.increment())
1424    }
1425
1426    fn update_state_finished(
1427        tx: &Transaction,
1428        execution_id: &ExecutionId,
1429        appending_version: &Version,
1430        // Finished fields
1431        finished_at: DateTime<Utc>,
1432        result_kind: PendingStateFinishedResultKind,
1433    ) -> Result<(), DbErrorWrite> {
1434        debug!("Setting t_state to Finished");
1435        let execution_id_str = execution_id.to_string();
1436        let mut stmt = tx.prepare_cached(
1437            r"
1438                UPDATE t_state
1439                SET
1440                    corresponding_version = :appending_version,
1441                    pending_expires_finished = :pending_expires_finished,
1442                    state = :state,
1443                    updated_at = CURRENT_TIMESTAMP,
1444
1445                    last_lock_version = NULL,
1446                    executor_id = NULL,
1447                    run_id = NULL,
1448
1449                    join_set_id = NULL,
1450                    join_set_closing = NULL,
1451
1452                    result_kind = :result_kind
1453                WHERE execution_id = :execution_id
1454            ",
1455        )?;
1456
1457        let updated = stmt.execute(named_params! {
1458            ":execution_id": execution_id_str,
1459            ":appending_version": appending_version.0,
1460            ":pending_expires_finished": finished_at,
1461            ":state": STATE_FINISHED,
1462            ":result_kind": JsonWrapper(result_kind),
1463        })?;
1464        if updated != 1 {
1465            return Err(DbErrorWrite::NotFound);
1466        }
1467        Ok(())
1468    }
1469
1470    // Upon appending new event to t_execution_log, copy the previous t_state with changed appending_version and created_at.
1471    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1472    fn bump_state_next_version(
1473        tx: &Transaction,
1474        execution_id: &ExecutionId,
1475        appending_version: &Version,
1476        delay_req: Option<DelayReq>,
1477    ) -> Result<AppendResponse /* next version */, DbErrorWrite> {
1478        debug!("update_index_version");
1479        let execution_id_str = execution_id.to_string();
1480        let mut stmt = tx.prepare_cached(
1481            r"
1482                UPDATE t_state
1483                SET
1484                    corresponding_version = :appending_version,
1485                    updated_at = CURRENT_TIMESTAMP
1486                WHERE execution_id = :execution_id
1487            ",
1488        )?;
1489        let updated = stmt.execute(named_params! {
1490            ":execution_id": execution_id_str,
1491            ":appending_version": appending_version.0,
1492        })?;
1493        if updated != 1 {
1494            return Err(DbErrorWrite::NotFound);
1495        }
1496        if let Some(DelayReq {
1497            join_set_id,
1498            delay_id,
1499            expires_at,
1500        }) = delay_req
1501        {
1502            debug!("Inserting delay to `t_delay`");
1503            let mut stmt = tx.prepare_cached(
1504                "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1505                VALUES \
1506                (:execution_id, :join_set_id, :delay_id, :expires_at)",
1507            )?;
1508            stmt.execute(named_params! {
1509                ":execution_id": execution_id_str,
1510                ":join_set_id": join_set_id.to_string(),
1511                ":delay_id": delay_id.to_string(),
1512                ":expires_at": expires_at,
1513            })?;
1514        }
1515        Ok(appending_version.increment())
1516    }
1517
1518    fn get_combined_state(
1519        tx: &Transaction,
1520        execution_id: &ExecutionId,
1521    ) -> Result<CombinedState, DbErrorRead> {
1522        let mut stmt = tx.prepare(
1523            r"
1524                SELECT
1525                    state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
1526                    last_lock_version, executor_id, run_id,
1527                    join_set_id, join_set_closing,
1528                    result_kind
1529                    FROM t_state
1530                WHERE
1531                    execution_id = :execution_id
1532                ",
1533        )?;
1534        stmt.query_row(
1535            named_params! {
1536                ":execution_id": execution_id.to_string(),
1537            },
1538            |row| {
1539                CombinedState::new(
1540                    CombinedStateDTO {
1541                        component_id_input_digest: row.get("component_id_input_digest")?,
1542                        state: row.get("state")?,
1543                        ffqn: row.get("ffqn")?,
1544                        pending_expires_finished: row
1545                            .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1546                        last_lock_version: row
1547                            .get::<_, Option<VersionType>>("last_lock_version")?
1548                            .map(Version::new),
1549                        executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1550                        run_id: row.get::<_, Option<RunId>>("run_id")?,
1551                        join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1552                        join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1553                        result_kind: row
1554                            .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1555                                "result_kind",
1556                            )?
1557                            .map(|wrapper| wrapper.0),
1558                    },
1559                    Version::new(row.get("corresponding_version")?),
1560                )
1561            },
1562        )
1563        .map_err(DbErrorRead::from)
1564    }
1565
1566    fn list_executions(
1567        read_tx: &Transaction,
1568        ffqn: Option<&FunctionFqn>,
1569        top_level_only: bool,
1570        pagination: &ExecutionListPagination,
1571    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1572        #[derive(Debug)]
1573        struct StatementModifier<'a> {
1574            where_vec: Vec<String>,
1575            params: Vec<(&'static str, ToSqlOutput<'a>)>,
1576            limit: u32,
1577            limit_desc: bool,
1578        }
1579
1580        fn paginate<'a, T: rusqlite::ToSql + 'static>(
1581            pagination: &'a Pagination<Option<T>>,
1582            column: &str,
1583            top_level_only: bool,
1584        ) -> Result<StatementModifier<'a>, DbErrorGeneric> {
1585            let mut where_vec: Vec<String> = vec![];
1586            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
1587            let limit = pagination.length();
1588            let limit_desc = pagination.is_desc();
1589            match pagination {
1590                Pagination::NewerThan {
1591                    cursor: Some(cursor),
1592                    ..
1593                }
1594                | Pagination::OlderThan {
1595                    cursor: Some(cursor),
1596                    ..
1597                } => {
1598                    where_vec.push(format!("{column} {rel} :cursor", rel = pagination.rel()));
1599                    let cursor = cursor.to_sql().map_err(|err| {
1600                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
1601                        DbErrorGeneric::Uncategorized("cannot convert cursor to sql".into())
1602                    })?;
1603                    params.push((":cursor", cursor));
1604                }
1605                _ => {}
1606            }
1607            if top_level_only {
1608                where_vec.push("is_top_level=true".to_string());
1609            }
1610            Ok(StatementModifier {
1611                where_vec,
1612                params,
1613                limit: u32::from(limit),
1614                limit_desc,
1615            })
1616        }
1617
1618        let mut statement_mod = match pagination {
1619            ExecutionListPagination::CreatedBy(pagination) => {
1620                paginate(pagination, "created_at", top_level_only)?
1621            }
1622            ExecutionListPagination::ExecutionId(pagination) => {
1623                paginate(pagination, "execution_id", top_level_only)?
1624            }
1625        };
1626
1627        let ffqn_temporary;
1628        if let Some(ffqn) = ffqn {
1629            statement_mod.where_vec.push("ffqn = :ffqn".to_string());
1630            ffqn_temporary = ffqn.to_string();
1631            let ffqn = ffqn_temporary
1632                .to_sql()
1633                .expect("string conversion never fails");
1634
1635            statement_mod.params.push((":ffqn", ffqn));
1636        }
1637
1638        let where_str = if statement_mod.where_vec.is_empty() {
1639            String::new()
1640        } else {
1641            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
1642        };
1643        let sql = format!(
1644            r"
1645            SELECT created_at, first_scheduled_at, component_id_input_digest,
1646            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1647            last_lock_version, executor_id, run_id,
1648            join_set_id, join_set_closing,
1649            result_kind
1650            FROM t_state {where_str} ORDER BY created_at {desc} LIMIT {limit}
1651            ",
1652            desc = if statement_mod.limit_desc { "DESC" } else { "" },
1653            limit = statement_mod.limit,
1654        );
1655        let mut vec: Vec<_> = read_tx
1656            .prepare(&sql)?
1657            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
1658                statement_mod
1659                    .params
1660                    .into_iter()
1661                    .collect::<Vec<_>>()
1662                    .as_ref(),
1663                |row| {
1664                    let execution_id = row.get::<_, ExecutionId>("execution_id")?;
1665                    let component_id_input_digest: InputContentDigest =
1666                        row.get("component_id_input_digest")?;
1667                    let created_at = row.get("created_at")?;
1668                    let first_scheduled_at = row.get("first_scheduled_at")?;
1669                    let combined_state = CombinedState::new(
1670                        CombinedStateDTO {
1671                            component_id_input_digest: component_id_input_digest.clone(),
1672                            state: row.get("state")?,
1673                            ffqn: row.get("ffqn")?,
1674                            pending_expires_finished: row
1675                                .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1676                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1677
1678                            last_lock_version: row
1679                                .get::<_, Option<VersionType>>("last_lock_version")?
1680                                .map(Version::new),
1681                            run_id: row.get::<_, Option<RunId>>("run_id")?,
1682                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1683                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1684                            result_kind: row
1685                                .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1686                                    "result_kind",
1687                                )?
1688                                .map(|wrapper| wrapper.0),
1689                        },
1690                        Version::new(row.get("corresponding_version")?),
1691                    )?;
1692                    Ok(ExecutionWithState {
1693                        execution_id,
1694                        ffqn: combined_state.ffqn,
1695                        pending_state: combined_state.pending_state,
1696                        created_at,
1697                        first_scheduled_at,
1698                        component_digest: component_id_input_digest,
1699                    })
1700                },
1701            )?
1702            .collect::<Vec<Result<_, _>>>()
1703            .into_iter()
1704            .filter_map(|row| match row {
1705                Ok(row) => Some(row),
1706                Err(err) => {
1707                    error!("Skipping row - {err:?}");
1708                    None
1709                }
1710            })
1711            .collect();
1712
1713        if !statement_mod.limit_desc {
1714            // the list must be sorted in descending order
1715            vec.reverse();
1716        }
1717        Ok(vec)
1718    }
1719
1720    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1721    #[expect(clippy::too_many_arguments)]
1722    fn lock_single_execution(
1723        tx: &Transaction,
1724        created_at: DateTime<Utc>,
1725        component_id: ComponentId,
1726        execution_id: &ExecutionId,
1727        run_id: RunId,
1728        appending_version: &Version,
1729        executor_id: ExecutorId,
1730        lock_expires_at: DateTime<Utc>,
1731        retry_config: ComponentRetryConfig,
1732    ) -> Result<LockedExecution, DbErrorWrite> {
1733        debug!("lock_single_execution");
1734        let combined_state = Self::get_combined_state(tx, execution_id)?;
1735        combined_state.pending_state.can_append_lock(
1736            created_at,
1737            executor_id,
1738            run_id,
1739            lock_expires_at,
1740        )?;
1741        let expected_version = combined_state.get_next_version_assert_not_finished();
1742        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;
1743
1744        // Append to `execution_log` table.
1745        let locked_event = Locked {
1746            component_id,
1747            executor_id,
1748            lock_expires_at,
1749            run_id,
1750            retry_config,
1751        };
1752        let event = ExecutionRequest::Locked(locked_event.clone());
1753        let event_ser = serde_json::to_string(&event).map_err(|err| {
1754            warn!("Cannot serialize {event:?} - {err:?}");
1755            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1756        })?;
1757        let mut stmt = tx
1758            .prepare_cached(
1759                "INSERT INTO t_execution_log \
1760            (execution_id, created_at, json_value, version, variant) \
1761            VALUES \
1762            (:execution_id, :created_at, :json_value, :version, :variant)",
1763            )
1764            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1765        stmt.execute(named_params! {
1766            ":execution_id": execution_id.to_string(),
1767            ":created_at": created_at,
1768            ":json_value": event_ser,
1769            ":version": appending_version.0,
1770            ":variant": event.variant(),
1771        })
1772        .map_err(|err| {
1773            warn!("Cannot lock execution - {err:?}");
1774            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1775        })?;
1776
1777        // Update `t_state`
1778        let responses = Self::list_responses(tx, execution_id, None)?;
1779        let responses = responses.into_iter().map(|resp| resp.event).collect();
1780        trace!("Responses: {responses:?}");
1781
1782        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
1783            tx,
1784            execution_id,
1785            executor_id,
1786            run_id,
1787            lock_expires_at,
1788            appending_version,
1789            retry_config,
1790        )?;
1791        // Fetch event_history and `Created` event to construct the response.
1792        let mut events = tx
1793            .prepare(
1794                "SELECT json_value, version FROM t_execution_log WHERE \
1795                execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
1796                ORDER BY version",
1797            )?
1798            .query_map(
1799                named_params! {
1800                    ":execution_id": execution_id.to_string(),
1801                    ":variant1": DUMMY_CREATED.variant(),
1802                    ":variant2": DUMMY_HISTORY_EVENT.variant(),
1803                },
1804                |row| {
1805                    let created_at_fake = DateTime::from_timestamp_nanos(0); // not used
1806                    let event = row
1807                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
1808                        .map_err(|serde| {
1809                            error!("Cannot deserialize {row:?} - {serde:?}");
1810                            consistency_rusqlite("cannot deserialize event")
1811                        })?
1812                        .0;
1813                    let version = Version(row.get("version")?);
1814
1815                    Ok(ExecutionEvent {
1816                        created_at: created_at_fake,
1817                        event,
1818                        backtrace_id: None,
1819                        version,
1820                    })
1821                },
1822            )?
1823            .collect::<Result<Vec<_>, _>>()?
1824            .into_iter()
1825            .collect::<VecDeque<_>>();
1826        let Some(ExecutionRequest::Created {
1827            ffqn,
1828            params,
1829            parent,
1830            metadata,
1831            ..
1832        }) = events.pop_front().map(|outer| outer.event)
1833        else {
1834            error!("Execution log must contain at least `Created` event");
1835            return Err(consistency_db_err("execution log must contain `Created` event").into());
1836        };
1837
1838        let event_history = events
1839            .into_iter()
1840            .map(|ExecutionEvent { event, version, .. }| {
1841                if let ExecutionRequest::HistoryEvent { event } = event {
1842                    Ok((event, version))
1843                } else {
1844                    error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1845                    Err(consistency_db_err(
1846                        "rows can only contain `Created` and `HistoryEvent` event kinds",
1847                    ))
1848                }
1849            })
1850            .collect::<Result<Vec<_>, _>>()?;
1851
1852        Ok(LockedExecution {
1853            execution_id: execution_id.clone(),
1854            metadata,
1855            next_version: appending_version.increment(),
1856            ffqn,
1857            params,
1858            event_history,
1859            responses,
1860            parent,
1861            intermittent_event_count,
1862            locked_event,
1863        })
1864    }
1865
1866    fn count_join_next(
1867        tx: &Transaction,
1868        execution_id: &ExecutionId,
1869        join_set_id: &JoinSetId,
1870    ) -> Result<u64, DbErrorRead> {
1871        let mut stmt = tx.prepare(
1872            "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
1873            AND history_event_type = :join_next",
1874        )?;
1875        Ok(stmt.query_row(
1876            named_params! {
1877                ":execution_id": execution_id.to_string(),
1878                ":join_set_id": join_set_id.to_string(),
1879                ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
1880            },
1881            |row| row.get("count"),
1882        )?)
1883    }
1884
1885    #[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1886    #[expect(clippy::needless_return)]
1887    fn append(
1888        tx: &Transaction,
1889        execution_id: &ExecutionId,
1890        req: AppendRequest,
1891        appending_version: Version,
1892    ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1893        if matches!(req.event, ExecutionRequest::Created { .. }) {
1894            return Err(DbErrorWrite::NonRetriable(
1895                DbErrorWriteNonRetriable::ValidationFailed(
1896                    "cannot append `Created` event - use `create` instead".into(),
1897                ),
1898            ));
1899        }
1900        if let AppendRequest {
1901            event:
1902                ExecutionRequest::Locked(Locked {
1903                    component_id,
1904                    executor_id,
1905                    run_id,
1906                    lock_expires_at,
1907                    retry_config,
1908                }),
1909            created_at,
1910        } = req
1911        {
1912            return Self::lock_single_execution(
1913                tx,
1914                created_at,
1915                component_id,
1916                execution_id,
1917                run_id,
1918                &appending_version,
1919                executor_id,
1920                lock_expires_at,
1921                retry_config,
1922            )
1923            .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1924        }
1925
1926        let combined_state = Self::get_combined_state(tx, execution_id)?;
1927        if combined_state.pending_state.is_finished() {
1928            debug!("Execution is already finished");
1929            return Err(DbErrorWrite::NonRetriable(
1930                DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1931            ));
1932        }
1933
1934        Self::check_expected_next_and_appending_version(
1935            &combined_state.get_next_version_assert_not_finished(),
1936            &appending_version,
1937        )?;
1938        let event_ser = serde_json::to_string(&req.event).map_err(|err| {
1939            error!("Cannot serialize {:?} - {err:?}", req.event);
1940            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1941        })?;
1942
1943        let mut stmt = tx.prepare(
1944                    "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1945                    VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
1946                    ?;
1947        stmt.execute(named_params! {
1948            ":execution_id": execution_id.to_string(),
1949            ":created_at": req.created_at,
1950            ":json_value": event_ser,
1951            ":version": appending_version.0,
1952            ":variant": req.event.variant(),
1953            ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
1954        })?;
1955        // Calculate current pending state
1956
1957        match &req.event {
1958            ExecutionRequest::Created { .. } => {
1959                unreachable!("handled in the caller")
1960            }
1961
1962            ExecutionRequest::Locked { .. } => {
1963                unreachable!("handled above")
1964            }
1965
1966            ExecutionRequest::TemporarilyFailed {
1967                backoff_expires_at, ..
1968            }
1969            | ExecutionRequest::TemporarilyTimedOut {
1970                backoff_expires_at, ..
1971            } => {
1972                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1973                    tx,
1974                    execution_id,
1975                    &appending_version,
1976                    *backoff_expires_at,
1977                    true, // an intermittent failure
1978                    combined_state
1979                        .pending_state
1980                        .get_component_id_input_digest()
1981                        .clone(),
1982                )?;
1983                return Ok((next_version, notifier));
1984            }
1985
1986            ExecutionRequest::Unlocked {
1987                backoff_expires_at, ..
1988            } => {
1989                let (next_version, notifier) = Self::update_state_pending_after_event_appended(
1990                    tx,
1991                    execution_id,
1992                    &appending_version,
1993                    *backoff_expires_at,
1994                    false, // not an intermittent failure
1995                    combined_state
1996                        .pending_state
1997                        .get_component_id_input_digest()
1998                        .clone(),
1999                )?;
2000                return Ok((next_version, notifier));
2001            }
2002
2003            ExecutionRequest::Finished { result, .. } => {
2004                Self::update_state_finished(
2005                    tx,
2006                    execution_id,
2007                    &appending_version,
2008                    req.created_at,
2009                    PendingStateFinishedResultKind::from(result),
2010                )?;
2011                return Ok((
2012                    appending_version,
2013                    AppendNotifier {
2014                        pending_at: None,
2015                        execution_finished: Some(NotifierExecutionFinished {
2016                            execution_id: execution_id.clone(),
2017                            retval: result.clone(),
2018                        }),
2019                        response: None,
2020                    },
2021                ));
2022            }
2023
2024            ExecutionRequest::HistoryEvent {
2025                event:
2026                    HistoryEvent::JoinSetCreate { .. }
2027                    | HistoryEvent::JoinSetRequest {
2028                        request: JoinSetRequest::ChildExecutionRequest { .. },
2029                        ..
2030                    }
2031                    | HistoryEvent::Persist { .. }
2032                    | HistoryEvent::Schedule { .. }
2033                    | HistoryEvent::Stub { .. }
2034                    | HistoryEvent::JoinNextTooMany { .. },
2035            } => {
2036                return Ok((
2037                    Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
2038                    AppendNotifier::default(),
2039                ));
2040            }
2041
2042            ExecutionRequest::HistoryEvent {
2043                event:
2044                    HistoryEvent::JoinSetRequest {
2045                        join_set_id,
2046                        request:
2047                            JoinSetRequest::DelayRequest {
2048                                delay_id,
2049                                expires_at,
2050                                ..
2051                            },
2052                    },
2053            } => {
2054                return Ok((
2055                    Self::bump_state_next_version(
2056                        tx,
2057                        execution_id,
2058                        &appending_version,
2059                        Some(DelayReq {
2060                            join_set_id: join_set_id.clone(),
2061                            delay_id: delay_id.clone(),
2062                            expires_at: *expires_at,
2063                        }),
2064                    )?,
2065                    AppendNotifier::default(),
2066                ));
2067            }
2068
2069            ExecutionRequest::HistoryEvent {
2070                event:
2071                    HistoryEvent::JoinNext {
2072                        join_set_id,
2073                        run_expires_at,
2074                        closing,
2075                        requested_ffqn: _,
2076                    },
2077            } => {
2078                // Did the response arrive already?
2079                let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
2080                let nth_response =
2081                    Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; // Skip n-1 rows
2082                trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2083                assert!(join_next_count > 0);
2084                if let Some(ResponseWithCursor {
2085                    event:
2086                        JoinSetResponseEventOuter {
2087                            created_at: nth_created_at,
2088                            ..
2089                        },
2090                    cursor: _,
2091                }) = nth_response
2092                {
2093                    let scheduled_at = max(*run_expires_at, nth_created_at); // No need to block
2094                    let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2095                        tx,
2096                        execution_id,
2097                        &appending_version,
2098                        scheduled_at,
2099                        false, // not an intermittent failure
2100                        combined_state
2101                            .pending_state
2102                            .get_component_id_input_digest()
2103                            .clone(),
2104                    )?;
2105                    return Ok((next_version, notifier));
2106                }
2107                return Ok((
2108                    Self::update_state_blocked(
2109                        tx,
2110                        execution_id,
2111                        &appending_version,
2112                        join_set_id,
2113                        *run_expires_at,
2114                        *closing,
2115                    )?,
2116                    AppendNotifier::default(),
2117                ));
2118            }
2119        }
2120    }
2121
2122    fn append_response(
2123        tx: &Transaction,
2124        execution_id: &ExecutionId,
2125        response_outer: JoinSetResponseEventOuter,
2126    ) -> Result<AppendNotifier, DbErrorWrite> {
2127        let mut stmt = tx.prepare(
2128            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2129                    VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
2130        )?;
2131        let join_set_id = &response_outer.event.join_set_id;
2132        let (delay_id, delay_success) = match &response_outer.event.event {
2133            JoinSetResponse::DelayFinished { delay_id, result } => {
2134                (Some(delay_id.to_string()), Some(result.is_ok()))
2135            }
2136            JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2137        };
2138        let (child_execution_id, finished_version) = match &response_outer.event.event {
2139            JoinSetResponse::ChildExecutionFinished {
2140                child_execution_id,
2141                finished_version,
2142                result: _,
2143            } => (
2144                Some(child_execution_id.to_string()),
2145                Some(finished_version.0),
2146            ),
2147            JoinSetResponse::DelayFinished { .. } => (None, None),
2148        };
2149
2150        stmt.execute(named_params! {
2151            ":execution_id": execution_id.to_string(),
2152            ":created_at": response_outer.created_at,
2153            ":join_set_id": join_set_id.to_string(),
2154            ":delay_id": delay_id,
2155            ":delay_success": delay_success,
2156            ":child_execution_id": child_execution_id,
2157            ":finished_version": finished_version,
2158        })?;
2159
2160        // if the execution is going to be unblocked by this response...
2161        let combined_state = Self::get_combined_state(tx, execution_id)?;
2162        debug!("previous_pending_state: {combined_state:?}");
2163        let mut notifier = if let PendingState::BlockedByJoinSet {
2164            join_set_id: found_join_set_id,
2165            lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2166            closing: _,
2167            component_id_input_digest,
2168        } = combined_state.pending_state
2169            && *join_set_id == found_join_set_id
2170        {
2171            // PendingAt should be set to current time if called from expired_timers_watcher,
2172            // or to a future time if the execution is hot.
2173            let scheduled_at = max(lock_expires_at, response_outer.created_at);
2174            // TODO: Add diff test
2175            // Unblock the state.
2176            Self::update_state_pending_after_response_appended(
2177                tx,
2178                execution_id,
2179                scheduled_at,
2180                &combined_state.corresponding_version, // not changing the version
2181                component_id_input_digest,
2182            )?
2183        } else {
2184            AppendNotifier::default()
2185        };
2186        if let JoinSetResponseEvent {
2187            join_set_id,
2188            event:
2189                JoinSetResponse::DelayFinished {
2190                    delay_id,
2191                    result: _,
2192                },
2193        } = &response_outer.event
2194        {
2195            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2196            let mut stmt =
2197                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
2198                ?;
2199            stmt.execute(named_params! {
2200                ":execution_id": execution_id.to_string(),
2201                ":join_set_id": join_set_id.to_string(),
2202                ":delay_id": delay_id.to_string(),
2203            })?;
2204        }
2205        notifier.response = Some((execution_id.clone(), response_outer));
2206        Ok(notifier)
2207    }
2208
2209    fn append_backtrace(
2210        tx: &Transaction,
2211        backtrace_info: &BacktraceInfo,
2212    ) -> Result<(), DbErrorWrite> {
2213        let backtrace = serde_json::to_string(&backtrace_info.wasm_backtrace).map_err(|err| {
2214            warn!(
2215                "Cannot serialize backtrace {:?} - {err:?}",
2216                backtrace_info.wasm_backtrace
2217            );
2218            DbErrorWriteNonRetriable::ValidationFailed("cannot serialize backtrace".into())
2219        })?;
2220        let mut stmt = tx
2221            .prepare(
2222                "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2223                    VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :wasm_backtrace)",
2224            )
2225            ?;
2226        stmt.execute(named_params! {
2227            ":execution_id": backtrace_info.execution_id.to_string(),
2228            ":component_id": backtrace_info.component_id.to_string(),
2229            ":version_min_including": backtrace_info.version_min_including.0,
2230            ":version_max_excluding": backtrace_info.version_max_excluding.0,
2231            ":wasm_backtrace": backtrace,
2232        })?;
2233        Ok(())
2234    }
2235
2236    #[cfg(feature = "test")]
2237    fn get(
2238        tx: &Transaction,
2239        execution_id: &ExecutionId,
2240    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2241        let mut stmt = tx.prepare(
2242            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2243                        execution_id = :execution_id ORDER BY version",
2244        )?;
2245        let events = stmt
2246            .query_map(
2247                named_params! {
2248                    ":execution_id": execution_id.to_string(),
2249                },
2250                |row| {
2251                    let created_at = row.get("created_at")?;
2252                    let event = row
2253                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2254                        .map_err(|serde| {
2255                            error!("Cannot deserialize {row:?} - {serde:?}");
2256                            consistency_rusqlite("cannot deserialize event")
2257                        })?
2258                        .0;
2259                    let version = Version(row.get("version")?);
2260
2261                    Ok(ExecutionEvent {
2262                        created_at,
2263                        event,
2264                        backtrace_id: None,
2265                        version,
2266                    })
2267                },
2268            )?
2269            .collect::<Result<Vec<_>, _>>()?;
2270        if events.is_empty() {
2271            return Err(DbErrorRead::NotFound);
2272        }
2273        let combined_state = Self::get_combined_state(tx, execution_id)?;
2274        let responses = Self::list_responses(tx, execution_id, None)?
2275            .into_iter()
2276            .map(|resp| resp.event)
2277            .collect();
2278        Ok(concepts::storage::ExecutionLog {
2279            execution_id: execution_id.clone(),
2280            events,
2281            responses,
2282            next_version: combined_state.get_next_version_or_finished(), // In case of finished, this will be the already last version
2283            pending_state: combined_state.pending_state,
2284        })
2285    }
2286
2287    fn list_execution_events(
2288        tx: &Transaction,
2289        execution_id: &ExecutionId,
2290        version_min: VersionType,
2291        version_max_excluding: VersionType,
2292        include_backtrace_id: bool,
2293    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2294        let select = if include_backtrace_id {
2295            "SELECT
2296                log.created_at,
2297                log.json_value,
2298                log.version as version,
2299                -- Select version_min_including from backtrace if a match is found, otherwise NULL
2300                bt.version_min_including AS backtrace_id
2301            FROM
2302                t_execution_log AS log
2303            LEFT OUTER JOIN -- Use LEFT JOIN to keep all logs even if no backtrace matches
2304                t_backtrace AS bt ON log.execution_id = bt.execution_id
2305                                -- Check if the log's version falls within the backtrace's range
2306                                AND log.version >= bt.version_min_including
2307                                AND log.version < bt.version_max_excluding
2308            WHERE
2309                log.execution_id = :execution_id
2310                AND log.version >= :version_min
2311                AND log.version < :version_max_excluding
2312            ORDER BY
2313                log.version;"
2314        } else {
2315            "SELECT
2316                created_at, json_value, NULL as backtrace_id, version
2317            FROM t_execution_log WHERE
2318                execution_id = :execution_id AND version >= :version_min AND version < :version_max_excluding
2319            ORDER BY version"
2320        };
2321        tx.prepare(select)?
2322            .query_map(
2323                named_params! {
2324                    ":execution_id": execution_id.to_string(),
2325                    ":version_min": version_min,
2326                    ":version_max_excluding": version_max_excluding
2327                },
2328                |row| {
2329                    let created_at = row.get("created_at")?;
2330                    let backtrace_id = row
2331                        .get::<_, Option<VersionType>>("backtrace_id")?
2332                        .map(Version::new);
2333                    let version = Version(row.get("version")?);
2334
2335                    let event = row
2336                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2337                        .map(|event| ExecutionEvent {
2338                            created_at,
2339                            event: event.0,
2340                            backtrace_id,
2341                            version,
2342                        })
2343                        .map_err(|serde| {
2344                            error!("Cannot deserialize {row:?} - {serde:?}");
2345                            consistency_rusqlite("cannot deserialize")
2346                        })?;
2347                    Ok(event)
2348                },
2349            )?
2350            .collect::<Result<Vec<_>, _>>()
2351            .map_err(DbErrorRead::from)
2352    }
2353
2354    fn get_execution_event(
2355        tx: &Transaction,
2356        execution_id: &ExecutionId,
2357        version: VersionType,
2358    ) -> Result<ExecutionEvent, DbErrorRead> {
2359        let mut stmt = tx.prepare(
2360            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2361                        execution_id = :execution_id AND version = :version",
2362        )?;
2363        stmt.query_row(
2364            named_params! {
2365                ":execution_id": execution_id.to_string(),
2366                ":version": version,
2367            },
2368            |row| {
2369                let created_at = row.get("created_at")?;
2370                let event = row
2371                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2372                    .map_err(|serde| {
2373                        error!("Cannot deserialize {row:?} - {serde:?}");
2374                        consistency_rusqlite("cannot deserialize event")
2375                    })?;
2376                let version = Version(row.get("version")?);
2377
2378                Ok(ExecutionEvent {
2379                    created_at,
2380                    event: event.0,
2381                    backtrace_id: None,
2382                    version,
2383                })
2384            },
2385        )
2386        .map_err(DbErrorRead::from)
2387    }
2388
2389    fn get_last_execution_event(
2390        tx: &Transaction,
2391        execution_id: &ExecutionId,
2392    ) -> Result<ExecutionEvent, DbErrorRead> {
2393        let mut stmt = tx.prepare(
2394            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2395                        execution_id = :execution_id ORDER BY version DESC",
2396        )?;
2397        stmt.query_row(
2398            named_params! {
2399                ":execution_id": execution_id.to_string(),
2400            },
2401            |row| {
2402                let created_at = row.get("created_at")?;
2403                let event = row
2404                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2405                    .map_err(|serde| {
2406                        error!("Cannot deserialize {row:?} - {serde:?}");
2407                        consistency_rusqlite("cannot deserialize event")
2408                    })?;
2409                let version = Version(row.get("version")?);
2410                Ok(ExecutionEvent {
2411                    created_at,
2412                    event: event.0,
2413                    backtrace_id: None,
2414                    version: version.clone(),
2415                })
2416            },
2417        )
2418        .map_err(DbErrorRead::from)
2419    }
2420
2421    fn list_responses(
2422        tx: &Transaction,
2423        execution_id: &ExecutionId,
2424        pagination: Option<Pagination<u32>>,
2425    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2426        // TODO: Add test
2427        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
2428        let mut sql = "SELECT \
2429            r.id, r.created_at, r.join_set_id,  r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
2430            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2431            WHERE \
2432            r.execution_id = :execution_id \
2433            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
2434            "
2435        .to_string();
2436        let limit = match &pagination {
2437            Some(
2438                pagination @ (Pagination::NewerThan { cursor, .. }
2439                | Pagination::OlderThan { cursor, .. }),
2440            ) => {
2441                params.push((":cursor", Box::new(cursor)));
2442                write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
2443                Some(pagination.length())
2444            }
2445            None => None,
2446        };
2447        sql.push_str(" ORDER BY id");
2448        if pagination.as_ref().is_some_and(Pagination::is_desc) {
2449            sql.push_str(" DESC");
2450        }
2451        if let Some(limit) = limit {
2452            write!(sql, " LIMIT {limit}").unwrap();
2453        }
2454        params.push((":execution_id", Box::new(execution_id.to_string())));
2455        tx.prepare(&sql)?
2456            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
2457                params
2458                    .iter()
2459                    .map(|(key, value)| (*key, value.as_ref()))
2460                    .collect::<Vec<_>>()
2461                    .as_ref(),
2462                Self::parse_response_with_cursor,
2463            )?
2464            .collect::<Result<Vec<_>, rusqlite::Error>>()
2465            .map_err(DbErrorRead::from)
2466    }
2467
2468    fn parse_response_with_cursor(
2469        row: &rusqlite::Row<'_>,
2470    ) -> Result<ResponseWithCursor, rusqlite::Error> {
2471        let id = row.get("id")?;
2472        let created_at: DateTime<Utc> = row.get("created_at")?;
2473        let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
2474        let event = match (
2475            row.get::<_, Option<DelayId>>("delay_id")?,
2476            row.get::<_, Option<bool>>("delay_success")?,
2477            row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
2478            row.get::<_, Option<VersionType>>("finished_version")?,
2479            row.get::<_, Option<JsonWrapper<ExecutionRequest>>>("json_value")?,
2480        ) {
2481            (Some(delay_id), Some(delay_success), None, None, None) => {
2482                JoinSetResponse::DelayFinished {
2483                    delay_id,
2484                    result: delay_success.then_some(()).ok_or(()),
2485                }
2486            }
2487            (
2488                None,
2489                None,
2490                Some(child_execution_id),
2491                Some(finished_version),
2492                Some(JsonWrapper(ExecutionRequest::Finished { result, .. })),
2493            ) => JoinSetResponse::ChildExecutionFinished {
2494                child_execution_id,
2495                finished_version: Version(finished_version),
2496                result,
2497            },
2498            (delay, delay_success, child, finished, result) => {
2499                error!(
2500                    "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
2501                    result.map(|it| it.0)
2502                );
2503                return Err(consistency_rusqlite("invalid row in t_join_set_response"));
2504            }
2505        };
2506        Ok(ResponseWithCursor {
2507            cursor: id,
2508            event: JoinSetResponseEventOuter {
2509                event: JoinSetResponseEvent { join_set_id, event },
2510                created_at,
2511            },
2512        })
2513    }
2514
2515    fn nth_response(
2516        tx: &Transaction,
2517        execution_id: &ExecutionId,
2518        join_set_id: &JoinSetId,
2519        skip_rows: u64,
2520    ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2521        // TODO: Add test
2522        tx
2523            .prepare(
2524                "SELECT r.id, r.created_at, r.join_set_id, \
2525                    r.delay_id, r.delay_success, \
2526                    r.child_execution_id, r.finished_version, l.json_value \
2527                    FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2528                    WHERE \
2529                    r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2530                    (
2531                    r.finished_version = l.version \
2532                    OR \
2533                    r.child_execution_id IS NULL \
2534                    ) \
2535                    ORDER BY id \
2536                    LIMIT 1 OFFSET :offset",
2537            )
2538            ?
2539            .query_row(
2540                named_params! {
2541                    ":execution_id": execution_id.to_string(),
2542                    ":join_set_id": join_set_id.to_string(),
2543                    ":offset": skip_rows,
2544                },
2545                Self::parse_response_with_cursor,
2546            )
2547            .optional()
2548            .map_err(DbErrorRead::from)
2549    }
2550
2551    fn delay_response(
2552        tx: &Transaction,
2553        execution_id: &ExecutionId,
2554        delay_id: &DelayId,
2555    ) -> Result<Option<bool>, DbErrorRead> {
2556        // TODO: Add test
2557        tx.prepare(
2558            "SELECT delay_success \
2559                    FROM t_join_set_response \
2560                    WHERE \
2561                    execution_id = :execution_id AND delay_id = :delay_id
2562                    ",
2563        )?
2564        .query_row(
2565            named_params! {
2566                ":execution_id": execution_id.to_string(),
2567                ":delay_id": delay_id.to_string(),
2568            },
2569            |row| {
2570                let delay_success = row.get::<_, bool>("delay_success")?;
2571                Ok(delay_success)
2572            },
2573        )
2574        .optional()
2575        .map_err(DbErrorRead::from)
2576    }
2577
2578    // TODO(perf): Instead of OFFSET an per-execution sequential ID could improve the read performance.
2579    #[instrument(level = Level::TRACE, skip_all)]
2580    fn get_responses_with_offset(
2581        tx: &Transaction,
2582        execution_id: &ExecutionId,
2583        skip_rows: usize,
2584    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2585        // TODO: Add test
2586        tx.prepare(
2587            "SELECT r.id, r.created_at, r.join_set_id, \
2588            r.delay_id, r.delay_success, \
2589            r.child_execution_id, r.finished_version, l.json_value \
2590            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2591            WHERE \
2592            r.execution_id = :execution_id AND \
2593            ( \
2594            r.finished_version = l.version \
2595            OR r.child_execution_id IS NULL \
2596            ) \
2597            ORDER BY id \
2598            LIMIT -1 OFFSET :offset",
2599        )
2600        ?
2601        .query_map(
2602            named_params! {
2603                ":execution_id": execution_id.to_string(),
2604                ":offset": skip_rows,
2605            },
2606            Self::parse_response_with_cursor,
2607        )
2608        ?
2609        .collect::<Result<Vec<_>, _>>()
2610        .map(|resp| resp.into_iter().map(|vec| vec.event).collect())
2611        .map_err(DbErrorRead::from)
2612    }
2613
2614    fn get_pending_of_single_ffqn(
2615        mut stmt: CachedStatement,
2616        batch_size: usize,
2617        pending_at_or_sooner: DateTime<Utc>,
2618        ffqn: &FunctionFqn,
2619    ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2620        stmt.query_map(
2621            named_params! {
2622                ":pending_expires_finished": pending_at_or_sooner,
2623                ":ffqn": ffqn.to_string(),
2624                ":batch_size": batch_size,
2625            },
2626            |row| {
2627                let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2628                let next_version =
2629                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2630                Ok((execution_id, next_version))
2631            },
2632        )
2633        .map_err(|err| {
2634            warn!("Ignoring consistency error {err:?}");
2635        })?
2636        .collect::<Result<Vec<_>, _>>()
2637        .map_err(|err| {
2638            warn!("Ignoring consistency error {err:?}");
2639        })
2640    }
2641
2642    /// Get executions and their next versions
2643    fn get_pending_by_ffqns(
2644        conn: &Connection,
2645        batch_size: usize,
2646        pending_at_or_sooner: DateTime<Utc>,
2647        ffqns: &[FunctionFqn],
2648    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2649        let mut execution_ids_versions = Vec::with_capacity(batch_size);
2650        for ffqn in ffqns {
2651            // Select executions in PendingAt.
2652            let stmt = conn
2653                .prepare_cached(&format!(
2654                    r#"
2655                    SELECT execution_id, corresponding_version FROM t_state WHERE
2656                    state = "{STATE_PENDING_AT}" AND
2657                    pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2658                    ORDER BY pending_expires_finished LIMIT :batch_size
2659                    "#
2660                ))
2661                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2662
2663            if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
2664                stmt,
2665                batch_size - execution_ids_versions.len(),
2666                pending_at_or_sooner,
2667                ffqn,
2668            ) {
2669                execution_ids_versions.extend(execs_and_versions);
2670                if execution_ids_versions.len() == batch_size {
2671                    // Prioritieze lowering of db requests, although ffqns later in the list might get starved.
2672                    break;
2673                }
2674                // consistency errors are ignored since we want to return at least some rows.
2675            }
2676        }
2677        Ok(execution_ids_versions)
2678    }
2679
2680    fn get_pending_by_component_input_digest(
2681        conn: &Connection,
2682        batch_size: usize,
2683        pending_at_or_sooner: DateTime<Utc>,
2684        input_digest: &InputContentDigest,
2685    ) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2686        let mut stmt = conn
2687            .prepare_cached(&format!(
2688                r#"
2689                SELECT execution_id, corresponding_version FROM t_state WHERE
2690                state = "{STATE_PENDING_AT}" AND
2691                pending_expires_finished <= :pending_expires_finished AND
2692                component_id_input_digest = :component_id_input_digest
2693                ORDER BY pending_expires_finished LIMIT :batch_size
2694                "#
2695            ))
2696            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2697
2698        stmt.query_map(
2699            named_params! {
2700                ":pending_expires_finished": pending_at_or_sooner,
2701                ":component_id_input_digest": input_digest,
2702                ":batch_size": batch_size,
2703            },
2704            |row| {
2705                let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2706                let next_version =
2707                    Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2708                Ok((execution_id, next_version))
2709            },
2710        )?
2711        .collect::<Result<Vec<_>, _>>()
2712        .map_err(DbErrorGeneric::from)
2713    }
2714
2715    // Must be called after write transaction for a correct happens-before relationship.
2716    #[instrument(level = Level::TRACE, skip_all)]
2717    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2718        let (pending_ats, finished_execs, responses) = {
2719            let (mut pending_ats, mut finished_execs, mut responses) =
2720                (Vec::new(), Vec::new(), Vec::new());
2721            for notifier in notifiers {
2722                if let Some(pending_at) = notifier.pending_at {
2723                    pending_ats.push(pending_at);
2724                }
2725                if let Some(finished) = notifier.execution_finished {
2726                    finished_execs.push(finished);
2727                }
2728                if let Some(response) = notifier.response {
2729                    responses.push(response);
2730                }
2731            }
2732            (pending_ats, finished_execs, responses)
2733        };
2734
2735        // Notify pending_at subscribers.
2736        if !pending_ats.is_empty() {
2737            let guard = self.0.pending_subscribers.lock().unwrap();
2738            for pending_at in pending_ats {
2739                Self::notify_pending_locked(&pending_at, current_time, &guard);
2740            }
2741        }
2742        // Notify execution finished subscribers.
2743        // Every NotifierExecutionFinished value belongs to a different execution, since only `append(Finished)` can produce `NotifierExecutionFinished`.
2744        if !finished_execs.is_empty() {
2745            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
2746            for finished in finished_execs {
2747                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2748                    for (_tag, sender) in listeners_of_exe_id {
2749                        // Sending while holding the lock but the oneshot sender does not block.
2750                        // If `wait_for_finished_result` happens after the append, it would receive the finished value instead.
2751                        let _ = sender.send(finished.retval.clone());
2752                    }
2753                }
2754            }
2755        }
2756        // Notify response subscribers.
2757        if !responses.is_empty() {
2758            let mut guard = self.0.response_subscribers.lock().unwrap();
2759            for (execution_id, response) in responses {
2760                if let Some((sender, _)) = guard.remove(&execution_id) {
2761                    let _ = sender.send(response);
2762                }
2763            }
2764        }
2765    }
2766
2767    fn notify_pending_locked(
2768        notifier: &NotifierPendingAt,
2769        current_time: DateTime<Utc>,
2770        ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2771    ) {
2772        // No need to remove here, cleanup is handled by the caller.
2773        if notifier.scheduled_at <= current_time {
2774            ffqn_to_pending_subscription.notify(notifier);
2775        }
2776    }
2777
2778    fn upgrade_execution_component(
2779        tx: &Transaction,
2780        execution_id: &ExecutionId,
2781        old: &InputContentDigest,
2782        new: &InputContentDigest,
2783    ) -> Result<(), DbErrorWrite> {
2784        debug!("Updating t_state to component {new}");
2785        let mut stmt = tx
2786            .prepare_cached(
2787                r"
2788                UPDATE t_state
2789                SET
2790                    updated_at = CURRENT_TIMESTAMP,
2791                    component_id_input_digest = :new
2792                WHERE
2793                    execution_id = :execution_id AND
2794                    component_id_input_digest = :old
2795            ",
2796            )
2797            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2798        let updated = stmt
2799            .execute(named_params! {
2800                ":execution_id": execution_id,
2801                ":old": old,
2802                ":new": new,
2803            })
2804            .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2805        if updated != 1 {
2806            return Err(DbErrorWrite::NotFound);
2807        }
2808        Ok(())
2809    }
2810}
2811
2812#[async_trait]
2813impl DbExecutor for SqlitePool {
2814    #[instrument(level = Level::TRACE, skip(self))]
2815    async fn lock_pending_by_ffqns(
2816        &self,
2817        batch_size: usize,
2818        pending_at_or_sooner: DateTime<Utc>,
2819        ffqns: Arc<[FunctionFqn]>,
2820        created_at: DateTime<Utc>,
2821        component_id: ComponentId,
2822        executor_id: ExecutorId,
2823        lock_expires_at: DateTime<Utc>,
2824        run_id: RunId,
2825        retry_config: ComponentRetryConfig,
2826    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2827        let execution_ids_versions = self
2828            .transaction(
2829                move |conn| {
2830                    Self::get_pending_by_ffqns(conn, batch_size, pending_at_or_sooner, &ffqns)
2831                },
2832                "lock_pending_by_ffqns_get",
2833            )
2834            .await?;
2835        if execution_ids_versions.is_empty() {
2836            Ok(vec![])
2837        } else {
2838            debug!("Locking {execution_ids_versions:?}");
2839            self.transaction(
2840                move |tx| {
2841                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2842                    // Append lock
2843                    for (execution_id, version) in &execution_ids_versions {
2844                        match Self::lock_single_execution(
2845                            tx,
2846                            created_at,
2847                            component_id.clone(),
2848                            execution_id,
2849                            run_id,
2850                            version,
2851                            executor_id,
2852                            lock_expires_at,
2853                            retry_config,
2854                        ) {
2855                            Ok(locked) => locked_execs.push(locked),
2856                            Err(err) => {
2857                                warn!("Locking row {execution_id} failed - {err:?}");
2858                            }
2859                        }
2860                    }
2861                    Ok(locked_execs)
2862                },
2863                "lock_pending_by_ffqns_one",
2864            )
2865            .await
2866        }
2867    }
2868
2869    #[instrument(level = Level::TRACE, skip(self))]
2870    async fn lock_pending_by_component_id(
2871        &self,
2872        batch_size: usize,
2873        pending_at_or_sooner: DateTime<Utc>,
2874        component_id: &ComponentId,
2875        created_at: DateTime<Utc>,
2876        executor_id: ExecutorId,
2877        lock_expires_at: DateTime<Utc>,
2878        run_id: RunId,
2879        retry_config: ComponentRetryConfig,
2880    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2881        let component_id = component_id.clone();
2882        let execution_ids_versions = self
2883            .transaction(
2884                {
2885                    let component_id = component_id.clone();
2886                    move |conn| {
2887                        Self::get_pending_by_component_input_digest(
2888                            conn,
2889                            batch_size,
2890                            pending_at_or_sooner,
2891                            &component_id.input_digest,
2892                        )
2893                    }
2894                },
2895                "lock_pending_by_component_id_get",
2896            )
2897            .await?;
2898        if execution_ids_versions.is_empty() {
2899            Ok(vec![])
2900        } else {
2901            debug!("Locking {execution_ids_versions:?}");
2902            self.transaction(
2903                move |tx| {
2904                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2905                    // Append lock
2906                    for (execution_id, version) in &execution_ids_versions {
2907                        match Self::lock_single_execution(
2908                            tx,
2909                            created_at,
2910                            component_id.clone(),
2911                            execution_id,
2912                            run_id,
2913                            version,
2914                            executor_id,
2915                            lock_expires_at,
2916                            retry_config,
2917                        ) {
2918                            Ok(locked) => locked_execs.push(locked),
2919                            Err(err) => {
2920                                warn!("Locking row {execution_id} failed - {err:?}");
2921                            }
2922                        }
2923                    }
2924                    Ok(locked_execs)
2925                },
2926                "lock_pending_by_component_id_one",
2927            )
2928            .await
2929        }
2930    }
2931
2932    #[instrument(level = Level::DEBUG, skip(self))]
2933    async fn lock_one(
2934        &self,
2935        created_at: DateTime<Utc>,
2936        component_id: ComponentId,
2937        execution_id: &ExecutionId,
2938        run_id: RunId,
2939        version: Version,
2940        executor_id: ExecutorId,
2941        lock_expires_at: DateTime<Utc>,
2942        retry_config: ComponentRetryConfig,
2943    ) -> Result<LockedExecution, DbErrorWrite> {
2944        debug!(%execution_id, "lock_one");
2945        let execution_id = execution_id.clone();
2946        self.transaction(
2947            move |tx| {
2948                Self::lock_single_execution(
2949                    tx,
2950                    created_at,
2951                    component_id.clone(),
2952                    &execution_id,
2953                    run_id,
2954                    &version,
2955                    executor_id,
2956                    lock_expires_at,
2957                    retry_config,
2958                )
2959            },
2960            "lock_inner",
2961        )
2962        .await
2963    }
2964
2965    #[instrument(level = Level::DEBUG, skip(self, req))]
2966    async fn append(
2967        &self,
2968        execution_id: ExecutionId,
2969        version: Version,
2970        req: AppendRequest,
2971    ) -> Result<AppendResponse, DbErrorWrite> {
2972        debug!(%req, "append");
2973        trace!(?req, "append");
2974        let created_at = req.created_at;
2975        let (version, notifier) = self
2976            .transaction(
2977                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
2978                "append",
2979            )
2980            .await?;
2981        self.notify_all(vec![notifier], created_at);
2982        Ok(version)
2983    }
2984
2985    #[instrument(level = Level::DEBUG, skip_all)]
2986    async fn append_batch_respond_to_parent(
2987        &self,
2988        events: AppendEventsToExecution,
2989        response: AppendResponseToExecution,
2990        current_time: DateTime<Utc>,
2991    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2992        debug!("append_batch_respond_to_parent");
2993        if events.execution_id == response.parent_execution_id {
2994            // Pending state would be wrong.
2995            // This is not a panic because it depends on DB state.
2996            return Err(DbErrorWrite::NonRetriable(
2997                DbErrorWriteNonRetriable::ValidationFailed(
2998                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2999                ),
3000            ));
3001        }
3002        if events.batch.is_empty() {
3003            error!("Batch cannot be empty");
3004            return Err(DbErrorWrite::NonRetriable(
3005                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3006            ));
3007        }
3008        let (version, notifiers) = {
3009            self.transaction(
3010                move |tx| {
3011                    let mut version = events.version.clone();
3012                    let mut notifier_of_child = None;
3013                    for append_request in &events.batch {
3014                        let (v, n) = Self::append(
3015                            tx,
3016                            &events.execution_id,
3017                            append_request.clone(),
3018                            version,
3019                        )?;
3020                        version = v;
3021                        notifier_of_child = Some(n);
3022                    }
3023
3024                    let pending_at_parent = Self::append_response(
3025                        tx,
3026                        &response.parent_execution_id,
3027                        JoinSetResponseEventOuter {
3028                            created_at: response.created_at,
3029                            event: JoinSetResponseEvent {
3030                                join_set_id: response.join_set_id.clone(),
3031                                event: JoinSetResponse::ChildExecutionFinished {
3032                                    child_execution_id: response.child_execution_id.clone(),
3033                                    finished_version: response.finished_version.clone(),
3034                                    result: response.result.clone(),
3035                                },
3036                            },
3037                        },
3038                    )?;
3039                    Ok::<_, DbErrorWrite>((
3040                        version,
3041                        vec![
3042                            notifier_of_child.expect("checked that the batch is not empty"),
3043                            pending_at_parent,
3044                        ],
3045                    ))
3046                },
3047                "append_batch_respond_to_parent",
3048            )
3049            .await?
3050        };
3051        self.notify_all(notifiers, current_time);
3052        Ok(version)
3053    }
3054
3055    // Supports only one subscriber (executor) per ffqn.
3056    // A new subscriber replaces the old one, which will eventually time out, which is fine.
3057    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3058    async fn wait_for_pending_by_ffqn(
3059        &self,
3060        pending_at_or_sooner: DateTime<Utc>,
3061        ffqns: Arc<[FunctionFqn]>,
3062        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3063    ) {
3064        let unique_tag: u64 = rand::random();
3065        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
3066        {
3067            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3068            for ffqn in ffqns.as_ref() {
3069                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3070            }
3071        }
3072        async {
3073            let Ok(execution_ids_versions) = self
3074                .transaction(
3075                    {
3076                        let ffqns = ffqns.clone();
3077                        move |conn| Self::get_pending_by_ffqns(conn, 1, pending_at_or_sooner, ffqns.as_ref())
3078                    },
3079                    "get_pending_by_ffqns",
3080                )
3081                .await
3082            else {
3083                trace!(
3084                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3085                );
3086                timeout_fut.await;
3087                return;
3088            };
3089            if !execution_ids_versions.is_empty() {
3090                trace!("Not waiting, database already contains new pending executions");
3091                return;
3092            }
3093            tokio::select! { // future's liveness: Dropping the loser immediately.
3094                _ = receiver.recv() => {
3095                    trace!("Received a notification");
3096                }
3097                () = timeout_fut => {
3098                }
3099            }
3100        }.await;
3101        // Clean up ffqn_to_pending_subscription in any case
3102        {
3103            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3104            for ffqn in ffqns.as_ref() {
3105                match pending_subscribers.remove_ffqn(ffqn) {
3106                    Some((_, tag)) if tag == unique_tag => {
3107                        // Cleanup OK.
3108                    }
3109                    Some(other) => {
3110                        // Reinsert foreign sender.
3111                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
3112                    }
3113                    None => {
3114                        // Value was replaced and cleaned up already.
3115                    }
3116                }
3117            }
3118        }
3119    }
3120
3121    // Supports only one subscriber (executor) per component id.
3122    // A new subscriber replaces the old one, which will eventually time out, which is fine.
3123    async fn wait_for_pending_by_component_id(
3124        &self,
3125        pending_at_or_sooner: DateTime<Utc>,
3126        component_id: &ComponentId,
3127        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3128    ) {
3129        let unique_tag: u64 = rand::random();
3130        let (sender, mut receiver) = mpsc::channel(1); // senders must use `try_send`
3131        {
3132            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3133            pending_subscribers.insert_by_component(
3134                component_id.input_digest.clone(),
3135                (sender.clone(), unique_tag),
3136            );
3137        }
3138        async {
3139            let Ok(execution_ids_versions) = self
3140                .transaction(
3141                    {
3142                        let input_digest =component_id.input_digest.clone();
3143                        move |conn| Self::get_pending_by_component_input_digest(conn, 1, pending_at_or_sooner, &input_digest)
3144                    },
3145                    "get_pending_by_component_input_digest",
3146                )
3147                .await
3148            else {
3149                trace!(
3150                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
3151                );
3152                timeout_fut.await;
3153                return;
3154            };
3155            if !execution_ids_versions.is_empty() {
3156                trace!("Not waiting, database already contains new pending executions");
3157                return;
3158            }
3159            tokio::select! { // future's liveness: Dropping the loser immediately.
3160                _ = receiver.recv() => {
3161                    trace!("Received a notification");
3162                }
3163                () = timeout_fut => {
3164                }
3165            }
3166        }.await;
3167        // Clean up ffqn_to_pending_subscription in any case
3168        {
3169            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
3170
3171            match pending_subscribers.remove_by_component(&component_id.input_digest) {
3172                Some((_, tag)) if tag == unique_tag => {
3173                    // Cleanup OK.
3174                }
3175                Some(other) => {
3176                    // Reinsert foreign sender.
3177                    pending_subscribers
3178                        .insert_by_component(component_id.input_digest.clone(), other);
3179                }
3180                None => {
3181                    // Value was replaced and cleaned up already.
3182                }
3183            }
3184        }
3185    }
3186
3187    async fn get_last_execution_event(
3188        &self,
3189        execution_id: &ExecutionId,
3190    ) -> Result<ExecutionEvent, DbErrorRead> {
3191        let execution_id = execution_id.clone();
3192        self.transaction(
3193            move |tx| Self::get_last_execution_event(tx, &execution_id),
3194            "get_last_execution_event",
3195        )
3196        .await
3197    }
3198}
3199
3200#[async_trait]
3201impl DbConnection for SqlitePool {
3202    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3203    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3204        debug!("create");
3205        trace!(?req, "create");
3206        let created_at = req.created_at;
3207        let (version, notifier) = self
3208            .transaction(move |tx| Self::create_inner(tx, req.clone()), "create")
3209            .await?;
3210        self.notify_all(vec![notifier], created_at);
3211        Ok(version)
3212    }
3213
3214    #[instrument(level = Level::DEBUG, skip(self, batch))]
3215    async fn append_batch(
3216        &self,
3217        current_time: DateTime<Utc>,
3218        batch: Vec<AppendRequest>,
3219        execution_id: ExecutionId,
3220        version: Version,
3221    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3222        debug!("append_batch");
3223        trace!(?batch, "append_batch");
3224        assert!(!batch.is_empty(), "Empty batch request");
3225
3226        let (version, notifier) = self
3227            .transaction(
3228                move |tx| {
3229                    let mut version = version.clone();
3230                    let mut notifier = None;
3231                    for append_request in &batch {
3232                        let (v, n) =
3233                            Self::append(tx, &execution_id, append_request.clone(), version)?;
3234                        version = v;
3235                        notifier = Some(n);
3236                    }
3237                    Ok::<_, DbErrorWrite>((
3238                        version,
3239                        notifier.expect("checked that the batch is not empty"),
3240                    ))
3241                },
3242                "append_batch",
3243            )
3244            .await?;
3245
3246        self.notify_all(vec![notifier], current_time);
3247        Ok(version)
3248    }
3249
3250    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3251    async fn append_batch_create_new_execution(
3252        &self,
3253        current_time: DateTime<Utc>,
3254        batch: Vec<AppendRequest>,
3255        execution_id: ExecutionId,
3256        version: Version,
3257        child_req: Vec<CreateRequest>,
3258    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3259        debug!("append_batch_create_new_execution");
3260        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3261        assert!(!batch.is_empty(), "Empty batch request");
3262
3263        let (version, notifiers) = self
3264            .transaction(
3265                move |tx| {
3266                    let mut notifier = None;
3267                    let mut version = version.clone();
3268                    for append_request in &batch {
3269                        let (v, n) =
3270                            Self::append(tx, &execution_id, append_request.clone(), version)?;
3271                        version = v;
3272                        notifier = Some(n);
3273                    }
3274                    let mut notifiers = Vec::new();
3275                    notifiers.push(notifier.expect("checked that the batch is not empty"));
3276
3277                    for child_req in &child_req {
3278                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
3279                        notifiers.push(notifier);
3280                    }
3281                    Ok::<_, DbErrorWrite>((version, notifiers))
3282                },
3283                "append_batch_create_new_execution_inner",
3284            )
3285            .await?;
3286        self.notify_all(notifiers, current_time);
3287        Ok(version)
3288    }
3289
3290    #[cfg(feature = "test")]
3291    #[instrument(level = Level::DEBUG, skip(self))]
3292    async fn get(
3293        &self,
3294        execution_id: &ExecutionId,
3295    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3296        trace!("get");
3297        let execution_id = execution_id.clone();
3298        self.transaction(move |tx| Self::get(tx, &execution_id), "get")
3299            .await
3300    }
3301
3302    #[instrument(level = Level::DEBUG, skip(self))]
3303    async fn list_execution_events(
3304        &self,
3305        execution_id: &ExecutionId,
3306        since: &Version,
3307        max_length: VersionType,
3308        include_backtrace_id: bool,
3309    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3310        let execution_id = execution_id.clone();
3311        let since = since.0;
3312        self.transaction(
3313            move |tx| {
3314                Self::list_execution_events(
3315                    tx,
3316                    &execution_id,
3317                    since,
3318                    since + max_length,
3319                    include_backtrace_id,
3320                )
3321            },
3322            "get",
3323        )
3324        .await
3325    }
3326
3327    // Supports only one subscriber per execution id.
3328    // A new call will overwrite the old subscriber, the old one will end
3329    // with a timeout, which is fine.
3330    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3331    async fn subscribe_to_next_responses(
3332        &self,
3333        execution_id: &ExecutionId,
3334        start_idx: usize,
3335        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3336    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3337        debug!("next_responses");
3338        let unique_tag: u64 = rand::random();
3339        let execution_id = execution_id.clone();
3340
3341        let cleanup = || {
3342            let mut guard = self.0.response_subscribers.lock().unwrap();
3343            match guard.remove(&execution_id) {
3344                Some((_, tag)) if tag == unique_tag => {} // Cleanup OK.
3345                Some(other) => {
3346                    // Reinsert foreign sender.
3347                    guard.insert(execution_id.clone(), other);
3348                }
3349                None => {} // Value was replaced and cleaned up already, or notification was sent.
3350            }
3351        };
3352
3353        let response_subscribers = self.0.response_subscribers.clone();
3354        let resp_or_receiver = {
3355            let execution_id = execution_id.clone();
3356            self.transaction(
3357                move |tx| {
3358                    let responses = Self::get_responses_with_offset(tx, &execution_id, start_idx)?;
3359                    if responses.is_empty() {
3360                        // cannot race as we have the transaction write lock
3361                        let (sender, receiver) = oneshot::channel();
3362                        response_subscribers
3363                            .lock()
3364                            .unwrap()
3365                            .insert(execution_id.clone(), (sender, unique_tag));
3366                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
3367                    } else {
3368                        Ok(itertools::Either::Left(responses))
3369                    }
3370                },
3371                "subscribe_to_next_responses",
3372            )
3373            .await
3374        }
3375        .inspect_err(|_| {
3376            cleanup();
3377        })?;
3378        match resp_or_receiver {
3379            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3380            itertools::Either::Right(receiver) => {
3381                let res = async move {
3382                    tokio::select! {
3383                        resp = receiver => {
3384                            let resp = resp.map_err(|_| DbErrorGeneric::Close)?;
3385                            Ok(vec![resp])
3386                        }
3387                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3388                    }
3389                }
3390                .await;
3391                cleanup();
3392                res
3393            }
3394        }
3395    }
3396
3397    // Supports multiple subscribers.
3398    async fn wait_for_finished_result(
3399        &self,
3400        execution_id: &ExecutionId,
3401        timeout_fut: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
3402    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3403        let unique_tag: u64 = rand::random();
3404        let execution_id = execution_id.clone();
3405        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();
3406
3407        let cleanup = || {
3408            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3409            if let Some(subscribers) = guard.get_mut(&execution_id) {
3410                subscribers.remove(&unique_tag);
3411            }
3412        };
3413
3414        let resp_or_receiver = {
3415            let execution_id = execution_id.clone();
3416            self.transaction(move |tx| {
3417                let pending_state =
3418                    Self::get_combined_state(tx, &execution_id)?.pending_state;
3419                if let PendingState::Finished { finished, .. } = pending_state {
3420                    let event =
3421                        Self::get_execution_event(tx, &execution_id, finished.version)?;
3422                    if let ExecutionRequest::Finished { result, ..} = event.event {
3423                        Ok(itertools::Either::Left(result))
3424                    } else {
3425                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3426                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
3427                            "cannot get finished event based on t_state version"
3428                        )))
3429                    }
3430                } else {
3431                    // Cannot race with the notifier as we have the transaction write lock:
3432                    // Either the finished event was appended previously, thus `itertools::Either::Left` was selected,
3433                    // or we end up here. If this tx fails, the cleanup will remove this entry.
3434                    let (sender, receiver) = oneshot::channel();
3435                    let mut guard = execution_finished_subscription.lock().unwrap();
3436                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
3437                    Ok(itertools::Either::Right(receiver))
3438                }
3439            }, "wait_for_finished_result")
3440            .await
3441        }
3442        .inspect_err(|_| {
3443            // This cleanup can race with the notification sender, since both are running after a transaction was finished.
3444            // If the notification sender wins, it removes our oneshot sender and puts a value in it, cleanup will not find the unique tag.
3445            // If cleanup wins, it simply removes the oneshot sender.
3446            cleanup();
3447        })?;
3448
3449        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3450        match resp_or_receiver {
3451            itertools::Either::Left(resp) => Ok(resp), // no need for cleanup
3452            itertools::Either::Right(receiver) => {
3453                let res = async move {
3454                    tokio::select! {
3455                        resp = receiver => {
3456                            Ok(resp.expect("the notifier sends to all listeners, cannot race with cleanup"))
3457                        }
3458                        () = timeout_fut => Err(DbErrorReadWithTimeout::Timeout),
3459                    }
3460                }
3461                .await;
3462                cleanup();
3463                res
3464            }
3465        }
3466    }
3467
3468    #[cfg(feature = "test")]
3469    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3470    async fn append_response(
3471        &self,
3472        created_at: DateTime<Utc>,
3473        execution_id: ExecutionId,
3474        response_event: JoinSetResponseEvent,
3475    ) -> Result<(), DbErrorWrite> {
3476        debug!("append_response");
3477        let event = JoinSetResponseEventOuter {
3478            created_at,
3479            event: response_event,
3480        };
3481        let notifier = self
3482            .transaction(
3483                move |tx| Self::append_response(tx, &execution_id, event.clone()),
3484                "append_response",
3485            )
3486            .await?;
3487        self.notify_all(vec![notifier], created_at);
3488        Ok(())
3489    }
3490
3491    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3492    async fn append_delay_response(
3493        &self,
3494        created_at: DateTime<Utc>,
3495        execution_id: ExecutionId,
3496        join_set_id: JoinSetId,
3497        delay_id: DelayId,
3498        result: Result<(), ()>,
3499    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3500        debug!("append_delay_response");
3501        let event = JoinSetResponseEventOuter {
3502            created_at,
3503            event: JoinSetResponseEvent {
3504                join_set_id,
3505                event: JoinSetResponse::DelayFinished {
3506                    delay_id: delay_id.clone(),
3507                    result,
3508                },
3509            },
3510        };
3511        let res = self
3512            .transaction(
3513                {
3514                    let execution_id = execution_id.clone();
3515                    move |tx| Self::append_response(tx, &execution_id, event.clone())
3516                },
3517                "append_delay_response",
3518            )
3519            .await;
3520        match res {
3521            Ok(notifier) => {
3522                self.notify_all(vec![notifier], created_at);
3523                Ok(AppendDelayResponseOutcome::Success)
3524            }
3525            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3526                let delay_success = self
3527                    .transaction(
3528                        move |tx| Self::delay_response(tx, &execution_id, &delay_id),
3529                        "delay_response",
3530                    )
3531                    .await?;
3532                match delay_success {
3533                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3534                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3535                    None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized(
3536                        "insert failed yet select did not find the response".into(),
3537                    ))),
3538                }
3539            }
3540            Err(err) => Err(err),
3541        }
3542    }
3543
3544    #[instrument(level = Level::DEBUG, skip_all)]
3545    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3546        debug!("append_backtrace");
3547        self.transaction(
3548            move |tx| Self::append_backtrace(tx, &append),
3549            "append_backtrace",
3550        )
3551        .await
3552    }
3553
3554    #[instrument(level = Level::DEBUG, skip_all)]
3555    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3556        debug!("append_backtrace_batch");
3557        self.transaction(
3558            move |tx| {
3559                for append in &batch {
3560                    Self::append_backtrace(tx, append)?;
3561                }
3562                Ok(())
3563            },
3564            "append_backtrace",
3565        )
3566        .await
3567    }
3568
3569    #[instrument(level = Level::DEBUG, skip_all)]
3570    async fn get_backtrace(
3571        &self,
3572        execution_id: &ExecutionId,
3573        filter: BacktraceFilter,
3574    ) -> Result<BacktraceInfo, DbErrorRead> {
3575        debug!("get_last_backtrace");
3576        let execution_id = execution_id.clone();
3577
3578        self.transaction(
3579            move |tx| {
3580                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_backtrace \
3581                                WHERE execution_id = :execution_id";
3582                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
3583                let select = match &filter {
3584                    BacktraceFilter::Specific(version) =>{
3585                        params.push((":version", Box::new(version.0)));
3586                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
3587                    },
3588                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
3589                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
3590                };
3591                tx
3592                    .prepare(&select)
3593                    ?
3594                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
3595                        params
3596                            .iter()
3597                            .map(|(key, value)| (*key, value.as_ref()))
3598                            .collect::<Vec<_>>()
3599                            .as_ref(),
3600                    |row| {
3601                        Ok(BacktraceInfo {
3602                            execution_id: execution_id.clone(),
3603                            component_id: row.get::<_, JsonWrapper<_> >("component_id")?.0,
3604                            version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
3605                            version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
3606                            wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
3607                        })
3608                    },
3609                ).map_err(DbErrorRead::from)
3610            },
3611            "get_last_backtrace",
3612        ).await
3613    }
3614
3615    /// Get currently expired delays and locks.
3616    #[instrument(level = Level::TRACE, skip(self))]
3617    async fn get_expired_timers(
3618        &self,
3619        at: DateTime<Utc>,
3620    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3621        self.transaction(
3622            move |conn| {
3623                let mut expired_timers = conn.prepare(
3624                    "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
3625                )?
3626                .query_map(
3627                        named_params! {
3628                            ":at": at,
3629                        },
3630                        |row| {
3631                            let execution_id = row.get("execution_id")?;
3632                            let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
3633                            let delay_id = row.get::<_, DelayId>("delay_id")?;
3634                            let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
3635                            Ok(ExpiredTimer::Delay(delay))
3636                        },
3637                    )?
3638                    .collect::<Result<Vec<_>, _>>()?;
3639                // Extend with expired locks
3640                let expired = conn.prepare(&format!(r#"
3641                    SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
3642                    executor_id, run_id
3643                    FROM t_state
3644                    WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
3645                    "#
3646                )
3647                )?
3648                .query_map(
3649                        named_params! {
3650                            ":at": at,
3651                        },
3652                        |row| {
3653                            let execution_id = row.get("execution_id")?;
3654                            let locked_at_version = Version::new(row.get("last_lock_version")?);
3655                            let next_version = Version::new(row.get("corresponding_version")?).increment();
3656                            let intermittent_event_count = row.get("intermittent_event_count")?;
3657                            let max_retries = row.get("max_retries")?;
3658                            let retry_exp_backoff_millis = row.get("retry_exp_backoff_millis")?;
3659                            let executor_id = row.get("executor_id")?;
3660                            let run_id = row.get("run_id")?;
3661                            let lock = ExpiredLock {
3662                                execution_id,
3663                                locked_at_version,
3664                                next_version,
3665                                intermittent_event_count,
3666                                max_retries,
3667                                retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
3668                                locked_by: LockedBy { executor_id, run_id },
3669                            };
3670                            Ok(ExpiredTimer::Lock(lock))
3671                        }
3672                    )?
3673                    .collect::<Result<Vec<_>, _>>()?;
3674                expired_timers.extend(expired);
3675                if !expired_timers.is_empty() {
3676                    debug!("get_expired_timers found {expired_timers:?}");
3677                }
3678                Ok(expired_timers)
3679            }, "get_expired_timers"
3680        )
3681        .await
3682    }
3683
3684    async fn get_execution_event(
3685        &self,
3686        execution_id: &ExecutionId,
3687        version: &Version,
3688    ) -> Result<ExecutionEvent, DbErrorRead> {
3689        let version = version.0;
3690        let execution_id = execution_id.clone();
3691        self.transaction(
3692            move |tx| Self::get_execution_event(tx, &execution_id, version),
3693            "get_execution_event",
3694        )
3695        .await
3696    }
3697
3698    async fn get_pending_state(
3699        &self,
3700        execution_id: &ExecutionId,
3701    ) -> Result<PendingState, DbErrorRead> {
3702        let execution_id = execution_id.clone();
3703        Ok(self
3704            .transaction(
3705                move |tx| Self::get_combined_state(tx, &execution_id),
3706                "get_pending_state",
3707            )
3708            .await?
3709            .pending_state)
3710    }
3711
3712    async fn list_executions(
3713        &self,
3714        ffqn: Option<FunctionFqn>,
3715        top_level_only: bool,
3716        pagination: ExecutionListPagination,
3717    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3718        self.transaction(
3719            move |tx| Self::list_executions(tx, ffqn.as_ref(), top_level_only, &pagination),
3720            "list_executions",
3721        )
3722        .await
3723    }
3724
3725    async fn list_responses(
3726        &self,
3727        execution_id: &ExecutionId,
3728        pagination: Pagination<u32>,
3729    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3730        let execution_id = execution_id.clone();
3731        self.transaction(
3732            move |tx| Self::list_responses(tx, &execution_id, Some(pagination)),
3733            "list_responses",
3734        )
3735        .await
3736    }
3737
3738    async fn upgrade_execution_component(
3739        &self,
3740        execution_id: &ExecutionId,
3741        old: &InputContentDigest,
3742        new: &InputContentDigest,
3743    ) -> Result<(), DbErrorWrite> {
3744        let execution_id = execution_id.clone();
3745        let old = old.clone();
3746        let new = new.clone();
3747        self.transaction(
3748            move |tx| Self::upgrade_execution_component(tx, &execution_id, &old, &new),
3749            "upgrade_execution_component",
3750        )
3751        .await
3752    }
3753}
3754
/// Helpers for creating a [`SqlitePool`] in tests.
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a pool with the default [`SqliteConfig`].
    ///
    /// When the `SQLITE_FILE` environment variable is readable, its value is used as the
    /// database path and no temp file is returned. Otherwise a fresh [`NamedTempFile`] is
    /// created and returned alongside the pool.
    ///
    /// # Panics
    /// Panics if the temp file cannot be created or the pool cannot be opened.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}