obeli_sk_db_postgres/
postgres_dao.rs

1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ContentDigest, ExecutionId, FunctionFqn, JoinSetId,
6    StrVariant, SupportedFunctionReturnValue,
7    component_id::{Digest, InputContentDigest},
8    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14        DbPool, DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
15        ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock,
16        ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
17        JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionsFilter, LockPendingResponse,
18        Locked, LockedBy, LockedExecution, Pagination, PendingState, PendingStateFinished,
19        PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor,
20        STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome,
21        Version, VersionType, WasmBacktrace,
22    },
23};
24use deadpool_postgres::{Client, Config, ManagerConfig, Pool, RecyclingMethod};
25use hashbrown::HashMap;
26use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
27use std::{fmt::Write as _, panic::Location};
28use tokio::sync::{mpsc, oneshot};
29use tokio_postgres::{
30    NoTls, Row, Transaction,
31    types::{FromSql, Json, ToSql},
32};
33use tracing::{Level, debug, error, info, instrument, trace, warn};
34use tracing_error::SpanTrace;
35
36fn get<'a, T: FromSql<'a>>(row: &'a Row, name: &str) -> Result<T, DbErrorGeneric> {
37    row.try_get(name)
38        .map_err(|err| consistency_db_err(format!("Failed to retrieve column '{name}': {err:?}")))
39}
40
/// DDL statements and constants used to provision the Postgres schema.
///
/// Every statement below uses `IF NOT EXISTS`, so the whole set can be re-run against an
/// already-initialized database.
mod ddl {
    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;

    /// Maintenance database connected to when checking for / creating the target database.
    pub const ADMIN_DB_NAME: &str = "postgres";

    /// Schema version stored in `t_metadata`; a database holding a different version is rejected.
    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 1;

    /// `t_metadata`: one row per recorded schema version (the newest row, by `id`, wins).
    pub const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    schema_version INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);
";

    /// `t_execution_log`: append-only event log, one row per (execution, version).
    /// `history_event_type` is a stored generated column extracted from the JSON path
    /// `history_event.event.type` of `json_value`.
    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    json_value JSON NOT NULL,
    version BIGINT NOT NULL CHECK (version >= 0),
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
    PRIMARY KEY (execution_id, version)
);
";

    // Indexes for t_execution_log

    /// NOTE(review): same columns as the primary key, which Postgres already backs with a
    /// unique index; this index looks redundant. Kept so existing deployments are unchanged.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";

    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

    /// Partial index covering only rows whose `history_event_type` equals
    /// `HISTORY_EVENT_TYPE_JOIN_NEXT` (value spliced in at compile time).
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
        HISTORY_EVENT_TYPE_JOIN_NEXT
    );

    /// `t_join_set_response`: responses delivered to a join set, either a delay
    /// (`delay_id`, `delay_success`) or a finished child (`child_execution_id`,
    /// `finished_version`).
    ///
    /// NOTE(review): under Postgres' default NULLS DISTINCT semantics the composite UNIQUE
    /// constraint does not deduplicate rows where `delay_id` or `child_execution_id` is NULL;
    /// the partial unique indexes below are what enforce uniqueness per child / per delay.
    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success BOOLEAN,

    child_execution_id TEXT,
    finished_version BIGINT CHECK (finished_version >= 0),

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
);
";

    // Indexes for t_join_set_response

    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";

    /// A child execution id may appear in at most one response row.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";

    /// A delay id may appear in at most one response row.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";

    /// `t_state`: current state of every execution, one row per execution.
    /// `pending_expires_finished` is read as the scheduled-at, lock-expiry or finished-at
    /// timestamp depending on `state`; the nullable lock / join-set / result columns are only
    /// populated for the states that use them.
    pub const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level BOOLEAN NOT NULL,
    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
    ffqn TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    component_id_input_digest BYTEA NOT NULL,
    first_scheduled_at TIMESTAMPTZ NOT NULL,

    pending_expires_finished TIMESTAMPTZ NOT NULL,
    state TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),

    max_retries BIGINT CHECK (max_retries >= 0),
    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
    last_lock_version BIGINT CHECK (last_lock_version >= 0),
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing BOOLEAN,

    result_kind JSONB,

    PRIMARY KEY (execution_id)
);
";

    // Indexes for t_state

    /// Presumably serves the "lock pending executions" query (state + due time + ffqn) — confirm.
    pub const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";

    /// Partial index over rows that have an `executor_id`, ordered by expiry.
    pub const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";

    /// NOTE(review): the index is named `..._is_root` while the column and constant say
    /// "top level"; renaming it would create a second index on existing databases.
    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";

    pub const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";

    pub const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";

    /// `t_delay`: outstanding delay requests of a join set, with their expiry.
    pub const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
);
";

    /// `t_backtrace`: WASM backtraces attached to a half-open version range
    /// `[version_min_including, version_max_excluding)` of an execution.
    pub const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id JSONB NOT NULL,
    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
    wasm_backtrace JSONB NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
);
";

    /// NOTE(review): identical column list to the primary key of `t_backtrace`, so this
    /// index looks redundant. Kept so existing deployments are unchanged.
    pub const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
}
197
/// Connection settings for the Postgres backend.
///
/// `password` is skipped by the derived `Debug` implementation so it does not end up in logs.
#[derive(derive_more::Debug, Clone)]
pub struct PostgresConfig {
    /// Server host handed to the connection pool configuration.
    pub host: String,
    /// User name used to authenticate.
    pub user: String,
    /// Password for `user`; never printed by `Debug`.
    #[debug(skip)]
    pub password: String,
    /// Name of the application database (created on startup with `ProvisionPolicy::Auto`).
    pub db_name: String,
}
206
/// Opaque error returned when the database cannot be created, connected to, or have its
/// schema initialized / verified. The underlying cause is logged with `error!` where it occurs.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
210
211async fn create_database(
212    config: &PostgresConfig,
213) -> Result<DbInitialzationOutcome, InitializationError> {
214    let mut cfg = Config::new();
215    cfg.host = Some(config.host.clone());
216    cfg.user = Some(config.user.clone());
217    cfg.password = Some(config.password.clone());
218    cfg.dbname = Some(ADMIN_DB_NAME.into());
219    cfg.manager = Some(ManagerConfig {
220        recycling_method: RecyclingMethod::Fast,
221    });
222
223    let pool = cfg.create_pool(None, NoTls).map_err(|err| {
224        error!("Cannot create the default pool - {err:?}");
225        InitializationError
226    })?;
227
228    let client = pool.get().await.map_err(|err| {
229        error!("Cannot get a connection from the default pool - {err:?}");
230        InitializationError
231    })?;
232
233    let row = client
234        .query_opt(
235            &format!(
236                "SELECT 1 FROM pg_database WHERE datname = '{}'",
237                config.db_name
238            ),
239            &[],
240        )
241        .await
242        .map_err(|err| {
243            error!("Cannot select from the default database - {err:?}");
244            InitializationError
245        })?;
246
247    let outcome = if row.is_none() {
248        client
249            .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
250            .await
251            .map_err(|err| {
252                error!("Cannot create the database - {err:?}");
253                InitializationError
254            })?;
255
256        DbInitialzationOutcome::Created
257    } else {
258        DbInitialzationOutcome::Existing
259    };
260    match outcome {
261        DbInitialzationOutcome::Created => info!("Database '{}' created.", config.db_name),
262        DbInitialzationOutcome::Existing => info!("Database '{}' exists.", config.db_name),
263    }
264    Ok(outcome)
265}
266
// All mutexes here have a very short critical section completely controlled by this module, thus using std mutex.
/// Waiters for the next join set response, keyed by execution. Each entry holds a one-shot
/// sender and a `u64` (NOTE(review): presumably a tag identifying the subscription — confirm).
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Waiters interested in newly pending executions, by ffqn or by component.
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Waiters for an execution's return value, keyed by execution and then by a `u64`
/// (NOTE(review): presumably a per-subscription id — confirm).
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
274
/// Connection pool for the Postgres backend.
///
/// Besides the `deadpool` pool it owns the in-process subscriber registries. Every
/// connection handed out receives clones of these `Arc`s, so all connections of one pool
/// share the same registries.
pub struct PostgresPool {
    pool: Pool,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    #[allow(dead_code)] // only for deleting the db in tests.
    config: PostgresConfig,
}
283
284#[async_trait]
285impl DbPool for PostgresPool {
286    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
287        let client = self.pool.get().await?;
288
289        Ok(Box::new(PostgresConnection {
290            client: tokio::sync::Mutex::new(client),
291            response_subscribers: self.response_subscribers.clone(),
292            pending_subscribers: self.pending_subscribers.clone(),
293            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
294        }))
295    }
296
297    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
298        let client = self.pool.get().await?;
299
300        Ok(Box::new(PostgresConnection {
301            client: tokio::sync::Mutex::new(client),
302            response_subscribers: self.response_subscribers.clone(),
303            pending_subscribers: self.pending_subscribers.clone(),
304            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
305        }))
306    }
307
308    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
309        let client = self.pool.get().await?;
310
311        Ok(Box::new(PostgresConnection {
312            client: tokio::sync::Mutex::new(client),
313            response_subscribers: self.response_subscribers.clone(),
314            pending_subscribers: self.pending_subscribers.clone(),
315            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
316        }))
317    }
318
319    #[cfg(feature = "test")]
320    async fn connection_test(
321        &self,
322    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
323        let client = self.pool.get().await?;
324
325        Ok(Box::new(PostgresConnection {
326            client: tokio::sync::Mutex::new(client),
327            response_subscribers: self.response_subscribers.clone(),
328            pending_subscribers: self.pending_subscribers.clone(),
329            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
330        }))
331    }
332}
333
/// One pooled client handed out by [`PostgresPool`], plus clones of the pool's subscriber
/// registries.
pub struct PostgresConnection {
    client: tokio::sync::Mutex<Client>, // Callers should not hold onto a connection for too long but it is not controlled by this module, thus tokio mutex.
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
340
341#[derive(Default)]
342struct PendingFfqnSubscribersHolder {
343    by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
344    by_component: HashMap<InputContentDigest /* input digest */, (mpsc::Sender<()>, u64)>,
345}
346impl PendingFfqnSubscribersHolder {
347    fn notify(&self, notifier: &NotifierPendingAt) {
348        if let Some((subscription, _)) = self.by_ffqns.get(&notifier.ffqn) {
349            debug!("Notifying pending subscriber by ffqn");
350            // Does not block
351            let _ = subscription.try_send(());
352        }
353        if let Some((subscription, _)) = self.by_component.get(&notifier.component_input_digest) {
354            debug!("Notifying pending subscriber by component");
355            // Does not block
356            let _ = subscription.try_send(());
357        }
358    }
359
360    fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
361        self.by_ffqns.insert(ffqn, value);
362    }
363
364    fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
365        self.by_ffqns.remove(ffqn)
366    }
367
368    fn insert_by_component(
369        &mut self,
370        input_content_digest: InputContentDigest,
371        value: (mpsc::Sender<()>, u64),
372    ) {
373        self.by_component.insert(input_content_digest, value);
374    }
375
376    fn remove_by_component(
377        &mut self,
378        input_content_digest: &InputContentDigest,
379    ) -> Option<(mpsc::Sender<()>, u64)> {
380        self.by_component.remove(input_content_digest)
381    }
382}
383
/// Controls whether pool construction may create the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Assume the database exists; never attempt to create it.
    Never,
    /// Create database if it does not exist.
    Auto,
}
390
/// Whether the database was created during initialization or was already there.
///
/// NOTE(review): the type name misspells "Initialization"; it is public, so renaming it
/// would break callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// The database did not exist and was created.
    Created,
    /// The database already existed (always reported with `ProvisionPolicy::Never`).
    Existing,
}
396
397impl PostgresPool {
398    #[instrument(skip_all, name = "postgres_new")]
399    pub async fn new(
400        config: PostgresConfig,
401        provision_policy: ProvisionPolicy,
402    ) -> Result<PostgresPool, InitializationError> {
403        Self::new_with_outcome(config, provision_policy)
404            .await
405            .map(|(db, _)| db)
406    }
407
408    pub async fn new_with_outcome(
409        config: PostgresConfig,
410        provision_policy: ProvisionPolicy,
411    ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
412        let outcome = if provision_policy == ProvisionPolicy::Auto {
413            create_database(&config).await?
414        } else {
415            DbInitialzationOutcome::Existing
416        };
417        let mut cfg = Config::new();
418        cfg.host = Some(config.host.clone());
419        cfg.user = Some(config.user.clone());
420        cfg.password = Some(config.password.clone());
421        cfg.dbname = Some(config.db_name.clone());
422        cfg.manager = Some(ManagerConfig {
423            recycling_method: RecyclingMethod::Fast,
424        });
425
426        let pool = cfg.create_pool(None, NoTls).map_err(|err| {
427            error!("Cannot create the database pool - {err:?}");
428            InitializationError
429        })?;
430        let client = pool.get().await.map_err(|err| {
431            error!("Cannot get a connection from the database pool - {err:?}");
432            InitializationError
433        })?;
434
435        let statements = vec![
436            ddl::CREATE_TABLE_T_METADATA,
437            ddl::CREATE_TABLE_T_EXECUTION_LOG,
438            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
439            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
440            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
441            ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
442            ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
443            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
444            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
445            ddl::CREATE_TABLE_T_STATE,
446            ddl::IDX_T_STATE_LOCK_PENDING,
447            ddl::IDX_T_STATE_EXPIRED_TIMERS,
448            ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
449            ddl::IDX_T_STATE_FFQN,
450            ddl::IDX_T_STATE_CREATED_AT,
451            ddl::CREATE_TABLE_T_DELAY,
452            ddl::CREATE_TABLE_T_BACKTRACE,
453            ddl::IDX_T_BACKTRACE_EXECUTION_ID_VERSION,
454        ];
455
456        // Combine into one batch execution for atomicity per round-trip (or efficiency)
457        let batch_sql = statements.join("\n");
458        client.batch_execute(&batch_sql).await.map_err(|err| {
459            error!("Cannot run the DDL import - {err:?}");
460            InitializationError
461        })?;
462
463        let row = client
464            .query_opt(
465                "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
466                &[],
467            )
468            .await
469            .map_err(|err| {
470                error!("Cannot select schema version - {err:?}");
471                InitializationError
472            })?;
473
474        // Postgres INTEGER maps to i32
475        let actual_version = match row {
476            Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
477                error!("Failed to get schema_version column: {e}");
478                InitializationError
479            })?),
480            None => None,
481        };
482
483        match actual_version {
484            None => {
485                client
486                    .execute(
487                        "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
488                        &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
489                    )
490                    .await
491                    .map_err(|err| {
492                        error!("Cannot insert schema version - {err:?}");
493                        InitializationError
494                    })?;
495            }
496            Some(actual_version) => {
497                // Fail on unexpected `schema_version`.
498                if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
499                    error!(
500                        "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
501                    );
502                    return Err(InitializationError);
503                }
504            }
505        }
506
507        debug!("Database schema initialized.");
508
509        Ok((
510            PostgresPool {
511                pool,
512                execution_finished_subscribers: Arc::default(),
513                pending_subscribers: Arc::default(),
514                response_subscribers: Arc::default(),
515                config,
516            },
517            outcome,
518        ))
519    }
520}
521
522#[track_caller]
523fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
524    DbErrorGeneric::Uncategorized {
525        reason: reason.into(),
526        context: SpanTrace::capture(),
527        source: None,
528        loc: Location::caller(),
529    }
530}
531
/// Raw, unvalidated projection of a `t_state` row. Which optional fields must be present
/// depends on `state`; `CombinedState::new` checks the combination.
#[derive(Debug)]
struct CombinedStateDTO {
    execution_id: ExecutionId,
    /// Expected to be one of the `STATE_*` constants.
    state: String,
    ffqn: FunctionFqn,
    component_digest: InputContentDigest,
    created_at: DateTime<Utc>,
    first_scheduled_at: DateTime<Utc>,
    /// Scheduled-at, lock-expiry or finished-at timestamp, depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    // Locked:
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    // Blocked by join set:
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    // Finished:
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// A validated `t_state` row together with the execution log version it corresponds to.
#[derive(Debug)]
struct CombinedState {
    execution_with_state: ExecutionWithState,
    corresponding_version: Version,
}
556
557impl CombinedState {
558    fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, DbErrorGeneric> {
559        let execution_with_state = match dto {
560            // Pending - just created
561            CombinedStateDTO {
562                execution_id,
563                created_at,
564                first_scheduled_at,
565                state,
566                ffqn,
567                component_digest,
568                pending_expires_finished: scheduled_at,
569                last_lock_version: None,
570                executor_id: None,
571                run_id: None,
572                join_set_id: None,
573                join_set_closing: None,
574                result_kind: None,
575            } if state == STATE_PENDING_AT => ExecutionWithState {
576                component_digest,
577                execution_id,
578                ffqn,
579                created_at,
580                first_scheduled_at,
581                pending_state: PendingState::PendingAt {
582                    scheduled_at,
583                    last_lock: None,
584                },
585            },
586            // Pending, previously locked
587            CombinedStateDTO {
588                execution_id,
589                created_at,
590                first_scheduled_at,
591                state,
592                ffqn,
593                component_digest,
594                pending_expires_finished: scheduled_at,
595                last_lock_version: None,
596                executor_id: Some(executor_id),
597                run_id: Some(run_id),
598                join_set_id: None,
599                join_set_closing: None,
600                result_kind: None,
601            } if state == STATE_PENDING_AT => ExecutionWithState {
602                component_digest,
603                execution_id,
604                ffqn,
605                created_at,
606                first_scheduled_at,
607                pending_state: PendingState::PendingAt {
608                    scheduled_at,
609                    last_lock: Some(LockedBy {
610                        executor_id,
611                        run_id,
612                    }),
613                },
614            },
615            CombinedStateDTO {
616                execution_id,
617                created_at,
618                first_scheduled_at,
619                state,
620                ffqn,
621                component_digest,
622                pending_expires_finished: lock_expires_at,
623                last_lock_version: Some(_),
624                executor_id: Some(executor_id),
625                run_id: Some(run_id),
626                join_set_id: None,
627                join_set_closing: None,
628                result_kind: None,
629            } if state == STATE_LOCKED => ExecutionWithState {
630                component_digest,
631                execution_id,
632                ffqn,
633                created_at,
634                first_scheduled_at,
635                pending_state: PendingState::Locked(PendingStateLocked {
636                    locked_by: LockedBy {
637                        executor_id,
638                        run_id,
639                    },
640                    lock_expires_at,
641                }),
642            },
643            CombinedStateDTO {
644                execution_id,
645                created_at,
646                first_scheduled_at,
647                state,
648                ffqn,
649                component_digest,
650                pending_expires_finished: lock_expires_at,
651                last_lock_version: None,
652                executor_id: _,
653                run_id: _,
654                join_set_id: Some(join_set_id),
655                join_set_closing: Some(join_set_closing),
656                result_kind: None,
657            } if state == STATE_BLOCKED_BY_JOIN_SET => ExecutionWithState {
658                component_digest,
659                execution_id,
660                ffqn,
661                created_at,
662                first_scheduled_at,
663                pending_state: PendingState::BlockedByJoinSet {
664                    join_set_id: join_set_id.clone(),
665                    closing: join_set_closing,
666                    lock_expires_at,
667                },
668            },
669            CombinedStateDTO {
670                execution_id,
671                created_at,
672                first_scheduled_at,
673                state,
674                ffqn,
675                component_digest,
676                pending_expires_finished: finished_at,
677                last_lock_version: None,
678                executor_id: None,
679                run_id: None,
680                join_set_id: None,
681                join_set_closing: None,
682                result_kind: Some(result_kind),
683            } if state == STATE_FINISHED => ExecutionWithState {
684                component_digest,
685                execution_id,
686                ffqn,
687                created_at,
688                first_scheduled_at,
689                pending_state: PendingState::Finished {
690                    finished: PendingStateFinished {
691                        finished_at,
692                        version: corresponding_version.0,
693                        result_kind,
694                    },
695                },
696            },
697            _ => {
698                error!("Cannot deserialize pending state from  {dto:?}");
699                return Err(consistency_db_err("invalid `t_state`"));
700            }
701        };
702        Ok(Self {
703            execution_with_state,
704            corresponding_version,
705        })
706    }
707
708    fn get_next_version_assert_not_finished(&self) -> Version {
709        assert!(!self.execution_with_state.pending_state.is_finished());
710        self.corresponding_version.increment()
711    }
712
713    #[cfg(feature = "test")]
714    fn get_next_version_or_finished(&self) -> Version {
715        if self.execution_with_state.pending_state.is_finished() {
716            self.corresponding_version.clone()
717        } else {
718            self.corresponding_version.increment()
719        }
720    }
721}
722
/// Describes an execution that was put into the `PendingAt` state; `ffqn` and
/// `component_input_digest` select which pending subscribers get woken.
#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
    component_input_digest: InputContentDigest,
}

/// Describes an execution that finished with `retval`.
/// NOTE(review): presumably delivered to the execution-finished subscribers — confirm.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

/// Notifications produced by a write; each kind is optional.
/// NOTE(review): presumably fired by the caller after the transaction commits — confirm.
#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}

/// A delay (timer) request within a join set; mirrors the columns of `t_delay`.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
749
750async fn fetch_created_event(
751    tx: &Transaction<'_>,
752    execution_id: &ExecutionId,
753) -> Result<CreateRequest, DbErrorRead> {
754    let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
755                execution_id = $1 AND version = 0";
756
757    let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
758
759    let created_at = get(&row, "created_at")?;
760    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
761    let event = event.0;
762
763    if let ExecutionRequest::Created {
764        ffqn,
765        params,
766        parent,
767        scheduled_at,
768        component_id,
769        metadata,
770        scheduled_by,
771    } = event
772    {
773        Ok(CreateRequest {
774            created_at,
775            execution_id: execution_id.clone(),
776            ffqn,
777            params,
778            parent,
779            scheduled_at,
780            component_id,
781            metadata,
782            scheduled_by,
783        })
784    } else {
785        error!("Row with version=0 must be a `Created` event - {event:?}");
786        Err(consistency_db_err("expected `Created` event").into())
787    }
788}
789
790fn check_expected_next_and_appending_version(
791    expected_version: &Version,
792    appending_version: &Version,
793) -> Result<(), DbErrorWrite> {
794    if *expected_version != *appending_version {
795        debug!(
796            "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
797        );
798        return Err(DbErrorWrite::NonRetriable(
799            DbErrorWriteNonRetriable::VersionConflict {
800                expected: expected_version.clone(),
801                requested: appending_version.clone(),
802            },
803        ));
804    }
805    Ok(())
806}
807
808#[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
809async fn create_inner(
810    tx: &Transaction<'_>,
811    req: CreateRequest,
812) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
813    trace!("create_inner");
814
815    let version = Version::default();
816    let execution_id = req.execution_id.clone();
817    let execution_id_str = execution_id.to_string();
818    let ffqn = req.ffqn.clone();
819    let created_at = req.created_at;
820    let scheduled_at = req.scheduled_at;
821    let component_id = req.component_id.clone();
822
823    let event = ExecutionRequest::from(req);
824    let event = Json(event);
825
826    tx.execute(
827        "INSERT INTO t_execution_log (
828            execution_id, created_at, version, json_value, variant, join_set_id
829        ) VALUES ($1, $2, $3, $4, $5, $6)",
830        &[
831            &execution_id_str,
832            &created_at,
833            &i64::from(version.0), // BIGINT
834            &event,
835            &event.0.variant(),
836            &event.0.join_set_id().map(std::string::ToString::to_string),
837        ],
838    )
839    .await?;
840
841    let pending_at = {
842        debug!("Creating with `Pending(`{scheduled_at:?}`)");
843
844        tx.execute(
845            r"
846            INSERT INTO t_state (
847                execution_id,
848                is_top_level,
849                corresponding_version,
850                pending_expires_finished,
851                ffqn,
852                state,
853                created_at,
854                component_id_input_digest,
855                updated_at,
856                first_scheduled_at,
857                intermittent_event_count
858            ) VALUES (
859                $1, $2, $3, $4, $5, $6, $7, $8, CURRENT_TIMESTAMP, $9, 0
860            )",
861            &[
862                &execution_id_str,
863                &execution_id.is_top_level(),
864                &i64::from(version.0),
865                &scheduled_at,
866                &ffqn.to_string(),
867                &STATE_PENDING_AT,
868                &created_at,
869                &component_id.input_digest.as_slice(),
870                &scheduled_at,
871            ],
872        )
873        .await?;
874
875        AppendNotifier {
876            pending_at: Some(NotifierPendingAt {
877                scheduled_at,
878                ffqn,
879                component_input_digest: component_id.input_digest,
880            }),
881            execution_finished: None,
882            response: None,
883        }
884    };
885
886    let next_version = Version::new(version.0 + 1);
887    Ok((next_version, pending_at))
888}
889
890#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
891async fn update_state_pending_after_response_appended(
892    tx: &Transaction<'_>,
893    execution_id: &ExecutionId,
894    scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
895    corresponding_version: &Version, // t_execution_log is not changed
896    component_input_digest: InputContentDigest,
897) -> Result<AppendNotifier, DbErrorWrite> {
898    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
899
900    // Convert types for Postgres arguments
901    let execution_id_str = execution_id.to_string();
902    let version = i64::from(corresponding_version.0);
903
904    let updated = tx
905        .execute(
906            r"
907            UPDATE t_state
908            SET
909                corresponding_version = $1,
910                pending_expires_finished = $2,
911                state = $3,
912                updated_at = CURRENT_TIMESTAMP,
913
914                last_lock_version = NULL,
915
916                join_set_id = NULL,
917                join_set_closing = NULL,
918
919                result_kind = NULL
920            WHERE execution_id = $4
921            ",
922            &[
923                &version,          // $1
924                &scheduled_at,     // $2
925                &STATE_PENDING_AT, // $3
926                &execution_id_str, // $4
927            ],
928        )
929        .await?;
930
931    if updated == 0 {
932        return Err(DbErrorWrite::NotFound);
933    }
934
935    Ok(AppendNotifier {
936        pending_at: Some(NotifierPendingAt {
937            scheduled_at,
938            ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
939            component_input_digest,
940        }),
941        execution_finished: None,
942        response: None,
943    })
944}
945
946#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
947async fn update_state_pending_after_event_appended(
948    tx: &Transaction<'_>,
949    execution_id: &ExecutionId,
950    appending_version: &Version,
951    scheduled_at: DateTime<Utc>,
952    intermittent_failure: bool,
953    component_input_digest: InputContentDigest,
954) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
955    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
956
957    let intermittent_delta = i64::from(intermittent_failure); // 0 or 1
958
959    let updated = tx
960        .execute(
961            r"
962            UPDATE t_state
963            SET
964                corresponding_version = $1,
965                pending_expires_finished = $2,
966                state = $3,
967                updated_at = CURRENT_TIMESTAMP,
968                intermittent_event_count = intermittent_event_count + $4,
969
970                last_lock_version = NULL,
971
972                join_set_id = NULL,
973                join_set_closing = NULL,
974
975                result_kind = NULL
976            WHERE execution_id = $5;
977            ",
978            &[
979                &i64::from(appending_version.0), // $1
980                &scheduled_at,                   // $2
981                &STATE_PENDING_AT,               // $3
982                &intermittent_delta,             // $4
983                &execution_id.to_string(),       // $5
984            ],
985        )
986        .await?;
987
988    if updated != 1 {
989        return Err(DbErrorWrite::NotFound);
990    }
991
992    Ok((
993        appending_version.increment(),
994        AppendNotifier {
995            pending_at: Some(NotifierPendingAt {
996                scheduled_at,
997                ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
998                component_input_digest,
999            }),
1000            execution_finished: None,
1001            response: None,
1002        },
1003    ))
1004}
1005
1006async fn update_state_locked_get_intermittent_event_count(
1007    tx: &Transaction<'_>,
1008    execution_id: &ExecutionId,
1009    executor_id: ExecutorId,
1010    run_id: RunId,
1011    lock_expires_at: DateTime<Utc>,
1012    appending_version: &Version,
1013    retry_config: ComponentRetryConfig,
1014) -> Result<u32, DbErrorWrite> {
1015    debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1016    let backoff_millis =
1017        i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
1018            // BIGINT = i64
1019            DbErrorGeneric::Uncategorized {
1020                reason: "backoff too big".into(),
1021                context: SpanTrace::capture(),
1022                source: Some(Arc::new(err)),
1023                loc: Location::caller(),
1024            }
1025        })?;
1026
1027    let execution_id_str = execution_id.to_string();
1028
1029    let updated = tx
1030        .execute(
1031            r"
1032            UPDATE t_state
1033            SET
1034                corresponding_version = $1,
1035                pending_expires_finished = $2,
1036                state = $3,
1037                updated_at = CURRENT_TIMESTAMP,
1038
1039                max_retries = $4,
1040                retry_exp_backoff_millis = $5,
1041                last_lock_version = $1, -- appending_version again
1042                executor_id = $6,
1043                run_id = $7,
1044
1045                join_set_id = NULL,
1046                join_set_closing = NULL,
1047
1048                result_kind = NULL
1049            WHERE execution_id = $8
1050            ",
1051            &[
1052                &i64::from(appending_version.0),          // $1
1053                &lock_expires_at,                         // $2
1054                &STATE_LOCKED,                            // $3
1055                &retry_config.max_retries.map(i64::from), // $4
1056                &backoff_millis,                          // $5
1057                &executor_id.to_string(),                 // $6
1058                &run_id.to_string(),                      // $7
1059                &execution_id_str,                        // $8
1060            ],
1061        )
1062        .await?;
1063
1064    if updated != 1 {
1065        return Err(DbErrorWrite::NotFound);
1066    }
1067
1068    // fetch intermittent event count
1069    let row = tx
1070        .query_one(
1071            "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
1072            &[&execution_id_str],
1073        )
1074        .await
1075        .map_err(DbErrorGeneric::from)?;
1076
1077    let count: i64 = get(&row, "intermittent_event_count")?; // Postgres BIGINT
1078    let count = u32::try_from(count)
1079        .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
1080    Ok(count)
1081}
1082
1083async fn update_state_blocked(
1084    tx: &Transaction<'_>,
1085    execution_id: &ExecutionId,
1086    appending_version: &Version,
1087    join_set_id: &JoinSetId,
1088    lock_expires_at: DateTime<Utc>,
1089    join_set_closing: bool,
1090) -> Result<AppendResponse, DbErrorWrite> {
1091    debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1092
1093    let updated = tx
1094        .execute(
1095            r"
1096            UPDATE t_state
1097            SET
1098                corresponding_version = $1,
1099                pending_expires_finished = $2,
1100                state = $3,
1101                updated_at = CURRENT_TIMESTAMP,
1102
1103                last_lock_version = NULL,
1104
1105                join_set_id = $4,
1106                join_set_closing = $5,
1107
1108                result_kind = NULL
1109            WHERE execution_id = $6
1110            ",
1111            &[
1112                &i64::from(appending_version.0), // $1
1113                &lock_expires_at,                // $2
1114                &STATE_BLOCKED_BY_JOIN_SET,      // $3
1115                &join_set_id.to_string(),        // $4
1116                &join_set_closing,               // $5 (BOOLEAN)
1117                &execution_id.to_string(),       // $6
1118            ],
1119        )
1120        .await?;
1121
1122    if updated != 1 {
1123        return Err(DbErrorWrite::NotFound);
1124    }
1125    Ok(appending_version.increment())
1126}
1127
1128async fn update_state_finished(
1129    tx: &Transaction<'_>,
1130    execution_id: &ExecutionId,
1131    appending_version: &Version,
1132    finished_at: DateTime<Utc>,
1133    result_kind: PendingStateFinishedResultKind,
1134) -> Result<(), DbErrorWrite> {
1135    debug!("Setting t_state to Finished");
1136
1137    let result_kind_json = Json(result_kind);
1138
1139    let updated = tx
1140        .execute(
1141            r"
1142            UPDATE t_state
1143            SET
1144                corresponding_version = $1,
1145                pending_expires_finished = $2,
1146                state = $3,
1147                updated_at = CURRENT_TIMESTAMP,
1148
1149                last_lock_version = NULL,
1150                executor_id = NULL,
1151                run_id = NULL,
1152
1153                join_set_id = NULL,
1154                join_set_closing = NULL,
1155
1156                result_kind = $4
1157            WHERE execution_id = $5
1158            ",
1159            &[
1160                &i64::from(appending_version.0), // $1
1161                &finished_at,                    // $2
1162                &STATE_FINISHED,                 // $3
1163                &result_kind_json,               // $4
1164                &execution_id.to_string(),       // $5
1165            ],
1166        )
1167        .await?;
1168
1169    if updated != 1 {
1170        return Err(DbErrorWrite::NotFound);
1171    }
1172    Ok(())
1173}
1174
1175#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1176async fn bump_state_next_version(
1177    tx: &Transaction<'_>,
1178    execution_id: &ExecutionId,
1179    appending_version: &Version,
1180    delay_req: Option<DelayReq>,
1181) -> Result<AppendResponse, DbErrorWrite> {
1182    debug!("update_index_version");
1183    let execution_id_str = execution_id.to_string();
1184
1185    let updated = tx
1186        .execute(
1187            r"
1188            UPDATE t_state
1189            SET
1190                corresponding_version = $1,
1191                updated_at = CURRENT_TIMESTAMP
1192            WHERE execution_id = $2
1193            ",
1194            &[
1195                &i64::from(appending_version.0), // $1
1196                &execution_id_str,               // $2
1197            ],
1198        )
1199        .await?;
1200
1201    if updated != 1 {
1202        return Err(DbErrorWrite::NotFound);
1203    }
1204
1205    if let Some(DelayReq {
1206        join_set_id,
1207        delay_id,
1208        expires_at,
1209    }) = delay_req
1210    {
1211        debug!("Inserting delay to `t_delay`");
1212        tx.execute(
1213            "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1214            &[
1215                &execution_id_str,
1216                &join_set_id.to_string(),
1217                &delay_id.to_string(),
1218                &expires_at,
1219            ],
1220        )
1221        .await?;
1222    }
1223    Ok(appending_version.increment())
1224}
1225
1226async fn get_combined_state(
1227    tx: &Transaction<'_>,
1228    execution_id: &ExecutionId,
1229) -> Result<CombinedState, DbErrorRead> {
1230    let row = tx
1231        .query_one(
1232            r"
1233            SELECT
1234                created_at, first_scheduled_at,
1235                state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
1236                last_lock_version, executor_id, run_id,
1237                join_set_id, join_set_closing,
1238                result_kind
1239            FROM t_state
1240            WHERE execution_id = $1
1241            ",
1242            &[&execution_id.to_string()],
1243        )
1244        .await
1245        .map_err(DbErrorRead::from)?;
1246
1247    // Parsing columns
1248
1249    let created_at: DateTime<Utc> = get(&row, "created_at")?;
1250    let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1251
1252    let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1253    let digest = Digest::try_from(digest_bytes.as_slice())
1254        .map_err(|err| consistency_db_err(err.to_string()))?;
1255    let component_id_input_digest = InputContentDigest(ContentDigest(digest));
1256
1257    let state: String = get(&row, "state")?;
1258    let ffqn: String = get(&row, "ffqn")?;
1259    let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1260        consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1261    })?;
1262
1263    let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1264
1265    let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1266    let last_lock_version = last_lock_version_raw
1267        .map(Version::try_from)
1268        .transpose()
1269        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1270
1271    let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1272    let executor_id = executor_id_raw
1273        .map(|id| ExecutorId::from_str(&id))
1274        .transpose()
1275        .map_err(DbErrorGeneric::from)?;
1276
1277    let run_id_raw: Option<String> = get(&row, "run_id")?;
1278    let run_id = run_id_raw
1279        .map(|id| RunId::from_str(&id))
1280        .transpose()
1281        .map_err(DbErrorGeneric::from)?;
1282
1283    let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1284    let join_set_id = join_set_id_raw
1285        .map(|id| JoinSetId::from_str(&id))
1286        .transpose()
1287        .map_err(DbErrorGeneric::from)?;
1288
1289    let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1290
1291    let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1292    let result_kind = result_kind.map(|it| it.0);
1293
1294    let corresponding_version = get::<i64>(&row, "corresponding_version")?;
1295    let corresponding_version = Version::new(
1296        VersionType::try_from(corresponding_version)
1297            .map_err(|_| consistency_db_err("version must be non-negative"))?,
1298    );
1299
1300    let dto = CombinedStateDTO {
1301        execution_id: execution_id.clone(),
1302        created_at,
1303        first_scheduled_at,
1304        state,
1305        ffqn,
1306        component_digest: component_id_input_digest,
1307        pending_expires_finished,
1308        last_lock_version,
1309        executor_id,
1310        run_id,
1311        join_set_id,
1312        join_set_closing,
1313        result_kind,
1314    };
1315    CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1316}
1317
1318async fn list_executions(
1319    read_tx: &Transaction<'_>,
1320    filter: ListExecutionsFilter,
1321    pagination: &ExecutionListPagination,
1322) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1323    // Helper to manage dynamic WHERE clauses and positional parameters ($1, $2...)
1324    struct QueryBuilder {
1325        where_clauses: Vec<String>,
1326        params: Vec<Box<dyn ToSql + Send + Sync>>,
1327    }
1328
1329    impl QueryBuilder {
1330        fn new() -> Self {
1331            Self {
1332                where_clauses: Vec::new(),
1333                params: Vec::new(),
1334            }
1335        }
1336
1337        fn add_param<T>(&mut self, param: T) -> String
1338        where
1339            T: ToSql + Sync + Send + 'static,
1340        {
1341            self.params.push(Box::new(param));
1342            format!("${}", self.params.len())
1343        }
1344
1345        fn add_where(&mut self, clause: String) {
1346            self.where_clauses.push(clause);
1347        }
1348    }
1349
1350    let mut qb = QueryBuilder::new();
1351
1352    // Pagination Logic
1353    let (limit, limit_desc) = match pagination {
1354        ExecutionListPagination::CreatedBy(p) => {
1355            let limit = p.length();
1356            let is_desc = p.is_desc();
1357            if let Some(cursor) = p.cursor() {
1358                let placeholder = qb.add_param(*cursor);
1359                qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1360            }
1361            (limit, is_desc)
1362        }
1363        ExecutionListPagination::ExecutionId(p) => {
1364            let limit = p.length();
1365            let is_desc = p.is_desc();
1366            if let Some(cursor) = p.cursor() {
1367                let placeholder = qb.add_param(cursor.to_string());
1368                qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1369            }
1370            (limit, is_desc)
1371        }
1372    };
1373
1374    if !filter.show_derived {
1375        qb.add_where("is_top_level = true".to_string());
1376    }
1377
1378    if let Some(ffqn_prefix) = filter.ffqn_prefix {
1379        let placeholder = qb.add_param(format!("{ffqn_prefix}%")); // added %
1380        qb.add_where(format!("ffqn LIKE {placeholder}"));
1381    }
1382    if filter.hide_finished {
1383        qb.add_where(format!("state != '{STATE_FINISHED}'"));
1384    }
1385    if let Some(prefix) = filter.execution_id_prefix {
1386        let placeholder = qb.add_param(format!("{prefix}%")); // added %
1387        qb.add_where(format!("execution_id LIKE {placeholder}"));
1388    }
1389
1390    let where_str = if qb.where_clauses.is_empty() {
1391        String::new()
1392    } else {
1393        format!("WHERE {}", qb.where_clauses.join(" AND "))
1394    };
1395
1396    let order_col = match pagination {
1397        ExecutionListPagination::CreatedBy(_) => "created_at",
1398        ExecutionListPagination::ExecutionId(_) => "execution_id",
1399    };
1400
1401    let desc_str = if limit_desc { "DESC" } else { "" };
1402
1403    let sql = format!(
1404        r"
1405            SELECT created_at, first_scheduled_at, component_id_input_digest,
1406            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1407            last_lock_version, executor_id, run_id,
1408            join_set_id, join_set_closing,
1409            result_kind
1410            FROM t_state {where_str} ORDER BY {order_col} {desc_str} LIMIT {limit}
1411            "
1412    );
1413
1414    let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1415        .params
1416        .iter()
1417        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1418        .collect();
1419
1420    let rows = read_tx.query(&sql, &params_refs).await?;
1421
1422    let mut vec = Vec::with_capacity(rows.len());
1423
1424    for row in rows {
1425        // If parsing of the row fails, log and skip it.
1426        let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1427            let execution_id_str: String = get(&row, "execution_id")?;
1428            let execution_id = ExecutionId::from_str(&execution_id_str)
1429                .map_err(|err| consistency_db_err(err.to_string()))?;
1430
1431            let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1432            let digest = Digest::try_from(digest_bytes.as_slice())
1433                .map_err(|err| consistency_db_err(err.to_string()))?;
1434            let component_id_input_digest = InputContentDigest(ContentDigest(digest));
1435
1436            let created_at: DateTime<Utc> = get(&row, "created_at")?;
1437            let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1438
1439            let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1440                get(&row, "result_kind")?;
1441            let result_kind = result_kind.map(|it| it.0);
1442
1443            let corresponding_version: i64 = get(&row, "corresponding_version")?;
1444            let corresponding_version = Version::try_from(corresponding_version)
1445                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1446
1447            let executor_id_str: Option<String> = get(&row, "executor_id")?;
1448            let executor_id = executor_id_str
1449                .map(|id| ExecutorId::from_str(&id))
1450                .transpose()?;
1451
1452            let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1453            let last_lock_version = last_lock_version_raw
1454                .map(Version::try_from)
1455                .transpose()
1456                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1457
1458            let run_id_str: Option<String> = get(&row, "run_id")?;
1459            let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1460
1461            let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1462            let join_set_id = join_set_id_str
1463                .map(|id| JoinSetId::from_str(&id))
1464                .transpose()?;
1465
1466            let ffqn: String = get(&row, "ffqn")?;
1467            let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1468                error!("Error parsing ffqn - {parse_err:?}");
1469                consistency_db_err("invalid ffqn value in `t_state`")
1470            })?;
1471
1472            let combined_state_dto = CombinedStateDTO {
1473                execution_id,
1474                created_at,
1475                first_scheduled_at,
1476                component_digest: component_id_input_digest,
1477                state: get(&row, "state")?,
1478                ffqn,
1479                pending_expires_finished: get(&row, "pending_expires_finished")?,
1480                executor_id,
1481                last_lock_version,
1482                run_id,
1483                join_set_id,
1484                join_set_closing: get(&row, "join_set_closing")?,
1485                result_kind,
1486            };
1487
1488            let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1489
1490            Ok(combined_state.execution_with_state)
1491        };
1492
1493        match unpack() {
1494            Ok(execution) => vec.push(execution),
1495            Err(err) => {
1496                warn!("Skipping corrupted row in t_state: {err:?}");
1497            }
1498        }
1499    }
1500
1501    if !limit_desc {
1502        // the list must be sorted in descending order
1503        vec.reverse();
1504    }
1505    Ok(vec)
1506}
1507
1508async fn list_responses(
1509    tx: &Transaction<'_>,
1510    execution_id: &ExecutionId,
1511    pagination: Option<Pagination<u32>>,
1512) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1513    // Helper to manage params dynamically
1514    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1515    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1516        params.push(p);
1517        format!("${}", params.len())
1518    };
1519
1520    // 1. Base Query
1521    let p_execution_id = add_param(Box::new(execution_id.to_string()));
1522
1523    let mut sql = format!(
1524        "SELECT \
1525            r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1526            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1527            WHERE \
1528            r.execution_id = {p_execution_id} \
1529            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1530    );
1531
1532    // 2. Pagination Logic
1533    let limit = match &pagination {
1534        Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1535            // Postgres BIGINT is i64.
1536            let p_cursor = add_param(Box::new(i64::from(*cursor)));
1537
1538            // Add WHERE clause for cursor
1539            write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1540
1541            Some(p.length())
1542        }
1543        None => None,
1544    };
1545
1546    // 3. Ordering
1547    sql.push_str(" ORDER BY r.id");
1548    if pagination.as_ref().is_some_and(Pagination::is_desc) {
1549        sql.push_str(" DESC");
1550    }
1551
1552    // 4. Limit
1553    if let Some(limit) = limit {
1554        // Postgres limit expects i64
1555        let p_limit = add_param(Box::new(i64::from(limit)));
1556        write!(sql, " LIMIT {p_limit}").unwrap();
1557    }
1558
1559    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1560        .iter()
1561        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1562        .collect();
1563
1564    let rows = tx
1565        .query(&sql, &params_refs)
1566        .await
1567        .map_err(DbErrorRead::from)?;
1568
1569    let mut results = Vec::with_capacity(rows.len());
1570    for row in rows {
1571        results.push(parse_response_with_cursor(&row)?);
1572    }
1573
1574    Ok(results)
1575}
1576
1577fn parse_response_with_cursor(
1578    row: &tokio_postgres::Row,
1579) -> Result<ResponseWithCursor, DbErrorRead> {
1580    // Postgres BIGINT = i64.
1581    let id = u32::try_from(get::<i64>(row, "id")?)
1582        .map_err(|_| consistency_db_err("id must not be negative"))?;
1583
1584    let created_at: DateTime<Utc> = get(row, "created_at")?;
1585    let join_set_id_str: String = get(row, "join_set_id")?;
1586    let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1587
1588    // Extract Optionals
1589    let delay_id: Option<String> = get(row, "delay_id")?;
1590    let delay_id = delay_id
1591        .map(|id| DelayId::from_str(&id))
1592        .transpose()
1593        .map_err(DbErrorGeneric::from)?;
1594    let delay_success: Option<bool> = get(row, "delay_success")?;
1595    let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1596    let child_execution_id = child_execution_id
1597        .map(|id| ExecutionIdDerived::from_str(&id))
1598        .transpose()
1599        .map_err(DbErrorGeneric::from)?;
1600    let finished_version = get::<Option<i64>>(row, "finished_version")?
1601        .map(Version::try_from)
1602        .transpose()
1603        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1604    let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1605    let json_value = json_value.map(|it| it.0);
1606
1607    let event = match (
1608        delay_id,
1609        delay_success,
1610        child_execution_id,
1611        finished_version,
1612        json_value,
1613    ) {
1614        (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1615            delay_id,
1616            result: delay_success.then_some(()).ok_or(()),
1617        },
1618        (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1619            if let ExecutionRequest::Finished { result, .. } = json_val {
1620                JoinSetResponse::ChildExecutionFinished {
1621                    child_execution_id,
1622                    finished_version,
1623                    result,
1624                }
1625            } else {
1626                error!("Joined log entry must be 'Finished'");
1627                return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1628            }
1629        }
1630        (delay, delay_success, child, finished, result) => {
1631            error!(
1632                "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1633            );
1634            return Err(consistency_db_err("invalid row in t_join_set_response").into());
1635        }
1636    };
1637
1638    Ok(ResponseWithCursor {
1639        cursor: id,
1640        event: JoinSetResponseEventOuter {
1641            event: JoinSetResponseEvent { join_set_id, event },
1642            created_at,
1643        },
1644    })
1645}
1646
1647#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1648#[expect(clippy::too_many_arguments)]
1649async fn lock_single_execution(
1650    tx: &Transaction<'_>,
1651    created_at: DateTime<Utc>,
1652    component_id: ComponentId,
1653    execution_id: &ExecutionId,
1654    run_id: RunId,
1655    appending_version: &Version,
1656    executor_id: ExecutorId,
1657    lock_expires_at: DateTime<Utc>,
1658    retry_config: ComponentRetryConfig,
1659) -> Result<LockedExecution, DbErrorWrite> {
1660    debug!("lock_single_execution");
1661
1662    // Check State
1663    let combined_state = get_combined_state(tx, execution_id).await?;
1664    combined_state
1665        .execution_with_state
1666        .pending_state
1667        .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
1668    let expected_version = combined_state.get_next_version_assert_not_finished();
1669    check_expected_next_and_appending_version(&expected_version, appending_version)?;
1670
1671    // Prepare Event
1672    let locked_event = Locked {
1673        component_id,
1674        executor_id,
1675        lock_expires_at,
1676        run_id,
1677        retry_config,
1678    };
1679    let event = ExecutionRequest::Locked(locked_event.clone());
1680
1681    let event = Json(event);
1682
1683    // Append to execution_log
1684    tx.execute(
1685        "INSERT INTO t_execution_log \
1686            (execution_id, created_at, json_value, version, variant) \
1687            VALUES ($1, $2, $3, $4, $5)",
1688        &[
1689            &execution_id.to_string(),
1690            &created_at,
1691            &event,
1692            &i64::from(appending_version.0),
1693            &event.0.variant(),
1694        ],
1695    )
1696    .await
1697    .map_err(|err| {
1698        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
1699            reason: "cannot lock".into(),
1700            context: SpanTrace::capture(),
1701            source: Some(Arc::new(err)),
1702            loc: Location::caller(),
1703        })
1704    })?;
1705
1706    // Update t_state
1707    let responses_dto = list_responses(tx, execution_id, None).await?;
1708    let responses = responses_dto.into_iter().map(|resp| resp.event).collect();
1709    trace!("Responses: {responses:?}");
1710
1711    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
1712        tx,
1713        execution_id,
1714        executor_id,
1715        run_id,
1716        lock_expires_at,
1717        appending_version,
1718        retry_config,
1719    )
1720    .await?;
1721
1722    // Fetch History
1723    // Fetch event_history and `Created` event.
1724    let rows = tx
1725        .query(
1726            "SELECT json_value, version FROM t_execution_log WHERE \
1727                execution_id = $1 AND (variant = $2 OR variant = $3) \
1728                ORDER BY version",
1729            &[
1730                &execution_id.to_string(),
1731                &DUMMY_CREATED.variant(),
1732                &DUMMY_HISTORY_EVENT.variant(),
1733            ],
1734        )
1735        .await
1736        .map_err(DbErrorGeneric::from)?;
1737
1738    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
1739
1740    for row in rows {
1741        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
1742        let event = event.0;
1743
1744        let version: i64 = get(&row, "version")?;
1745        let version = Version::try_from(version)
1746            .map_err(|_| consistency_db_err("version must be non-negative"))?;
1747
1748        events.push_back(ExecutionEvent {
1749            created_at: DateTime::from_timestamp_nanos(0), // not used, only the inner event and version
1750            event,
1751            backtrace_id: None,
1752            version,
1753        });
1754    }
1755
1756    // Extract Created Event
1757    let Some(ExecutionRequest::Created {
1758        ffqn,
1759        params,
1760        parent,
1761        metadata,
1762        ..
1763    }) = events.pop_front().map(|outer| outer.event)
1764    else {
1765        error!("Execution log must contain at least `Created` event");
1766        return Err(consistency_db_err("execution log must contain `Created` event").into());
1767    };
1768
1769    // Extract History Events
1770    let mut event_history = Vec::new();
1771    for ExecutionEvent { event, version, .. } in events {
1772        if let ExecutionRequest::HistoryEvent { event } = event {
1773            event_history.push((event, version));
1774        } else {
1775            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1776            return Err(consistency_db_err(
1777                "rows can only contain `Created` and `HistoryEvent` event kinds",
1778            )
1779            .into());
1780        }
1781    }
1782
1783    Ok(LockedExecution {
1784        execution_id: execution_id.clone(),
1785        metadata,
1786        next_version: appending_version.increment(),
1787        ffqn,
1788        params,
1789        event_history,
1790        responses,
1791        parent,
1792        intermittent_event_count,
1793        locked_event,
1794    })
1795}
1796
1797async fn count_join_next(
1798    tx: &Transaction<'_>,
1799    execution_id: &ExecutionId,
1800    join_set_id: &JoinSetId,
1801) -> Result<u32, DbErrorRead> {
1802    let row = tx
1803            .query_one(
1804                "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
1805                AND history_event_type = $3",
1806                &[
1807                    &execution_id.to_string(),
1808                    &join_set_id.to_string(),
1809                    &HISTORY_EVENT_TYPE_JOIN_NEXT,
1810                ],
1811            )
1812            .await
1813            .map_err(DbErrorRead::from)?;
1814
1815    let count = u32::try_from(get::<i64>(&row, "count")?).expect("COUNT cannot be negative");
1816    Ok(count)
1817}
1818
1819async fn nth_response(
1820    tx: &Transaction<'_>,
1821    execution_id: &ExecutionId,
1822    join_set_id: &JoinSetId,
1823    skip_rows: u32,
1824) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
1825    let row = tx
1826            .query_opt(
1827                "SELECT r.id, r.created_at, r.join_set_id, \
1828                 r.delay_id, r.delay_success, \
1829                 r.child_execution_id, r.finished_version, l.json_value \
1830                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1831                 WHERE \
1832                 r.execution_id = $1 AND r.join_set_id = $2 AND \
1833                 ( \
1834                 r.finished_version = l.version \
1835                 OR \
1836                 r.child_execution_id IS NULL \
1837                 ) \
1838                 ORDER BY id \
1839                 LIMIT 1 OFFSET $3",
1840                 &[
1841                     &execution_id.to_string(),
1842                     &join_set_id.to_string(),
1843                     &i64::from(skip_rows),
1844                 ]
1845            )
1846            .await
1847            .map_err(DbErrorRead::from)?;
1848
1849    match row {
1850        Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
1851        None => Ok(None),
1852    }
1853}
1854
1855#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1856async fn append(
1857    tx: &Transaction<'_>,
1858    execution_id: &ExecutionId,
1859    req: AppendRequest,
1860    appending_version: Version,
1861) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1862    if matches!(req.event, ExecutionRequest::Created { .. }) {
1863        return Err(DbErrorWrite::NonRetriable(
1864            DbErrorWriteNonRetriable::ValidationFailed(
1865                "cannot append `Created` event - use `create` instead".into(),
1866            ),
1867        ));
1868    }
1869
1870    if let AppendRequest {
1871        event:
1872            ExecutionRequest::Locked(Locked {
1873                component_id,
1874                executor_id,
1875                run_id,
1876                lock_expires_at,
1877                retry_config,
1878            }),
1879        created_at,
1880    } = req
1881    {
1882        return lock_single_execution(
1883            tx,
1884            created_at,
1885            component_id,
1886            execution_id,
1887            run_id,
1888            &appending_version,
1889            executor_id,
1890            lock_expires_at,
1891            retry_config,
1892        )
1893        .await
1894        .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1895    }
1896
1897    let combined_state = get_combined_state(tx, execution_id).await?;
1898    if combined_state
1899        .execution_with_state
1900        .pending_state
1901        .is_finished()
1902    {
1903        debug!("Execution is already finished");
1904        return Err(DbErrorWrite::NonRetriable(
1905            DbErrorWriteNonRetriable::IllegalState {
1906                reason: "already finished".into(),
1907                context: SpanTrace::capture(),
1908                source: None,
1909                loc: Location::caller(),
1910            },
1911        ));
1912    }
1913
1914    check_expected_next_and_appending_version(
1915        &combined_state.get_next_version_assert_not_finished(),
1916        &appending_version,
1917    )?;
1918
1919    let event = Json(req.event);
1920
1921    // Insert into t_execution_log
1922    tx.execute(
1923            "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1924             VALUES ($1, $2, $3, $4, $5, $6)",
1925            &[
1926                &execution_id.to_string(),
1927                &req.created_at,
1928                &event,
1929                &i64::from(appending_version.0),
1930                &event.0.variant(),
1931                &event.0.join_set_id().map(std::string::ToString::to_string),
1932            ],
1933        )
1934        .await?;
1935
1936    // Calculate current pending state
1937    match &event.0 {
1938        ExecutionRequest::Created { .. } => {
1939            unreachable!("handled in the caller")
1940        }
1941
1942        ExecutionRequest::Locked { .. } => {
1943            unreachable!("handled above")
1944        }
1945
1946        ExecutionRequest::TemporarilyFailed {
1947            backoff_expires_at, ..
1948        }
1949        | ExecutionRequest::TemporarilyTimedOut {
1950            backoff_expires_at, ..
1951        } => {
1952            let (next_version, notifier) = update_state_pending_after_event_appended(
1953                tx,
1954                execution_id,
1955                &appending_version,
1956                *backoff_expires_at,
1957                true, // an intermittent failure
1958                combined_state.execution_with_state.component_digest,
1959            )
1960            .await?;
1961            return Ok((next_version, notifier));
1962        }
1963
1964        ExecutionRequest::Unlocked {
1965            backoff_expires_at, ..
1966        } => {
1967            let (next_version, notifier) = update_state_pending_after_event_appended(
1968                tx,
1969                execution_id,
1970                &appending_version,
1971                *backoff_expires_at,
1972                false, // not an intermittent failure
1973                combined_state.execution_with_state.component_digest,
1974            )
1975            .await?;
1976            return Ok((next_version, notifier));
1977        }
1978
1979        ExecutionRequest::Finished { result, .. } => {
1980            update_state_finished(
1981                tx,
1982                execution_id,
1983                &appending_version,
1984                req.created_at,
1985                PendingStateFinishedResultKind::from(result),
1986            )
1987            .await?;
1988            return Ok((
1989                appending_version,
1990                AppendNotifier {
1991                    pending_at: None,
1992                    execution_finished: Some(NotifierExecutionFinished {
1993                        execution_id: execution_id.clone(),
1994                        retval: result.clone(),
1995                    }),
1996                    response: None,
1997                },
1998            ));
1999        }
2000
2001        ExecutionRequest::HistoryEvent {
2002            event:
2003                HistoryEvent::JoinSetCreate { .. }
2004                | HistoryEvent::JoinSetRequest {
2005                    request: JoinSetRequest::ChildExecutionRequest { .. },
2006                    ..
2007                }
2008                | HistoryEvent::Persist { .. }
2009                | HistoryEvent::Schedule { .. }
2010                | HistoryEvent::Stub { .. }
2011                | HistoryEvent::JoinNextTooMany { .. },
2012        } => {
2013            return Ok((
2014                bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2015                AppendNotifier::default(),
2016            ));
2017        }
2018
2019        ExecutionRequest::HistoryEvent {
2020            event:
2021                HistoryEvent::JoinSetRequest {
2022                    join_set_id,
2023                    request:
2024                        JoinSetRequest::DelayRequest {
2025                            delay_id,
2026                            expires_at,
2027                            ..
2028                        },
2029                },
2030        } => {
2031            return Ok((
2032                bump_state_next_version(
2033                    tx,
2034                    execution_id,
2035                    &appending_version,
2036                    Some(DelayReq {
2037                        join_set_id: join_set_id.clone(),
2038                        delay_id: delay_id.clone(),
2039                        expires_at: *expires_at,
2040                    }),
2041                )
2042                .await?,
2043                AppendNotifier::default(),
2044            ));
2045        }
2046
2047        ExecutionRequest::HistoryEvent {
2048            event:
2049                HistoryEvent::JoinNext {
2050                    join_set_id,
2051                    run_expires_at,
2052                    closing,
2053                    requested_ffqn: _,
2054                },
2055        } => {
2056            // Did the response arrive already?
2057            let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2058
2059            // Fetch the response corresponding to this JoinNext (skip n-1)
2060            let nth_response =
2061                nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2062
2063            trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2064            assert!(join_next_count > 0);
2065
2066            if let Some(ResponseWithCursor {
2067                event:
2068                    JoinSetResponseEventOuter {
2069                        created_at: nth_created_at,
2070                        ..
2071                    },
2072                cursor: _,
2073            }) = nth_response
2074            {
2075                let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2076                let (next_version, notifier) = update_state_pending_after_event_appended(
2077                    tx,
2078                    execution_id,
2079                    &appending_version,
2080                    scheduled_at,
2081                    false, // not an intermittent failure
2082                    combined_state.execution_with_state.component_digest,
2083                )
2084                .await?;
2085                return Ok((next_version, notifier));
2086            }
2087
2088            return Ok((
2089                update_state_blocked(
2090                    tx,
2091                    execution_id,
2092                    &appending_version,
2093                    join_set_id,
2094                    *run_expires_at,
2095                    *closing,
2096                )
2097                .await?,
2098                AppendNotifier::default(),
2099            ));
2100        }
2101    }
2102}
2103
2104async fn append_response(
2105    tx: &Transaction<'_>,
2106    execution_id: &ExecutionId,
2107    response_outer: JoinSetResponseEventOuter,
2108) -> Result<AppendNotifier, DbErrorWrite> {
2109    let join_set_id = &response_outer.event.join_set_id;
2110
2111    let (delay_id, delay_success) = match &response_outer.event.event {
2112        JoinSetResponse::DelayFinished { delay_id, result } => {
2113            (Some(delay_id.to_string()), Some(result.is_ok()))
2114        }
2115        JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2116    };
2117
2118    let (child_execution_id, finished_version) = match &response_outer.event.event {
2119        JoinSetResponse::ChildExecutionFinished {
2120            child_execution_id,
2121            finished_version,
2122            result: _,
2123        } => (
2124            Some(child_execution_id.to_string()),
2125            Some(i64::from(finished_version.0)),
2126        ),
2127        JoinSetResponse::DelayFinished { .. } => (None, None),
2128    };
2129
2130    tx.execute(
2131            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2132             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2133             &[
2134                 &execution_id.to_string(),
2135                 &response_outer.created_at,
2136                 &join_set_id.to_string(),
2137                 &delay_id,
2138                 &delay_success,
2139                 &child_execution_id,
2140                 &finished_version,
2141             ]
2142        ).await?;
2143
2144    // if the execution is going to be unblocked by this response...
2145    let combined_state = get_combined_state(tx, execution_id).await?;
2146    debug!("previous_pending_state: {combined_state:?}");
2147
2148    let mut notifier = if let PendingState::BlockedByJoinSet {
2149        join_set_id: found_join_set_id,
2150        lock_expires_at,
2151        closing: _,
2152    } = combined_state.execution_with_state.pending_state
2153        && *join_set_id == found_join_set_id
2154    {
2155        let scheduled_at = std::cmp::max(lock_expires_at, response_outer.created_at);
2156        // Unblock the state.
2157        update_state_pending_after_response_appended(
2158            tx,
2159            execution_id,
2160            scheduled_at,
2161            &combined_state.corresponding_version,
2162            combined_state.execution_with_state.component_digest,
2163        )
2164        .await?
2165    } else {
2166        AppendNotifier::default()
2167    };
2168
2169    if let JoinSetResponseEvent {
2170        join_set_id,
2171        event:
2172            JoinSetResponse::DelayFinished {
2173                delay_id,
2174                result: _,
2175            },
2176    } = &response_outer.event
2177    {
2178        debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2179        tx.execute(
2180            "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2181            &[
2182                &execution_id.to_string(),
2183                &join_set_id.to_string(),
2184                &delay_id.to_string(),
2185            ],
2186        )
2187        .await?;
2188    }
2189
2190    notifier.response = Some((execution_id.clone(), response_outer));
2191    Ok(notifier)
2192}
2193
2194async fn append_backtrace(
2195    tx: &Transaction<'_>,
2196    backtrace_info: &BacktraceInfo,
2197) -> Result<(), DbErrorWrite> {
2198    let backtrace_json = Json(&backtrace_info.wasm_backtrace);
2199
2200    tx.execute(
2201            "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2202             VALUES ($1, $2, $3, $4, $5)",
2203            &[
2204                &backtrace_info.execution_id.to_string(),
2205                &Json(&backtrace_info.component_id),
2206                &i64::from(backtrace_info.version_min_including.0),
2207                &i64::from(backtrace_info.version_max_excluding.0),
2208                &backtrace_json,
2209            ],
2210        )
2211        .await?;
2212
2213    Ok(())
2214}
2215
/// Test-only: loads the complete execution log of `execution_id`.
///
/// The result contains all log events ordered by version (without backtrace ids), all
/// responses, and the current state from `t_state`.
///
/// # Errors
/// `DbErrorRead::NotFound` when the execution has no log rows.
#[cfg(feature = "test")]
async fn get_execution_log(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let rows = tx
        .query(
            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
             execution_id = $1 ORDER BY version",
            &[&execution_id.to_string()],
        )
        .await
        .map_err(DbErrorRead::from)?;

    if rows.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    // Stops at the first row that fails to decode.
    let events = rows
        .iter()
        .map(|row| -> Result<ExecutionEvent, DbErrorRead> {
            let created_at: DateTime<Utc> = get(row, "created_at")?;
            let Json(event): Json<ExecutionRequest> = get(row, "json_value")?;
            let raw_version: i64 = get(row, "version")?;
            let version = Version::try_from(raw_version)
                .map_err(|_| consistency_db_err("version must be non-negative"))?;
            Ok(ExecutionEvent {
                created_at,
                event,
                backtrace_id: None,
                version,
            })
        })
        .collect::<Result<Vec<_>, _>>()?;

    let combined_state = get_combined_state(tx, execution_id).await?;
    let responses = list_responses(tx, execution_id, None)
        .await?
        .into_iter()
        .map(|resp| resp.event)
        .collect();

    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version: combined_state.get_next_version_or_finished(),
        pending_state: combined_state.execution_with_state.pending_state,
        component_digest: combined_state.execution_with_state.component_digest,
    })
}
2264
2265async fn list_execution_events(
2266    tx: &Transaction<'_>,
2267    execution_id: &ExecutionId,
2268    version_min: VersionType,
2269    version_max_excluding: VersionType,
2270    include_backtrace_id: bool,
2271) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2272    let sql = if include_backtrace_id {
2273        "SELECT
2274                log.created_at,
2275                log.json_value,
2276                log.version,
2277                bt.version_min_including AS backtrace_id
2278            FROM
2279                t_execution_log AS log
2280            LEFT OUTER JOIN
2281                t_backtrace AS bt ON log.execution_id = bt.execution_id
2282                                AND log.version >= bt.version_min_including
2283                                AND log.version < bt.version_max_excluding
2284            WHERE
2285                log.execution_id = $1
2286                AND log.version >= $2
2287                AND log.version < $3
2288            ORDER BY
2289                log.version"
2290    } else {
2291        "SELECT
2292                created_at, json_value, NULL::BIGINT as backtrace_id, version
2293            FROM t_execution_log WHERE
2294                execution_id = $1 AND version >= $2 AND version < $3
2295            ORDER BY version"
2296    };
2297
2298    let rows = tx
2299        .query(
2300            sql,
2301            &[
2302                &execution_id.to_string(),
2303                &i64::from(version_min),
2304                &i64::from(version_max_excluding),
2305            ],
2306        )
2307        .await
2308        .map_err(DbErrorRead::from)?;
2309
2310    let mut events = Vec::with_capacity(rows.len());
2311    for row in rows {
2312        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2313        let backtrace_id = get::<Option<i64>>(&row, "backtrace_id")?
2314            .map(Version::try_from)
2315            .transpose()
2316            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2317
2318        let version = get::<i64>(&row, "version")?;
2319        let version = Version::new(
2320            VersionType::try_from(version)
2321                .map_err(|_| consistency_db_err("version must be non-negative"))?,
2322        );
2323        let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2324        let event_req = event_req.0;
2325
2326        events.push(ExecutionEvent {
2327            created_at,
2328            event: event_req,
2329            backtrace_id,
2330            version,
2331        });
2332    }
2333    Ok(events)
2334}
2335
2336async fn get_execution_event(
2337    tx: &Transaction<'_>,
2338    execution_id: &ExecutionId,
2339    version: VersionType,
2340) -> Result<ExecutionEvent, DbErrorRead> {
2341    let row = tx
2342        .query_one(
2343            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2344                 execution_id = $1 AND version = $2",
2345            &[&execution_id.to_string(), &i64::from(version)],
2346        )
2347        .await?;
2348
2349    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2350    let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2351    let version = get::<i64>(&row, "version")?;
2352    let version = Version::try_from(version)
2353        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2354    let event = json_val.0;
2355
2356    Ok(ExecutionEvent {
2357        created_at,
2358        event,
2359        backtrace_id: None,
2360        version,
2361    })
2362}
2363
2364async fn get_last_execution_event(
2365    tx: &Transaction<'_>,
2366    execution_id: &ExecutionId,
2367) -> Result<ExecutionEvent, DbErrorRead> {
2368    let row = tx
2369        .query_one(
2370            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2371                 execution_id = $1 ORDER BY version DESC LIMIT 1",
2372            &[&execution_id.to_string()],
2373        )
2374        .await?;
2375
2376    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2377    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2378    let event = event.0;
2379    let version = get::<i64>(&row, "version")?;
2380    let version = Version::try_from(version)
2381        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2382
2383    Ok(ExecutionEvent {
2384        created_at,
2385        event,
2386        backtrace_id: None,
2387        version,
2388    })
2389}
2390
2391async fn delay_response(
2392    tx: &Transaction<'_>,
2393    execution_id: &ExecutionId,
2394    delay_id: &DelayId,
2395) -> Result<Option<bool>, DbErrorRead> {
2396    let row = tx
2397        .query_opt(
2398            "SELECT delay_success \
2399                 FROM t_join_set_response \
2400                 WHERE \
2401                 execution_id = $1 AND delay_id = $2",
2402            &[&execution_id.to_string(), &delay_id.to_string()],
2403        )
2404        .await?;
2405
2406    match row {
2407        Some(r) => Ok(Some(get::<bool>(&r, "delay_success")?)),
2408        None => Ok(None),
2409    }
2410}
2411
2412#[instrument(level = Level::TRACE, skip_all)]
2413async fn get_responses_with_offset(
2414    tx: &Transaction<'_>,
2415    execution_id: &ExecutionId,
2416    skip_rows: u32,
2417) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2418    let rows = tx
2419            .query(
2420                "SELECT r.id, r.created_at, r.join_set_id, \
2421                 r.delay_id, r.delay_success, \
2422                 r.child_execution_id, r.finished_version, l.json_value \
2423                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2424                 WHERE \
2425                 r.execution_id = $1 AND \
2426                 ( \
2427                 r.finished_version = l.version \
2428                 OR r.child_execution_id IS NULL \
2429                 ) \
2430                 ORDER BY id \
2431                 OFFSET $2",
2432                 &[
2433                     &execution_id.to_string(),
2434                     &(i64::from(skip_rows)),
2435                 ]
2436            )
2437            .await?;
2438
2439    let mut results = Vec::with_capacity(rows.len());
2440    for row in rows {
2441        let resp = parse_response_with_cursor(&row)?;
2442        results.push(resp.event);
2443    }
2444    Ok(results)
2445}
2446
2447async fn get_pending_of_single_ffqn(
2448    tx: &Transaction<'_>,
2449    batch_size: u32,
2450    pending_at_or_sooner: DateTime<Utc>,
2451    ffqn: &FunctionFqn,
2452    select_strategy: SelectStrategy,
2453) -> Result<Vec<(ExecutionId, Version)>, ()> {
2454    let rows = tx
2455        .query(
2456            &format!(
2457                "SELECT execution_id, corresponding_version FROM t_state \
2458                WHERE \
2459                state = '{STATE_PENDING_AT}' AND \
2460                pending_expires_finished <= $1 AND ffqn = $2 \
2461                ORDER BY pending_expires_finished \
2462                {} \
2463                LIMIT $3",
2464                if select_strategy == SelectStrategy::LockForUpdate {
2465                    "FOR UPDATE SKIP LOCKED"
2466                } else {
2467                    ""
2468                }
2469            ),
2470            &[
2471                &pending_at_or_sooner,
2472                &ffqn.to_string(),
2473                &(i64::from(batch_size)),
2474            ],
2475        )
2476        .await
2477        .map_err(|err| {
2478            warn!("Ignoring consistency error {err:?}");
2479        })?;
2480
2481    let mut result = Vec::with_capacity(rows.len());
2482    for row in rows {
2483        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2484            let eid_str: String = get(&row, "execution_id")?;
2485            let corresponding_version: i64 = get(&row, "corresponding_version")?;
2486            let corresponding_version = Version::try_from(corresponding_version)
2487                .map_err(|_| consistency_db_err("version must be non-negative"))?;
2488
2489            if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2490                return Ok((eid, corresponding_version.increment()));
2491            }
2492            Err(consistency_db_err("invalid execution_id"))
2493        };
2494
2495        match unpack() {
2496            Ok(val) => result.push(val),
2497            Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2498        }
2499    }
2500    Ok(result)
2501}
2502
2503/// Get executions and their next versions
2504async fn get_pending_by_ffqns(
2505    tx: &Transaction<'_>,
2506    batch_size: u32,
2507    pending_at_or_sooner: DateTime<Utc>,
2508    ffqns: &[FunctionFqn],
2509    select_strategy: SelectStrategy,
2510) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2511    let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2512    let mut execution_ids_versions = Vec::with_capacity(batch_size);
2513
2514    for ffqn in ffqns {
2515        let needed = batch_size - execution_ids_versions.len();
2516        if needed == 0 {
2517            break;
2518        }
2519        let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
2520        if let Ok(execs) =
2521            get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2522                .await
2523        {
2524            execution_ids_versions.extend(execs);
2525        }
2526    }
2527
2528    Ok(execution_ids_versions)
2529}
2530
/// How pending rows of `t_state` should be selected.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SelectStrategy {
    /// Plain `SELECT`, rows are not locked.
    Read,
    /// Appends `FOR UPDATE SKIP LOCKED` to the query.
    LockForUpdate,
}
2536
2537async fn get_pending_by_component_input_digest(
2538    tx: &Transaction<'_>,
2539    batch_size: u32,
2540    pending_at_or_sooner: DateTime<Utc>,
2541    input_digest: &InputContentDigest,
2542    select_strategy: SelectStrategy,
2543) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2544    let rows = tx
2545        .query(
2546            &format!(
2547                "SELECT execution_id, corresponding_version FROM t_state WHERE \
2548                state = '{STATE_PENDING_AT}' AND \
2549                pending_expires_finished <= $1 AND \
2550                component_id_input_digest = $2 \
2551                ORDER BY pending_expires_finished \
2552                {} \
2553                LIMIT $3",
2554                if select_strategy == SelectStrategy::LockForUpdate {
2555                    "FOR UPDATE SKIP LOCKED"
2556                } else {
2557                    ""
2558                }
2559            ),
2560            &[
2561                &pending_at_or_sooner,
2562                &input_digest.as_slice(), // BYTEA
2563                &i64::from(batch_size),
2564            ],
2565        )
2566        .await?;
2567
2568    let mut result = Vec::with_capacity(rows.len());
2569    for row in rows {
2570        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2571            let eid_str: String = get(&row, "execution_id")?;
2572            let corresponding_version: i64 = get(&row, "corresponding_version")?;
2573            let corresponding_version = Version::try_from(corresponding_version)
2574                .map_err(|_| consistency_db_err("version must be non-negative"))?;
2575
2576            let eid = ExecutionId::from_str(&eid_str)
2577                .map_err(|err| consistency_db_err(err.to_string()))?;
2578            Ok((eid, corresponding_version.increment()))
2579        };
2580
2581        match unpack() {
2582            Ok(val) => result.push(val),
2583            Err(err) => {
2584                warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
2585            }
2586        }
2587    }
2588
2589    Ok(result)
2590}
2591
2592fn notify_pending_locked(
2593    notifier: &NotifierPendingAt,
2594    current_time: DateTime<Utc>,
2595    ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2596) {
2597    if notifier.scheduled_at <= current_time {
2598        ffqn_to_pending_subscription.notify(notifier);
2599    }
2600}
2601
2602async fn upgrade_execution_component(
2603    tx: &Transaction<'_>,
2604    execution_id: &ExecutionId,
2605    old: &InputContentDigest,
2606    new: &InputContentDigest,
2607) -> Result<(), DbErrorWrite> {
2608    debug!("Updating t_state to component {new}");
2609
2610    let updated = tx
2611        .execute(
2612            r"
2613                UPDATE t_state
2614                SET
2615                    updated_at = CURRENT_TIMESTAMP,
2616                    component_id_input_digest = $1
2617                WHERE
2618                    execution_id = $2 AND
2619                    component_id_input_digest = $3
2620                ",
2621            &[
2622                &new.as_slice(),           // $1: BYTEA
2623                &execution_id.to_string(), // $2: TEXT
2624                &old.as_slice(),           // $3: BYTEA
2625            ],
2626        )
2627        .await?;
2628
2629    if updated != 1 {
2630        return Err(DbErrorWrite::NotFound);
2631    }
2632    Ok(())
2633}
2634
2635impl PostgresConnection {
2636    // Must be called after write transaction commit for a correct happens-before relationship.
2637    #[instrument(level = Level::TRACE, skip_all)]
2638    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2639        let (pending_ats, finished_execs, responses) = {
2640            let (mut pending_ats, mut finished_execs, mut responses) =
2641                (Vec::new(), Vec::new(), Vec::new());
2642            for notifier in notifiers {
2643                if let Some(pending_at) = notifier.pending_at {
2644                    pending_ats.push(pending_at);
2645                }
2646                if let Some(finished) = notifier.execution_finished {
2647                    finished_execs.push(finished);
2648                }
2649                if let Some(response) = notifier.response {
2650                    responses.push(response);
2651                }
2652            }
2653            (pending_ats, finished_execs, responses)
2654        };
2655
2656        // Notify pending_at subscribers.
2657        if !pending_ats.is_empty() {
2658            let guard = self.pending_subscribers.lock().unwrap();
2659            for pending_at in pending_ats {
2660                notify_pending_locked(&pending_at, current_time, &guard);
2661            }
2662        }
2663        // Notify execution finished subscribers.
2664        if !finished_execs.is_empty() {
2665            let mut guard = self.execution_finished_subscribers.lock().unwrap();
2666            for finished in finished_execs {
2667                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2668                    for (_tag, sender) in listeners_of_exe_id {
2669                        let _ = sender.send(finished.retval.clone());
2670                    }
2671                }
2672            }
2673        }
2674        // Notify response subscribers.
2675        if !responses.is_empty() {
2676            let mut guard = self.response_subscribers.lock().unwrap();
2677            for (execution_id, response) in responses {
2678                if let Some((sender, _)) = guard.remove(&execution_id) {
2679                    let _ = sender.send(response);
2680                }
2681            }
2682        }
2683    }
2684}
2685
2686#[async_trait]
2687impl DbExecutor for PostgresConnection {
2688    #[instrument(level = Level::TRACE, skip(self))]
2689    async fn lock_pending_by_ffqns(
2690        &self,
2691        batch_size: u32,
2692        pending_at_or_sooner: DateTime<Utc>,
2693        ffqns: Arc<[FunctionFqn]>,
2694        created_at: DateTime<Utc>,
2695        component_id: ComponentId,
2696        executor_id: ExecutorId,
2697        lock_expires_at: DateTime<Utc>,
2698        run_id: RunId,
2699        retry_config: ComponentRetryConfig,
2700    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2701        let mut client_guard = self.client.lock().await;
2702        let tx = client_guard.transaction().await?;
2703
2704        let execution_ids_versions = get_pending_by_ffqns(
2705            &tx,
2706            batch_size,
2707            pending_at_or_sooner,
2708            &ffqns,
2709            SelectStrategy::LockForUpdate,
2710        )
2711        .await?;
2712
2713        if execution_ids_versions.is_empty() {
2714            // Commit is required to release the connection state cleanly,
2715            // though rollback/drop works too for read-only.
2716            tx.commit().await?;
2717            return Ok(vec![]);
2718        }
2719
2720        debug!("Locking {execution_ids_versions:?}");
2721
2722        // Lock using the same transaction
2723        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2724        for (execution_id, version) in execution_ids_versions {
2725            match lock_single_execution(
2726                &tx,
2727                created_at,
2728                component_id.clone(),
2729                &execution_id,
2730                run_id,
2731                &version,
2732                executor_id,
2733                lock_expires_at,
2734                retry_config,
2735            )
2736            .await
2737            {
2738                Ok(locked) => locked_execs.push(locked),
2739                Err(err) => {
2740                    warn!("Locking row {execution_id} failed - {err:?}");
2741                }
2742            }
2743        }
2744
2745        tx.commit().await?;
2746
2747        Ok(locked_execs)
2748    }
2749
2750    #[instrument(level = Level::TRACE, skip(self))]
2751    async fn lock_pending_by_component_id(
2752        &self,
2753        batch_size: u32,
2754        pending_at_or_sooner: DateTime<Utc>,
2755        component_id: &ComponentId,
2756        created_at: DateTime<Utc>,
2757        executor_id: ExecutorId,
2758        lock_expires_at: DateTime<Utc>,
2759        run_id: RunId,
2760        retry_config: ComponentRetryConfig,
2761    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2762        let mut client_guard = self.client.lock().await;
2763        let tx = client_guard.transaction().await?;
2764
2765        let execution_ids_versions = get_pending_by_component_input_digest(
2766            &tx,
2767            batch_size,
2768            pending_at_or_sooner,
2769            &component_id.input_digest,
2770            SelectStrategy::LockForUpdate,
2771        )
2772        .await?;
2773
2774        if execution_ids_versions.is_empty() {
2775            tx.commit().await?;
2776            return Ok(vec![]);
2777        }
2778
2779        debug!("Locking {execution_ids_versions:?}");
2780
2781        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2782        for (execution_id, version) in execution_ids_versions {
2783            match lock_single_execution(
2784                &tx,
2785                created_at,
2786                component_id.clone(),
2787                &execution_id,
2788                run_id,
2789                &version,
2790                executor_id,
2791                lock_expires_at,
2792                retry_config,
2793            )
2794            .await
2795            {
2796                Ok(locked) => locked_execs.push(locked),
2797                Err(err) => {
2798                    warn!("Locking row {execution_id} failed - {err:?}");
2799                }
2800            }
2801        }
2802
2803        tx.commit().await?;
2804        Ok(locked_execs)
2805    }
2806
2807    #[instrument(level = Level::DEBUG, skip(self))]
2808    async fn lock_one(
2809        &self,
2810        created_at: DateTime<Utc>,
2811        component_id: ComponentId,
2812        execution_id: &ExecutionId,
2813        run_id: RunId,
2814        version: Version,
2815        executor_id: ExecutorId,
2816        lock_expires_at: DateTime<Utc>,
2817        retry_config: ComponentRetryConfig,
2818    ) -> Result<LockedExecution, DbErrorWrite> {
2819        debug!(%execution_id, "lock_one");
2820        let mut client_guard = self.client.lock().await;
2821        let tx = client_guard.transaction().await?;
2822
2823        let res = lock_single_execution(
2824            &tx,
2825            created_at,
2826            component_id,
2827            execution_id,
2828            run_id,
2829            &version,
2830            executor_id,
2831            lock_expires_at,
2832            retry_config,
2833        )
2834        .await?;
2835
2836        tx.commit().await?;
2837        Ok(res)
2838    }
2839
2840    #[instrument(level = Level::DEBUG, skip(self, req))]
2841    async fn append(
2842        &self,
2843        execution_id: ExecutionId,
2844        version: Version,
2845        req: AppendRequest,
2846    ) -> Result<AppendResponse, DbErrorWrite> {
2847        debug!(%req, "append");
2848        trace!(?req, "append");
2849        let created_at = req.created_at;
2850
2851        let mut client_guard = self.client.lock().await;
2852        let tx = client_guard.transaction().await?;
2853
2854        let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
2855
2856        tx.commit().await?;
2857
2858        // Explicitly drop guard (optional, happens at end of scope anyway)
2859        drop(client_guard);
2860
2861        self.notify_all(vec![notifier], created_at);
2862        Ok(new_version)
2863    }
2864
2865    #[instrument(level = Level::DEBUG, skip_all)]
2866    async fn append_batch_respond_to_parent(
2867        &self,
2868        events: AppendEventsToExecution,
2869        response: AppendResponseToExecution,
2870        current_time: DateTime<Utc>,
2871    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2872        debug!("append_batch_respond_to_parent");
2873        if events.execution_id == response.parent_execution_id {
2874            return Err(DbErrorWrite::NonRetriable(
2875                DbErrorWriteNonRetriable::ValidationFailed(
2876                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2877                ),
2878            ));
2879        }
2880        if events.batch.is_empty() {
2881            return Err(DbErrorWrite::NonRetriable(
2882                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2883            ));
2884        }
2885
2886        let mut client_guard = self.client.lock().await;
2887        let tx = client_guard.transaction().await?;
2888
2889        let mut version = events.version;
2890        let mut notifiers = Vec::new();
2891
2892        for append_request in events.batch {
2893            let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
2894            version = v;
2895            notifiers.push(n);
2896        }
2897
2898        let pending_at_parent = append_response(
2899            &tx,
2900            &response.parent_execution_id,
2901            JoinSetResponseEventOuter {
2902                created_at: response.created_at,
2903                event: JoinSetResponseEvent {
2904                    join_set_id: response.join_set_id,
2905                    event: JoinSetResponse::ChildExecutionFinished {
2906                        child_execution_id: response.child_execution_id,
2907                        finished_version: response.finished_version,
2908                        result: response.result,
2909                    },
2910                },
2911            },
2912        )
2913        .await?;
2914        notifiers.push(pending_at_parent);
2915
2916        tx.commit().await?;
2917        drop(client_guard);
2918
2919        self.notify_all(notifiers, current_time);
2920        Ok(version)
2921    }
2922
2923    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2924    async fn wait_for_pending_by_ffqn(
2925        &self,
2926        pending_at_or_sooner: DateTime<Utc>,
2927        ffqns: Arc<[FunctionFqn]>,
2928        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2929    ) {
2930        let unique_tag: u64 = rand::random();
2931        let (sender, mut receiver) = mpsc::channel(1);
2932        {
2933            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
2934            for ffqn in ffqns.as_ref() {
2935                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
2936            }
2937        }
2938
2939        async {
2940            let mut db_has_pending = false;
2941            {
2942                // Scope the lock so we don't hold it while waiting for timeout
2943                let mut client_guard = self.client.lock().await;
2944                // Read-only transaction check
2945                if let Ok(tx) = client_guard.transaction().await {
2946                    if let Ok(res) = get_pending_by_ffqns(
2947                        &tx,
2948                        1,
2949                        pending_at_or_sooner,
2950                        &ffqns,
2951                        SelectStrategy::Read,
2952                    )
2953                    .await
2954                        && !res.is_empty()
2955                    {
2956                        db_has_pending = true;
2957                    }
2958                    // Commit/Rollback read transaction
2959                    let _ = tx.commit().await;
2960                }
2961            }
2962
2963            if db_has_pending {
2964                trace!("Not waiting, database already contains new pending executions");
2965                return;
2966            }
2967
2968            tokio::select! {
2969                _ = receiver.recv() => {
2970                    trace!("Received a notification");
2971                }
2972                () = timeout_fut => {
2973                }
2974            }
2975        }
2976        .await;
2977
2978        // Cleanup
2979        {
2980            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
2981            for ffqn in ffqns.as_ref() {
2982                match pending_subscribers.remove_ffqn(ffqn) {
2983                    Some((_, tag)) if tag == unique_tag => {}
2984                    Some(other) => {
2985                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
2986                    }
2987                    None => {}
2988                }
2989            }
2990        }
2991    }
2992
2993    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2994    async fn wait_for_pending_by_component_id(
2995        &self,
2996        pending_at_or_sooner: DateTime<Utc>,
2997        component_id: &ComponentId,
2998        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2999    ) {
3000        let unique_tag: u64 = rand::random();
3001        let (sender, mut receiver) = mpsc::channel(1);
3002        {
3003            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3004            pending_subscribers.insert_by_component(
3005                component_id.input_digest.clone(),
3006                (sender.clone(), unique_tag),
3007            );
3008        }
3009
3010        async {
3011            let mut db_has_pending = false;
3012            {
3013                let mut client_guard = self.client.lock().await;
3014                if let Ok(tx) = client_guard.transaction().await {
3015                    if let Ok(res) = get_pending_by_component_input_digest(
3016                        &tx,
3017                        1,
3018                        pending_at_or_sooner,
3019                        &component_id.input_digest,
3020                        SelectStrategy::Read,
3021                    )
3022                    .await
3023                        && !res.is_empty()
3024                    {
3025                        db_has_pending = true;
3026                    }
3027                    let _ = tx.commit().await;
3028                }
3029            }
3030
3031            if db_has_pending {
3032                trace!("Not waiting, database already contains new pending executions");
3033                return;
3034            }
3035
3036            tokio::select! {
3037                _ = receiver.recv() => {
3038                    trace!("Received a notification");
3039                }
3040                () = timeout_fut => {
3041                }
3042            }
3043        }
3044        .await;
3045
3046        // Cleanup
3047        {
3048            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3049            match pending_subscribers.remove_by_component(&component_id.input_digest) {
3050                Some((_, tag)) if tag == unique_tag => {}
3051                Some(other) => {
3052                    pending_subscribers
3053                        .insert_by_component(component_id.input_digest.clone(), other);
3054                }
3055                None => {}
3056            }
3057        }
3058    }
3059
3060    async fn get_last_execution_event(
3061        &self,
3062        execution_id: &ExecutionId,
3063    ) -> Result<ExecutionEvent, DbErrorRead> {
3064        let mut client_guard = self.client.lock().await;
3065        let tx = client_guard.transaction().await?;
3066
3067        let event = get_last_execution_event(&tx, execution_id).await?;
3068
3069        tx.commit().await?;
3070        Ok(event)
3071    }
3072}
3073#[async_trait]
3074impl DbConnection for PostgresConnection {
3075    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3076    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3077        debug!("create");
3078        trace!(?req, "create");
3079        let created_at = req.created_at;
3080
3081        let mut client_guard = self.client.lock().await;
3082        let tx = client_guard.transaction().await?;
3083
3084        let (version, notifier) = create_inner(&tx, req.clone()).await?;
3085
3086        tx.commit().await?;
3087        drop(client_guard); // Release DB lock before notifying
3088
3089        self.notify_all(vec![notifier], created_at);
3090        Ok(version)
3091    }
3092
3093    #[instrument(level = Level::DEBUG, skip(self, batch))]
3094    async fn append_batch(
3095        &self,
3096        current_time: DateTime<Utc>,
3097        batch: Vec<AppendRequest>,
3098        execution_id: ExecutionId,
3099        version: Version,
3100    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3101        debug!("append_batch");
3102        trace!(?batch, "append_batch");
3103        assert!(!batch.is_empty(), "Empty batch request");
3104
3105        let mut client_guard = self.client.lock().await;
3106        let tx = client_guard.transaction().await?;
3107
3108        let mut version = version;
3109        let mut notifier = None;
3110
3111        for append_request in batch {
3112            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3113            version = v;
3114            notifier = Some(n);
3115        }
3116
3117        tx.commit().await?;
3118        drop(client_guard);
3119
3120        self.notify_all(
3121            vec![notifier.expect("checked that the batch is not empty")],
3122            current_time,
3123        );
3124        Ok(version)
3125    }
3126
3127    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3128    async fn append_batch_create_new_execution(
3129        &self,
3130        current_time: DateTime<Utc>,
3131        batch: Vec<AppendRequest>,
3132        execution_id: ExecutionId,
3133        version: Version,
3134        child_req: Vec<CreateRequest>,
3135    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3136        debug!("append_batch_create_new_execution");
3137        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3138        assert!(!batch.is_empty(), "Empty batch request");
3139
3140        let mut client_guard = self.client.lock().await;
3141        let tx = client_guard.transaction().await?;
3142
3143        let mut version = version;
3144        let mut notifier = None;
3145
3146        for append_request in batch {
3147            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3148            version = v;
3149            notifier = Some(n);
3150        }
3151
3152        let mut notifiers = Vec::new();
3153        notifiers.push(notifier.expect("checked that the batch is not empty"));
3154
3155        for req in child_req {
3156            let (_, n) = create_inner(&tx, req).await?;
3157            notifiers.push(n);
3158        }
3159
3160        tx.commit().await?;
3161        drop(client_guard);
3162
3163        self.notify_all(notifiers, current_time);
3164        Ok(version)
3165    }
3166
3167    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3168    async fn subscribe_to_next_responses(
3169        &self,
3170        execution_id: &ExecutionId,
3171        start_idx: u32,
3172        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3173    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3174        debug!("next_responses");
3175        let unique_tag: u64 = rand::random();
3176        let execution_id_clone = execution_id.clone();
3177
3178        let cleanup = || {
3179            let mut guard = self.response_subscribers.lock().unwrap();
3180            match guard.remove(&execution_id_clone) {
3181                Some((_, tag)) if tag == unique_tag => {}
3182                Some(other) => {
3183                    guard.insert(execution_id_clone.clone(), other);
3184                }
3185                None => {}
3186            }
3187        };
3188
3189        let receiver = {
3190            let mut client_guard = self.client.lock().await;
3191            let tx = client_guard.transaction().await?;
3192
3193            // Register listener before fetching from database.
3194            // This is a best-effort mechanism that shortens the polling time, if it
3195            // does not detect the response it will sleep using `timeout_fut`. Consumers
3196            // are expected to poll this function in a loop.
3197            // Currently the notification mechanism only works on a single node deployment.
3198            let (sender, receiver) = oneshot::channel();
3199            self.response_subscribers
3200                .lock()
3201                .unwrap()
3202                .insert(execution_id.clone(), (sender, unique_tag));
3203
3204            let responses = get_responses_with_offset(&tx, execution_id, start_idx).await?;
3205
3206            if responses.is_empty() {
3207                // Commit read transaction
3208                tx.commit().await.map_err(|err| {
3209                    cleanup(); // Remove the just inserted subscriber.
3210                    DbErrorRead::from(err)
3211                })?;
3212                receiver
3213            } else {
3214                cleanup(); // Remove the just inserted subscriber as we already have the answer.
3215                tx.commit().await?;
3216                return Ok(responses);
3217            }
3218        };
3219
3220        let res = tokio::select! {
3221            resp = receiver => {
3222                match resp {
3223                    Ok(resp) => Ok(vec![resp]),
3224                    Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3225                }
3226            }
3227            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3228        };
3229
3230        cleanup();
3231        res
3232    }
3233
3234    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3235    async fn wait_for_finished_result(
3236        &self,
3237        execution_id: &ExecutionId,
3238        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3239    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3240        let unique_tag: u64 = rand::random();
3241        let execution_id_clone = execution_id.clone();
3242
3243        let cleanup = || {
3244            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3245            if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3246                subscribers.remove(&unique_tag);
3247            }
3248        };
3249
3250        let receiver = {
3251            let mut client_guard = self.client.lock().await;
3252            let tx = client_guard.transaction().await?;
3253
3254            // Register listener
3255            let (sender, receiver) = oneshot::channel();
3256            {
3257                let mut guard = self.execution_finished_subscribers.lock().unwrap();
3258                guard
3259                    .entry(execution_id.clone())
3260                    .or_default()
3261                    .insert(unique_tag, sender);
3262            }
3263
3264            let pending_state = get_combined_state(&tx, execution_id)
3265                .await?
3266                .execution_with_state
3267                .pending_state;
3268
3269            if let PendingState::Finished { finished, .. } = pending_state {
3270                let event = get_execution_event(&tx, execution_id, finished.version).await?;
3271                tx.commit().await?;
3272                cleanup();
3273
3274                if let ExecutionRequest::Finished { result, .. } = event.event {
3275                    return Ok(result);
3276                }
3277                error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3278                return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3279                    "cannot get finished event based on t_state version",
3280                )));
3281            }
3282            tx.commit().await?;
3283            receiver
3284        };
3285
3286        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3287        let res = tokio::select! {
3288            resp = receiver => {
3289                match resp {
3290                    Ok(retval) => Ok(retval),
3291                    Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3292                }
3293            }
3294            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3295        };
3296
3297        cleanup();
3298        res
3299    }
3300
3301    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3302    async fn append_delay_response(
3303        &self,
3304        created_at: DateTime<Utc>,
3305        execution_id: ExecutionId,
3306        join_set_id: JoinSetId,
3307        delay_id: DelayId,
3308        result: Result<(), ()>,
3309    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3310        debug!("append_delay_response");
3311        let event = JoinSetResponseEventOuter {
3312            created_at,
3313            event: JoinSetResponseEvent {
3314                join_set_id,
3315                event: JoinSetResponse::DelayFinished {
3316                    delay_id: delay_id.clone(),
3317                    result,
3318                },
3319            },
3320        };
3321
3322        let mut client_guard = self.client.lock().await;
3323        let tx = client_guard.transaction().await?;
3324
3325        let res = append_response(&tx, &execution_id, event).await;
3326
3327        match res {
3328            Ok(notifier) => {
3329                tx.commit().await?;
3330                drop(client_guard);
3331                self.notify_all(vec![notifier], created_at);
3332                Ok(AppendDelayResponseOutcome::Success)
3333            }
3334            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3335                // Check if already finished
3336                // Roll back the failed tx, start new read tx.
3337                tx.rollback().await?;
3338
3339                // Start new tx for check
3340                let tx = client_guard.transaction().await?;
3341                let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3342                tx.commit().await?;
3343
3344                match delay_success {
3345                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3346                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3347                    None => Err(DbErrorWrite::Generic(consistency_db_err(
3348                        "insert failed yet select did not find the response",
3349                    ))),
3350                }
3351            }
3352            Err(err) => {
3353                let _ = tx.rollback().await; // cleanup
3354                Err(err)
3355            }
3356        }
3357    }
3358
3359    #[instrument(level = Level::DEBUG, skip_all)]
3360    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3361        debug!("append_backtrace");
3362        let mut client_guard = self.client.lock().await;
3363        let tx = client_guard.transaction().await?;
3364
3365        append_backtrace(&tx, &append).await?;
3366
3367        tx.commit().await?;
3368        Ok(())
3369    }
3370
3371    #[instrument(level = Level::DEBUG, skip_all)]
3372    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3373        debug!("append_backtrace_batch");
3374        let mut client_guard = self.client.lock().await;
3375        let tx = client_guard.transaction().await?;
3376
3377        for append in batch {
3378            append_backtrace(&tx, &append).await?;
3379        }
3380
3381        tx.commit().await?;
3382        Ok(())
3383    }
3384
3385    /// Get currently expired delays and locks.
3386    #[instrument(level = Level::TRACE, skip(self))]
3387    async fn get_expired_timers(
3388        &self,
3389        at: DateTime<Utc>,
3390    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3391        let mut client_guard = self.client.lock().await;
3392        let tx = client_guard.transaction().await?;
3393
3394        // Expired Delays
3395        let rows = tx
3396            .query(
3397                "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3398                &[&at],
3399            )
3400            .await?;
3401
3402        let mut expired_timers = Vec::with_capacity(rows.len());
3403        for row in rows {
3404            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3405                let execution_id: String = get(&row, "execution_id")?;
3406                let execution_id = ExecutionId::from_str(&execution_id)?;
3407                let join_set_id: String = get(&row, "join_set_id")?;
3408                let join_set_id = JoinSetId::from_str(&join_set_id)?;
3409                let delay_id: String = get(&row, "delay_id")?;
3410                let delay_id = DelayId::from_str(&delay_id)?;
3411
3412                Ok(ExpiredTimer::Delay(ExpiredDelay {
3413                    execution_id,
3414                    join_set_id,
3415                    delay_id,
3416                }))
3417            };
3418
3419            match unpack() {
3420                Ok(timer) => expired_timers.push(timer),
3421                Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3422            }
3423        }
3424
3425        // Expired Locks
3426        let rows = tx.query(
3427            &format!(
3428                "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3429                 FROM t_state \
3430                 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3431            ),
3432            &[&at]
3433        ).await?;
3434
3435        for row in rows {
3436            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3437                let execution_id: String = get(&row, "execution_id")?;
3438                let execution_id = ExecutionId::from_str(&execution_id)?;
3439                let last_lock_version: i64 = get(&row, "last_lock_version")?;
3440                let last_lock_version = Version::try_from(last_lock_version)?;
3441
3442                let corresponding_version: i64 = get(&row, "corresponding_version")?;
3443                let corresponding_version = Version::try_from(corresponding_version)?;
3444
3445                let intermittent_event_count =
3446                    u32::try_from(get::<i64>(&row, "intermittent_event_count")?).map_err(|_| {
3447                        consistency_db_err("`intermittent_event_count` must not be negative")
3448                    })?;
3449
3450                let max_retries = get::<Option<i64>>(&row, "max_retries")?
3451                    .map(u32::try_from)
3452                    .transpose()
3453                    .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3454                let retry_exp_backoff_millis =
3455                    u32::try_from(get::<i64>(&row, "retry_exp_backoff_millis")?).map_err(|_| {
3456                        consistency_db_err("`retry_exp_backoff_millis` must not be negative")
3457                    })?;
3458                let executor_id: String = get(&row, "executor_id")?;
3459                let executor_id = ExecutorId::from_str(&executor_id)?;
3460                let run_id: String = get(&row, "run_id")?;
3461                let run_id = RunId::from_str(&run_id)?;
3462
3463                Ok(ExpiredTimer::Lock(ExpiredLock {
3464                    execution_id,
3465                    locked_at_version: last_lock_version,
3466                    next_version: corresponding_version.increment(),
3467                    intermittent_event_count,
3468                    max_retries,
3469                    retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3470                    locked_by: LockedBy {
3471                        executor_id,
3472                        run_id,
3473                    },
3474                }))
3475            };
3476
3477            match unpack() {
3478                Ok(timer) => expired_timers.push(timer),
3479                Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3480            }
3481        }
3482
3483        tx.commit().await?;
3484
3485        if !expired_timers.is_empty() {
3486            debug!("get_expired_timers found {expired_timers:?}");
3487        }
3488        Ok(expired_timers)
3489    }
3490
3491    async fn get_execution_event(
3492        &self,
3493        execution_id: &ExecutionId,
3494        version: &Version,
3495    ) -> Result<ExecutionEvent, DbErrorRead> {
3496        let mut client_guard = self.client.lock().await;
3497        let tx = client_guard.transaction().await?;
3498
3499        let event = get_execution_event(&tx, execution_id, version.0).await?;
3500
3501        tx.commit().await?;
3502        Ok(event)
3503    }
3504
3505    async fn get_pending_state(
3506        &self,
3507        execution_id: &ExecutionId,
3508    ) -> Result<ExecutionWithState, DbErrorRead> {
3509        let mut client_guard = self.client.lock().await;
3510        let tx = client_guard.transaction().await?;
3511
3512        let combined_state = get_combined_state(&tx, execution_id).await?;
3513
3514        tx.commit().await?;
3515        Ok(combined_state.execution_with_state)
3516    }
3517}
3518
3519#[async_trait]
3520impl DbExternalApi for PostgresConnection {
3521    #[instrument(skip(self))]
3522    async fn get_backtrace(
3523        &self,
3524        execution_id: &ExecutionId,
3525        filter: BacktraceFilter,
3526    ) -> Result<BacktraceInfo, DbErrorRead> {
3527        debug!("get_backtrace");
3528
3529        let mut client_guard = self.client.lock().await;
3530        let tx = client_guard.transaction().await?;
3531
3532        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
3533
3534        params.push(Box::new(execution_id.to_string())); // $1
3535        let p_execution_id_idx = format!("${}", params.len()); // $1
3536
3537        let mut sql = String::new();
3538        write!(
3539            &mut sql,
3540            "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
3541     FROM t_backtrace WHERE execution_id = {p_execution_id_idx}"
3542        )
3543        .unwrap();
3544
3545        match &filter {
3546            BacktraceFilter::Specific(version) => {
3547                params.push(Box::new(i64::from(version.0))); // $2
3548                let p_ver_idx = format!("${}", params.len()); // $2
3549                write!(
3550                    &mut sql,
3551                    " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
3552                )
3553                .unwrap();
3554            }
3555            BacktraceFilter::First => {
3556                sql.push_str(" ORDER BY version_min_including LIMIT 1");
3557            }
3558            BacktraceFilter::Last => {
3559                sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
3560            }
3561        }
3562
3563        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
3564            params.iter().map(|p| p.as_ref() as _).collect();
3565
3566        let row = tx.query_one(&sql, &params_refs).await?;
3567
3568        let component_id: Json<ComponentId> = get(&row, "component_id")?;
3569        let component_id = component_id.0;
3570
3571        let version_min_including = Version::try_from(get::<i64>(&row, "version_min_including")?)?;
3572
3573        let version_max_excluding = Version::try_from(get::<i64>(&row, "version_max_excluding")?)?;
3574
3575        // wasm_backtrace stored as JSONB
3576        let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
3577        let wasm_backtrace = wasm_backtrace.0;
3578
3579        tx.commit().await?;
3580
3581        Ok(BacktraceInfo {
3582            execution_id: execution_id.clone(),
3583            component_id,
3584            version_min_including,
3585            version_max_excluding,
3586            wasm_backtrace,
3587        })
3588    }
3589
3590    #[instrument(skip(self))]
3591    async fn list_executions(
3592        &self,
3593        filter: ListExecutionsFilter,
3594        pagination: ExecutionListPagination,
3595    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3596        let mut client_guard = self.client.lock().await;
3597        let tx = client_guard.transaction().await?;
3598
3599        let result = list_executions(&tx, filter, &pagination).await?;
3600
3601        tx.commit().await?;
3602        Ok(result)
3603    }
3604
3605    #[instrument(skip(self))]
3606    async fn list_execution_events(
3607        &self,
3608        execution_id: &ExecutionId,
3609        since: &Version,
3610        max_length: VersionType,
3611        include_backtrace_id: bool,
3612    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3613        let mut client_guard = self.client.lock().await;
3614        let tx = client_guard.transaction().await?;
3615
3616        let execution_events = list_execution_events(
3617            &tx,
3618            execution_id,
3619            since.0,
3620            since.0 + max_length,
3621            include_backtrace_id,
3622        )
3623        .await?;
3624
3625        tx.commit().await?;
3626        Ok(execution_events)
3627    }
3628
3629    #[instrument(skip(self))]
3630    async fn list_responses(
3631        &self,
3632        execution_id: &ExecutionId,
3633        pagination: Pagination<u32>,
3634    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3635        let mut client_guard = self.client.lock().await;
3636        let tx = client_guard.transaction().await?;
3637
3638        let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
3639
3640        tx.commit().await?;
3641        Ok(responses)
3642    }
3643
3644    #[instrument(skip(self))]
3645    async fn list_execution_events_responses(
3646        &self,
3647        execution_id: &ExecutionId,
3648        req_since: &Version,
3649        req_max_length: VersionType,
3650        req_include_backtrace_id: bool,
3651        resp_pagination: Pagination<u32>,
3652    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
3653        let mut client_guard = self.client.lock().await;
3654        let tx = client_guard.transaction().await?;
3655
3656        let combined_state = get_combined_state(&tx, execution_id).await?;
3657
3658        let events = list_execution_events(
3659            &tx,
3660            execution_id,
3661            req_since.0,
3662            req_since.0 + req_max_length,
3663            req_include_backtrace_id,
3664        )
3665        .await?;
3666
3667        let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
3668
3669        tx.commit().await?;
3670
3671        Ok(ExecutionWithStateRequestsResponses {
3672            execution_with_state: combined_state.execution_with_state,
3673            events,
3674            responses,
3675        })
3676    }
3677
3678    #[instrument(skip(self))]
3679    async fn upgrade_execution_component(
3680        &self,
3681        execution_id: &ExecutionId,
3682        old: &InputContentDigest,
3683        new: &InputContentDigest,
3684    ) -> Result<(), DbErrorWrite> {
3685        let mut client_guard = self.client.lock().await;
3686        let tx = client_guard.transaction().await?;
3687
3688        upgrade_execution_component(&tx, execution_id, old, new).await?;
3689
3690        tx.commit().await?;
3691        Ok(())
3692    }
3693}
3694
3695#[async_trait]
3696impl DbPoolCloseable for PostgresPool {
3697    async fn close(&self) {
3698        self.pool.close();
3699    }
3700}
3701
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test helper: append `response_event` (stamped with `created_at`) to `execution_id`.
    ///
    /// The write runs in its own transaction. Subscribers are notified via `notify_all`
    /// only after the commit succeeded and the connection lock has been released.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let notifier = {
            let mut conn = self.client.lock().await;
            let transaction = conn.transaction().await?;
            let pending_notifier = append_response(
                &transaction,
                &execution_id,
                JoinSetResponseEventOuter {
                    created_at,
                    event: response_event,
                },
            )
            .await?;
            transaction.commit().await?;
            pending_notifier
        }; // the connection lock is released at the end of this block

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }

    /// Test helper: load the complete execution log of `execution_id` in a dedicated,
    /// committed transaction.
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn get(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
        trace!("get");
        let mut conn = self.client.lock().await;
        let transaction = conn.transaction().await?;
        let execution_log = get_execution_log(&transaction, execution_id).await?;
        transaction.commit().await?;
        Ok(execution_log)
    }
}
3745
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test helper: drop the database this pool was configured for.
    ///
    /// Connects to `ADMIN_DB_NAME` with this pool's host and credentials, then issues
    /// `DROP DATABASE` up to three times. Failing to create the admin pool or to obtain
    /// a connection from it panics (after logging the error). Failing to drop the
    /// database after all attempts is only logged with `warn!`.
    pub async fn drop_database(&self) {
        let mut admin_cfg = Config::new();
        admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
        admin_cfg.host = Some(self.config.host.clone());
        admin_cfg.user = Some(self.config.user.clone());
        admin_cfg.password = Some(self.config.password.clone());
        admin_cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let drop_stmt = format!("DROP DATABASE {}", self.config.db_name);
        for _attempt in 0..3 {
            // Waits 1s on error, no need to sleep more.
            let res = admin_client.execute(&drop_stmt, &[]).await;
            match &res {
                Ok(_) => {
                    debug!("Database '{}' dropped.", self.config.db_name);
                    return;
                }
                Err(_) => debug!("Dropping db failed - {res:?}"),
            }
        }
        warn!("Did not drop database {}", self.config.db_name);
    }
}