Skip to main content

obeli_sk_db_postgres/
postgres_dao.rs

1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ComponentType, ExecutionId, FunctionFqn, JoinSetId,
6    StrVariant, SupportedFunctionReturnValue,
7    component_id::{ComponentDigest, Digest},
8    prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14        DbPool, DbPoolCloseable, DeploymentRecord, DeploymentState, DeploymentStatus,
15        ExecutionEvent, ExecutionListPagination, ExecutionRequest, ExecutionWithState,
16        ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock, ExpiredTimer,
17        HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
18        JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionEventsResponse,
19        ListExecutionsFilter, ListLogsResponse, ListResponsesResponse, LockPendingResponse, Locked,
20        LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter, LogInfoAppendRow, LogLevel,
21        LogStreamType, Pagination, PendingState, PendingStateBlockedByJoinSet,
22        PendingStateFinishedResultKind, PendingStateMergedPause, ResponseCursor,
23        ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED,
24        STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
25    },
26};
27use db_common::{
28    AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
29    PendingFfqnSubscribersHolder,
30};
31use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
32use hashbrown::HashMap;
33use secrecy::{ExposeSecret as _, SecretString};
34use sha2::{Digest as _, Sha256};
35use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
36use std::{fmt::Write as _, panic::Location};
37use strum::IntoEnumIterator as _;
38use tokio::sync::{mpsc, oneshot};
39use tokio_postgres::{
40    NoTls, Row, Transaction,
41    row::RowIndex,
42    types::{FromSql, Json, ToSql},
43};
44use tracing::{Level, debug, error, info, instrument, trace, warn};
45use tracing_error::SpanTrace;
46
47#[track_caller]
48fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
49    row: &'a Row,
50    name: I,
51) -> Result<T, DbErrorGeneric> {
52    match row.try_get(name) {
53        Ok(ok) => Ok(ok),
54        Err(err) => {
55            // no map_err, cannot attach `track_caller`
56            Err(consistency_db_err(format!(
57                "Failed to retrieve column '{name}': {err:?}"
58            )))
59        }
60    }
61}
62
63mod ddl {
64    use super::{STATE_LOCKED, STATE_PENDING_AT};
65    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;
66    use const_format::formatcp;
67
68    pub const ADMIN_DB_NAME: &str = "postgres";
69
70    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 3;
71
72    // T_METADATA
73    pub const CREATE_TABLE_T_METADATA: &str = r"
74CREATE TABLE IF NOT EXISTS t_metadata (
75    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
76    schema_version INTEGER NOT NULL,
77    created_at TIMESTAMPTZ NOT NULL
78);
79";
80
81    // T_EXECUTION_LOG
82    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
83CREATE TABLE IF NOT EXISTS t_execution_log (
84    execution_id TEXT NOT NULL,
85    created_at TIMESTAMPTZ NOT NULL,
86    json_value JSON NOT NULL,
87    version BIGINT NOT NULL CHECK (version >= 0),
88    variant TEXT NOT NULL,
89    join_set_id TEXT,
90    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
91    PRIMARY KEY (execution_id, version)
92);
93";
94
95    // Indexes for t_execution_log
96    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
97CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
98";
99
100    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
101CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
102";
103
104    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
105        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log
106        (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
107        HISTORY_EVENT_TYPE_JOIN_NEXT
108    );
109
110    // T_JOIN_SET_RESPONSE
111    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
112CREATE TABLE IF NOT EXISTS t_join_set_response (
113    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
114    created_at TIMESTAMPTZ NOT NULL,
115    execution_id TEXT NOT NULL,
116    join_set_id TEXT NOT NULL,
117
118    delay_id TEXT,
119    delay_success BOOLEAN,
120
121    child_execution_id TEXT,
122    finished_version BIGINT CHECK (finished_version >= 0),
123
124    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
125);
126";
127
128    // Indexes for t_join_set_response
129    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
130CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
131";
132
133    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
134CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
135ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
136";
137
138    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
139CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
140ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
141";
142
143    // T_STATE
144    pub const CREATE_TABLE_T_STATE: &str = r"
145CREATE TABLE IF NOT EXISTS t_state (
146    execution_id TEXT NOT NULL,
147    is_top_level BOOLEAN NOT NULL,
148    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
149    ffqn TEXT NOT NULL,
150    created_at TIMESTAMPTZ NOT NULL,
151    component_id_input_digest BYTEA NOT NULL,
152    component_type TEXT NOT NULL,
153    first_scheduled_at TIMESTAMPTZ NOT NULL,
154    deployment_id TEXT NOT NULL,
155    is_paused BOOLEAN NOT NULL,
156
157    pending_expires_finished TIMESTAMPTZ NOT NULL,
158    state TEXT NOT NULL,
159    updated_at TIMESTAMPTZ NOT NULL,
160    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),
161
162    max_retries BIGINT CHECK (max_retries >= 0),
163    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
164    last_lock_version BIGINT CHECK (last_lock_version >= 0),
165    executor_id TEXT,
166    run_id TEXT,
167
168    join_set_id TEXT,
169    join_set_closing BOOLEAN,
170
171    result_kind JSONB,
172
173    PRIMARY KEY (execution_id)
174);
175";
176
177    // Indexes for t_state
178    // For `get_pending_of_single_ffqn`
179    pub const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
180        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (state, pending_expires_finished, ffqn) WHERE state = '{}';",
181        STATE_PENDING_AT
182    );
183    // For `get_pending_by_component_input_digest`
184    pub const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
185        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (state, pending_expires_finished, component_id_input_digest) WHERE state = '{}';",
186        STATE_PENDING_AT
187    );
188
189    pub const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
190        "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
191        STATE_LOCKED
192    );
193
194    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
195CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
196";
197
198    pub const IDX_T_STATE_FFQN: &str = r"
199CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
200";
201
202    pub const IDX_T_STATE_CREATED_AT: &str = r"
203CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
204";
205    // For `list_deployment_states`
206    pub const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
207CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
208";
209
210    // T_DELAY
211    pub const CREATE_TABLE_T_DELAY: &str = r"
212CREATE TABLE IF NOT EXISTS t_delay (
213    execution_id TEXT NOT NULL,
214    join_set_id TEXT NOT NULL,
215    delay_id TEXT NOT NULL,
216    expires_at TIMESTAMPTZ NOT NULL,
217    PRIMARY KEY (execution_id, join_set_id, delay_id)
218);
219";
220
221    // Backtrace
222
223    pub const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
224CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
225    backtrace_hash BYTEA PRIMARY KEY,
226    wasm_backtrace JSONB NOT NULL
227);
228";
229
230    pub const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
231CREATE TABLE IF NOT EXISTS t_execution_backtrace (
232    execution_id TEXT NOT NULL,
233    component_id JSONB NOT NULL,
234    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
235    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
236    backtrace_hash BYTEA NOT NULL,
237    PRIMARY KEY (execution_id, version_min_including, version_max_excluding),
238    FOREIGN KEY (backtrace_hash) REFERENCES t_wasm_backtrace(backtrace_hash)
239);
240";
241
242    pub const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
243CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version ON t_execution_backtrace (execution_id, version_min_including, version_max_excluding);
244";
245
246    // Source files
247    pub const CREATE_TABLE_T_SOURCE_FILE: &str = r"
248CREATE TABLE IF NOT EXISTS t_source_file (
249    content_hash BYTEA PRIMARY KEY,
250    content      TEXT NOT NULL
251);
252";
253    pub const CREATE_TABLE_T_COMPONENT_SOURCE: &str = r"
254CREATE TABLE IF NOT EXISTS t_component_source (
255    component_digest BYTEA   NOT NULL,
256    frame_key        TEXT    NOT NULL,
257    is_suffix        BOOLEAN NOT NULL,
258    content_hash     BYTEA   NOT NULL,
259    PRIMARY KEY (component_digest, frame_key, is_suffix),
260    FOREIGN KEY (content_hash) REFERENCES t_source_file(content_hash)
261);
262";
263
264    // Logs & Std sterams
265    pub const CREATE_TABLE_T_LOG: &str = r"
266CREATE TABLE IF NOT EXISTS t_log (
267    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
268    execution_id TEXT NOT NULL,
269    run_id TEXT NOT NULL,
270    created_at TIMESTAMPTZ NOT NULL,
271    level INTEGER,
272    message TEXT,
273    stream_type INTEGER,
274    payload BYTEA
275);
276";
277    pub const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
278CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
279";
280    pub const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
281CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
282";
283
284    // T_DEPLOYMENT
285    pub const CREATE_TABLE_T_DEPLOYMENT: &str = r"
286CREATE TABLE IF NOT EXISTS t_deployment (
287    deployment_id TEXT     NOT NULL PRIMARY KEY,
288    created_at    TIMESTAMPTZ NOT NULL,
289    last_active_at TIMESTAMPTZ,
290    status        TEXT     NOT NULL DEFAULT 'inactive',
291    config_json      TEXT     NOT NULL,
292    obelisk_version  TEXT     NOT NULL,
293    created_by       TEXT
294);
295";
296    pub const IDX_T_DEPLOYMENT_STATUS: &str = r"
297CREATE INDEX IF NOT EXISTS idx_t_deployment_status ON t_deployment (status);
298";
299    // Enforces at most one active deployment at a time.
300    pub const IDX_T_DEPLOYMENT_SINGLE_ACTIVE: &str = r"
301CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_active ON t_deployment ((1)) WHERE status = 'active';
302";
303    // Enforces at most one enqueued deployment at a time.
304    pub const IDX_T_DEPLOYMENT_SINGLE_ENQUEUED: &str = r"
305CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_enqueued ON t_deployment ((1)) WHERE status = 'enqueued';
306";
307}
308
/// Connection settings for the PostgreSQL backend.
///
/// The same host and credentials are used both for the administrative connection
/// (database provisioning) and for the application connection pool.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Server host passed to the `deadpool_postgres` configuration.
    pub host: String,
    /// Role used for both the admin connection and the pool.
    pub user: String,
    /// Password for `user`; only exposed (`expose_secret`) when building pool configs.
    pub password: SecretString,
    /// Name of the application database; also the name checked for and created
    /// during provisioning.
    pub db_name: String,
}
316
/// Opaque error returned when the database cannot be provisioned or connected to,
/// or when its schema cannot be initialized or validated.
///
/// It carries no details: the cause is logged through `tracing` at the failure site
/// before this error is returned.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
320
321async fn create_database(
322    config: &PostgresConfig,
323    provision_policy: ProvisionPolicy,
324) -> Result<DbInitialzationOutcome, InitializationError> {
325    let mut admin_cfg = deadpool_postgres::Config::new();
326    admin_cfg.host = Some(config.host.clone());
327    admin_cfg.user = Some(config.user.clone());
328    admin_cfg.password = Some(config.password.expose_secret().to_string());
329    admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
330    admin_cfg.manager = Some(ManagerConfig {
331        recycling_method: RecyclingMethod::Fast,
332    });
333
334    let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
335        error!("Cannot create the default pool - {err:?}");
336        InitializationError
337    })?;
338
339    let client = admin_pool.get().await.map_err(|err| {
340        error!("Cannot get a connection from the default pool - {err:?}");
341        InitializationError
342    })?;
343
344    let row = client
345        .query_opt(
346            &format!(
347                "SELECT 1 FROM pg_database WHERE datname = '{}'",
348                config.db_name
349            ),
350            &[],
351        )
352        .await
353        .map_err(|err| {
354            error!("Cannot select from the default database - {err:?}");
355            InitializationError
356        })?;
357
358    match (row, provision_policy) {
359        (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
360            client
361                .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
362                .await
363                .map_err(|err| {
364                    error!("Cannot create the database - {err:?}");
365                    InitializationError
366                })?;
367            info!("Database '{}' created.", config.db_name);
368            Ok(DbInitialzationOutcome::Created)
369        }
370        (Some(_), ProvisionPolicy::Auto) => {
371            info!("Database '{}' exists.", config.db_name);
372            Ok(DbInitialzationOutcome::Existing)
373        }
374        (Some(_), ProvisionPolicy::MustCreate) => {
375            warn!("Database '{}' already exists.", config.db_name);
376            Err(InitializationError)
377        }
378        (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
379    }
380}
381
// All mutexes here have a very short critical section completely controlled by this module, thus using std mutex.
/// Per-execution waiter for a join-set response: a one-shot sender paired with a
/// `u64` (NOTE(review): presumably a subscription/cursor tag — confirm at use sites).
/// Shared by every connection handed out by one `PostgresPool`.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
/// Registry of subscribers interested in pending executions, shared by every
/// connection of one `PostgresPool`.
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Per-execution waiters for the finished result, keyed by a `u64`
/// (NOTE(review): presumably a per-subscription id — confirm at use sites).
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
389
/// Pool of PostgreSQL connections plus the in-process subscriber registries.
///
/// Every `PostgresConnection` handed out by this pool receives clones of the
/// registry handles, so subscriptions are visible across all of its connections.
pub struct PostgresPool {
    /// `deadpool_postgres` pool connected to `config.db_name`.
    pool: Pool,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Configuration the pool was built from; public only for tests.
    pub config: PostgresConfig,
}
398
399#[async_trait]
400impl DbPool for PostgresPool {
401    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
402        let client = self.pool.get().await?;
403
404        Ok(Box::new(PostgresConnection {
405            client: tokio::sync::Mutex::new(client),
406            response_subscribers: self.response_subscribers.clone(),
407            pending_subscribers: self.pending_subscribers.clone(),
408            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
409        }))
410    }
411
412    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
413        let client = self.pool.get().await?;
414
415        Ok(Box::new(PostgresConnection {
416            client: tokio::sync::Mutex::new(client),
417            response_subscribers: self.response_subscribers.clone(),
418            pending_subscribers: self.pending_subscribers.clone(),
419            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
420        }))
421    }
422
423    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
424        let client = self.pool.get().await?;
425
426        Ok(Box::new(PostgresConnection {
427            client: tokio::sync::Mutex::new(client),
428            response_subscribers: self.response_subscribers.clone(),
429            pending_subscribers: self.pending_subscribers.clone(),
430            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
431        }))
432    }
433
434    #[cfg(feature = "test")]
435    async fn connection_test(
436        &self,
437    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
438        let client = self.pool.get().await?;
439
440        Ok(Box::new(PostgresConnection {
441            client: tokio::sync::Mutex::new(client),
442            response_subscribers: self.response_subscribers.clone(),
443            pending_subscribers: self.pending_subscribers.clone(),
444            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
445        }))
446    }
447}
448
/// A single pooled client plus handles to the subscriber registries of the
/// `PostgresPool` it was checked out from.
///
/// The pool's accessors return it boxed as `DbExecutor`, `DbConnection`,
/// `DbExternalApi` or (with the `test` feature) `DbConnectionTest`.
pub struct PostgresConnection {
    client: tokio::sync::Mutex<Client>, // Callers should not hold onto a connection for too long but it is not controlled by this module, thus tokio mutex.
    // Clones of the owning pool's registries (shared `Arc`s).
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
455
/// How `PostgresPool::new` / `PostgresPool::new_with_outcome` treat the database
/// named by `PostgresConfig::db_name`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Do not check for or create the database; the outcome is reported as
    /// `DbInitialzationOutcome::Existing`.
    NeverCreate,
    /// Create database if it does not exist.
    Auto,
    /// Only for tests: Fail if database already exists.
    MustCreate,
}
464
/// Whether the database was created during initialization or already existed.
///
/// NOTE(review): the type name is misspelled ("Initialzation"); it is public, so a
/// rename would need a deprecated alias to stay compatible.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// `CREATE DATABASE` was executed during this initialization.
    Created,
    /// The database was found (`ProvisionPolicy::Auto`) or assumed
    /// (`ProvisionPolicy::NeverCreate`) to exist.
    Existing,
}
470
471impl PostgresPool {
472    #[instrument(skip_all, name = "postgres_new")]
473    pub async fn new(
474        config: PostgresConfig,
475        provision_policy: ProvisionPolicy,
476    ) -> Result<PostgresPool, InitializationError> {
477        Self::new_with_outcome(config, provision_policy)
478            .await
479            .map(|(db, _)| db)
480    }
481
482    pub async fn new_with_outcome(
483        config: PostgresConfig,
484        provision_policy: ProvisionPolicy,
485    ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
486        let outcome = if matches!(
487            provision_policy,
488            ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
489        ) {
490            create_database(&config, provision_policy).await?
491        } else {
492            DbInitialzationOutcome::Existing
493        };
494        let mut cfg = deadpool_postgres::Config::new();
495        cfg.host = Some(config.host.clone());
496        cfg.user = Some(config.user.clone());
497        cfg.password = Some(config.password.expose_secret().to_string());
498        cfg.dbname = Some(config.db_name.clone());
499        cfg.manager = Some(ManagerConfig {
500            recycling_method: RecyclingMethod::Fast,
501        });
502
503        let pool = cfg.create_pool(None, NoTls).map_err(|err| {
504            error!("Cannot create the database pool - {err:?}");
505            InitializationError
506        })?;
507        let client = pool.get().await.map_err(|err| {
508            error!("Cannot get a connection from the database pool - {err:?}");
509            InitializationError
510        })?;
511
512        let statements = vec![
513            ddl::CREATE_TABLE_T_METADATA,
514            ddl::CREATE_TABLE_T_EXECUTION_LOG,
515            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
516            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
517            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
518            ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
519            ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
520            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
521            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
522            ddl::CREATE_TABLE_T_STATE,
523            ddl::IDX_T_STATE_LOCK_PENDING_BY_FFQN,
524            ddl::IDX_T_STATE_LOCK_PENDING_BY_COMPONENT,
525            ddl::IDX_T_STATE_EXPIRED_LOCKS,
526            ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
527            ddl::IDX_T_STATE_FFQN,
528            ddl::IDX_T_STATE_CREATED_AT,
529            ddl::IDX_T_STATE_DEPLOYMENT_STATE,
530            ddl::CREATE_TABLE_T_DELAY,
531            ddl::CREATE_TABLE_T_WASM_BACKTRACE,
532            ddl::CREATE_TABLE_T_EXECUTION_BACKTRACE,
533            ddl::IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION,
534            ddl::CREATE_TABLE_T_SOURCE_FILE,
535            ddl::CREATE_TABLE_T_COMPONENT_SOURCE,
536            ddl::CREATE_TABLE_T_LOG,
537            ddl::IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT,
538            ddl::IDX_T_LOG_EXECUTION_ID_CREATED_AT,
539            ddl::CREATE_TABLE_T_DEPLOYMENT,
540            ddl::IDX_T_DEPLOYMENT_STATUS,
541            ddl::IDX_T_DEPLOYMENT_SINGLE_ACTIVE,
542            ddl::IDX_T_DEPLOYMENT_SINGLE_ENQUEUED,
543        ];
544
545        // Combine into one batch execution for atomicity per round-trip (or efficiency)
546        let batch_sql = statements.join("\n");
547        client.batch_execute(&batch_sql).await.map_err(|err| {
548            error!("Cannot run the DDL import - {err:?}");
549            InitializationError
550        })?;
551
552        let row = client
553            .query_opt(
554                "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
555                &[],
556            )
557            .await
558            .map_err(|err| {
559                error!("Cannot select schema version - {err:?}");
560                InitializationError
561            })?;
562
563        // Postgres INTEGER maps to i32
564        let actual_version = match row {
565            Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
566                error!("Failed to get schema_version column: {e}");
567                InitializationError
568            })?),
569            None => None,
570        };
571
572        match actual_version {
573            None => {
574                client
575                    .execute(
576                        "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
577                        &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
578                    )
579                    .await
580                    .map_err(|err| {
581                        error!("Cannot insert schema version - {err:?}");
582                        InitializationError
583                    })?;
584            }
585            Some(actual_version) => {
586                // Fail on unexpected `schema_version`.
587                if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
588                    error!(
589                        "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
590                    );
591                    return Err(InitializationError);
592                }
593            }
594        }
595
596        debug!("Database schema initialized.");
597
598        Ok((
599            PostgresPool {
600                pool,
601                execution_finished_subscribers: Arc::default(),
602                pending_subscribers: Arc::default(),
603                response_subscribers: Arc::default(),
604                config,
605            },
606            outcome,
607        ))
608    }
609}
610
611fn deployment_record_from_pg_row(row: &Row) -> Result<DeploymentRecord, DbErrorRead> {
612    let deployment_id_str: String = get(row, "deployment_id")?;
613    let deployment_id = deployment_id_str.parse::<DeploymentId>().map_err(|e| {
614        DbErrorRead::Generic(consistency_db_err(format!("invalid deployment_id: {e}")))
615    })?;
616    let status_str: String = get(row, "status")?;
617    let status = status_str
618        .parse::<DeploymentStatus>()
619        .map_err(|e| DbErrorRead::Generic(consistency_db_err(format!("invalid status: {e}"))))?;
620    Ok(DeploymentRecord {
621        deployment_id,
622        created_at: get(row, "created_at")?,
623        last_active_at: get(row, "last_active_at")?,
624        status,
625        config_json: get(row, "config_json")?,
626        obelisk_version: get(row, "obelisk_version")?,
627        created_by: get(row, "created_by")?,
628    })
629}
630
631#[track_caller]
632fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
633    DbErrorGeneric::Uncategorized {
634        reason: reason.into(),
635        context: SpanTrace::capture(),
636        source: None,
637        loc: Location::caller(),
638    }
639}
640#[track_caller]
641fn consistency_db_err_src(
642    reason: impl Into<StrVariant>,
643    source: Arc<dyn std::error::Error + Send + Sync>,
644) -> DbErrorGeneric {
645    DbErrorGeneric::Uncategorized {
646        reason: reason.into(),
647        context: SpanTrace::capture(),
648        source: Some(source),
649        loc: Location::caller(),
650    }
651}
652
/// Groups the join set, delay id and expiry instant of a delay.
///
/// NOTE(review): use sites are outside this view; this presumably mirrors a
/// `t_delay` row without its `execution_id` — confirm.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
659
660async fn fetch_created_event(
661    tx: &Transaction<'_>,
662    execution_id: &ExecutionId,
663) -> Result<CreateRequest, DbErrorRead> {
664    let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
665                execution_id = $1 AND version = 0";
666
667    let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
668
669    let created_at = get(&row, "created_at")?;
670    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
671    let event = event.0;
672
673    if let ExecutionRequest::Created {
674        ffqn,
675        params,
676        parent,
677        scheduled_at,
678        component_id,
679        deployment_id,
680        metadata,
681        scheduled_by,
682    } = event
683    {
684        Ok(CreateRequest {
685            created_at,
686            execution_id: execution_id.clone(),
687            ffqn,
688            params,
689            parent,
690            scheduled_at,
691            component_id,
692            deployment_id,
693            metadata,
694            scheduled_by,
695        })
696    } else {
697        error!("Row with version=0 must be a `Created` event - {event:?}");
698        Err(consistency_db_err("expected `Created` event").into())
699    }
700}
701
702fn check_expected_next_and_appending_version(
703    expected_version: &Version,
704    appending_version: &Version,
705) -> Result<(), DbErrorWrite> {
706    if *expected_version != *appending_version {
707        debug!(
708            "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
709        );
710        return Err(DbErrorWrite::NonRetriable(
711            DbErrorWriteNonRetriable::VersionConflict {
712                expected: expected_version.clone(),
713                requested: appending_version.clone(),
714            },
715        ));
716    }
717    Ok(())
718}
719
720#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
721async fn create_inner(
722    tx: &Transaction<'_>,
723    req: CreateRequest,
724) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
725    trace!("create_inner");
726
727    let version = Version::default();
728    let execution_id = req.execution_id.clone();
729    let execution_id_str = execution_id.to_string();
730    let ffqn = req.ffqn.clone();
731    let created_at = req.created_at;
732    let scheduled_at = req.scheduled_at;
733    let component_id = req.component_id.clone();
734    let deployment_id = req.deployment_id;
735
736    let event = ExecutionRequest::from(req);
737    let event = Json(event);
738
739    tx.execute(
740        "INSERT INTO t_execution_log (
741            execution_id, created_at, version, json_value, variant, join_set_id
742        ) VALUES ($1, $2, $3, $4, $5, $6)",
743        &[
744            &execution_id_str,
745            &created_at,
746            &i64::from(version.0), // BIGINT
747            &event,
748            &event.0.variant(),
749            &event.0.join_set_id().map(std::string::ToString::to_string),
750        ],
751    )
752    .await?;
753
754    let pending_at = {
755        debug!("Creating with `Pending(`{scheduled_at:?}`)");
756
757        tx.execute(
758            r"
759            INSERT INTO t_state (
760                execution_id,
761                is_top_level,
762                corresponding_version,
763                pending_expires_finished,
764                ffqn,
765                state,
766                created_at,
767                component_id_input_digest,
768                component_type,
769                deployment_id,
770                first_scheduled_at,
771                updated_at,
772                intermittent_event_count,
773                is_paused
774            ) VALUES (
775                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, false
776            )",
777            &[
778                &execution_id_str,
779                &execution_id.is_top_level(),
780                &i64::from(version.0),
781                &scheduled_at,
782                &ffqn.to_string(),
783                &STATE_PENDING_AT,
784                &created_at,
785                &component_id.component_digest.as_slice(),
786                &component_id.component_type.to_string(),
787                &deployment_id.to_string(),
788                &scheduled_at,
789            ],
790        )
791        .await?;
792
793        AppendNotifier {
794            pending_at: Some(NotifierPendingAt {
795                scheduled_at,
796                ffqn,
797                component_input_digest: component_id.component_digest,
798            }),
799            execution_finished: None,
800            response: None,
801        }
802    };
803
804    let next_version = Version::new(version.0 + 1);
805    Ok((next_version, pending_at))
806}
807
808#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
809async fn update_state_pending_after_response_appended(
810    tx: &Transaction<'_>,
811    execution_id: &ExecutionId,
812    scheduled_at: DateTime<Utc>, // Changing to state PendingAt
813    component_input_digest: ComponentDigest,
814) -> Result<AppendNotifier, DbErrorWrite> {
815    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
816
817    // Convert types for Postgres arguments
818    let execution_id_str = execution_id.to_string();
819
820    let updated = tx
821        .execute(
822            r"
823            UPDATE t_state
824            SET
825                pending_expires_finished = $1,
826                state = $2,
827                updated_at = CURRENT_TIMESTAMP,
828
829                max_retries = NULL,
830                retry_exp_backoff_millis = NULL,
831                last_lock_version = NULL,
832
833                join_set_id = NULL,
834                join_set_closing = NULL,
835
836                result_kind = NULL
837            WHERE execution_id = $3
838            ",
839            &[
840                &scheduled_at,     // $1
841                &STATE_PENDING_AT, // $2
842                &execution_id_str, // $3
843            ],
844        )
845        .await?;
846
847    if updated == 0 {
848        return Err(DbErrorWrite::NotFound);
849    }
850
851    Ok(AppendNotifier {
852        pending_at: Some(NotifierPendingAt {
853            scheduled_at,
854            ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
855            component_input_digest,
856        }),
857        execution_finished: None,
858        response: None,
859    })
860}
861
862#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
863async fn update_state_pending_after_event_appended(
864    tx: &Transaction<'_>,
865    execution_id: &ExecutionId,
866    appending_version: &Version,
867    scheduled_at: DateTime<Utc>,
868    intermittent_failure: bool,
869    component_input_digest: ComponentDigest,
870) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
871    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
872
873    let intermittent_delta = i64::from(intermittent_failure); // 0 or 1
874
875    let updated = tx
876        .execute(
877            r"
878            UPDATE t_state
879            SET
880                corresponding_version = $1,
881                pending_expires_finished = $2,
882                state = $3,
883                updated_at = CURRENT_TIMESTAMP,
884                intermittent_event_count = intermittent_event_count + $4,
885
886                max_retries = NULL,
887                retry_exp_backoff_millis = NULL,
888                last_lock_version = NULL,
889
890                join_set_id = NULL,
891                join_set_closing = NULL,
892
893                result_kind = NULL
894            WHERE execution_id = $5;
895            ",
896            &[
897                &i64::from(appending_version.0), // $1
898                &scheduled_at,                   // $2
899                &STATE_PENDING_AT,               // $3
900                &intermittent_delta,             // $4
901                &execution_id.to_string(),       // $5
902            ],
903        )
904        .await?;
905
906    if updated != 1 {
907        return Err(DbErrorWrite::NotFound);
908    }
909
910    Ok((
911        appending_version.increment(),
912        AppendNotifier {
913            pending_at: Some(NotifierPendingAt {
914                scheduled_at,
915                ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
916                component_input_digest,
917            }),
918            execution_finished: None,
919            response: None,
920        },
921    ))
922}
923
924#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
925#[expect(clippy::too_many_arguments)]
926async fn update_state_locked_get_intermittent_event_count(
927    tx: &Transaction<'_>,
928    execution_id: &ExecutionId,
929    deployment_id: DeploymentId,
930    component_digest: &ComponentDigest,
931    executor_id: ExecutorId,
932    run_id: RunId,
933    lock_expires_at: DateTime<Utc>,
934    appending_version: &Version,
935    retry_config: ComponentRetryConfig,
936) -> Result<u32, DbErrorWrite> {
937    debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
938    let backoff_millis =
939        i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
940            // BIGINT = i64
941            DbErrorGeneric::Uncategorized {
942                reason: "backoff too big".into(),
943                context: SpanTrace::capture(),
944                source: Some(Arc::new(err)),
945                loc: Location::caller(),
946            }
947        })?;
948
949    let execution_id_str = execution_id.to_string();
950
951    let updated = tx
952        .execute(
953            r"
954            UPDATE t_state
955            SET
956                corresponding_version = $1,
957                pending_expires_finished = $2,
958                state = $3,
959                updated_at = CURRENT_TIMESTAMP,
960                deployment_id = $4,
961                component_id_input_digest = $5,
962
963                max_retries = $6,
964                retry_exp_backoff_millis = $7,
965                last_lock_version = $1,
966                executor_id = $8,
967                run_id = $9,
968
969                join_set_id = NULL,
970                join_set_closing = NULL,
971
972                result_kind = NULL
973            WHERE execution_id = $10
974            AND is_paused = false
975            ",
976            &[
977                &i64::from(appending_version.0),
978                &lock_expires_at,
979                &STATE_LOCKED,
980                &deployment_id.to_string(),
981                &component_digest,
982                &retry_config.max_retries.map(i64::from),
983                &backoff_millis,
984                &executor_id.to_string(),
985                &run_id.to_string(),
986                &execution_id_str,
987            ],
988        )
989        .await?;
990
991    if updated != 1 {
992        return Err(DbErrorWrite::NotFound);
993    }
994
995    // fetch intermittent event count
996    let row = tx
997        .query_one(
998            "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
999            &[&execution_id_str],
1000        )
1001        .await
1002        .map_err(DbErrorGeneric::from)?;
1003
1004    let count: i64 = get(&row, "intermittent_event_count")?; // Postgres BIGINT
1005    let count = u32::try_from(count)
1006        .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
1007    Ok(count)
1008}
1009
1010#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1011async fn update_state_blocked(
1012    tx: &Transaction<'_>,
1013    execution_id: &ExecutionId,
1014    appending_version: &Version,
1015    join_set_id: &JoinSetId,
1016    lock_expires_at: DateTime<Utc>,
1017    join_set_closing: bool,
1018) -> Result<AppendResponse, DbErrorWrite> {
1019    debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1020
1021    let updated = tx
1022        .execute(
1023            r"
1024            UPDATE t_state
1025            SET
1026                corresponding_version = $1,
1027                pending_expires_finished = $2,
1028                state = $3,
1029                updated_at = CURRENT_TIMESTAMP,
1030
1031                max_retries = NULL,
1032                retry_exp_backoff_millis = NULL,
1033                last_lock_version = NULL,
1034
1035                join_set_id = $4,
1036                join_set_closing = $5,
1037
1038                result_kind = NULL
1039            WHERE execution_id = $6
1040            ",
1041            &[
1042                &i64::from(appending_version.0), // $1
1043                &lock_expires_at,                // $2
1044                &STATE_BLOCKED_BY_JOIN_SET,      // $3
1045                &join_set_id.to_string(),        // $4
1046                &join_set_closing,               // $5 (BOOLEAN)
1047                &execution_id.to_string(),       // $6
1048            ],
1049        )
1050        .await?;
1051
1052    if updated != 1 {
1053        return Err(DbErrorWrite::NotFound);
1054    }
1055    Ok(appending_version.increment())
1056}
1057
1058#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1059async fn update_state_finished(
1060    tx: &Transaction<'_>,
1061    execution_id: &ExecutionId,
1062    appending_version: &Version,
1063    finished_at: DateTime<Utc>,
1064    result_kind: PendingStateFinishedResultKind,
1065) -> Result<(), DbErrorWrite> {
1066    debug!("Setting t_state to Finished");
1067
1068    let result_kind_json = Json(result_kind);
1069
1070    let updated = tx
1071        .execute(
1072            r"
1073            UPDATE t_state
1074            SET
1075                corresponding_version = $1,
1076                pending_expires_finished = $2,
1077                state = $3,
1078                updated_at = CURRENT_TIMESTAMP,
1079
1080                max_retries = NULL,
1081                retry_exp_backoff_millis = NULL,
1082                last_lock_version = NULL,
1083                executor_id = NULL,
1084                run_id = NULL,
1085
1086                join_set_id = NULL,
1087                join_set_closing = NULL,
1088
1089                result_kind = $4
1090            WHERE execution_id = $5
1091            ",
1092            &[
1093                &i64::from(appending_version.0), // $1
1094                &finished_at,                    // $2
1095                &STATE_FINISHED,                 // $3
1096                &result_kind_json,               // $4
1097                &execution_id.to_string(),       // $5
1098            ],
1099        )
1100        .await?;
1101
1102    if updated != 1 {
1103        return Err(DbErrorWrite::NotFound);
1104    }
1105    Ok(())
1106}
1107
1108#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1109async fn update_state_paused(
1110    tx: &Transaction<'_>,
1111    execution_id: &ExecutionId,
1112    appending_version: &Version,
1113    is_paused: bool,
1114) -> Result<AppendResponse, DbErrorWrite> {
1115    debug!(
1116        "Setting t_state to {}",
1117        if is_paused { "paused" } else { "unpaused" }
1118    );
1119
1120    let updated = tx
1121        .execute(
1122            r"
1123            UPDATE t_state
1124            SET
1125                corresponding_version = $1,
1126                is_paused = $2,
1127                updated_at = CURRENT_TIMESTAMP
1128            WHERE execution_id = $3
1129            ",
1130            &[
1131                &i64::from(appending_version.0), // $1
1132                &is_paused,                      // $2
1133                &execution_id.to_string(),       // $3
1134            ],
1135        )
1136        .await?;
1137
1138    if updated != 1 {
1139        return Err(DbErrorWrite::NotFound);
1140    }
1141    Ok(appending_version.increment())
1142}
1143
1144#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1145async fn bump_state_next_version(
1146    tx: &Transaction<'_>,
1147    execution_id: &ExecutionId,
1148    appending_version: &Version,
1149    delay_req: Option<DelayReq>,
1150) -> Result<AppendResponse, DbErrorWrite> {
1151    debug!("update_index_version");
1152    let execution_id_str = execution_id.to_string();
1153
1154    let updated = tx
1155        .execute(
1156            r"
1157            UPDATE t_state
1158            SET
1159                corresponding_version = $1,
1160                updated_at = CURRENT_TIMESTAMP
1161            WHERE execution_id = $2
1162            ",
1163            &[
1164                &i64::from(appending_version.0), // $1
1165                &execution_id_str,               // $2
1166            ],
1167        )
1168        .await?;
1169
1170    if updated != 1 {
1171        return Err(DbErrorWrite::NotFound);
1172    }
1173
1174    if let Some(DelayReq {
1175        join_set_id,
1176        delay_id,
1177        expires_at,
1178    }) = delay_req
1179    {
1180        debug!("Inserting delay to `t_delay`");
1181        tx.execute(
1182            "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1183            &[
1184                &execution_id_str,
1185                &join_set_id.to_string(),
1186                &delay_id.to_string(),
1187                &expires_at,
1188            ],
1189        )
1190        .await?;
1191    }
1192    Ok(appending_version.increment())
1193}
1194
1195async fn get_combined_state(
1196    tx: &Transaction<'_>,
1197    execution_id: &ExecutionId,
1198) -> Result<CombinedState, DbErrorRead> {
1199    let row = tx
1200        .query_one(
1201            r"
1202            SELECT
1203                created_at, first_scheduled_at,
1204                state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
1205                last_lock_version, executor_id, run_id,
1206                join_set_id, join_set_closing,
1207                result_kind, is_paused
1208            FROM t_state
1209            WHERE execution_id = $1
1210            ",
1211            &[&execution_id.to_string()],
1212        )
1213        .await
1214        .map_err(DbErrorRead::from)?;
1215
1216    // Parsing columns
1217
1218    let created_at: DateTime<Utc> = get(&row, "created_at")?;
1219    let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1220
1221    let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1222    let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1223        consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1224    })?;
1225    let component_digest = ComponentDigest(digest);
1226
1227    let component_type: String = get(&row, "component_type")?;
1228    let component_type = ComponentType::from_str(&component_type)
1229        .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;
1230
1231    let deployment_id: String = get(&row, "deployment_id")?;
1232    let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1233
1234    let state: String = get(&row, "state")?;
1235    let ffqn: String = get(&row, "ffqn")?;
1236    let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1237        consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1238    })?;
1239
1240    let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1241
1242    let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1243    let last_lock_version = last_lock_version_raw
1244        .map(Version::try_from)
1245        .transpose()
1246        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1247
1248    let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1249    let executor_id = executor_id_raw
1250        .map(|id| ExecutorId::from_str(&id))
1251        .transpose()
1252        .map_err(DbErrorGeneric::from)?;
1253
1254    let run_id_raw: Option<String> = get(&row, "run_id")?;
1255    let run_id = run_id_raw
1256        .map(|id| RunId::from_str(&id))
1257        .transpose()
1258        .map_err(DbErrorGeneric::from)?;
1259
1260    let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1261    let join_set_id = join_set_id_raw
1262        .map(|id| JoinSetId::from_str(&id))
1263        .transpose()
1264        .map_err(DbErrorGeneric::from)?;
1265
1266    let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1267
1268    let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1269    let result_kind = result_kind.map(|it| it.0);
1270
1271    let is_paused: bool = get(&row, "is_paused")?;
1272
1273    let corresponding_version: i64 = get(&row, "corresponding_version")?;
1274    let corresponding_version = Version::new(
1275        VersionType::try_from(corresponding_version)
1276            .map_err(|_| consistency_db_err("version must be non-negative"))?,
1277    );
1278
1279    let dto = CombinedStateDTO {
1280        execution_id: execution_id.clone(),
1281        created_at,
1282        first_scheduled_at,
1283        state,
1284        ffqn,
1285        component_digest,
1286        component_type,
1287        deployment_id,
1288        pending_expires_finished,
1289        last_lock_version,
1290        executor_id,
1291        run_id,
1292        join_set_id,
1293        join_set_closing,
1294        result_kind,
1295        is_paused,
1296    };
1297    CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1298}
1299
1300async fn list_executions(
1301    read_tx: &Transaction<'_>,
1302    filter: ListExecutionsFilter,
1303    pagination: &ExecutionListPagination,
1304) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1305    // Helper to manage dynamic WHERE clauses and positional parameters ($1, $2...)
1306    struct QueryBuilder {
1307        where_clauses: Vec<String>,
1308        params: Vec<Box<dyn ToSql + Send + Sync>>,
1309    }
1310
1311    impl QueryBuilder {
1312        fn new() -> Self {
1313            Self {
1314                where_clauses: Vec::new(),
1315                params: Vec::new(),
1316            }
1317        }
1318
1319        fn add_param<T>(&mut self, param: T) -> String
1320        where
1321            T: ToSql + Sync + Send + 'static,
1322        {
1323            self.params.push(Box::new(param));
1324            format!("${}", self.params.len())
1325        }
1326
1327        fn add_where(&mut self, clause: String) {
1328            self.where_clauses.push(clause);
1329        }
1330    }
1331
1332    let mut qb = QueryBuilder::new();
1333
1334    // Pagination Logic
1335    let (limit, limit_desc) = match pagination {
1336        ExecutionListPagination::CreatedBy(p) => {
1337            let limit = p.length();
1338            let is_desc = p.is_desc();
1339            if let Some(cursor) = p.cursor() {
1340                let placeholder = qb.add_param(*cursor);
1341                qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1342            }
1343            (limit, is_desc)
1344        }
1345        ExecutionListPagination::ExecutionId(p) => {
1346            let limit = p.length();
1347            let is_desc = p.is_desc();
1348            if let Some(cursor) = p.cursor() {
1349                let placeholder = qb.add_param(cursor.to_string());
1350                qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1351            }
1352            (limit, is_desc)
1353        }
1354    };
1355
1356    if !filter.show_derived {
1357        qb.add_where("is_top_level = true".to_string());
1358    }
1359    let like = |str| format!("{str}%");
1360    if let Some(ffqn_prefix) = filter.ffqn_prefix {
1361        let placeholder = qb.add_param(like(ffqn_prefix));
1362        qb.add_where(format!("ffqn LIKE {placeholder}"));
1363    }
1364    if filter.hide_finished {
1365        qb.add_where(format!("state != '{STATE_FINISHED}'"));
1366    }
1367    if let Some(prefix) = filter.execution_id_prefix {
1368        let placeholder = qb.add_param(like(prefix));
1369        qb.add_where(format!("execution_id LIKE {placeholder}"));
1370    }
1371    if let Some(component_digest) = filter.component_digest {
1372        let placeholder = qb.add_param(component_digest);
1373        qb.add_where(format!("component_id_input_digest = {placeholder}"));
1374    }
1375    if let Some(deployment_id) = filter.deployment_id {
1376        let placeholder = qb.add_param(deployment_id.to_string());
1377        qb.add_where(format!("deployment_id = {placeholder}"));
1378    }
1379
1380    let where_str = if qb.where_clauses.is_empty() {
1381        String::new()
1382    } else {
1383        format!("WHERE {}", qb.where_clauses.join(" AND "))
1384    };
1385
1386    let order_col = match pagination {
1387        ExecutionListPagination::CreatedBy(_) => "created_at",
1388        ExecutionListPagination::ExecutionId(_) => "execution_id",
1389    };
1390
1391    // Inner query: fetch rows with cursor-based ordering
1392    // Outer query: always return results in descending order
1393    let (inner_order, outer_order) = if limit_desc {
1394        ("DESC", "")
1395    } else {
1396        ("", "DESC")
1397    };
1398
1399    let inner_sql = format!(
1400        r"SELECT created_at, first_scheduled_at, component_id_input_digest, deployment_id,
1401            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1402            last_lock_version, executor_id, run_id,
1403            join_set_id, join_set_closing,
1404            result_kind, is_paused
1405            FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1406    );
1407
1408    let sql = if outer_order.is_empty() {
1409        inner_sql
1410    } else {
1411        format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1412    };
1413
1414    let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1415        .params
1416        .iter()
1417        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1418        .collect();
1419
1420    let rows = read_tx.query(&sql, &params_refs).await?;
1421
1422    let mut vec = Vec::with_capacity(rows.len());
1423
1424    for row in rows {
1425        // If parsing of the row fails, log and skip it.
1426        let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1427            let execution_id_str: String = get(&row, "execution_id")?;
1428            let execution_id = ExecutionId::from_str(&execution_id_str)
1429                .map_err(|err| consistency_db_err(err.to_string()))?;
1430
1431            let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1432            let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1433                consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1434            })?;
1435            let component_digest = ComponentDigest(digest);
1436
1437            let component_type: String = get(&row, "component_type")?;
1438            let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1439                consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1440            })?;
1441
1442            let deployment_id: String = get(&row, "deployment_id")?;
1443            let deployment_id =
1444                DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1445
1446            let created_at: DateTime<Utc> = get(&row, "created_at")?;
1447            let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1448
1449            let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1450                get(&row, "result_kind")?;
1451            let result_kind = result_kind.map(|it| it.0);
1452
1453            let is_paused: bool = get(&row, "is_paused")?;
1454
1455            let corresponding_version: i64 = get(&row, "corresponding_version")?;
1456            let corresponding_version = Version::try_from(corresponding_version)
1457                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1458
1459            let executor_id_str: Option<String> = get(&row, "executor_id")?;
1460            let executor_id = executor_id_str
1461                .map(|id| ExecutorId::from_str(&id))
1462                .transpose()?;
1463
1464            let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1465            let last_lock_version = last_lock_version_raw
1466                .map(Version::try_from)
1467                .transpose()
1468                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1469
1470            let run_id_str: Option<String> = get(&row, "run_id")?;
1471            let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1472
1473            let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1474            let join_set_id = join_set_id_str
1475                .map(|id| JoinSetId::from_str(&id))
1476                .transpose()?;
1477
1478            let ffqn: String = get(&row, "ffqn")?;
1479            let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1480                error!("Error parsing ffqn - {parse_err:?}");
1481                consistency_db_err("invalid ffqn value in `t_state`")
1482            })?;
1483
1484            let combined_state_dto = CombinedStateDTO {
1485                execution_id,
1486                created_at,
1487                first_scheduled_at,
1488                component_digest,
1489                component_type,
1490                deployment_id,
1491                state: get(&row, "state")?,
1492                ffqn,
1493                pending_expires_finished: get(&row, "pending_expires_finished")?,
1494                executor_id,
1495                last_lock_version,
1496                run_id,
1497                join_set_id,
1498                join_set_closing: get(&row, "join_set_closing")?,
1499                result_kind,
1500                is_paused,
1501            };
1502
1503            let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1504
1505            Ok(combined_state.execution_with_state)
1506        };
1507
1508        match unpack() {
1509            Ok(execution) => vec.push(execution),
1510            Err(err) => {
1511                warn!("Skipping corrupted row in t_state: {err:?}");
1512            }
1513        }
1514    }
1515
1516    Ok(vec)
1517}
1518
1519async fn list_responses(
1520    tx: &Transaction<'_>,
1521    execution_id: &ExecutionId,
1522    pagination: Option<Pagination<u32>>,
1523) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1524    // Helper to manage params dynamically
1525    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1526    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1527        params.push(p);
1528        format!("${}", params.len())
1529    };
1530
1531    // 1. Base Query
1532    let p_execution_id = add_param(Box::new(execution_id.to_string()));
1533
1534    let mut sql = format!(
1535        "SELECT \
1536            r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1537            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1538            WHERE \
1539            r.execution_id = {p_execution_id} \
1540            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1541    );
1542
1543    // 2. Pagination Logic
1544    let limit = match &pagination {
1545        Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1546            // Postgres BIGINT is i64.
1547            let p_cursor = add_param(Box::new(i64::from(*cursor)));
1548
1549            // Add WHERE clause for cursor
1550            write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1551
1552            Some(p.length())
1553        }
1554        None => None,
1555    };
1556
1557    // 3. Ordering
1558    sql.push_str(" ORDER BY r.id");
1559    let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1560    if is_desc {
1561        sql.push_str(" DESC");
1562    }
1563
1564    // 4. Limit
1565    if let Some(limit) = limit {
1566        // Postgres limit expects i64
1567        let p_limit = add_param(Box::new(i64::from(limit)));
1568        write!(sql, " LIMIT {p_limit}").unwrap();
1569    }
1570
1571    // Re-order to ascending for consistent oldest-to-newest results
1572    if is_desc {
1573        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1574    }
1575
1576    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1577        .iter()
1578        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1579        .collect();
1580
1581    let rows = tx
1582        .query(&sql, &params_refs)
1583        .await
1584        .map_err(DbErrorRead::from)?;
1585
1586    let mut results = Vec::with_capacity(rows.len());
1587    for row in rows {
1588        results.push(parse_response_with_cursor(&row)?);
1589    }
1590
1591    Ok(results)
1592}
1593
1594async fn list_logs_tx(
1595    tx: &Transaction<'_>,
1596    execution_id: &ExecutionId,
1597    show_derived: bool,
1598    filter: &LogFilter,
1599    pagination: &Pagination<DateTime<Utc>>,
1600) -> Result<ListLogsResponse, DbErrorRead> {
1601    let mut param_index = 1;
1602    let exec_id_str = execution_id.to_string();
1603    let exec_id_filter = if show_derived {
1604        format!("execution_id LIKE ${param_index} || '%'")
1605    } else {
1606        format!("execution_id = ${param_index}")
1607    };
1608    let mut query = format!(
1609        "SELECT id, run_id, created_at, level, message, stream_type, payload, execution_id
1610         FROM t_log
1611         WHERE {exec_id_filter}"
1612    );
1613    let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&exec_id_str];
1614    param_index += 1;
1615
1616    // Logs and streams filter
1617    let level_filter = if filter.should_show_logs() {
1618        let levels_str = if !filter.levels().is_empty() {
1619            filter
1620                .levels()
1621                .iter()
1622                .map(|lvl| (*lvl as u8).to_string())
1623                .collect::<Vec<_>>()
1624                .join(",")
1625        } else {
1626            LogLevel::iter()
1627                .map(|lvl| (lvl as u8).to_string())
1628                .collect::<Vec<_>>()
1629                .join(",")
1630        };
1631        Some(format!(" level IN ({levels_str})"))
1632    } else {
1633        None
1634    };
1635    let stream_filter = if filter.should_show_streams() {
1636        let streams_str = if !filter.stream_types().is_empty() {
1637            filter
1638                .stream_types()
1639                .iter()
1640                .map(|st| (*st as u8).to_string())
1641                .collect::<Vec<_>>()
1642                .join(",")
1643        } else {
1644            LogStreamType::iter()
1645                .map(|st| (st as u8).to_string())
1646                .collect::<Vec<_>>()
1647                .join(",")
1648        };
1649        Some(format!(" stream_type IN ({streams_str})"))
1650    } else {
1651        None
1652    };
1653    match (level_filter, stream_filter) {
1654        (Some(level_filter), Some(stream_filter)) => {
1655            write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1656                .expect("writing to string");
1657        }
1658        (Some(level_filter), None) => {
1659            write!(&mut query, " AND {level_filter}").expect("writing to string");
1660        }
1661        (None, Some(stream_filter)) => {
1662            write!(&mut query, " AND {stream_filter}").expect("writing to string");
1663        }
1664        (None, None) => unreachable!("guarded by constructor"),
1665    }
1666
1667    // Pagination
1668    write!(
1669        &mut query,
1670        " AND created_at {} ${param_index}",
1671        pagination.rel()
1672    )
1673    .expect("writing to string");
1674    let cursor_val = pagination.cursor();
1675    params.push(cursor_val);
1676    param_index += 1;
1677
1678    // Ordering and limit
1679    let dir = if pagination.is_desc() { "DESC" } else { "ASC" };
1680    write!(
1681        &mut query,
1682        " ORDER BY created_at {dir}, id {dir} LIMIT ${param_index}",
1683    )
1684    .expect("writing to string");
1685    let length_val: i64 = i64::from(pagination.length());
1686    params.push(&length_val);
1687
1688    let rows = tx.query(&query, &params[..]).await?;
1689
1690    let mut items = Vec::with_capacity(rows.len());
1691
1692    for row in rows {
1693        let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1694        let run_id: String = get(&row, "run_id")?;
1695        let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1696            consistency_db_err_src(
1697                format!("cannot convert RunId {run_id}"),
1698                Arc::from(parse_err),
1699            )
1700        })?;
1701        let execution_id_str: String = get(&row, "execution_id")?;
1702        let execution_id = ExecutionId::from_str(&execution_id_str).map_err(|parse_err| {
1703            consistency_db_err_src(
1704                format!("cannot convert ExecutionId {execution_id_str}"),
1705                Arc::from(parse_err),
1706            )
1707        })?;
1708
1709        let level: Option<i32> = get(&row, "level")?;
1710        let message: Option<String> = get(&row, "message")?;
1711        let stream_type: Option<i32> = get(&row, "stream_type")?;
1712        let payload: Option<Vec<u8>> = get(&row, "payload")?;
1713
1714        let log_entry = match (level, message, stream_type, payload) {
1715            (Some(lvl), Some(msg), None, None) => {
1716                let map_err =
1717                    |err| consistency_db_err_src(format!("cannot convert {lvl} to LogLevel"), err);
1718                LogEntry::Log {
1719                    created_at,
1720                    level: u8::try_from(lvl)
1721                        .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1722                        .map_err(|err| map_err(Arc::from(err)))??,
1723                    message: msg,
1724                }
1725            }
1726            (None, None, Some(stype), Some(pl)) => {
1727                let map_err = |err| {
1728                    consistency_db_err_src(format!("cannot convert {stype} to LogStreamType"), err)
1729                };
1730                LogEntry::Stream {
1731                    created_at,
1732                    stream_type: u8::try_from(stype)
1733                        .map(|stype| {
1734                            LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1735                        })
1736                        .map_err(|err| map_err(Arc::from(err)))??,
1737                    payload: pl,
1738                }
1739            }
1740            _ => {
1741                return Err(consistency_db_err("invalid t_log row".to_string()).into());
1742            }
1743        };
1744
1745        items.push(LogEntryRow {
1746            cursor: created_at,
1747            run_id,
1748            log_entry,
1749            execution_id,
1750        });
1751    }
1752
1753    Ok(ListLogsResponse {
1754        next_page: items
1755            .last()
1756            .map(|item| Pagination::NewerThan {
1757                length: pagination.length(),
1758                cursor: item.cursor,
1759                including_cursor: false,
1760            })
1761            .unwrap_or(if pagination.is_asc() {
1762                *pagination // no new results, keep the same cursor
1763            } else {
1764                // no prev results, let's start from beginning
1765                Pagination::NewerThan {
1766                    length: pagination.length(),
1767                    cursor: DateTime::<Utc>::UNIX_EPOCH,
1768                    including_cursor: false,
1769                }
1770            }),
1771        prev_page: match items.first() {
1772            Some(item) => Some(Pagination::OlderThan {
1773                length: pagination.length(),
1774                cursor: item.cursor,
1775                including_cursor: false,
1776            }),
1777            None if pagination.is_asc() && pagination.cursor() > &DateTime::<Utc>::UNIX_EPOCH => {
1778                // asked for a next page that does not exists (yet).
1779                Some(pagination.invert())
1780            }
1781            None => None,
1782        },
1783        items,
1784    })
1785}
1786
1787async fn list_deployment_states(
1788    tx: &Transaction<'_>,
1789    current_time: DateTime<Utc>,
1790    pagination: Pagination<Option<DeploymentId>>,
1791    include_config_json: bool,
1792) -> Result<Vec<DeploymentState>, DbErrorRead> {
1793    // Helper for numbered params ($1, $2, ...)
1794    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1795    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1796        params.push(p);
1797        format!("${}", params.len())
1798    };
1799
1800    // Base params
1801    let p_now = add_param(Box::new(current_time));
1802
1803    let config_json_col = if include_config_json {
1804        "d.config_json"
1805    } else {
1806        "NULL::TEXT AS config_json"
1807    };
1808
1809    let mut sql = format!(
1810        "
1811        SELECT
1812            d.deployment_id,
1813
1814            COUNT(*) FILTER (WHERE s.state = '{STATE_LOCKED}') AS locked,
1815
1816            COUNT(*) FILTER (
1817                WHERE s.state = '{STATE_PENDING_AT}'
1818                  AND s.pending_expires_finished <= {p_now}
1819            ) AS pending,
1820
1821            COUNT(*) FILTER (
1822                WHERE s.state = '{STATE_PENDING_AT}'
1823                  AND s.pending_expires_finished > {p_now}
1824            ) AS scheduled,
1825
1826            COUNT(*) FILTER (WHERE s.state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1827
1828            COUNT(*) FILTER (WHERE s.state = '{STATE_FINISHED}') AS finished,
1829
1830            {config_json_col},
1831            d.created_at,
1832            d.last_active_at,
1833            d.status
1834        FROM t_deployment d
1835        LEFT JOIN t_state s ON s.deployment_id = d.deployment_id"
1836    );
1837
1838    // Pagination
1839    if let Some(cursor) = pagination.cursor() {
1840        let p_cursor = add_param(Box::new(cursor.to_string()));
1841        write!(
1842            sql,
1843            " WHERE d.deployment_id {rel} {p_cursor}",
1844            rel = pagination.rel()
1845        )
1846        .expect("writing to string");
1847    }
1848
1849    // Grouping + ordering
1850    // Inner query: fetch rows with cursor-based ordering
1851    // Outer query: always return results in descending order
1852    let (inner_order, outer_order) = if pagination.is_desc() {
1853        ("DESC", "")
1854    } else {
1855        ("ASC", "DESC")
1856    };
1857
1858    write!(
1859        sql,
1860        " GROUP BY d.deployment_id, d.config_json, d.created_at, d.last_active_at, d.status ORDER BY d.deployment_id {inner_order} LIMIT {}",
1861        pagination.length()
1862    )
1863    .expect("writing to string");
1864
1865    let final_sql = if outer_order.is_empty() {
1866        sql
1867    } else {
1868        format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1869    };
1870
1871    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1872        .iter()
1873        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1874        .collect();
1875
1876    let rows = tx
1877        .query(&final_sql, &params_refs)
1878        .await
1879        .map_err(DbErrorRead::from)?;
1880
1881    let mut result = Vec::with_capacity(rows.len());
1882    for row in rows {
1883        let deployment_id: String = get(&row, "deployment_id")?;
1884        let status_str: String = get::<String, _>(&row, "status")?;
1885        let status = status_str
1886            .parse::<DeploymentStatus>()
1887            .map_err(|e| consistency_db_err(format!("unknown deployment status: {e}")))?;
1888        result.push(DeploymentState {
1889            deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1890            locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1891            pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1892                .expect("count is never negative"),
1893            scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1894                .expect("count is never negative"),
1895            blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1896                .expect("count is never negative"),
1897            finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1898                .expect("count is never negative"),
1899            config_json: get::<Option<String>, _>(&row, "config_json")?,
1900            created_at: get::<DateTime<Utc>, _>(&row, "created_at")?,
1901            last_active_at: get::<Option<DateTime<Utc>>, _>(&row, "last_active_at")?,
1902            status,
1903        });
1904    }
1905
1906    Ok(result)
1907}
1908
1909fn parse_response_with_cursor(
1910    row: &tokio_postgres::Row,
1911) -> Result<ResponseWithCursor, DbErrorRead> {
1912    // Postgres BIGINT = i64.
1913    let id = u32::try_from(get::<i64, _>(row, "id")?)
1914        .map_err(|_| consistency_db_err("id must not be negative"))?;
1915
1916    let created_at: DateTime<Utc> = get(row, "created_at")?;
1917    let join_set_id_str: String = get(row, "join_set_id")?;
1918    let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1919
1920    // Extract Optionals
1921    let delay_id: Option<String> = get(row, "delay_id")?;
1922    let delay_id = delay_id
1923        .map(|id| DelayId::from_str(&id))
1924        .transpose()
1925        .map_err(DbErrorGeneric::from)?;
1926    let delay_success: Option<bool> = get(row, "delay_success")?;
1927    let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1928    let child_execution_id = child_execution_id
1929        .map(|id| ExecutionIdDerived::from_str(&id))
1930        .transpose()
1931        .map_err(DbErrorGeneric::from)?;
1932    let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1933        .map(Version::try_from)
1934        .transpose()
1935        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1936    let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1937    let json_value = json_value.map(|it| it.0);
1938
1939    let event = match (
1940        delay_id,
1941        delay_success,
1942        child_execution_id,
1943        finished_version,
1944        json_value,
1945    ) {
1946        (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1947            delay_id,
1948            result: delay_success.then_some(()).ok_or(()),
1949        },
1950        (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1951            if let ExecutionRequest::Finished { retval: result, .. } = json_val {
1952                JoinSetResponse::ChildExecutionFinished {
1953                    child_execution_id,
1954                    finished_version,
1955                    result,
1956                }
1957            } else {
1958                error!("Joined log entry must be 'Finished'");
1959                return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1960            }
1961        }
1962        (delay, delay_success, child, finished, result) => {
1963            error!(
1964                "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1965            );
1966            return Err(consistency_db_err("invalid row in t_join_set_response").into());
1967        }
1968    };
1969
1970    Ok(ResponseWithCursor {
1971        cursor: ResponseCursor(id),
1972        event: JoinSetResponseEventOuter {
1973            event: JoinSetResponseEvent { join_set_id, event },
1974            created_at,
1975        },
1976    })
1977}
1978
1979#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1980#[expect(clippy::too_many_arguments)]
1981async fn lock_single_execution(
1982    tx: &Transaction<'_>,
1983    created_at: DateTime<Utc>,
1984    component_id: &ComponentId,
1985    deployment_id: DeploymentId,
1986    execution_id: &ExecutionId,
1987    run_id: RunId,
1988    appending_version: &Version,
1989    executor_id: ExecutorId,
1990    lock_expires_at: DateTime<Utc>,
1991    retry_config: ComponentRetryConfig,
1992) -> Result<LockedExecution, DbErrorWrite> {
1993    trace!("lock_single_execution");
1994
1995    // Check State
1996    let combined_state = get_combined_state(tx, execution_id).await?;
1997    combined_state
1998        .execution_with_state
1999        .pending_state
2000        .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
2001    let expected_version = combined_state.get_next_version_assert_not_finished();
2002    check_expected_next_and_appending_version(&expected_version, appending_version)?;
2003
2004    // Prepare Event
2005    let locked_event = Locked {
2006        component_id: component_id.clone(),
2007        deployment_id,
2008        executor_id,
2009        lock_expires_at,
2010        run_id,
2011        retry_config,
2012    };
2013    let event = ExecutionRequest::Locked(locked_event.clone());
2014
2015    let event = Json(event);
2016
2017    // Append to execution_log
2018    tx.execute(
2019        "INSERT INTO t_execution_log \
2020            (execution_id, created_at, json_value, version, variant) \
2021            VALUES ($1, $2, $3, $4, $5)",
2022        &[
2023            &execution_id.to_string(),
2024            &created_at,
2025            &event,
2026            &i64::from(appending_version.0),
2027            &event.0.variant(),
2028        ],
2029    )
2030    .await
2031    .map_err(|err| {
2032        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
2033            reason: "cannot lock".into(),
2034            context: SpanTrace::capture(),
2035            source: Some(Arc::new(err)),
2036            loc: Location::caller(),
2037        })
2038    })?;
2039
2040    let responses = list_responses(tx, execution_id, None).await?;
2041    trace!("Responses: {responses:?}");
2042
2043    // Update t_state
2044    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
2045        tx,
2046        execution_id,
2047        deployment_id,
2048        &component_id.component_digest,
2049        executor_id,
2050        run_id,
2051        lock_expires_at,
2052        appending_version,
2053        retry_config,
2054    )
2055    .await?;
2056
2057    // Fetch History
2058    // Fetch event_history and `Created` event.
2059    let rows = tx
2060        .query(
2061            "SELECT json_value, version FROM t_execution_log WHERE \
2062                execution_id = $1 AND (variant = $2 OR variant = $3) \
2063                ORDER BY version",
2064            &[
2065                &execution_id.to_string(),
2066                &DUMMY_CREATED.variant(),
2067                &DUMMY_HISTORY_EVENT.variant(),
2068            ],
2069        )
2070        .await
2071        .map_err(DbErrorGeneric::from)?;
2072
2073    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
2074
2075    for row in rows {
2076        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2077        let event = event.0;
2078
2079        let version: i64 = get(&row, "version")?;
2080        let version = Version::try_from(version)
2081            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2082
2083        events.push_back(ExecutionEvent {
2084            created_at: DateTime::from_timestamp_nanos(0), // not used, only the inner event and version
2085            event,
2086            backtrace_id: None,
2087            version,
2088        });
2089    }
2090
2091    // Extract Created Event
2092    let Some(ExecutionRequest::Created {
2093        ffqn,
2094        params,
2095        parent,
2096        metadata,
2097        ..
2098    }) = events.pop_front().map(|outer| outer.event)
2099    else {
2100        error!("Execution log must contain at least `Created` event");
2101        return Err(consistency_db_err("execution log must contain `Created` event").into());
2102    };
2103
2104    // Extract History Events
2105    let mut event_history = Vec::new();
2106    for ExecutionEvent { event, version, .. } in events {
2107        if let ExecutionRequest::HistoryEvent { event } = event {
2108            event_history.push((event, version));
2109        } else {
2110            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
2111            return Err(consistency_db_err(
2112                "rows can only contain `Created` and `HistoryEvent` event kinds",
2113            )
2114            .into());
2115        }
2116    }
2117
2118    Ok(LockedExecution {
2119        execution_id: execution_id.clone(),
2120        metadata,
2121        next_version: appending_version.increment(),
2122        ffqn,
2123        params,
2124        event_history,
2125        responses,
2126        parent,
2127        intermittent_event_count,
2128        locked_event,
2129    })
2130}
2131
2132async fn count_join_next(
2133    tx: &Transaction<'_>,
2134    execution_id: &ExecutionId,
2135    join_set_id: &JoinSetId,
2136) -> Result<u32, DbErrorRead> {
2137    let row = tx
2138            .query_one(
2139                "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
2140                AND history_event_type = $3",
2141                &[
2142                    &execution_id.to_string(),
2143                    &join_set_id.to_string(),
2144                    &HISTORY_EVENT_TYPE_JOIN_NEXT,
2145                ],
2146            )
2147            .await
2148            .map_err(DbErrorRead::from)?;
2149
2150    let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
2151    Ok(count)
2152}
2153
2154async fn nth_response(
2155    tx: &Transaction<'_>,
2156    execution_id: &ExecutionId,
2157    join_set_id: &JoinSetId,
2158    skip_rows: u32,
2159) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2160    let row = tx
2161            .query_opt(
2162                "SELECT r.id, r.created_at, r.join_set_id, \
2163                 r.delay_id, r.delay_success, \
2164                 r.child_execution_id, r.finished_version, l.json_value \
2165                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2166                 WHERE \
2167                 r.execution_id = $1 AND r.join_set_id = $2 AND \
2168                 ( \
2169                 r.finished_version = l.version \
2170                 OR \
2171                 r.child_execution_id IS NULL \
2172                 ) \
2173                 ORDER BY id \
2174                 LIMIT 1 OFFSET $3",
2175                 &[
2176                     &execution_id.to_string(),
2177                     &join_set_id.to_string(),
2178                     &i64::from(skip_rows),
2179                 ]
2180            )
2181            .await
2182            .map_err(DbErrorRead::from)?;
2183
2184    match row {
2185        Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
2186        None => Ok(None),
2187    }
2188}
2189
2190#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2191async fn append(
2192    tx: &Transaction<'_>,
2193    execution_id: &ExecutionId,
2194    req: AppendRequest,
2195    appending_version: Version,
2196) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2197    if matches!(req.event, ExecutionRequest::Created { .. }) {
2198        return Err(DbErrorWrite::NonRetriable(
2199            DbErrorWriteNonRetriable::ValidationFailed(
2200                "cannot append `Created` event - use `create` instead".into(),
2201            ),
2202        ));
2203    }
2204
2205    if let AppendRequest {
2206        event:
2207            ExecutionRequest::Locked(Locked {
2208                component_id,
2209                deployment_id,
2210                executor_id,
2211                run_id,
2212                lock_expires_at,
2213                retry_config,
2214            }),
2215        created_at,
2216    } = req
2217    {
2218        return lock_single_execution(
2219            tx,
2220            created_at,
2221            &component_id,
2222            deployment_id,
2223            execution_id,
2224            run_id,
2225            &appending_version,
2226            executor_id,
2227            lock_expires_at,
2228            retry_config,
2229        )
2230        .await
2231        .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2232    }
2233
2234    let combined_state = get_combined_state(tx, execution_id).await?;
2235    if combined_state
2236        .execution_with_state
2237        .pending_state
2238        .is_finished()
2239    {
2240        debug!("Execution is already finished");
2241        return Err(DbErrorWrite::NonRetriable(
2242            DbErrorWriteNonRetriable::AlreadyFinished,
2243        ));
2244    }
2245
2246    check_expected_next_and_appending_version(
2247        &combined_state.get_next_version_assert_not_finished(),
2248        &appending_version,
2249    )?;
2250
2251    let event = Json(req.event);
2252
2253    // Insert into t_execution_log
2254    tx.execute(
2255            "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2256             VALUES ($1, $2, $3, $4, $5, $6)",
2257            &[
2258                &execution_id.to_string(),
2259                &req.created_at,
2260                &event,
2261                &i64::from(appending_version.0),
2262                &event.0.variant(),
2263                &event.0.join_set_id().map(std::string::ToString::to_string),
2264            ],
2265        )
2266        .await?;
2267
2268    // Calculate current pending state
2269    match &event.0 {
2270        ExecutionRequest::Created { .. } => {
2271            unreachable!("handled in the caller")
2272        }
2273
2274        ExecutionRequest::Locked { .. } => {
2275            unreachable!("handled above")
2276        }
2277
2278        ExecutionRequest::TemporarilyFailed {
2279            backoff_expires_at, ..
2280        }
2281        | ExecutionRequest::TemporarilyTimedOut {
2282            backoff_expires_at, ..
2283        } => {
2284            let (next_version, notifier) = update_state_pending_after_event_appended(
2285                tx,
2286                execution_id,
2287                &appending_version,
2288                *backoff_expires_at,
2289                true, // an intermittent failure
2290                combined_state.execution_with_state.component_digest,
2291            )
2292            .await?;
2293            return Ok((next_version, notifier));
2294        }
2295
2296        ExecutionRequest::Unlocked {
2297            backoff_expires_at, ..
2298        } => {
2299            let (next_version, notifier) = update_state_pending_after_event_appended(
2300                tx,
2301                execution_id,
2302                &appending_version,
2303                *backoff_expires_at,
2304                false, // not an intermittent failure
2305                combined_state.execution_with_state.component_digest,
2306            )
2307            .await?;
2308            return Ok((next_version, notifier));
2309        }
2310
2311        ExecutionRequest::Paused => {
2312            match &combined_state.execution_with_state.pending_state {
2313                PendingState::Finished { .. } => {
2314                    unreachable!("handled above");
2315                }
2316                PendingState::Paused(..) => {
2317                    return Err(DbErrorWriteNonRetriable::IllegalState {
2318                        reason: "cannot pause, execution is already paused".into(),
2319                        context: SpanTrace::capture(),
2320                        source: None,
2321                        loc: Location::caller(),
2322                    }
2323                    .into());
2324                }
2325                _ => {}
2326            }
2327            let next_version =
2328                update_state_paused(tx, execution_id, &appending_version, true).await?;
2329            return Ok((next_version, AppendNotifier::default()));
2330        }
2331
2332        ExecutionRequest::Unpaused => {
2333            if !combined_state
2334                .execution_with_state
2335                .pending_state
2336                .is_paused()
2337            {
2338                return Err(DbErrorWriteNonRetriable::IllegalState {
2339                    reason: "cannot unpause, execution is not paused".into(),
2340                    context: SpanTrace::capture(),
2341                    source: None,
2342                    loc: Location::caller(),
2343                }
2344                .into());
2345            }
2346            let next_version =
2347                update_state_paused(tx, execution_id, &appending_version, false).await?;
2348            return Ok((next_version, AppendNotifier::default()));
2349        }
2350
2351        ExecutionRequest::Finished { retval, .. } => {
2352            update_state_finished(
2353                tx,
2354                execution_id,
2355                &appending_version,
2356                req.created_at,
2357                PendingStateFinishedResultKind::from(retval),
2358            )
2359            .await?;
2360            return Ok((
2361                appending_version,
2362                AppendNotifier {
2363                    pending_at: None,
2364                    execution_finished: Some(NotifierExecutionFinished {
2365                        execution_id: execution_id.clone(),
2366                        retval: retval.clone(),
2367                    }),
2368                    response: None,
2369                },
2370            ));
2371        }
2372
2373        ExecutionRequest::HistoryEvent {
2374            event:
2375                HistoryEvent::JoinSetCreate { .. }
2376                | HistoryEvent::JoinSetRequest {
2377                    request: JoinSetRequest::ChildExecutionRequest { .. },
2378                    ..
2379                }
2380                | HistoryEvent::Persist { .. }
2381                | HistoryEvent::Schedule { .. }
2382                | HistoryEvent::Stub { .. }
2383                | HistoryEvent::JoinNextTooMany { .. }
2384                | HistoryEvent::JoinNextTry { .. },
2385        } => {
2386            return Ok((
2387                bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2388                AppendNotifier::default(),
2389            ));
2390        }
2391
2392        ExecutionRequest::HistoryEvent {
2393            event:
2394                HistoryEvent::JoinSetRequest {
2395                    join_set_id,
2396                    request:
2397                        JoinSetRequest::DelayRequest {
2398                            delay_id,
2399                            expires_at,
2400                            ..
2401                        },
2402                },
2403        } => {
2404            return Ok((
2405                bump_state_next_version(
2406                    tx,
2407                    execution_id,
2408                    &appending_version,
2409                    Some(DelayReq {
2410                        join_set_id: join_set_id.clone(),
2411                        delay_id: delay_id.clone(),
2412                        expires_at: *expires_at,
2413                    }),
2414                )
2415                .await?,
2416                AppendNotifier::default(),
2417            ));
2418        }
2419
2420        ExecutionRequest::HistoryEvent {
2421            event:
2422                HistoryEvent::JoinNext {
2423                    join_set_id,
2424                    run_expires_at,
2425                    closing,
2426                    requested_ffqn: _,
2427                },
2428        } => {
2429            // Did the response arrive already?
2430            let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2431
2432            // Fetch the response corresponding to this JoinNext (skip n-1)
2433            let nth_response =
2434                nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2435
2436            trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2437            assert!(join_next_count > 0);
2438
2439            if let Some(ResponseWithCursor {
2440                event:
2441                    JoinSetResponseEventOuter {
2442                        created_at: nth_created_at,
2443                        ..
2444                    },
2445                cursor: _,
2446            }) = nth_response
2447            {
2448                let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2449                let (next_version, notifier) = update_state_pending_after_event_appended(
2450                    tx,
2451                    execution_id,
2452                    &appending_version,
2453                    scheduled_at,
2454                    false, // not an intermittent failure
2455                    combined_state.execution_with_state.component_digest,
2456                )
2457                .await?;
2458                return Ok((next_version, notifier));
2459            }
2460
2461            return Ok((
2462                update_state_blocked(
2463                    tx,
2464                    execution_id,
2465                    &appending_version,
2466                    join_set_id,
2467                    *run_expires_at,
2468                    *closing,
2469                )
2470                .await?,
2471                AppendNotifier::default(),
2472            ));
2473        }
2474    }
2475}
2476
2477async fn append_response(
2478    tx: &Transaction<'_>,
2479    execution_id: &ExecutionId,
2480    event: JoinSetResponseEventOuter,
2481) -> Result<AppendNotifier, DbErrorWrite> {
2482    let join_set_id = &event.event.join_set_id;
2483
2484    let (delay_id, delay_success) = match &event.event.event {
2485        JoinSetResponse::DelayFinished { delay_id, result } => {
2486            (Some(delay_id.to_string()), Some(result.is_ok()))
2487        }
2488        JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2489    };
2490
2491    let (child_execution_id, finished_version) = match &event.event.event {
2492        JoinSetResponse::ChildExecutionFinished {
2493            child_execution_id,
2494            finished_version,
2495            result: _,
2496        } => (
2497            Some(child_execution_id.to_string()),
2498            Some(i64::from(finished_version.0)),
2499        ),
2500        JoinSetResponse::DelayFinished { .. } => (None, None),
2501    };
2502
2503    let row = tx.query_one(
2504            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2505             VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2506             &[
2507                 &execution_id.to_string(),
2508                 &event.created_at,
2509                 &join_set_id.to_string(),
2510                 &delay_id,
2511                 &delay_success,
2512                 &child_execution_id,
2513                 &finished_version,
2514             ]
2515        ).await?;
2516    let cursor = ResponseCursor(
2517        u32::try_from(get::<i64, _>(&row, 0)?)
2518            .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2519    );
2520    // if the execution is going to be unblocked by this response...
2521    let combined_state = get_combined_state(tx, execution_id).await?;
2522    debug!("previous_pending_state: {combined_state:?}");
2523
2524    let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2525        state:
2526            PendingStateBlockedByJoinSet {
2527                join_set_id: found_join_set_id,
2528                lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2529                closing: _,
2530            },
2531        paused: _,
2532    } =
2533        PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2534        && *join_set_id == found_join_set_id
2535    {
2536        let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2537        // Unblock the state.
2538        update_state_pending_after_response_appended(
2539            tx,
2540            execution_id,
2541            scheduled_at,
2542            combined_state.execution_with_state.component_digest,
2543        )
2544        .await?
2545    } else {
2546        AppendNotifier::default()
2547    };
2548
2549    if let JoinSetResponseEvent {
2550        join_set_id,
2551        event:
2552            JoinSetResponse::DelayFinished {
2553                delay_id,
2554                result: _,
2555            },
2556    } = &event.event
2557    {
2558        debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2559        tx.execute(
2560            "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2561            &[
2562                &execution_id.to_string(),
2563                &join_set_id.to_string(),
2564                &delay_id.to_string(),
2565            ],
2566        )
2567        .await?;
2568    }
2569
2570    notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2571    Ok(notifier)
2572}
2573
2574async fn append_backtrace(
2575    tx: &Transaction<'_>,
2576    backtrace_info: &BacktraceInfo,
2577) -> Result<(), DbErrorWrite> {
2578    // Compute hash for deduplication
2579    let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2580
2581    // Insert into t_wasm_backtrace if not already present
2582    tx.execute(
2583        "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2584         VALUES ($1, $2) \
2585         ON CONFLICT (backtrace_hash) DO NOTHING",
2586        &[
2587            &backtrace_hash.as_slice(),
2588            &Json(&backtrace_info.wasm_backtrace),
2589        ],
2590    )
2591    .await?;
2592
2593    // Insert into t_execution_backtrace referencing the hash
2594    tx.execute(
2595        "INSERT INTO t_execution_backtrace \
2596         (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2597         VALUES ($1, $2, $3, $4, $5)",
2598        &[
2599            &backtrace_info.execution_id.to_string(),
2600            &Json(&backtrace_info.component_id),
2601            &i64::from(backtrace_info.version_min_including.0),
2602            &i64::from(backtrace_info.version_max_excluding.0),
2603            &backtrace_hash.as_slice(),
2604        ],
2605    )
2606    .await?;
2607
2608    Ok(())
2609}
2610
2611async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2612    let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2613        LogEntry::Log {
2614            created_at,
2615            level,
2616            message,
2617        } => (
2618            Some(*level as i32),
2619            Some(message.as_str()),
2620            None::<i32>,
2621            None::<&[u8]>,
2622            created_at,
2623        ),
2624        LogEntry::Stream {
2625            created_at,
2626            payload,
2627            stream_type,
2628        } => (
2629            None::<i32>,
2630            None::<&str>,
2631            Some(*stream_type as i32),
2632            Some(payload.as_slice()),
2633            created_at,
2634        ),
2635    };
2636
2637    tx.execute(
2638        "INSERT INTO t_log (
2639            execution_id,
2640            run_id,
2641            created_at,
2642            level,
2643            message,
2644            stream_type,
2645            payload
2646        ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2647        &[
2648            &row.execution_id.to_string(),
2649            &row.run_id.to_string(),
2650            &created_at,
2651            &level,
2652            &message,
2653            &stream_type,
2654            &payload,
2655        ],
2656    )
2657    .await?;
2658
2659    Ok(())
2660}
2661
2662async fn get_execution_log(
2663    tx: &Transaction<'_>,
2664    execution_id: &ExecutionId,
2665) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2666    let rows = tx
2667        .query(
2668            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2669                 execution_id = $1 ORDER BY version",
2670            &[&execution_id.to_string()],
2671        )
2672        .await
2673        .map_err(DbErrorRead::from)?;
2674
2675    if rows.is_empty() {
2676        return Err(DbErrorRead::NotFound);
2677    }
2678
2679    let mut events = Vec::with_capacity(rows.len());
2680    for row in rows {
2681        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2682        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2683        let event = event.0;
2684        let version: i64 = get(&row, "version")?;
2685        let version = Version::try_from(version)
2686            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2687
2688        events.push(ExecutionEvent {
2689            created_at,
2690            event,
2691            backtrace_id: None,
2692            version,
2693        });
2694    }
2695
2696    let combined_state = get_combined_state(tx, execution_id).await?;
2697    let responses = list_responses(tx, execution_id, None).await?;
2698
2699    Ok(concepts::storage::ExecutionLog {
2700        execution_id: execution_id.clone(),
2701        events,
2702        responses,
2703        next_version: combined_state.get_next_version_or_finished(),
2704        pending_state: combined_state.execution_with_state.pending_state,
2705        component_digest: combined_state.execution_with_state.component_digest,
2706        component_type: combined_state.execution_with_state.component_type,
2707        deployment_id: combined_state.execution_with_state.deployment_id,
2708    })
2709}
2710
2711async fn get_max_version(
2712    tx: &Transaction<'_>,
2713    execution_id: &ExecutionId,
2714) -> Result<Version, DbErrorRead> {
2715    let row = tx
2716        .query_one(
2717            "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2718            &[&execution_id.to_string()],
2719        )
2720        .await?;
2721    let max_version: i64 = get(&row, "version")?;
2722    let max_version = Version::try_from(max_version)
2723        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2724    Ok(max_version)
2725}
2726
2727async fn get_max_response_cursor(
2728    tx: &Transaction<'_>,
2729    execution_id: &ExecutionId,
2730) -> Result<ResponseCursor, DbErrorRead> {
2731    let row = tx
2732        .query_one(
2733            "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2734            &[&execution_id.to_string()],
2735        )
2736        .await?;
2737    // Assume the execution exists and has no responses
2738    let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2739    let max_cursor = ResponseCursor(
2740        u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2741    );
2742    Ok(max_cursor)
2743}
2744
2745async fn list_execution_events(
2746    tx: &Transaction<'_>,
2747    execution_id: &ExecutionId,
2748    pagination: Pagination<VersionType>,
2749    include_backtrace_id: bool,
2750) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2751    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2752    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2753        params.push(p);
2754        format!("${}", params.len())
2755    };
2756
2757    let p_execution_id = add_param(Box::new(execution_id.to_string()));
2758
2759    let (cursor, length, rel, is_desc) = match &pagination {
2760        Pagination::NewerThan {
2761            cursor,
2762            length,
2763            including_cursor,
2764        } => (
2765            *cursor,
2766            *length,
2767            if *including_cursor { ">=" } else { ">" },
2768            false,
2769        ),
2770        Pagination::OlderThan {
2771            cursor,
2772            length,
2773            including_cursor,
2774        } => (
2775            *cursor,
2776            *length,
2777            if *including_cursor { "<=" } else { "<" },
2778            true,
2779        ),
2780    };
2781    let p_cursor = add_param(Box::new(i64::from(cursor)));
2782    let p_limit = add_param(Box::new(i64::from(length)));
2783
2784    let base_select = if include_backtrace_id {
2785        format!(
2786            "SELECT
2787                log.created_at,
2788                log.json_value,
2789                log.version,
2790                bt.version_min_including AS backtrace_id
2791            FROM
2792                t_execution_log AS log
2793            LEFT OUTER JOIN
2794                t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2795                                AND log.version >= bt.version_min_including
2796                                AND log.version < bt.version_max_excluding
2797            WHERE
2798                log.execution_id = {p_execution_id}
2799                AND log.version {rel} {p_cursor}"
2800        )
2801    } else {
2802        format!(
2803            "SELECT
2804                created_at, json_value, NULL::BIGINT as backtrace_id, version
2805            FROM t_execution_log WHERE
2806                execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2807        )
2808    };
2809
2810    let order = if is_desc { "DESC" } else { "ASC" };
2811    let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2812
2813    // Re-order to ascending for consistent oldest-to-newest results
2814    if is_desc {
2815        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2816    }
2817
2818    let params_refs: Vec<&(dyn ToSql + Sync)> = params
2819        .iter()
2820        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2821        .collect();
2822
2823    let rows = tx
2824        .query(&sql, &params_refs)
2825        .await
2826        .map_err(DbErrorRead::from)?;
2827
2828    let mut events = Vec::with_capacity(rows.len());
2829    for row in rows {
2830        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2831        let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2832            .map(Version::try_from)
2833            .transpose()
2834            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2835
2836        let version = get::<i64, _>(&row, "version")?;
2837        let version = Version::new(
2838            VersionType::try_from(version)
2839                .map_err(|_| consistency_db_err("version must be non-negative"))?,
2840        );
2841        let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2842        let event_req = event_req.0;
2843
2844        events.push(ExecutionEvent {
2845            created_at,
2846            event: event_req,
2847            backtrace_id,
2848            version,
2849        });
2850    }
2851    Ok(events)
2852}
2853
2854async fn get_execution_event(
2855    tx: &Transaction<'_>,
2856    execution_id: &ExecutionId,
2857    version: VersionType,
2858) -> Result<ExecutionEvent, DbErrorRead> {
2859    let row = tx
2860        .query_one(
2861            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2862                 execution_id = $1 AND version = $2",
2863            &[&execution_id.to_string(), &i64::from(version)],
2864        )
2865        .await?;
2866
2867    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2868    let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2869    let version = get::<i64, _>(&row, "version")?;
2870    let version = Version::try_from(version)
2871        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2872    let event = json_val.0;
2873
2874    Ok(ExecutionEvent {
2875        created_at,
2876        event,
2877        backtrace_id: None,
2878        version,
2879    })
2880}
2881
2882async fn get_last_execution_event(
2883    tx: &Transaction<'_>,
2884    execution_id: &ExecutionId,
2885) -> Result<ExecutionEvent, DbErrorRead> {
2886    let row = tx
2887        .query_one(
2888            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2889                 execution_id = $1 ORDER BY version DESC LIMIT 1",
2890            &[&execution_id.to_string()],
2891        )
2892        .await?;
2893
2894    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2895    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2896    let event = event.0;
2897    let version: i64 = get(&row, "version")?;
2898    let version = Version::try_from(version)
2899        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2900
2901    Ok(ExecutionEvent {
2902        created_at,
2903        event,
2904        backtrace_id: None,
2905        version,
2906    })
2907}
2908
2909async fn delay_response(
2910    tx: &Transaction<'_>,
2911    execution_id: &ExecutionId,
2912    delay_id: &DelayId,
2913) -> Result<Option<bool>, DbErrorRead> {
2914    let row = tx
2915        .query_opt(
2916            "SELECT delay_success \
2917                 FROM t_join_set_response \
2918                 WHERE \
2919                 execution_id = $1 AND delay_id = $2",
2920            &[&execution_id.to_string(), &delay_id.to_string()],
2921        )
2922        .await?;
2923
2924    match row {
2925        Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2926        None => Ok(None),
2927    }
2928}
2929
2930#[instrument(level = Level::TRACE, skip_all)]
2931async fn get_responses_after(
2932    tx: &Transaction<'_>,
2933    execution_id: &ExecutionId,
2934    last_response: ResponseCursor,
2935) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2936    let rows = tx
2937            .query(
2938                "SELECT r.id, r.created_at, r.join_set_id, \
2939                 r.delay_id, r.delay_success, \
2940                 r.child_execution_id, r.finished_version, child.json_value \
2941                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2942                 WHERE \
2943                 r.id > $1 AND \
2944                 r.execution_id = $2 AND \
2945                 ( \
2946                 r.finished_version = child.version \
2947                 OR \
2948                 r.child_execution_id IS NULL \
2949                 ) \
2950                 ORDER BY id \
2951                 ",
2952                 &[
2953                     &i64::from(last_response.0),
2954                     &execution_id.to_string(),
2955                 ]
2956            )
2957            .await?;
2958
2959    let mut results = Vec::with_capacity(rows.len());
2960    for row in rows {
2961        let resp = parse_response_with_cursor(&row)?;
2962        results.push(resp);
2963    }
2964    Ok(results)
2965}
2966
2967async fn get_pending_of_single_ffqn(
2968    tx: &Transaction<'_>,
2969    batch_size: u32,
2970    pending_at_or_sooner: DateTime<Utc>,
2971    ffqn: &FunctionFqn,
2972    select_strategy: SelectStrategy,
2973) -> Result<Vec<(ExecutionId, Version)>, ()> {
2974    let rows = tx
2975        .query(
2976            &format!(
2977                r"
2978                SELECT execution_id, corresponding_version FROM t_state
2979                WHERE
2980                state = '{STATE_PENDING_AT}' AND
2981                pending_expires_finished <= $1 AND ffqn = $2
2982                AND is_paused = false
2983                ORDER BY pending_expires_finished
2984                {}
2985                LIMIT $3
2986                ",
2987                if select_strategy == SelectStrategy::LockForUpdate {
2988                    "FOR UPDATE SKIP LOCKED"
2989                } else {
2990                    ""
2991                }
2992            ),
2993            &[
2994                &pending_at_or_sooner,
2995                &ffqn.to_string(),
2996                &(i64::from(batch_size)),
2997            ],
2998        )
2999        .await
3000        .map_err(|err| {
3001            warn!("Ignoring consistency error {err:?}");
3002        })?;
3003
3004    let mut result = Vec::with_capacity(rows.len());
3005    for row in rows {
3006        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
3007            let eid_str: String = get(&row, "execution_id")?;
3008            let corresponding_version: i64 = get(&row, "corresponding_version")?;
3009            let corresponding_version = Version::try_from(corresponding_version)
3010                .map_err(|_| consistency_db_err("version must be non-negative"))?;
3011
3012            if let Ok(eid) = ExecutionId::from_str(&eid_str) {
3013                return Ok((eid, corresponding_version.increment()));
3014            }
3015            Err(consistency_db_err("invalid execution_id"))
3016        };
3017
3018        match unpack() {
3019            Ok(val) => result.push(val),
3020            Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
3021        }
3022    }
3023    Ok(result)
3024}
3025
3026/// Get executions and their next versions
3027async fn get_pending_by_ffqns(
3028    tx: &Transaction<'_>,
3029    batch_size: u32,
3030    pending_at_or_sooner: DateTime<Utc>,
3031    ffqns: &[FunctionFqn],
3032    select_strategy: SelectStrategy,
3033) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3034    let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
3035    let mut execution_ids_versions = Vec::with_capacity(batch_size);
3036
3037    for ffqn in ffqns {
3038        let needed = batch_size - execution_ids_versions.len();
3039        if needed == 0 {
3040            break;
3041        }
3042        let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
3043        if let Ok(execs) =
3044            get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
3045                .await
3046        {
3047            execution_ids_versions.extend(execs);
3048        }
3049    }
3050
3051    Ok(execution_ids_versions)
3052}
3053
/// How the pending-execution queries select `t_state` rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SelectStrategy {
    /// Plain read, no row locks are taken.
    Read,
    /// Append `FOR UPDATE SKIP LOCKED`: the selected rows are locked for the current
    /// transaction and rows already locked by another transaction are skipped.
    LockForUpdate,
}
3059
3060async fn get_pending_by_component_input_digest(
3061    tx: &Transaction<'_>,
3062    batch_size: u32,
3063    pending_at_or_sooner: DateTime<Utc>,
3064    input_digest: &ComponentDigest,
3065    select_strategy: SelectStrategy,
3066) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3067    let rows = tx
3068        .query(
3069            &format!(
3070                r"
3071                SELECT execution_id, corresponding_version FROM t_state WHERE
3072                state = '{STATE_PENDING_AT}' AND
3073                pending_expires_finished <= $1 AND
3074                component_id_input_digest = $2
3075                AND is_paused = false
3076                ORDER BY pending_expires_finished
3077                {}
3078                LIMIT $3
3079                ",
3080                if select_strategy == SelectStrategy::LockForUpdate {
3081                    "FOR UPDATE SKIP LOCKED"
3082                } else {
3083                    ""
3084                }
3085            ),
3086            &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
3087        )
3088        .await?;
3089
3090    let mut result = Vec::with_capacity(rows.len());
3091    for row in rows {
3092        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
3093            let eid_str: String = get(&row, "execution_id")?;
3094            let corresponding_version: i64 = get(&row, "corresponding_version")?;
3095            let corresponding_version = Version::try_from(corresponding_version)
3096                .map_err(|_| consistency_db_err("version must be non-negative"))?;
3097
3098            let eid = ExecutionId::from_str(&eid_str)
3099                .map_err(|err| consistency_db_err(err.to_string()))?;
3100            Ok((eid, corresponding_version.increment()))
3101        };
3102
3103        match unpack() {
3104            Ok(val) => result.push(val),
3105            Err(err) => {
3106                warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
3107            }
3108        }
3109    }
3110
3111    Ok(result)
3112}
3113
3114fn notify_pending_locked(
3115    notifier: &NotifierPendingAt,
3116    current_time: DateTime<Utc>,
3117    ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3118) {
3119    if notifier.scheduled_at <= current_time {
3120        ffqn_to_pending_subscription.notify(notifier);
3121    }
3122}
3123
3124async fn upgrade_execution_component(
3125    tx: &Transaction<'_>,
3126    execution_id: &ExecutionId,
3127    old: &ComponentDigest,
3128    new: &ComponentDigest,
3129) -> Result<(), DbErrorWrite> {
3130    debug!("Updating t_state to component {new}");
3131
3132    let updated = tx
3133        .execute(
3134            r"
3135                UPDATE t_state
3136                SET
3137                    updated_at = CURRENT_TIMESTAMP,
3138                    component_id_input_digest = $1
3139                WHERE
3140                    execution_id = $2 AND
3141                    component_id_input_digest = $3
3142                ",
3143            &[
3144                &new.as_slice(),           // $1: BYTEA
3145                &execution_id.to_string(), // $2: TEXT
3146                &old.as_slice(),           // $3: BYTEA
3147            ],
3148        )
3149        .await?;
3150
3151    if updated != 1 {
3152        return Err(DbErrorWrite::NotFound);
3153    }
3154    Ok(())
3155}
3156
3157impl PostgresConnection {
3158    // Must be called after write transaction commit for a correct happens-before relationship.
3159    #[instrument(level = Level::TRACE, skip_all)]
3160    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3161        let (pending_ats, finished_execs, responses) = {
3162            let (mut pending_ats, mut finished_execs, mut responses) =
3163                (Vec::new(), Vec::new(), Vec::new());
3164            for notifier in notifiers {
3165                if let Some(pending_at) = notifier.pending_at {
3166                    pending_ats.push(pending_at);
3167                }
3168                if let Some(finished) = notifier.execution_finished {
3169                    finished_execs.push(finished);
3170                }
3171                if let Some(response) = notifier.response {
3172                    responses.push(response);
3173                }
3174            }
3175            (pending_ats, finished_execs, responses)
3176        };
3177
3178        // Notify pending_at subscribers.
3179        if !pending_ats.is_empty() {
3180            let guard = self.pending_subscribers.lock().unwrap();
3181            for pending_at in pending_ats {
3182                notify_pending_locked(&pending_at, current_time, &guard);
3183            }
3184        }
3185        // Notify execution finished subscribers.
3186        if !finished_execs.is_empty() {
3187            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3188            for finished in finished_execs {
3189                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3190                    for (_tag, sender) in listeners_of_exe_id {
3191                        let _ = sender.send(finished.retval.clone());
3192                    }
3193                }
3194            }
3195        }
3196        // Notify response subscribers.
3197        if !responses.is_empty() {
3198            let mut guard = self.response_subscribers.lock().unwrap();
3199            for (execution_id, response) in responses {
3200                if let Some((sender, _)) = guard.remove(&execution_id) {
3201                    let _ = sender.send(response);
3202                }
3203            }
3204        }
3205    }
3206}
3207
3208#[async_trait]
3209impl DbExecutor for PostgresConnection {
3210    #[instrument(level = Level::TRACE, skip(self))]
3211    async fn lock_pending_by_ffqns(
3212        &self,
3213        batch_size: u32,
3214        pending_at_or_sooner: DateTime<Utc>,
3215        ffqns: Arc<[FunctionFqn]>,
3216        created_at: DateTime<Utc>,
3217        component_id: ComponentId,
3218        deployment_id: DeploymentId,
3219        executor_id: ExecutorId,
3220        lock_expires_at: DateTime<Utc>,
3221        run_id: RunId,
3222        retry_config: ComponentRetryConfig,
3223    ) -> Result<LockPendingResponse, DbErrorWrite> {
3224        let mut client_guard = self.client.lock().await;
3225        let tx = client_guard.transaction().await?;
3226
3227        let execution_ids_versions = get_pending_by_ffqns(
3228            &tx,
3229            batch_size,
3230            pending_at_or_sooner,
3231            &ffqns,
3232            SelectStrategy::LockForUpdate,
3233        )
3234        .await?;
3235
3236        if execution_ids_versions.is_empty() {
3237            // Commit is required to release the connection state cleanly,
3238            // though rollback/drop works too for read-only.
3239            tx.commit().await?;
3240            return Ok(vec![]);
3241        }
3242
3243        debug!("Locking {execution_ids_versions:?}");
3244
3245        // Lock using the same transaction
3246        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3247        for (execution_id, version) in execution_ids_versions {
3248            match lock_single_execution(
3249                &tx,
3250                created_at,
3251                &component_id,
3252                deployment_id,
3253                &execution_id,
3254                run_id,
3255                &version,
3256                executor_id,
3257                lock_expires_at,
3258                retry_config,
3259            )
3260            .await
3261            {
3262                Ok(locked) => locked_execs.push(locked),
3263                Err(err) => {
3264                    tx.rollback().await?; // lock_single_execution performs multiple writes
3265                    debug!("Locking row {execution_id} failed - {err:?}");
3266                    return Err(err);
3267                }
3268            }
3269        }
3270
3271        tx.commit().await?;
3272
3273        Ok(locked_execs)
3274    }
3275
3276    #[instrument(level = Level::TRACE, skip(self))]
3277    async fn lock_pending_by_component_digest(
3278        &self,
3279        batch_size: u32,
3280        pending_at_or_sooner: DateTime<Utc>,
3281        component_id: &ComponentId,
3282        deployment_id: DeploymentId,
3283        created_at: DateTime<Utc>,
3284        executor_id: ExecutorId,
3285        lock_expires_at: DateTime<Utc>,
3286        run_id: RunId,
3287        retry_config: ComponentRetryConfig,
3288    ) -> Result<LockPendingResponse, DbErrorWrite> {
3289        let mut client_guard = self.client.lock().await;
3290        let tx = client_guard.transaction().await?;
3291
3292        let execution_ids_versions = get_pending_by_component_input_digest(
3293            &tx,
3294            batch_size,
3295            pending_at_or_sooner,
3296            &component_id.component_digest,
3297            SelectStrategy::LockForUpdate,
3298        )
3299        .await?;
3300
3301        if execution_ids_versions.is_empty() {
3302            tx.commit().await?;
3303            return Ok(vec![]);
3304        }
3305
3306        debug!("Locking {execution_ids_versions:?}");
3307
3308        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3309        for (execution_id, version) in execution_ids_versions {
3310            match lock_single_execution(
3311                &tx,
3312                created_at,
3313                component_id,
3314                deployment_id,
3315                &execution_id,
3316                run_id,
3317                &version,
3318                executor_id,
3319                lock_expires_at,
3320                retry_config,
3321            )
3322            .await
3323            {
3324                Ok(locked) => locked_execs.push(locked),
3325                Err(err) => {
3326                    tx.rollback().await?; // lock_single_execution performs multiple writes
3327                    debug!("Locking row {execution_id} failed - {err:?}");
3328                    return Err(err);
3329                }
3330            }
3331        }
3332
3333        tx.commit().await?;
3334        Ok(locked_execs)
3335    }
3336
3337    #[cfg(feature = "test")]
3338    #[instrument(level = Level::DEBUG, skip(self))]
3339    async fn lock_one(
3340        &self,
3341        created_at: DateTime<Utc>,
3342        component_id: ComponentId,
3343        deployment_id: DeploymentId,
3344        execution_id: &ExecutionId,
3345        run_id: RunId,
3346        version: Version,
3347        executor_id: ExecutorId,
3348        lock_expires_at: DateTime<Utc>,
3349        retry_config: ComponentRetryConfig,
3350    ) -> Result<LockedExecution, DbErrorWrite> {
3351        debug!(%execution_id, "lock_one");
3352        let mut client_guard = self.client.lock().await;
3353        let tx = client_guard.transaction().await?;
3354
3355        let res = lock_single_execution(
3356            &tx,
3357            created_at,
3358            &component_id,
3359            deployment_id,
3360            execution_id,
3361            run_id,
3362            &version,
3363            executor_id,
3364            lock_expires_at,
3365            retry_config,
3366        )
3367        .await?;
3368
3369        tx.commit().await?;
3370        Ok(res)
3371    }
3372
3373    #[instrument(level = Level::DEBUG, skip(self, req))]
3374    async fn append(
3375        &self,
3376        execution_id: ExecutionId,
3377        version: Version,
3378        req: AppendRequest,
3379    ) -> Result<AppendResponse, DbErrorWrite> {
3380        debug!(%req, "append");
3381        trace!(?req, "append");
3382        let created_at = req.created_at;
3383
3384        let mut client_guard = self.client.lock().await;
3385        let tx = client_guard.transaction().await?;
3386
3387        let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3388
3389        tx.commit().await?;
3390
3391        // Explicitly drop guard (optional, happens at end of scope anyway)
3392        drop(client_guard);
3393
3394        self.notify_all(vec![notifier], created_at);
3395        Ok(new_version)
3396    }
3397
3398    #[instrument(level = Level::DEBUG, skip_all)]
3399    async fn append_batch_respond_to_parent(
3400        &self,
3401        events: AppendEventsToExecution,
3402        response: AppendResponseToExecution,
3403        current_time: DateTime<Utc>,
3404    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3405        debug!("append_batch_respond_to_parent");
3406        if events.execution_id == response.parent_execution_id {
3407            return Err(DbErrorWrite::NonRetriable(
3408                DbErrorWriteNonRetriable::ValidationFailed(
3409                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3410                ),
3411            ));
3412        }
3413        if events.batch.is_empty() {
3414            return Err(DbErrorWrite::NonRetriable(
3415                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3416            ));
3417        }
3418
3419        let mut client_guard = self.client.lock().await;
3420        let tx = client_guard.transaction().await?;
3421
3422        let mut version = events.version;
3423        let mut notifiers = Vec::new();
3424
3425        for append_request in events.batch {
3426            let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3427            version = v;
3428            notifiers.push(n);
3429        }
3430
3431        let pending_at_parent = append_response(
3432            &tx,
3433            &response.parent_execution_id,
3434            JoinSetResponseEventOuter {
3435                created_at: response.created_at,
3436                event: JoinSetResponseEvent {
3437                    join_set_id: response.join_set_id,
3438                    event: JoinSetResponse::ChildExecutionFinished {
3439                        child_execution_id: response.child_execution_id,
3440                        finished_version: response.finished_version,
3441                        result: response.result,
3442                    },
3443                },
3444            },
3445        )
3446        .await?;
3447        notifiers.push(pending_at_parent);
3448
3449        tx.commit().await?;
3450        drop(client_guard);
3451
3452        self.notify_all(notifiers, current_time);
3453        Ok(version)
3454    }
3455
3456    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3457    async fn wait_for_pending_by_ffqn(
3458        &self,
3459        pending_at_or_sooner: DateTime<Utc>,
3460        ffqns: Arc<[FunctionFqn]>,
3461        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3462    ) {
3463        let unique_tag: u64 = rand::random();
3464        let (sender, mut receiver) = mpsc::channel(1);
3465        {
3466            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3467            for ffqn in ffqns.as_ref() {
3468                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3469            }
3470        }
3471
3472        async {
3473            let mut db_has_pending = false;
3474            {
3475                // Scope the lock so we don't hold it while waiting for timeout
3476                let mut client_guard = self.client.lock().await;
3477                // Read-only transaction check
3478                if let Ok(tx) = client_guard.transaction().await {
3479                    if let Ok(res) = get_pending_by_ffqns(
3480                        &tx,
3481                        1,
3482                        pending_at_or_sooner,
3483                        &ffqns,
3484                        SelectStrategy::Read,
3485                    )
3486                    .await
3487                        && !res.is_empty()
3488                    {
3489                        db_has_pending = true;
3490                    }
3491                    // Commit/Rollback read transaction
3492                    let _ = tx.commit().await;
3493                }
3494            }
3495
3496            if db_has_pending {
3497                trace!("Not waiting, database already contains new pending executions");
3498                return;
3499            }
3500
3501            tokio::select! {
3502                _ = receiver.recv() => {
3503                    trace!("Received a notification");
3504                }
3505                () = timeout_fut => {
3506                }
3507            }
3508        }
3509        .await;
3510
3511        // Cleanup
3512        {
3513            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3514            for ffqn in ffqns.as_ref() {
3515                match pending_subscribers.remove_ffqn(ffqn) {
3516                    Some((_, tag)) if tag == unique_tag => {}
3517                    Some(other) => {
3518                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
3519                    }
3520                    None => {}
3521                }
3522            }
3523        }
3524    }
3525
3526    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3527    async fn wait_for_pending_by_component_digest(
3528        &self,
3529        pending_at_or_sooner: DateTime<Utc>,
3530        component_digest: &ComponentDigest,
3531        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3532    ) {
3533        let unique_tag: u64 = rand::random();
3534        let (sender, mut receiver) = mpsc::channel(1);
3535        {
3536            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3537            pending_subscribers
3538                .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
3539        }
3540
3541        async {
3542            let mut db_has_pending = false;
3543            {
3544                let mut client_guard = self.client.lock().await;
3545                if let Ok(tx) = client_guard.transaction().await {
3546                    if let Ok(res) = get_pending_by_component_input_digest(
3547                        &tx,
3548                        1,
3549                        pending_at_or_sooner,
3550                        component_digest,
3551                        SelectStrategy::Read,
3552                    )
3553                    .await
3554                        && !res.is_empty()
3555                    {
3556                        db_has_pending = true;
3557                    }
3558                    let _ = tx.commit().await;
3559                }
3560            }
3561
3562            if db_has_pending {
3563                trace!("Not waiting, database already contains new pending executions");
3564                return;
3565            }
3566
3567            tokio::select! {
3568                _ = receiver.recv() => {
3569                    trace!("Received a notification");
3570                }
3571                () = timeout_fut => {
3572                }
3573            }
3574        }
3575        .await;
3576
3577        // Cleanup
3578        {
3579            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3580            match pending_subscribers.remove_by_component(component_digest) {
3581                Some((_, tag)) if tag == unique_tag => {}
3582                Some(other) => {
3583                    pending_subscribers.insert_by_component(component_digest.clone(), other);
3584                }
3585                None => {}
3586            }
3587        }
3588    }
3589
3590    async fn get_last_execution_event(
3591        &self,
3592        execution_id: &ExecutionId,
3593    ) -> Result<ExecutionEvent, DbErrorRead> {
3594        let mut client_guard = self.client.lock().await;
3595        let tx = client_guard.transaction().await?;
3596
3597        let event = get_last_execution_event(&tx, execution_id).await?;
3598
3599        tx.commit().await?;
3600        Ok(event)
3601    }
3602}
3603#[async_trait]
3604impl DbConnection for PostgresConnection {
3605    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3606    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3607        debug!("create");
3608        trace!(?req, "create");
3609        let created_at = req.created_at;
3610
3611        let mut client_guard = self.client.lock().await;
3612        let tx = client_guard.transaction().await?;
3613
3614        let (version, notifier) = create_inner(&tx, req.clone()).await?;
3615
3616        tx.commit().await?;
3617        drop(client_guard); // Release DB lock before notifying
3618
3619        self.notify_all(vec![notifier], created_at);
3620        Ok(version)
3621    }
3622
3623    #[instrument(level = Level::DEBUG, skip(self))]
3624    async fn get(
3625        &self,
3626        execution_id: &ExecutionId,
3627    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3628        trace!("get");
3629        let mut client_guard = self.client.lock().await;
3630        let tx = client_guard.transaction().await?;
3631
3632        let res = get_execution_log(&tx, execution_id).await?;
3633
3634        tx.commit().await?;
3635        Ok(res)
3636    }
3637
3638    #[instrument(level = Level::DEBUG, skip(self, batch))]
3639    async fn append_batch(
3640        &self,
3641        current_time: DateTime<Utc>,
3642        batch: Vec<AppendRequest>,
3643        execution_id: ExecutionId,
3644        version: Version,
3645    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3646        debug!("append_batch");
3647        trace!(?batch, "append_batch");
3648        assert!(!batch.is_empty(), "Empty batch request");
3649
3650        let mut client_guard = self.client.lock().await;
3651        let tx = client_guard.transaction().await?;
3652
3653        let mut version = version;
3654        let mut notifier = None;
3655
3656        for append_request in batch {
3657            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3658            version = v;
3659            notifier = Some(n);
3660        }
3661
3662        tx.commit().await?;
3663        drop(client_guard);
3664
3665        self.notify_all(
3666            vec![notifier.expect("checked that the batch is not empty")],
3667            current_time,
3668        );
3669        Ok(version)
3670    }
3671
3672    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3673    async fn append_batch_create_new_execution(
3674        &self,
3675        current_time: DateTime<Utc>,
3676        batch: Vec<AppendRequest>,
3677        execution_id: ExecutionId,
3678        version: Version,
3679        child_req: Vec<CreateRequest>,
3680        backtraces: Vec<BacktraceInfo>,
3681    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3682        debug!("append_batch_create_new_execution");
3683        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3684        assert!(!batch.is_empty(), "Empty batch request");
3685
3686        let mut client_guard = self.client.lock().await;
3687        let tx = client_guard.transaction().await?;
3688
3689        let mut version = version;
3690        let mut notifier = None;
3691
3692        for append_request in batch {
3693            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3694            version = v;
3695            notifier = Some(n);
3696        }
3697
3698        let mut notifiers = Vec::new();
3699        notifiers.push(notifier.expect("checked that the batch is not empty"));
3700
3701        for req in child_req {
3702            let (_, n) = create_inner(&tx, req).await?;
3703            notifiers.push(n);
3704        }
3705        for backtrace in backtraces {
3706            append_backtrace(&tx, &backtrace).await?;
3707        }
3708        tx.commit().await?;
3709        drop(client_guard);
3710
3711        self.notify_all(notifiers, current_time);
3712        Ok(version)
3713    }
3714
3715    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3716    async fn subscribe_to_next_responses(
3717        &self,
3718        execution_id: &ExecutionId,
3719        last_response: ResponseCursor,
3720        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3721    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3722        debug!("next_responses");
3723        let unique_tag: u64 = rand::random();
3724        let execution_id_clone = execution_id.clone();
3725
3726        let cleanup = || {
3727            let mut guard = self.response_subscribers.lock().unwrap();
3728            match guard.remove(&execution_id_clone) {
3729                Some((_, tag)) if tag == unique_tag => {}
3730                Some(other) => {
3731                    guard.insert(execution_id_clone.clone(), other);
3732                }
3733                None => {}
3734            }
3735        };
3736
3737        let receiver = {
3738            let mut client_guard = self.client.lock().await;
3739            let tx = client_guard.transaction().await?;
3740
3741            // Register listener before fetching from database.
3742            // This is a best-effort mechanism that shortens the polling time, if it
3743            // does not detect the response it will sleep using `timeout_fut`. Consumers
3744            // are expected to poll this function in a loop.
3745            // Currently the notification mechanism only works on a single node deployment.
3746            let (sender, receiver) = oneshot::channel();
3747            self.response_subscribers
3748                .lock()
3749                .unwrap()
3750                .insert(execution_id.clone(), (sender, unique_tag));
3751
3752            let responses = get_responses_after(&tx, execution_id, last_response).await?;
3753
3754            if responses.is_empty() {
3755                // Commit read transaction
3756                tx.commit().await.map_err(|err| {
3757                    cleanup(); // Remove the just inserted subscriber.
3758                    DbErrorRead::from(err)
3759                })?;
3760                receiver
3761            } else {
3762                cleanup(); // Remove the just inserted subscriber as we already have the answer.
3763                tx.commit().await?;
3764                return Ok(responses);
3765            }
3766        };
3767
3768        let res = tokio::select! {
3769            resp = receiver => {
3770                match resp {
3771                    Ok(resp) => Ok(vec![resp]),
3772                    Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3773                }
3774            }
3775            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3776        };
3777
3778        cleanup();
3779        res
3780    }
3781
3782    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3783    async fn wait_for_finished_result(
3784        &self,
3785        execution_id: &ExecutionId,
3786        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3787    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3788        let unique_tag: u64 = rand::random();
3789        let execution_id_clone = execution_id.clone();
3790
3791        let cleanup = || {
3792            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3793            if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3794                subscribers.remove(&unique_tag);
3795            }
3796        };
3797
3798        let receiver = {
3799            let mut client_guard = self.client.lock().await;
3800            let tx = client_guard.transaction().await?;
3801
3802            // Register listener
3803            let (sender, receiver) = oneshot::channel();
3804            {
3805                let mut guard = self.execution_finished_subscribers.lock().unwrap();
3806                guard
3807                    .entry(execution_id.clone())
3808                    .or_default()
3809                    .insert(unique_tag, sender);
3810            }
3811
3812            let pending_state = get_combined_state(&tx, execution_id)
3813                .await?
3814                .execution_with_state
3815                .pending_state;
3816
3817            if let PendingState::Finished(finished) = pending_state {
3818                let event = get_execution_event(&tx, execution_id, finished.version).await?;
3819                tx.commit().await?;
3820                cleanup();
3821
3822                if let ExecutionRequest::Finished { retval, .. } = event.event {
3823                    return Ok(retval);
3824                }
3825                error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3826                return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3827                    "cannot get finished event based on t_state version",
3828                )));
3829            }
3830            tx.commit().await?;
3831            receiver
3832        };
3833
3834        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3835        let res = tokio::select! {
3836            resp = receiver => {
3837                match resp {
3838                    Ok(retval) => Ok(retval),
3839                    Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3840                }
3841            }
3842            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3843        };
3844
3845        cleanup();
3846        res
3847    }
3848
3849    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3850    async fn append_delay_response(
3851        &self,
3852        created_at: DateTime<Utc>,
3853        execution_id: ExecutionId,
3854        join_set_id: JoinSetId,
3855        delay_id: DelayId,
3856        result: Result<(), ()>,
3857    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3858        debug!("append_delay_response");
3859        let event = JoinSetResponseEventOuter {
3860            created_at,
3861            event: JoinSetResponseEvent {
3862                join_set_id,
3863                event: JoinSetResponse::DelayFinished {
3864                    delay_id: delay_id.clone(),
3865                    result,
3866                },
3867            },
3868        };
3869
3870        let mut client_guard = self.client.lock().await;
3871        let tx = client_guard.transaction().await?;
3872
3873        let res = append_response(&tx, &execution_id, event).await;
3874
3875        match res {
3876            Ok(notifier) => {
3877                tx.commit().await?;
3878                drop(client_guard);
3879                self.notify_all(vec![notifier], created_at);
3880                Ok(AppendDelayResponseOutcome::Success)
3881            }
3882            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3883                // Check if already finished
3884                // Roll back the failed tx, start new read tx.
3885                tx.rollback().await?;
3886
3887                // Start new tx for check
3888                let tx = client_guard.transaction().await?;
3889                let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3890                tx.commit().await?;
3891
3892                match delay_success {
3893                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3894                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3895                    None => Err(DbErrorWrite::Generic(consistency_db_err(
3896                        "insert failed yet select did not find the response",
3897                    ))),
3898                }
3899            }
3900            Err(err) => {
3901                let _ = tx.rollback().await; // cleanup
3902                Err(err)
3903            }
3904        }
3905    }
3906
3907    #[instrument(level = Level::DEBUG, skip_all)]
3908    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3909        debug!("append_backtrace");
3910        let mut client_guard = self.client.lock().await;
3911        let tx = client_guard.transaction().await?;
3912
3913        append_backtrace(&tx, &append).await?;
3914
3915        tx.commit().await?;
3916        Ok(())
3917    }
3918
3919    #[instrument(level = Level::DEBUG, skip_all)]
3920    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3921        debug!("append_backtrace_batch");
3922        let mut client_guard = self.client.lock().await;
3923        let tx = client_guard.transaction().await?;
3924
3925        for append in batch {
3926            append_backtrace(&tx, &append).await?;
3927        }
3928
3929        tx.commit().await?;
3930        Ok(())
3931    }
3932
3933    #[instrument(level = Level::DEBUG, skip_all)]
3934    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3935        trace!("append_log");
3936        let mut client_guard = self.client.lock().await;
3937        let tx = client_guard.transaction().await?;
3938        append_log(&tx, &row).await?;
3939        tx.commit().await?;
3940
3941        Ok(())
3942    }
3943
3944    #[instrument(level = Level::DEBUG, skip_all)]
3945    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3946        trace!("append_log_batch");
3947        let mut client_guard = self.client.lock().await;
3948        let tx = client_guard.transaction().await?;
3949        for row in batch {
3950            append_log(&tx, row).await?;
3951        }
3952        tx.commit().await?;
3953        Ok(())
3954    }
3955
3956    /// Get currently expired delays and locks.
3957    #[instrument(level = Level::TRACE, skip(self))]
3958    async fn get_expired_timers(
3959        &self,
3960        at: DateTime<Utc>,
3961    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3962        let mut client_guard = self.client.lock().await;
3963        let tx = client_guard.transaction().await?;
3964
3965        // Expired Delays
3966        let rows = tx
3967            .query(
3968                "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3969                &[&at],
3970            )
3971            .await?;
3972
3973        let mut expired_timers = Vec::with_capacity(rows.len());
3974        for row in rows {
3975            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3976                let execution_id: String = get(&row, "execution_id")?;
3977                let execution_id = ExecutionId::from_str(&execution_id)?;
3978                let join_set_id: String = get(&row, "join_set_id")?;
3979                let join_set_id = JoinSetId::from_str(&join_set_id)?;
3980                let delay_id: String = get(&row, "delay_id")?;
3981                let delay_id = DelayId::from_str(&delay_id)?;
3982
3983                Ok(ExpiredTimer::Delay(ExpiredDelay {
3984                    execution_id,
3985                    join_set_id,
3986                    delay_id,
3987                }))
3988            };
3989
3990            match unpack() {
3991                Ok(timer) => expired_timers.push(timer),
3992                Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3993            }
3994        }
3995
3996        // Expired Locks
3997        let rows = tx.query(
3998            &format!(
3999                "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
4000                 FROM t_state \
4001                 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
4002            ),
4003            &[&at]
4004        ).await?;
4005
4006        for row in rows {
4007            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
4008                let execution_id: String = get(&row, "execution_id")?;
4009                let execution_id = ExecutionId::from_str(&execution_id)?;
4010                let last_lock_version: i64 = get(&row, "last_lock_version")?;
4011                let last_lock_version = Version::try_from(last_lock_version)?;
4012
4013                let corresponding_version: i64 = get(&row, "corresponding_version")?;
4014                let corresponding_version = Version::try_from(corresponding_version)?;
4015
4016                let intermittent_event_count =
4017                    u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
4018                        |_| consistency_db_err("`intermittent_event_count` must not be negative"),
4019                    )?;
4020
4021                let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
4022                    .map(u32::try_from)
4023                    .transpose()
4024                    .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
4025                let retry_exp_backoff_millis =
4026                    u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
4027                        |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
4028                    )?;
4029                let executor_id: String = get(&row, "executor_id")?;
4030                let executor_id = ExecutorId::from_str(&executor_id)?;
4031                let run_id: String = get(&row, "run_id")?;
4032                let run_id = RunId::from_str(&run_id)?;
4033
4034                Ok(ExpiredTimer::Lock(ExpiredLock {
4035                    execution_id,
4036                    locked_at_version: last_lock_version,
4037                    next_version: corresponding_version.increment(),
4038                    intermittent_event_count,
4039                    max_retries,
4040                    retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
4041                    locked_by: LockedBy {
4042                        executor_id,
4043                        run_id,
4044                    },
4045                }))
4046            };
4047
4048            match unpack() {
4049                Ok(timer) => expired_timers.push(timer),
4050                Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
4051            }
4052        }
4053
4054        tx.commit().await?;
4055
4056        if !expired_timers.is_empty() {
4057            debug!("get_expired_timers found {expired_timers:?}");
4058        }
4059        Ok(expired_timers)
4060    }
4061
4062    async fn get_execution_event(
4063        &self,
4064        execution_id: &ExecutionId,
4065        version: &Version,
4066    ) -> Result<ExecutionEvent, DbErrorRead> {
4067        let mut client_guard = self.client.lock().await;
4068        let tx = client_guard.transaction().await?;
4069
4070        let event = get_execution_event(&tx, execution_id, version.0).await?;
4071
4072        tx.commit().await?;
4073        Ok(event)
4074    }
4075
4076    async fn get_pending_state(
4077        &self,
4078        execution_id: &ExecutionId,
4079    ) -> Result<ExecutionWithState, DbErrorRead> {
4080        let mut client_guard = self.client.lock().await;
4081        let tx = client_guard.transaction().await?;
4082
4083        let combined_state = get_combined_state(&tx, execution_id).await?;
4084
4085        tx.commit().await?;
4086        Ok(combined_state.execution_with_state)
4087    }
4088}
4089
4090#[async_trait]
4091impl DbExternalApi for PostgresConnection {
4092    #[instrument(skip(self))]
4093    async fn get_backtrace(
4094        &self,
4095        execution_id: &ExecutionId,
4096        filter: BacktraceFilter,
4097    ) -> Result<BacktraceInfo, DbErrorRead> {
4098        debug!("get_backtrace");
4099
4100        let mut client_guard = self.client.lock().await;
4101        let tx = client_guard.transaction().await?;
4102
4103        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4104
4105        params.push(Box::new(execution_id.to_string())); // $1
4106        let p_execution_id_idx = format!("${}", params.len()); // $1
4107
4108        let mut sql = String::new();
4109        write!(
4110            &mut sql,
4111            "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
4112            FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
4113            WHERE execution_id = {p_execution_id_idx}"
4114        )
4115        .unwrap();
4116
4117        match &filter {
4118            BacktraceFilter::Specific(version) => {
4119                params.push(Box::new(i64::from(version.0))); // $2
4120                let p_ver_idx = format!("${}", params.len()); // $2
4121                write!(
4122                    &mut sql,
4123                    " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
4124                )
4125                .unwrap();
4126            }
4127            BacktraceFilter::First => {
4128                sql.push_str(" ORDER BY version_min_including LIMIT 1");
4129            }
4130            BacktraceFilter::Last => {
4131                sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
4132            }
4133        }
4134
4135        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4136            params.iter().map(|p| p.as_ref() as _).collect();
4137
4138        let row = tx.query_one(&sql, &params_refs).await?;
4139
4140        let component_id: Json<ComponentId> = get(&row, "component_id")?;
4141        let component_id = component_id.0;
4142
4143        let version_min_including =
4144            Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
4145
4146        let version_max_excluding =
4147            Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
4148
4149        // wasm_backtrace stored as JSONB
4150        let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
4151        let wasm_backtrace = wasm_backtrace.0;
4152
4153        tx.commit().await?;
4154
4155        Ok(BacktraceInfo {
4156            execution_id: execution_id.clone(),
4157            component_id,
4158            version_min_including,
4159            version_max_excluding,
4160            wasm_backtrace,
4161        })
4162    }
4163
4164    #[instrument(skip_all)]
4165    async fn upsert_source_file(
4166        &self,
4167        component_digest: &ComponentDigest,
4168        frame_key: &str,
4169        is_suffix: bool,
4170        content: &str,
4171    ) -> Result<(), DbErrorWrite> {
4172        let content_hash: [u8; 32] = Sha256::digest(content.as_bytes()).into();
4173        let mut client_guard = self.client.lock().await;
4174        let tx = client_guard.transaction().await?;
4175        tx.execute(
4176            "INSERT INTO t_source_file (content_hash, content) \
4177             VALUES ($1, $2) \
4178             ON CONFLICT (content_hash) DO NOTHING",
4179            &[&content_hash.as_slice(), &content],
4180        )
4181        .await?;
4182        tx.execute(
4183            "INSERT INTO t_component_source \
4184             (component_digest, frame_key, is_suffix, content_hash) \
4185             VALUES ($1, $2, $3, $4) \
4186             ON CONFLICT (component_digest, frame_key, is_suffix) DO NOTHING",
4187            &[
4188                &component_digest.as_slice(),
4189                &frame_key,
4190                &is_suffix,
4191                &content_hash.as_slice(),
4192            ],
4193        )
4194        .await?;
4195        tx.commit().await?;
4196        Ok(())
4197    }
4198
4199    #[instrument(skip_all)]
4200    async fn get_source_file(
4201        &self,
4202        component_digest: &ComponentDigest,
4203        file: &str,
4204    ) -> Result<Option<String>, DbErrorRead> {
4205        let mut client_guard = self.client.lock().await;
4206        let tx = client_guard.transaction().await?;
4207        let rows = tx
4208            .query(
4209                "SELECT s.content \
4210                 FROM t_component_source cs \
4211                 JOIN t_source_file s ON cs.content_hash = s.content_hash \
4212                 WHERE cs.component_digest = $1 \
4213                   AND ( \
4214                       (NOT cs.is_suffix AND cs.frame_key = $2) \
4215                    OR (cs.is_suffix AND right($2, length(cs.frame_key)) = cs.frame_key) \
4216                   )",
4217                &[&component_digest.as_slice(), &file],
4218            )
4219            .await?;
4220        tx.commit().await?;
4221        match rows.len() {
4222            0 => Ok(None),
4223            1 => Ok(Some(get::<String, _>(&rows[0], "content")?)),
4224            _ => {
4225                warn!("Multiple suffix matches for '{file}', returning None");
4226                Ok(None)
4227            }
4228        }
4229    }
4230
4231    #[instrument(skip(self))]
4232    async fn list_executions(
4233        &self,
4234        filter: ListExecutionsFilter,
4235        pagination: ExecutionListPagination,
4236    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4237        let mut client_guard = self.client.lock().await;
4238        let tx = client_guard.transaction().await?;
4239
4240        let result = list_executions(&tx, filter, &pagination).await?;
4241
4242        tx.commit().await?;
4243        Ok(result)
4244    }
4245
4246    #[instrument(skip(self))]
4247    async fn list_execution_events(
4248        &self,
4249        execution_id: &ExecutionId,
4250        pagination: Pagination<VersionType>,
4251        include_backtrace_id: bool,
4252    ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4253        let mut client_guard = self.client.lock().await;
4254        let tx = client_guard.transaction().await?;
4255
4256        let events =
4257            list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4258        let max_version = get_max_version(&tx, execution_id).await?;
4259
4260        tx.commit().await?;
4261        Ok(ListExecutionEventsResponse {
4262            events,
4263            max_version,
4264        })
4265    }
4266
4267    #[instrument(skip(self))]
4268    async fn list_responses(
4269        &self,
4270        execution_id: &ExecutionId,
4271        pagination: Pagination<u32>,
4272    ) -> Result<ListResponsesResponse, DbErrorRead> {
4273        let mut client_guard = self.client.lock().await;
4274        let tx = client_guard.transaction().await?;
4275
4276        let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4277        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4278
4279        tx.commit().await?;
4280        Ok(ListResponsesResponse {
4281            responses,
4282            max_cursor,
4283        })
4284    }
4285
4286    #[instrument(skip(self))]
4287    async fn list_execution_events_responses(
4288        &self,
4289        execution_id: &ExecutionId,
4290        req_since: &Version,
4291        req_max_length: VersionType,
4292        req_include_backtrace_id: bool,
4293        resp_pagination: Pagination<u32>,
4294    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4295        let mut client_guard = self.client.lock().await;
4296        let tx = client_guard.transaction().await?;
4297
4298        let combined_state = get_combined_state(&tx, execution_id).await?;
4299
4300        let events = list_execution_events(
4301            &tx,
4302            execution_id,
4303            Pagination::NewerThan {
4304                length: req_max_length
4305                    .try_into()
4306                    .expect("req_max_length fits in u16"),
4307                cursor: req_since.0,
4308                including_cursor: true,
4309            },
4310            req_include_backtrace_id,
4311        )
4312        .await?;
4313
4314        let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4315        let max_version = get_max_version(&tx, execution_id).await?;
4316        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4317
4318        tx.commit().await?;
4319
4320        Ok(ExecutionWithStateRequestsResponses {
4321            execution_with_state: combined_state.execution_with_state,
4322            events,
4323            responses,
4324            max_version,
4325            max_cursor,
4326        })
4327    }
4328
4329    #[instrument(skip(self))]
4330    async fn upgrade_execution_component(
4331        &self,
4332        execution_id: &ExecutionId,
4333        old: &ComponentDigest,
4334        new: &ComponentDigest,
4335    ) -> Result<(), DbErrorWrite> {
4336        let mut client_guard = self.client.lock().await;
4337        let tx = client_guard.transaction().await?;
4338
4339        upgrade_execution_component(&tx, execution_id, old, new).await?;
4340
4341        tx.commit().await?;
4342        Ok(())
4343    }
4344
4345    #[instrument(skip(self))]
4346    async fn list_logs(
4347        &self,
4348        execution_id: &ExecutionId,
4349        show_derived: bool,
4350        filter: LogFilter,
4351        pagination: Pagination<DateTime<Utc>>,
4352    ) -> Result<ListLogsResponse, DbErrorRead> {
4353        let mut client_guard = self.client.lock().await;
4354        let tx = client_guard.transaction().await?;
4355        let responses = list_logs_tx(&tx, execution_id, show_derived, &filter, &pagination).await?;
4356        tx.commit().await?;
4357        Ok(responses)
4358    }
4359
4360    #[instrument(skip(self))]
4361    async fn list_deployment_states(
4362        &self,
4363        current_time: DateTime<Utc>,
4364        pagination: Pagination<Option<DeploymentId>>,
4365        include_config_json: bool,
4366    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4367        let mut client_guard = self.client.lock().await;
4368        let tx = client_guard.transaction().await?;
4369        let deployments =
4370            list_deployment_states(&tx, current_time, pagination, include_config_json).await?;
4371        tx.commit().await?;
4372        Ok(deployments)
4373    }
4374
4375    #[instrument(skip(self))]
4376    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite> {
4377        assert_eq!(
4378            record.status,
4379            DeploymentStatus::Inactive,
4380            "insert_deployment requires Inactive status"
4381        );
4382        assert!(
4383            record.last_active_at.is_none(),
4384            "insert_deployment requires last_active_at == None"
4385        );
4386        let mut client_guard = self.client.lock().await;
4387        let tx = client_guard.transaction().await?;
4388        tx.execute(
4389            "INSERT INTO t_deployment \
4390             (deployment_id, created_at, status, config_json, obelisk_version, created_by) \
4391             VALUES ($1, $2, $3, $4, $5, $6)",
4392            &[
4393                &record.deployment_id.to_string(), // $1
4394                &record.created_at,                // $2
4395                &record.status.as_str(),           // $3
4396                &record.config_json,               // $4
4397                &record.obelisk_version,           // $5
4398                &record.created_by,                // $6
4399            ],
4400        )
4401        .await?;
4402        tx.commit().await?;
4403        Ok(())
4404    }
4405
4406    #[instrument(skip(self))]
4407    async fn activate_deployment(
4408        &self,
4409        deployment_id: DeploymentId,
4410        now: DateTime<Utc>,
4411    ) -> Result<(), DbErrorWrite> {
4412        let mut client_guard = self.client.lock().await;
4413        let tx = client_guard.transaction().await?;
4414        // Demote currently active or enqueued deployment to inactive.
4415        tx.execute(
4416            "UPDATE t_deployment SET status = 'inactive' WHERE status IN ('active', 'enqueued')",
4417            &[],
4418        )
4419        .await?;
4420        // Set target deployment to active, recording activation time.
4421        let rows = tx
4422            .execute(
4423                "UPDATE t_deployment SET status = 'active', last_active_at = $1 WHERE deployment_id = $2",
4424                &[&now, &deployment_id.to_string()],
4425            )
4426            .await?;
4427        tx.commit().await?;
4428        if rows == 0 {
4429            return Err(DbErrorWrite::NotFound);
4430        }
4431        Ok(())
4432    }
4433
4434    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite> {
4435        let mut client_guard = self.client.lock().await;
4436        let tx = client_guard.transaction().await?;
4437        // Guard: reject if target deployment is currently active.
4438        let status_opt = tx
4439            .query_opt(
4440                "SELECT status FROM t_deployment WHERE deployment_id = $1",
4441                &[&deployment_id.to_string()],
4442            )
4443            .await?;
4444        match status_opt.as_ref().map(|r| r.get::<_, &str>("status")) {
4445            None => return Err(DbErrorWrite::NotFound),
4446            Some("active") => return Err(DbErrorWriteNonRetriable::Conflict.into()),
4447            _ => {}
4448        }
4449        // Demote any previously enqueued deployment to inactive.
4450        tx.execute(
4451            "UPDATE t_deployment SET status = 'inactive' WHERE status = 'enqueued'",
4452            &[],
4453        )
4454        .await?;
4455        // Set target deployment to enqueued.
4456        let rows = tx
4457            .execute(
4458                "UPDATE t_deployment SET status = 'enqueued' WHERE deployment_id = $1",
4459                &[&deployment_id.to_string()],
4460            )
4461            .await?;
4462        tx.commit().await?;
4463        if rows == 0 {
4464            return Err(DbErrorWrite::NotFound);
4465        }
4466        Ok(())
4467    }
4468
4469    #[instrument(skip(self))]
4470    async fn get_deployment(
4471        &self,
4472        deployment_id: DeploymentId,
4473    ) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4474        let mut client_guard = self.client.lock().await;
4475        let tx = client_guard.transaction().await?;
4476        let row = tx
4477            .query_opt(
4478                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4479                 FROM t_deployment WHERE deployment_id = $1",
4480                &[&deployment_id.to_string()],
4481            )
4482            .await?;
4483        tx.commit().await?;
4484        match row {
4485            None => Ok(None),
4486            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4487        }
4488    }
4489
4490    #[instrument(skip(self))]
4491    #[cfg(feature = "test")]
4492    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4493        let mut client_guard = self.client.lock().await;
4494        let tx = client_guard.transaction().await?;
4495        let row = tx
4496            .query_opt(
4497                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4498                 FROM t_deployment WHERE status = 'active' LIMIT 1",
4499                &[],
4500            )
4501            .await?;
4502        tx.commit().await?;
4503        match row {
4504            None => Ok(None),
4505            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4506        }
4507    }
4508
4509    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4510        let mut client_guard = self.client.lock().await;
4511        let tx = client_guard.transaction().await?;
4512        let row = tx
4513            .query_opt(
4514                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4515                 FROM t_deployment WHERE status IN ('enqueued', 'active') \
4516                 ORDER BY CASE status WHEN 'enqueued' THEN 0 ELSE 1 END LIMIT 1",
4517                &[],
4518            )
4519            .await?;
4520        tx.commit().await?;
4521        match row {
4522            None => Ok(None),
4523            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4524        }
4525    }
4526
4527    #[instrument(skip(self))]
4528    async fn list_deployments(
4529        &self,
4530        pagination: Pagination<Option<DeploymentId>>,
4531    ) -> Result<Vec<DeploymentRecord>, DbErrorRead> {
4532        let mut client_guard = self.client.lock().await;
4533        let tx = client_guard.transaction().await?;
4534        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4535        let mut add_param = |p: Box<dyn tokio_postgres::types::ToSql + Sync + Send>| {
4536            params.push(p);
4537            format!("${}", params.len())
4538        };
4539
4540        let mut sql = String::from(
4541            "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4542             FROM t_deployment",
4543        );
4544
4545        if let Some(cursor) = pagination.cursor() {
4546            let p_cursor = add_param(Box::new(cursor.to_string()));
4547            write!(
4548                sql,
4549                " WHERE deployment_id {rel} {p_cursor}",
4550                rel = pagination.rel()
4551            )
4552            .expect("writing to string");
4553        }
4554
4555        let (inner_order, outer_order) = if pagination.is_desc() {
4556            ("DESC", "")
4557        } else {
4558            ("ASC", "DESC")
4559        };
4560
4561        write!(
4562            sql,
4563            " ORDER BY deployment_id {inner_order} LIMIT {limit}",
4564            limit = pagination.length()
4565        )
4566        .expect("writing to string");
4567
4568        let final_sql = if outer_order.is_empty() {
4569            sql
4570        } else {
4571            format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
4572        };
4573
4574        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4575            params.iter().map(|p| p.as_ref() as _).collect();
4576
4577        let rows = tx.query(&final_sql, &params_refs).await?;
4578        tx.commit().await?;
4579
4580        rows.iter()
4581            .map(deployment_record_from_pg_row)
4582            .collect::<Result<Vec<_>, _>>()
4583    }
4584
4585    #[instrument(skip(self))]
4586    async fn pause_execution(
4587        &self,
4588        execution_id: &ExecutionId,
4589        paused_at: DateTime<Utc>,
4590    ) -> Result<AppendResponse, DbErrorWrite> {
4591        let mut client_guard = self.client.lock().await;
4592        let tx = client_guard.transaction().await?;
4593
4594        let combined_state = get_combined_state(&tx, execution_id).await?;
4595        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4596        debug!("Pausing with {appending_version}");
4597        let (next_version, _) = append(
4598            &tx,
4599            execution_id,
4600            AppendRequest {
4601                created_at: paused_at,
4602                event: ExecutionRequest::Paused,
4603            },
4604            appending_version,
4605        )
4606        .await?;
4607
4608        tx.commit().await?;
4609        Ok(next_version)
4610    }
4611
4612    #[instrument(skip(self))]
4613    async fn unpause_execution(
4614        &self,
4615        execution_id: &ExecutionId,
4616        unpaused_at: DateTime<Utc>,
4617    ) -> Result<AppendResponse, DbErrorWrite> {
4618        let mut client_guard = self.client.lock().await;
4619        let tx = client_guard.transaction().await?;
4620
4621        let combined_state = get_combined_state(&tx, execution_id).await?;
4622        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4623        debug!("Unpausing with {appending_version}");
4624        let (next_version, _) = append(
4625            &tx,
4626            execution_id,
4627            AppendRequest {
4628                created_at: unpaused_at,
4629                event: ExecutionRequest::Unpaused,
4630            },
4631            appending_version,
4632        )
4633        .await?;
4634
4635        tx.commit().await?;
4636        Ok(next_version)
4637    }
4638}
4639
4640#[async_trait]
4641impl DbPoolCloseable for PostgresPool {
4642    async fn close(&self) {
4643        self.pool.close();
4644    }
4645}
4646
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test-only: appends `response_event`, stamped with `created_at`, to
    /// `execution_id` in its own transaction, then calls `notify_all` with the
    /// resulting notifier.
    ///
    /// The client lock is released before `notify_all` runs.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let outer = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };

        // The lock guard lives only inside this block, so it is dropped before notifying.
        let notifier = {
            let mut guard = self.client.lock().await;
            let tx = guard.transaction().await?;
            let notifier = append_response(&tx, &execution_id, outer).await?;
            tx.commit().await?;
            notifier
        };

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4675
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test helper: drops the configured database.
    ///
    /// Connects to `ADMIN_DB_NAME` with this pool's host and credentials and issues
    /// `DROP DATABASE` up to three times. It returns on the first success. If every
    /// attempt fails, it logs a warning and returns without panicking.
    ///
    /// # Panics
    /// If the admin pool cannot be created or no connection can be obtained from it.
    pub async fn drop_database(&self) {
        let db_name = &self.config.db_name;

        let mut admin_cfg = deadpool_postgres::Config::new();
        admin_cfg.host = Some(self.config.host.clone());
        admin_cfg.user = Some(self.config.user.clone());
        admin_cfg.password = Some(self.config.password.expose_secret().to_string());
        admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
        admin_cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        // The statement text does not change between attempts, so build it once.
        let drop_sql = format!("DROP DATABASE {}", db_name);
        for _attempt in 0..3 {
            let res = admin_client.execute(&drop_sql, &[]).await; // Waits 1s on error, no need to sleep more.
            match &res {
                Ok(_) => {
                    debug!("Database '{}' dropped.", db_name);
                    return;
                }
                Err(_) => debug!("Dropping db failed - {res:?}",),
            }
        }
        warn!("Did not drop database {}", db_name);
    }
}