Skip to main content

obeli_sk_db_postgres/
postgres_dao.rs

1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ComponentType, ExecutionId, FunctionFqn, JoinSetId,
6    StrVariant, SupportedFunctionReturnValue,
7    component_id::{ComponentDigest, Digest},
8    prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14        DbPool, DbPoolCloseable, DeploymentRecord, DeploymentState, DeploymentStatus,
15        ExecutionEvent, ExecutionListPagination, ExecutionRequest, ExecutionWithState,
16        ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock, ExpiredTimer,
17        HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
18        JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionEventsResponse,
19        ListExecutionsFilter, ListLogsResponse, ListResponsesResponse, LockPendingResponse, Locked,
20        LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter, LogInfoAppendRow, LogLevel,
21        LogStreamType, Pagination, PendingState, PendingStateBlockedByJoinSet,
22        PendingStateFinishedResultKind, PendingStateMergedPause, ResponseCursor,
23        ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED,
24        STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
25    },
26};
27use db_common::{
28    AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
29    PendingFfqnSubscribersHolder,
30};
31use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
32use hashbrown::HashMap;
33use secrecy::{ExposeSecret as _, SecretString};
34use sha2::{Digest as _, Sha256};
35use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
36use std::{fmt::Write as _, panic::Location};
37use strum::IntoEnumIterator as _;
38use tokio::sync::{mpsc, oneshot};
39use tokio_postgres::{
40    NoTls, Row, Transaction,
41    row::RowIndex,
42    types::{FromSql, Json, ToSql},
43};
44use tracing::{Level, debug, error, info, instrument, trace, warn};
45use tracing_error::SpanTrace;
46
47#[track_caller]
48fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
49    row: &'a Row,
50    name: I,
51) -> Result<T, DbErrorGeneric> {
52    match row.try_get(name) {
53        Ok(ok) => Ok(ok),
54        Err(err) => {
55            // no map_err, cannot attach `track_caller`
56            Err(consistency_db_err(format!(
57                "Failed to retrieve column '{name}': {err:?}"
58            )))
59        }
60    }
61}
62
/// SQL DDL used to provision the schema.
///
/// Every `CREATE ...` statement uses `IF NOT EXISTS`. `PostgresPool::new_with_outcome`
/// joins these constants with newlines and sends them in a single `batch_execute` call.
mod ddl {
    use super::{STATE_LOCKED, STATE_PENDING_AT};
    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;
    use const_format::formatcp;

    /// Database the administrative connection uses while checking for / creating the
    /// target database (see `create_database`).
    pub const ADMIN_DB_NAME: &str = "postgres";

    /// Schema version this code expects. It is inserted into `t_metadata` when the table
    /// has no rows and otherwise compared with the newest stored row.
    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 3;

    // T_METADATA
    /// Table holding schema version rows.
    pub const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    schema_version INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);
";

    // T_EXECUTION_LOG
    /// Per-execution event log keyed by `(execution_id, version)`.
    /// `history_event_type` is a stored generated column extracted from `json_value`.
    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    json_value JSON NOT NULL,
    version BIGINT NOT NULL CHECK (version >= 0),
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
    PRIMARY KEY (execution_id, version)
);
";

    // Indexes for t_execution_log
    // NOTE(review): covers the same columns as the table's PRIMARY KEY, which Postgres
    // already backs with an index - presumably redundant; confirm before removing.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";

    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

    /// Partial index restricted to rows whose `history_event_type` equals
    /// `HISTORY_EVENT_TYPE_JOIN_NEXT`.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log
        (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
        HISTORY_EVENT_TYPE_JOIN_NEXT
    );

    // T_JOIN_SET_RESPONSE
    /// Responses delivered to join sets. A row carries either the delay columns or the
    /// child execution columns (both groups are nullable).
    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success BOOLEAN,

    child_execution_id TEXT,
    finished_version BIGINT CHECK (finished_version >= 0),

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
);
";

    // Indexes for t_join_set_response
    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";

    /// Enforces that a non-NULL `child_execution_id` appears at most once.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";

    /// Enforces that a non-NULL `delay_id` appears at most once.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";

    // T_STATE
    /// One row per execution (primary key `execution_id`) holding its current state.
    /// The nullable column groups are reset to NULL when the state is set back to
    /// pending (see `update_state_pending_after_response_appended`).
    pub const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level BOOLEAN NOT NULL,
    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
    ffqn TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    component_id_input_digest BYTEA NOT NULL,
    component_type TEXT NOT NULL,
    first_scheduled_at TIMESTAMPTZ NOT NULL,
    deployment_id TEXT NOT NULL,
    is_paused BOOLEAN NOT NULL,

    pending_expires_finished TIMESTAMPTZ NOT NULL,
    state TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),

    max_retries BIGINT CHECK (max_retries >= 0),
    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
    last_lock_version BIGINT CHECK (last_lock_version >= 0),
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing BOOLEAN,

    result_kind JSONB,

    PRIMARY KEY (execution_id)
);
";

    // Indexes for t_state
    // For `get_pending_of_single_ffqn`
    pub const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (state, pending_expires_finished, ffqn) WHERE state = '{}';",
        STATE_PENDING_AT
    );
    // For `get_pending_by_component_input_digest`
    pub const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (state, pending_expires_finished, component_id_input_digest) WHERE state = '{}';",
        STATE_PENDING_AT
    );

    /// Partial index over rows in the `STATE_LOCKED` state, ordered by
    /// `pending_expires_finished`.
    pub const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
        STATE_LOCKED
    );

    // NOTE: the SQL index name ends in `is_root` while the constant says `IS_TOP_LEVEL`;
    // the SQL name is what exists in already-provisioned databases.
    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";

    pub const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";

    pub const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
    // For `list_deployment_states`
    pub const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
";

    // T_DELAY
    /// Delays keyed by `(execution_id, join_set_id, delay_id)` with their expiry time.
    pub const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
);
";

    // Backtrace

    /// Backtrace payloads keyed by `backtrace_hash`; referenced from
    /// `t_execution_backtrace`.
    pub const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
    backtrace_hash BYTEA PRIMARY KEY,
    wasm_backtrace JSONB NOT NULL
);
";

    /// Maps a version range of an execution to a row of `t_wasm_backtrace`.
    pub const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_backtrace (
    execution_id TEXT NOT NULL,
    component_id JSONB NOT NULL,
    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
    backtrace_hash BYTEA NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding),
    FOREIGN KEY (backtrace_hash) REFERENCES t_wasm_backtrace(backtrace_hash)
);
";

    // NOTE(review): covers the same columns as the table's PRIMARY KEY - presumably
    // redundant; confirm before removing.
    pub const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version ON t_execution_backtrace (execution_id, version_min_including, version_max_excluding);
";

    // Source files
    /// Source file contents keyed by `content_hash`.
    pub const CREATE_TABLE_T_SOURCE_FILE: &str = r"
CREATE TABLE IF NOT EXISTS t_source_file (
    content_hash BYTEA PRIMARY KEY,
    content      TEXT NOT NULL
);
";
    /// Links `(component_digest, frame_key, is_suffix)` to a row of `t_source_file`.
    pub const CREATE_TABLE_T_COMPONENT_SOURCE: &str = r"
CREATE TABLE IF NOT EXISTS t_component_source (
    component_digest BYTEA   NOT NULL,
    frame_key        TEXT    NOT NULL,
    is_suffix        BOOLEAN NOT NULL,
    content_hash     BYTEA   NOT NULL,
    PRIMARY KEY (component_digest, frame_key, is_suffix),
    FOREIGN KEY (content_hash) REFERENCES t_source_file(content_hash)
);
";

    // Logs & std streams
    // NOTE(review): presumably a row is either a log entry (`level`, `message`) or a
    // stream chunk (`stream_type`, `payload`) - all four columns are nullable; confirm
    // against the writers.
    pub const CREATE_TABLE_T_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_log (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    execution_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    level INTEGER,
    message TEXT,
    stream_type INTEGER,
    payload BYTEA
);
";
    pub const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
";
    pub const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
";

    // T_DEPLOYMENT
    /// Deployments keyed by `deployment_id`; `status` defaults to `'inactive'`.
    pub const CREATE_TABLE_T_DEPLOYMENT: &str = r"
CREATE TABLE IF NOT EXISTS t_deployment (
    deployment_id TEXT     NOT NULL PRIMARY KEY,
    created_at    TIMESTAMPTZ NOT NULL,
    last_active_at TIMESTAMPTZ,
    status        TEXT     NOT NULL DEFAULT 'inactive',
    config_json      TEXT     NOT NULL,
    obelisk_version  TEXT     NOT NULL,
    created_by       TEXT
);
";
    pub const IDX_T_DEPLOYMENT_STATUS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_deployment_status ON t_deployment (status);
";
    // Enforces at most one active deployment at a time: a unique index over the
    // constant expression `(1)` restricted to rows with `status = 'active'`.
    pub const IDX_T_DEPLOYMENT_SINGLE_ACTIVE: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_active ON t_deployment ((1)) WHERE status = 'active';
";
    // Enforces at most one enqueued deployment at a time (same technique as above).
    pub const IDX_T_DEPLOYMENT_SINGLE_ENQUEUED: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_enqueued ON t_deployment ((1)) WHERE status = 'enqueued';
";
}
308
/// Connection settings for the Postgres server and the target database.
///
/// `host`, `user` and `password` are used both for the administrative connection opened
/// by `create_database` and for the main pool built in `PostgresPool::new_with_outcome`.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Server host, passed to `deadpool_postgres::Config::host`.
    pub host: String,
    /// User name, passed to `deadpool_postgres::Config::user`.
    pub user: String,
    /// Password; exposed only when building the pool configuration.
    pub password: SecretString,
    /// Name of the database the pool connects to (and that may be created on startup).
    pub db_name: String,
}
316
/// Opaque error returned when the database or its schema cannot be initialized.
///
/// It carries no details; the failure sites in this file log the cause through
/// `error!` / `warn!` before returning it.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
320
321async fn create_database(
322    config: &PostgresConfig,
323    provision_policy: ProvisionPolicy,
324) -> Result<DbInitialzationOutcome, InitializationError> {
325    let mut admin_cfg = deadpool_postgres::Config::new();
326    admin_cfg.host = Some(config.host.clone());
327    admin_cfg.user = Some(config.user.clone());
328    admin_cfg.password = Some(config.password.expose_secret().to_string());
329    admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
330    admin_cfg.manager = Some(ManagerConfig {
331        recycling_method: RecyclingMethod::Fast,
332    });
333
334    let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
335        error!("Cannot create the default pool - {err:?}");
336        InitializationError
337    })?;
338
339    let client = admin_pool.get().await.map_err(|err| {
340        error!("Cannot get a connection from the default pool - {err:?}");
341        InitializationError
342    })?;
343
344    let row = client
345        .query_opt(
346            &format!(
347                "SELECT 1 FROM pg_database WHERE datname = '{}'",
348                config.db_name
349            ),
350            &[],
351        )
352        .await
353        .map_err(|err| {
354            error!("Cannot select from the default database - {err:?}");
355            InitializationError
356        })?;
357
358    match (row, provision_policy) {
359        (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
360            client
361                .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
362                .await
363                .map_err(|err| {
364                    error!("Cannot create the database - {err:?}");
365                    InitializationError
366                })?;
367            info!("Database '{}' created.", config.db_name);
368            Ok(DbInitialzationOutcome::Created)
369        }
370        (Some(_), ProvisionPolicy::Auto) => {
371            info!("Database '{}' exists.", config.db_name);
372            Ok(DbInitialzationOutcome::Existing)
373        }
374        (Some(_), ProvisionPolicy::MustCreate) => {
375            warn!("Database '{}' already exists.", config.db_name);
376            Err(InitializationError)
377        }
378        (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
379    }
380}
381
// All mutexes here have a very short critical section completely controlled by this module, thus using std mutex.

/// Shared map from an execution id to a oneshot sender of `ResponseWithCursor`.
// NOTE(review): the `u64` stored next to the sender is presumably a subscriber token
// used to tell registrations apart - confirm against the code that inserts/removes entries.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
/// Shared holder of subscribers interested in pending executions.
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Per-execution map of `u64`-keyed oneshot senders that receive the execution's
/// `SupportedFunctionReturnValue`. Unlike the two aliases above it is not wrapped in
/// `Arc` here; the owning structs add the `Arc`.
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
389
/// Connection pool for the Postgres backend together with the subscriber registries
/// that every connection handed out by the pool shares.
pub struct PostgresPool {
    /// Underlying `deadpool_postgres` pool.
    pool: Pool,
    /// Registry cloned into each `PostgresConnection`.
    response_subscribers: ResponseSubscribers,
    /// Registry cloned into each `PostgresConnection`.
    pending_subscribers: PendingSubscribers,
    /// Registry cloned into each `PostgresConnection`.
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    // only for tests.
    pub config: PostgresConfig,
}
398
399#[async_trait]
400impl DbPool for PostgresPool {
401    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
402        let client = self.pool.get().await?;
403
404        Ok(Box::new(PostgresConnection {
405            client: tokio::sync::Mutex::new(client),
406            response_subscribers: self.response_subscribers.clone(),
407            pending_subscribers: self.pending_subscribers.clone(),
408            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
409        }))
410    }
411
412    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
413        let client = self.pool.get().await?;
414
415        Ok(Box::new(PostgresConnection {
416            client: tokio::sync::Mutex::new(client),
417            response_subscribers: self.response_subscribers.clone(),
418            pending_subscribers: self.pending_subscribers.clone(),
419            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
420        }))
421    }
422
423    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
424        let client = self.pool.get().await?;
425
426        Ok(Box::new(PostgresConnection {
427            client: tokio::sync::Mutex::new(client),
428            response_subscribers: self.response_subscribers.clone(),
429            pending_subscribers: self.pending_subscribers.clone(),
430            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
431        }))
432    }
433
434    #[cfg(feature = "test")]
435    async fn connection_test(
436        &self,
437    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
438        let client = self.pool.get().await?;
439
440        Ok(Box::new(PostgresConnection {
441            client: tokio::sync::Mutex::new(client),
442            response_subscribers: self.response_subscribers.clone(),
443            pending_subscribers: self.pending_subscribers.clone(),
444            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
445        }))
446    }
447}
448
/// A single pooled client plus handles to the subscriber registries owned by the
/// `PostgresPool` that produced it.
pub struct PostgresConnection {
    client: tokio::sync::Mutex<Client>, // Callers should not hold onto a connection for too long but it is not controlled by this module, thus tokio mutex.
    /// Handle to the pool-wide response subscriber registry.
    response_subscribers: ResponseSubscribers,
    /// Handle to the pool-wide pending subscriber registry.
    pending_subscribers: PendingSubscribers,
    /// Handle to the pool-wide execution-finished subscriber registry.
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
455
/// Controls whether `PostgresPool` may create the target database on startup.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Never attempt to create the database: the existence check is skipped and the
    /// outcome is reported as `DbInitialzationOutcome::Existing`.
    NeverCreate,
    /// Create database if it does not exist.
    Auto,
    /// Only for tests: Fail if database already exists.
    MustCreate,
}
464
/// Tells the caller of `PostgresPool::new_with_outcome` whether the database was created
/// during initialization.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// The database did not exist and was created.
    Created,
    /// The database already existed, or creation was not attempted (`NeverCreate`).
    Existing,
}
470
471impl PostgresPool {
472    #[instrument(skip_all, name = "postgres_new")]
473    pub async fn new(
474        config: PostgresConfig,
475        provision_policy: ProvisionPolicy,
476    ) -> Result<PostgresPool, InitializationError> {
477        Self::new_with_outcome(config, provision_policy)
478            .await
479            .map(|(db, _)| db)
480    }
481
482    pub async fn new_with_outcome(
483        config: PostgresConfig,
484        provision_policy: ProvisionPolicy,
485    ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
486        let outcome = if matches!(
487            provision_policy,
488            ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
489        ) {
490            create_database(&config, provision_policy).await?
491        } else {
492            DbInitialzationOutcome::Existing
493        };
494        let mut cfg = deadpool_postgres::Config::new();
495        cfg.host = Some(config.host.clone());
496        cfg.user = Some(config.user.clone());
497        cfg.password = Some(config.password.expose_secret().to_string());
498        cfg.dbname = Some(config.db_name.clone());
499        cfg.manager = Some(ManagerConfig {
500            recycling_method: RecyclingMethod::Fast,
501        });
502
503        let pool = cfg.create_pool(None, NoTls).map_err(|err| {
504            error!("Cannot create the database pool - {err:?}");
505            InitializationError
506        })?;
507        let client = pool.get().await.map_err(|err| {
508            error!("Cannot get a connection from the database pool - {err:?}");
509            InitializationError
510        })?;
511
512        let statements = vec![
513            ddl::CREATE_TABLE_T_METADATA,
514            ddl::CREATE_TABLE_T_EXECUTION_LOG,
515            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
516            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
517            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
518            ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
519            ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
520            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
521            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
522            ddl::CREATE_TABLE_T_STATE,
523            ddl::IDX_T_STATE_LOCK_PENDING_BY_FFQN,
524            ddl::IDX_T_STATE_LOCK_PENDING_BY_COMPONENT,
525            ddl::IDX_T_STATE_EXPIRED_LOCKS,
526            ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
527            ddl::IDX_T_STATE_FFQN,
528            ddl::IDX_T_STATE_CREATED_AT,
529            ddl::IDX_T_STATE_DEPLOYMENT_STATE,
530            ddl::CREATE_TABLE_T_DELAY,
531            ddl::CREATE_TABLE_T_WASM_BACKTRACE,
532            ddl::CREATE_TABLE_T_EXECUTION_BACKTRACE,
533            ddl::IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION,
534            ddl::CREATE_TABLE_T_SOURCE_FILE,
535            ddl::CREATE_TABLE_T_COMPONENT_SOURCE,
536            ddl::CREATE_TABLE_T_LOG,
537            ddl::IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT,
538            ddl::IDX_T_LOG_EXECUTION_ID_CREATED_AT,
539            ddl::CREATE_TABLE_T_DEPLOYMENT,
540            ddl::IDX_T_DEPLOYMENT_STATUS,
541            ddl::IDX_T_DEPLOYMENT_SINGLE_ACTIVE,
542            ddl::IDX_T_DEPLOYMENT_SINGLE_ENQUEUED,
543        ];
544
545        // Combine into one batch execution for atomicity per round-trip (or efficiency)
546        let batch_sql = statements.join("\n");
547        client.batch_execute(&batch_sql).await.map_err(|err| {
548            error!("Cannot run the DDL import - {err:?}");
549            InitializationError
550        })?;
551
552        let row = client
553            .query_opt(
554                "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
555                &[],
556            )
557            .await
558            .map_err(|err| {
559                error!("Cannot select schema version - {err:?}");
560                InitializationError
561            })?;
562
563        // Postgres INTEGER maps to i32
564        let actual_version = match row {
565            Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
566                error!("Failed to get schema_version column: {e}");
567                InitializationError
568            })?),
569            None => None,
570        };
571
572        match actual_version {
573            None => {
574                client
575                    .execute(
576                        "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
577                        &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
578                    )
579                    .await
580                    .map_err(|err| {
581                        error!("Cannot insert schema version - {err:?}");
582                        InitializationError
583                    })?;
584            }
585            Some(actual_version) => {
586                // Fail on unexpected `schema_version`.
587                if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
588                    error!(
589                        "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
590                    );
591                    return Err(InitializationError);
592                }
593            }
594        }
595
596        debug!("Database schema initialized.");
597
598        Ok((
599            PostgresPool {
600                pool,
601                execution_finished_subscribers: Arc::default(),
602                pending_subscribers: Arc::default(),
603                response_subscribers: Arc::default(),
604                config,
605            },
606            outcome,
607        ))
608    }
609}
610
611fn deployment_record_from_pg_row(row: &Row) -> Result<DeploymentRecord, DbErrorRead> {
612    let deployment_id_str: String = get(row, "deployment_id")?;
613    let deployment_id = deployment_id_str.parse::<DeploymentId>().map_err(|e| {
614        DbErrorRead::Generic(consistency_db_err(format!("invalid deployment_id: {e}")))
615    })?;
616    let status_str: String = get(row, "status")?;
617    let status = status_str
618        .parse::<DeploymentStatus>()
619        .map_err(|e| DbErrorRead::Generic(consistency_db_err(format!("invalid status: {e}"))))?;
620    Ok(DeploymentRecord {
621        deployment_id,
622        created_at: get(row, "created_at")?,
623        last_active_at: get(row, "last_active_at")?,
624        status,
625        config_json: get(row, "config_json")?,
626        obelisk_version: get(row, "obelisk_version")?,
627        created_by: get(row, "created_by")?,
628    })
629}
630
631#[track_caller]
632fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
633    DbErrorGeneric::Uncategorized {
634        reason: reason.into(),
635        context: SpanTrace::capture(),
636        source: None,
637        loc: Location::caller(),
638    }
639}
640#[track_caller]
641fn consistency_db_err_src(
642    reason: impl Into<StrVariant>,
643    source: Arc<dyn std::error::Error + Send + Sync>,
644) -> DbErrorGeneric {
645    DbErrorGeneric::Uncategorized {
646        reason: reason.into(),
647        context: SpanTrace::capture(),
648        source: Some(source),
649        loc: Location::caller(),
650    }
651}
652
/// A delay identified by its join set and delay id, together with its expiry time.
// NOTE(review): its users are outside the visible part of the file; presumably it mirrors
// a row destined for `t_delay` - confirm.
#[derive(Debug, Clone)]
struct DelayReq {
    /// Join set the delay belongs to.
    join_set_id: JoinSetId,
    /// Identifier of the delay.
    delay_id: DelayId,
    /// Instant at which the delay expires.
    expires_at: DateTime<Utc>,
}
659
660async fn fetch_created_event(
661    tx: &Transaction<'_>,
662    execution_id: &ExecutionId,
663) -> Result<CreateRequest, DbErrorRead> {
664    let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
665                execution_id = $1 AND version = 0";
666
667    let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
668
669    let created_at = get(&row, "created_at")?;
670    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
671    let event = event.0;
672
673    if let ExecutionRequest::Created {
674        ffqn,
675        params,
676        parent,
677        scheduled_at,
678        component_id,
679        deployment_id,
680        metadata,
681        scheduled_by,
682    } = event
683    {
684        Ok(CreateRequest {
685            created_at,
686            execution_id: execution_id.clone(),
687            ffqn,
688            params,
689            parent,
690            scheduled_at,
691            component_id,
692            deployment_id,
693            metadata,
694            scheduled_by,
695        })
696    } else {
697        error!("Row with version=0 must be a `Created` event - {event:?}");
698        Err(consistency_db_err("expected `Created` event").into())
699    }
700}
701
702fn check_expected_next_and_appending_version(
703    expected_version: &Version,
704    appending_version: &Version,
705) -> Result<(), DbErrorWrite> {
706    if *expected_version != *appending_version {
707        debug!(
708            "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
709        );
710        return Err(DbErrorWrite::NonRetriable(
711            DbErrorWriteNonRetriable::VersionConflict {
712                expected: expected_version.clone(),
713                requested: appending_version.clone(),
714            },
715        ));
716    }
717    Ok(())
718}
719
720#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
721async fn create_inner(
722    tx: &Transaction<'_>,
723    req: CreateRequest,
724) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
725    trace!("create_inner");
726
727    let version = Version::default();
728    let execution_id = req.execution_id.clone();
729    let execution_id_str = execution_id.to_string();
730    let ffqn = req.ffqn.clone();
731    let created_at = req.created_at;
732    let scheduled_at = req.scheduled_at;
733    let component_id = req.component_id.clone();
734    let deployment_id = req.deployment_id;
735
736    let event = ExecutionRequest::from(req);
737    let event = Json(event);
738
739    tx.execute(
740        "INSERT INTO t_execution_log (
741            execution_id, created_at, version, json_value, variant, join_set_id
742        ) VALUES ($1, $2, $3, $4, $5, $6)",
743        &[
744            &execution_id_str,
745            &created_at,
746            &i64::from(version.0), // BIGINT
747            &event,
748            &event.0.variant(),
749            &event.0.join_set_id().map(std::string::ToString::to_string),
750        ],
751    )
752    .await?;
753
754    let pending_at = {
755        debug!("Creating with `Pending(`{scheduled_at:?}`)");
756
757        tx.execute(
758            r"
759            INSERT INTO t_state (
760                execution_id,
761                is_top_level,
762                corresponding_version,
763                pending_expires_finished,
764                ffqn,
765                state,
766                created_at,
767                component_id_input_digest,
768                component_type,
769                deployment_id,
770                first_scheduled_at,
771                updated_at,
772                intermittent_event_count,
773                is_paused
774            ) VALUES (
775                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, false
776            )",
777            &[
778                &execution_id_str,
779                &execution_id.is_top_level(),
780                &i64::from(version.0),
781                &scheduled_at,
782                &ffqn.to_string(),
783                &STATE_PENDING_AT,
784                &created_at,
785                &component_id.component_digest.as_slice(),
786                &component_id.component_type.to_string(),
787                &deployment_id.to_string(),
788                &scheduled_at,
789            ],
790        )
791        .await?;
792
793        AppendNotifier {
794            pending_at: Some(NotifierPendingAt {
795                scheduled_at,
796                ffqn,
797                component_input_digest: component_id.component_digest,
798            }),
799            execution_finished: None,
800            response: None,
801        }
802    };
803
804    let next_version = Version::new(version.0 + 1);
805    Ok((next_version, pending_at))
806}
807
808#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
809async fn update_state_pending_after_response_appended(
810    tx: &Transaction<'_>,
811    execution_id: &ExecutionId,
812    scheduled_at: DateTime<Utc>, // Changing to state PendingAt
813    component_input_digest: ComponentDigest,
814) -> Result<AppendNotifier, DbErrorWrite> {
815    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
816
817    // Convert types for Postgres arguments
818    let execution_id_str = execution_id.to_string();
819
820    let updated = tx
821        .execute(
822            r"
823            UPDATE t_state
824            SET
825                pending_expires_finished = $1,
826                state = $2,
827                updated_at = CURRENT_TIMESTAMP,
828
829                max_retries = NULL,
830                retry_exp_backoff_millis = NULL,
831                last_lock_version = NULL,
832
833                join_set_id = NULL,
834                join_set_closing = NULL,
835
836                result_kind = NULL
837            WHERE execution_id = $3
838            ",
839            &[
840                &scheduled_at,     // $1
841                &STATE_PENDING_AT, // $2
842                &execution_id_str, // $3
843            ],
844        )
845        .await?;
846
847    if updated == 0 {
848        return Err(DbErrorWrite::NotFound);
849    }
850
851    Ok(AppendNotifier {
852        pending_at: Some(NotifierPendingAt {
853            scheduled_at,
854            ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
855            component_input_digest,
856        }),
857        execution_finished: None,
858        response: None,
859    })
860}
861
862#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
863async fn update_state_pending_after_event_appended(
864    tx: &Transaction<'_>,
865    execution_id: &ExecutionId,
866    appending_version: &Version,
867    scheduled_at: DateTime<Utc>,
868    intermittent_failure: bool,
869    component_input_digest: ComponentDigest,
870) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
871    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
872
873    let intermittent_delta = i64::from(intermittent_failure); // 0 or 1
874
875    let updated = tx
876        .execute(
877            r"
878            UPDATE t_state
879            SET
880                corresponding_version = $1,
881                pending_expires_finished = $2,
882                state = $3,
883                updated_at = CURRENT_TIMESTAMP,
884                intermittent_event_count = intermittent_event_count + $4,
885
886                max_retries = NULL,
887                retry_exp_backoff_millis = NULL,
888                last_lock_version = NULL,
889
890                join_set_id = NULL,
891                join_set_closing = NULL,
892
893                result_kind = NULL
894            WHERE execution_id = $5;
895            ",
896            &[
897                &i64::from(appending_version.0), // $1
898                &scheduled_at,                   // $2
899                &STATE_PENDING_AT,               // $3
900                &intermittent_delta,             // $4
901                &execution_id.to_string(),       // $5
902            ],
903        )
904        .await?;
905
906    if updated != 1 {
907        return Err(DbErrorWrite::NotFound);
908    }
909
910    Ok((
911        appending_version.increment(),
912        AppendNotifier {
913            pending_at: Some(NotifierPendingAt {
914                scheduled_at,
915                ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
916                component_input_digest,
917            }),
918            execution_finished: None,
919            response: None,
920        },
921    ))
922}
923
924#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
925#[expect(clippy::too_many_arguments)]
926async fn update_state_locked_get_intermittent_event_count(
927    tx: &Transaction<'_>,
928    execution_id: &ExecutionId,
929    deployment_id: DeploymentId,
930    component_digest: &ComponentDigest,
931    executor_id: ExecutorId,
932    run_id: RunId,
933    lock_expires_at: DateTime<Utc>,
934    appending_version: &Version,
935    retry_config: ComponentRetryConfig,
936) -> Result<u32, DbErrorWrite> {
937    debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
938    let backoff_millis =
939        i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
940            // BIGINT = i64
941            DbErrorGeneric::Uncategorized {
942                reason: "backoff too big".into(),
943                context: SpanTrace::capture(),
944                source: Some(Arc::new(err)),
945                loc: Location::caller(),
946            }
947        })?;
948
949    let execution_id_str = execution_id.to_string();
950
951    let updated = tx
952        .execute(
953            r"
954            UPDATE t_state
955            SET
956                corresponding_version = $1,
957                pending_expires_finished = $2,
958                state = $3,
959                updated_at = CURRENT_TIMESTAMP,
960                deployment_id = $4,
961                component_id_input_digest = $5,
962
963                max_retries = $6,
964                retry_exp_backoff_millis = $7,
965                last_lock_version = $1,
966                executor_id = $8,
967                run_id = $9,
968
969                join_set_id = NULL,
970                join_set_closing = NULL,
971
972                result_kind = NULL
973            WHERE execution_id = $10
974            AND is_paused = false
975            ",
976            &[
977                &i64::from(appending_version.0),
978                &lock_expires_at,
979                &STATE_LOCKED,
980                &deployment_id.to_string(),
981                &component_digest,
982                &retry_config.max_retries.map(i64::from),
983                &backoff_millis,
984                &executor_id.to_string(),
985                &run_id.to_string(),
986                &execution_id_str,
987            ],
988        )
989        .await?;
990
991    if updated != 1 {
992        return Err(DbErrorWrite::NotFound);
993    }
994
995    // fetch intermittent event count
996    let row = tx
997        .query_one(
998            "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
999            &[&execution_id_str],
1000        )
1001        .await
1002        .map_err(DbErrorGeneric::from)?;
1003
1004    let count: i64 = get(&row, "intermittent_event_count")?; // Postgres BIGINT
1005    let count = u32::try_from(count)
1006        .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
1007    Ok(count)
1008}
1009
1010#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1011async fn update_state_blocked(
1012    tx: &Transaction<'_>,
1013    execution_id: &ExecutionId,
1014    appending_version: &Version,
1015    join_set_id: &JoinSetId,
1016    lock_expires_at: DateTime<Utc>,
1017    join_set_closing: bool,
1018) -> Result<AppendResponse, DbErrorWrite> {
1019    debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1020
1021    let updated = tx
1022        .execute(
1023            r"
1024            UPDATE t_state
1025            SET
1026                corresponding_version = $1,
1027                pending_expires_finished = $2,
1028                state = $3,
1029                updated_at = CURRENT_TIMESTAMP,
1030
1031                max_retries = NULL,
1032                retry_exp_backoff_millis = NULL,
1033                last_lock_version = NULL,
1034
1035                join_set_id = $4,
1036                join_set_closing = $5,
1037
1038                result_kind = NULL
1039            WHERE execution_id = $6
1040            ",
1041            &[
1042                &i64::from(appending_version.0), // $1
1043                &lock_expires_at,                // $2
1044                &STATE_BLOCKED_BY_JOIN_SET,      // $3
1045                &join_set_id.to_string(),        // $4
1046                &join_set_closing,               // $5 (BOOLEAN)
1047                &execution_id.to_string(),       // $6
1048            ],
1049        )
1050        .await?;
1051
1052    if updated != 1 {
1053        return Err(DbErrorWrite::NotFound);
1054    }
1055    Ok(appending_version.increment())
1056}
1057
1058#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1059async fn update_state_finished(
1060    tx: &Transaction<'_>,
1061    execution_id: &ExecutionId,
1062    appending_version: &Version,
1063    finished_at: DateTime<Utc>,
1064    result_kind: PendingStateFinishedResultKind,
1065) -> Result<(), DbErrorWrite> {
1066    debug!("Setting t_state to Finished");
1067
1068    let result_kind_json = Json(result_kind);
1069
1070    let updated = tx
1071        .execute(
1072            r"
1073            UPDATE t_state
1074            SET
1075                corresponding_version = $1,
1076                pending_expires_finished = $2,
1077                state = $3,
1078                updated_at = CURRENT_TIMESTAMP,
1079
1080                max_retries = NULL,
1081                retry_exp_backoff_millis = NULL,
1082                last_lock_version = NULL,
1083                executor_id = NULL,
1084                run_id = NULL,
1085
1086                join_set_id = NULL,
1087                join_set_closing = NULL,
1088
1089                result_kind = $4
1090            WHERE execution_id = $5
1091            ",
1092            &[
1093                &i64::from(appending_version.0), // $1
1094                &finished_at,                    // $2
1095                &STATE_FINISHED,                 // $3
1096                &result_kind_json,               // $4
1097                &execution_id.to_string(),       // $5
1098            ],
1099        )
1100        .await?;
1101
1102    if updated != 1 {
1103        return Err(DbErrorWrite::NotFound);
1104    }
1105    Ok(())
1106}
1107
1108#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1109async fn update_state_paused(
1110    tx: &Transaction<'_>,
1111    execution_id: &ExecutionId,
1112    appending_version: &Version,
1113    is_paused: bool,
1114) -> Result<AppendResponse, DbErrorWrite> {
1115    debug!(
1116        "Setting t_state to {}",
1117        if is_paused { "paused" } else { "unpaused" }
1118    );
1119
1120    let updated = tx
1121        .execute(
1122            r"
1123            UPDATE t_state
1124            SET
1125                corresponding_version = $1,
1126                is_paused = $2,
1127                updated_at = CURRENT_TIMESTAMP
1128            WHERE execution_id = $3
1129            ",
1130            &[
1131                &i64::from(appending_version.0), // $1
1132                &is_paused,                      // $2
1133                &execution_id.to_string(),       // $3
1134            ],
1135        )
1136        .await?;
1137
1138    if updated != 1 {
1139        return Err(DbErrorWrite::NotFound);
1140    }
1141    Ok(appending_version.increment())
1142}
1143
1144#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1145async fn bump_state_next_version(
1146    tx: &Transaction<'_>,
1147    execution_id: &ExecutionId,
1148    appending_version: &Version,
1149    delay_req: Option<DelayReq>,
1150) -> Result<AppendResponse, DbErrorWrite> {
1151    debug!("update_index_version");
1152    let execution_id_str = execution_id.to_string();
1153
1154    let updated = tx
1155        .execute(
1156            r"
1157            UPDATE t_state
1158            SET
1159                corresponding_version = $1,
1160                updated_at = CURRENT_TIMESTAMP
1161            WHERE execution_id = $2
1162            ",
1163            &[
1164                &i64::from(appending_version.0), // $1
1165                &execution_id_str,               // $2
1166            ],
1167        )
1168        .await?;
1169
1170    if updated != 1 {
1171        return Err(DbErrorWrite::NotFound);
1172    }
1173
1174    if let Some(DelayReq {
1175        join_set_id,
1176        delay_id,
1177        expires_at,
1178    }) = delay_req
1179    {
1180        debug!("Inserting delay to `t_delay`");
1181        tx.execute(
1182            "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1183            &[
1184                &execution_id_str,
1185                &join_set_id.to_string(),
1186                &delay_id.to_string(),
1187                &expires_at,
1188            ],
1189        )
1190        .await?;
1191    }
1192    Ok(appending_version.increment())
1193}
1194
1195async fn get_combined_state(
1196    tx: &Transaction<'_>,
1197    execution_id: &ExecutionId,
1198) -> Result<CombinedState, DbErrorRead> {
1199    let row = tx
1200        .query_one(
1201            r"
1202            SELECT
1203                created_at, first_scheduled_at,
1204                state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
1205                last_lock_version, executor_id, run_id,
1206                join_set_id, join_set_closing,
1207                result_kind, is_paused
1208            FROM t_state
1209            WHERE execution_id = $1
1210            ",
1211            &[&execution_id.to_string()],
1212        )
1213        .await
1214        .map_err(DbErrorRead::from)?;
1215
1216    // Parsing columns
1217
1218    let created_at: DateTime<Utc> = get(&row, "created_at")?;
1219    let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1220
1221    let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1222    let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1223        consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1224    })?;
1225    let component_digest = ComponentDigest(digest);
1226
1227    let component_type: String = get(&row, "component_type")?;
1228    let component_type = ComponentType::from_str(&component_type)
1229        .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;
1230
1231    let deployment_id: String = get(&row, "deployment_id")?;
1232    let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1233
1234    let state: String = get(&row, "state")?;
1235    let ffqn: String = get(&row, "ffqn")?;
1236    let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1237        consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1238    })?;
1239
1240    let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1241
1242    let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1243    let last_lock_version = last_lock_version_raw
1244        .map(Version::try_from)
1245        .transpose()
1246        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1247
1248    let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1249    let executor_id = executor_id_raw
1250        .map(|id| ExecutorId::from_str(&id))
1251        .transpose()
1252        .map_err(DbErrorGeneric::from)?;
1253
1254    let run_id_raw: Option<String> = get(&row, "run_id")?;
1255    let run_id = run_id_raw
1256        .map(|id| RunId::from_str(&id))
1257        .transpose()
1258        .map_err(DbErrorGeneric::from)?;
1259
1260    let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1261    let join_set_id = join_set_id_raw
1262        .map(|id| JoinSetId::from_str(&id))
1263        .transpose()
1264        .map_err(DbErrorGeneric::from)?;
1265
1266    let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1267
1268    let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1269    let result_kind = result_kind.map(|it| it.0);
1270
1271    let is_paused: bool = get(&row, "is_paused")?;
1272
1273    let corresponding_version: i64 = get(&row, "corresponding_version")?;
1274    let corresponding_version = Version::new(
1275        VersionType::try_from(corresponding_version)
1276            .map_err(|_| consistency_db_err("version must be non-negative"))?,
1277    );
1278
1279    let dto = CombinedStateDTO {
1280        execution_id: execution_id.clone(),
1281        created_at,
1282        first_scheduled_at,
1283        state,
1284        ffqn,
1285        component_digest,
1286        component_type,
1287        deployment_id,
1288        pending_expires_finished,
1289        last_lock_version,
1290        executor_id,
1291        run_id,
1292        join_set_id,
1293        join_set_closing,
1294        result_kind,
1295        is_paused,
1296    };
1297    CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1298}
1299
1300async fn list_executions(
1301    read_tx: &Transaction<'_>,
1302    filter: ListExecutionsFilter,
1303    pagination: &ExecutionListPagination,
1304) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1305    // Helper to manage dynamic WHERE clauses and positional parameters ($1, $2...)
1306    struct QueryBuilder {
1307        where_clauses: Vec<String>,
1308        params: Vec<Box<dyn ToSql + Send + Sync>>,
1309    }
1310
1311    impl QueryBuilder {
1312        fn new() -> Self {
1313            Self {
1314                where_clauses: Vec::new(),
1315                params: Vec::new(),
1316            }
1317        }
1318
1319        fn add_param<T>(&mut self, param: T) -> String
1320        where
1321            T: ToSql + Sync + Send + 'static,
1322        {
1323            self.params.push(Box::new(param));
1324            format!("${}", self.params.len())
1325        }
1326
1327        fn add_where(&mut self, clause: String) {
1328            self.where_clauses.push(clause);
1329        }
1330    }
1331
1332    let mut qb = QueryBuilder::new();
1333
1334    // Pagination Logic
1335    let (limit, limit_desc) = match pagination {
1336        ExecutionListPagination::CreatedBy(p) => {
1337            let limit = p.length();
1338            let is_desc = p.is_desc();
1339            if let Some(cursor) = p.cursor() {
1340                let placeholder = qb.add_param(*cursor);
1341                qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1342            }
1343            (limit, is_desc)
1344        }
1345        ExecutionListPagination::ExecutionId(p) => {
1346            let limit = p.length();
1347            let is_desc = p.is_desc();
1348            if let Some(cursor) = p.cursor() {
1349                let placeholder = qb.add_param(cursor.to_string());
1350                qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1351            }
1352            (limit, is_desc)
1353        }
1354    };
1355
1356    if !filter.show_derived {
1357        qb.add_where("is_top_level = true".to_string());
1358    }
1359    let like = |str| format!("{str}%");
1360    if let Some(ffqn_prefix) = filter.ffqn_prefix {
1361        let placeholder = qb.add_param(like(ffqn_prefix));
1362        qb.add_where(format!("ffqn LIKE {placeholder}"));
1363    }
1364    if filter.hide_finished {
1365        qb.add_where(format!("state != '{STATE_FINISHED}'"));
1366    }
1367    if let Some(prefix) = filter.execution_id_prefix {
1368        let placeholder = qb.add_param(like(prefix));
1369        qb.add_where(format!("execution_id LIKE {placeholder}"));
1370    }
1371    if let Some(component_digest) = filter.component_digest {
1372        let placeholder = qb.add_param(component_digest);
1373        qb.add_where(format!("component_id_input_digest = {placeholder}"));
1374    }
1375    if let Some(deployment_id) = filter.deployment_id {
1376        let placeholder = qb.add_param(deployment_id.to_string());
1377        qb.add_where(format!("deployment_id = {placeholder}"));
1378    }
1379
1380    let where_str = if qb.where_clauses.is_empty() {
1381        String::new()
1382    } else {
1383        format!("WHERE {}", qb.where_clauses.join(" AND "))
1384    };
1385
1386    let order_col = match pagination {
1387        ExecutionListPagination::CreatedBy(_) => "created_at",
1388        ExecutionListPagination::ExecutionId(_) => "execution_id",
1389    };
1390
1391    // Inner query: fetch rows with cursor-based ordering
1392    // Outer query: always return results in descending order
1393    let (inner_order, outer_order) = if limit_desc {
1394        ("DESC", "")
1395    } else {
1396        ("", "DESC")
1397    };
1398
1399    let inner_sql = format!(
1400        r"SELECT created_at, first_scheduled_at, component_id_input_digest, deployment_id,
1401            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1402            last_lock_version, executor_id, run_id,
1403            join_set_id, join_set_closing,
1404            result_kind, is_paused
1405            FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1406    );
1407
1408    let sql = if outer_order.is_empty() {
1409        inner_sql
1410    } else {
1411        format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1412    };
1413
1414    let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1415        .params
1416        .iter()
1417        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1418        .collect();
1419
1420    let rows = read_tx.query(&sql, &params_refs).await?;
1421
1422    let mut vec = Vec::with_capacity(rows.len());
1423
1424    for row in rows {
1425        // If parsing of the row fails, log and skip it.
1426        let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1427            let execution_id_str: String = get(&row, "execution_id")?;
1428            let execution_id = ExecutionId::from_str(&execution_id_str)
1429                .map_err(|err| consistency_db_err(err.to_string()))?;
1430
1431            let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1432            let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1433                consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1434            })?;
1435            let component_digest = ComponentDigest(digest);
1436
1437            let component_type: String = get(&row, "component_type")?;
1438            let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1439                consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1440            })?;
1441
1442            let deployment_id: String = get(&row, "deployment_id")?;
1443            let deployment_id =
1444                DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1445
1446            let created_at: DateTime<Utc> = get(&row, "created_at")?;
1447            let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1448
1449            let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1450                get(&row, "result_kind")?;
1451            let result_kind = result_kind.map(|it| it.0);
1452
1453            let is_paused: bool = get(&row, "is_paused")?;
1454
1455            let corresponding_version: i64 = get(&row, "corresponding_version")?;
1456            let corresponding_version = Version::try_from(corresponding_version)
1457                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1458
1459            let executor_id_str: Option<String> = get(&row, "executor_id")?;
1460            let executor_id = executor_id_str
1461                .map(|id| ExecutorId::from_str(&id))
1462                .transpose()?;
1463
1464            let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1465            let last_lock_version = last_lock_version_raw
1466                .map(Version::try_from)
1467                .transpose()
1468                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1469
1470            let run_id_str: Option<String> = get(&row, "run_id")?;
1471            let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1472
1473            let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1474            let join_set_id = join_set_id_str
1475                .map(|id| JoinSetId::from_str(&id))
1476                .transpose()?;
1477
1478            let ffqn: String = get(&row, "ffqn")?;
1479            let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1480                error!("Error parsing ffqn - {parse_err:?}");
1481                consistency_db_err("invalid ffqn value in `t_state`")
1482            })?;
1483
1484            let combined_state_dto = CombinedStateDTO {
1485                execution_id,
1486                created_at,
1487                first_scheduled_at,
1488                component_digest,
1489                component_type,
1490                deployment_id,
1491                state: get(&row, "state")?,
1492                ffqn,
1493                pending_expires_finished: get(&row, "pending_expires_finished")?,
1494                executor_id,
1495                last_lock_version,
1496                run_id,
1497                join_set_id,
1498                join_set_closing: get(&row, "join_set_closing")?,
1499                result_kind,
1500                is_paused,
1501            };
1502
1503            let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1504
1505            Ok(combined_state.execution_with_state)
1506        };
1507
1508        match unpack() {
1509            Ok(execution) => vec.push(execution),
1510            Err(err) => {
1511                warn!("Skipping corrupted row in t_state: {err:?}");
1512            }
1513        }
1514    }
1515
1516    Ok(vec)
1517}
1518
1519async fn list_responses(
1520    tx: &Transaction<'_>,
1521    execution_id: &ExecutionId,
1522    pagination: Option<Pagination<u32>>,
1523) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1524    // Helper to manage params dynamically
1525    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1526    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1527        params.push(p);
1528        format!("${}", params.len())
1529    };
1530
1531    // 1. Base Query
1532    let p_execution_id = add_param(Box::new(execution_id.to_string()));
1533
1534    let mut sql = format!(
1535        "SELECT \
1536            r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1537            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1538            WHERE \
1539            r.execution_id = {p_execution_id} \
1540            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1541    );
1542
1543    // 2. Pagination Logic
1544    let limit = match &pagination {
1545        Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1546            // Postgres BIGINT is i64.
1547            let p_cursor = add_param(Box::new(i64::from(*cursor)));
1548
1549            // Add WHERE clause for cursor
1550            write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1551
1552            Some(p.length())
1553        }
1554        None => None,
1555    };
1556
1557    // 3. Ordering
1558    sql.push_str(" ORDER BY r.id");
1559    let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1560    if is_desc {
1561        sql.push_str(" DESC");
1562    }
1563
1564    // 4. Limit
1565    if let Some(limit) = limit {
1566        // Postgres limit expects i64
1567        let p_limit = add_param(Box::new(i64::from(limit)));
1568        write!(sql, " LIMIT {p_limit}").unwrap();
1569    }
1570
1571    // Re-order to ascending for consistent oldest-to-newest results
1572    if is_desc {
1573        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1574    }
1575
1576    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1577        .iter()
1578        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1579        .collect();
1580
1581    let rows = tx
1582        .query(&sql, &params_refs)
1583        .await
1584        .map_err(DbErrorRead::from)?;
1585
1586    let mut results = Vec::with_capacity(rows.len());
1587    for row in rows {
1588        results.push(parse_response_with_cursor(&row)?);
1589    }
1590
1591    Ok(results)
1592}
1593
1594async fn list_logs_tx(
1595    tx: &Transaction<'_>,
1596    execution_id: &ExecutionId,
1597    filter: &LogFilter,
1598    pagination: &Pagination<u32>,
1599) -> Result<ListLogsResponse, DbErrorRead> {
1600    let mut param_index = 1;
1601    let mut query = format!(
1602        "SELECT id, run_id, created_at, level, message, stream_type, payload
1603         FROM t_log
1604         WHERE execution_id = ${param_index}",
1605    );
1606    param_index += 1;
1607
1608    let execution_id = execution_id.to_string();
1609    let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&execution_id];
1610
1611    // Logs and streams filter
1612    let level_filter = if filter.should_show_logs() {
1613        let levels_str = if !filter.levels().is_empty() {
1614            filter
1615                .levels()
1616                .iter()
1617                .map(|lvl| (*lvl as u8).to_string())
1618                .collect::<Vec<_>>()
1619                .join(",")
1620        } else {
1621            LogLevel::iter()
1622                .map(|lvl| (lvl as u8).to_string())
1623                .collect::<Vec<_>>()
1624                .join(",")
1625        };
1626        Some(format!(" level IN ({levels_str})"))
1627    } else {
1628        None
1629    };
1630    let stream_filter = if filter.should_show_streams() {
1631        let streams_str = if !filter.stream_types().is_empty() {
1632            filter
1633                .stream_types()
1634                .iter()
1635                .map(|st| (*st as u8).to_string())
1636                .collect::<Vec<_>>()
1637                .join(",")
1638        } else {
1639            LogStreamType::iter()
1640                .map(|st| (st as u8).to_string())
1641                .collect::<Vec<_>>()
1642                .join(",")
1643        };
1644        Some(format!(" stream_type IN ({streams_str})"))
1645    } else {
1646        None
1647    };
1648    match (level_filter, stream_filter) {
1649        (Some(level_filter), Some(stream_filter)) => {
1650            write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1651                .expect("writing to string");
1652        }
1653        (Some(level_filter), None) => {
1654            write!(&mut query, " AND {level_filter}").expect("writing to string");
1655        }
1656        (None, Some(stream_filter)) => {
1657            write!(&mut query, " AND {stream_filter}").expect("writing to string");
1658        }
1659        (None, None) => unreachable!("guarded by constructor"),
1660    }
1661
1662    // Pagination
1663    write!(&mut query, " AND id {} ${param_index}", pagination.rel()).expect("writing to string");
1664    let cursor_val: i64 = (*pagination.cursor()).into();
1665    params.push(&cursor_val);
1666    param_index += 1;
1667
1668    // Ordering and limit
1669    write!(
1670        &mut query,
1671        " ORDER BY id {} LIMIT ${}",
1672        if pagination.is_desc() { "DESC" } else { "ASC" },
1673        param_index
1674    )
1675    .expect("writing to string");
1676    let length_val: i64 = i64::from(pagination.length());
1677    params.push(&length_val);
1678
1679    let rows = tx.query(&query, &params[..]).await?;
1680
1681    let mut items = Vec::with_capacity(rows.len());
1682
1683    for row in rows {
1684        let cursor = u32::try_from(get::<i64, _>(&row, "id")?)
1685            .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?;
1686        let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1687        let run_id: String = get(&row, "run_id")?;
1688        let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1689            consistency_db_err_src(
1690                format!("cannot convert RunId {run_id}, id: {cursor}"),
1691                Arc::from(parse_err),
1692            )
1693        })?;
1694
1695        let level: Option<i32> = get(&row, "level")?;
1696        let message: Option<String> = get(&row, "message")?;
1697        let stream_type: Option<i32> = get(&row, "stream_type")?;
1698        let payload: Option<Vec<u8>> = get(&row, "payload")?;
1699
1700        let log_entry = match (level, message, stream_type, payload) {
1701            (Some(lvl), Some(msg), None, None) => {
1702                let map_err = |err| {
1703                    consistency_db_err_src(
1704                        format!("cannot convert {lvl} to LogLevel , id: {cursor}"),
1705                        err,
1706                    )
1707                };
1708                LogEntry::Log {
1709                    created_at,
1710                    level: u8::try_from(lvl)
1711                        .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1712                        .map_err(|err| map_err(Arc::from(err)))??,
1713                    message: msg,
1714                }
1715            }
1716            (None, None, Some(stype), Some(pl)) => {
1717                let map_err = |err| {
1718                    consistency_db_err_src(
1719                        format!("cannot convert {stype} to LogStreamType , id: {cursor}"),
1720                        err,
1721                    )
1722                };
1723                LogEntry::Stream {
1724                    created_at,
1725                    stream_type: u8::try_from(stype)
1726                        .map(|stype| {
1727                            LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1728                        })
1729                        .map_err(|err| map_err(Arc::from(err)))??,
1730                    payload: pl,
1731                }
1732            }
1733            _ => {
1734                return Err(consistency_db_err(format!("invalid t_log row id:{cursor}")).into());
1735            }
1736        };
1737
1738        items.push(LogEntryRow {
1739            cursor,
1740            run_id,
1741            log_entry,
1742        });
1743    }
1744
1745    Ok(ListLogsResponse {
1746        next_page: items
1747            .last()
1748            .map(|item| Pagination::NewerThan {
1749                length: pagination.length(),
1750                cursor: item.cursor,
1751                including_cursor: false,
1752            })
1753            .unwrap_or(if pagination.is_asc() {
1754                *pagination // no new results, keep the same cursor
1755            } else {
1756                // no prev results, let's start from beginning
1757                Pagination::NewerThan {
1758                    length: pagination.length(),
1759                    cursor: 0,
1760                    including_cursor: false, // does not matter, no row has id = 0
1761                }
1762            }),
1763        prev_page: match items.first() {
1764            Some(item) => Some(Pagination::OlderThan {
1765                length: pagination.length(),
1766                cursor: item.cursor,
1767                including_cursor: false,
1768            }),
1769            None if pagination.is_asc() && *pagination.cursor() > 0 => {
1770                // asked for a next page that does not exists (yet).
1771                Some(pagination.invert())
1772            }
1773            None => None,
1774        },
1775        items,
1776    })
1777}
1778
1779async fn list_deployment_states(
1780    tx: &Transaction<'_>,
1781    current_time: DateTime<Utc>,
1782    pagination: Pagination<Option<DeploymentId>>,
1783    include_config_json: bool,
1784) -> Result<Vec<DeploymentState>, DbErrorRead> {
1785    // Helper for numbered params ($1, $2, ...)
1786    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1787    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1788        params.push(p);
1789        format!("${}", params.len())
1790    };
1791
1792    // Base params
1793    let p_now = add_param(Box::new(current_time));
1794
1795    let config_json_col = if include_config_json {
1796        "d.config_json"
1797    } else {
1798        "NULL::TEXT AS config_json"
1799    };
1800
1801    let mut sql = format!(
1802        "
1803        SELECT
1804            d.deployment_id,
1805
1806            COUNT(*) FILTER (WHERE s.state = '{STATE_LOCKED}') AS locked,
1807
1808            COUNT(*) FILTER (
1809                WHERE s.state = '{STATE_PENDING_AT}'
1810                  AND s.pending_expires_finished <= {p_now}
1811            ) AS pending,
1812
1813            COUNT(*) FILTER (
1814                WHERE s.state = '{STATE_PENDING_AT}'
1815                  AND s.pending_expires_finished > {p_now}
1816            ) AS scheduled,
1817
1818            COUNT(*) FILTER (WHERE s.state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1819
1820            COUNT(*) FILTER (WHERE s.state = '{STATE_FINISHED}') AS finished,
1821
1822            {config_json_col},
1823            d.created_at,
1824            d.last_active_at,
1825            d.status
1826        FROM t_deployment d
1827        LEFT JOIN t_state s ON s.deployment_id = d.deployment_id"
1828    );
1829
1830    // Pagination
1831    if let Some(cursor) = pagination.cursor() {
1832        let p_cursor = add_param(Box::new(cursor.to_string()));
1833        write!(
1834            sql,
1835            " WHERE d.deployment_id {rel} {p_cursor}",
1836            rel = pagination.rel()
1837        )
1838        .expect("writing to string");
1839    }
1840
1841    // Grouping + ordering
1842    // Inner query: fetch rows with cursor-based ordering
1843    // Outer query: always return results in descending order
1844    let (inner_order, outer_order) = if pagination.is_desc() {
1845        ("DESC", "")
1846    } else {
1847        ("ASC", "DESC")
1848    };
1849
1850    write!(
1851        sql,
1852        " GROUP BY d.deployment_id, d.config_json, d.created_at, d.last_active_at, d.status ORDER BY d.deployment_id {inner_order} LIMIT {}",
1853        pagination.length()
1854    )
1855    .expect("writing to string");
1856
1857    let final_sql = if outer_order.is_empty() {
1858        sql
1859    } else {
1860        format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1861    };
1862
1863    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1864        .iter()
1865        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1866        .collect();
1867
1868    let rows = tx
1869        .query(&final_sql, &params_refs)
1870        .await
1871        .map_err(DbErrorRead::from)?;
1872
1873    let mut result = Vec::with_capacity(rows.len());
1874    for row in rows {
1875        let deployment_id: String = get(&row, "deployment_id")?;
1876        let status_str: String = get::<String, _>(&row, "status")?;
1877        let status = status_str
1878            .parse::<DeploymentStatus>()
1879            .map_err(|e| consistency_db_err(format!("unknown deployment status: {e}")))?;
1880        result.push(DeploymentState {
1881            deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1882            locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1883            pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1884                .expect("count is never negative"),
1885            scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1886                .expect("count is never negative"),
1887            blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1888                .expect("count is never negative"),
1889            finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1890                .expect("count is never negative"),
1891            config_json: get::<Option<String>, _>(&row, "config_json")?,
1892            created_at: get::<DateTime<Utc>, _>(&row, "created_at")?,
1893            last_active_at: get::<Option<DateTime<Utc>>, _>(&row, "last_active_at")?,
1894            status,
1895        });
1896    }
1897
1898    Ok(result)
1899}
1900
1901fn parse_response_with_cursor(
1902    row: &tokio_postgres::Row,
1903) -> Result<ResponseWithCursor, DbErrorRead> {
1904    // Postgres BIGINT = i64.
1905    let id = u32::try_from(get::<i64, _>(row, "id")?)
1906        .map_err(|_| consistency_db_err("id must not be negative"))?;
1907
1908    let created_at: DateTime<Utc> = get(row, "created_at")?;
1909    let join_set_id_str: String = get(row, "join_set_id")?;
1910    let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1911
1912    // Extract Optionals
1913    let delay_id: Option<String> = get(row, "delay_id")?;
1914    let delay_id = delay_id
1915        .map(|id| DelayId::from_str(&id))
1916        .transpose()
1917        .map_err(DbErrorGeneric::from)?;
1918    let delay_success: Option<bool> = get(row, "delay_success")?;
1919    let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1920    let child_execution_id = child_execution_id
1921        .map(|id| ExecutionIdDerived::from_str(&id))
1922        .transpose()
1923        .map_err(DbErrorGeneric::from)?;
1924    let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1925        .map(Version::try_from)
1926        .transpose()
1927        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1928    let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1929    let json_value = json_value.map(|it| it.0);
1930
1931    let event = match (
1932        delay_id,
1933        delay_success,
1934        child_execution_id,
1935        finished_version,
1936        json_value,
1937    ) {
1938        (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1939            delay_id,
1940            result: delay_success.then_some(()).ok_or(()),
1941        },
1942        (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1943            if let ExecutionRequest::Finished { retval: result, .. } = json_val {
1944                JoinSetResponse::ChildExecutionFinished {
1945                    child_execution_id,
1946                    finished_version,
1947                    result,
1948                }
1949            } else {
1950                error!("Joined log entry must be 'Finished'");
1951                return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1952            }
1953        }
1954        (delay, delay_success, child, finished, result) => {
1955            error!(
1956                "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1957            );
1958            return Err(consistency_db_err("invalid row in t_join_set_response").into());
1959        }
1960    };
1961
1962    Ok(ResponseWithCursor {
1963        cursor: ResponseCursor(id),
1964        event: JoinSetResponseEventOuter {
1965            event: JoinSetResponseEvent { join_set_id, event },
1966            created_at,
1967        },
1968    })
1969}
1970
1971#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1972#[expect(clippy::too_many_arguments)]
1973async fn lock_single_execution(
1974    tx: &Transaction<'_>,
1975    created_at: DateTime<Utc>,
1976    component_id: &ComponentId,
1977    deployment_id: DeploymentId,
1978    execution_id: &ExecutionId,
1979    run_id: RunId,
1980    appending_version: &Version,
1981    executor_id: ExecutorId,
1982    lock_expires_at: DateTime<Utc>,
1983    retry_config: ComponentRetryConfig,
1984) -> Result<LockedExecution, DbErrorWrite> {
1985    trace!("lock_single_execution");
1986
1987    // Check State
1988    let combined_state = get_combined_state(tx, execution_id).await?;
1989    combined_state
1990        .execution_with_state
1991        .pending_state
1992        .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
1993    let expected_version = combined_state.get_next_version_assert_not_finished();
1994    check_expected_next_and_appending_version(&expected_version, appending_version)?;
1995
1996    // Prepare Event
1997    let locked_event = Locked {
1998        component_id: component_id.clone(),
1999        deployment_id,
2000        executor_id,
2001        lock_expires_at,
2002        run_id,
2003        retry_config,
2004    };
2005    let event = ExecutionRequest::Locked(locked_event.clone());
2006
2007    let event = Json(event);
2008
2009    // Append to execution_log
2010    tx.execute(
2011        "INSERT INTO t_execution_log \
2012            (execution_id, created_at, json_value, version, variant) \
2013            VALUES ($1, $2, $3, $4, $5)",
2014        &[
2015            &execution_id.to_string(),
2016            &created_at,
2017            &event,
2018            &i64::from(appending_version.0),
2019            &event.0.variant(),
2020        ],
2021    )
2022    .await
2023    .map_err(|err| {
2024        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
2025            reason: "cannot lock".into(),
2026            context: SpanTrace::capture(),
2027            source: Some(Arc::new(err)),
2028            loc: Location::caller(),
2029        })
2030    })?;
2031
2032    let responses = list_responses(tx, execution_id, None).await?;
2033    trace!("Responses: {responses:?}");
2034
2035    // Update t_state
2036    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
2037        tx,
2038        execution_id,
2039        deployment_id,
2040        &component_id.component_digest,
2041        executor_id,
2042        run_id,
2043        lock_expires_at,
2044        appending_version,
2045        retry_config,
2046    )
2047    .await?;
2048
2049    // Fetch History
2050    // Fetch event_history and `Created` event.
2051    let rows = tx
2052        .query(
2053            "SELECT json_value, version FROM t_execution_log WHERE \
2054                execution_id = $1 AND (variant = $2 OR variant = $3) \
2055                ORDER BY version",
2056            &[
2057                &execution_id.to_string(),
2058                &DUMMY_CREATED.variant(),
2059                &DUMMY_HISTORY_EVENT.variant(),
2060            ],
2061        )
2062        .await
2063        .map_err(DbErrorGeneric::from)?;
2064
2065    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
2066
2067    for row in rows {
2068        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2069        let event = event.0;
2070
2071        let version: i64 = get(&row, "version")?;
2072        let version = Version::try_from(version)
2073            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2074
2075        events.push_back(ExecutionEvent {
2076            created_at: DateTime::from_timestamp_nanos(0), // not used, only the inner event and version
2077            event,
2078            backtrace_id: None,
2079            version,
2080        });
2081    }
2082
2083    // Extract Created Event
2084    let Some(ExecutionRequest::Created {
2085        ffqn,
2086        params,
2087        parent,
2088        metadata,
2089        ..
2090    }) = events.pop_front().map(|outer| outer.event)
2091    else {
2092        error!("Execution log must contain at least `Created` event");
2093        return Err(consistency_db_err("execution log must contain `Created` event").into());
2094    };
2095
2096    // Extract History Events
2097    let mut event_history = Vec::new();
2098    for ExecutionEvent { event, version, .. } in events {
2099        if let ExecutionRequest::HistoryEvent { event } = event {
2100            event_history.push((event, version));
2101        } else {
2102            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
2103            return Err(consistency_db_err(
2104                "rows can only contain `Created` and `HistoryEvent` event kinds",
2105            )
2106            .into());
2107        }
2108    }
2109
2110    Ok(LockedExecution {
2111        execution_id: execution_id.clone(),
2112        metadata,
2113        next_version: appending_version.increment(),
2114        ffqn,
2115        params,
2116        event_history,
2117        responses,
2118        parent,
2119        intermittent_event_count,
2120        locked_event,
2121    })
2122}
2123
2124async fn count_join_next(
2125    tx: &Transaction<'_>,
2126    execution_id: &ExecutionId,
2127    join_set_id: &JoinSetId,
2128) -> Result<u32, DbErrorRead> {
2129    let row = tx
2130            .query_one(
2131                "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
2132                AND history_event_type = $3",
2133                &[
2134                    &execution_id.to_string(),
2135                    &join_set_id.to_string(),
2136                    &HISTORY_EVENT_TYPE_JOIN_NEXT,
2137                ],
2138            )
2139            .await
2140            .map_err(DbErrorRead::from)?;
2141
2142    let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
2143    Ok(count)
2144}
2145
2146async fn nth_response(
2147    tx: &Transaction<'_>,
2148    execution_id: &ExecutionId,
2149    join_set_id: &JoinSetId,
2150    skip_rows: u32,
2151) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2152    let row = tx
2153            .query_opt(
2154                "SELECT r.id, r.created_at, r.join_set_id, \
2155                 r.delay_id, r.delay_success, \
2156                 r.child_execution_id, r.finished_version, l.json_value \
2157                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2158                 WHERE \
2159                 r.execution_id = $1 AND r.join_set_id = $2 AND \
2160                 ( \
2161                 r.finished_version = l.version \
2162                 OR \
2163                 r.child_execution_id IS NULL \
2164                 ) \
2165                 ORDER BY id \
2166                 LIMIT 1 OFFSET $3",
2167                 &[
2168                     &execution_id.to_string(),
2169                     &join_set_id.to_string(),
2170                     &i64::from(skip_rows),
2171                 ]
2172            )
2173            .await
2174            .map_err(DbErrorRead::from)?;
2175
2176    match row {
2177        Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
2178        None => Ok(None),
2179    }
2180}
2181
2182#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2183async fn append(
2184    tx: &Transaction<'_>,
2185    execution_id: &ExecutionId,
2186    req: AppendRequest,
2187    appending_version: Version,
2188) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2189    if matches!(req.event, ExecutionRequest::Created { .. }) {
2190        return Err(DbErrorWrite::NonRetriable(
2191            DbErrorWriteNonRetriable::ValidationFailed(
2192                "cannot append `Created` event - use `create` instead".into(),
2193            ),
2194        ));
2195    }
2196
2197    if let AppendRequest {
2198        event:
2199            ExecutionRequest::Locked(Locked {
2200                component_id,
2201                deployment_id,
2202                executor_id,
2203                run_id,
2204                lock_expires_at,
2205                retry_config,
2206            }),
2207        created_at,
2208    } = req
2209    {
2210        return lock_single_execution(
2211            tx,
2212            created_at,
2213            &component_id,
2214            deployment_id,
2215            execution_id,
2216            run_id,
2217            &appending_version,
2218            executor_id,
2219            lock_expires_at,
2220            retry_config,
2221        )
2222        .await
2223        .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2224    }
2225
2226    let combined_state = get_combined_state(tx, execution_id).await?;
2227    if combined_state
2228        .execution_with_state
2229        .pending_state
2230        .is_finished()
2231    {
2232        debug!("Execution is already finished");
2233        return Err(DbErrorWrite::NonRetriable(
2234            DbErrorWriteNonRetriable::AlreadyFinished,
2235        ));
2236    }
2237
2238    check_expected_next_and_appending_version(
2239        &combined_state.get_next_version_assert_not_finished(),
2240        &appending_version,
2241    )?;
2242
2243    let event = Json(req.event);
2244
2245    // Insert into t_execution_log
2246    tx.execute(
2247            "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2248             VALUES ($1, $2, $3, $4, $5, $6)",
2249            &[
2250                &execution_id.to_string(),
2251                &req.created_at,
2252                &event,
2253                &i64::from(appending_version.0),
2254                &event.0.variant(),
2255                &event.0.join_set_id().map(std::string::ToString::to_string),
2256            ],
2257        )
2258        .await?;
2259
2260    // Calculate current pending state
2261    match &event.0 {
2262        ExecutionRequest::Created { .. } => {
2263            unreachable!("handled in the caller")
2264        }
2265
2266        ExecutionRequest::Locked { .. } => {
2267            unreachable!("handled above")
2268        }
2269
2270        ExecutionRequest::TemporarilyFailed {
2271            backoff_expires_at, ..
2272        }
2273        | ExecutionRequest::TemporarilyTimedOut {
2274            backoff_expires_at, ..
2275        } => {
2276            let (next_version, notifier) = update_state_pending_after_event_appended(
2277                tx,
2278                execution_id,
2279                &appending_version,
2280                *backoff_expires_at,
2281                true, // an intermittent failure
2282                combined_state.execution_with_state.component_digest,
2283            )
2284            .await?;
2285            return Ok((next_version, notifier));
2286        }
2287
2288        ExecutionRequest::Unlocked {
2289            backoff_expires_at, ..
2290        } => {
2291            let (next_version, notifier) = update_state_pending_after_event_appended(
2292                tx,
2293                execution_id,
2294                &appending_version,
2295                *backoff_expires_at,
2296                false, // not an intermittent failure
2297                combined_state.execution_with_state.component_digest,
2298            )
2299            .await?;
2300            return Ok((next_version, notifier));
2301        }
2302
2303        ExecutionRequest::Paused => {
2304            match &combined_state.execution_with_state.pending_state {
2305                PendingState::Finished { .. } => {
2306                    unreachable!("handled above");
2307                }
2308                PendingState::Paused(..) => {
2309                    return Err(DbErrorWriteNonRetriable::IllegalState {
2310                        reason: "cannot pause, execution is already paused".into(),
2311                        context: SpanTrace::capture(),
2312                        source: None,
2313                        loc: Location::caller(),
2314                    }
2315                    .into());
2316                }
2317                _ => {}
2318            }
2319            let next_version =
2320                update_state_paused(tx, execution_id, &appending_version, true).await?;
2321            return Ok((next_version, AppendNotifier::default()));
2322        }
2323
2324        ExecutionRequest::Unpaused => {
2325            if !combined_state
2326                .execution_with_state
2327                .pending_state
2328                .is_paused()
2329            {
2330                return Err(DbErrorWriteNonRetriable::IllegalState {
2331                    reason: "cannot unpause, execution is not paused".into(),
2332                    context: SpanTrace::capture(),
2333                    source: None,
2334                    loc: Location::caller(),
2335                }
2336                .into());
2337            }
2338            let next_version =
2339                update_state_paused(tx, execution_id, &appending_version, false).await?;
2340            return Ok((next_version, AppendNotifier::default()));
2341        }
2342
2343        ExecutionRequest::Finished { retval, .. } => {
2344            update_state_finished(
2345                tx,
2346                execution_id,
2347                &appending_version,
2348                req.created_at,
2349                PendingStateFinishedResultKind::from(retval),
2350            )
2351            .await?;
2352            return Ok((
2353                appending_version,
2354                AppendNotifier {
2355                    pending_at: None,
2356                    execution_finished: Some(NotifierExecutionFinished {
2357                        execution_id: execution_id.clone(),
2358                        retval: retval.clone(),
2359                    }),
2360                    response: None,
2361                },
2362            ));
2363        }
2364
2365        ExecutionRequest::HistoryEvent {
2366            event:
2367                HistoryEvent::JoinSetCreate { .. }
2368                | HistoryEvent::JoinSetRequest {
2369                    request: JoinSetRequest::ChildExecutionRequest { .. },
2370                    ..
2371                }
2372                | HistoryEvent::Persist { .. }
2373                | HistoryEvent::Schedule { .. }
2374                | HistoryEvent::Stub { .. }
2375                | HistoryEvent::JoinNextTooMany { .. }
2376                | HistoryEvent::JoinNextTry { .. },
2377        } => {
2378            return Ok((
2379                bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2380                AppendNotifier::default(),
2381            ));
2382        }
2383
2384        ExecutionRequest::HistoryEvent {
2385            event:
2386                HistoryEvent::JoinSetRequest {
2387                    join_set_id,
2388                    request:
2389                        JoinSetRequest::DelayRequest {
2390                            delay_id,
2391                            expires_at,
2392                            ..
2393                        },
2394                },
2395        } => {
2396            return Ok((
2397                bump_state_next_version(
2398                    tx,
2399                    execution_id,
2400                    &appending_version,
2401                    Some(DelayReq {
2402                        join_set_id: join_set_id.clone(),
2403                        delay_id: delay_id.clone(),
2404                        expires_at: *expires_at,
2405                    }),
2406                )
2407                .await?,
2408                AppendNotifier::default(),
2409            ));
2410        }
2411
2412        ExecutionRequest::HistoryEvent {
2413            event:
2414                HistoryEvent::JoinNext {
2415                    join_set_id,
2416                    run_expires_at,
2417                    closing,
2418                    requested_ffqn: _,
2419                },
2420        } => {
2421            // Did the response arrive already?
2422            let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2423
2424            // Fetch the response corresponding to this JoinNext (skip n-1)
2425            let nth_response =
2426                nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2427
2428            trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2429            assert!(join_next_count > 0);
2430
2431            if let Some(ResponseWithCursor {
2432                event:
2433                    JoinSetResponseEventOuter {
2434                        created_at: nth_created_at,
2435                        ..
2436                    },
2437                cursor: _,
2438            }) = nth_response
2439            {
2440                let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2441                let (next_version, notifier) = update_state_pending_after_event_appended(
2442                    tx,
2443                    execution_id,
2444                    &appending_version,
2445                    scheduled_at,
2446                    false, // not an intermittent failure
2447                    combined_state.execution_with_state.component_digest,
2448                )
2449                .await?;
2450                return Ok((next_version, notifier));
2451            }
2452
2453            return Ok((
2454                update_state_blocked(
2455                    tx,
2456                    execution_id,
2457                    &appending_version,
2458                    join_set_id,
2459                    *run_expires_at,
2460                    *closing,
2461                )
2462                .await?,
2463                AppendNotifier::default(),
2464            ));
2465        }
2466    }
2467}
2468
2469async fn append_response(
2470    tx: &Transaction<'_>,
2471    execution_id: &ExecutionId,
2472    event: JoinSetResponseEventOuter,
2473) -> Result<AppendNotifier, DbErrorWrite> {
2474    let join_set_id = &event.event.join_set_id;
2475
2476    let (delay_id, delay_success) = match &event.event.event {
2477        JoinSetResponse::DelayFinished { delay_id, result } => {
2478            (Some(delay_id.to_string()), Some(result.is_ok()))
2479        }
2480        JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2481    };
2482
2483    let (child_execution_id, finished_version) = match &event.event.event {
2484        JoinSetResponse::ChildExecutionFinished {
2485            child_execution_id,
2486            finished_version,
2487            result: _,
2488        } => (
2489            Some(child_execution_id.to_string()),
2490            Some(i64::from(finished_version.0)),
2491        ),
2492        JoinSetResponse::DelayFinished { .. } => (None, None),
2493    };
2494
2495    let row = tx.query_one(
2496            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2497             VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2498             &[
2499                 &execution_id.to_string(),
2500                 &event.created_at,
2501                 &join_set_id.to_string(),
2502                 &delay_id,
2503                 &delay_success,
2504                 &child_execution_id,
2505                 &finished_version,
2506             ]
2507        ).await?;
2508    let cursor = ResponseCursor(
2509        u32::try_from(get::<i64, _>(&row, 0)?)
2510            .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2511    );
2512    // if the execution is going to be unblocked by this response...
2513    let combined_state = get_combined_state(tx, execution_id).await?;
2514    debug!("previous_pending_state: {combined_state:?}");
2515
2516    let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2517        state:
2518            PendingStateBlockedByJoinSet {
2519                join_set_id: found_join_set_id,
2520                lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2521                closing: _,
2522            },
2523        paused: _,
2524    } =
2525        PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2526        && *join_set_id == found_join_set_id
2527    {
2528        let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2529        // Unblock the state.
2530        update_state_pending_after_response_appended(
2531            tx,
2532            execution_id,
2533            scheduled_at,
2534            combined_state.execution_with_state.component_digest,
2535        )
2536        .await?
2537    } else {
2538        AppendNotifier::default()
2539    };
2540
2541    if let JoinSetResponseEvent {
2542        join_set_id,
2543        event:
2544            JoinSetResponse::DelayFinished {
2545                delay_id,
2546                result: _,
2547            },
2548    } = &event.event
2549    {
2550        debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2551        tx.execute(
2552            "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2553            &[
2554                &execution_id.to_string(),
2555                &join_set_id.to_string(),
2556                &delay_id.to_string(),
2557            ],
2558        )
2559        .await?;
2560    }
2561
2562    notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2563    Ok(notifier)
2564}
2565
2566async fn append_backtrace(
2567    tx: &Transaction<'_>,
2568    backtrace_info: &BacktraceInfo,
2569) -> Result<(), DbErrorWrite> {
2570    // Compute hash for deduplication
2571    let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2572
2573    // Insert into t_wasm_backtrace if not already present
2574    tx.execute(
2575        "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2576         VALUES ($1, $2) \
2577         ON CONFLICT (backtrace_hash) DO NOTHING",
2578        &[
2579            &backtrace_hash.as_slice(),
2580            &Json(&backtrace_info.wasm_backtrace),
2581        ],
2582    )
2583    .await?;
2584
2585    // Insert into t_execution_backtrace referencing the hash
2586    tx.execute(
2587        "INSERT INTO t_execution_backtrace \
2588         (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2589         VALUES ($1, $2, $3, $4, $5)",
2590        &[
2591            &backtrace_info.execution_id.to_string(),
2592            &Json(&backtrace_info.component_id),
2593            &i64::from(backtrace_info.version_min_including.0),
2594            &i64::from(backtrace_info.version_max_excluding.0),
2595            &backtrace_hash.as_slice(),
2596        ],
2597    )
2598    .await?;
2599
2600    Ok(())
2601}
2602
2603async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2604    let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2605        LogEntry::Log {
2606            created_at,
2607            level,
2608            message,
2609        } => (
2610            Some(*level as i32),
2611            Some(message.as_str()),
2612            None::<i32>,
2613            None::<&[u8]>,
2614            created_at,
2615        ),
2616        LogEntry::Stream {
2617            created_at,
2618            payload,
2619            stream_type,
2620        } => (
2621            None::<i32>,
2622            None::<&str>,
2623            Some(*stream_type as i32),
2624            Some(payload.as_slice()),
2625            created_at,
2626        ),
2627    };
2628
2629    tx.execute(
2630        "INSERT INTO t_log (
2631            execution_id,
2632            run_id,
2633            created_at,
2634            level,
2635            message,
2636            stream_type,
2637            payload
2638        ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2639        &[
2640            &row.execution_id.to_string(),
2641            &row.run_id.to_string(),
2642            &created_at,
2643            &level,
2644            &message,
2645            &stream_type,
2646            &payload,
2647        ],
2648    )
2649    .await?;
2650
2651    Ok(())
2652}
2653
2654async fn get_execution_log(
2655    tx: &Transaction<'_>,
2656    execution_id: &ExecutionId,
2657) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2658    let rows = tx
2659        .query(
2660            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2661                 execution_id = $1 ORDER BY version",
2662            &[&execution_id.to_string()],
2663        )
2664        .await
2665        .map_err(DbErrorRead::from)?;
2666
2667    if rows.is_empty() {
2668        return Err(DbErrorRead::NotFound);
2669    }
2670
2671    let mut events = Vec::with_capacity(rows.len());
2672    for row in rows {
2673        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2674        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2675        let event = event.0;
2676        let version: i64 = get(&row, "version")?;
2677        let version = Version::try_from(version)
2678            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2679
2680        events.push(ExecutionEvent {
2681            created_at,
2682            event,
2683            backtrace_id: None,
2684            version,
2685        });
2686    }
2687
2688    let combined_state = get_combined_state(tx, execution_id).await?;
2689    let responses = list_responses(tx, execution_id, None).await?;
2690
2691    Ok(concepts::storage::ExecutionLog {
2692        execution_id: execution_id.clone(),
2693        events,
2694        responses,
2695        next_version: combined_state.get_next_version_or_finished(),
2696        pending_state: combined_state.execution_with_state.pending_state,
2697        component_digest: combined_state.execution_with_state.component_digest,
2698        component_type: combined_state.execution_with_state.component_type,
2699        deployment_id: combined_state.execution_with_state.deployment_id,
2700    })
2701}
2702
2703async fn get_max_version(
2704    tx: &Transaction<'_>,
2705    execution_id: &ExecutionId,
2706) -> Result<Version, DbErrorRead> {
2707    let row = tx
2708        .query_one(
2709            "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2710            &[&execution_id.to_string()],
2711        )
2712        .await?;
2713    let max_version: i64 = get(&row, "version")?;
2714    let max_version = Version::try_from(max_version)
2715        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2716    Ok(max_version)
2717}
2718
2719async fn get_max_response_cursor(
2720    tx: &Transaction<'_>,
2721    execution_id: &ExecutionId,
2722) -> Result<ResponseCursor, DbErrorRead> {
2723    let row = tx
2724        .query_one(
2725            "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2726            &[&execution_id.to_string()],
2727        )
2728        .await?;
2729    // Assume the execution exists and has no responses
2730    let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2731    let max_cursor = ResponseCursor(
2732        u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2733    );
2734    Ok(max_cursor)
2735}
2736
2737async fn list_execution_events(
2738    tx: &Transaction<'_>,
2739    execution_id: &ExecutionId,
2740    pagination: Pagination<VersionType>,
2741    include_backtrace_id: bool,
2742) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2743    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2744    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2745        params.push(p);
2746        format!("${}", params.len())
2747    };
2748
2749    let p_execution_id = add_param(Box::new(execution_id.to_string()));
2750
2751    let (cursor, length, rel, is_desc) = match &pagination {
2752        Pagination::NewerThan {
2753            cursor,
2754            length,
2755            including_cursor,
2756        } => (
2757            *cursor,
2758            *length,
2759            if *including_cursor { ">=" } else { ">" },
2760            false,
2761        ),
2762        Pagination::OlderThan {
2763            cursor,
2764            length,
2765            including_cursor,
2766        } => (
2767            *cursor,
2768            *length,
2769            if *including_cursor { "<=" } else { "<" },
2770            true,
2771        ),
2772    };
2773    let p_cursor = add_param(Box::new(i64::from(cursor)));
2774    let p_limit = add_param(Box::new(i64::from(length)));
2775
2776    let base_select = if include_backtrace_id {
2777        format!(
2778            "SELECT
2779                log.created_at,
2780                log.json_value,
2781                log.version,
2782                bt.version_min_including AS backtrace_id
2783            FROM
2784                t_execution_log AS log
2785            LEFT OUTER JOIN
2786                t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2787                                AND log.version >= bt.version_min_including
2788                                AND log.version < bt.version_max_excluding
2789            WHERE
2790                log.execution_id = {p_execution_id}
2791                AND log.version {rel} {p_cursor}"
2792        )
2793    } else {
2794        format!(
2795            "SELECT
2796                created_at, json_value, NULL::BIGINT as backtrace_id, version
2797            FROM t_execution_log WHERE
2798                execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2799        )
2800    };
2801
2802    let order = if is_desc { "DESC" } else { "ASC" };
2803    let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2804
2805    // Re-order to ascending for consistent oldest-to-newest results
2806    if is_desc {
2807        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2808    }
2809
2810    let params_refs: Vec<&(dyn ToSql + Sync)> = params
2811        .iter()
2812        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2813        .collect();
2814
2815    let rows = tx
2816        .query(&sql, &params_refs)
2817        .await
2818        .map_err(DbErrorRead::from)?;
2819
2820    let mut events = Vec::with_capacity(rows.len());
2821    for row in rows {
2822        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2823        let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2824            .map(Version::try_from)
2825            .transpose()
2826            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2827
2828        let version = get::<i64, _>(&row, "version")?;
2829        let version = Version::new(
2830            VersionType::try_from(version)
2831                .map_err(|_| consistency_db_err("version must be non-negative"))?,
2832        );
2833        let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2834        let event_req = event_req.0;
2835
2836        events.push(ExecutionEvent {
2837            created_at,
2838            event: event_req,
2839            backtrace_id,
2840            version,
2841        });
2842    }
2843    Ok(events)
2844}
2845
2846async fn get_execution_event(
2847    tx: &Transaction<'_>,
2848    execution_id: &ExecutionId,
2849    version: VersionType,
2850) -> Result<ExecutionEvent, DbErrorRead> {
2851    let row = tx
2852        .query_one(
2853            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2854                 execution_id = $1 AND version = $2",
2855            &[&execution_id.to_string(), &i64::from(version)],
2856        )
2857        .await?;
2858
2859    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2860    let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2861    let version = get::<i64, _>(&row, "version")?;
2862    let version = Version::try_from(version)
2863        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2864    let event = json_val.0;
2865
2866    Ok(ExecutionEvent {
2867        created_at,
2868        event,
2869        backtrace_id: None,
2870        version,
2871    })
2872}
2873
2874async fn get_last_execution_event(
2875    tx: &Transaction<'_>,
2876    execution_id: &ExecutionId,
2877) -> Result<ExecutionEvent, DbErrorRead> {
2878    let row = tx
2879        .query_one(
2880            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2881                 execution_id = $1 ORDER BY version DESC LIMIT 1",
2882            &[&execution_id.to_string()],
2883        )
2884        .await?;
2885
2886    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2887    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2888    let event = event.0;
2889    let version: i64 = get(&row, "version")?;
2890    let version = Version::try_from(version)
2891        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2892
2893    Ok(ExecutionEvent {
2894        created_at,
2895        event,
2896        backtrace_id: None,
2897        version,
2898    })
2899}
2900
2901async fn delay_response(
2902    tx: &Transaction<'_>,
2903    execution_id: &ExecutionId,
2904    delay_id: &DelayId,
2905) -> Result<Option<bool>, DbErrorRead> {
2906    let row = tx
2907        .query_opt(
2908            "SELECT delay_success \
2909                 FROM t_join_set_response \
2910                 WHERE \
2911                 execution_id = $1 AND delay_id = $2",
2912            &[&execution_id.to_string(), &delay_id.to_string()],
2913        )
2914        .await?;
2915
2916    match row {
2917        Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2918        None => Ok(None),
2919    }
2920}
2921
2922#[instrument(level = Level::TRACE, skip_all)]
2923async fn get_responses_after(
2924    tx: &Transaction<'_>,
2925    execution_id: &ExecutionId,
2926    last_response: ResponseCursor,
2927) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2928    let rows = tx
2929            .query(
2930                "SELECT r.id, r.created_at, r.join_set_id, \
2931                 r.delay_id, r.delay_success, \
2932                 r.child_execution_id, r.finished_version, child.json_value \
2933                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2934                 WHERE \
2935                 r.id > $1 AND \
2936                 r.execution_id = $2 AND \
2937                 ( \
2938                 r.finished_version = child.version \
2939                 OR \
2940                 r.child_execution_id IS NULL \
2941                 ) \
2942                 ORDER BY id \
2943                 ",
2944                 &[
2945                     &i64::from(last_response.0),
2946                     &execution_id.to_string(),
2947                 ]
2948            )
2949            .await?;
2950
2951    let mut results = Vec::with_capacity(rows.len());
2952    for row in rows {
2953        let resp = parse_response_with_cursor(&row)?;
2954        results.push(resp);
2955    }
2956    Ok(results)
2957}
2958
2959async fn get_pending_of_single_ffqn(
2960    tx: &Transaction<'_>,
2961    batch_size: u32,
2962    pending_at_or_sooner: DateTime<Utc>,
2963    ffqn: &FunctionFqn,
2964    select_strategy: SelectStrategy,
2965) -> Result<Vec<(ExecutionId, Version)>, ()> {
2966    let rows = tx
2967        .query(
2968            &format!(
2969                r"
2970                SELECT execution_id, corresponding_version FROM t_state
2971                WHERE
2972                state = '{STATE_PENDING_AT}' AND
2973                pending_expires_finished <= $1 AND ffqn = $2
2974                AND is_paused = false
2975                ORDER BY pending_expires_finished
2976                {}
2977                LIMIT $3
2978                ",
2979                if select_strategy == SelectStrategy::LockForUpdate {
2980                    "FOR UPDATE SKIP LOCKED"
2981                } else {
2982                    ""
2983                }
2984            ),
2985            &[
2986                &pending_at_or_sooner,
2987                &ffqn.to_string(),
2988                &(i64::from(batch_size)),
2989            ],
2990        )
2991        .await
2992        .map_err(|err| {
2993            warn!("Ignoring consistency error {err:?}");
2994        })?;
2995
2996    let mut result = Vec::with_capacity(rows.len());
2997    for row in rows {
2998        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2999            let eid_str: String = get(&row, "execution_id")?;
3000            let corresponding_version: i64 = get(&row, "corresponding_version")?;
3001            let corresponding_version = Version::try_from(corresponding_version)
3002                .map_err(|_| consistency_db_err("version must be non-negative"))?;
3003
3004            if let Ok(eid) = ExecutionId::from_str(&eid_str) {
3005                return Ok((eid, corresponding_version.increment()));
3006            }
3007            Err(consistency_db_err("invalid execution_id"))
3008        };
3009
3010        match unpack() {
3011            Ok(val) => result.push(val),
3012            Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
3013        }
3014    }
3015    Ok(result)
3016}
3017
3018/// Get executions and their next versions
3019async fn get_pending_by_ffqns(
3020    tx: &Transaction<'_>,
3021    batch_size: u32,
3022    pending_at_or_sooner: DateTime<Utc>,
3023    ffqns: &[FunctionFqn],
3024    select_strategy: SelectStrategy,
3025) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3026    let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
3027    let mut execution_ids_versions = Vec::with_capacity(batch_size);
3028
3029    for ffqn in ffqns {
3030        let needed = batch_size - execution_ids_versions.len();
3031        if needed == 0 {
3032            break;
3033        }
3034        let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
3035        if let Ok(execs) =
3036            get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
3037                .await
3038        {
3039            execution_ids_versions.extend(execs);
3040        }
3041    }
3042
3043    Ok(execution_ids_versions)
3044}
3045
/// Row-locking mode of the queries that select pending executions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SelectStrategy {
    /// Plain `SELECT`: no locking clause is added to the query.
    Read,
    /// The query is extended with `FOR UPDATE SKIP LOCKED`.
    LockForUpdate,
}
3051
3052async fn get_pending_by_component_input_digest(
3053    tx: &Transaction<'_>,
3054    batch_size: u32,
3055    pending_at_or_sooner: DateTime<Utc>,
3056    input_digest: &ComponentDigest,
3057    select_strategy: SelectStrategy,
3058) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3059    let rows = tx
3060        .query(
3061            &format!(
3062                r"
3063                SELECT execution_id, corresponding_version FROM t_state WHERE
3064                state = '{STATE_PENDING_AT}' AND
3065                pending_expires_finished <= $1 AND
3066                component_id_input_digest = $2
3067                AND is_paused = false
3068                ORDER BY pending_expires_finished
3069                {}
3070                LIMIT $3
3071                ",
3072                if select_strategy == SelectStrategy::LockForUpdate {
3073                    "FOR UPDATE SKIP LOCKED"
3074                } else {
3075                    ""
3076                }
3077            ),
3078            &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
3079        )
3080        .await?;
3081
3082    let mut result = Vec::with_capacity(rows.len());
3083    for row in rows {
3084        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
3085            let eid_str: String = get(&row, "execution_id")?;
3086            let corresponding_version: i64 = get(&row, "corresponding_version")?;
3087            let corresponding_version = Version::try_from(corresponding_version)
3088                .map_err(|_| consistency_db_err("version must be non-negative"))?;
3089
3090            let eid = ExecutionId::from_str(&eid_str)
3091                .map_err(|err| consistency_db_err(err.to_string()))?;
3092            Ok((eid, corresponding_version.increment()))
3093        };
3094
3095        match unpack() {
3096            Ok(val) => result.push(val),
3097            Err(err) => {
3098                warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
3099            }
3100        }
3101    }
3102
3103    Ok(result)
3104}
3105
3106fn notify_pending_locked(
3107    notifier: &NotifierPendingAt,
3108    current_time: DateTime<Utc>,
3109    ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3110) {
3111    if notifier.scheduled_at <= current_time {
3112        ffqn_to_pending_subscription.notify(notifier);
3113    }
3114}
3115
3116async fn upgrade_execution_component(
3117    tx: &Transaction<'_>,
3118    execution_id: &ExecutionId,
3119    old: &ComponentDigest,
3120    new: &ComponentDigest,
3121) -> Result<(), DbErrorWrite> {
3122    debug!("Updating t_state to component {new}");
3123
3124    let updated = tx
3125        .execute(
3126            r"
3127                UPDATE t_state
3128                SET
3129                    updated_at = CURRENT_TIMESTAMP,
3130                    component_id_input_digest = $1
3131                WHERE
3132                    execution_id = $2 AND
3133                    component_id_input_digest = $3
3134                ",
3135            &[
3136                &new.as_slice(),           // $1: BYTEA
3137                &execution_id.to_string(), // $2: TEXT
3138                &old.as_slice(),           // $3: BYTEA
3139            ],
3140        )
3141        .await?;
3142
3143    if updated != 1 {
3144        return Err(DbErrorWrite::NotFound);
3145    }
3146    Ok(())
3147}
3148
3149impl PostgresConnection {
3150    // Must be called after write transaction commit for a correct happens-before relationship.
3151    #[instrument(level = Level::TRACE, skip_all)]
3152    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3153        let (pending_ats, finished_execs, responses) = {
3154            let (mut pending_ats, mut finished_execs, mut responses) =
3155                (Vec::new(), Vec::new(), Vec::new());
3156            for notifier in notifiers {
3157                if let Some(pending_at) = notifier.pending_at {
3158                    pending_ats.push(pending_at);
3159                }
3160                if let Some(finished) = notifier.execution_finished {
3161                    finished_execs.push(finished);
3162                }
3163                if let Some(response) = notifier.response {
3164                    responses.push(response);
3165                }
3166            }
3167            (pending_ats, finished_execs, responses)
3168        };
3169
3170        // Notify pending_at subscribers.
3171        if !pending_ats.is_empty() {
3172            let guard = self.pending_subscribers.lock().unwrap();
3173            for pending_at in pending_ats {
3174                notify_pending_locked(&pending_at, current_time, &guard);
3175            }
3176        }
3177        // Notify execution finished subscribers.
3178        if !finished_execs.is_empty() {
3179            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3180            for finished in finished_execs {
3181                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3182                    for (_tag, sender) in listeners_of_exe_id {
3183                        let _ = sender.send(finished.retval.clone());
3184                    }
3185                }
3186            }
3187        }
3188        // Notify response subscribers.
3189        if !responses.is_empty() {
3190            let mut guard = self.response_subscribers.lock().unwrap();
3191            for (execution_id, response) in responses {
3192                if let Some((sender, _)) = guard.remove(&execution_id) {
3193                    let _ = sender.send(response);
3194                }
3195            }
3196        }
3197    }
3198}
3199
3200#[async_trait]
3201impl DbExecutor for PostgresConnection {
3202    #[instrument(level = Level::TRACE, skip(self))]
3203    async fn lock_pending_by_ffqns(
3204        &self,
3205        batch_size: u32,
3206        pending_at_or_sooner: DateTime<Utc>,
3207        ffqns: Arc<[FunctionFqn]>,
3208        created_at: DateTime<Utc>,
3209        component_id: ComponentId,
3210        deployment_id: DeploymentId,
3211        executor_id: ExecutorId,
3212        lock_expires_at: DateTime<Utc>,
3213        run_id: RunId,
3214        retry_config: ComponentRetryConfig,
3215    ) -> Result<LockPendingResponse, DbErrorWrite> {
3216        let mut client_guard = self.client.lock().await;
3217        let tx = client_guard.transaction().await?;
3218
3219        let execution_ids_versions = get_pending_by_ffqns(
3220            &tx,
3221            batch_size,
3222            pending_at_or_sooner,
3223            &ffqns,
3224            SelectStrategy::LockForUpdate,
3225        )
3226        .await?;
3227
3228        if execution_ids_versions.is_empty() {
3229            // Commit is required to release the connection state cleanly,
3230            // though rollback/drop works too for read-only.
3231            tx.commit().await?;
3232            return Ok(vec![]);
3233        }
3234
3235        debug!("Locking {execution_ids_versions:?}");
3236
3237        // Lock using the same transaction
3238        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3239        for (execution_id, version) in execution_ids_versions {
3240            match lock_single_execution(
3241                &tx,
3242                created_at,
3243                &component_id,
3244                deployment_id,
3245                &execution_id,
3246                run_id,
3247                &version,
3248                executor_id,
3249                lock_expires_at,
3250                retry_config,
3251            )
3252            .await
3253            {
3254                Ok(locked) => locked_execs.push(locked),
3255                Err(err) => {
3256                    tx.rollback().await?; // lock_single_execution performs multiple writes
3257                    debug!("Locking row {execution_id} failed - {err:?}");
3258                    return Err(err);
3259                }
3260            }
3261        }
3262
3263        tx.commit().await?;
3264
3265        Ok(locked_execs)
3266    }
3267
3268    #[instrument(level = Level::TRACE, skip(self))]
3269    async fn lock_pending_by_component_digest(
3270        &self,
3271        batch_size: u32,
3272        pending_at_or_sooner: DateTime<Utc>,
3273        component_id: &ComponentId,
3274        deployment_id: DeploymentId,
3275        created_at: DateTime<Utc>,
3276        executor_id: ExecutorId,
3277        lock_expires_at: DateTime<Utc>,
3278        run_id: RunId,
3279        retry_config: ComponentRetryConfig,
3280    ) -> Result<LockPendingResponse, DbErrorWrite> {
3281        let mut client_guard = self.client.lock().await;
3282        let tx = client_guard.transaction().await?;
3283
3284        let execution_ids_versions = get_pending_by_component_input_digest(
3285            &tx,
3286            batch_size,
3287            pending_at_or_sooner,
3288            &component_id.component_digest,
3289            SelectStrategy::LockForUpdate,
3290        )
3291        .await?;
3292
3293        if execution_ids_versions.is_empty() {
3294            tx.commit().await?;
3295            return Ok(vec![]);
3296        }
3297
3298        debug!("Locking {execution_ids_versions:?}");
3299
3300        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3301        for (execution_id, version) in execution_ids_versions {
3302            match lock_single_execution(
3303                &tx,
3304                created_at,
3305                component_id,
3306                deployment_id,
3307                &execution_id,
3308                run_id,
3309                &version,
3310                executor_id,
3311                lock_expires_at,
3312                retry_config,
3313            )
3314            .await
3315            {
3316                Ok(locked) => locked_execs.push(locked),
3317                Err(err) => {
3318                    tx.rollback().await?; // lock_single_execution performs multiple writes
3319                    debug!("Locking row {execution_id} failed - {err:?}");
3320                    return Err(err);
3321                }
3322            }
3323        }
3324
3325        tx.commit().await?;
3326        Ok(locked_execs)
3327    }
3328
3329    #[cfg(feature = "test")]
3330    #[instrument(level = Level::DEBUG, skip(self))]
3331    async fn lock_one(
3332        &self,
3333        created_at: DateTime<Utc>,
3334        component_id: ComponentId,
3335        deployment_id: DeploymentId,
3336        execution_id: &ExecutionId,
3337        run_id: RunId,
3338        version: Version,
3339        executor_id: ExecutorId,
3340        lock_expires_at: DateTime<Utc>,
3341        retry_config: ComponentRetryConfig,
3342    ) -> Result<LockedExecution, DbErrorWrite> {
3343        debug!(%execution_id, "lock_one");
3344        let mut client_guard = self.client.lock().await;
3345        let tx = client_guard.transaction().await?;
3346
3347        let res = lock_single_execution(
3348            &tx,
3349            created_at,
3350            &component_id,
3351            deployment_id,
3352            execution_id,
3353            run_id,
3354            &version,
3355            executor_id,
3356            lock_expires_at,
3357            retry_config,
3358        )
3359        .await?;
3360
3361        tx.commit().await?;
3362        Ok(res)
3363    }
3364
3365    #[instrument(level = Level::DEBUG, skip(self, req))]
3366    async fn append(
3367        &self,
3368        execution_id: ExecutionId,
3369        version: Version,
3370        req: AppendRequest,
3371    ) -> Result<AppendResponse, DbErrorWrite> {
3372        debug!(%req, "append");
3373        trace!(?req, "append");
3374        let created_at = req.created_at;
3375
3376        let mut client_guard = self.client.lock().await;
3377        let tx = client_guard.transaction().await?;
3378
3379        let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3380
3381        tx.commit().await?;
3382
3383        // Explicitly drop guard (optional, happens at end of scope anyway)
3384        drop(client_guard);
3385
3386        self.notify_all(vec![notifier], created_at);
3387        Ok(new_version)
3388    }
3389
3390    #[instrument(level = Level::DEBUG, skip_all)]
3391    async fn append_batch_respond_to_parent(
3392        &self,
3393        events: AppendEventsToExecution,
3394        response: AppendResponseToExecution,
3395        current_time: DateTime<Utc>,
3396    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3397        debug!("append_batch_respond_to_parent");
3398        if events.execution_id == response.parent_execution_id {
3399            return Err(DbErrorWrite::NonRetriable(
3400                DbErrorWriteNonRetriable::ValidationFailed(
3401                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3402                ),
3403            ));
3404        }
3405        if events.batch.is_empty() {
3406            return Err(DbErrorWrite::NonRetriable(
3407                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3408            ));
3409        }
3410
3411        let mut client_guard = self.client.lock().await;
3412        let tx = client_guard.transaction().await?;
3413
3414        let mut version = events.version;
3415        let mut notifiers = Vec::new();
3416
3417        for append_request in events.batch {
3418            let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3419            version = v;
3420            notifiers.push(n);
3421        }
3422
3423        let pending_at_parent = append_response(
3424            &tx,
3425            &response.parent_execution_id,
3426            JoinSetResponseEventOuter {
3427                created_at: response.created_at,
3428                event: JoinSetResponseEvent {
3429                    join_set_id: response.join_set_id,
3430                    event: JoinSetResponse::ChildExecutionFinished {
3431                        child_execution_id: response.child_execution_id,
3432                        finished_version: response.finished_version,
3433                        result: response.result,
3434                    },
3435                },
3436            },
3437        )
3438        .await?;
3439        notifiers.push(pending_at_parent);
3440
3441        tx.commit().await?;
3442        drop(client_guard);
3443
3444        self.notify_all(notifiers, current_time);
3445        Ok(version)
3446    }
3447
3448    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3449    async fn wait_for_pending_by_ffqn(
3450        &self,
3451        pending_at_or_sooner: DateTime<Utc>,
3452        ffqns: Arc<[FunctionFqn]>,
3453        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3454    ) {
3455        let unique_tag: u64 = rand::random();
3456        let (sender, mut receiver) = mpsc::channel(1);
3457        {
3458            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3459            for ffqn in ffqns.as_ref() {
3460                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3461            }
3462        }
3463
3464        async {
3465            let mut db_has_pending = false;
3466            {
3467                // Scope the lock so we don't hold it while waiting for timeout
3468                let mut client_guard = self.client.lock().await;
3469                // Read-only transaction check
3470                if let Ok(tx) = client_guard.transaction().await {
3471                    if let Ok(res) = get_pending_by_ffqns(
3472                        &tx,
3473                        1,
3474                        pending_at_or_sooner,
3475                        &ffqns,
3476                        SelectStrategy::Read,
3477                    )
3478                    .await
3479                        && !res.is_empty()
3480                    {
3481                        db_has_pending = true;
3482                    }
3483                    // Commit/Rollback read transaction
3484                    let _ = tx.commit().await;
3485                }
3486            }
3487
3488            if db_has_pending {
3489                trace!("Not waiting, database already contains new pending executions");
3490                return;
3491            }
3492
3493            tokio::select! {
3494                _ = receiver.recv() => {
3495                    trace!("Received a notification");
3496                }
3497                () = timeout_fut => {
3498                }
3499            }
3500        }
3501        .await;
3502
3503        // Cleanup
3504        {
3505            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3506            for ffqn in ffqns.as_ref() {
3507                match pending_subscribers.remove_ffqn(ffqn) {
3508                    Some((_, tag)) if tag == unique_tag => {}
3509                    Some(other) => {
3510                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
3511                    }
3512                    None => {}
3513                }
3514            }
3515        }
3516    }
3517
3518    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3519    async fn wait_for_pending_by_component_digest(
3520        &self,
3521        pending_at_or_sooner: DateTime<Utc>,
3522        component_digest: &ComponentDigest,
3523        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3524    ) {
3525        let unique_tag: u64 = rand::random();
3526        let (sender, mut receiver) = mpsc::channel(1);
3527        {
3528            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3529            pending_subscribers
3530                .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
3531        }
3532
3533        async {
3534            let mut db_has_pending = false;
3535            {
3536                let mut client_guard = self.client.lock().await;
3537                if let Ok(tx) = client_guard.transaction().await {
3538                    if let Ok(res) = get_pending_by_component_input_digest(
3539                        &tx,
3540                        1,
3541                        pending_at_or_sooner,
3542                        component_digest,
3543                        SelectStrategy::Read,
3544                    )
3545                    .await
3546                        && !res.is_empty()
3547                    {
3548                        db_has_pending = true;
3549                    }
3550                    let _ = tx.commit().await;
3551                }
3552            }
3553
3554            if db_has_pending {
3555                trace!("Not waiting, database already contains new pending executions");
3556                return;
3557            }
3558
3559            tokio::select! {
3560                _ = receiver.recv() => {
3561                    trace!("Received a notification");
3562                }
3563                () = timeout_fut => {
3564                }
3565            }
3566        }
3567        .await;
3568
3569        // Cleanup
3570        {
3571            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3572            match pending_subscribers.remove_by_component(component_digest) {
3573                Some((_, tag)) if tag == unique_tag => {}
3574                Some(other) => {
3575                    pending_subscribers.insert_by_component(component_digest.clone(), other);
3576                }
3577                None => {}
3578            }
3579        }
3580    }
3581
3582    async fn get_last_execution_event(
3583        &self,
3584        execution_id: &ExecutionId,
3585    ) -> Result<ExecutionEvent, DbErrorRead> {
3586        let mut client_guard = self.client.lock().await;
3587        let tx = client_guard.transaction().await?;
3588
3589        let event = get_last_execution_event(&tx, execution_id).await?;
3590
3591        tx.commit().await?;
3592        Ok(event)
3593    }
3594}
3595#[async_trait]
3596impl DbConnection for PostgresConnection {
3597    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3598    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3599        debug!("create");
3600        trace!(?req, "create");
3601        let created_at = req.created_at;
3602
3603        let mut client_guard = self.client.lock().await;
3604        let tx = client_guard.transaction().await?;
3605
3606        let (version, notifier) = create_inner(&tx, req.clone()).await?;
3607
3608        tx.commit().await?;
3609        drop(client_guard); // Release DB lock before notifying
3610
3611        self.notify_all(vec![notifier], created_at);
3612        Ok(version)
3613    }
3614
3615    #[instrument(level = Level::DEBUG, skip(self))]
3616    async fn get(
3617        &self,
3618        execution_id: &ExecutionId,
3619    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3620        trace!("get");
3621        let mut client_guard = self.client.lock().await;
3622        let tx = client_guard.transaction().await?;
3623
3624        let res = get_execution_log(&tx, execution_id).await?;
3625
3626        tx.commit().await?;
3627        Ok(res)
3628    }
3629
3630    #[instrument(level = Level::DEBUG, skip(self, batch))]
3631    async fn append_batch(
3632        &self,
3633        current_time: DateTime<Utc>,
3634        batch: Vec<AppendRequest>,
3635        execution_id: ExecutionId,
3636        version: Version,
3637    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3638        debug!("append_batch");
3639        trace!(?batch, "append_batch");
3640        assert!(!batch.is_empty(), "Empty batch request");
3641
3642        let mut client_guard = self.client.lock().await;
3643        let tx = client_guard.transaction().await?;
3644
3645        let mut version = version;
3646        let mut notifier = None;
3647
3648        for append_request in batch {
3649            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3650            version = v;
3651            notifier = Some(n);
3652        }
3653
3654        tx.commit().await?;
3655        drop(client_guard);
3656
3657        self.notify_all(
3658            vec![notifier.expect("checked that the batch is not empty")],
3659            current_time,
3660        );
3661        Ok(version)
3662    }
3663
3664    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3665    async fn append_batch_create_new_execution(
3666        &self,
3667        current_time: DateTime<Utc>,
3668        batch: Vec<AppendRequest>,
3669        execution_id: ExecutionId,
3670        version: Version,
3671        child_req: Vec<CreateRequest>,
3672        backtraces: Vec<BacktraceInfo>,
3673    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3674        debug!("append_batch_create_new_execution");
3675        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3676        assert!(!batch.is_empty(), "Empty batch request");
3677
3678        let mut client_guard = self.client.lock().await;
3679        let tx = client_guard.transaction().await?;
3680
3681        let mut version = version;
3682        let mut notifier = None;
3683
3684        for append_request in batch {
3685            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3686            version = v;
3687            notifier = Some(n);
3688        }
3689
3690        let mut notifiers = Vec::new();
3691        notifiers.push(notifier.expect("checked that the batch is not empty"));
3692
3693        for req in child_req {
3694            let (_, n) = create_inner(&tx, req).await?;
3695            notifiers.push(n);
3696        }
3697        for backtrace in backtraces {
3698            append_backtrace(&tx, &backtrace).await?;
3699        }
3700        tx.commit().await?;
3701        drop(client_guard);
3702
3703        self.notify_all(notifiers, current_time);
3704        Ok(version)
3705    }
3706
3707    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3708    async fn subscribe_to_next_responses(
3709        &self,
3710        execution_id: &ExecutionId,
3711        last_response: ResponseCursor,
3712        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3713    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3714        debug!("next_responses");
3715        let unique_tag: u64 = rand::random();
3716        let execution_id_clone = execution_id.clone();
3717
3718        let cleanup = || {
3719            let mut guard = self.response_subscribers.lock().unwrap();
3720            match guard.remove(&execution_id_clone) {
3721                Some((_, tag)) if tag == unique_tag => {}
3722                Some(other) => {
3723                    guard.insert(execution_id_clone.clone(), other);
3724                }
3725                None => {}
3726            }
3727        };
3728
3729        let receiver = {
3730            let mut client_guard = self.client.lock().await;
3731            let tx = client_guard.transaction().await?;
3732
3733            // Register listener before fetching from database.
3734            // This is a best-effort mechanism that shortens the polling time, if it
3735            // does not detect the response it will sleep using `timeout_fut`. Consumers
3736            // are expected to poll this function in a loop.
3737            // Currently the notification mechanism only works on a single node deployment.
3738            let (sender, receiver) = oneshot::channel();
3739            self.response_subscribers
3740                .lock()
3741                .unwrap()
3742                .insert(execution_id.clone(), (sender, unique_tag));
3743
3744            let responses = get_responses_after(&tx, execution_id, last_response).await?;
3745
3746            if responses.is_empty() {
3747                // Commit read transaction
3748                tx.commit().await.map_err(|err| {
3749                    cleanup(); // Remove the just inserted subscriber.
3750                    DbErrorRead::from(err)
3751                })?;
3752                receiver
3753            } else {
3754                cleanup(); // Remove the just inserted subscriber as we already have the answer.
3755                tx.commit().await?;
3756                return Ok(responses);
3757            }
3758        };
3759
3760        let res = tokio::select! {
3761            resp = receiver => {
3762                match resp {
3763                    Ok(resp) => Ok(vec![resp]),
3764                    Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3765                }
3766            }
3767            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3768        };
3769
3770        cleanup();
3771        res
3772    }
3773
3774    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3775    async fn wait_for_finished_result(
3776        &self,
3777        execution_id: &ExecutionId,
3778        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3779    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3780        let unique_tag: u64 = rand::random();
3781        let execution_id_clone = execution_id.clone();
3782
3783        let cleanup = || {
3784            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3785            if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3786                subscribers.remove(&unique_tag);
3787            }
3788        };
3789
3790        let receiver = {
3791            let mut client_guard = self.client.lock().await;
3792            let tx = client_guard.transaction().await?;
3793
3794            // Register listener
3795            let (sender, receiver) = oneshot::channel();
3796            {
3797                let mut guard = self.execution_finished_subscribers.lock().unwrap();
3798                guard
3799                    .entry(execution_id.clone())
3800                    .or_default()
3801                    .insert(unique_tag, sender);
3802            }
3803
3804            let pending_state = get_combined_state(&tx, execution_id)
3805                .await?
3806                .execution_with_state
3807                .pending_state;
3808
3809            if let PendingState::Finished(finished) = pending_state {
3810                let event = get_execution_event(&tx, execution_id, finished.version).await?;
3811                tx.commit().await?;
3812                cleanup();
3813
3814                if let ExecutionRequest::Finished { retval, .. } = event.event {
3815                    return Ok(retval);
3816                }
3817                error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3818                return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3819                    "cannot get finished event based on t_state version",
3820                )));
3821            }
3822            tx.commit().await?;
3823            receiver
3824        };
3825
3826        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3827        let res = tokio::select! {
3828            resp = receiver => {
3829                match resp {
3830                    Ok(retval) => Ok(retval),
3831                    Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3832                }
3833            }
3834            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3835        };
3836
3837        cleanup();
3838        res
3839    }
3840
3841    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3842    async fn append_delay_response(
3843        &self,
3844        created_at: DateTime<Utc>,
3845        execution_id: ExecutionId,
3846        join_set_id: JoinSetId,
3847        delay_id: DelayId,
3848        result: Result<(), ()>,
3849    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3850        debug!("append_delay_response");
3851        let event = JoinSetResponseEventOuter {
3852            created_at,
3853            event: JoinSetResponseEvent {
3854                join_set_id,
3855                event: JoinSetResponse::DelayFinished {
3856                    delay_id: delay_id.clone(),
3857                    result,
3858                },
3859            },
3860        };
3861
3862        let mut client_guard = self.client.lock().await;
3863        let tx = client_guard.transaction().await?;
3864
3865        let res = append_response(&tx, &execution_id, event).await;
3866
3867        match res {
3868            Ok(notifier) => {
3869                tx.commit().await?;
3870                drop(client_guard);
3871                self.notify_all(vec![notifier], created_at);
3872                Ok(AppendDelayResponseOutcome::Success)
3873            }
3874            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3875                // Check if already finished
3876                // Roll back the failed tx, start new read tx.
3877                tx.rollback().await?;
3878
3879                // Start new tx for check
3880                let tx = client_guard.transaction().await?;
3881                let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3882                tx.commit().await?;
3883
3884                match delay_success {
3885                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3886                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3887                    None => Err(DbErrorWrite::Generic(consistency_db_err(
3888                        "insert failed yet select did not find the response",
3889                    ))),
3890                }
3891            }
3892            Err(err) => {
3893                let _ = tx.rollback().await; // cleanup
3894                Err(err)
3895            }
3896        }
3897    }
3898
3899    #[instrument(level = Level::DEBUG, skip_all)]
3900    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3901        debug!("append_backtrace");
3902        let mut client_guard = self.client.lock().await;
3903        let tx = client_guard.transaction().await?;
3904
3905        append_backtrace(&tx, &append).await?;
3906
3907        tx.commit().await?;
3908        Ok(())
3909    }
3910
3911    #[instrument(level = Level::DEBUG, skip_all)]
3912    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3913        debug!("append_backtrace_batch");
3914        let mut client_guard = self.client.lock().await;
3915        let tx = client_guard.transaction().await?;
3916
3917        for append in batch {
3918            append_backtrace(&tx, &append).await?;
3919        }
3920
3921        tx.commit().await?;
3922        Ok(())
3923    }
3924
3925    #[instrument(level = Level::DEBUG, skip_all)]
3926    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3927        trace!("append_log");
3928        let mut client_guard = self.client.lock().await;
3929        let tx = client_guard.transaction().await?;
3930        append_log(&tx, &row).await?;
3931        tx.commit().await?;
3932
3933        Ok(())
3934    }
3935
3936    #[instrument(level = Level::DEBUG, skip_all)]
3937    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3938        trace!("append_log_batch");
3939        let mut client_guard = self.client.lock().await;
3940        let tx = client_guard.transaction().await?;
3941        for row in batch {
3942            append_log(&tx, row).await?;
3943        }
3944        tx.commit().await?;
3945        Ok(())
3946    }
3947
3948    /// Get currently expired delays and locks.
3949    #[instrument(level = Level::TRACE, skip(self))]
3950    async fn get_expired_timers(
3951        &self,
3952        at: DateTime<Utc>,
3953    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3954        let mut client_guard = self.client.lock().await;
3955        let tx = client_guard.transaction().await?;
3956
3957        // Expired Delays
3958        let rows = tx
3959            .query(
3960                "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3961                &[&at],
3962            )
3963            .await?;
3964
3965        let mut expired_timers = Vec::with_capacity(rows.len());
3966        for row in rows {
3967            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3968                let execution_id: String = get(&row, "execution_id")?;
3969                let execution_id = ExecutionId::from_str(&execution_id)?;
3970                let join_set_id: String = get(&row, "join_set_id")?;
3971                let join_set_id = JoinSetId::from_str(&join_set_id)?;
3972                let delay_id: String = get(&row, "delay_id")?;
3973                let delay_id = DelayId::from_str(&delay_id)?;
3974
3975                Ok(ExpiredTimer::Delay(ExpiredDelay {
3976                    execution_id,
3977                    join_set_id,
3978                    delay_id,
3979                }))
3980            };
3981
3982            match unpack() {
3983                Ok(timer) => expired_timers.push(timer),
3984                Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3985            }
3986        }
3987
3988        // Expired Locks
3989        let rows = tx.query(
3990            &format!(
3991                "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3992                 FROM t_state \
3993                 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3994            ),
3995            &[&at]
3996        ).await?;
3997
3998        for row in rows {
3999            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
4000                let execution_id: String = get(&row, "execution_id")?;
4001                let execution_id = ExecutionId::from_str(&execution_id)?;
4002                let last_lock_version: i64 = get(&row, "last_lock_version")?;
4003                let last_lock_version = Version::try_from(last_lock_version)?;
4004
4005                let corresponding_version: i64 = get(&row, "corresponding_version")?;
4006                let corresponding_version = Version::try_from(corresponding_version)?;
4007
4008                let intermittent_event_count =
4009                    u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
4010                        |_| consistency_db_err("`intermittent_event_count` must not be negative"),
4011                    )?;
4012
4013                let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
4014                    .map(u32::try_from)
4015                    .transpose()
4016                    .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
4017                let retry_exp_backoff_millis =
4018                    u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
4019                        |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
4020                    )?;
4021                let executor_id: String = get(&row, "executor_id")?;
4022                let executor_id = ExecutorId::from_str(&executor_id)?;
4023                let run_id: String = get(&row, "run_id")?;
4024                let run_id = RunId::from_str(&run_id)?;
4025
4026                Ok(ExpiredTimer::Lock(ExpiredLock {
4027                    execution_id,
4028                    locked_at_version: last_lock_version,
4029                    next_version: corresponding_version.increment(),
4030                    intermittent_event_count,
4031                    max_retries,
4032                    retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
4033                    locked_by: LockedBy {
4034                        executor_id,
4035                        run_id,
4036                    },
4037                }))
4038            };
4039
4040            match unpack() {
4041                Ok(timer) => expired_timers.push(timer),
4042                Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
4043            }
4044        }
4045
4046        tx.commit().await?;
4047
4048        if !expired_timers.is_empty() {
4049            debug!("get_expired_timers found {expired_timers:?}");
4050        }
4051        Ok(expired_timers)
4052    }
4053
4054    async fn get_execution_event(
4055        &self,
4056        execution_id: &ExecutionId,
4057        version: &Version,
4058    ) -> Result<ExecutionEvent, DbErrorRead> {
4059        let mut client_guard = self.client.lock().await;
4060        let tx = client_guard.transaction().await?;
4061
4062        let event = get_execution_event(&tx, execution_id, version.0).await?;
4063
4064        tx.commit().await?;
4065        Ok(event)
4066    }
4067
4068    async fn get_pending_state(
4069        &self,
4070        execution_id: &ExecutionId,
4071    ) -> Result<ExecutionWithState, DbErrorRead> {
4072        let mut client_guard = self.client.lock().await;
4073        let tx = client_guard.transaction().await?;
4074
4075        let combined_state = get_combined_state(&tx, execution_id).await?;
4076
4077        tx.commit().await?;
4078        Ok(combined_state.execution_with_state)
4079    }
4080}
4081
4082#[async_trait]
4083impl DbExternalApi for PostgresConnection {
4084    #[instrument(skip(self))]
4085    async fn get_backtrace(
4086        &self,
4087        execution_id: &ExecutionId,
4088        filter: BacktraceFilter,
4089    ) -> Result<BacktraceInfo, DbErrorRead> {
4090        debug!("get_backtrace");
4091
4092        let mut client_guard = self.client.lock().await;
4093        let tx = client_guard.transaction().await?;
4094
4095        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4096
4097        params.push(Box::new(execution_id.to_string())); // $1
4098        let p_execution_id_idx = format!("${}", params.len()); // $1
4099
4100        let mut sql = String::new();
4101        write!(
4102            &mut sql,
4103            "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
4104            FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
4105            WHERE execution_id = {p_execution_id_idx}"
4106        )
4107        .unwrap();
4108
4109        match &filter {
4110            BacktraceFilter::Specific(version) => {
4111                params.push(Box::new(i64::from(version.0))); // $2
4112                let p_ver_idx = format!("${}", params.len()); // $2
4113                write!(
4114                    &mut sql,
4115                    " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
4116                )
4117                .unwrap();
4118            }
4119            BacktraceFilter::First => {
4120                sql.push_str(" ORDER BY version_min_including LIMIT 1");
4121            }
4122            BacktraceFilter::Last => {
4123                sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
4124            }
4125        }
4126
4127        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4128            params.iter().map(|p| p.as_ref() as _).collect();
4129
4130        let row = tx.query_one(&sql, &params_refs).await?;
4131
4132        let component_id: Json<ComponentId> = get(&row, "component_id")?;
4133        let component_id = component_id.0;
4134
4135        let version_min_including =
4136            Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
4137
4138        let version_max_excluding =
4139            Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
4140
4141        // wasm_backtrace stored as JSONB
4142        let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
4143        let wasm_backtrace = wasm_backtrace.0;
4144
4145        tx.commit().await?;
4146
4147        Ok(BacktraceInfo {
4148            execution_id: execution_id.clone(),
4149            component_id,
4150            version_min_including,
4151            version_max_excluding,
4152            wasm_backtrace,
4153        })
4154    }
4155
4156    #[instrument(skip_all)]
4157    async fn upsert_source_file(
4158        &self,
4159        component_digest: &ComponentDigest,
4160        frame_key: &str,
4161        is_suffix: bool,
4162        content: &str,
4163    ) -> Result<(), DbErrorWrite> {
4164        let content_hash: [u8; 32] = Sha256::digest(content.as_bytes()).into();
4165        let mut client_guard = self.client.lock().await;
4166        let tx = client_guard.transaction().await?;
4167        tx.execute(
4168            "INSERT INTO t_source_file (content_hash, content) \
4169             VALUES ($1, $2) \
4170             ON CONFLICT (content_hash) DO NOTHING",
4171            &[&content_hash.as_slice(), &content],
4172        )
4173        .await?;
4174        tx.execute(
4175            "INSERT INTO t_component_source \
4176             (component_digest, frame_key, is_suffix, content_hash) \
4177             VALUES ($1, $2, $3, $4) \
4178             ON CONFLICT (component_digest, frame_key, is_suffix) DO NOTHING",
4179            &[
4180                &component_digest.as_slice(),
4181                &frame_key,
4182                &is_suffix,
4183                &content_hash.as_slice(),
4184            ],
4185        )
4186        .await?;
4187        tx.commit().await?;
4188        Ok(())
4189    }
4190
4191    #[instrument(skip_all)]
4192    async fn get_source_file(
4193        &self,
4194        component_digest: &ComponentDigest,
4195        file: &str,
4196    ) -> Result<Option<String>, DbErrorRead> {
4197        let mut client_guard = self.client.lock().await;
4198        let tx = client_guard.transaction().await?;
4199        let rows = tx
4200            .query(
4201                "SELECT s.content \
4202                 FROM t_component_source cs \
4203                 JOIN t_source_file s ON cs.content_hash = s.content_hash \
4204                 WHERE cs.component_digest = $1 \
4205                   AND ( \
4206                       (NOT cs.is_suffix AND cs.frame_key = $2) \
4207                    OR (cs.is_suffix AND right($2, length(cs.frame_key)) = cs.frame_key) \
4208                   )",
4209                &[&component_digest.as_slice(), &file],
4210            )
4211            .await?;
4212        tx.commit().await?;
4213        match rows.len() {
4214            0 => Ok(None),
4215            1 => Ok(Some(get::<String, _>(&rows[0], "content")?)),
4216            _ => {
4217                warn!("Multiple suffix matches for '{file}', returning None");
4218                Ok(None)
4219            }
4220        }
4221    }
4222
4223    #[instrument(skip(self))]
4224    async fn list_executions(
4225        &self,
4226        filter: ListExecutionsFilter,
4227        pagination: ExecutionListPagination,
4228    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4229        let mut client_guard = self.client.lock().await;
4230        let tx = client_guard.transaction().await?;
4231
4232        let result = list_executions(&tx, filter, &pagination).await?;
4233
4234        tx.commit().await?;
4235        Ok(result)
4236    }
4237
4238    #[instrument(skip(self))]
4239    async fn list_execution_events(
4240        &self,
4241        execution_id: &ExecutionId,
4242        pagination: Pagination<VersionType>,
4243        include_backtrace_id: bool,
4244    ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4245        let mut client_guard = self.client.lock().await;
4246        let tx = client_guard.transaction().await?;
4247
4248        let events =
4249            list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4250        let max_version = get_max_version(&tx, execution_id).await?;
4251
4252        tx.commit().await?;
4253        Ok(ListExecutionEventsResponse {
4254            events,
4255            max_version,
4256        })
4257    }
4258
4259    #[instrument(skip(self))]
4260    async fn list_responses(
4261        &self,
4262        execution_id: &ExecutionId,
4263        pagination: Pagination<u32>,
4264    ) -> Result<ListResponsesResponse, DbErrorRead> {
4265        let mut client_guard = self.client.lock().await;
4266        let tx = client_guard.transaction().await?;
4267
4268        let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4269        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4270
4271        tx.commit().await?;
4272        Ok(ListResponsesResponse {
4273            responses,
4274            max_cursor,
4275        })
4276    }
4277
4278    #[instrument(skip(self))]
4279    async fn list_execution_events_responses(
4280        &self,
4281        execution_id: &ExecutionId,
4282        req_since: &Version,
4283        req_max_length: VersionType,
4284        req_include_backtrace_id: bool,
4285        resp_pagination: Pagination<u32>,
4286    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4287        let mut client_guard = self.client.lock().await;
4288        let tx = client_guard.transaction().await?;
4289
4290        let combined_state = get_combined_state(&tx, execution_id).await?;
4291
4292        let events = list_execution_events(
4293            &tx,
4294            execution_id,
4295            Pagination::NewerThan {
4296                length: req_max_length
4297                    .try_into()
4298                    .expect("req_max_length fits in u16"),
4299                cursor: req_since.0,
4300                including_cursor: true,
4301            },
4302            req_include_backtrace_id,
4303        )
4304        .await?;
4305
4306        let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4307        let max_version = get_max_version(&tx, execution_id).await?;
4308        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4309
4310        tx.commit().await?;
4311
4312        Ok(ExecutionWithStateRequestsResponses {
4313            execution_with_state: combined_state.execution_with_state,
4314            events,
4315            responses,
4316            max_version,
4317            max_cursor,
4318        })
4319    }
4320
4321    #[instrument(skip(self))]
4322    async fn upgrade_execution_component(
4323        &self,
4324        execution_id: &ExecutionId,
4325        old: &ComponentDigest,
4326        new: &ComponentDigest,
4327    ) -> Result<(), DbErrorWrite> {
4328        let mut client_guard = self.client.lock().await;
4329        let tx = client_guard.transaction().await?;
4330
4331        upgrade_execution_component(&tx, execution_id, old, new).await?;
4332
4333        tx.commit().await?;
4334        Ok(())
4335    }
4336
4337    #[instrument(skip(self))]
4338    async fn list_logs(
4339        &self,
4340        execution_id: &ExecutionId,
4341        filter: LogFilter,
4342        pagination: Pagination<u32>,
4343    ) -> Result<ListLogsResponse, DbErrorRead> {
4344        let mut client_guard = self.client.lock().await;
4345        let tx = client_guard.transaction().await?;
4346        let responses = list_logs_tx(&tx, execution_id, &filter, &pagination).await?;
4347        tx.commit().await?;
4348        Ok(responses)
4349    }
4350
4351    #[instrument(skip(self))]
4352    async fn list_deployment_states(
4353        &self,
4354        current_time: DateTime<Utc>,
4355        pagination: Pagination<Option<DeploymentId>>,
4356        include_config_json: bool,
4357    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4358        let mut client_guard = self.client.lock().await;
4359        let tx = client_guard.transaction().await?;
4360        let deployments =
4361            list_deployment_states(&tx, current_time, pagination, include_config_json).await?;
4362        tx.commit().await?;
4363        Ok(deployments)
4364    }
4365
4366    #[instrument(skip(self))]
4367    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite> {
4368        assert_eq!(
4369            record.status,
4370            DeploymentStatus::Inactive,
4371            "insert_deployment requires Inactive status"
4372        );
4373        assert!(
4374            record.last_active_at.is_none(),
4375            "insert_deployment requires last_active_at == None"
4376        );
4377        let mut client_guard = self.client.lock().await;
4378        let tx = client_guard.transaction().await?;
4379        tx.execute(
4380            "INSERT INTO t_deployment \
4381             (deployment_id, created_at, status, config_json, obelisk_version, created_by) \
4382             VALUES ($1, $2, $3, $4, $5, $6)",
4383            &[
4384                &record.deployment_id.to_string(), // $1
4385                &record.created_at,                // $2
4386                &record.status.as_str(),           // $3
4387                &record.config_json,               // $4
4388                &record.obelisk_version,           // $5
4389                &record.created_by,                // $6
4390            ],
4391        )
4392        .await?;
4393        tx.commit().await?;
4394        Ok(())
4395    }
4396
4397    #[instrument(skip(self))]
4398    async fn activate_deployment(
4399        &self,
4400        deployment_id: DeploymentId,
4401        now: DateTime<Utc>,
4402    ) -> Result<(), DbErrorWrite> {
4403        let mut client_guard = self.client.lock().await;
4404        let tx = client_guard.transaction().await?;
4405        // Demote currently active or enqueued deployment to inactive.
4406        tx.execute(
4407            "UPDATE t_deployment SET status = 'inactive' WHERE status IN ('active', 'enqueued')",
4408            &[],
4409        )
4410        .await?;
4411        // Set target deployment to active, recording activation time.
4412        let rows = tx
4413            .execute(
4414                "UPDATE t_deployment SET status = 'active', last_active_at = $1 WHERE deployment_id = $2",
4415                &[&now, &deployment_id.to_string()],
4416            )
4417            .await?;
4418        tx.commit().await?;
4419        if rows == 0 {
4420            return Err(DbErrorWrite::NotFound);
4421        }
4422        Ok(())
4423    }
4424
4425    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite> {
4426        let mut client_guard = self.client.lock().await;
4427        let tx = client_guard.transaction().await?;
4428        // Guard: reject if target deployment is currently active.
4429        let status_opt = tx
4430            .query_opt(
4431                "SELECT status FROM t_deployment WHERE deployment_id = $1",
4432                &[&deployment_id.to_string()],
4433            )
4434            .await?;
4435        match status_opt.as_ref().map(|r| r.get::<_, &str>("status")) {
4436            None => return Err(DbErrorWrite::NotFound),
4437            Some("active") => return Err(DbErrorWriteNonRetriable::Conflict.into()),
4438            _ => {}
4439        }
4440        // Demote any previously enqueued deployment to inactive.
4441        tx.execute(
4442            "UPDATE t_deployment SET status = 'inactive' WHERE status = 'enqueued'",
4443            &[],
4444        )
4445        .await?;
4446        // Set target deployment to enqueued.
4447        let rows = tx
4448            .execute(
4449                "UPDATE t_deployment SET status = 'enqueued' WHERE deployment_id = $1",
4450                &[&deployment_id.to_string()],
4451            )
4452            .await?;
4453        tx.commit().await?;
4454        if rows == 0 {
4455            return Err(DbErrorWrite::NotFound);
4456        }
4457        Ok(())
4458    }
4459
4460    #[instrument(skip(self))]
4461    async fn get_deployment(
4462        &self,
4463        deployment_id: DeploymentId,
4464    ) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4465        let mut client_guard = self.client.lock().await;
4466        let tx = client_guard.transaction().await?;
4467        let row = tx
4468            .query_opt(
4469                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4470                 FROM t_deployment WHERE deployment_id = $1",
4471                &[&deployment_id.to_string()],
4472            )
4473            .await?;
4474        tx.commit().await?;
4475        match row {
4476            None => Ok(None),
4477            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4478        }
4479    }
4480
4481    #[instrument(skip(self))]
4482    #[cfg(feature = "test")]
4483    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4484        let mut client_guard = self.client.lock().await;
4485        let tx = client_guard.transaction().await?;
4486        let row = tx
4487            .query_opt(
4488                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4489                 FROM t_deployment WHERE status = 'active' LIMIT 1",
4490                &[],
4491            )
4492            .await?;
4493        tx.commit().await?;
4494        match row {
4495            None => Ok(None),
4496            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4497        }
4498    }
4499
4500    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4501        let mut client_guard = self.client.lock().await;
4502        let tx = client_guard.transaction().await?;
4503        let row = tx
4504            .query_opt(
4505                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4506                 FROM t_deployment WHERE status IN ('enqueued', 'active') \
4507                 ORDER BY CASE status WHEN 'enqueued' THEN 0 ELSE 1 END LIMIT 1",
4508                &[],
4509            )
4510            .await?;
4511        tx.commit().await?;
4512        match row {
4513            None => Ok(None),
4514            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4515        }
4516    }
4517
4518    #[instrument(skip(self))]
4519    async fn list_deployments(
4520        &self,
4521        pagination: Pagination<Option<DeploymentId>>,
4522    ) -> Result<Vec<DeploymentRecord>, DbErrorRead> {
4523        let mut client_guard = self.client.lock().await;
4524        let tx = client_guard.transaction().await?;
4525        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4526        let mut add_param = |p: Box<dyn tokio_postgres::types::ToSql + Sync + Send>| {
4527            params.push(p);
4528            format!("${}", params.len())
4529        };
4530
4531        let mut sql = String::from(
4532            "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4533             FROM t_deployment",
4534        );
4535
4536        if let Some(cursor) = pagination.cursor() {
4537            let p_cursor = add_param(Box::new(cursor.to_string()));
4538            write!(
4539                sql,
4540                " WHERE deployment_id {rel} {p_cursor}",
4541                rel = pagination.rel()
4542            )
4543            .expect("writing to string");
4544        }
4545
4546        let (inner_order, outer_order) = if pagination.is_desc() {
4547            ("DESC", "")
4548        } else {
4549            ("ASC", "DESC")
4550        };
4551
4552        write!(
4553            sql,
4554            " ORDER BY deployment_id {inner_order} LIMIT {limit}",
4555            limit = pagination.length()
4556        )
4557        .expect("writing to string");
4558
4559        let final_sql = if outer_order.is_empty() {
4560            sql
4561        } else {
4562            format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
4563        };
4564
4565        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4566            params.iter().map(|p| p.as_ref() as _).collect();
4567
4568        let rows = tx.query(&final_sql, &params_refs).await?;
4569        tx.commit().await?;
4570
4571        rows.iter()
4572            .map(deployment_record_from_pg_row)
4573            .collect::<Result<Vec<_>, _>>()
4574    }
4575
4576    #[instrument(skip(self))]
4577    async fn pause_execution(
4578        &self,
4579        execution_id: &ExecutionId,
4580        paused_at: DateTime<Utc>,
4581    ) -> Result<AppendResponse, DbErrorWrite> {
4582        let mut client_guard = self.client.lock().await;
4583        let tx = client_guard.transaction().await?;
4584
4585        let combined_state = get_combined_state(&tx, execution_id).await?;
4586        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4587        debug!("Pausing with {appending_version}");
4588        let (next_version, _) = append(
4589            &tx,
4590            execution_id,
4591            AppendRequest {
4592                created_at: paused_at,
4593                event: ExecutionRequest::Paused,
4594            },
4595            appending_version,
4596        )
4597        .await?;
4598
4599        tx.commit().await?;
4600        Ok(next_version)
4601    }
4602
4603    #[instrument(skip(self))]
4604    async fn unpause_execution(
4605        &self,
4606        execution_id: &ExecutionId,
4607        unpaused_at: DateTime<Utc>,
4608    ) -> Result<AppendResponse, DbErrorWrite> {
4609        let mut client_guard = self.client.lock().await;
4610        let tx = client_guard.transaction().await?;
4611
4612        let combined_state = get_combined_state(&tx, execution_id).await?;
4613        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4614        debug!("Unpausing with {appending_version}");
4615        let (next_version, _) = append(
4616            &tx,
4617            execution_id,
4618            AppendRequest {
4619                created_at: unpaused_at,
4620                event: ExecutionRequest::Unpaused,
4621            },
4622            appending_version,
4623        )
4624        .await?;
4625
4626        tx.commit().await?;
4627        Ok(next_version)
4628    }
4629}
4630
4631#[async_trait]
4632impl DbPoolCloseable for PostgresPool {
4633    async fn close(&self) {
4634        self.pool.close();
4635    }
4636}
4637
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test-only: appends `response_event`, stamped with `created_at`, to
    /// `execution_id` and then notifies subscribers.
    ///
    /// The write runs in its own transaction. The connection lock is released
    /// before `notify_all` is called with the notifier produced by the append.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let outer_event = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };

        // The block scopes the connection guard so it is released before notifying.
        let notifier = {
            let mut conn = self.client.lock().await;
            let txn = conn.transaction().await?;
            let notifier = append_response(&txn, &execution_id, outer_event).await?;
            txn.commit().await?;
            notifier
        };

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4666
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test-only: drops the database named by `self.config.db_name`.
    ///
    /// A separate pool is created against `ADMIN_DB_NAME` using this pool's
    /// host, user and password, and `DROP DATABASE` is attempted up to three
    /// times. If every attempt fails, a warning is logged and the function
    /// returns normally.
    ///
    /// # Panics
    /// Panics if the admin pool cannot be created or no connection can be
    /// obtained from it.
    pub async fn drop_database(&self) {
        let db_name = &self.config.db_name;

        let mut admin_cfg = deadpool_postgres::Config::new();
        admin_cfg.host = Some(self.config.host.clone());
        admin_cfg.user = Some(self.config.user.clone());
        admin_cfg.password = Some(self.config.password.expose_secret().to_string());
        admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
        admin_cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        // The statement text does not change between attempts.
        let drop_stmt = format!("DROP DATABASE {}", db_name);
        for _ in 0..3 {
            // NOTE(review): the original comment says a failed attempt already
            // waits about 1s, so no extra sleep is added. Not verifiable from here.
            let res = client.execute(&drop_stmt, &[]).await;
            if res.is_err() {
                debug!("Dropping db failed - {res:?}",);
                continue;
            }
            debug!("Database '{}' dropped.", db_name);
            return;
        }
        warn!("Did not drop database {}", db_name);
    }
}