Skip to main content

obeli_sk_db_postgres/
postgres_dao.rs

1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ComponentType, ContentDigest, ExecutionId, FunctionFqn,
6    JoinSetId, StrVariant, SupportedFunctionReturnValue,
7    component_id::{Digest, InputContentDigest},
8    prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14        DbPool, DbPoolCloseable, DeploymentState, ExecutionEvent, ExecutionListPagination,
15        ExecutionRequest, ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay,
16        ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest,
17        JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter,
18        ListExecutionEventsResponse, ListExecutionsFilter, ListLogsResponse, ListResponsesResponse,
19        LockPendingResponse, Locked, LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter,
20        LogInfoAppendRow, LogLevel, LogStreamType, Pagination, PendingState,
21        PendingStateBlockedByJoinSet, PendingStateFinishedResultKind, PendingStateMergedPause,
22        ResponseCursor, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
23        STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
24    },
25};
26use db_common::{
27    AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
28    PendingFfqnSubscribersHolder,
29};
30use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
31use hashbrown::HashMap;
32use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
33use std::{fmt::Write as _, panic::Location};
34use strum::IntoEnumIterator as _;
35use tokio::sync::{mpsc, oneshot};
36use tokio_postgres::{
37    NoTls, Row, Transaction,
38    row::RowIndex,
39    types::{FromSql, Json, ToSql},
40};
41use tracing::{Level, debug, error, info, instrument, trace, warn};
42use tracing_error::SpanTrace;
43
44#[track_caller]
45fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
46    row: &'a Row,
47    name: I,
48) -> Result<T, DbErrorGeneric> {
49    match row.try_get(name) {
50        Ok(ok) => Ok(ok),
51        Err(err) => {
52            // no map_err, cannot attach `track_caller`
53            Err(consistency_db_err(format!(
54                "Failed to retrieve column '{name}': {err:?}"
55            )))
56        }
57    }
58}
59
/// Schema definition: every `CREATE TABLE` / `CREATE INDEX` statement executed by
/// `PostgresPool::new_with_outcome`, plus the constants tied to schema bootstrap.
///
/// All statements use `IF NOT EXISTS`, so running them against an already
/// initialized database is a no-op.
mod ddl {
    use super::{STATE_LOCKED, STATE_PENDING_AT};
    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;
    use const_format::formatcp;

    /// Database that `create_database` connects to in order to look up and create
    /// the configured database.
    pub const ADMIN_DB_NAME: &str = "postgres";

    /// Schema version this code expects. It is written into an empty `t_metadata`
    /// and compared against the most recent `t_metadata` row otherwise.
    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 2;

    // T_METADATA
    /// Table holding the schema version history (`schema_version`, `created_at`).
    pub const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    schema_version INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);
";

    // T_EXECUTION_LOG
    /// Table of execution events, keyed by `(execution_id, version)`.
    /// `history_event_type` is a stored generated column extracted from `json_value`.
    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    json_value JSON NOT NULL,
    version BIGINT NOT NULL CHECK (version >= 0),
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
    PRIMARY KEY (execution_id, version)
);
";

    // Indexes for t_execution_log
    // NOTE(review): this index covers the same columns as the table's primary key.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";

    /// Index on `(execution_id, variant)`.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

    /// Partial index restricted to rows whose `history_event_type` equals
    /// `HISTORY_EVENT_TYPE_JOIN_NEXT`.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log
        (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
        HISTORY_EVENT_TYPE_JOIN_NEXT
    );

    // T_JOIN_SET_RESPONSE
    /// Table of responses delivered to join sets. The delay columns
    /// (`delay_id`, `delay_success`) and the child execution columns
    /// (`child_execution_id`, `finished_version`) are all nullable.
    // NOTE(review): presumably each row fills exactly one of the two column groups;
    // this DDL does not enforce it.
    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success BOOLEAN,

    child_execution_id TEXT,
    finished_version BIGINT CHECK (finished_version >= 0),

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
);
";

    // Indexes for t_join_set_response
    /// Index on `(execution_id, id)`.
    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";

    /// Unique partial index: at most one row per non-null `child_execution_id`.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";

    /// Unique partial index: at most one row per non-null `delay_id`.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";

    // T_STATE
    /// Table with one row per execution (primary key `execution_id`) holding its
    /// current state. The columns after `intermittent_event_count` are nullable.
    pub const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level BOOLEAN NOT NULL,
    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
    ffqn TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    component_id_input_digest BYTEA NOT NULL,
    component_type TEXT NOT NULL,
    first_scheduled_at TIMESTAMPTZ NOT NULL,
    deployment_id TEXT NOT NULL,
    is_paused BOOLEAN NOT NULL,

    pending_expires_finished TIMESTAMPTZ NOT NULL,
    state TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),

    max_retries BIGINT CHECK (max_retries >= 0),
    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
    last_lock_version BIGINT CHECK (last_lock_version >= 0),
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing BOOLEAN,

    result_kind JSONB,

    PRIMARY KEY (execution_id)
);
";

    // Indexes for t_state
    // For `get_pending_of_single_ffqn`
    // Partial index limited to rows in state `STATE_PENDING_AT`.
    pub const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (state, pending_expires_finished, ffqn) WHERE state = '{}';",
        STATE_PENDING_AT
    );
    // For `get_pending_by_component_input_digest`
    // Partial index limited to rows in state `STATE_PENDING_AT`.
    pub const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (state, pending_expires_finished, component_id_input_digest) WHERE state = '{}';",
        STATE_PENDING_AT
    );

    /// Partial index on `pending_expires_finished` limited to rows in state `STATE_LOCKED`.
    pub const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
        STATE_LOCKED
    );

    // NOTE(review): the index name in the SQL ends with `_is_root` although the
    // indexed column is `is_top_level`.
    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";

    /// Index on `ffqn`.
    pub const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";

    /// Index on `created_at`.
    pub const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
    // For `list_deployment_states`
    pub const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
";

    // T_DELAY
    /// Table of delays keyed by `(execution_id, join_set_id, delay_id)` with their expiry.
    pub const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
);
";

    // Backtrace

    /// Table of backtrace payloads keyed by `backtrace_hash`; referenced from
    /// `t_execution_backtrace`.
    pub const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
    backtrace_hash BYTEA PRIMARY KEY,
    wasm_backtrace JSONB NOT NULL
);
";

    /// Table mapping an execution and a version range
    /// (`version_min_including`, `version_max_excluding`) to a row of `t_wasm_backtrace`.
    pub const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_backtrace (
    execution_id TEXT NOT NULL,
    component_id JSONB NOT NULL,
    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
    backtrace_hash BYTEA NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding),
    FOREIGN KEY (backtrace_hash) REFERENCES t_wasm_backtrace(backtrace_hash)
);
";

    // NOTE(review): this index covers the same columns as the table's primary key.
    pub const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version ON t_execution_backtrace (execution_id, version_min_including, version_max_excluding);
";

    // Logs & std streams
    /// Table of log rows. `level`/`message` and `stream_type`/`payload` are all nullable.
    // NOTE(review): presumably a row is either a log message or a stream chunk;
    // this DDL does not enforce it.
    pub const CREATE_TABLE_T_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_log (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    execution_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    level INTEGER,
    message TEXT,
    stream_type INTEGER,
    payload BYTEA
);
";
    /// Index on `(execution_id, run_id, created_at)`.
    pub const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
";
    /// Index on `(execution_id, created_at)`.
    pub const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
";
}
263
/// Connection settings used to build the `deadpool_postgres` pools in this module.
#[derive(derive_more::Debug, Clone)]
pub struct PostgresConfig {
    /// Server host.
    pub host: String,
    /// User to authenticate as.
    pub user: String,
    /// Password of `user`; excluded from the `Debug` output.
    #[debug(skip)]
    pub password: String,
    /// Name of the database holding the schema. Depending on the
    /// [`ProvisionPolicy`] it is created when missing.
    pub db_name: String,
}
272
/// Opaque error returned when the database or its schema cannot be initialized.
///
/// It carries no detail: the failing sites in this module log the underlying
/// cause (`error!` / `warn!`) right before returning it.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
276
277async fn create_database(
278    config: &PostgresConfig,
279    provision_policy: ProvisionPolicy,
280) -> Result<DbInitialzationOutcome, InitializationError> {
281    let mut admin_cfg = deadpool_postgres::Config::new();
282    admin_cfg.host = Some(config.host.clone());
283    admin_cfg.user = Some(config.user.clone());
284    admin_cfg.password = Some(config.password.clone());
285    admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
286    admin_cfg.manager = Some(ManagerConfig {
287        recycling_method: RecyclingMethod::Fast,
288    });
289
290    let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
291        error!("Cannot create the default pool - {err:?}");
292        InitializationError
293    })?;
294
295    let client = admin_pool.get().await.map_err(|err| {
296        error!("Cannot get a connection from the default pool - {err:?}");
297        InitializationError
298    })?;
299
300    let row = client
301        .query_opt(
302            &format!(
303                "SELECT 1 FROM pg_database WHERE datname = '{}'",
304                config.db_name
305            ),
306            &[],
307        )
308        .await
309        .map_err(|err| {
310            error!("Cannot select from the default database - {err:?}");
311            InitializationError
312        })?;
313
314    match (row, provision_policy) {
315        (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
316            client
317                .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
318                .await
319                .map_err(|err| {
320                    error!("Cannot create the database - {err:?}");
321                    InitializationError
322                })?;
323            info!("Database '{}' created.", config.db_name);
324            Ok(DbInitialzationOutcome::Created)
325        }
326        (Some(_), ProvisionPolicy::Auto) => {
327            info!("Database '{}' exists.", config.db_name);
328            Ok(DbInitialzationOutcome::Existing)
329        }
330        (Some(_), ProvisionPolicy::MustCreate) => {
331            warn!("Database '{}' already exists.", config.db_name);
332            Err(InitializationError)
333        }
334        (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
335    }
336}
337
// All mutexes here have a very short critical section completely controlled by this module, thus using std mutex.

/// Shared registry keyed by execution id. Each entry holds a one-shot sender of a
/// `ResponseWithCursor` together with a `u64`.
// NOTE(review): the `u64` is presumably a subscription token; its meaning is not
// visible in this part of the file.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
/// Shared, mutex-protected `PendingFfqnSubscribersHolder`.
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Registry keyed by execution id; for each execution it maps a `u64` key to a
/// one-shot sender of the execution's `SupportedFunctionReturnValue`.
/// Unlike the aliases above it is not wrapped in an `Arc`; users wrap it themselves.
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
345
/// Postgres-backed implementation of `DbPool`.
///
/// Owns the connection pool and the subscriber registries; every connection
/// handed out receives clones of the registry handles, so all connections share them.
pub struct PostgresPool {
    /// Connection pool for the configured database.
    pool: Pool,
    /// Shared with every `PostgresConnection` created from this pool.
    response_subscribers: ResponseSubscribers,
    /// Shared with every `PostgresConnection` created from this pool.
    pending_subscribers: PendingSubscribers,
    /// Shared with every `PostgresConnection` created from this pool.
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    // only for tests.
    /// Configuration this pool was created with.
    pub config: PostgresConfig,
}
354
355#[async_trait]
356impl DbPool for PostgresPool {
357    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
358        let client = self.pool.get().await?;
359
360        Ok(Box::new(PostgresConnection {
361            client: tokio::sync::Mutex::new(client),
362            response_subscribers: self.response_subscribers.clone(),
363            pending_subscribers: self.pending_subscribers.clone(),
364            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
365        }))
366    }
367
368    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
369        let client = self.pool.get().await?;
370
371        Ok(Box::new(PostgresConnection {
372            client: tokio::sync::Mutex::new(client),
373            response_subscribers: self.response_subscribers.clone(),
374            pending_subscribers: self.pending_subscribers.clone(),
375            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
376        }))
377    }
378
379    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
380        let client = self.pool.get().await?;
381
382        Ok(Box::new(PostgresConnection {
383            client: tokio::sync::Mutex::new(client),
384            response_subscribers: self.response_subscribers.clone(),
385            pending_subscribers: self.pending_subscribers.clone(),
386            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
387        }))
388    }
389
390    #[cfg(feature = "test")]
391    async fn connection_test(
392        &self,
393    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
394        let client = self.pool.get().await?;
395
396        Ok(Box::new(PostgresConnection {
397            client: tokio::sync::Mutex::new(client),
398            response_subscribers: self.response_subscribers.clone(),
399            pending_subscribers: self.pending_subscribers.clone(),
400            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
401        }))
402    }
403}
404
/// A single pooled Postgres client together with handles to the subscriber
/// registries of the `PostgresPool` it was created from.
pub struct PostgresConnection {
    /// The pooled client.
    // Callers should not hold onto a connection for too long but it is not controlled by this module, thus tokio mutex.
    client: tokio::sync::Mutex<Client>,
    /// Handle to the registry owned by the originating pool.
    response_subscribers: ResponseSubscribers,
    /// Handle to the registry owned by the originating pool.
    pending_subscribers: PendingSubscribers,
    /// Handle to the registry owned by the originating pool.
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
411
/// Controls whether `PostgresPool::new_with_outcome` may create the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Never attempt to create the database; the existence check is skipped and
    /// the outcome is reported as `Existing`.
    NeverCreate,
    /// Create database if it does not exist.
    Auto,
    /// Only for tests: Fail if database already exists.
    MustCreate,
}
420
/// Tells the caller of `PostgresPool::new_with_outcome` whether the database had
/// to be created.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// The database did not exist and was created.
    Created,
    /// The database already existed, or creation was not attempted
    /// (`ProvisionPolicy::NeverCreate`).
    Existing,
}
426
427impl PostgresPool {
428    #[instrument(skip_all, name = "postgres_new")]
429    pub async fn new(
430        config: PostgresConfig,
431        provision_policy: ProvisionPolicy,
432    ) -> Result<PostgresPool, InitializationError> {
433        Self::new_with_outcome(config, provision_policy)
434            .await
435            .map(|(db, _)| db)
436    }
437
438    pub async fn new_with_outcome(
439        config: PostgresConfig,
440        provision_policy: ProvisionPolicy,
441    ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
442        let outcome = if matches!(
443            provision_policy,
444            ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
445        ) {
446            create_database(&config, provision_policy).await?
447        } else {
448            DbInitialzationOutcome::Existing
449        };
450        let mut cfg = deadpool_postgres::Config::new();
451        cfg.host = Some(config.host.clone());
452        cfg.user = Some(config.user.clone());
453        cfg.password = Some(config.password.clone());
454        cfg.dbname = Some(config.db_name.clone());
455        cfg.manager = Some(ManagerConfig {
456            recycling_method: RecyclingMethod::Fast,
457        });
458
459        let pool = cfg.create_pool(None, NoTls).map_err(|err| {
460            error!("Cannot create the database pool - {err:?}");
461            InitializationError
462        })?;
463        let client = pool.get().await.map_err(|err| {
464            error!("Cannot get a connection from the database pool - {err:?}");
465            InitializationError
466        })?;
467
468        let statements = vec![
469            ddl::CREATE_TABLE_T_METADATA,
470            ddl::CREATE_TABLE_T_EXECUTION_LOG,
471            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
472            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
473            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
474            ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
475            ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
476            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
477            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
478            ddl::CREATE_TABLE_T_STATE,
479            ddl::IDX_T_STATE_LOCK_PENDING_BY_FFQN,
480            ddl::IDX_T_STATE_LOCK_PENDING_BY_COMPONENT,
481            ddl::IDX_T_STATE_EXPIRED_LOCKS,
482            ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
483            ddl::IDX_T_STATE_FFQN,
484            ddl::IDX_T_STATE_CREATED_AT,
485            ddl::IDX_T_STATE_DEPLOYMENT_STATE,
486            ddl::CREATE_TABLE_T_DELAY,
487            ddl::CREATE_TABLE_T_WASM_BACKTRACE,
488            ddl::CREATE_TABLE_T_EXECUTION_BACKTRACE,
489            ddl::IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION,
490            ddl::CREATE_TABLE_T_LOG,
491            ddl::IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT,
492            ddl::IDX_T_LOG_EXECUTION_ID_CREATED_AT,
493        ];
494
495        // Combine into one batch execution for atomicity per round-trip (or efficiency)
496        let batch_sql = statements.join("\n");
497        client.batch_execute(&batch_sql).await.map_err(|err| {
498            error!("Cannot run the DDL import - {err:?}");
499            InitializationError
500        })?;
501
502        let row = client
503            .query_opt(
504                "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
505                &[],
506            )
507            .await
508            .map_err(|err| {
509                error!("Cannot select schema version - {err:?}");
510                InitializationError
511            })?;
512
513        // Postgres INTEGER maps to i32
514        let actual_version = match row {
515            Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
516                error!("Failed to get schema_version column: {e}");
517                InitializationError
518            })?),
519            None => None,
520        };
521
522        match actual_version {
523            None => {
524                client
525                    .execute(
526                        "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
527                        &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
528                    )
529                    .await
530                    .map_err(|err| {
531                        error!("Cannot insert schema version - {err:?}");
532                        InitializationError
533                    })?;
534            }
535            Some(actual_version) => {
536                // Fail on unexpected `schema_version`.
537                if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
538                    error!(
539                        "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
540                    );
541                    return Err(InitializationError);
542                }
543            }
544        }
545
546        debug!("Database schema initialized.");
547
548        Ok((
549            PostgresPool {
550                pool,
551                execution_finished_subscribers: Arc::default(),
552                pending_subscribers: Arc::default(),
553                response_subscribers: Arc::default(),
554                config,
555            },
556            outcome,
557        ))
558    }
559}
560
561#[track_caller]
562fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
563    DbErrorGeneric::Uncategorized {
564        reason: reason.into(),
565        context: SpanTrace::capture(),
566        source: None,
567        loc: Location::caller(),
568    }
569}
570#[track_caller]
571fn consistency_db_err_src(
572    reason: impl Into<StrVariant>,
573    source: Arc<dyn std::error::Error + Send + Sync>,
574) -> DbErrorGeneric {
575    DbErrorGeneric::Uncategorized {
576        reason: reason.into(),
577        context: SpanTrace::capture(),
578        source: Some(source),
579        loc: Location::caller(),
580    }
581}
582
/// A delay identified by its join set and delay id, together with its expiry.
// NOTE(review): the fields mirror the columns of `t_delay` (minus `execution_id`);
// presumably this is the row to insert there — the usage is outside this part of the file.
#[derive(Debug, Clone)]
struct DelayReq {
    /// Join set the delay belongs to.
    join_set_id: JoinSetId,
    /// Identifier of the delay.
    delay_id: DelayId,
    /// Point in time at which the delay expires.
    expires_at: DateTime<Utc>,
}
589
590async fn fetch_created_event(
591    tx: &Transaction<'_>,
592    execution_id: &ExecutionId,
593) -> Result<CreateRequest, DbErrorRead> {
594    let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
595                execution_id = $1 AND version = 0";
596
597    let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
598
599    let created_at = get(&row, "created_at")?;
600    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
601    let event = event.0;
602
603    if let ExecutionRequest::Created {
604        ffqn,
605        params,
606        parent,
607        scheduled_at,
608        component_id,
609        deployment_id,
610        metadata,
611        scheduled_by,
612    } = event
613    {
614        Ok(CreateRequest {
615            created_at,
616            execution_id: execution_id.clone(),
617            ffqn,
618            params,
619            parent,
620            scheduled_at,
621            component_id,
622            deployment_id,
623            metadata,
624            scheduled_by,
625        })
626    } else {
627        error!("Row with version=0 must be a `Created` event - {event:?}");
628        Err(consistency_db_err("expected `Created` event").into())
629    }
630}
631
632fn check_expected_next_and_appending_version(
633    expected_version: &Version,
634    appending_version: &Version,
635) -> Result<(), DbErrorWrite> {
636    if *expected_version != *appending_version {
637        debug!(
638            "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
639        );
640        return Err(DbErrorWrite::NonRetriable(
641            DbErrorWriteNonRetriable::VersionConflict {
642                expected: expected_version.clone(),
643                requested: appending_version.clone(),
644            },
645        ));
646    }
647    Ok(())
648}
649
650#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
651async fn create_inner(
652    tx: &Transaction<'_>,
653    req: CreateRequest,
654) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
655    trace!("create_inner");
656
657    let version = Version::default();
658    let execution_id = req.execution_id.clone();
659    let execution_id_str = execution_id.to_string();
660    let ffqn = req.ffqn.clone();
661    let created_at = req.created_at;
662    let scheduled_at = req.scheduled_at;
663    let component_id = req.component_id.clone();
664    let deployment_id = req.deployment_id;
665
666    let event = ExecutionRequest::from(req);
667    let event = Json(event);
668
669    tx.execute(
670        "INSERT INTO t_execution_log (
671            execution_id, created_at, version, json_value, variant, join_set_id
672        ) VALUES ($1, $2, $3, $4, $5, $6)",
673        &[
674            &execution_id_str,
675            &created_at,
676            &i64::from(version.0), // BIGINT
677            &event,
678            &event.0.variant(),
679            &event.0.join_set_id().map(std::string::ToString::to_string),
680        ],
681    )
682    .await?;
683
684    let pending_at = {
685        debug!("Creating with `Pending(`{scheduled_at:?}`)");
686
687        tx.execute(
688            r"
689            INSERT INTO t_state (
690                execution_id,
691                is_top_level,
692                corresponding_version,
693                pending_expires_finished,
694                ffqn,
695                state,
696                created_at,
697                component_id_input_digest,
698                component_type,
699                deployment_id,
700                first_scheduled_at,
701                updated_at,
702                intermittent_event_count,
703                is_paused
704            ) VALUES (
705                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, false
706            )",
707            &[
708                &execution_id_str,
709                &execution_id.is_top_level(),
710                &i64::from(version.0),
711                &scheduled_at,
712                &ffqn.to_string(),
713                &STATE_PENDING_AT,
714                &created_at,
715                &component_id.input_digest.as_slice(),
716                &component_id.component_type.to_string(),
717                &deployment_id.to_string(),
718                &scheduled_at,
719            ],
720        )
721        .await?;
722
723        AppendNotifier {
724            pending_at: Some(NotifierPendingAt {
725                scheduled_at,
726                ffqn,
727                component_input_digest: component_id.input_digest,
728            }),
729            execution_finished: None,
730            response: None,
731        }
732    };
733
734    let next_version = Version::new(version.0 + 1);
735    Ok((next_version, pending_at))
736}
737
738#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
739async fn update_state_pending_after_response_appended(
740    tx: &Transaction<'_>,
741    execution_id: &ExecutionId,
742    scheduled_at: DateTime<Utc>, // Changing to state PendingAt
743    component_input_digest: InputContentDigest,
744) -> Result<AppendNotifier, DbErrorWrite> {
745    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
746
747    // Convert types for Postgres arguments
748    let execution_id_str = execution_id.to_string();
749
750    let updated = tx
751        .execute(
752            r"
753            UPDATE t_state
754            SET
755                pending_expires_finished = $1,
756                state = $2,
757                updated_at = CURRENT_TIMESTAMP,
758
759                max_retries = NULL,
760                retry_exp_backoff_millis = NULL,
761                last_lock_version = NULL,
762
763                join_set_id = NULL,
764                join_set_closing = NULL,
765
766                result_kind = NULL
767            WHERE execution_id = $3
768            ",
769            &[
770                &scheduled_at,     // $1
771                &STATE_PENDING_AT, // $2
772                &execution_id_str, // $3
773            ],
774        )
775        .await?;
776
777    if updated == 0 {
778        return Err(DbErrorWrite::NotFound);
779    }
780
781    Ok(AppendNotifier {
782        pending_at: Some(NotifierPendingAt {
783            scheduled_at,
784            ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
785            component_input_digest,
786        }),
787        execution_finished: None,
788        response: None,
789    })
790}
791
792#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
793async fn update_state_pending_after_event_appended(
794    tx: &Transaction<'_>,
795    execution_id: &ExecutionId,
796    appending_version: &Version,
797    scheduled_at: DateTime<Utc>,
798    intermittent_failure: bool,
799    component_input_digest: InputContentDigest,
800) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
801    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
802
803    let intermittent_delta = i64::from(intermittent_failure); // 0 or 1
804
805    let updated = tx
806        .execute(
807            r"
808            UPDATE t_state
809            SET
810                corresponding_version = $1,
811                pending_expires_finished = $2,
812                state = $3,
813                updated_at = CURRENT_TIMESTAMP,
814                intermittent_event_count = intermittent_event_count + $4,
815
816                max_retries = NULL,
817                retry_exp_backoff_millis = NULL,
818                last_lock_version = NULL,
819
820                join_set_id = NULL,
821                join_set_closing = NULL,
822
823                result_kind = NULL
824            WHERE execution_id = $5;
825            ",
826            &[
827                &i64::from(appending_version.0), // $1
828                &scheduled_at,                   // $2
829                &STATE_PENDING_AT,               // $3
830                &intermittent_delta,             // $4
831                &execution_id.to_string(),       // $5
832            ],
833        )
834        .await?;
835
836    if updated != 1 {
837        return Err(DbErrorWrite::NotFound);
838    }
839
840    Ok((
841        appending_version.increment(),
842        AppendNotifier {
843            pending_at: Some(NotifierPendingAt {
844                scheduled_at,
845                ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
846                component_input_digest,
847            }),
848            execution_finished: None,
849            response: None,
850        },
851    ))
852}
853
854#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
855#[expect(clippy::too_many_arguments)]
856async fn update_state_locked_get_intermittent_event_count(
857    tx: &Transaction<'_>,
858    execution_id: &ExecutionId,
859    deployment_id: DeploymentId,
860    component_digest: &InputContentDigest,
861    executor_id: ExecutorId,
862    run_id: RunId,
863    lock_expires_at: DateTime<Utc>,
864    appending_version: &Version,
865    retry_config: ComponentRetryConfig,
866) -> Result<u32, DbErrorWrite> {
867    debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
868    let backoff_millis =
869        i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
870            // BIGINT = i64
871            DbErrorGeneric::Uncategorized {
872                reason: "backoff too big".into(),
873                context: SpanTrace::capture(),
874                source: Some(Arc::new(err)),
875                loc: Location::caller(),
876            }
877        })?;
878
879    let execution_id_str = execution_id.to_string();
880
881    let updated = tx
882        .execute(
883            r"
884            UPDATE t_state
885            SET
886                corresponding_version = $1,
887                pending_expires_finished = $2,
888                state = $3,
889                updated_at = CURRENT_TIMESTAMP,
890                deployment_id = $4,
891                component_id_input_digest = $5,
892
893                max_retries = $6,
894                retry_exp_backoff_millis = $7,
895                last_lock_version = $1,
896                executor_id = $8,
897                run_id = $9,
898
899                join_set_id = NULL,
900                join_set_closing = NULL,
901
902                result_kind = NULL
903            WHERE execution_id = $10
904            AND is_paused = false
905            ",
906            &[
907                &i64::from(appending_version.0),
908                &lock_expires_at,
909                &STATE_LOCKED,
910                &deployment_id.to_string(),
911                &component_digest,
912                &retry_config.max_retries.map(i64::from),
913                &backoff_millis,
914                &executor_id.to_string(),
915                &run_id.to_string(),
916                &execution_id_str,
917            ],
918        )
919        .await?;
920
921    if updated != 1 {
922        return Err(DbErrorWrite::NotFound);
923    }
924
925    // fetch intermittent event count
926    let row = tx
927        .query_one(
928            "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
929            &[&execution_id_str],
930        )
931        .await
932        .map_err(DbErrorGeneric::from)?;
933
934    let count: i64 = get(&row, "intermittent_event_count")?; // Postgres BIGINT
935    let count = u32::try_from(count)
936        .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
937    Ok(count)
938}
939
940#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
941async fn update_state_blocked(
942    tx: &Transaction<'_>,
943    execution_id: &ExecutionId,
944    appending_version: &Version,
945    join_set_id: &JoinSetId,
946    lock_expires_at: DateTime<Utc>,
947    join_set_closing: bool,
948) -> Result<AppendResponse, DbErrorWrite> {
949    debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
950
951    let updated = tx
952        .execute(
953            r"
954            UPDATE t_state
955            SET
956                corresponding_version = $1,
957                pending_expires_finished = $2,
958                state = $3,
959                updated_at = CURRENT_TIMESTAMP,
960
961                max_retries = NULL,
962                retry_exp_backoff_millis = NULL,
963                last_lock_version = NULL,
964
965                join_set_id = $4,
966                join_set_closing = $5,
967
968                result_kind = NULL
969            WHERE execution_id = $6
970            ",
971            &[
972                &i64::from(appending_version.0), // $1
973                &lock_expires_at,                // $2
974                &STATE_BLOCKED_BY_JOIN_SET,      // $3
975                &join_set_id.to_string(),        // $4
976                &join_set_closing,               // $5 (BOOLEAN)
977                &execution_id.to_string(),       // $6
978            ],
979        )
980        .await?;
981
982    if updated != 1 {
983        return Err(DbErrorWrite::NotFound);
984    }
985    Ok(appending_version.increment())
986}
987
988#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
989async fn update_state_finished(
990    tx: &Transaction<'_>,
991    execution_id: &ExecutionId,
992    appending_version: &Version,
993    finished_at: DateTime<Utc>,
994    result_kind: PendingStateFinishedResultKind,
995) -> Result<(), DbErrorWrite> {
996    debug!("Setting t_state to Finished");
997
998    let result_kind_json = Json(result_kind);
999
1000    let updated = tx
1001        .execute(
1002            r"
1003            UPDATE t_state
1004            SET
1005                corresponding_version = $1,
1006                pending_expires_finished = $2,
1007                state = $3,
1008                updated_at = CURRENT_TIMESTAMP,
1009
1010                max_retries = NULL,
1011                retry_exp_backoff_millis = NULL,
1012                last_lock_version = NULL,
1013                executor_id = NULL,
1014                run_id = NULL,
1015
1016                join_set_id = NULL,
1017                join_set_closing = NULL,
1018
1019                result_kind = $4
1020            WHERE execution_id = $5
1021            ",
1022            &[
1023                &i64::from(appending_version.0), // $1
1024                &finished_at,                    // $2
1025                &STATE_FINISHED,                 // $3
1026                &result_kind_json,               // $4
1027                &execution_id.to_string(),       // $5
1028            ],
1029        )
1030        .await?;
1031
1032    if updated != 1 {
1033        return Err(DbErrorWrite::NotFound);
1034    }
1035    Ok(())
1036}
1037
1038#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1039async fn update_state_paused(
1040    tx: &Transaction<'_>,
1041    execution_id: &ExecutionId,
1042    appending_version: &Version,
1043    is_paused: bool,
1044) -> Result<AppendResponse, DbErrorWrite> {
1045    debug!(
1046        "Setting t_state to {}",
1047        if is_paused { "paused" } else { "unpaused" }
1048    );
1049
1050    let updated = tx
1051        .execute(
1052            r"
1053            UPDATE t_state
1054            SET
1055                corresponding_version = $1,
1056                is_paused = $2,
1057                updated_at = CURRENT_TIMESTAMP
1058            WHERE execution_id = $3
1059            ",
1060            &[
1061                &i64::from(appending_version.0), // $1
1062                &is_paused,                      // $2
1063                &execution_id.to_string(),       // $3
1064            ],
1065        )
1066        .await?;
1067
1068    if updated != 1 {
1069        return Err(DbErrorWrite::NotFound);
1070    }
1071    Ok(appending_version.increment())
1072}
1073
1074#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1075async fn bump_state_next_version(
1076    tx: &Transaction<'_>,
1077    execution_id: &ExecutionId,
1078    appending_version: &Version,
1079    delay_req: Option<DelayReq>,
1080) -> Result<AppendResponse, DbErrorWrite> {
1081    debug!("update_index_version");
1082    let execution_id_str = execution_id.to_string();
1083
1084    let updated = tx
1085        .execute(
1086            r"
1087            UPDATE t_state
1088            SET
1089                corresponding_version = $1,
1090                updated_at = CURRENT_TIMESTAMP
1091            WHERE execution_id = $2
1092            ",
1093            &[
1094                &i64::from(appending_version.0), // $1
1095                &execution_id_str,               // $2
1096            ],
1097        )
1098        .await?;
1099
1100    if updated != 1 {
1101        return Err(DbErrorWrite::NotFound);
1102    }
1103
1104    if let Some(DelayReq {
1105        join_set_id,
1106        delay_id,
1107        expires_at,
1108    }) = delay_req
1109    {
1110        debug!("Inserting delay to `t_delay`");
1111        tx.execute(
1112            "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1113            &[
1114                &execution_id_str,
1115                &join_set_id.to_string(),
1116                &delay_id.to_string(),
1117                &expires_at,
1118            ],
1119        )
1120        .await?;
1121    }
1122    Ok(appending_version.increment())
1123}
1124
1125async fn get_combined_state(
1126    tx: &Transaction<'_>,
1127    execution_id: &ExecutionId,
1128) -> Result<CombinedState, DbErrorRead> {
1129    let row = tx
1130        .query_one(
1131            r"
1132            SELECT
1133                created_at, first_scheduled_at,
1134                state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
1135                last_lock_version, executor_id, run_id,
1136                join_set_id, join_set_closing,
1137                result_kind, is_paused
1138            FROM t_state
1139            WHERE execution_id = $1
1140            ",
1141            &[&execution_id.to_string()],
1142        )
1143        .await
1144        .map_err(DbErrorRead::from)?;
1145
1146    // Parsing columns
1147
1148    let created_at: DateTime<Utc> = get(&row, "created_at")?;
1149    let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1150
1151    let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1152    let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1153        consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1154    })?;
1155    let component_digest = InputContentDigest(ContentDigest(digest));
1156
1157    let component_type: String = get(&row, "component_type")?;
1158    let component_type = ComponentType::from_str(&component_type)
1159        .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;
1160
1161    let deployment_id: String = get(&row, "deployment_id")?;
1162    let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1163
1164    let state: String = get(&row, "state")?;
1165    let ffqn: String = get(&row, "ffqn")?;
1166    let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1167        consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1168    })?;
1169
1170    let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1171
1172    let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1173    let last_lock_version = last_lock_version_raw
1174        .map(Version::try_from)
1175        .transpose()
1176        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1177
1178    let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1179    let executor_id = executor_id_raw
1180        .map(|id| ExecutorId::from_str(&id))
1181        .transpose()
1182        .map_err(DbErrorGeneric::from)?;
1183
1184    let run_id_raw: Option<String> = get(&row, "run_id")?;
1185    let run_id = run_id_raw
1186        .map(|id| RunId::from_str(&id))
1187        .transpose()
1188        .map_err(DbErrorGeneric::from)?;
1189
1190    let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1191    let join_set_id = join_set_id_raw
1192        .map(|id| JoinSetId::from_str(&id))
1193        .transpose()
1194        .map_err(DbErrorGeneric::from)?;
1195
1196    let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1197
1198    let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1199    let result_kind = result_kind.map(|it| it.0);
1200
1201    let is_paused: bool = get(&row, "is_paused")?;
1202
1203    let corresponding_version: i64 = get(&row, "corresponding_version")?;
1204    let corresponding_version = Version::new(
1205        VersionType::try_from(corresponding_version)
1206            .map_err(|_| consistency_db_err("version must be non-negative"))?,
1207    );
1208
1209    let dto = CombinedStateDTO {
1210        execution_id: execution_id.clone(),
1211        created_at,
1212        first_scheduled_at,
1213        state,
1214        ffqn,
1215        component_digest,
1216        component_type,
1217        deployment_id,
1218        pending_expires_finished,
1219        last_lock_version,
1220        executor_id,
1221        run_id,
1222        join_set_id,
1223        join_set_closing,
1224        result_kind,
1225        is_paused,
1226    };
1227    CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1228}
1229
1230async fn list_executions(
1231    read_tx: &Transaction<'_>,
1232    filter: ListExecutionsFilter,
1233    pagination: &ExecutionListPagination,
1234) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1235    // Helper to manage dynamic WHERE clauses and positional parameters ($1, $2...)
1236    struct QueryBuilder {
1237        where_clauses: Vec<String>,
1238        params: Vec<Box<dyn ToSql + Send + Sync>>,
1239    }
1240
1241    impl QueryBuilder {
1242        fn new() -> Self {
1243            Self {
1244                where_clauses: Vec::new(),
1245                params: Vec::new(),
1246            }
1247        }
1248
1249        fn add_param<T>(&mut self, param: T) -> String
1250        where
1251            T: ToSql + Sync + Send + 'static,
1252        {
1253            self.params.push(Box::new(param));
1254            format!("${}", self.params.len())
1255        }
1256
1257        fn add_where(&mut self, clause: String) {
1258            self.where_clauses.push(clause);
1259        }
1260    }
1261
1262    let mut qb = QueryBuilder::new();
1263
1264    // Pagination Logic
1265    let (limit, limit_desc) = match pagination {
1266        ExecutionListPagination::CreatedBy(p) => {
1267            let limit = p.length();
1268            let is_desc = p.is_desc();
1269            if let Some(cursor) = p.cursor() {
1270                let placeholder = qb.add_param(*cursor);
1271                qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1272            }
1273            (limit, is_desc)
1274        }
1275        ExecutionListPagination::ExecutionId(p) => {
1276            let limit = p.length();
1277            let is_desc = p.is_desc();
1278            if let Some(cursor) = p.cursor() {
1279                let placeholder = qb.add_param(cursor.to_string());
1280                qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1281            }
1282            (limit, is_desc)
1283        }
1284    };
1285
1286    if !filter.show_derived {
1287        qb.add_where("is_top_level = true".to_string());
1288    }
1289    let like = |str| format!("{str}%");
1290    if let Some(ffqn_prefix) = filter.ffqn_prefix {
1291        let placeholder = qb.add_param(like(ffqn_prefix));
1292        qb.add_where(format!("ffqn LIKE {placeholder}"));
1293    }
1294    if filter.hide_finished {
1295        qb.add_where(format!("state != '{STATE_FINISHED}'"));
1296    }
1297    if let Some(prefix) = filter.execution_id_prefix {
1298        let placeholder = qb.add_param(like(prefix));
1299        qb.add_where(format!("execution_id LIKE {placeholder}"));
1300    }
1301    if let Some(component_digest) = filter.component_digest {
1302        let placeholder = qb.add_param(component_digest);
1303        qb.add_where(format!("component_id_input_digest = {placeholder}"));
1304    }
1305    if let Some(deployment_id) = filter.deployment_id {
1306        let placeholder = qb.add_param(deployment_id.to_string());
1307        qb.add_where(format!("deployment_id = {placeholder}"));
1308    }
1309
1310    let where_str = if qb.where_clauses.is_empty() {
1311        String::new()
1312    } else {
1313        format!("WHERE {}", qb.where_clauses.join(" AND "))
1314    };
1315
1316    let order_col = match pagination {
1317        ExecutionListPagination::CreatedBy(_) => "created_at",
1318        ExecutionListPagination::ExecutionId(_) => "execution_id",
1319    };
1320
1321    // Inner query: fetch rows with cursor-based ordering
1322    // Outer query: always return results in descending order
1323    let (inner_order, outer_order) = if limit_desc {
1324        ("DESC", "")
1325    } else {
1326        ("", "DESC")
1327    };
1328
1329    let inner_sql = format!(
1330        r"SELECT created_at, first_scheduled_at, component_id_input_digest, deployment_id,
1331            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1332            last_lock_version, executor_id, run_id,
1333            join_set_id, join_set_closing,
1334            result_kind, is_paused
1335            FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1336    );
1337
1338    let sql = if outer_order.is_empty() {
1339        inner_sql
1340    } else {
1341        format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1342    };
1343
1344    let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1345        .params
1346        .iter()
1347        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1348        .collect();
1349
1350    let rows = read_tx.query(&sql, &params_refs).await?;
1351
1352    let mut vec = Vec::with_capacity(rows.len());
1353
1354    for row in rows {
1355        // If parsing of the row fails, log and skip it.
1356        let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1357            let execution_id_str: String = get(&row, "execution_id")?;
1358            let execution_id = ExecutionId::from_str(&execution_id_str)
1359                .map_err(|err| consistency_db_err(err.to_string()))?;
1360
1361            let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1362            let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1363                consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1364            })?;
1365            let component_digest = InputContentDigest(ContentDigest(digest));
1366
1367            let component_type: String = get(&row, "component_type")?;
1368            let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1369                consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1370            })?;
1371
1372            let deployment_id: String = get(&row, "deployment_id")?;
1373            let deployment_id =
1374                DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1375
1376            let created_at: DateTime<Utc> = get(&row, "created_at")?;
1377            let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1378
1379            let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1380                get(&row, "result_kind")?;
1381            let result_kind = result_kind.map(|it| it.0);
1382
1383            let is_paused: bool = get(&row, "is_paused")?;
1384
1385            let corresponding_version: i64 = get(&row, "corresponding_version")?;
1386            let corresponding_version = Version::try_from(corresponding_version)
1387                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1388
1389            let executor_id_str: Option<String> = get(&row, "executor_id")?;
1390            let executor_id = executor_id_str
1391                .map(|id| ExecutorId::from_str(&id))
1392                .transpose()?;
1393
1394            let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1395            let last_lock_version = last_lock_version_raw
1396                .map(Version::try_from)
1397                .transpose()
1398                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1399
1400            let run_id_str: Option<String> = get(&row, "run_id")?;
1401            let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1402
1403            let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1404            let join_set_id = join_set_id_str
1405                .map(|id| JoinSetId::from_str(&id))
1406                .transpose()?;
1407
1408            let ffqn: String = get(&row, "ffqn")?;
1409            let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1410                error!("Error parsing ffqn - {parse_err:?}");
1411                consistency_db_err("invalid ffqn value in `t_state`")
1412            })?;
1413
1414            let combined_state_dto = CombinedStateDTO {
1415                execution_id,
1416                created_at,
1417                first_scheduled_at,
1418                component_digest,
1419                component_type,
1420                deployment_id,
1421                state: get(&row, "state")?,
1422                ffqn,
1423                pending_expires_finished: get(&row, "pending_expires_finished")?,
1424                executor_id,
1425                last_lock_version,
1426                run_id,
1427                join_set_id,
1428                join_set_closing: get(&row, "join_set_closing")?,
1429                result_kind,
1430                is_paused,
1431            };
1432
1433            let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1434
1435            Ok(combined_state.execution_with_state)
1436        };
1437
1438        match unpack() {
1439            Ok(execution) => vec.push(execution),
1440            Err(err) => {
1441                warn!("Skipping corrupted row in t_state: {err:?}");
1442            }
1443        }
1444    }
1445
1446    Ok(vec)
1447}
1448
1449async fn list_responses(
1450    tx: &Transaction<'_>,
1451    execution_id: &ExecutionId,
1452    pagination: Option<Pagination<u32>>,
1453) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1454    // Helper to manage params dynamically
1455    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1456    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1457        params.push(p);
1458        format!("${}", params.len())
1459    };
1460
1461    // 1. Base Query
1462    let p_execution_id = add_param(Box::new(execution_id.to_string()));
1463
1464    let mut sql = format!(
1465        "SELECT \
1466            r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1467            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1468            WHERE \
1469            r.execution_id = {p_execution_id} \
1470            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1471    );
1472
1473    // 2. Pagination Logic
1474    let limit = match &pagination {
1475        Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1476            // Postgres BIGINT is i64.
1477            let p_cursor = add_param(Box::new(i64::from(*cursor)));
1478
1479            // Add WHERE clause for cursor
1480            write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1481
1482            Some(p.length())
1483        }
1484        None => None,
1485    };
1486
1487    // 3. Ordering
1488    sql.push_str(" ORDER BY r.id");
1489    let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1490    if is_desc {
1491        sql.push_str(" DESC");
1492    }
1493
1494    // 4. Limit
1495    if let Some(limit) = limit {
1496        // Postgres limit expects i64
1497        let p_limit = add_param(Box::new(i64::from(limit)));
1498        write!(sql, " LIMIT {p_limit}").unwrap();
1499    }
1500
1501    // Re-order to ascending for consistent oldest-to-newest results
1502    if is_desc {
1503        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1504    }
1505
1506    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1507        .iter()
1508        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1509        .collect();
1510
1511    let rows = tx
1512        .query(&sql, &params_refs)
1513        .await
1514        .map_err(DbErrorRead::from)?;
1515
1516    let mut results = Vec::with_capacity(rows.len());
1517    for row in rows {
1518        results.push(parse_response_with_cursor(&row)?);
1519    }
1520
1521    Ok(results)
1522}
1523
1524async fn list_logs_tx(
1525    tx: &Transaction<'_>,
1526    execution_id: &ExecutionId,
1527    filter: &LogFilter,
1528    pagination: &Pagination<u32>,
1529) -> Result<ListLogsResponse, DbErrorRead> {
1530    let mut param_index = 1;
1531    let mut query = format!(
1532        "SELECT id, run_id, created_at, level, message, stream_type, payload
1533         FROM t_log
1534         WHERE execution_id = ${param_index}",
1535    );
1536    param_index += 1;
1537
1538    let execution_id = execution_id.to_string();
1539    let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&execution_id];
1540
1541    // Logs and streams filter
1542    let level_filter = if filter.should_show_logs() {
1543        let levels_str = if !filter.levels().is_empty() {
1544            filter
1545                .levels()
1546                .iter()
1547                .map(|lvl| (*lvl as u8).to_string())
1548                .collect::<Vec<_>>()
1549                .join(",")
1550        } else {
1551            LogLevel::iter()
1552                .map(|lvl| (lvl as u8).to_string())
1553                .collect::<Vec<_>>()
1554                .join(",")
1555        };
1556        Some(format!(" level IN ({levels_str})"))
1557    } else {
1558        None
1559    };
1560    let stream_filter = if filter.should_show_streams() {
1561        let streams_str = if !filter.stream_types().is_empty() {
1562            filter
1563                .stream_types()
1564                .iter()
1565                .map(|st| (*st as u8).to_string())
1566                .collect::<Vec<_>>()
1567                .join(",")
1568        } else {
1569            LogStreamType::iter()
1570                .map(|st| (st as u8).to_string())
1571                .collect::<Vec<_>>()
1572                .join(",")
1573        };
1574        Some(format!(" stream_type IN ({streams_str})"))
1575    } else {
1576        None
1577    };
1578    match (level_filter, stream_filter) {
1579        (Some(level_filter), Some(stream_filter)) => {
1580            write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1581                .expect("writing to string");
1582        }
1583        (Some(level_filter), None) => {
1584            write!(&mut query, " AND {level_filter}").expect("writing to string");
1585        }
1586        (None, Some(stream_filter)) => {
1587            write!(&mut query, " AND {stream_filter}").expect("writing to string");
1588        }
1589        (None, None) => unreachable!("guarded by constructor"),
1590    }
1591
1592    // Pagination
1593    write!(&mut query, " AND id {} ${param_index}", pagination.rel()).expect("writing to string");
1594    let cursor_val: i64 = (*pagination.cursor()).into();
1595    params.push(&cursor_val);
1596    param_index += 1;
1597
1598    // Ordering and limit
1599    write!(
1600        &mut query,
1601        " ORDER BY id {} LIMIT ${}",
1602        if pagination.is_desc() { "DESC" } else { "ASC" },
1603        param_index
1604    )
1605    .expect("writing to string");
1606    let length_val: i64 = i64::from(pagination.length());
1607    params.push(&length_val);
1608
1609    let rows = tx.query(&query, &params[..]).await?;
1610
1611    let mut items = Vec::with_capacity(rows.len());
1612
1613    for row in rows {
1614        let cursor = u32::try_from(get::<i64, _>(&row, "id")?)
1615            .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?;
1616        let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1617        let run_id: String = get(&row, "run_id")?;
1618        let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1619            consistency_db_err_src(
1620                format!("cannot convert RunId {run_id}, id: {cursor}"),
1621                Arc::from(parse_err),
1622            )
1623        })?;
1624
1625        let level: Option<i32> = get(&row, "level")?;
1626        let message: Option<String> = get(&row, "message")?;
1627        let stream_type: Option<i32> = get(&row, "stream_type")?;
1628        let payload: Option<Vec<u8>> = get(&row, "payload")?;
1629
1630        let log_entry = match (level, message, stream_type, payload) {
1631            (Some(lvl), Some(msg), None, None) => {
1632                let map_err = |err| {
1633                    consistency_db_err_src(
1634                        format!("cannot convert {lvl} to LogLevel , id: {cursor}"),
1635                        err,
1636                    )
1637                };
1638                LogEntry::Log {
1639                    created_at,
1640                    level: u8::try_from(lvl)
1641                        .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1642                        .map_err(|err| map_err(Arc::from(err)))??,
1643                    message: msg,
1644                }
1645            }
1646            (None, None, Some(stype), Some(pl)) => {
1647                let map_err = |err| {
1648                    consistency_db_err_src(
1649                        format!("cannot convert {stype} to LogStreamType , id: {cursor}"),
1650                        err,
1651                    )
1652                };
1653                LogEntry::Stream {
1654                    created_at,
1655                    stream_type: u8::try_from(stype)
1656                        .map(|stype| {
1657                            LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1658                        })
1659                        .map_err(|err| map_err(Arc::from(err)))??,
1660                    payload: pl,
1661                }
1662            }
1663            _ => {
1664                return Err(consistency_db_err(format!("invalid t_log row id:{cursor}")).into());
1665            }
1666        };
1667
1668        items.push(LogEntryRow {
1669            cursor,
1670            run_id,
1671            log_entry,
1672        });
1673    }
1674
1675    Ok(ListLogsResponse {
1676        next_page: items
1677            .last()
1678            .map(|item| Pagination::NewerThan {
1679                length: pagination.length(),
1680                cursor: item.cursor,
1681                including_cursor: false,
1682            })
1683            .unwrap_or(if pagination.is_asc() {
1684                *pagination // no new results, keep the same cursor
1685            } else {
1686                // no prev results, let's start from beginning
1687                Pagination::NewerThan {
1688                    length: pagination.length(),
1689                    cursor: 0,
1690                    including_cursor: false, // does not matter, no row has id = 0
1691                }
1692            }),
1693        prev_page: match items.first() {
1694            Some(item) => Some(Pagination::OlderThan {
1695                length: pagination.length(),
1696                cursor: item.cursor,
1697                including_cursor: false,
1698            }),
1699            None if pagination.is_asc() && *pagination.cursor() > 0 => {
1700                // asked for a next page that does not exists (yet).
1701                Some(pagination.invert())
1702            }
1703            None => None,
1704        },
1705        items,
1706    })
1707}
1708
1709async fn list_deployment_states(
1710    tx: &Transaction<'_>,
1711    current_time: DateTime<Utc>,
1712    pagination: Pagination<Option<DeploymentId>>,
1713) -> Result<Vec<DeploymentState>, DbErrorRead> {
1714    // Helper for numbered params ($1, $2, ...)
1715    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1716    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1717        params.push(p);
1718        format!("${}", params.len())
1719    };
1720
1721    // Base params
1722    let p_now = add_param(Box::new(current_time));
1723
1724    let mut sql = format!(
1725        "
1726        SELECT
1727            deployment_id,
1728
1729            COUNT(*) FILTER (WHERE state = '{STATE_LOCKED}') AS locked,
1730
1731            COUNT(*) FILTER (
1732                WHERE state = '{STATE_PENDING_AT}'
1733                  AND pending_expires_finished <= {p_now}
1734            ) AS pending,
1735
1736            COUNT(*) FILTER (
1737                WHERE state = '{STATE_PENDING_AT}'
1738                  AND pending_expires_finished > {p_now}
1739            ) AS scheduled,
1740
1741            COUNT(*) FILTER (WHERE state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1742
1743            COUNT(*) FILTER (WHERE state = '{STATE_FINISHED}') AS finished
1744        FROM t_state"
1745    );
1746
1747    // Pagination
1748    if let Some(cursor) = pagination.cursor() {
1749        let p_cursor = add_param(Box::new(cursor.to_string()));
1750        write!(
1751            sql,
1752            " WHERE deployment_id {rel} {p_cursor}",
1753            rel = pagination.rel()
1754        )
1755        .expect("writing to string");
1756    }
1757
1758    // Grouping + ordering
1759    // Inner query: fetch rows with cursor-based ordering
1760    // Outer query: always return results in descending order
1761    let (inner_order, outer_order) = if pagination.is_desc() {
1762        ("DESC", "")
1763    } else {
1764        ("ASC", "DESC")
1765    };
1766
1767    write!(
1768        sql,
1769        " GROUP BY deployment_id ORDER BY deployment_id {inner_order} LIMIT {}",
1770        pagination.length()
1771    )
1772    .expect("writing to string");
1773
1774    let final_sql = if outer_order.is_empty() {
1775        sql
1776    } else {
1777        format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1778    };
1779
1780    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1781        .iter()
1782        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1783        .collect();
1784
1785    let rows = tx
1786        .query(&final_sql, &params_refs)
1787        .await
1788        .map_err(DbErrorRead::from)?;
1789
1790    let mut result = Vec::with_capacity(rows.len());
1791    for row in rows {
1792        let deployment_id: String = get(&row, "deployment_id")?;
1793        result.push(DeploymentState {
1794            deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1795            locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1796            pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1797                .expect("count is never negative"),
1798            scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1799                .expect("count is never negative"),
1800            blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1801                .expect("count is never negative"),
1802            finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1803                .expect("count is never negative"),
1804        });
1805    }
1806
1807    Ok(result)
1808}
1809
1810fn parse_response_with_cursor(
1811    row: &tokio_postgres::Row,
1812) -> Result<ResponseWithCursor, DbErrorRead> {
1813    // Postgres BIGINT = i64.
1814    let id = u32::try_from(get::<i64, _>(row, "id")?)
1815        .map_err(|_| consistency_db_err("id must not be negative"))?;
1816
1817    let created_at: DateTime<Utc> = get(row, "created_at")?;
1818    let join_set_id_str: String = get(row, "join_set_id")?;
1819    let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1820
1821    // Extract Optionals
1822    let delay_id: Option<String> = get(row, "delay_id")?;
1823    let delay_id = delay_id
1824        .map(|id| DelayId::from_str(&id))
1825        .transpose()
1826        .map_err(DbErrorGeneric::from)?;
1827    let delay_success: Option<bool> = get(row, "delay_success")?;
1828    let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1829    let child_execution_id = child_execution_id
1830        .map(|id| ExecutionIdDerived::from_str(&id))
1831        .transpose()
1832        .map_err(DbErrorGeneric::from)?;
1833    let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1834        .map(Version::try_from)
1835        .transpose()
1836        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1837    let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1838    let json_value = json_value.map(|it| it.0);
1839
1840    let event = match (
1841        delay_id,
1842        delay_success,
1843        child_execution_id,
1844        finished_version,
1845        json_value,
1846    ) {
1847        (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1848            delay_id,
1849            result: delay_success.then_some(()).ok_or(()),
1850        },
1851        (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1852            if let ExecutionRequest::Finished { result, .. } = json_val {
1853                JoinSetResponse::ChildExecutionFinished {
1854                    child_execution_id,
1855                    finished_version,
1856                    result,
1857                }
1858            } else {
1859                error!("Joined log entry must be 'Finished'");
1860                return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1861            }
1862        }
1863        (delay, delay_success, child, finished, result) => {
1864            error!(
1865                "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1866            );
1867            return Err(consistency_db_err("invalid row in t_join_set_response").into());
1868        }
1869    };
1870
1871    Ok(ResponseWithCursor {
1872        cursor: ResponseCursor(id),
1873        event: JoinSetResponseEventOuter {
1874            event: JoinSetResponseEvent { join_set_id, event },
1875            created_at,
1876        },
1877    })
1878}
1879
1880#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1881#[expect(clippy::too_many_arguments)]
1882async fn lock_single_execution(
1883    tx: &Transaction<'_>,
1884    created_at: DateTime<Utc>,
1885    component_id: &ComponentId,
1886    deployment_id: DeploymentId,
1887    execution_id: &ExecutionId,
1888    run_id: RunId,
1889    appending_version: &Version,
1890    executor_id: ExecutorId,
1891    lock_expires_at: DateTime<Utc>,
1892    retry_config: ComponentRetryConfig,
1893) -> Result<LockedExecution, DbErrorWrite> {
1894    debug!("lock_single_execution");
1895
1896    // Check State
1897    let combined_state = get_combined_state(tx, execution_id).await?;
1898    combined_state
1899        .execution_with_state
1900        .pending_state
1901        .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
1902    let expected_version = combined_state.get_next_version_assert_not_finished();
1903    check_expected_next_and_appending_version(&expected_version, appending_version)?;
1904
1905    // Prepare Event
1906    let locked_event = Locked {
1907        component_id: component_id.clone(),
1908        deployment_id,
1909        executor_id,
1910        lock_expires_at,
1911        run_id,
1912        retry_config,
1913    };
1914    let event = ExecutionRequest::Locked(locked_event.clone());
1915
1916    let event = Json(event);
1917
1918    // Append to execution_log
1919    tx.execute(
1920        "INSERT INTO t_execution_log \
1921            (execution_id, created_at, json_value, version, variant) \
1922            VALUES ($1, $2, $3, $4, $5)",
1923        &[
1924            &execution_id.to_string(),
1925            &created_at,
1926            &event,
1927            &i64::from(appending_version.0),
1928            &event.0.variant(),
1929        ],
1930    )
1931    .await
1932    .map_err(|err| {
1933        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
1934            reason: "cannot lock".into(),
1935            context: SpanTrace::capture(),
1936            source: Some(Arc::new(err)),
1937            loc: Location::caller(),
1938        })
1939    })?;
1940
1941    let responses = list_responses(tx, execution_id, None).await?;
1942    trace!("Responses: {responses:?}");
1943
1944    // Update t_state
1945    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
1946        tx,
1947        execution_id,
1948        deployment_id,
1949        &component_id.input_digest,
1950        executor_id,
1951        run_id,
1952        lock_expires_at,
1953        appending_version,
1954        retry_config,
1955    )
1956    .await?;
1957
1958    // Fetch History
1959    // Fetch event_history and `Created` event.
1960    let rows = tx
1961        .query(
1962            "SELECT json_value, version FROM t_execution_log WHERE \
1963                execution_id = $1 AND (variant = $2 OR variant = $3) \
1964                ORDER BY version",
1965            &[
1966                &execution_id.to_string(),
1967                &DUMMY_CREATED.variant(),
1968                &DUMMY_HISTORY_EVENT.variant(),
1969            ],
1970        )
1971        .await
1972        .map_err(DbErrorGeneric::from)?;
1973
1974    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
1975
1976    for row in rows {
1977        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
1978        let event = event.0;
1979
1980        let version: i64 = get(&row, "version")?;
1981        let version = Version::try_from(version)
1982            .map_err(|_| consistency_db_err("version must be non-negative"))?;
1983
1984        events.push_back(ExecutionEvent {
1985            created_at: DateTime::from_timestamp_nanos(0), // not used, only the inner event and version
1986            event,
1987            backtrace_id: None,
1988            version,
1989        });
1990    }
1991
1992    // Extract Created Event
1993    let Some(ExecutionRequest::Created {
1994        ffqn,
1995        params,
1996        parent,
1997        metadata,
1998        ..
1999    }) = events.pop_front().map(|outer| outer.event)
2000    else {
2001        error!("Execution log must contain at least `Created` event");
2002        return Err(consistency_db_err("execution log must contain `Created` event").into());
2003    };
2004
2005    // Extract History Events
2006    let mut event_history = Vec::new();
2007    for ExecutionEvent { event, version, .. } in events {
2008        if let ExecutionRequest::HistoryEvent { event } = event {
2009            event_history.push((event, version));
2010        } else {
2011            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
2012            return Err(consistency_db_err(
2013                "rows can only contain `Created` and `HistoryEvent` event kinds",
2014            )
2015            .into());
2016        }
2017    }
2018
2019    Ok(LockedExecution {
2020        execution_id: execution_id.clone(),
2021        metadata,
2022        next_version: appending_version.increment(),
2023        ffqn,
2024        params,
2025        event_history,
2026        responses,
2027        parent,
2028        intermittent_event_count,
2029        locked_event,
2030    })
2031}
2032
2033async fn count_join_next(
2034    tx: &Transaction<'_>,
2035    execution_id: &ExecutionId,
2036    join_set_id: &JoinSetId,
2037) -> Result<u32, DbErrorRead> {
2038    let row = tx
2039            .query_one(
2040                "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
2041                AND history_event_type = $3",
2042                &[
2043                    &execution_id.to_string(),
2044                    &join_set_id.to_string(),
2045                    &HISTORY_EVENT_TYPE_JOIN_NEXT,
2046                ],
2047            )
2048            .await
2049            .map_err(DbErrorRead::from)?;
2050
2051    let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
2052    Ok(count)
2053}
2054
2055async fn nth_response(
2056    tx: &Transaction<'_>,
2057    execution_id: &ExecutionId,
2058    join_set_id: &JoinSetId,
2059    skip_rows: u32,
2060) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2061    let row = tx
2062            .query_opt(
2063                "SELECT r.id, r.created_at, r.join_set_id, \
2064                 r.delay_id, r.delay_success, \
2065                 r.child_execution_id, r.finished_version, l.json_value \
2066                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2067                 WHERE \
2068                 r.execution_id = $1 AND r.join_set_id = $2 AND \
2069                 ( \
2070                 r.finished_version = l.version \
2071                 OR \
2072                 r.child_execution_id IS NULL \
2073                 ) \
2074                 ORDER BY id \
2075                 LIMIT 1 OFFSET $3",
2076                 &[
2077                     &execution_id.to_string(),
2078                     &join_set_id.to_string(),
2079                     &i64::from(skip_rows),
2080                 ]
2081            )
2082            .await
2083            .map_err(DbErrorRead::from)?;
2084
2085    match row {
2086        Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
2087        None => Ok(None),
2088    }
2089}
2090
2091#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2092async fn append(
2093    tx: &Transaction<'_>,
2094    execution_id: &ExecutionId,
2095    req: AppendRequest,
2096    appending_version: Version,
2097) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2098    if matches!(req.event, ExecutionRequest::Created { .. }) {
2099        return Err(DbErrorWrite::NonRetriable(
2100            DbErrorWriteNonRetriable::ValidationFailed(
2101                "cannot append `Created` event - use `create` instead".into(),
2102            ),
2103        ));
2104    }
2105
2106    if let AppendRequest {
2107        event:
2108            ExecutionRequest::Locked(Locked {
2109                component_id,
2110                deployment_id,
2111                executor_id,
2112                run_id,
2113                lock_expires_at,
2114                retry_config,
2115            }),
2116        created_at,
2117    } = req
2118    {
2119        return lock_single_execution(
2120            tx,
2121            created_at,
2122            &component_id,
2123            deployment_id,
2124            execution_id,
2125            run_id,
2126            &appending_version,
2127            executor_id,
2128            lock_expires_at,
2129            retry_config,
2130        )
2131        .await
2132        .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2133    }
2134
2135    let combined_state = get_combined_state(tx, execution_id).await?;
2136    if combined_state
2137        .execution_with_state
2138        .pending_state
2139        .is_finished()
2140    {
2141        debug!("Execution is already finished");
2142        return Err(DbErrorWrite::NonRetriable(
2143            DbErrorWriteNonRetriable::IllegalState {
2144                reason: "already finished".into(),
2145                context: SpanTrace::capture(),
2146                source: None,
2147                loc: Location::caller(),
2148            },
2149        ));
2150    }
2151
2152    check_expected_next_and_appending_version(
2153        &combined_state.get_next_version_assert_not_finished(),
2154        &appending_version,
2155    )?;
2156
2157    let event = Json(req.event);
2158
2159    // Insert into t_execution_log
2160    tx.execute(
2161            "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2162             VALUES ($1, $2, $3, $4, $5, $6)",
2163            &[
2164                &execution_id.to_string(),
2165                &req.created_at,
2166                &event,
2167                &i64::from(appending_version.0),
2168                &event.0.variant(),
2169                &event.0.join_set_id().map(std::string::ToString::to_string),
2170            ],
2171        )
2172        .await?;
2173
2174    // Calculate current pending state
2175    match &event.0 {
2176        ExecutionRequest::Created { .. } => {
2177            unreachable!("handled in the caller")
2178        }
2179
2180        ExecutionRequest::Locked { .. } => {
2181            unreachable!("handled above")
2182        }
2183
2184        ExecutionRequest::TemporarilyFailed {
2185            backoff_expires_at, ..
2186        }
2187        | ExecutionRequest::TemporarilyTimedOut {
2188            backoff_expires_at, ..
2189        } => {
2190            let (next_version, notifier) = update_state_pending_after_event_appended(
2191                tx,
2192                execution_id,
2193                &appending_version,
2194                *backoff_expires_at,
2195                true, // an intermittent failure
2196                combined_state.execution_with_state.component_digest,
2197            )
2198            .await?;
2199            return Ok((next_version, notifier));
2200        }
2201
2202        ExecutionRequest::Unlocked {
2203            backoff_expires_at, ..
2204        } => {
2205            let (next_version, notifier) = update_state_pending_after_event_appended(
2206                tx,
2207                execution_id,
2208                &appending_version,
2209                *backoff_expires_at,
2210                false, // not an intermittent failure
2211                combined_state.execution_with_state.component_digest,
2212            )
2213            .await?;
2214            return Ok((next_version, notifier));
2215        }
2216
2217        ExecutionRequest::Paused => {
2218            match &combined_state.execution_with_state.pending_state {
2219                PendingState::Finished { .. } => {
2220                    unreachable!("handled above");
2221                }
2222                PendingState::Paused(..) => {
2223                    return Err(DbErrorWriteNonRetriable::IllegalState {
2224                        reason: "cannot pause, execution is already paused".into(),
2225                        context: SpanTrace::capture(),
2226                        source: None,
2227                        loc: Location::caller(),
2228                    }
2229                    .into());
2230                }
2231                _ => {}
2232            }
2233            let next_version =
2234                update_state_paused(tx, execution_id, &appending_version, true).await?;
2235            return Ok((next_version, AppendNotifier::default()));
2236        }
2237
2238        ExecutionRequest::Unpaused => {
2239            if !combined_state
2240                .execution_with_state
2241                .pending_state
2242                .is_paused()
2243            {
2244                return Err(DbErrorWriteNonRetriable::IllegalState {
2245                    reason: "cannot unpause, execution is not paused".into(),
2246                    context: SpanTrace::capture(),
2247                    source: None,
2248                    loc: Location::caller(),
2249                }
2250                .into());
2251            }
2252            let next_version =
2253                update_state_paused(tx, execution_id, &appending_version, false).await?;
2254            return Ok((next_version, AppendNotifier::default()));
2255        }
2256
2257        ExecutionRequest::Finished { result, .. } => {
2258            update_state_finished(
2259                tx,
2260                execution_id,
2261                &appending_version,
2262                req.created_at,
2263                PendingStateFinishedResultKind::from(result),
2264            )
2265            .await?;
2266            return Ok((
2267                appending_version,
2268                AppendNotifier {
2269                    pending_at: None,
2270                    execution_finished: Some(NotifierExecutionFinished {
2271                        execution_id: execution_id.clone(),
2272                        retval: result.clone(),
2273                    }),
2274                    response: None,
2275                },
2276            ));
2277        }
2278
2279        ExecutionRequest::HistoryEvent {
2280            event:
2281                HistoryEvent::JoinSetCreate { .. }
2282                | HistoryEvent::JoinSetRequest {
2283                    request: JoinSetRequest::ChildExecutionRequest { .. },
2284                    ..
2285                }
2286                | HistoryEvent::Persist { .. }
2287                | HistoryEvent::Schedule { .. }
2288                | HistoryEvent::Stub { .. }
2289                | HistoryEvent::JoinNextTooMany { .. }
2290                | HistoryEvent::JoinNextTry { .. },
2291        } => {
2292            return Ok((
2293                bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2294                AppendNotifier::default(),
2295            ));
2296        }
2297
2298        ExecutionRequest::HistoryEvent {
2299            event:
2300                HistoryEvent::JoinSetRequest {
2301                    join_set_id,
2302                    request:
2303                        JoinSetRequest::DelayRequest {
2304                            delay_id,
2305                            expires_at,
2306                            ..
2307                        },
2308                },
2309        } => {
2310            return Ok((
2311                bump_state_next_version(
2312                    tx,
2313                    execution_id,
2314                    &appending_version,
2315                    Some(DelayReq {
2316                        join_set_id: join_set_id.clone(),
2317                        delay_id: delay_id.clone(),
2318                        expires_at: *expires_at,
2319                    }),
2320                )
2321                .await?,
2322                AppendNotifier::default(),
2323            ));
2324        }
2325
2326        ExecutionRequest::HistoryEvent {
2327            event:
2328                HistoryEvent::JoinNext {
2329                    join_set_id,
2330                    run_expires_at,
2331                    closing,
2332                    requested_ffqn: _,
2333                },
2334        } => {
2335            // Did the response arrive already?
2336            let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2337
2338            // Fetch the response corresponding to this JoinNext (skip n-1)
2339            let nth_response =
2340                nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2341
2342            trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2343            assert!(join_next_count > 0);
2344
2345            if let Some(ResponseWithCursor {
2346                event:
2347                    JoinSetResponseEventOuter {
2348                        created_at: nth_created_at,
2349                        ..
2350                    },
2351                cursor: _,
2352            }) = nth_response
2353            {
2354                let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2355                let (next_version, notifier) = update_state_pending_after_event_appended(
2356                    tx,
2357                    execution_id,
2358                    &appending_version,
2359                    scheduled_at,
2360                    false, // not an intermittent failure
2361                    combined_state.execution_with_state.component_digest,
2362                )
2363                .await?;
2364                return Ok((next_version, notifier));
2365            }
2366
2367            return Ok((
2368                update_state_blocked(
2369                    tx,
2370                    execution_id,
2371                    &appending_version,
2372                    join_set_id,
2373                    *run_expires_at,
2374                    *closing,
2375                )
2376                .await?,
2377                AppendNotifier::default(),
2378            ));
2379        }
2380    }
2381}
2382
2383async fn append_response(
2384    tx: &Transaction<'_>,
2385    execution_id: &ExecutionId,
2386    event: JoinSetResponseEventOuter,
2387) -> Result<AppendNotifier, DbErrorWrite> {
2388    let join_set_id = &event.event.join_set_id;
2389
2390    let (delay_id, delay_success) = match &event.event.event {
2391        JoinSetResponse::DelayFinished { delay_id, result } => {
2392            (Some(delay_id.to_string()), Some(result.is_ok()))
2393        }
2394        JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2395    };
2396
2397    let (child_execution_id, finished_version) = match &event.event.event {
2398        JoinSetResponse::ChildExecutionFinished {
2399            child_execution_id,
2400            finished_version,
2401            result: _,
2402        } => (
2403            Some(child_execution_id.to_string()),
2404            Some(i64::from(finished_version.0)),
2405        ),
2406        JoinSetResponse::DelayFinished { .. } => (None, None),
2407    };
2408
2409    let row = tx.query_one(
2410            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2411             VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2412             &[
2413                 &execution_id.to_string(),
2414                 &event.created_at,
2415                 &join_set_id.to_string(),
2416                 &delay_id,
2417                 &delay_success,
2418                 &child_execution_id,
2419                 &finished_version,
2420             ]
2421        ).await?;
2422    let cursor = ResponseCursor(
2423        u32::try_from(get::<i64, _>(&row, 0)?)
2424            .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2425    );
2426    // if the execution is going to be unblocked by this response...
2427    let combined_state = get_combined_state(tx, execution_id).await?;
2428    debug!("previous_pending_state: {combined_state:?}");
2429
2430    let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2431        state:
2432            PendingStateBlockedByJoinSet {
2433                join_set_id: found_join_set_id,
2434                lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2435                closing: _,
2436            },
2437        paused: _,
2438    } =
2439        PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2440        && *join_set_id == found_join_set_id
2441    {
2442        let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2443        // Unblock the state.
2444        update_state_pending_after_response_appended(
2445            tx,
2446            execution_id,
2447            scheduled_at,
2448            combined_state.execution_with_state.component_digest,
2449        )
2450        .await?
2451    } else {
2452        AppendNotifier::default()
2453    };
2454
2455    if let JoinSetResponseEvent {
2456        join_set_id,
2457        event:
2458            JoinSetResponse::DelayFinished {
2459                delay_id,
2460                result: _,
2461            },
2462    } = &event.event
2463    {
2464        debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2465        tx.execute(
2466            "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2467            &[
2468                &execution_id.to_string(),
2469                &join_set_id.to_string(),
2470                &delay_id.to_string(),
2471            ],
2472        )
2473        .await?;
2474    }
2475
2476    notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2477    Ok(notifier)
2478}
2479
2480async fn append_backtrace(
2481    tx: &Transaction<'_>,
2482    backtrace_info: &BacktraceInfo,
2483) -> Result<(), DbErrorWrite> {
2484    // Compute hash for deduplication
2485    let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2486
2487    // Insert into t_wasm_backtrace if not already present
2488    tx.execute(
2489        "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2490         VALUES ($1, $2) \
2491         ON CONFLICT (backtrace_hash) DO NOTHING",
2492        &[
2493            &backtrace_hash.as_slice(),
2494            &Json(&backtrace_info.wasm_backtrace),
2495        ],
2496    )
2497    .await?;
2498
2499    // Insert into t_execution_backtrace referencing the hash
2500    tx.execute(
2501        "INSERT INTO t_execution_backtrace \
2502         (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2503         VALUES ($1, $2, $3, $4, $5)",
2504        &[
2505            &backtrace_info.execution_id.to_string(),
2506            &Json(&backtrace_info.component_id),
2507            &i64::from(backtrace_info.version_min_including.0),
2508            &i64::from(backtrace_info.version_max_excluding.0),
2509            &backtrace_hash.as_slice(),
2510        ],
2511    )
2512    .await?;
2513
2514    Ok(())
2515}
2516
2517async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2518    let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2519        LogEntry::Log {
2520            created_at,
2521            level,
2522            message,
2523        } => (
2524            Some(*level as i32),
2525            Some(message.as_str()),
2526            None::<i32>,
2527            None::<&[u8]>,
2528            created_at,
2529        ),
2530        LogEntry::Stream {
2531            created_at,
2532            payload,
2533            stream_type,
2534        } => (
2535            None::<i32>,
2536            None::<&str>,
2537            Some(*stream_type as i32),
2538            Some(payload.as_slice()),
2539            created_at,
2540        ),
2541    };
2542
2543    tx.execute(
2544        "INSERT INTO t_log (
2545            execution_id,
2546            run_id,
2547            created_at,
2548            level,
2549            message,
2550            stream_type,
2551            payload
2552        ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2553        &[
2554            &row.execution_id.to_string(),
2555            &row.run_id.to_string(),
2556            &created_at,
2557            &level,
2558            &message,
2559            &stream_type,
2560            &payload,
2561        ],
2562    )
2563    .await?;
2564
2565    Ok(())
2566}
2567
2568async fn get_execution_log(
2569    tx: &Transaction<'_>,
2570    execution_id: &ExecutionId,
2571) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2572    let rows = tx
2573        .query(
2574            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2575                 execution_id = $1 ORDER BY version",
2576            &[&execution_id.to_string()],
2577        )
2578        .await
2579        .map_err(DbErrorRead::from)?;
2580
2581    if rows.is_empty() {
2582        return Err(DbErrorRead::NotFound);
2583    }
2584
2585    let mut events = Vec::with_capacity(rows.len());
2586    for row in rows {
2587        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2588        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2589        let event = event.0;
2590        let version: i64 = get(&row, "version")?;
2591        let version = Version::try_from(version)
2592            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2593
2594        events.push(ExecutionEvent {
2595            created_at,
2596            event,
2597            backtrace_id: None,
2598            version,
2599        });
2600    }
2601
2602    let combined_state = get_combined_state(tx, execution_id).await?;
2603    let responses = list_responses(tx, execution_id, None).await?;
2604
2605    Ok(concepts::storage::ExecutionLog {
2606        execution_id: execution_id.clone(),
2607        events,
2608        responses,
2609        next_version: combined_state.get_next_version_or_finished(),
2610        pending_state: combined_state.execution_with_state.pending_state,
2611        component_digest: combined_state.execution_with_state.component_digest,
2612        component_type: combined_state.execution_with_state.component_type,
2613        deployment_id: combined_state.execution_with_state.deployment_id,
2614    })
2615}
2616
2617async fn get_max_version(
2618    tx: &Transaction<'_>,
2619    execution_id: &ExecutionId,
2620) -> Result<Version, DbErrorRead> {
2621    let row = tx
2622        .query_one(
2623            "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2624            &[&execution_id.to_string()],
2625        )
2626        .await?;
2627    let max_version: i64 = get(&row, "version")?;
2628    let max_version = Version::try_from(max_version)
2629        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2630    Ok(max_version)
2631}
2632
2633async fn get_max_response_cursor(
2634    tx: &Transaction<'_>,
2635    execution_id: &ExecutionId,
2636) -> Result<ResponseCursor, DbErrorRead> {
2637    let row = tx
2638        .query_one(
2639            "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2640            &[&execution_id.to_string()],
2641        )
2642        .await?;
2643    // Assume the execution exists and has no responses
2644    let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2645    let max_cursor = ResponseCursor(
2646        u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2647    );
2648    Ok(max_cursor)
2649}
2650
2651async fn list_execution_events(
2652    tx: &Transaction<'_>,
2653    execution_id: &ExecutionId,
2654    pagination: Pagination<VersionType>,
2655    include_backtrace_id: bool,
2656) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2657    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2658    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2659        params.push(p);
2660        format!("${}", params.len())
2661    };
2662
2663    let p_execution_id = add_param(Box::new(execution_id.to_string()));
2664
2665    let (cursor, length, rel, is_desc) = match &pagination {
2666        Pagination::NewerThan {
2667            cursor,
2668            length,
2669            including_cursor,
2670        } => (
2671            *cursor,
2672            *length,
2673            if *including_cursor { ">=" } else { ">" },
2674            false,
2675        ),
2676        Pagination::OlderThan {
2677            cursor,
2678            length,
2679            including_cursor,
2680        } => (
2681            *cursor,
2682            *length,
2683            if *including_cursor { "<=" } else { "<" },
2684            true,
2685        ),
2686    };
2687    let p_cursor = add_param(Box::new(i64::from(cursor)));
2688    let p_limit = add_param(Box::new(i64::from(length)));
2689
2690    let base_select = if include_backtrace_id {
2691        format!(
2692            "SELECT
2693                log.created_at,
2694                log.json_value,
2695                log.version,
2696                bt.version_min_including AS backtrace_id
2697            FROM
2698                t_execution_log AS log
2699            LEFT OUTER JOIN
2700                t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2701                                AND log.version >= bt.version_min_including
2702                                AND log.version < bt.version_max_excluding
2703            WHERE
2704                log.execution_id = {p_execution_id}
2705                AND log.version {rel} {p_cursor}"
2706        )
2707    } else {
2708        format!(
2709            "SELECT
2710                created_at, json_value, NULL::BIGINT as backtrace_id, version
2711            FROM t_execution_log WHERE
2712                execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2713        )
2714    };
2715
2716    let order = if is_desc { "DESC" } else { "ASC" };
2717    let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2718
2719    // Re-order to ascending for consistent oldest-to-newest results
2720    if is_desc {
2721        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2722    }
2723
2724    let params_refs: Vec<&(dyn ToSql + Sync)> = params
2725        .iter()
2726        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2727        .collect();
2728
2729    let rows = tx
2730        .query(&sql, &params_refs)
2731        .await
2732        .map_err(DbErrorRead::from)?;
2733
2734    let mut events = Vec::with_capacity(rows.len());
2735    for row in rows {
2736        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2737        let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2738            .map(Version::try_from)
2739            .transpose()
2740            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2741
2742        let version = get::<i64, _>(&row, "version")?;
2743        let version = Version::new(
2744            VersionType::try_from(version)
2745                .map_err(|_| consistency_db_err("version must be non-negative"))?,
2746        );
2747        let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2748        let event_req = event_req.0;
2749
2750        events.push(ExecutionEvent {
2751            created_at,
2752            event: event_req,
2753            backtrace_id,
2754            version,
2755        });
2756    }
2757    Ok(events)
2758}
2759
2760async fn get_execution_event(
2761    tx: &Transaction<'_>,
2762    execution_id: &ExecutionId,
2763    version: VersionType,
2764) -> Result<ExecutionEvent, DbErrorRead> {
2765    let row = tx
2766        .query_one(
2767            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2768                 execution_id = $1 AND version = $2",
2769            &[&execution_id.to_string(), &i64::from(version)],
2770        )
2771        .await?;
2772
2773    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2774    let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2775    let version = get::<i64, _>(&row, "version")?;
2776    let version = Version::try_from(version)
2777        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2778    let event = json_val.0;
2779
2780    Ok(ExecutionEvent {
2781        created_at,
2782        event,
2783        backtrace_id: None,
2784        version,
2785    })
2786}
2787
2788async fn get_last_execution_event(
2789    tx: &Transaction<'_>,
2790    execution_id: &ExecutionId,
2791) -> Result<ExecutionEvent, DbErrorRead> {
2792    let row = tx
2793        .query_one(
2794            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2795                 execution_id = $1 ORDER BY version DESC LIMIT 1",
2796            &[&execution_id.to_string()],
2797        )
2798        .await?;
2799
2800    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2801    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2802    let event = event.0;
2803    let version: i64 = get(&row, "version")?;
2804    let version = Version::try_from(version)
2805        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2806
2807    Ok(ExecutionEvent {
2808        created_at,
2809        event,
2810        backtrace_id: None,
2811        version,
2812    })
2813}
2814
2815async fn delay_response(
2816    tx: &Transaction<'_>,
2817    execution_id: &ExecutionId,
2818    delay_id: &DelayId,
2819) -> Result<Option<bool>, DbErrorRead> {
2820    let row = tx
2821        .query_opt(
2822            "SELECT delay_success \
2823                 FROM t_join_set_response \
2824                 WHERE \
2825                 execution_id = $1 AND delay_id = $2",
2826            &[&execution_id.to_string(), &delay_id.to_string()],
2827        )
2828        .await?;
2829
2830    match row {
2831        Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2832        None => Ok(None),
2833    }
2834}
2835
2836#[instrument(level = Level::TRACE, skip_all)]
2837async fn get_responses_after(
2838    tx: &Transaction<'_>,
2839    execution_id: &ExecutionId,
2840    last_response: ResponseCursor,
2841) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2842    let rows = tx
2843            .query(
2844                "SELECT r.id, r.created_at, r.join_set_id, \
2845                 r.delay_id, r.delay_success, \
2846                 r.child_execution_id, r.finished_version, child.json_value \
2847                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2848                 WHERE \
2849                 r.id > $1 AND \
2850                 r.execution_id = $2 AND \
2851                 ( \
2852                 r.finished_version = child.version \
2853                 OR \
2854                 r.child_execution_id IS NULL \
2855                 ) \
2856                 ORDER BY id \
2857                 ",
2858                 &[
2859                     &i64::from(last_response.0),
2860                     &execution_id.to_string(),
2861                 ]
2862            )
2863            .await?;
2864
2865    let mut results = Vec::with_capacity(rows.len());
2866    for row in rows {
2867        let resp = parse_response_with_cursor(&row)?;
2868        results.push(resp);
2869    }
2870    Ok(results)
2871}
2872
2873async fn get_pending_of_single_ffqn(
2874    tx: &Transaction<'_>,
2875    batch_size: u32,
2876    pending_at_or_sooner: DateTime<Utc>,
2877    ffqn: &FunctionFqn,
2878    select_strategy: SelectStrategy,
2879) -> Result<Vec<(ExecutionId, Version)>, ()> {
2880    let rows = tx
2881        .query(
2882            &format!(
2883                r"
2884                SELECT execution_id, corresponding_version FROM t_state
2885                WHERE
2886                state = '{STATE_PENDING_AT}' AND
2887                pending_expires_finished <= $1 AND ffqn = $2
2888                AND is_paused = false
2889                ORDER BY pending_expires_finished
2890                {}
2891                LIMIT $3
2892                ",
2893                if select_strategy == SelectStrategy::LockForUpdate {
2894                    "FOR UPDATE SKIP LOCKED"
2895                } else {
2896                    ""
2897                }
2898            ),
2899            &[
2900                &pending_at_or_sooner,
2901                &ffqn.to_string(),
2902                &(i64::from(batch_size)),
2903            ],
2904        )
2905        .await
2906        .map_err(|err| {
2907            warn!("Ignoring consistency error {err:?}");
2908        })?;
2909
2910    let mut result = Vec::with_capacity(rows.len());
2911    for row in rows {
2912        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2913            let eid_str: String = get(&row, "execution_id")?;
2914            let corresponding_version: i64 = get(&row, "corresponding_version")?;
2915            let corresponding_version = Version::try_from(corresponding_version)
2916                .map_err(|_| consistency_db_err("version must be non-negative"))?;
2917
2918            if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2919                return Ok((eid, corresponding_version.increment()));
2920            }
2921            Err(consistency_db_err("invalid execution_id"))
2922        };
2923
2924        match unpack() {
2925            Ok(val) => result.push(val),
2926            Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2927        }
2928    }
2929    Ok(result)
2930}
2931
2932/// Get executions and their next versions
2933async fn get_pending_by_ffqns(
2934    tx: &Transaction<'_>,
2935    batch_size: u32,
2936    pending_at_or_sooner: DateTime<Utc>,
2937    ffqns: &[FunctionFqn],
2938    select_strategy: SelectStrategy,
2939) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2940    let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2941    let mut execution_ids_versions = Vec::with_capacity(batch_size);
2942
2943    for ffqn in ffqns {
2944        let needed = batch_size - execution_ids_versions.len();
2945        if needed == 0 {
2946            break;
2947        }
2948        let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
2949        if let Ok(execs) =
2950            get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2951                .await
2952        {
2953            execution_ids_versions.extend(execs);
2954        }
2955    }
2956
2957    Ok(execution_ids_versions)
2958}
2959
/// Row-locking behaviour requested from the pending-execution queries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SelectStrategy {
    /// Plain read; no locking clause is appended to the query.
    Read,
    /// Appends `FOR UPDATE SKIP LOCKED` to the query.
    LockForUpdate,
}
2965
2966async fn get_pending_by_component_input_digest(
2967    tx: &Transaction<'_>,
2968    batch_size: u32,
2969    pending_at_or_sooner: DateTime<Utc>,
2970    input_digest: &InputContentDigest,
2971    select_strategy: SelectStrategy,
2972) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2973    let rows = tx
2974        .query(
2975            &format!(
2976                r"
2977                SELECT execution_id, corresponding_version FROM t_state WHERE
2978                state = '{STATE_PENDING_AT}' AND
2979                pending_expires_finished <= $1 AND
2980                component_id_input_digest = $2
2981                AND is_paused = false
2982                ORDER BY pending_expires_finished
2983                {}
2984                LIMIT $3
2985                ",
2986                if select_strategy == SelectStrategy::LockForUpdate {
2987                    "FOR UPDATE SKIP LOCKED"
2988                } else {
2989                    ""
2990                }
2991            ),
2992            &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
2993        )
2994        .await?;
2995
2996    let mut result = Vec::with_capacity(rows.len());
2997    for row in rows {
2998        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2999            let eid_str: String = get(&row, "execution_id")?;
3000            let corresponding_version: i64 = get(&row, "corresponding_version")?;
3001            let corresponding_version = Version::try_from(corresponding_version)
3002                .map_err(|_| consistency_db_err("version must be non-negative"))?;
3003
3004            let eid = ExecutionId::from_str(&eid_str)
3005                .map_err(|err| consistency_db_err(err.to_string()))?;
3006            Ok((eid, corresponding_version.increment()))
3007        };
3008
3009        match unpack() {
3010            Ok(val) => result.push(val),
3011            Err(err) => {
3012                warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
3013            }
3014        }
3015    }
3016
3017    Ok(result)
3018}
3019
3020fn notify_pending_locked(
3021    notifier: &NotifierPendingAt,
3022    current_time: DateTime<Utc>,
3023    ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3024) {
3025    if notifier.scheduled_at <= current_time {
3026        ffqn_to_pending_subscription.notify(notifier);
3027    }
3028}
3029
3030async fn upgrade_execution_component(
3031    tx: &Transaction<'_>,
3032    execution_id: &ExecutionId,
3033    old: &InputContentDigest,
3034    new: &InputContentDigest,
3035) -> Result<(), DbErrorWrite> {
3036    debug!("Updating t_state to component {new}");
3037
3038    let updated = tx
3039        .execute(
3040            r"
3041                UPDATE t_state
3042                SET
3043                    updated_at = CURRENT_TIMESTAMP,
3044                    component_id_input_digest = $1
3045                WHERE
3046                    execution_id = $2 AND
3047                    component_id_input_digest = $3
3048                ",
3049            &[
3050                &new.as_slice(),           // $1: BYTEA
3051                &execution_id.to_string(), // $2: TEXT
3052                &old.as_slice(),           // $3: BYTEA
3053            ],
3054        )
3055        .await?;
3056
3057    if updated != 1 {
3058        return Err(DbErrorWrite::NotFound);
3059    }
3060    Ok(())
3061}
3062
3063impl PostgresConnection {
3064    // Must be called after write transaction commit for a correct happens-before relationship.
3065    #[instrument(level = Level::TRACE, skip_all)]
3066    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3067        let (pending_ats, finished_execs, responses) = {
3068            let (mut pending_ats, mut finished_execs, mut responses) =
3069                (Vec::new(), Vec::new(), Vec::new());
3070            for notifier in notifiers {
3071                if let Some(pending_at) = notifier.pending_at {
3072                    pending_ats.push(pending_at);
3073                }
3074                if let Some(finished) = notifier.execution_finished {
3075                    finished_execs.push(finished);
3076                }
3077                if let Some(response) = notifier.response {
3078                    responses.push(response);
3079                }
3080            }
3081            (pending_ats, finished_execs, responses)
3082        };
3083
3084        // Notify pending_at subscribers.
3085        if !pending_ats.is_empty() {
3086            let guard = self.pending_subscribers.lock().unwrap();
3087            for pending_at in pending_ats {
3088                notify_pending_locked(&pending_at, current_time, &guard);
3089            }
3090        }
3091        // Notify execution finished subscribers.
3092        if !finished_execs.is_empty() {
3093            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3094            for finished in finished_execs {
3095                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3096                    for (_tag, sender) in listeners_of_exe_id {
3097                        let _ = sender.send(finished.retval.clone());
3098                    }
3099                }
3100            }
3101        }
3102        // Notify response subscribers.
3103        if !responses.is_empty() {
3104            let mut guard = self.response_subscribers.lock().unwrap();
3105            for (execution_id, response) in responses {
3106                if let Some((sender, _)) = guard.remove(&execution_id) {
3107                    let _ = sender.send(response);
3108                }
3109            }
3110        }
3111    }
3112}
3113
3114#[async_trait]
3115impl DbExecutor for PostgresConnection {
3116    #[instrument(level = Level::TRACE, skip(self))]
3117    async fn lock_pending_by_ffqns(
3118        &self,
3119        batch_size: u32,
3120        pending_at_or_sooner: DateTime<Utc>,
3121        ffqns: Arc<[FunctionFqn]>,
3122        created_at: DateTime<Utc>,
3123        component_id: ComponentId,
3124        deployment_id: DeploymentId,
3125        executor_id: ExecutorId,
3126        lock_expires_at: DateTime<Utc>,
3127        run_id: RunId,
3128        retry_config: ComponentRetryConfig,
3129    ) -> Result<LockPendingResponse, DbErrorWrite> {
3130        let mut client_guard = self.client.lock().await;
3131        let tx = client_guard.transaction().await?;
3132
3133        let execution_ids_versions = get_pending_by_ffqns(
3134            &tx,
3135            batch_size,
3136            pending_at_or_sooner,
3137            &ffqns,
3138            SelectStrategy::LockForUpdate,
3139        )
3140        .await?;
3141
3142        if execution_ids_versions.is_empty() {
3143            // Commit is required to release the connection state cleanly,
3144            // though rollback/drop works too for read-only.
3145            tx.commit().await?;
3146            return Ok(vec![]);
3147        }
3148
3149        debug!("Locking {execution_ids_versions:?}");
3150
3151        // Lock using the same transaction
3152        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3153        for (execution_id, version) in execution_ids_versions {
3154            match lock_single_execution(
3155                &tx,
3156                created_at,
3157                &component_id,
3158                deployment_id,
3159                &execution_id,
3160                run_id,
3161                &version,
3162                executor_id,
3163                lock_expires_at,
3164                retry_config,
3165            )
3166            .await
3167            {
3168                Ok(locked) => locked_execs.push(locked),
3169                Err(err) => {
3170                    tx.rollback().await?; // lock_single_execution performs multiple writes
3171                    debug!("Locking row {execution_id} failed - {err:?}");
3172                    return Err(err);
3173                }
3174            }
3175        }
3176
3177        tx.commit().await?;
3178
3179        Ok(locked_execs)
3180    }
3181
3182    #[instrument(level = Level::TRACE, skip(self))]
3183    async fn lock_pending_by_component_digest(
3184        &self,
3185        batch_size: u32,
3186        pending_at_or_sooner: DateTime<Utc>,
3187        component_id: &ComponentId,
3188        deployment_id: DeploymentId,
3189        created_at: DateTime<Utc>,
3190        executor_id: ExecutorId,
3191        lock_expires_at: DateTime<Utc>,
3192        run_id: RunId,
3193        retry_config: ComponentRetryConfig,
3194    ) -> Result<LockPendingResponse, DbErrorWrite> {
3195        let mut client_guard = self.client.lock().await;
3196        let tx = client_guard.transaction().await?;
3197
3198        let execution_ids_versions = get_pending_by_component_input_digest(
3199            &tx,
3200            batch_size,
3201            pending_at_or_sooner,
3202            &component_id.input_digest,
3203            SelectStrategy::LockForUpdate,
3204        )
3205        .await?;
3206
3207        if execution_ids_versions.is_empty() {
3208            tx.commit().await?;
3209            return Ok(vec![]);
3210        }
3211
3212        debug!("Locking {execution_ids_versions:?}");
3213
3214        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3215        for (execution_id, version) in execution_ids_versions {
3216            match lock_single_execution(
3217                &tx,
3218                created_at,
3219                component_id,
3220                deployment_id,
3221                &execution_id,
3222                run_id,
3223                &version,
3224                executor_id,
3225                lock_expires_at,
3226                retry_config,
3227            )
3228            .await
3229            {
3230                Ok(locked) => locked_execs.push(locked),
3231                Err(err) => {
3232                    tx.rollback().await?; // lock_single_execution performs multiple writes
3233                    debug!("Locking row {execution_id} failed - {err:?}");
3234                    return Err(err);
3235                }
3236            }
3237        }
3238
3239        tx.commit().await?;
3240        Ok(locked_execs)
3241    }
3242
3243    #[cfg(feature = "test")]
3244    #[instrument(level = Level::DEBUG, skip(self))]
3245    async fn lock_one(
3246        &self,
3247        created_at: DateTime<Utc>,
3248        component_id: ComponentId,
3249        deployment_id: DeploymentId,
3250        execution_id: &ExecutionId,
3251        run_id: RunId,
3252        version: Version,
3253        executor_id: ExecutorId,
3254        lock_expires_at: DateTime<Utc>,
3255        retry_config: ComponentRetryConfig,
3256    ) -> Result<LockedExecution, DbErrorWrite> {
3257        debug!(%execution_id, "lock_one");
3258        let mut client_guard = self.client.lock().await;
3259        let tx = client_guard.transaction().await?;
3260
3261        let res = lock_single_execution(
3262            &tx,
3263            created_at,
3264            &component_id,
3265            deployment_id,
3266            execution_id,
3267            run_id,
3268            &version,
3269            executor_id,
3270            lock_expires_at,
3271            retry_config,
3272        )
3273        .await?;
3274
3275        tx.commit().await?;
3276        Ok(res)
3277    }
3278
3279    #[instrument(level = Level::DEBUG, skip(self, req))]
3280    async fn append(
3281        &self,
3282        execution_id: ExecutionId,
3283        version: Version,
3284        req: AppendRequest,
3285    ) -> Result<AppendResponse, DbErrorWrite> {
3286        debug!(%req, "append");
3287        trace!(?req, "append");
3288        let created_at = req.created_at;
3289
3290        let mut client_guard = self.client.lock().await;
3291        let tx = client_guard.transaction().await?;
3292
3293        let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3294
3295        tx.commit().await?;
3296
3297        // Explicitly drop guard (optional, happens at end of scope anyway)
3298        drop(client_guard);
3299
3300        self.notify_all(vec![notifier], created_at);
3301        Ok(new_version)
3302    }
3303
3304    #[instrument(level = Level::DEBUG, skip_all)]
3305    async fn append_batch_respond_to_parent(
3306        &self,
3307        events: AppendEventsToExecution,
3308        response: AppendResponseToExecution,
3309        current_time: DateTime<Utc>,
3310    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3311        debug!("append_batch_respond_to_parent");
3312        if events.execution_id == response.parent_execution_id {
3313            return Err(DbErrorWrite::NonRetriable(
3314                DbErrorWriteNonRetriable::ValidationFailed(
3315                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3316                ),
3317            ));
3318        }
3319        if events.batch.is_empty() {
3320            return Err(DbErrorWrite::NonRetriable(
3321                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3322            ));
3323        }
3324
3325        let mut client_guard = self.client.lock().await;
3326        let tx = client_guard.transaction().await?;
3327
3328        let mut version = events.version;
3329        let mut notifiers = Vec::new();
3330
3331        for append_request in events.batch {
3332            let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3333            version = v;
3334            notifiers.push(n);
3335        }
3336
3337        let pending_at_parent = append_response(
3338            &tx,
3339            &response.parent_execution_id,
3340            JoinSetResponseEventOuter {
3341                created_at: response.created_at,
3342                event: JoinSetResponseEvent {
3343                    join_set_id: response.join_set_id,
3344                    event: JoinSetResponse::ChildExecutionFinished {
3345                        child_execution_id: response.child_execution_id,
3346                        finished_version: response.finished_version,
3347                        result: response.result,
3348                    },
3349                },
3350            },
3351        )
3352        .await?;
3353        notifiers.push(pending_at_parent);
3354
3355        tx.commit().await?;
3356        drop(client_guard);
3357
3358        self.notify_all(notifiers, current_time);
3359        Ok(version)
3360    }
3361
3362    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3363    async fn wait_for_pending_by_ffqn(
3364        &self,
3365        pending_at_or_sooner: DateTime<Utc>,
3366        ffqns: Arc<[FunctionFqn]>,
3367        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3368    ) {
3369        let unique_tag: u64 = rand::random();
3370        let (sender, mut receiver) = mpsc::channel(1);
3371        {
3372            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3373            for ffqn in ffqns.as_ref() {
3374                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3375            }
3376        }
3377
3378        async {
3379            let mut db_has_pending = false;
3380            {
3381                // Scope the lock so we don't hold it while waiting for timeout
3382                let mut client_guard = self.client.lock().await;
3383                // Read-only transaction check
3384                if let Ok(tx) = client_guard.transaction().await {
3385                    if let Ok(res) = get_pending_by_ffqns(
3386                        &tx,
3387                        1,
3388                        pending_at_or_sooner,
3389                        &ffqns,
3390                        SelectStrategy::Read,
3391                    )
3392                    .await
3393                        && !res.is_empty()
3394                    {
3395                        db_has_pending = true;
3396                    }
3397                    // Commit/Rollback read transaction
3398                    let _ = tx.commit().await;
3399                }
3400            }
3401
3402            if db_has_pending {
3403                trace!("Not waiting, database already contains new pending executions");
3404                return;
3405            }
3406
3407            tokio::select! {
3408                _ = receiver.recv() => {
3409                    trace!("Received a notification");
3410                }
3411                () = timeout_fut => {
3412                }
3413            }
3414        }
3415        .await;
3416
3417        // Cleanup
3418        {
3419            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3420            for ffqn in ffqns.as_ref() {
3421                match pending_subscribers.remove_ffqn(ffqn) {
3422                    Some((_, tag)) if tag == unique_tag => {}
3423                    Some(other) => {
3424                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
3425                    }
3426                    None => {}
3427                }
3428            }
3429        }
3430    }
3431
3432    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3433    async fn wait_for_pending_by_component_digest(
3434        &self,
3435        pending_at_or_sooner: DateTime<Utc>,
3436        component_digest: &InputContentDigest,
3437        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3438    ) {
3439        let unique_tag: u64 = rand::random();
3440        let (sender, mut receiver) = mpsc::channel(1);
3441        {
3442            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3443            pending_subscribers
3444                .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
3445        }
3446
3447        async {
3448            let mut db_has_pending = false;
3449            {
3450                let mut client_guard = self.client.lock().await;
3451                if let Ok(tx) = client_guard.transaction().await {
3452                    if let Ok(res) = get_pending_by_component_input_digest(
3453                        &tx,
3454                        1,
3455                        pending_at_or_sooner,
3456                        component_digest,
3457                        SelectStrategy::Read,
3458                    )
3459                    .await
3460                        && !res.is_empty()
3461                    {
3462                        db_has_pending = true;
3463                    }
3464                    let _ = tx.commit().await;
3465                }
3466            }
3467
3468            if db_has_pending {
3469                trace!("Not waiting, database already contains new pending executions");
3470                return;
3471            }
3472
3473            tokio::select! {
3474                _ = receiver.recv() => {
3475                    trace!("Received a notification");
3476                }
3477                () = timeout_fut => {
3478                }
3479            }
3480        }
3481        .await;
3482
3483        // Cleanup
3484        {
3485            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3486            match pending_subscribers.remove_by_component(component_digest) {
3487                Some((_, tag)) if tag == unique_tag => {}
3488                Some(other) => {
3489                    pending_subscribers.insert_by_component(component_digest.clone(), other);
3490                }
3491                None => {}
3492            }
3493        }
3494    }
3495
3496    async fn get_last_execution_event(
3497        &self,
3498        execution_id: &ExecutionId,
3499    ) -> Result<ExecutionEvent, DbErrorRead> {
3500        let mut client_guard = self.client.lock().await;
3501        let tx = client_guard.transaction().await?;
3502
3503        let event = get_last_execution_event(&tx, execution_id).await?;
3504
3505        tx.commit().await?;
3506        Ok(event)
3507    }
3508}
3509#[async_trait]
3510impl DbConnection for PostgresConnection {
3511    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3512    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3513        debug!("create");
3514        trace!(?req, "create");
3515        let created_at = req.created_at;
3516
3517        let mut client_guard = self.client.lock().await;
3518        let tx = client_guard.transaction().await?;
3519
3520        let (version, notifier) = create_inner(&tx, req.clone()).await?;
3521
3522        tx.commit().await?;
3523        drop(client_guard); // Release DB lock before notifying
3524
3525        self.notify_all(vec![notifier], created_at);
3526        Ok(version)
3527    }
3528
3529    #[instrument(level = Level::DEBUG, skip(self))]
3530    async fn get(
3531        &self,
3532        execution_id: &ExecutionId,
3533    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3534        trace!("get");
3535        let mut client_guard = self.client.lock().await;
3536        let tx = client_guard.transaction().await?;
3537
3538        let res = get_execution_log(&tx, execution_id).await?;
3539
3540        tx.commit().await?;
3541        Ok(res)
3542    }
3543
3544    #[instrument(level = Level::DEBUG, skip(self, batch))]
3545    async fn append_batch(
3546        &self,
3547        current_time: DateTime<Utc>,
3548        batch: Vec<AppendRequest>,
3549        execution_id: ExecutionId,
3550        version: Version,
3551    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3552        debug!("append_batch");
3553        trace!(?batch, "append_batch");
3554        assert!(!batch.is_empty(), "Empty batch request");
3555
3556        let mut client_guard = self.client.lock().await;
3557        let tx = client_guard.transaction().await?;
3558
3559        let mut version = version;
3560        let mut notifier = None;
3561
3562        for append_request in batch {
3563            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3564            version = v;
3565            notifier = Some(n);
3566        }
3567
3568        tx.commit().await?;
3569        drop(client_guard);
3570
3571        self.notify_all(
3572            vec![notifier.expect("checked that the batch is not empty")],
3573            current_time,
3574        );
3575        Ok(version)
3576    }
3577
3578    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3579    async fn append_batch_create_new_execution(
3580        &self,
3581        current_time: DateTime<Utc>,
3582        batch: Vec<AppendRequest>,
3583        execution_id: ExecutionId,
3584        version: Version,
3585        child_req: Vec<CreateRequest>,
3586        backtraces: Vec<BacktraceInfo>,
3587    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3588        debug!("append_batch_create_new_execution");
3589        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3590        assert!(!batch.is_empty(), "Empty batch request");
3591
3592        let mut client_guard = self.client.lock().await;
3593        let tx = client_guard.transaction().await?;
3594
3595        let mut version = version;
3596        let mut notifier = None;
3597
3598        for append_request in batch {
3599            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3600            version = v;
3601            notifier = Some(n);
3602        }
3603
3604        let mut notifiers = Vec::new();
3605        notifiers.push(notifier.expect("checked that the batch is not empty"));
3606
3607        for req in child_req {
3608            let (_, n) = create_inner(&tx, req).await?;
3609            notifiers.push(n);
3610        }
3611        for backtrace in backtraces {
3612            append_backtrace(&tx, &backtrace).await?;
3613        }
3614        tx.commit().await?;
3615        drop(client_guard);
3616
3617        self.notify_all(notifiers, current_time);
3618        Ok(version)
3619    }
3620
3621    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3622    async fn subscribe_to_next_responses(
3623        &self,
3624        execution_id: &ExecutionId,
3625        last_response: ResponseCursor,
3626        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3627    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3628        debug!("next_responses");
3629        let unique_tag: u64 = rand::random();
3630        let execution_id_clone = execution_id.clone();
3631
3632        let cleanup = || {
3633            let mut guard = self.response_subscribers.lock().unwrap();
3634            match guard.remove(&execution_id_clone) {
3635                Some((_, tag)) if tag == unique_tag => {}
3636                Some(other) => {
3637                    guard.insert(execution_id_clone.clone(), other);
3638                }
3639                None => {}
3640            }
3641        };
3642
3643        let receiver = {
3644            let mut client_guard = self.client.lock().await;
3645            let tx = client_guard.transaction().await?;
3646
3647            // Register listener before fetching from database.
3648            // This is a best-effort mechanism that shortens the polling time, if it
3649            // does not detect the response it will sleep using `timeout_fut`. Consumers
3650            // are expected to poll this function in a loop.
3651            // Currently the notification mechanism only works on a single node deployment.
3652            let (sender, receiver) = oneshot::channel();
3653            self.response_subscribers
3654                .lock()
3655                .unwrap()
3656                .insert(execution_id.clone(), (sender, unique_tag));
3657
3658            let responses = get_responses_after(&tx, execution_id, last_response).await?;
3659
3660            if responses.is_empty() {
3661                // Commit read transaction
3662                tx.commit().await.map_err(|err| {
3663                    cleanup(); // Remove the just inserted subscriber.
3664                    DbErrorRead::from(err)
3665                })?;
3666                receiver
3667            } else {
3668                cleanup(); // Remove the just inserted subscriber as we already have the answer.
3669                tx.commit().await?;
3670                return Ok(responses);
3671            }
3672        };
3673
3674        let res = tokio::select! {
3675            resp = receiver => {
3676                match resp {
3677                    Ok(resp) => Ok(vec![resp]),
3678                    Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3679                }
3680            }
3681            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3682        };
3683
3684        cleanup();
3685        res
3686    }
3687
3688    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3689    async fn wait_for_finished_result(
3690        &self,
3691        execution_id: &ExecutionId,
3692        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3693    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3694        let unique_tag: u64 = rand::random();
3695        let execution_id_clone = execution_id.clone();
3696
3697        let cleanup = || {
3698            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3699            if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3700                subscribers.remove(&unique_tag);
3701            }
3702        };
3703
3704        let receiver = {
3705            let mut client_guard = self.client.lock().await;
3706            let tx = client_guard.transaction().await?;
3707
3708            // Register listener
3709            let (sender, receiver) = oneshot::channel();
3710            {
3711                let mut guard = self.execution_finished_subscribers.lock().unwrap();
3712                guard
3713                    .entry(execution_id.clone())
3714                    .or_default()
3715                    .insert(unique_tag, sender);
3716            }
3717
3718            let pending_state = get_combined_state(&tx, execution_id)
3719                .await?
3720                .execution_with_state
3721                .pending_state;
3722
3723            if let PendingState::Finished(finished) = pending_state {
3724                let event = get_execution_event(&tx, execution_id, finished.version).await?;
3725                tx.commit().await?;
3726                cleanup();
3727
3728                if let ExecutionRequest::Finished { result, .. } = event.event {
3729                    return Ok(result);
3730                }
3731                error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3732                return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3733                    "cannot get finished event based on t_state version",
3734                )));
3735            }
3736            tx.commit().await?;
3737            receiver
3738        };
3739
3740        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3741        let res = tokio::select! {
3742            resp = receiver => {
3743                match resp {
3744                    Ok(retval) => Ok(retval),
3745                    Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3746                }
3747            }
3748            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3749        };
3750
3751        cleanup();
3752        res
3753    }
3754
3755    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3756    async fn append_delay_response(
3757        &self,
3758        created_at: DateTime<Utc>,
3759        execution_id: ExecutionId,
3760        join_set_id: JoinSetId,
3761        delay_id: DelayId,
3762        result: Result<(), ()>,
3763    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3764        debug!("append_delay_response");
3765        let event = JoinSetResponseEventOuter {
3766            created_at,
3767            event: JoinSetResponseEvent {
3768                join_set_id,
3769                event: JoinSetResponse::DelayFinished {
3770                    delay_id: delay_id.clone(),
3771                    result,
3772                },
3773            },
3774        };
3775
3776        let mut client_guard = self.client.lock().await;
3777        let tx = client_guard.transaction().await?;
3778
3779        let res = append_response(&tx, &execution_id, event).await;
3780
3781        match res {
3782            Ok(notifier) => {
3783                tx.commit().await?;
3784                drop(client_guard);
3785                self.notify_all(vec![notifier], created_at);
3786                Ok(AppendDelayResponseOutcome::Success)
3787            }
3788            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3789                // Check if already finished
3790                // Roll back the failed tx, start new read tx.
3791                tx.rollback().await?;
3792
3793                // Start new tx for check
3794                let tx = client_guard.transaction().await?;
3795                let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3796                tx.commit().await?;
3797
3798                match delay_success {
3799                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3800                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3801                    None => Err(DbErrorWrite::Generic(consistency_db_err(
3802                        "insert failed yet select did not find the response",
3803                    ))),
3804                }
3805            }
3806            Err(err) => {
3807                let _ = tx.rollback().await; // cleanup
3808                Err(err)
3809            }
3810        }
3811    }
3812
3813    #[instrument(level = Level::DEBUG, skip_all)]
3814    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3815        debug!("append_backtrace");
3816        let mut client_guard = self.client.lock().await;
3817        let tx = client_guard.transaction().await?;
3818
3819        append_backtrace(&tx, &append).await?;
3820
3821        tx.commit().await?;
3822        Ok(())
3823    }
3824
3825    #[instrument(level = Level::DEBUG, skip_all)]
3826    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3827        debug!("append_backtrace_batch");
3828        let mut client_guard = self.client.lock().await;
3829        let tx = client_guard.transaction().await?;
3830
3831        for append in batch {
3832            append_backtrace(&tx, &append).await?;
3833        }
3834
3835        tx.commit().await?;
3836        Ok(())
3837    }
3838
3839    #[instrument(level = Level::DEBUG, skip_all)]
3840    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3841        trace!("append_log");
3842        let mut client_guard = self.client.lock().await;
3843        let tx = client_guard.transaction().await?;
3844        append_log(&tx, &row).await?;
3845        tx.commit().await?;
3846
3847        Ok(())
3848    }
3849
3850    #[instrument(level = Level::DEBUG, skip_all)]
3851    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3852        trace!("append_log_batch");
3853        let mut client_guard = self.client.lock().await;
3854        let tx = client_guard.transaction().await?;
3855        for row in batch {
3856            append_log(&tx, row).await?;
3857        }
3858        tx.commit().await?;
3859        Ok(())
3860    }
3861
3862    /// Get currently expired delays and locks.
3863    #[instrument(level = Level::TRACE, skip(self))]
3864    async fn get_expired_timers(
3865        &self,
3866        at: DateTime<Utc>,
3867    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3868        let mut client_guard = self.client.lock().await;
3869        let tx = client_guard.transaction().await?;
3870
3871        // Expired Delays
3872        let rows = tx
3873            .query(
3874                "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3875                &[&at],
3876            )
3877            .await?;
3878
3879        let mut expired_timers = Vec::with_capacity(rows.len());
3880        for row in rows {
3881            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3882                let execution_id: String = get(&row, "execution_id")?;
3883                let execution_id = ExecutionId::from_str(&execution_id)?;
3884                let join_set_id: String = get(&row, "join_set_id")?;
3885                let join_set_id = JoinSetId::from_str(&join_set_id)?;
3886                let delay_id: String = get(&row, "delay_id")?;
3887                let delay_id = DelayId::from_str(&delay_id)?;
3888
3889                Ok(ExpiredTimer::Delay(ExpiredDelay {
3890                    execution_id,
3891                    join_set_id,
3892                    delay_id,
3893                }))
3894            };
3895
3896            match unpack() {
3897                Ok(timer) => expired_timers.push(timer),
3898                Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3899            }
3900        }
3901
3902        // Expired Locks
3903        let rows = tx.query(
3904            &format!(
3905                "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3906                 FROM t_state \
3907                 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3908            ),
3909            &[&at]
3910        ).await?;
3911
3912        for row in rows {
3913            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3914                let execution_id: String = get(&row, "execution_id")?;
3915                let execution_id = ExecutionId::from_str(&execution_id)?;
3916                let last_lock_version: i64 = get(&row, "last_lock_version")?;
3917                let last_lock_version = Version::try_from(last_lock_version)?;
3918
3919                let corresponding_version: i64 = get(&row, "corresponding_version")?;
3920                let corresponding_version = Version::try_from(corresponding_version)?;
3921
3922                let intermittent_event_count =
3923                    u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
3924                        |_| consistency_db_err("`intermittent_event_count` must not be negative"),
3925                    )?;
3926
3927                let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
3928                    .map(u32::try_from)
3929                    .transpose()
3930                    .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3931                let retry_exp_backoff_millis =
3932                    u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
3933                        |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
3934                    )?;
3935                let executor_id: String = get(&row, "executor_id")?;
3936                let executor_id = ExecutorId::from_str(&executor_id)?;
3937                let run_id: String = get(&row, "run_id")?;
3938                let run_id = RunId::from_str(&run_id)?;
3939
3940                Ok(ExpiredTimer::Lock(ExpiredLock {
3941                    execution_id,
3942                    locked_at_version: last_lock_version,
3943                    next_version: corresponding_version.increment(),
3944                    intermittent_event_count,
3945                    max_retries,
3946                    retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3947                    locked_by: LockedBy {
3948                        executor_id,
3949                        run_id,
3950                    },
3951                }))
3952            };
3953
3954            match unpack() {
3955                Ok(timer) => expired_timers.push(timer),
3956                Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3957            }
3958        }
3959
3960        tx.commit().await?;
3961
3962        if !expired_timers.is_empty() {
3963            debug!("get_expired_timers found {expired_timers:?}");
3964        }
3965        Ok(expired_timers)
3966    }
3967
3968    async fn get_execution_event(
3969        &self,
3970        execution_id: &ExecutionId,
3971        version: &Version,
3972    ) -> Result<ExecutionEvent, DbErrorRead> {
3973        let mut client_guard = self.client.lock().await;
3974        let tx = client_guard.transaction().await?;
3975
3976        let event = get_execution_event(&tx, execution_id, version.0).await?;
3977
3978        tx.commit().await?;
3979        Ok(event)
3980    }
3981
3982    async fn get_pending_state(
3983        &self,
3984        execution_id: &ExecutionId,
3985    ) -> Result<ExecutionWithState, DbErrorRead> {
3986        let mut client_guard = self.client.lock().await;
3987        let tx = client_guard.transaction().await?;
3988
3989        let combined_state = get_combined_state(&tx, execution_id).await?;
3990
3991        tx.commit().await?;
3992        Ok(combined_state.execution_with_state)
3993    }
3994}
3995
3996#[async_trait]
3997impl DbExternalApi for PostgresConnection {
3998    #[instrument(skip(self))]
3999    async fn get_backtrace(
4000        &self,
4001        execution_id: &ExecutionId,
4002        filter: BacktraceFilter,
4003    ) -> Result<BacktraceInfo, DbErrorRead> {
4004        debug!("get_backtrace");
4005
4006        let mut client_guard = self.client.lock().await;
4007        let tx = client_guard.transaction().await?;
4008
4009        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4010
4011        params.push(Box::new(execution_id.to_string())); // $1
4012        let p_execution_id_idx = format!("${}", params.len()); // $1
4013
4014        let mut sql = String::new();
4015        write!(
4016            &mut sql,
4017            "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
4018            FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
4019            WHERE execution_id = {p_execution_id_idx}"
4020        )
4021        .unwrap();
4022
4023        match &filter {
4024            BacktraceFilter::Specific(version) => {
4025                params.push(Box::new(i64::from(version.0))); // $2
4026                let p_ver_idx = format!("${}", params.len()); // $2
4027                write!(
4028                    &mut sql,
4029                    " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
4030                )
4031                .unwrap();
4032            }
4033            BacktraceFilter::First => {
4034                sql.push_str(" ORDER BY version_min_including LIMIT 1");
4035            }
4036            BacktraceFilter::Last => {
4037                sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
4038            }
4039        }
4040
4041        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4042            params.iter().map(|p| p.as_ref() as _).collect();
4043
4044        let row = tx.query_one(&sql, &params_refs).await?;
4045
4046        let component_id: Json<ComponentId> = get(&row, "component_id")?;
4047        let component_id = component_id.0;
4048
4049        let version_min_including =
4050            Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
4051
4052        let version_max_excluding =
4053            Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
4054
4055        // wasm_backtrace stored as JSONB
4056        let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
4057        let wasm_backtrace = wasm_backtrace.0;
4058
4059        tx.commit().await?;
4060
4061        Ok(BacktraceInfo {
4062            execution_id: execution_id.clone(),
4063            component_id,
4064            version_min_including,
4065            version_max_excluding,
4066            wasm_backtrace,
4067        })
4068    }
4069
4070    #[instrument(skip(self))]
4071    async fn list_executions(
4072        &self,
4073        filter: ListExecutionsFilter,
4074        pagination: ExecutionListPagination,
4075    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4076        let mut client_guard = self.client.lock().await;
4077        let tx = client_guard.transaction().await?;
4078
4079        let result = list_executions(&tx, filter, &pagination).await?;
4080
4081        tx.commit().await?;
4082        Ok(result)
4083    }
4084
4085    #[instrument(skip(self))]
4086    async fn list_execution_events(
4087        &self,
4088        execution_id: &ExecutionId,
4089        pagination: Pagination<VersionType>,
4090        include_backtrace_id: bool,
4091    ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4092        let mut client_guard = self.client.lock().await;
4093        let tx = client_guard.transaction().await?;
4094
4095        let events =
4096            list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4097        let max_version = get_max_version(&tx, execution_id).await?;
4098
4099        tx.commit().await?;
4100        Ok(ListExecutionEventsResponse {
4101            events,
4102            max_version,
4103        })
4104    }
4105
4106    #[instrument(skip(self))]
4107    async fn list_responses(
4108        &self,
4109        execution_id: &ExecutionId,
4110        pagination: Pagination<u32>,
4111    ) -> Result<ListResponsesResponse, DbErrorRead> {
4112        let mut client_guard = self.client.lock().await;
4113        let tx = client_guard.transaction().await?;
4114
4115        let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4116        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4117
4118        tx.commit().await?;
4119        Ok(ListResponsesResponse {
4120            responses,
4121            max_cursor,
4122        })
4123    }
4124
4125    #[instrument(skip(self))]
4126    async fn list_execution_events_responses(
4127        &self,
4128        execution_id: &ExecutionId,
4129        req_since: &Version,
4130        req_max_length: VersionType,
4131        req_include_backtrace_id: bool,
4132        resp_pagination: Pagination<u32>,
4133    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4134        let mut client_guard = self.client.lock().await;
4135        let tx = client_guard.transaction().await?;
4136
4137        let combined_state = get_combined_state(&tx, execution_id).await?;
4138
4139        let events = list_execution_events(
4140            &tx,
4141            execution_id,
4142            Pagination::NewerThan {
4143                length: req_max_length
4144                    .try_into()
4145                    .expect("req_max_length fits in u16"),
4146                cursor: req_since.0,
4147                including_cursor: true,
4148            },
4149            req_include_backtrace_id,
4150        )
4151        .await?;
4152
4153        let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4154        let max_version = get_max_version(&tx, execution_id).await?;
4155        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4156
4157        tx.commit().await?;
4158
4159        Ok(ExecutionWithStateRequestsResponses {
4160            execution_with_state: combined_state.execution_with_state,
4161            events,
4162            responses,
4163            max_version,
4164            max_cursor,
4165        })
4166    }
4167
4168    #[instrument(skip(self))]
4169    async fn upgrade_execution_component(
4170        &self,
4171        execution_id: &ExecutionId,
4172        old: &InputContentDigest,
4173        new: &InputContentDigest,
4174    ) -> Result<(), DbErrorWrite> {
4175        let mut client_guard = self.client.lock().await;
4176        let tx = client_guard.transaction().await?;
4177
4178        upgrade_execution_component(&tx, execution_id, old, new).await?;
4179
4180        tx.commit().await?;
4181        Ok(())
4182    }
4183
4184    #[instrument(skip(self))]
4185    async fn list_logs(
4186        &self,
4187        execution_id: &ExecutionId,
4188        filter: LogFilter,
4189        pagination: Pagination<u32>,
4190    ) -> Result<ListLogsResponse, DbErrorRead> {
4191        let mut client_guard = self.client.lock().await;
4192        let tx = client_guard.transaction().await?;
4193        let responses = list_logs_tx(&tx, execution_id, &filter, &pagination).await?;
4194        tx.commit().await?;
4195        Ok(responses)
4196    }
4197
4198    #[instrument(skip(self))]
4199    async fn list_deployment_states(
4200        &self,
4201        current_time: DateTime<Utc>,
4202        pagination: Pagination<Option<DeploymentId>>,
4203    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4204        let mut client_guard = self.client.lock().await;
4205        let tx = client_guard.transaction().await?;
4206        let deployments = list_deployment_states(&tx, current_time, pagination).await?;
4207        tx.commit().await?;
4208        Ok(deployments)
4209    }
4210
4211    #[instrument(skip(self))]
4212    async fn pause_execution(
4213        &self,
4214        execution_id: &ExecutionId,
4215        paused_at: DateTime<Utc>,
4216    ) -> Result<AppendResponse, DbErrorWrite> {
4217        let mut client_guard = self.client.lock().await;
4218        let tx = client_guard.transaction().await?;
4219
4220        let combined_state = get_combined_state(&tx, execution_id).await?;
4221        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4222        debug!("Pausing with {appending_version}");
4223        let (next_version, _) = append(
4224            &tx,
4225            execution_id,
4226            AppendRequest {
4227                created_at: paused_at,
4228                event: ExecutionRequest::Paused,
4229            },
4230            appending_version,
4231        )
4232        .await?;
4233
4234        tx.commit().await?;
4235        Ok(next_version)
4236    }
4237
4238    #[instrument(skip(self))]
4239    async fn unpause_execution(
4240        &self,
4241        execution_id: &ExecutionId,
4242        unpaused_at: DateTime<Utc>,
4243    ) -> Result<AppendResponse, DbErrorWrite> {
4244        let mut client_guard = self.client.lock().await;
4245        let tx = client_guard.transaction().await?;
4246
4247        let combined_state = get_combined_state(&tx, execution_id).await?;
4248        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4249        debug!("Unpausing with {appending_version}");
4250        let (next_version, _) = append(
4251            &tx,
4252            execution_id,
4253            AppendRequest {
4254                created_at: unpaused_at,
4255                event: ExecutionRequest::Unpaused,
4256            },
4257            appending_version,
4258        )
4259        .await?;
4260
4261        tx.commit().await?;
4262        Ok(next_version)
4263    }
4264}
4265
4266#[async_trait]
4267impl DbPoolCloseable for PostgresPool {
4268    async fn close(&self) {
4269        self.pool.close();
4270    }
4271}
4272
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test-only entry point that appends `response_event` to `execution_id`.
    ///
    /// The event is wrapped in a `JoinSetResponseEventOuter` carrying `created_at`,
    /// written through the free `append_response` function inside a transaction, and
    /// committed. The client lock is released before `notify_all` is invoked with the
    /// notifier produced by the append.
    ///
    /// # Errors
    ///
    /// Returns `DbErrorWrite` if the transaction cannot be opened or committed, or if
    /// the append fails. No notification is sent in that case.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let outer_event = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };

        // Scope the client guard so it is released before subscribers are notified.
        let notifier = {
            let mut conn = self.client.lock().await;
            let txn = conn.transaction().await?;
            let notifier = append_response(&txn, &execution_id, outer_event).await?;
            txn.commit().await?;
            notifier
        };

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4301
#[cfg(feature = "test")]
impl PostgresPool {
    /// Drops the database named by `self.config.db_name` (available with the `test` feature).
    ///
    /// A temporary pool is created against `ADMIN_DB_NAME` using the host, user and
    /// password from `self.config`. `DROP DATABASE` is then attempted up to three times.
    /// If every attempt fails, a warning is logged and the function returns normally.
    ///
    /// # Panics
    ///
    /// Panics if the temporary pool cannot be created or if no connection can be
    /// obtained from it.
    pub async fn drop_database(&self) {
        const MAX_ATTEMPTS: u8 = 3;

        let db_name = &self.config.db_name;

        let mut admin_cfg = deadpool_postgres::Config::new();
        admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
        admin_cfg.host = Some(self.config.host.clone());
        admin_cfg.user = Some(self.config.user.clone());
        admin_cfg.password = Some(self.config.password.clone());
        admin_cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        // The statement text is identical for every attempt, so build it once.
        let drop_stmt = format!("DROP DATABASE {}", db_name);
        for _ in 0..MAX_ATTEMPTS {
            // NOTE(review): the original author states a failed attempt already waits
            // about 1s, so no extra sleep is added between retries - not verifiable here.
            match admin_client.execute(&drop_stmt, &[]).await {
                Ok(_) => {
                    debug!("Database '{}' dropped.", db_name);
                    return;
                }
                res @ Err(_) => debug!("Dropping db failed - {res:?}",),
            }
        }
        warn!("Did not drop database {}", db_name);
    }
}