obeli_sk_db_postgres/
postgres_dao.rs

use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use concepts::{
    ComponentId, ComponentRetryConfig, ContentDigest, ExecutionId, FunctionFqn, JoinSetId,
    StrVariant, SupportedFunctionReturnValue,
    component_id::{Digest, InputContentDigest},
    prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
    storage::{
        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
        DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
        DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
        DbPool, DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
        ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT,
        HistoryEvent, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
        JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy, LockedExecution,
        Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
        PendingStateLocked, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
        STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
    },
};
use deadpool_postgres::{Client, Config, ManagerConfig, Pool, RecyclingMethod};
use hashbrown::HashMap;
use std::fmt::Write as _;
use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
use tokio::sync::{mpsc, oneshot};
use tokio_postgres::{
    NoTls, Transaction,
    types::{Json, ToSql},
};
use tracing::{Level, debug, error, info, instrument, trace, warn};

macro_rules! get {
    ($row:expr, $col:expr) => {
        $row.try_get($col).map_err(|e| {
            consistency_db_err(format!("Failed to retrieve column '{}': {}", $col, e))
        })?
    };
    // Variant for specifying type explicitly if inference fails
    ($row:expr, $col:expr, $ty:ty) => {
        $row.try_get::<_, $ty>($col).map_err(|e| {
            consistency_db_err(format!("Failed to retrieve column '{}': {}", $col, e))
        })?
    };
}
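// A minimal usage sketch of the `get!` macro above (illustrative only; both
// forms appear in the queries later in this file):
//
//     let created_at: DateTime<Utc> = get!(row, "created_at");
//     let corresponding_version = get!(row, "corresponding_version", i64);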

mod ddl {
    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;

    pub const ADMIN_DB_NAME: &str = "postgres";

    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 1;

    // CREATE_TABLE_T_METADATA
    pub const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    schema_version INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);
";

    // CREATE_TABLE_T_EXECUTION_LOG
    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    json_value JSON NOT NULL,
    version BIGINT NOT NULL CHECK (version >= 0),
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{HistoryEvent,event,type}') STORED,
    PRIMARY KEY (execution_id, version)
);
";

    // Indexes for t_execution_log
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";

    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
        HISTORY_EVENT_TYPE_JOIN_NEXT
    );

    // CREATE_TABLE_T_JOIN_SET_RESPONSE
    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success BOOLEAN,

    child_execution_id TEXT,
    finished_version BIGINT CHECK (finished_version >= 0),

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
);
";

    // Indexes for t_join_set_response
    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";

    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";

    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";

    // CREATE_TABLE_T_STATE
    pub const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level BOOLEAN NOT NULL,
    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
    ffqn TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    component_id_input_digest BYTEA NOT NULL,
    first_scheduled_at TIMESTAMPTZ NOT NULL,

    pending_expires_finished TIMESTAMPTZ NOT NULL,
    state TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >= 0),

    max_retries BIGINT CHECK (max_retries >= 0),
    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
    last_lock_version BIGINT CHECK (last_lock_version >= 0),
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing BOOLEAN,

    result_kind JSONB,

    PRIMARY KEY (execution_id)
);
";
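    // Note on the table above: `pending_expires_finished` is multiplexed by
    // `state` - it holds the scheduled-at time for STATE_PENDING_AT, the lock
    // expiry for STATE_LOCKED and STATE_BLOCKED_BY_JOIN_SET, and the
    // finished-at timestamp for STATE_FINISHED (see `CombinedState::new`).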

    // Indexes for t_state
    pub const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";

    pub const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";

    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";

    pub const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";

    pub const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";

    // CREATE_TABLE_T_DELAY
    pub const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
);
";

    // CREATE_TABLE_T_BACKTRACE
    pub const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id JSONB NOT NULL,
    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
    wasm_backtrace JSONB NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
);
";

    pub const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
}

#[derive(derive_more::Debug, Clone)]
pub struct PostgresConfig {
    pub host: String,
    pub user: String,
    #[debug(skip)]
    pub password: String,
    pub db_name: String,
}
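// Illustrative construction of the config above; the values are placeholders,
// not defaults taken from this crate:
//
//     let config = PostgresConfig {
//         host: "127.0.0.1".to_string(),
//         user: "postgres".to_string(),
//         password: "secret".to_string(),
//         db_name: "obelisk".to_string(),
//     };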

#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;

async fn create_database(
    config: &PostgresConfig,
) -> Result<DbInitialzationOutcome, InitializationError> {
    let mut cfg = Config::new();
    cfg.host = Some(config.host.clone());
    cfg.user = Some(config.user.clone());
    cfg.password = Some(config.password.clone());
    cfg.dbname = Some(ADMIN_DB_NAME.into());
    cfg.manager = Some(ManagerConfig {
        recycling_method: RecyclingMethod::Fast,
    });

    let pool = cfg.create_pool(None, NoTls).map_err(|err| {
        error!("Cannot create the default pool - {err:?}");
        InitializationError
    })?;

    let client = pool.get().await.map_err(|err| {
        error!("Cannot get a connection from the default pool - {err:?}");
        InitializationError
    })?;

    let row = client
        .query_opt(
            &format!(
                "SELECT 1 FROM pg_database WHERE datname = '{}'",
                config.db_name
            ),
            &[],
        )
        .await
        .map_err(|err| {
            error!("Cannot select from the default database - {err:?}");
            InitializationError
        })?;

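    // Note: `CREATE DATABASE` cannot take bind parameters, so `db_name` is
    // interpolated into the statement directly below; it is assumed to come
    // from trusted operator configuration.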
    let outcome = if row.is_none() {
        client
            .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
            .await
            .map_err(|err| {
                error!("Cannot create the database - {err:?}");
                InitializationError
            })?;

        DbInitialzationOutcome::Created
    } else {
        DbInitialzationOutcome::Existing
    };
    match outcome {
        DbInitialzationOutcome::Created => info!("Database '{}' created.", config.db_name),
        DbInitialzationOutcome::Existing => info!("Database '{}' exists.", config.db_name),
    }
    Ok(outcome)
}

// All mutexes here guard very short critical sections that are fully controlled by this module, hence std mutexes.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;

fn map_pool_error(err: deadpool_postgres::PoolError) -> DbErrorGeneric {
    match err {
        deadpool_postgres::PoolError::Backend(err) => DbErrorGeneric::from(err),
        deadpool_postgres::PoolError::Closed => DbErrorGeneric::Close,
        other => DbErrorGeneric::Uncategorized(other.to_string().into()),
    }
}

pub struct PostgresPool {
    pool: Pool,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    #[allow(dead_code)] // only for deleting the db in tests.
    config: PostgresConfig,
}

#[async_trait]
impl DbPool for PostgresPool {
    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
        let client = self.pool.get().await.map_err(map_pool_error)?;

        Ok(Box::new(PostgresConnection {
            client: tokio::sync::Mutex::new(client),
            response_subscribers: self.response_subscribers.clone(),
            pending_subscribers: self.pending_subscribers.clone(),
            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
        }))
    }

    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
        let client = self.pool.get().await.map_err(map_pool_error)?;

        Ok(Box::new(PostgresConnection {
            client: tokio::sync::Mutex::new(client),
            response_subscribers: self.response_subscribers.clone(),
            pending_subscribers: self.pending_subscribers.clone(),
            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
        }))
    }

    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
        let client = self.pool.get().await.map_err(map_pool_error)?;

        Ok(Box::new(PostgresConnection {
            client: tokio::sync::Mutex::new(client),
            response_subscribers: self.response_subscribers.clone(),
            pending_subscribers: self.pending_subscribers.clone(),
            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
        }))
    }

    #[cfg(feature = "test")]
    async fn connection_test(
        &self,
    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
        let client = self.pool.get().await.map_err(map_pool_error)?;

        Ok(Box::new(PostgresConnection {
            client: tokio::sync::Mutex::new(client),
            response_subscribers: self.response_subscribers.clone(),
            pending_subscribers: self.pending_subscribers.clone(),
            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
        }))
    }
}

pub struct PostgresConnection {
    client: tokio::sync::Mutex<Client>, // Callers should not hold onto a connection for too long, but that is outside this module's control, hence a tokio mutex.
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}

#[derive(Default)]
struct PendingFfqnSubscribersHolder {
    by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
    by_component: HashMap<InputContentDigest /* input digest */, (mpsc::Sender<()>, u64)>,
}
impl PendingFfqnSubscribersHolder {
    fn notify(&self, notifier: &NotifierPendingAt) {
        if let Some((subscription, _)) = self.by_ffqns.get(&notifier.ffqn) {
            debug!("Notifying pending subscriber by ffqn");
            // Does not block
            let _ = subscription.try_send(());
        }
        if let Some((subscription, _)) = self.by_component.get(&notifier.component_input_digest) {
            debug!("Notifying pending subscriber by component");
            // Does not block
            let _ = subscription.try_send(());
        }
    }

    fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
        self.by_ffqns.insert(ffqn, value);
    }

    fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
        self.by_ffqns.remove(ffqn)
    }

    fn insert_by_component(
        &mut self,
        input_content_digest: InputContentDigest,
        value: (mpsc::Sender<()>, u64),
    ) {
        self.by_component.insert(input_content_digest, value);
    }

    fn remove_by_component(
        &mut self,
        input_content_digest: &InputContentDigest,
    ) -> Option<(mpsc::Sender<()>, u64)> {
        self.by_component.remove(input_content_digest)
    }
}
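// Illustrative subscriber flow for the holder above (the local names `holder`,
// `subscriber_id` and `notifier` are hypothetical):
//
//     let (tx, mut rx) = mpsc::channel::<()>(1);
//     holder.insert_ffqn(ffqn.clone(), (tx, subscriber_id));
//     // ...once an execution with a matching ffqn becomes pending:
//     holder.notify(&notifier); // non-blocking try_send(())
//     rx.recv().await;          // the waiting executor wakes up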

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    Never,
    /// Create database if it does not exist.
    Auto,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    Created,
    Existing,
}

impl PostgresPool {
    #[instrument(skip_all, name = "postgres_new")]
    pub async fn new(
        config: PostgresConfig,
        provision_policy: ProvisionPolicy,
    ) -> Result<PostgresPool, InitializationError> {
        Self::new_with_outcome(config, provision_policy)
            .await
            .map(|(db, _)| db)
    }

    pub async fn new_with_outcome(
        config: PostgresConfig,
        provision_policy: ProvisionPolicy,
    ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
        let outcome = if provision_policy == ProvisionPolicy::Auto {
            create_database(&config).await?
        } else {
            DbInitialzationOutcome::Existing
        };
        let mut cfg = Config::new();
        cfg.host = Some(config.host.clone());
        cfg.user = Some(config.user.clone());
        cfg.password = Some(config.password.clone());
        cfg.dbname = Some(config.db_name.clone());
        cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let pool = cfg.create_pool(None, NoTls).map_err(|err| {
            error!("Cannot create the database pool - {err:?}");
            InitializationError
        })?;
        let client = pool.get().await.map_err(|err| {
            error!("Cannot get a connection from the database pool - {err:?}");
            InitializationError
        })?;

        let statements = vec![
            ddl::CREATE_TABLE_T_METADATA,
            ddl::CREATE_TABLE_T_EXECUTION_LOG,
            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
            ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
            ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
            ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
            ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
            ddl::CREATE_TABLE_T_STATE,
            ddl::IDX_T_STATE_LOCK_PENDING,
            ddl::IDX_T_STATE_EXPIRED_TIMERS,
            ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
            ddl::IDX_T_STATE_FFQN,
            ddl::IDX_T_STATE_CREATED_AT,
            ddl::CREATE_TABLE_T_DELAY,
            ddl::CREATE_TABLE_T_BACKTRACE,
            ddl::IDX_T_BACKTRACE_EXECUTION_ID_VERSION,
        ];

        // Combine into a single batch execution: one round-trip instead of one per statement.
        let batch_sql = statements.join("\n");
        client.batch_execute(&batch_sql).await.map_err(|err| {
            error!("Cannot run the DDL import - {err:?}");
            InitializationError
        })?;

        let row = client
            .query_opt(
                "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
                &[],
            )
            .await
            .map_err(|err| {
                error!("Cannot select schema version - {err:?}");
                InitializationError
            })?;

        // Postgres INTEGER maps to i32
        let actual_version = match row {
            Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
                error!("Failed to get schema_version column: {e}");
                InitializationError
            })?),
            None => None,
        };

        match actual_version {
            None => {
                client
                    .execute(
                        "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
                        &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
                    )
                    .await
                    .map_err(|err| {
                        error!("Cannot insert schema version - {err:?}");
                        InitializationError
                    })?;
            }
            Some(actual_version) => {
                // Fail on unexpected `schema_version`.
                if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
                    error!(
                        "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
                    );
                    return Err(InitializationError);
                }
            }
        }

        debug!("Database schema initialized.");

        Ok((
            PostgresPool {
                pool,
                execution_finished_subscribers: Arc::default(),
                pending_subscribers: Arc::default(),
                response_subscribers: Arc::default(),
                config,
            },
            outcome,
        ))
    }
}
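// A minimal sketch of creating the pool and obtaining a connection, assuming a
// reachable Postgres instance and a `config` as shown above (error handling and
// the surrounding async runtime elided):
//
//     let (pool, outcome) =
//         PostgresPool::new_with_outcome(config, ProvisionPolicy::Auto).await?;
//     let conn = pool.connection().await?;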

fn consistency_db_err(err: impl Into<StrVariant>) -> DbErrorGeneric {
    let err = err.into();
    warn!(
        backtrace = %std::backtrace::Backtrace::capture(),
        "Consistency error: {err}"
    );
    DbErrorGeneric::Uncategorized(err)
}

#[derive(Debug)]
struct CombinedStateDTO {
    state: String,
    ffqn: String,
    component_id_input_digest: InputContentDigest,
    pending_expires_finished: DateTime<Utc>,
    // Locked:
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    // Blocked by join set:
    join_set_id: Option<JoinSetId>,
    join_set_closing: Option<bool>,
    // Finished:
    result_kind: Option<PendingStateFinishedResultKind>,
}
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    corresponding_version: Version,
}

impl CombinedState {
    fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, DbErrorGeneric> {
        let ffqn = FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
            error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
            consistency_db_err("invalid ffqn value in `t_state`")
        })?;
        let pending_state = match dto {
            // Pending - just created
            CombinedStateDTO {
                state,
                ffqn: _,
                component_id_input_digest,
                pending_expires_finished: scheduled_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_PENDING_AT => PendingState::PendingAt {
                scheduled_at,
                last_lock: None,
                component_id_input_digest,
            },
            // Pending, previously locked
            CombinedStateDTO {
                state,
                ffqn: _,
                component_id_input_digest,
                pending_expires_finished: scheduled_at,
                last_lock_version: None,
                executor_id: Some(executor_id),
                run_id: Some(run_id),
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_PENDING_AT => PendingState::PendingAt {
                scheduled_at,
                last_lock: Some(LockedBy {
                    executor_id,
                    run_id,
                }),
                component_id_input_digest,
            },
            CombinedStateDTO {
                state,
                ffqn: _,
                component_id_input_digest,
                pending_expires_finished: lock_expires_at,
                last_lock_version: Some(_),
                executor_id: Some(executor_id),
                run_id: Some(run_id),
                join_set_id: None,
                join_set_closing: None,
                result_kind: None,
            } if state == STATE_LOCKED => PendingState::Locked(PendingStateLocked {
                locked_by: LockedBy {
                    executor_id,
                    run_id,
                },
                lock_expires_at,
                component_id_input_digest,
            }),
            CombinedStateDTO {
                state,
                ffqn: _,
                component_id_input_digest,
                pending_expires_finished: lock_expires_at,
                last_lock_version: None,
                executor_id: _,
                run_id: _,
                join_set_id: Some(join_set_id),
                join_set_closing: Some(join_set_closing),
                result_kind: None,
            } if state == STATE_BLOCKED_BY_JOIN_SET => PendingState::BlockedByJoinSet {
                join_set_id: join_set_id.clone(),
                closing: join_set_closing,
                lock_expires_at,
                component_id_input_digest,
            },
            CombinedStateDTO {
                state,
                ffqn: _,
                component_id_input_digest,
                pending_expires_finished: finished_at,
                last_lock_version: None,
                executor_id: None,
                run_id: None,
                join_set_id: None,
                join_set_closing: None,
                result_kind: Some(result_kind),
            } if state == STATE_FINISHED => PendingState::Finished {
                finished: PendingStateFinished {
                    finished_at,
                    version: corresponding_version.0,
                    result_kind,
                },
                component_id_input_digest,
            },

            _ => {
                error!("Cannot deserialize pending state from {dto:?}");
                return Err(consistency_db_err("invalid `t_state`"));
            }
        };
        Ok(Self {
            ffqn,
            pending_state,
            corresponding_version,
        })
    }

    fn get_next_version_assert_not_finished(&self) -> Version {
        assert!(!self.pending_state.is_finished());
        self.corresponding_version.increment()
    }

    #[cfg(feature = "test")]
    fn get_next_version_or_finished(&self) -> Version {
        if self.pending_state.is_finished() {
            self.corresponding_version.clone()
        } else {
            self.corresponding_version.increment()
        }
    }
}
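// Worked example for the helpers above (assuming `Version::increment` bumps the
// numeric version by one): a non-finished state at Version(5) yields Version(6)
// from get_next_version_assert_not_finished(), while a finished one keeps
// Version(5) via get_next_version_or_finished().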

#[derive(Debug)]
struct NotifierPendingAt {
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
    component_input_digest: InputContentDigest,
}

#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}

#[derive(Debug, Default)]
struct AppendNotifier {
    pending_at: Option<NotifierPendingAt>,
    execution_finished: Option<NotifierExecutionFinished>,
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}

#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}

async fn fetch_created_event(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
) -> Result<CreateRequest, DbErrorRead> {
    let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
                execution_id = $1 AND version = 0";

    let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;

    let created_at = get!(row, "created_at");
    let event: Json<ExecutionRequest> = get!(row, "json_value");
    let event = event.0;

    if let ExecutionRequest::Created {
        ffqn,
        params,
        parent,
        scheduled_at,
        component_id,
        metadata,
        scheduled_by,
    } = event
    {
        Ok(CreateRequest {
            created_at,
            execution_id: execution_id.clone(),
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            metadata,
            scheduled_by,
        })
    } else {
        error!("Row with version=0 must be a `Created` event - {event:?}");
        Err(consistency_db_err("expected `Created` event").into())
    }
}

fn check_expected_next_and_appending_version(
    expected_version: &Version,
    appending_version: &Version,
) -> Result<(), DbErrorWrite> {
    if *expected_version != *appending_version {
        debug!(
            "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
        );
        return Err(DbErrorWrite::NonRetriable(
            DbErrorWriteNonRetriable::VersionConflict {
                expected: expected_version.clone(),
                requested: appending_version.clone(),
            },
        ));
    }
    Ok(())
}
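// For example, appending with Version(3) while the expected next version is
// Version(2) is rejected above with DbErrorWriteNonRetriable::VersionConflict
// instead of silently rewriting history (optimistic concurrency control).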

#[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
async fn create_inner(
    tx: &Transaction<'_>,
    req: CreateRequest,
) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
    trace!("create_inner");

    let version = Version::default();
    let execution_id = req.execution_id.clone();
    let execution_id_str = execution_id.to_string();
    let ffqn = req.ffqn.clone();
    let created_at = req.created_at;
    let scheduled_at = req.scheduled_at;
    let component_id = req.component_id.clone();

    let event = ExecutionRequest::from(req);
    let event = Json(event);

    tx.execute(
        "INSERT INTO t_execution_log (
            execution_id, created_at, version, json_value, variant, join_set_id
        ) VALUES ($1, $2, $3, $4, $5, $6)",
        &[
            &execution_id_str,
            &created_at,
            &i64::from(version.0), // BIGINT
            &event,
            &event.0.variant(),
            &event.0.join_set_id().map(std::string::ToString::to_string),
        ],
    )
    .await?;

    let pending_at = {
        debug!("Creating with Pending(`{scheduled_at:?}`)");

        tx.execute(
            r"
            INSERT INTO t_state (
                execution_id,
                is_top_level,
                corresponding_version,
                pending_expires_finished,
                ffqn,
                state,
                created_at,
                component_id_input_digest,
                updated_at,
                first_scheduled_at,
                intermittent_event_count
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, CURRENT_TIMESTAMP, $9, 0
            )",
            &[
                &execution_id_str,
                &execution_id.is_top_level(),
                &i64::from(version.0),
                &scheduled_at,
                &ffqn.to_string(),
                &STATE_PENDING_AT,
                &created_at,
                &component_id.input_digest.as_slice(),
                &scheduled_at,
            ],
        )
        .await?;

        AppendNotifier {
            pending_at: Some(NotifierPendingAt {
                scheduled_at,
                ffqn,
                component_input_digest: component_id.input_digest,
            }),
            execution_finished: None,
            response: None,
        }
    };

    let next_version = Version::new(version.0 + 1);
    Ok((next_version, pending_at))
}

#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
async fn update_state_pending_after_response_appended(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
    scheduled_at: DateTime<Utc>,     // Changing to state PendingAt
    corresponding_version: &Version, // t_execution_log is not changed
    component_input_digest: InputContentDigest,
) -> Result<AppendNotifier, DbErrorWrite> {
    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");

    // Convert types for Postgres arguments
    let execution_id_str = execution_id.to_string();
    let version = i64::from(corresponding_version.0);

    let updated = tx
        .execute(
            r"
            UPDATE t_state
            SET
                corresponding_version = $1,
                pending_expires_finished = $2,
                state = $3,
                updated_at = CURRENT_TIMESTAMP,

                last_lock_version = NULL,

                join_set_id = NULL,
                join_set_closing = NULL,

                result_kind = NULL
            WHERE execution_id = $4
            ",
            &[
                &version,          // $1
                &scheduled_at,     // $2
                &STATE_PENDING_AT, // $3
                &execution_id_str, // $4
            ],
        )
        .await?;

    if updated == 0 {
        return Err(DbErrorWrite::NotFound);
    }

    Ok(AppendNotifier {
        pending_at: Some(NotifierPendingAt {
            scheduled_at,
            ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
            component_input_digest,
        }),
        execution_finished: None,
        response: None,
    })
}

#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
async fn update_state_pending_after_event_appended(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
    appending_version: &Version,
    scheduled_at: DateTime<Utc>,
    intermittent_failure: bool,
    component_input_digest: InputContentDigest,
) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");

    let intermittent_delta = i64::from(intermittent_failure); // 0 or 1

    let updated = tx
        .execute(
            r"
            UPDATE t_state
            SET
                corresponding_version = $1,
                pending_expires_finished = $2,
                state = $3,
                updated_at = CURRENT_TIMESTAMP,
                intermittent_event_count = intermittent_event_count + $4,

                last_lock_version = NULL,

                join_set_id = NULL,
                join_set_closing = NULL,

                result_kind = NULL
            WHERE execution_id = $5;
            ",
            &[
                &i64::from(appending_version.0), // $1
                &scheduled_at,                   // $2
                &STATE_PENDING_AT,               // $3
                &intermittent_delta,             // $4
                &execution_id.to_string(),       // $5
            ],
        )
        .await?;

    if updated != 1 {
        return Err(DbErrorWrite::NotFound);
    }

    Ok((
        appending_version.increment(),
        AppendNotifier {
            pending_at: Some(NotifierPendingAt {
                scheduled_at,
                ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
                component_input_digest,
            }),
            execution_finished: None,
            response: None,
        },
    ))
}

async fn update_state_locked_get_intermittent_event_count(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
    executor_id: ExecutorId,
    run_id: RunId,
    lock_expires_at: DateTime<Utc>,
    appending_version: &Version,
    retry_config: ComponentRetryConfig,
) -> Result<u32, DbErrorWrite> {
    debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
    let backoff_millis = i64::try_from(retry_config.retry_exp_backoff.as_millis())
        .map_err(|_| DbErrorGeneric::Uncategorized("backoff too big".into()))?; // BIGINT = i64

    let execution_id_str = execution_id.to_string();

    let updated = tx
        .execute(
            r"
            UPDATE t_state
            SET
                corresponding_version = $1,
                pending_expires_finished = $2,
                state = $3,
                updated_at = CURRENT_TIMESTAMP,

                max_retries = $4,
                retry_exp_backoff_millis = $5,
                last_lock_version = $1, -- appending_version again
                executor_id = $6,
                run_id = $7,

                join_set_id = NULL,
                join_set_closing = NULL,

                result_kind = NULL
            WHERE execution_id = $8
            ",
            &[
                &i64::from(appending_version.0),          // $1
                &lock_expires_at,                         // $2
                &STATE_LOCKED,                            // $3
                &retry_config.max_retries.map(i64::from), // $4
                &backoff_millis,                          // $5
                &executor_id.to_string(),                 // $6
                &run_id.to_string(),                      // $7
                &execution_id_str,                        // $8
            ],
        )
        .await?;

    if updated != 1 {
        return Err(DbErrorWrite::NotFound);
    }

    // fetch intermittent event count
    let row = tx
        .query_one(
            "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
            &[&execution_id_str],
        )
        .await
        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;

    let count: i64 = get!(row, "intermittent_event_count"); // Postgres BIGINT
    let count = u32::try_from(count)
        .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
    Ok(count)
}

async fn update_state_blocked(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
    appending_version: &Version,
    join_set_id: &JoinSetId,
    lock_expires_at: DateTime<Utc>,
    join_set_closing: bool,
) -> Result<AppendResponse, DbErrorWrite> {
    debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");

    let updated = tx
        .execute(
            r"
            UPDATE t_state
            SET
                corresponding_version = $1,
                pending_expires_finished = $2,
                state = $3,
                updated_at = CURRENT_TIMESTAMP,

                last_lock_version = NULL,

                join_set_id = $4,
                join_set_closing = $5,

                result_kind = NULL
            WHERE execution_id = $6
            ",
            &[
                &i64::from(appending_version.0), // $1
                &lock_expires_at,                // $2
                &STATE_BLOCKED_BY_JOIN_SET,      // $3
                &join_set_id.to_string(),        // $4
                &join_set_closing,               // $5 (BOOLEAN)
                &execution_id.to_string(),       // $6
            ],
        )
        .await?;

    if updated != 1 {
        return Err(DbErrorWrite::NotFound);
    }
    Ok(appending_version.increment())
}

async fn update_state_finished(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
    appending_version: &Version,
    finished_at: DateTime<Utc>,
    result_kind: PendingStateFinishedResultKind,
) -> Result<(), DbErrorWrite> {
    debug!("Setting t_state to Finished");

    let result_kind_json = Json(result_kind);

    let updated = tx
        .execute(
            r"
            UPDATE t_state
            SET
                corresponding_version = $1,
                pending_expires_finished = $2,
                state = $3,
                updated_at = CURRENT_TIMESTAMP,

                last_lock_version = NULL,
                executor_id = NULL,
                run_id = NULL,

                join_set_id = NULL,
                join_set_closing = NULL,

                result_kind = $4
            WHERE execution_id = $5
            ",
            &[
                &i64::from(appending_version.0), // $1
                &finished_at,                    // $2
                &STATE_FINISHED,                 // $3
                &result_kind_json,               // $4
                &execution_id.to_string(),       // $5
            ],
        )
        .await?;

    if updated != 1 {
        return Err(DbErrorWrite::NotFound);
    }
    Ok(())
}

#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
async fn bump_state_next_version(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
    appending_version: &Version,
    delay_req: Option<DelayReq>,
) -> Result<AppendResponse, DbErrorWrite> {
    debug!("bump_state_next_version");
    let execution_id_str = execution_id.to_string();

    let updated = tx
        .execute(
            r"
            UPDATE t_state
            SET
                corresponding_version = $1,
                updated_at = CURRENT_TIMESTAMP
            WHERE execution_id = $2
            ",
            &[
                &i64::from(appending_version.0), // $1
                &execution_id_str,               // $2
            ],
        )
        .await?;

    if updated != 1 {
        return Err(DbErrorWrite::NotFound);
    }

    if let Some(DelayReq {
        join_set_id,
        delay_id,
        expires_at,
    }) = delay_req
    {
        debug!("Inserting delay to `t_delay`");
        tx.execute(
            "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
            &[
                &execution_id_str,
                &join_set_id.to_string(),
                &delay_id.to_string(),
                &expires_at,
            ],
        )
        .await?;
    }
    Ok(appending_version.increment())
}

async fn get_combined_state(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
) -> Result<CombinedState, DbErrorRead> {
    let row = tx
        .query_one(
            r"
            SELECT
                state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
                last_lock_version, executor_id, run_id,
                join_set_id, join_set_closing,
                result_kind
            FROM t_state
            WHERE execution_id = $1
            ",
            &[&execution_id.to_string()],
        )
        .await
        .map_err(DbErrorRead::from)?;

    // Parsing columns
    let digest_bytes: Vec<u8> = get!(row, "component_id_input_digest");
    let digest = Digest::try_from(digest_bytes.as_slice())
        .map_err(|err| consistency_db_err(err.to_string()))?;
    let component_id_input_digest = InputContentDigest(ContentDigest(digest));

    let state: String = get!(row, "state");
    let ffqn: String = get!(row, "ffqn");
    let pending_expires_finished: DateTime<Utc> = get!(row, "pending_expires_finished");

    let last_lock_version_raw: Option<i64> = get!(row, "last_lock_version");
    let last_lock_version = last_lock_version_raw
        .map(Version::try_from)
        .transpose()
        .map_err(|_| consistency_db_err("version must be non-negative"))?;

    let executor_id_raw: Option<String> = get!(row, "executor_id");
    let executor_id = executor_id_raw
        .map(|id| ExecutorId::from_str(&id))
        .transpose()
        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;

    let run_id_raw: Option<String> = get!(row, "run_id");
    let run_id = run_id_raw
        .map(|id| RunId::from_str(&id))
        .transpose()
        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;

    let join_set_id_raw: Option<String> = get!(row, "join_set_id");
    let join_set_id = join_set_id_raw
        .map(|id| JoinSetId::from_str(&id))
        .transpose()
        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;

    let join_set_closing: Option<bool> = get!(row, "join_set_closing");

    let result_kind: Option<Json<PendingStateFinishedResultKind>> = get!(row, "result_kind");
    let result_kind = result_kind.map(|it| it.0);

    let corresponding_version = get!(row, "corresponding_version", i64);
    let corresponding_version = Version::new(
        VersionType::try_from(corresponding_version)
            .map_err(|_| consistency_db_err("version must be non-negative"))?,
    );

    let dto = CombinedStateDTO {
        state,
        ffqn,
        component_id_input_digest,
        pending_expires_finished,
        last_lock_version,
        executor_id,
        run_id,
        join_set_id,
        join_set_closing,
        result_kind,
    };
    CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
}

async fn list_executions(
    read_tx: &Transaction<'_>,
    ffqn: Option<&FunctionFqn>,
    top_level_only: bool,
    pagination: &ExecutionListPagination,
) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
    // Helper to manage dynamic WHERE clauses and positional parameters ($1, $2...)
    struct QueryBuilder {
        where_clauses: Vec<String>,
        params: Vec<Box<dyn ToSql + Send + Sync>>,
    }

    impl QueryBuilder {
        fn new() -> Self {
            Self {
                where_clauses: Vec::new(),
                params: Vec::new(),
            }
        }

        fn add_param<T>(&mut self, param: T) -> String
        where
            T: ToSql + Sync + Send + 'static,
        {
            self.params.push(Box::new(param));
            format!("${}", self.params.len())
        }

        fn add_where(&mut self, clause: String) {
            self.where_clauses.push(clause);
        }
    }
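    // Usage sketch of the helper above: each added parameter returns its
    // positional placeholder ("$1", "$2", ...), which is then embedded into a
    // WHERE clause:
    //
    //     let mut qb = QueryBuilder::new();
    //     let placeholder = qb.add_param(some_ffqn.to_string()); // "$1"
    //     qb.add_where(format!("ffqn = {placeholder}"));
    //
    // (`some_ffqn` is a hypothetical FunctionFqn value.)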
1303
1304    let mut qb = QueryBuilder::new();
1305
1306    // 1. Pagination Logic
1307    let (limit, limit_desc) = match pagination {
1308        ExecutionListPagination::CreatedBy(p) => {
1309            let limit = p.length();
1310            let is_desc = p.is_desc();
1311            if let Some(cursor) = p.cursor() {
1312                let placeholder = qb.add_param(*cursor);
1313                qb.add_where(format!("created_at {} {}", p.rel(), placeholder));
1314            }
1315            (limit, is_desc)
1316        }
1317        ExecutionListPagination::ExecutionId(p) => {
1318            let limit = p.length();
1319            let is_desc = p.is_desc();
1320            if let Some(cursor) = p.cursor() {
1321                let placeholder = qb.add_param(cursor.to_string());
1322                qb.add_where(format!("execution_id {} {}", p.rel(), placeholder));
1323            }
1324            (limit, is_desc)
1325        }
1326    };
1327
1328    // 2. Top Level Filter
1329    if top_level_only {
1330        qb.add_where("is_top_level = true".to_string());
1331    }
1332
1333    // 3. FFQN Filter
1334    if let Some(ffqn) = ffqn {
1335        let placeholder = qb.add_param(ffqn.to_string());
1336        qb.add_where(format!("ffqn = {placeholder}"));
1337    }
1338
1339    // 4. Construct Query
1340    let where_str = if qb.where_clauses.is_empty() {
1341        String::new()
1342    } else {
1343        format!("WHERE {}", qb.where_clauses.join(" AND "))
1344    };
1345
1346    let order_col = match pagination {
1347        ExecutionListPagination::CreatedBy(_) => "created_at",
1348        ExecutionListPagination::ExecutionId(_) => "execution_id",
1349    };
1350
1351    let desc_str = if limit_desc { "DESC" } else { "" };
1352
1353    let sql = format!(
1354        r"
1355            SELECT created_at, first_scheduled_at, component_id_input_digest,
1356            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1357            last_lock_version, executor_id, run_id,
1358            join_set_id, join_set_closing,
1359            result_kind
1360            FROM t_state {where_str} ORDER BY {order_col} {desc_str} LIMIT {limit}
1361            "
1362    );
1363
1364    let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1365        .params
1366        .iter()
1367        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1368        .collect();
1369
1370    let rows = read_tx
1371        .query(&sql, &params_refs)
1372        .await
1373        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1374
1375    let mut vec = Vec::with_capacity(rows.len());
1376
1377    for row in rows {
1378        // If parsing of the row fails, log and skip it.
1379        let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1380            let execution_id_str: String = get!(row, "execution_id");
1381            let execution_id = ExecutionId::from_str(&execution_id_str)
1382                .map_err(|err| consistency_db_err(err.to_string()))?;
1383
1384            let digest_bytes: Vec<u8> = get!(row, "component_id_input_digest");
1385            let digest = Digest::try_from(digest_bytes.as_slice())
1386                .map_err(|err| consistency_db_err(err.to_string()))?;
1387            let component_id_input_digest = InputContentDigest(ContentDigest(digest));
1388
1389            let created_at: DateTime<Utc> = get!(row, "created_at");
1390            let first_scheduled_at: DateTime<Utc> = get!(row, "first_scheduled_at");
1391
1392            let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1393                get!(row, "result_kind");
1394            let result_kind = result_kind.map(|it| it.0);
1395
1396            let corresponding_version: i64 = get!(row, "corresponding_version");
1397            let corresponding_version = Version::try_from(corresponding_version)
1398                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1399
1400            let executor_id_str: Option<String> = get!(row, "executor_id");
1401            let executor_id = executor_id_str
1402                .map(|id| ExecutorId::from_str(&id))
1403                .transpose()
1404                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1405
1406            let last_lock_version_raw: Option<i64> = get!(row, "last_lock_version");
1407            let last_lock_version = last_lock_version_raw
1408                .map(Version::try_from)
1409                .transpose()
1410                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1411
1412            let run_id_str: Option<String> = get!(row, "run_id");
1413            let run_id = run_id_str
1414                .map(|id| RunId::from_str(&id))
1415                .transpose()
1416                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1417
1418            let join_set_id_str: Option<String> = get!(row, "join_set_id");
1419            let join_set_id = join_set_id_str
1420                .map(|id| JoinSetId::from_str(&id))
1421                .transpose()
1422                .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1423
1424            let combined_state_dto = CombinedStateDTO {
1425                component_id_input_digest: component_id_input_digest.clone(),
1426                state: get!(row, "state"),
1427                ffqn: get!(row, "ffqn"),
1428                pending_expires_finished: get!(row, "pending_expires_finished"),
1429                executor_id,
1430                last_lock_version,
1431                run_id,
1432                join_set_id,
1433                join_set_closing: get!(row, "join_set_closing"),
1434                result_kind,
1435            };
1436
1437            let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1438
1439            Ok(ExecutionWithState {
1440                execution_id,
1441                ffqn: combined_state.ffqn,
1442                pending_state: combined_state.pending_state,
1443                created_at,
1444                first_scheduled_at,
1445                component_digest: component_id_input_digest,
1446            })
1447        };
1448
1449        match unpack() {
1450            Ok(execution) => vec.push(execution),
1451            Err(err) => {
1452                warn!("Skipping corrupted row in t_state: {err:?}");
1453            }
1454        }
1455    }
1456
1457    if !limit_desc {
1458        // the list must be sorted in descending order
1459        vec.reverse();
1460    }
1461    Ok(vec)
1462}
1463
1464async fn list_responses(
1465    tx: &Transaction<'_>,
1466    execution_id: &ExecutionId,
1467    pagination: Option<Pagination<u32>>,
1468) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1469    // Helper to manage params dynamically
1470    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1471    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1472        params.push(p);
1473        format!("${}", params.len())
1474    };
1475
1476    // 1. Base Query
1477    let p_execution_id = add_param(Box::new(execution_id.to_string()));
1478
1479    let mut sql = format!(
1480        "SELECT \
1481            r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1482            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1483            WHERE \
1484            r.execution_id = {p_execution_id} \
1485            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1486    );
1487
1488    // 2. Pagination Logic
1489    let limit = match &pagination {
1490        Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1491            // Postgres BIGINT is i64.
1492            let p_cursor = add_param(Box::new(i64::from(*cursor)));
1493
1494            // Add WHERE clause for cursor
1495            write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1496
1497            Some(p.length())
1498        }
1499        None => None,
1500    };
1501
1502    // 3. Ordering
1503    sql.push_str(" ORDER BY r.id");
1504    if pagination.as_ref().is_some_and(Pagination::is_desc) {
1505        sql.push_str(" DESC");
1506    }
1507
1508    // 4. Limit
1509    if let Some(limit) = limit {
1510        // Postgres limit expects i64
1511        let p_limit = add_param(Box::new(i64::from(limit)));
1512        write!(sql, " LIMIT {p_limit}").unwrap();
1513    }
1514
1515    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1516        .iter()
1517        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1518        .collect();
1519
1520    let rows = tx
1521        .query(&sql, &params_refs)
1522        .await
1523        .map_err(DbErrorRead::from)?;
1524
1525    let mut results = Vec::with_capacity(rows.len());
1526    for row in rows {
1527        results.push(parse_response_with_cursor(&row)?);
1528    }
1529
1530    Ok(results)
1531}
1532
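/// Decode one joined row into a `ResponseWithCursor`.
///
/// Exactly one of two shapes is accepted: delay columns set and child columns
/// NULL (`DelayFinished`), or child columns set and delay columns NULL
/// (`ChildExecutionFinished`); anything else is treated as a consistency error.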
1533fn parse_response_with_cursor(
1534    row: &tokio_postgres::Row,
1535) -> Result<ResponseWithCursor, DbErrorRead> {
1536    // Postgres BIGINT = i64.
1537    let id = u32::try_from(get!(row, "id", i64))
1538        .map_err(|_| consistency_db_err("id must not be negative"))?;
1539
1540    let created_at: DateTime<Utc> = get!(row, "created_at");
1541    let join_set_id_str: String = get!(row, "join_set_id");
1542    let join_set_id = JoinSetId::from_str(&join_set_id_str)
1543        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1544
1545    // Extract Optionals
1546    let delay_id: Option<String> = get!(row, "delay_id");
1547    let delay_id = delay_id
1548        .map(|id| DelayId::from_str(&id))
1549        .transpose()
1550        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1551    let delay_success: Option<bool> = get!(row, "delay_success");
1552    let child_execution_id: Option<String> = get!(row, "child_execution_id");
1553    let child_execution_id = child_execution_id
1554        .map(|id| ExecutionIdDerived::from_str(&id))
1555        .transpose()
1556        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1557    let finished_version = get!(row, "finished_version", Option<i64>)
1558        .map(Version::try_from)
1559        .transpose()
1560        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1561    let json_value: Option<Json<ExecutionRequest>> = get!(row, "json_value");
1562    let json_value = json_value.map(|it| it.0);
1563
1564    let event = match (
1565        delay_id,
1566        delay_success,
1567        child_execution_id,
1568        finished_version,
1569        json_value,
1570    ) {
1571        (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1572            delay_id,
1573            result: delay_success.then_some(()).ok_or(()),
1574        },
1575        (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1576            if let ExecutionRequest::Finished { result, .. } = json_val {
1577                JoinSetResponse::ChildExecutionFinished {
1578                    child_execution_id,
1579                    finished_version,
1580                    result,
1581                }
1582            } else {
1583                error!("Joined log entry must be 'Finished'");
1584                return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1585            }
1586        }
1587        (delay, delay_success, child, finished, result) => {
1588            error!(
1589                "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1590            );
1591            return Err(consistency_db_err("invalid row in t_join_set_response").into());
1592        }
1593    };
1594
1595    Ok(ResponseWithCursor {
1596        cursor: id,
1597        event: JoinSetResponseEventOuter {
1598            event: JoinSetResponseEvent { join_set_id, event },
1599            created_at,
1600        },
1601    })
1602}
1603
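/// Lock a single execution inside the caller's transaction: verify the pending
/// state and version, append a `Locked` event, refresh `t_state`, and return
/// everything an executor needs to run it (params from the `Created` event,
/// history events and collected responses).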
1604#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1605#[expect(clippy::too_many_arguments)]
1606async fn lock_single_execution(
1607    tx: &Transaction<'_>,
1608    created_at: DateTime<Utc>,
1609    component_id: ComponentId,
1610    execution_id: &ExecutionId,
1611    run_id: RunId,
1612    appending_version: &Version,
1613    executor_id: ExecutorId,
1614    lock_expires_at: DateTime<Utc>,
1615    retry_config: ComponentRetryConfig,
1616) -> Result<LockedExecution, DbErrorWrite> {
1617    debug!("lock_single_execution");
1618
1619    // 1. Check State
1620    let combined_state = get_combined_state(tx, execution_id).await?;
1621    combined_state.pending_state.can_append_lock(
1622        created_at,
1623        executor_id,
1624        run_id,
1625        lock_expires_at,
1626    )?;
1627    let expected_version = combined_state.get_next_version_assert_not_finished();
1628    check_expected_next_and_appending_version(&expected_version, appending_version)?;
1629
1630    // 2. Prepare Event
1631    let locked_event = Locked {
1632        component_id,
1633        executor_id,
1634        lock_expires_at,
1635        run_id,
1636        retry_config,
1637    };
1638    let event = ExecutionRequest::Locked(locked_event.clone());
1639
1640    let event = Json(event);
1641
1642    // 3. Append to execution_log
1643    tx.execute(
1644        "INSERT INTO t_execution_log \
1645            (execution_id, created_at, json_value, version, variant) \
1646            VALUES ($1, $2, $3, $4, $5)",
1647        &[
1648            &execution_id.to_string(),
1649            &created_at,
1650            &event,
1651            &i64::from(appending_version.0),
1652            &event.0.variant(),
1653        ],
1654    )
1655    .await
1656    .map_err(|err| {
1657        warn!("Cannot lock execution - {err:?}");
1658        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1659    })?;
1660
1661    // 4. Update t_state
1662    let responses_dto = list_responses(tx, execution_id, None).await?;
1663    let responses = responses_dto.into_iter().map(|resp| resp.event).collect();
1664    trace!("Responses: {responses:?}");
1665
1666    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
1667        tx,
1668        execution_id,
1669        executor_id,
1670        run_id,
1671        lock_expires_at,
1672        appending_version,
1673        retry_config,
1674    )
1675    .await?;
1676
1677    // 5. Fetch History
1678    // Fetch event_history and `Created` event.
1679    let rows = tx
1680        .query(
1681            "SELECT json_value, version FROM t_execution_log WHERE \
1682                execution_id = $1 AND (variant = $2 OR variant = $3) \
1683                ORDER BY version",
1684            &[
1685                &execution_id.to_string(),
1686                &DUMMY_CREATED.variant(),
1687                &DUMMY_HISTORY_EVENT.variant(),
1688            ],
1689        )
1690        .await
1691        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1692
1693    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
1694
1695    for row in rows {
1696        let event: Json<ExecutionRequest> = get!(row, "json_value");
1697        let event = event.0;
1698
1699        let version: i64 = get!(row, "version");
1700        let version = Version::try_from(version)
1701            .map_err(|_| consistency_db_err("version must be non-negative"))?;
1702
1703        events.push_back(ExecutionEvent {
1704            created_at: DateTime::from_timestamp_nanos(0), // placeholder; only the inner event and version are used downstream
1705            event,
1706            backtrace_id: None,
1707            version,
1708        });
1709    }
1710
1711    // 6. Extract Created Event
1712    let Some(ExecutionRequest::Created {
1713        ffqn,
1714        params,
1715        parent,
1716        metadata,
1717        ..
1718    }) = events.pop_front().map(|outer| outer.event)
1719    else {
1720        error!("Execution log must contain at least `Created` event");
1721        return Err(consistency_db_err("execution log must contain `Created` event").into());
1722    };
1723
1724    // 7. Extract History Events
1725    let mut event_history = Vec::new();
1726    for ExecutionEvent { event, version, .. } in events {
1727        if let ExecutionRequest::HistoryEvent { event } = event {
1728            event_history.push((event, version));
1729        } else {
1730            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1731            return Err(consistency_db_err(
1732                "rows can only contain `Created` and `HistoryEvent` event kinds",
1733            )
1734            .into());
1735        }
1736    }
1737
1738    Ok(LockedExecution {
1739        execution_id: execution_id.clone(),
1740        metadata,
1741        next_version: appending_version.increment(),
1742        ffqn,
1743        params,
1744        event_history,
1745        responses,
1746        parent,
1747        intermittent_event_count,
1748        locked_event,
1749    })
1750}
1751
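/// Count the `JoinNext` history events of the given join set for this execution.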
1752async fn count_join_next(
1753    tx: &Transaction<'_>,
1754    execution_id: &ExecutionId,
1755    join_set_id: &JoinSetId,
1756) -> Result<u32, DbErrorRead> {
1757    let row = tx
1758            .query_one(
1759                "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
1760                AND history_event_type = $3",
1761                &[
1762                    &execution_id.to_string(),
1763                    &join_set_id.to_string(),
1764                    &HISTORY_EVENT_TYPE_JOIN_NEXT,
1765                ],
1766            )
1767            .await
1768            .map_err(DbErrorRead::from)?;
1769
1770    let count = u32::try_from(get!(row, "count", i64)).expect("COUNT(*) is non-negative and fits into u32");
1771    Ok(count)
1772}
1773
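/// Fetch the `skip_rows`-th response (ordered by insertion id) of the given
/// join set, joining the child execution's `Finished` log entry when applicable.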
1774async fn nth_response(
1775    tx: &Transaction<'_>,
1776    execution_id: &ExecutionId,
1777    join_set_id: &JoinSetId,
1778    skip_rows: u32,
1779) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
1780    let row = tx
1781            .query_opt(
1782                "SELECT r.id, r.created_at, r.join_set_id, \
1783                 r.delay_id, r.delay_success, \
1784                 r.child_execution_id, r.finished_version, l.json_value \
1785                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1786                 WHERE \
1787                 r.execution_id = $1 AND r.join_set_id = $2 AND \
1788                 ( \
1789                 r.finished_version = l.version \
1790                 OR \
1791                 r.child_execution_id IS NULL \
1792                 ) \
1793                 ORDER BY id \
1794                 LIMIT 1 OFFSET $3",
1795                 &[
1796                     &execution_id.to_string(),
1797                     &join_set_id.to_string(),
1798                     &i64::from(skip_rows),
1799                 ]
1800            )
1801            .await
1802            .map_err(DbErrorRead::from)?;
1803
1804    match row {
1805        Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
1806        None => Ok(None),
1807    }
1808}
1809
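/// Append one non-`Created` event: `Locked` is delegated to
/// `lock_single_execution`, every other event is inserted into
/// `t_execution_log` and then mirrored into `t_state` according to its kind.
/// Returns the resulting version together with the notifier to fire after commit.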
1810#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1811async fn append(
1812    tx: &Transaction<'_>,
1813    execution_id: &ExecutionId,
1814    req: AppendRequest,
1815    appending_version: Version,
1816) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1817    if matches!(req.event, ExecutionRequest::Created { .. }) {
1818        return Err(DbErrorWrite::NonRetriable(
1819            DbErrorWriteNonRetriable::ValidationFailed(
1820                "cannot append `Created` event - use `create` instead".into(),
1821            ),
1822        ));
1823    }
1824
1825    if let AppendRequest {
1826        event:
1827            ExecutionRequest::Locked(Locked {
1828                component_id,
1829                executor_id,
1830                run_id,
1831                lock_expires_at,
1832                retry_config,
1833            }),
1834        created_at,
1835    } = req
1836    {
1837        return lock_single_execution(
1838            tx,
1839            created_at,
1840            component_id,
1841            execution_id,
1842            run_id,
1843            &appending_version,
1844            executor_id,
1845            lock_expires_at,
1846            retry_config,
1847        )
1848        .await
1849        .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1850    }
1851
1852    let combined_state = get_combined_state(tx, execution_id).await?;
1853    if combined_state.pending_state.is_finished() {
1854        debug!("Execution is already finished");
1855        return Err(DbErrorWrite::NonRetriable(
1856            DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1857        ));
1858    }
1859
1860    check_expected_next_and_appending_version(
1861        &combined_state.get_next_version_assert_not_finished(),
1862        &appending_version,
1863    )?;
1864
1865    let event = Json(req.event);
1866
1867    // Insert into t_execution_log
1868    tx.execute(
1869            "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1870             VALUES ($1, $2, $3, $4, $5, $6)",
1871            &[
1872                &execution_id.to_string(),
1873                &req.created_at,
1874                &event,
1875                &i64::from(appending_version.0),
1876                &event.0.variant(),
1877                &event.0.join_set_id().map(std::string::ToString::to_string),
1878            ],
1879        )
1880        .await?;
1881
1882    // Calculate current pending state
1883    match &event.0 {
1884        ExecutionRequest::Created { .. } => {
1885            unreachable!("handled in the caller")
1886        }
1887
1888        ExecutionRequest::Locked { .. } => {
1889            unreachable!("handled above")
1890        }
1891
1892        ExecutionRequest::TemporarilyFailed {
1893            backoff_expires_at, ..
1894        }
1895        | ExecutionRequest::TemporarilyTimedOut {
1896            backoff_expires_at, ..
1897        } => {
1898            let (next_version, notifier) = update_state_pending_after_event_appended(
1899                tx,
1900                execution_id,
1901                &appending_version,
1902                *backoff_expires_at,
1903                true, // an intermittent failure
1904                combined_state
1905                    .pending_state
1906                    .get_component_id_input_digest()
1907                    .clone(),
1908            )
1909            .await?;
1910            return Ok((next_version, notifier));
1911        }
1912
1913        ExecutionRequest::Unlocked {
1914            backoff_expires_at, ..
1915        } => {
1916            let (next_version, notifier) = update_state_pending_after_event_appended(
1917                tx,
1918                execution_id,
1919                &appending_version,
1920                *backoff_expires_at,
1921                false, // not an intermittent failure
1922                combined_state
1923                    .pending_state
1924                    .get_component_id_input_digest()
1925                    .clone(),
1926            )
1927            .await?;
1928            return Ok((next_version, notifier));
1929        }
1930
1931        ExecutionRequest::Finished { result, .. } => {
1932            update_state_finished(
1933                tx,
1934                execution_id,
1935                &appending_version,
1936                req.created_at,
1937                PendingStateFinishedResultKind::from(result),
1938            )
1939            .await?;
1940            return Ok((
1941                appending_version,
1942                AppendNotifier {
1943                    pending_at: None,
1944                    execution_finished: Some(NotifierExecutionFinished {
1945                        execution_id: execution_id.clone(),
1946                        retval: result.clone(),
1947                    }),
1948                    response: None,
1949                },
1950            ));
1951        }
1952
1953        ExecutionRequest::HistoryEvent {
1954            event:
1955                HistoryEvent::JoinSetCreate { .. }
1956                | HistoryEvent::JoinSetRequest {
1957                    request: JoinSetRequest::ChildExecutionRequest { .. },
1958                    ..
1959                }
1960                | HistoryEvent::Persist { .. }
1961                | HistoryEvent::Schedule { .. }
1962                | HistoryEvent::Stub { .. }
1963                | HistoryEvent::JoinNextTooMany { .. },
1964        } => {
1965            return Ok((
1966                bump_state_next_version(tx, execution_id, &appending_version, None).await?,
1967                AppendNotifier::default(),
1968            ));
1969        }
1970
1971        ExecutionRequest::HistoryEvent {
1972            event:
1973                HistoryEvent::JoinSetRequest {
1974                    join_set_id,
1975                    request:
1976                        JoinSetRequest::DelayRequest {
1977                            delay_id,
1978                            expires_at,
1979                            ..
1980                        },
1981                },
1982        } => {
1983            return Ok((
1984                bump_state_next_version(
1985                    tx,
1986                    execution_id,
1987                    &appending_version,
1988                    Some(DelayReq {
1989                        join_set_id: join_set_id.clone(),
1990                        delay_id: delay_id.clone(),
1991                        expires_at: *expires_at,
1992                    }),
1993                )
1994                .await?,
1995                AppendNotifier::default(),
1996            ));
1997        }
1998
1999        ExecutionRequest::HistoryEvent {
2000            event:
2001                HistoryEvent::JoinNext {
2002                    join_set_id,
2003                    run_expires_at,
2004                    closing,
2005                    requested_ffqn: _,
2006                },
2007        } => {
2008            // Did the response arrive already?
2009            let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2010            // The `JoinNext` event appended above guarantees at least one; assert before subtracting.
2011            assert!(join_next_count > 0);
2012            // Fetch the response corresponding to this JoinNext (skip n-1)
2013            let nth_response =
2014                nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2015
2016            trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2017
2018            if let Some(ResponseWithCursor {
2019                event:
2020                    JoinSetResponseEventOuter {
2021                        created_at: nth_created_at,
2022                        ..
2023                    },
2024                cursor: _,
2025            }) = nth_response
2026            {
2027                let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2028                let (next_version, notifier) = update_state_pending_after_event_appended(
2029                    tx,
2030                    execution_id,
2031                    &appending_version,
2032                    scheduled_at,
2033                    false, // not an intermittent failure
2034                    combined_state
2035                        .pending_state
2036                        .get_component_id_input_digest()
2037                        .clone(),
2038                )
2039                .await?;
2040                return Ok((next_version, notifier));
2041            }
2042
2043            return Ok((
2044                update_state_blocked(
2045                    tx,
2046                    execution_id,
2047                    &appending_version,
2048                    join_set_id,
2049                    *run_expires_at,
2050                    *closing,
2051                )
2052                .await?,
2053                AppendNotifier::default(),
2054            ));
2055        }
2056    }
2057}
2058
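/// Record a join-set response, unblock the execution if it was blocked on this
/// join set, and remove the corresponding `t_delay` row for delay responses.
/// The returned notifier always carries the response itself.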
2059async fn append_response(
2060    tx: &Transaction<'_>,
2061    execution_id: &ExecutionId,
2062    response_outer: JoinSetResponseEventOuter,
2063) -> Result<AppendNotifier, DbErrorWrite> {
2064    let join_set_id = &response_outer.event.join_set_id;
2065
2066    let (delay_id, delay_success) = match &response_outer.event.event {
2067        JoinSetResponse::DelayFinished { delay_id, result } => {
2068            (Some(delay_id.to_string()), Some(result.is_ok()))
2069        }
2070        JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2071    };
2072
2073    let (child_execution_id, finished_version) = match &response_outer.event.event {
2074        JoinSetResponse::ChildExecutionFinished {
2075            child_execution_id,
2076            finished_version,
2077            result: _,
2078        } => (
2079            Some(child_execution_id.to_string()),
2080            Some(i64::from(finished_version.0)),
2081        ),
2082        JoinSetResponse::DelayFinished { .. } => (None, None),
2083    };
2084
2085    tx.execute(
2086            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2087             VALUES ($1, $2, $3, $4, $5, $6, $7)",
2088             &[
2089                 &execution_id.to_string(),
2090                 &response_outer.created_at,
2091                 &join_set_id.to_string(),
2092                 &delay_id,
2093                 &delay_success,
2094                 &child_execution_id,
2095                 &finished_version,
2096             ]
2097        ).await?;
2098
2099    // Check whether this response unblocks an execution blocked on this join set.
2100    let combined_state = get_combined_state(tx, execution_id).await?;
2101    debug!("previous_pending_state: {combined_state:?}");
2102
2103    let mut notifier = if let PendingState::BlockedByJoinSet {
2104        join_set_id: found_join_set_id,
2105        lock_expires_at,
2106        closing: _,
2107        component_id_input_digest,
2108    } = combined_state.pending_state
2109        && *join_set_id == found_join_set_id
2110    {
2111        let scheduled_at = std::cmp::max(lock_expires_at, response_outer.created_at);
2112        // Unblock the state.
2113        update_state_pending_after_response_appended(
2114            tx,
2115            execution_id,
2116            scheduled_at,
2117            &combined_state.corresponding_version,
2118            component_id_input_digest,
2119        )
2120        .await?
2121    } else {
2122        AppendNotifier::default()
2123    };
2124
2125    if let JoinSetResponseEvent {
2126        join_set_id,
2127        event:
2128            JoinSetResponse::DelayFinished {
2129                delay_id,
2130                result: _,
2131            },
2132    } = &response_outer.event
2133    {
2134        debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2135        tx.execute(
2136            "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2137            &[
2138                &execution_id.to_string(),
2139                &join_set_id.to_string(),
2140                &delay_id.to_string(),
2141            ],
2142        )
2143        .await?;
2144    }
2145
2146    notifier.response = Some((execution_id.clone(), response_outer));
2147    Ok(notifier)
2148}
2149
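/// Persist a wasm backtrace covering the version range
/// `[version_min_including, version_max_excluding)`.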
2150async fn append_backtrace(
2151    tx: &Transaction<'_>,
2152    backtrace_info: &BacktraceInfo,
2153) -> Result<(), DbErrorWrite> {
2154    let backtrace_json = Json(&backtrace_info.wasm_backtrace);
2155
2156    tx.execute(
2157            "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2158             VALUES ($1, $2, $3, $4, $5)",
2159            &[
2160                &backtrace_info.execution_id.to_string(),
2161                &Json(&backtrace_info.component_id),
2162                &i64::from(backtrace_info.version_min_including.0),
2163                &i64::from(backtrace_info.version_max_excluding.0),
2164                &backtrace_json,
2165            ],
2166        )
2167        .await?;
2168
2169    Ok(())
2170}
2171
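/// Test-only helper: load the whole execution log together with its responses
/// and the current combined state.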
2172#[cfg(feature = "test")]
2173async fn get(
2174    tx: &Transaction<'_>,
2175    execution_id: &ExecutionId,
2176) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2177    let rows = tx
2178        .query(
2179            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2180                 execution_id = $1 ORDER BY version",
2181            &[&execution_id.to_string()],
2182        )
2183        .await
2184        .map_err(DbErrorRead::from)?;
2185
2186    if rows.is_empty() {
2187        return Err(DbErrorRead::NotFound);
2188    }
2189
2190    let mut events = Vec::with_capacity(rows.len());
2191    for row in rows {
2192        let created_at: DateTime<Utc> = get!(row, "created_at");
2193        let event: Json<ExecutionRequest> = get!(row, "json_value");
2194        let event = event.0;
2195        let version: i64 = get!(row, "version");
2196        let version = Version::try_from(version)
2197            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2198
2199        events.push(ExecutionEvent {
2200            created_at,
2201            event,
2202            backtrace_id: None,
2203            version,
2204        });
2205    }
2206
2207    let combined_state = get_combined_state(tx, execution_id).await?;
2208    let responses_dto = list_responses(tx, execution_id, None).await?;
2209    let responses = responses_dto.into_iter().map(|resp| resp.event).collect();
2210
2211    Ok(concepts::storage::ExecutionLog {
2212        execution_id: execution_id.clone(),
2213        events,
2214        responses,
2215        next_version: combined_state.get_next_version_or_finished(),
2216        pending_state: combined_state.pending_state,
2217    })
2218}
2219
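/// List events with `version_min <= version < version_max_excluding`,
/// optionally left-joining `t_backtrace` so each event carries the
/// `version_min_including` of the backtrace range covering it as `backtrace_id`.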
2220async fn list_execution_events(
2221    tx: &Transaction<'_>,
2222    execution_id: &ExecutionId,
2223    version_min: VersionType,
2224    version_max_excluding: VersionType,
2225    include_backtrace_id: bool,
2226) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2227    let sql = if include_backtrace_id {
2228        "SELECT
2229                log.created_at,
2230                log.json_value,
2231                log.version,
2232                bt.version_min_including AS backtrace_id
2233            FROM
2234                t_execution_log AS log
2235            LEFT OUTER JOIN
2236                t_backtrace AS bt ON log.execution_id = bt.execution_id
2237                                AND log.version >= bt.version_min_including
2238                                AND log.version < bt.version_max_excluding
2239            WHERE
2240                log.execution_id = $1
2241                AND log.version >= $2
2242                AND log.version < $3
2243            ORDER BY
2244                log.version"
2245    } else {
2246        "SELECT
2247                created_at, json_value, NULL::BIGINT as backtrace_id, version
2248            FROM t_execution_log WHERE
2249                execution_id = $1 AND version >= $2 AND version < $3
2250            ORDER BY version"
2251    };
2252
2253    let rows = tx
2254        .query(
2255            sql,
2256            &[
2257                &execution_id.to_string(),
2258                &i64::from(version_min),
2259                &i64::from(version_max_excluding),
2260            ],
2261        )
2262        .await
2263        .map_err(DbErrorRead::from)?;
2264
2265    let mut events = Vec::with_capacity(rows.len());
2266    for row in rows {
2267        let created_at: DateTime<Utc> = get!(row, "created_at");
2268        let backtrace_id = get!(row, "backtrace_id", Option<i64>)
2269            .map(Version::try_from)
2270            .transpose()
2271            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2272
2273        let version = get!(row, "version", i64);
2274        let version = Version::new(
2275            VersionType::try_from(version)
2276                .map_err(|_| consistency_db_err("version must be non-negative"))?,
2277        );
2278        let event_req: Json<ExecutionRequest> = get!(row, "json_value");
2279        let event_req = event_req.0;
2280
2281        events.push(ExecutionEvent {
2282            created_at,
2283            event: event_req,
2284            backtrace_id,
2285            version,
2286        });
2287    }
2288    Ok(events)
2289}
2290
2291async fn get_execution_event(
2292    tx: &Transaction<'_>,
2293    execution_id: &ExecutionId,
2294    version: VersionType,
2295) -> Result<ExecutionEvent, DbErrorRead> {
2296    let row = tx
2297        .query_one(
2298            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2299                 execution_id = $1 AND version = $2",
2300            &[&execution_id.to_string(), &i64::from(version)],
2301        )
2302        .await
2303        .map_err(DbErrorRead::from)?;
2304
2305    let created_at: DateTime<Utc> = get!(row, "created_at");
2306    let json_val: Json<ExecutionRequest> = get!(row, "json_value");
2307    let version = get!(row, "version", i64);
2308    let version = Version::try_from(version)
2309        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2310    let event = json_val.0;
2311
2312    Ok(ExecutionEvent {
2313        created_at,
2314        event,
2315        backtrace_id: None,
2316        version,
2317    })
2318}
2319
2320async fn get_last_execution_event(
2321    tx: &Transaction<'_>,
2322    execution_id: &ExecutionId,
2323) -> Result<ExecutionEvent, DbErrorRead> {
2324    let row = tx
2325        .query_one(
2326            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2327                 execution_id = $1 ORDER BY version DESC LIMIT 1",
2328            &[&execution_id.to_string()],
2329        )
2330        .await
2331        .map_err(DbErrorRead::from)?;
2332
2333    let created_at: DateTime<Utc> = get!(row, "created_at");
2334    let event: Json<ExecutionRequest> = get!(row, "json_value");
2335    let event = event.0;
2336    let version = get!(row, "version", i64);
2337    let version = Version::try_from(version)
2338        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2339
2340    Ok(ExecutionEvent {
2341        created_at,
2342        event,
2343        backtrace_id: None,
2344        version,
2345    })
2346}
2347
2348async fn delay_response(
2349    tx: &Transaction<'_>,
2350    execution_id: &ExecutionId,
2351    delay_id: &DelayId,
2352) -> Result<Option<bool>, DbErrorRead> {
2353    let row = tx
2354        .query_opt(
2355            "SELECT delay_success \
2356                 FROM t_join_set_response \
2357                 WHERE \
2358                 execution_id = $1 AND delay_id = $2",
2359            &[&execution_id.to_string(), &delay_id.to_string()],
2360        )
2361        .await
2362        .map_err(DbErrorRead::from)?;
2363
2364    match row {
2365        Some(r) => Ok(Some(get!(r, "delay_success", bool))),
2366        None => Ok(None),
2367    }
2368}
2369
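/// Return all responses of the execution after skipping the first `skip_rows`,
/// in insertion order.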
2370#[instrument(level = Level::TRACE, skip_all)]
2371async fn get_responses_with_offset(
2372    tx: &Transaction<'_>,
2373    execution_id: &ExecutionId,
2374    skip_rows: u32,
2375) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2376    let rows = tx
2377            .query(
2378                "SELECT r.id, r.created_at, r.join_set_id, \
2379                 r.delay_id, r.delay_success, \
2380                 r.child_execution_id, r.finished_version, l.json_value \
2381                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2382                 WHERE \
2383                 r.execution_id = $1 AND \
2384                 ( \
2385                 r.finished_version = l.version \
2386                 OR r.child_execution_id IS NULL \
2387                 ) \
2388                 ORDER BY id \
2389                 OFFSET $2",
2390                 &[
2391                     &execution_id.to_string(),
2392                     &(i64::from(skip_rows)),
2393                 ]
2394            )
2395            .await
2396            .map_err(DbErrorRead::from)?;
2397
2398    let mut results = Vec::with_capacity(rows.len());
2399    for row in rows {
2400        let resp = parse_response_with_cursor(&row)?;
2401        results.push(resp.event);
2402    }
2403    Ok(results)
2404}
2405
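/// Pending executions of a single ffqn that are due at `pending_at_or_sooner`.
/// With `SelectStrategy::LockForUpdate` the rows are selected
/// `FOR UPDATE SKIP LOCKED`, so concurrent executors do not pick the same rows.
/// Query errors are logged and reported as `Err(())`; corrupted rows are skipped.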
2406async fn get_pending_of_single_ffqn(
2407    tx: &Transaction<'_>,
2408    batch_size: u32,
2409    pending_at_or_sooner: DateTime<Utc>,
2410    ffqn: &FunctionFqn,
2411    select_strategy: SelectStrategy,
2412) -> Result<Vec<(ExecutionId, Version)>, ()> {
2413    let rows = tx
2414        .query(
2415            &format!(
2416                "SELECT execution_id, corresponding_version FROM t_state \
2417                WHERE \
2418                state = '{STATE_PENDING_AT}' AND \
2419                pending_expires_finished <= $1 AND ffqn = $2 \
2420                ORDER BY pending_expires_finished \
2421                {} \
2422                LIMIT $3",
2423                if select_strategy == SelectStrategy::LockForUpdate {
2424                    "FOR UPDATE SKIP LOCKED"
2425                } else {
2426                    ""
2427                }
2428            ),
2429            &[
2430                &pending_at_or_sooner,
2431                &ffqn.to_string(),
2432                &(i64::from(batch_size)),
2433            ],
2434        )
2435        .await
2436        .map_err(|err| {
2437            warn!("Ignoring error while listing pending executions: {err:?}");
2438        })?;
2439
2440    let mut result = Vec::with_capacity(rows.len());
2441    for row in rows {
2442        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2443            let eid_str: String = get!(row, "execution_id");
2444            let corresponding_version: i64 = get!(row, "corresponding_version");
2445            let corresponding_version = Version::try_from(corresponding_version)
2446                .map_err(|_| consistency_db_err("version must be non-negative"))?;
2447
2448            if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2449                return Ok((eid, corresponding_version.increment()));
2450            }
2451            Err(consistency_db_err("invalid execution_id"))
2452        };
2453
2454        match unpack() {
2455            Ok(val) => result.push(val),
2456            Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2457        }
2458    }
2459    Ok(result)
2460}
2461
2462/// Get pending executions together with their next versions, filling up to `batch_size` across the given ffqns.
2463async fn get_pending_by_ffqns(
2464    tx: &Transaction<'_>,
2465    batch_size: u32,
2466    pending_at_or_sooner: DateTime<Utc>,
2467    ffqns: &[FunctionFqn],
2468    select_strategy: SelectStrategy,
2469) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2470    let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2471    let mut execution_ids_versions = Vec::with_capacity(batch_size);
2472
2473    for ffqn in ffqns {
2474        let needed = batch_size - execution_ids_versions.len();
2475        if needed == 0 {
2476            break;
2477        }
2478        let needed = u32::try_from(needed).expect("needed is bounded by batch_size, which fits into u32");
2479        if let Ok(execs) =
2480            get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2481                .await
2482        {
2483            execution_ids_versions.extend(execs);
2484        }
2485    }
2486
2487    Ok(execution_ids_versions)
2488}
2489
2490#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2491enum SelectStrategy {
2492    Read,
2493    LockForUpdate,
2494}
2495
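/// Same as `get_pending_of_single_ffqn`, but filtered by the component's input
/// digest (stored as BYTEA) and reporting errors as `DbErrorGeneric`.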
2496async fn get_pending_by_component_input_digest(
2497    tx: &Transaction<'_>,
2498    batch_size: u32,
2499    pending_at_or_sooner: DateTime<Utc>,
2500    input_digest: &InputContentDigest,
2501    select_strategy: SelectStrategy,
2502) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2503    let rows = tx
2504        .query(
2505            &format!(
2506                "SELECT execution_id, corresponding_version FROM t_state WHERE \
2507                state = '{STATE_PENDING_AT}' AND \
2508                pending_expires_finished <= $1 AND \
2509                component_id_input_digest = $2 \
2510                ORDER BY pending_expires_finished \
2511                {} \
2512                LIMIT $3",
2513                if select_strategy == SelectStrategy::LockForUpdate {
2514                    "FOR UPDATE SKIP LOCKED"
2515                } else {
2516                    ""
2517                }
2518            ),
2519            &[
2520                &pending_at_or_sooner,
2521                &input_digest.as_slice(), // BYTEA
2522                &i64::from(batch_size),
2523            ],
2524        )
2525        .await
2526        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2527
2528    let mut result = Vec::with_capacity(rows.len());
2529    for row in rows {
2530        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2531            let eid_str: String = get!(row, "execution_id");
2532            let corresponding_version: i64 = get!(row, "corresponding_version");
2533            let corresponding_version = Version::try_from(corresponding_version)
2534                .map_err(|_| consistency_db_err("version must be non-negative"))?;
2535
2536            let eid = ExecutionId::from_str(&eid_str)
2537                .map_err(|err| consistency_db_err(err.to_string()))?;
2538            Ok((eid, corresponding_version.increment()))
2539        };
2540
2541        match unpack() {
2542            Ok(val) => result.push(val),
2543            Err(err) => {
2544                warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
2545            }
2546        }
2547    }
2548
2549    Ok(result)
2550}
2551
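/// Notify ffqn subscribers about a newly pending execution, but only if its
/// scheduled time has already been reached. The caller must hold the
/// subscriber lock.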
2552fn notify_pending_locked(
2553    notifier: &NotifierPendingAt,
2554    current_time: DateTime<Utc>,
2555    ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2556) {
2557    if notifier.scheduled_at <= current_time {
2558        ffqn_to_pending_subscription.notify(notifier);
2559    }
2560}
2561
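/// Compare-and-swap the component input digest of the execution's `t_state`
/// row: the update only applies while the row still carries `old`; otherwise
/// `DbErrorWrite::NotFound` is returned.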
2562async fn upgrade_execution_component(
2563    tx: &Transaction<'_>,
2564    execution_id: &ExecutionId,
2565    old: &InputContentDigest,
2566    new: &InputContentDigest,
2567) -> Result<(), DbErrorWrite> {
2568    debug!("Updating t_state to component {new}");
2569
2570    let updated = tx
2571        .execute(
2572            r"
2573                UPDATE t_state
2574                SET
2575                    updated_at = CURRENT_TIMESTAMP,
2576                    component_id_input_digest = $1
2577                WHERE
2578                    execution_id = $2 AND
2579                    component_id_input_digest = $3
2580                ",
2581            &[
2582                &new.as_slice(),           // $1: BYTEA
2583                &execution_id.to_string(), // $2: TEXT
2584                &old.as_slice(),           // $3: BYTEA
2585            ],
2586        )
2587        .await
2588        .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2589
2590    if updated != 1 {
2591        return Err(DbErrorWrite::NotFound);
2592    }
2593    Ok(())
2594}
2595
2596impl PostgresConnection {
2597    // Must be called after write transaction commit for a correct happens-before relationship.
2598    #[instrument(level = Level::TRACE, skip_all)]
2599    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2600        let (pending_ats, finished_execs, responses) = {
2601            let (mut pending_ats, mut finished_execs, mut responses) =
2602                (Vec::new(), Vec::new(), Vec::new());
2603            for notifier in notifiers {
2604                if let Some(pending_at) = notifier.pending_at {
2605                    pending_ats.push(pending_at);
2606                }
2607                if let Some(finished) = notifier.execution_finished {
2608                    finished_execs.push(finished);
2609                }
2610                if let Some(response) = notifier.response {
2611                    responses.push(response);
2612                }
2613            }
2614            (pending_ats, finished_execs, responses)
2615        };
2616
2617        // Notify pending_at subscribers.
2618        if !pending_ats.is_empty() {
2619            let guard = self.pending_subscribers.lock().unwrap();
2620            for pending_at in pending_ats {
2621                notify_pending_locked(&pending_at, current_time, &guard);
2622            }
2623        }
2624        // Notify execution finished subscribers.
2625        if !finished_execs.is_empty() {
2626            let mut guard = self.execution_finished_subscribers.lock().unwrap();
2627            for finished in finished_execs {
2628                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2629                    for (_tag, sender) in listeners_of_exe_id {
2630                        let _ = sender.send(finished.retval.clone());
2631                    }
2632                }
2633            }
2634        }
2635        // Notify response subscribers.
2636        if !responses.is_empty() {
2637            let mut guard = self.response_subscribers.lock().unwrap();
2638            for (execution_id, response) in responses {
2639                if let Some((sender, _)) = guard.remove(&execution_id) {
2640                    let _ = sender.send(response);
2641                }
2642            }
2643        }
2644    }
2645}
2646
2647#[async_trait]
2648impl DbExecutor for PostgresConnection {
2649    #[instrument(level = Level::TRACE, skip(self))]
2650    async fn lock_pending_by_ffqns(
2651        &self,
2652        batch_size: u32,
2653        pending_at_or_sooner: DateTime<Utc>,
2654        ffqns: Arc<[FunctionFqn]>,
2655        created_at: DateTime<Utc>,
2656        component_id: ComponentId,
2657        executor_id: ExecutorId,
2658        lock_expires_at: DateTime<Utc>,
2659        run_id: RunId,
2660        retry_config: ComponentRetryConfig,
2661    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2662        let mut client_guard = self.client.lock().await;
2663        let tx = client_guard
2664            .transaction()
2665            .await
2666            .map_err(DbErrorGeneric::from)?;
2667
2668        let execution_ids_versions = get_pending_by_ffqns(
2669            &tx,
2670            batch_size,
2671            pending_at_or_sooner,
2672            &ffqns,
2673            SelectStrategy::LockForUpdate,
2674        )
2675        .await?;
2676
2677        if execution_ids_versions.is_empty() {
2678            // Nothing was modified, so commit and rollback are equivalent here;
2679            // commit explicitly to leave the connection in a clean state.
2680            tx.commit().await.map_err(DbErrorGeneric::from)?;
2681            return Ok(vec![]);
2682        }
2683
2684        debug!("Locking {execution_ids_versions:?}");
2685
2686        // Lock using the same transaction
2687        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2688        for (execution_id, version) in execution_ids_versions {
2689            match lock_single_execution(
2690                &tx,
2691                created_at,
2692                component_id.clone(),
2693                &execution_id,
2694                run_id,
2695                &version,
2696                executor_id,
2697                lock_expires_at,
2698                retry_config,
2699            )
2700            .await
2701            {
2702                Ok(locked) => locked_execs.push(locked),
2703                Err(err) => {
2704                    warn!("Locking row {execution_id} failed - {err:?}");
2705                }
2706            }
2707        }
2708
2709        tx.commit().await.map_err(DbErrorGeneric::from)?;
2710
2711        Ok(locked_execs)
2712    }
2713
2714    #[instrument(level = Level::TRACE, skip(self))]
2715    async fn lock_pending_by_component_id(
2716        &self,
2717        batch_size: u32,
2718        pending_at_or_sooner: DateTime<Utc>,
2719        component_id: &ComponentId,
2720        created_at: DateTime<Utc>,
2721        executor_id: ExecutorId,
2722        lock_expires_at: DateTime<Utc>,
2723        run_id: RunId,
2724        retry_config: ComponentRetryConfig,
2725    ) -> Result<LockPendingResponse, DbErrorGeneric> {
2726        let mut client_guard = self.client.lock().await;
2727        let tx = client_guard
2728            .transaction()
2729            .await
2730            .map_err(DbErrorGeneric::from)?;
2731
2732        let execution_ids_versions = get_pending_by_component_input_digest(
2733            &tx,
2734            batch_size,
2735            pending_at_or_sooner,
2736            &component_id.input_digest,
2737            SelectStrategy::LockForUpdate,
2738        )
2739        .await?;
2740
2741        if execution_ids_versions.is_empty() {
2742            tx.commit().await.map_err(DbErrorGeneric::from)?;
2743            return Ok(vec![]);
2744        }
2745
2746        debug!("Locking {execution_ids_versions:?}");
2747
2748        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2749        for (execution_id, version) in execution_ids_versions {
2750            match lock_single_execution(
2751                &tx,
2752                created_at,
2753                component_id.clone(),
2754                &execution_id,
2755                run_id,
2756                &version,
2757                executor_id,
2758                lock_expires_at,
2759                retry_config,
2760            )
2761            .await
2762            {
2763                Ok(locked) => locked_execs.push(locked),
2764                Err(err) => {
2765                    warn!("Locking row {execution_id} failed - {err:?}");
2766                }
2767            }
2768        }
2769
2770        tx.commit().await.map_err(DbErrorGeneric::from)?;
2771        Ok(locked_execs)
2772    }
2773
2774    #[instrument(level = Level::DEBUG, skip(self))]
2775    async fn lock_one(
2776        &self,
2777        created_at: DateTime<Utc>,
2778        component_id: ComponentId,
2779        execution_id: &ExecutionId,
2780        run_id: RunId,
2781        version: Version,
2782        executor_id: ExecutorId,
2783        lock_expires_at: DateTime<Utc>,
2784        retry_config: ComponentRetryConfig,
2785    ) -> Result<LockedExecution, DbErrorWrite> {
2786        debug!(%execution_id, "lock_one");
2787        let mut client_guard = self.client.lock().await;
2788        let tx = client_guard
2789            .transaction()
2790            .await
2791            .map_err(DbErrorGeneric::from)?;
2792
2793        let res = lock_single_execution(
2794            &tx,
2795            created_at,
2796            component_id,
2797            execution_id,
2798            run_id,
2799            &version,
2800            executor_id,
2801            lock_expires_at,
2802            retry_config,
2803        )
2804        .await?;
2805
2806        tx.commit().await.map_err(DbErrorGeneric::from)?;
2807        Ok(res)
2808    }
2809
2810    #[instrument(level = Level::DEBUG, skip(self, req))]
2811    async fn append(
2812        &self,
2813        execution_id: ExecutionId,
2814        version: Version,
2815        req: AppendRequest,
2816    ) -> Result<AppendResponse, DbErrorWrite> {
2817        debug!(%req, "append");
2818        trace!(?req, "append");
2819        let created_at = req.created_at;
2820
2821        let mut client_guard = self.client.lock().await;
2822        let tx = client_guard
2823            .transaction()
2824            .await
2825            .map_err(DbErrorGeneric::from)?;
2826
2827        let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
2828
2829        tx.commit().await.map_err(DbErrorGeneric::from)?;
2830
2831        // Release the client lock before notifying subscribers (it would be dropped at end of scope anyway)
2832        drop(client_guard);
2833
2834        self.notify_all(vec![notifier], created_at);
2835        Ok(new_version)
2836    }
2837
2838    #[instrument(level = Level::DEBUG, skip_all)]
2839    async fn append_batch_respond_to_parent(
2840        &self,
2841        events: AppendEventsToExecution,
2842        response: AppendResponseToExecution,
2843        current_time: DateTime<Utc>,
2844    ) -> Result<AppendBatchResponse, DbErrorWrite> {
2845        debug!("append_batch_respond_to_parent");
2846        if events.execution_id == response.parent_execution_id {
2847            return Err(DbErrorWrite::NonRetriable(
2848                DbErrorWriteNonRetriable::ValidationFailed(
2849                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2850                ),
2851            ));
2852        }
2853        if events.batch.is_empty() {
2854            return Err(DbErrorWrite::NonRetriable(
2855                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2856            ));
2857        }
2858
2859        let mut client_guard = self.client.lock().await;
2860        let tx = client_guard
2861            .transaction()
2862            .await
2863            .map_err(DbErrorGeneric::from)?;
2864
2865        let mut version = events.version;
2866        let mut notifiers = Vec::new();
2867
2868        for append_request in events.batch {
2869            let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
2870            version = v;
2871            notifiers.push(n);
2872        }
2873
2874        let pending_at_parent = append_response(
2875            &tx,
2876            &response.parent_execution_id,
2877            JoinSetResponseEventOuter {
2878                created_at: response.created_at,
2879                event: JoinSetResponseEvent {
2880                    join_set_id: response.join_set_id,
2881                    event: JoinSetResponse::ChildExecutionFinished {
2882                        child_execution_id: response.child_execution_id,
2883                        finished_version: response.finished_version,
2884                        result: response.result,
2885                    },
2886                },
2887            },
2888        )
2889        .await?;
2890        notifiers.push(pending_at_parent);
2891
2892        tx.commit().await.map_err(DbErrorGeneric::from)?;
2893        drop(client_guard);
2894
2895        self.notify_all(notifiers, current_time);
2896        Ok(version)
2897    }
2898
2899    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2900    async fn wait_for_pending_by_ffqn(
2901        &self,
2902        pending_at_or_sooner: DateTime<Utc>,
2903        ffqns: Arc<[FunctionFqn]>,
2904        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2905    ) {
2906        let unique_tag: u64 = rand::random();
2907        let (sender, mut receiver) = mpsc::channel(1);
2908        {
2909            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
2910            for ffqn in ffqns.as_ref() {
2911                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
2912            }
2913        }
2914
2915        async {
2916            let mut db_has_pending = false;
2917            {
2918                // Scope the lock so we don't hold it while waiting for timeout
2919                let mut client_guard = self.client.lock().await;
2920                // Read-only transaction check
2921                if let Ok(tx) = client_guard.transaction().await {
2922                    if let Ok(res) = get_pending_by_ffqns(
2923                        &tx,
2924                        1,
2925                        pending_at_or_sooner,
2926                        &ffqns,
2927                        SelectStrategy::Read,
2928                    )
2929                    .await
2930                        && !res.is_empty()
2931                    {
2932                        db_has_pending = true;
2933                    }
2934                    // Commit/Rollback read transaction
2935                    let _ = tx.commit().await;
2936                }
2937            }
2938
2939            if db_has_pending {
2940                trace!("Not waiting, database already contains new pending executions");
2941                return;
2942            }
2943
2944            tokio::select! {
2945                _ = receiver.recv() => {
2946                    trace!("Received a notification");
2947                }
2948                () = timeout_fut => {
2949                }
2950            }
2951        }
2952        .await;
2953
2954        // Cleanup: remove our subscription; if another subscriber replaced it meanwhile, put theirs back.
2955        {
2956            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
2957            for ffqn in ffqns.as_ref() {
2958                match pending_subscribers.remove_ffqn(ffqn) {
2959                    Some((_, tag)) if tag == unique_tag => {}
2960                    Some(other) => {
2961                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
2962                    }
2963                    None => {}
2964                }
2965            }
2966        }
2967    }
2968
2969    async fn wait_for_pending_by_component_id(
2970        &self,
2971        pending_at_or_sooner: DateTime<Utc>,
2972        component_id: &ComponentId,
2973        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2974    ) {
2975        let unique_tag: u64 = rand::random();
2976        let (sender, mut receiver) = mpsc::channel(1);
2977        {
2978            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
2979            pending_subscribers.insert_by_component(
2980                component_id.input_digest.clone(),
2981                (sender.clone(), unique_tag),
2982            );
2983        }
2984
2985        async {
2986            let mut db_has_pending = false;
2987            {
2988                let mut client_guard = self.client.lock().await;
2989                if let Ok(tx) = client_guard.transaction().await {
2990                    if let Ok(res) = get_pending_by_component_input_digest(
2991                        &tx,
2992                        1,
2993                        pending_at_or_sooner,
2994                        &component_id.input_digest,
2995                        SelectStrategy::Read,
2996                    )
2997                    .await
2998                        && !res.is_empty()
2999                    {
3000                        db_has_pending = true;
3001                    }
3002                    let _ = tx.commit().await;
3003                }
3004            }
3005
3006            if db_has_pending {
3007                trace!("Not waiting, database already contains new pending executions");
3008                return;
3009            }
3010
3011            tokio::select! {
3012                _ = receiver.recv() => {
3013                    trace!("Received a notification");
3014                }
3015                () = timeout_fut => {
3016                }
3017            }
3018        }
3019        .await;
3020
3021        // Cleanup: remove our subscription; if another subscriber replaced it meanwhile, put theirs back.
3022        {
3023            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3024            match pending_subscribers.remove_by_component(&component_id.input_digest) {
3025                Some((_, tag)) if tag == unique_tag => {}
3026                Some(other) => {
3027                    pending_subscribers
3028                        .insert_by_component(component_id.input_digest.clone(), other);
3029                }
3030                None => {}
3031            }
3032        }
3033    }
3034
3035    async fn get_last_execution_event(
3036        &self,
3037        execution_id: &ExecutionId,
3038    ) -> Result<ExecutionEvent, DbErrorRead> {
3039        let mut client_guard = self.client.lock().await;
3040        let tx = client_guard
3041            .transaction()
3042            .await
3043            .map_err(DbErrorGeneric::from)?;
3044
3045        let event = get_last_execution_event(&tx, execution_id).await?;
3046
3047        tx.commit().await.map_err(DbErrorGeneric::from)?;
3048        Ok(event)
3049    }
3050}
3051#[async_trait]
3052impl DbConnection for PostgresConnection {
3053    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3054    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3055        debug!("create");
3056        trace!(?req, "create");
3057        let created_at = req.created_at;
3058
3059        let mut client_guard = self.client.lock().await;
3060        let tx = client_guard
3061            .transaction()
3062            .await
3063            .map_err(DbErrorGeneric::from)?;
3064
3065        let (version, notifier) = create_inner(&tx, req).await?;
3066
3067        tx.commit().await.map_err(DbErrorGeneric::from)?;
3068        drop(client_guard); // Release DB lock before notifying
3069
3070        self.notify_all(vec![notifier], created_at);
3071        Ok(version)
3072    }
3073
3074    #[instrument(level = Level::DEBUG, skip(self, batch))]
3075    async fn append_batch(
3076        &self,
3077        current_time: DateTime<Utc>,
3078        batch: Vec<AppendRequest>,
3079        execution_id: ExecutionId,
3080        version: Version,
3081    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3082        debug!("append_batch");
3083        trace!(?batch, "append_batch");
3084        assert!(!batch.is_empty(), "Empty batch request");
3085
3086        let mut client_guard = self.client.lock().await;
3087        let tx = client_guard
3088            .transaction()
3089            .await
3090            .map_err(DbErrorGeneric::from)?;
3091
3092        let mut version = version;
3093        let mut notifier = None;
3094
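        // Each iteration threads the version forward and overwrites `notifier`, so only
        // the last event's notifier reaches `notify_all` below.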
3095        for append_request in batch {
3096            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3097            version = v;
3098            notifier = Some(n);
3099        }
3100
3101        tx.commit().await.map_err(DbErrorGeneric::from)?;
3102        drop(client_guard);
3103
3104        self.notify_all(
3105            vec![notifier.expect("checked that the batch is not empty")],
3106            current_time,
3107        );
3108        Ok(version)
3109    }
3110
3111    #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3112    async fn append_batch_create_new_execution(
3113        &self,
3114        current_time: DateTime<Utc>,
3115        batch: Vec<AppendRequest>,
3116        execution_id: ExecutionId,
3117        version: Version,
3118        child_req: Vec<CreateRequest>,
3119    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3120        debug!("append_batch_create_new_execution");
3121        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3122        assert!(!batch.is_empty(), "Empty batch request");
3123
3124        let mut client_guard = self.client.lock().await;
3125        let tx = client_guard
3126            .transaction()
3127            .await
3128            .map_err(DbErrorGeneric::from)?;
3129
3130        let mut version = version;
3131        let mut notifier = None;
3132
3133        for append_request in batch {
3134            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3135            version = v;
3136            notifier = Some(n);
3137        }
3138
3139        let mut notifiers = vec![notifier.expect("checked that the batch is not empty")];
3141
3142        for req in child_req {
3143            let (_, n) = create_inner(&tx, req).await?;
3144            notifiers.push(n);
3145        }
3146
3147        tx.commit().await.map_err(DbErrorGeneric::from)?;
3148        drop(client_guard);
3149
3150        self.notify_all(notifiers, current_time);
3151        Ok(version)
3152    }
3153
3154    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3155    async fn subscribe_to_next_responses(
3156        &self,
3157        execution_id: &ExecutionId,
3158        start_idx: u32,
3159        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3160    ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3161        debug!("subscribe_to_next_responses");
3162        let unique_tag: u64 = rand::random();
3163        let execution_id_clone = execution_id.clone();
3164
3165        let cleanup = || {
3166            let mut guard = self.response_subscribers.lock().unwrap();
3167            match guard.remove(&execution_id_clone) {
3168                Some((_, tag)) if tag == unique_tag => {}
3169                Some(other) => {
3170                    guard.insert(execution_id_clone.clone(), other);
3171                }
3172                None => {}
3173            }
3174        };
3175
3176        let receiver = {
3177            let mut client_guard = self.client.lock().await;
3178            let tx = client_guard
3179                .transaction()
3180                .await
3181                .map_err(DbErrorRead::from)?;
3182
3183            // Register the listener before fetching from the database.
3184            // This is a best-effort mechanism that shortens the polling interval; if it
3185            // does not catch the response, the call falls back to sleeping on `timeout_fut`.
3186            // Consumers are expected to poll this function in a loop.
3187            // Currently the notification mechanism only works in a single-node deployment.
3188            let (sender, receiver) = oneshot::channel();
3189            self.response_subscribers
3190                .lock()
3191                .unwrap()
3192                .insert(execution_id.clone(), (sender, unique_tag));
3193
3194            let responses = get_responses_with_offset(&tx, execution_id, start_idx)
                .await
                .inspect_err(|_| cleanup())?; // Drop the just inserted subscriber on error.
3195
3196            if responses.is_empty() {
3197                // Commit read transaction
3198                tx.commit().await.map_err(|err| {
3199                    cleanup(); // Remove the just inserted subscriber.
3200                    DbErrorRead::from(err)
3201                })?;
3202                receiver
3203            } else {
3204                cleanup(); // Remove the just inserted subscriber as we already have the answer.
3205                tx.commit().await.map_err(DbErrorRead::from)?;
3206                return Ok(responses);
3207            }
3208        };
3209
3210        let res = tokio::select! {
3211            resp = receiver => {
3212                match resp {
3213                    Ok(resp) => Ok(vec![resp]),
3214                    Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3215                }
3216            }
3217            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3218        };
3219
3220        cleanup();
3221        res
3222    }
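    // Illustrative consumer loop (hypothetical; `db`, `execution_id` and the way a
    // `TimeoutOutcome` future is built are assumptions): because the in-process
    // notification is best effort, callers poll in a loop and treat a timeout as "try again".
    //
    //     let mut start_idx = 0;
    //     loop {
    //         let timeout = Box::pin(make_timeout_future()); // resolves to a TimeoutOutcome
    //         match db.subscribe_to_next_responses(&execution_id, start_idx, timeout).await {
    //             Ok(responses) => {
    //                 // handle responses, then continue from the new offset
    //                 start_idx += u32::try_from(responses.len()).unwrap();
    //             }
    //             Err(DbErrorReadWithTimeout::Timeout(_)) => {} // poll again
    //             Err(other) => return Err(other),
    //         }
    //     }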
3223
3224    async fn wait_for_finished_result(
3225        &self,
3226        execution_id: &ExecutionId,
3227        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3228    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3229        let unique_tag: u64 = rand::random();
3230        let execution_id_clone = execution_id.clone();
3231
3232        let cleanup = || {
3233            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3234            if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3235                subscribers.remove(&unique_tag);
3236            }
3237        };
3238
3239        let receiver = {
3240            let mut client_guard = self.client.lock().await;
3241            let tx = client_guard
3242                .transaction()
3243                .await
3244                .map_err(DbErrorRead::from)?;
3245
3246            // Register listener
3247            let (sender, receiver) = oneshot::channel();
3248            {
3249                let mut guard = self.execution_finished_subscribers.lock().unwrap();
3250                guard
3251                    .entry(execution_id.clone())
3252                    .or_default()
3253                    .insert(unique_tag, sender);
3254            }
3255
3256            let pending_state = get_combined_state(&tx, execution_id)
                .await
                .inspect_err(|_| cleanup())? // Drop the just registered subscriber on error.
                .pending_state;
3257
3258            if let PendingState::Finished { finished, .. } = pending_state {
3259                let event = get_execution_event(&tx, execution_id, finished.version).await?;
3260                tx.commit().await.map_err(DbErrorRead::from)?;
3261                cleanup();
3262
3263                if let ExecutionRequest::Finished { result, .. } = event.event {
3264                    return Ok(result);
3265                }
3266                error!("Mismatch: expected a Finished event based on t_state {finished}, got {event:?}");
3267                return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3268                    "cannot get finished event based on t_state version",
3269                )));
3270            }
3271            tx.commit().await.map_err(DbErrorRead::from)?;
3272            receiver
3273        };
3274
3275        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3276        let res = tokio::select! {
3277            resp = receiver => {
3278                match resp {
3279                    Ok(retval) => Ok(retval),
3280                    Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3281                }
3282            }
3283            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3284        };
3285
3286        cleanup();
3287        res
3288    }
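    // Usage sketch (hypothetical caller): passing `None` for the timeout waits indefinitely
    // for the final result, e.g.
    //
    //     let retval = db.wait_for_finished_result(&execution_id, None).await?;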
3289
3290    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3291    async fn append_delay_response(
3292        &self,
3293        created_at: DateTime<Utc>,
3294        execution_id: ExecutionId,
3295        join_set_id: JoinSetId,
3296        delay_id: DelayId,
3297        result: Result<(), ()>,
3298    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3299        debug!("append_delay_response");
3300        let event = JoinSetResponseEventOuter {
3301            created_at,
3302            event: JoinSetResponseEvent {
3303                join_set_id,
3304                event: JoinSetResponse::DelayFinished {
3305                    delay_id: delay_id.clone(),
3306                    result,
3307                },
3308            },
3309        };
3310
3311        let mut client_guard = self.client.lock().await;
3312        let tx = client_guard
3313            .transaction()
3314            .await
3315            .map_err(DbErrorGeneric::from)?;
3316
3317        let res = append_response(&tx, &execution_id, event).await;
3318
3319        match res {
3320            Ok(notifier) => {
3321                tx.commit().await.map_err(DbErrorGeneric::from)?;
3322                drop(client_guard);
3323                self.notify_all(vec![notifier], created_at);
3324                Ok(AppendDelayResponseOutcome::Success)
3325            }
3326            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3327                // The insert conflicted: a response for this delay likely already exists.
3328                // Roll back the failed tx and verify in a fresh read tx.
3329                tx.rollback().await.map_err(DbErrorGeneric::from)?;
3330
3331                // Start new tx for check
3332                let tx = client_guard
3333                    .transaction()
3334                    .await
3335                    .map_err(DbErrorGeneric::from)?;
3336                let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3337                tx.commit().await.map_err(DbErrorGeneric::from)?;
3338
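                // Map the stored delay result onto the outcome:
                //   Some(true)  -> the delay already finished successfully,
                //   Some(false) -> the delay was already cancelled,
                //   None        -> the conflicting row cannot be found, which is inconsistent.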
3339                match delay_success {
3340                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3341                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3342                    None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized(
3343                        "insert failed yet select did not find the response".into(),
3344                    ))),
3345                }
3346            }
3347            Err(err) => {
3348                let _ = tx.rollback().await; // cleanup
3349                Err(err)
3350            }
3351        }
3352    }
3353
3354    #[instrument(level = Level::DEBUG, skip_all)]
3355    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3356        debug!("append_backtrace");
3357        let mut client_guard = self.client.lock().await;
3358        let tx = client_guard
3359            .transaction()
3360            .await
3361            .map_err(DbErrorGeneric::from)?;
3362
3363        append_backtrace(&tx, &append).await?;
3364
3365        tx.commit().await.map_err(DbErrorGeneric::from)?;
3366        Ok(())
3367    }
3368
3369    #[instrument(level = Level::DEBUG, skip_all)]
3370    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3371        debug!("append_backtrace_batch");
3372        let mut client_guard = self.client.lock().await;
3373        let tx = client_guard
3374            .transaction()
3375            .await
3376            .map_err(DbErrorGeneric::from)?;
3377
3378        for append in batch {
3379            append_backtrace(&tx, &append).await?;
3380        }
3381
3382        tx.commit().await.map_err(DbErrorGeneric::from)?;
3383        Ok(())
3384    }
3385
3386    /// Get currently expired delays and locks.
3387    #[instrument(level = Level::TRACE, skip(self))]
3388    async fn get_expired_timers(
3389        &self,
3390        at: DateTime<Utc>,
3391    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3392        let mut client_guard = self.client.lock().await;
3393        let tx = client_guard
3394            .transaction()
3395            .await
3396            .map_err(DbErrorGeneric::from)?;
3397
3398        // 1. Expired Delays
3399        let rows = tx
3400            .query(
3401                "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3402                &[&at],
3403            )
3404            .await
3405            .map_err(DbErrorGeneric::from)?;
3406
3407        let mut expired_timers = Vec::with_capacity(rows.len());
3408        for row in rows {
3409            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3410                let execution_id: String = get!(row, "execution_id");
3411                let execution_id = ExecutionId::from_str(&execution_id)
3412                    .map_err(|err| consistency_db_err(err.to_string()))?;
3413                let join_set_id: String = get!(row, "join_set_id");
3414                let join_set_id = JoinSetId::from_str(&join_set_id)
3415                    .map_err(|err| consistency_db_err(err.to_string()))?;
3416                let delay_id: String = get!(row, "delay_id");
3417                let delay_id = DelayId::from_str(&delay_id)
3418                    .map_err(|err| consistency_db_err(err.to_string()))?;
3419
3420                Ok(ExpiredTimer::Delay(ExpiredDelay {
3421                    execution_id,
3422                    join_set_id,
3423                    delay_id,
3424                }))
3425            };
3426
3427            match unpack() {
3428                Ok(timer) => expired_timers.push(timer),
3429                Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3430            }
3431        }
3432
3433        // 2. Expired Locks
3434        let rows = tx.query(
3435            &format!(
3436                "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3437                 FROM t_state \
3438                 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3439            ),
3440            &[&at]
3441        ).await.map_err(DbErrorGeneric::from)?;
3442
3443        for row in rows {
3444            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3445                let execution_id: String = get!(row, "execution_id");
3446                let execution_id = ExecutionId::from_str(&execution_id)
3447                    .map_err(|err| consistency_db_err(err.to_string()))?;
3448                let last_lock_version: i64 = get!(row, "last_lock_version");
3449                let last_lock_version = Version::try_from(last_lock_version)
3450                    .map_err(|_| consistency_db_err("version must be non-negative"))?;
3451
3452                let corresponding_version: i64 = get!(row, "corresponding_version");
3453                let corresponding_version = Version::try_from(corresponding_version)
3454                    .map_err(|_| consistency_db_err("version must be non-negative"))?;
3455
3456                let intermittent_event_count =
3457                    u32::try_from(get!(row, "intermittent_event_count", i64)).map_err(|_| {
3458                        consistency_db_err("`intermittent_event_count` must not be negative")
3459                    })?;
3460
3461                let max_retries = get!(row, "max_retries", Option<i64>)
3462                    .map(u32::try_from)
3463                    .transpose()
3464                    .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3465                let retry_exp_backoff_millis =
3466                    u32::try_from(get!(row, "retry_exp_backoff_millis", i64)).map_err(|_| {
3467                        consistency_db_err("`retry_exp_backoff_millis` must not be negative")
3468                    })?;
3469                let executor_id: String = get!(row, "executor_id");
3470                let executor_id = ExecutorId::from_str(&executor_id)
3471                    .map_err(|err| consistency_db_err(err.to_string()))?;
3472                let run_id: String = get!(row, "run_id");
3473                let run_id =
3474                    RunId::from_str(&run_id).map_err(|err| consistency_db_err(err.to_string()))?;
3475
3476                Ok(ExpiredTimer::Lock(ExpiredLock {
3477                    execution_id,
3478                    locked_at_version: last_lock_version,
3479                    next_version: corresponding_version.increment(),
3480                    intermittent_event_count,
3481                    max_retries,
3482                    retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3483                    locked_by: LockedBy {
3484                        executor_id,
3485                        run_id,
3486                    },
3487                }))
3488            };
3489
3490            match unpack() {
3491                Ok(timer) => expired_timers.push(timer),
3492                Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3493            }
3494        }
3495
3496        tx.commit().await.map_err(DbErrorGeneric::from)?;
3497
3498        if !expired_timers.is_empty() {
3499            debug!("get_expired_timers found {expired_timers:?}");
3500        }
3501        Ok(expired_timers)
3502    }
3503
3504    async fn get_execution_event(
3505        &self,
3506        execution_id: &ExecutionId,
3507        version: &Version,
3508    ) -> Result<ExecutionEvent, DbErrorRead> {
3509        let mut client_guard = self.client.lock().await;
3510        let tx = client_guard
3511            .transaction()
3512            .await
3513            .map_err(DbErrorRead::from)?;
3514
3515        let event = get_execution_event(&tx, execution_id, version.0).await?;
3516
3517        tx.commit().await.map_err(DbErrorRead::from)?;
3518        Ok(event)
3519    }
3520
3521    async fn get_pending_state(
3522        &self,
3523        execution_id: &ExecutionId,
3524    ) -> Result<PendingState, DbErrorRead> {
3525        let mut client_guard = self.client.lock().await;
3526        let tx = client_guard
3527            .transaction()
3528            .await
3529            .map_err(DbErrorRead::from)?;
3530
3531        let state = get_combined_state(&tx, execution_id).await?.pending_state;
3532
3533        tx.commit().await.map_err(DbErrorRead::from)?;
3534        Ok(state)
3535    }
3536}
3537
3538#[async_trait]
3539impl DbExternalApi for PostgresConnection {
3540    #[instrument(level = Level::DEBUG, skip_all)]
3541    async fn get_backtrace(
3542        &self,
3543        execution_id: &ExecutionId,
3544        filter: BacktraceFilter,
3545    ) -> Result<BacktraceInfo, DbErrorRead> {
3546        debug!("get_backtrace");
3547
3548        let mut client_guard = self.client.lock().await;
3549        let tx = client_guard
3550            .transaction()
3551            .await
3552            .map_err(DbErrorRead::from)?;
3553
3554        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
3555
3556        params.push(Box::new(execution_id.to_string())); // $1
3557        let p_execution_id_idx = format!("${}", params.len()); // $1
3558
3559        let mut sql = String::new();
3560        write!(
3561            &mut sql,
3562            "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
3563     FROM t_backtrace WHERE execution_id = {p_execution_id_idx}"
3564        )
3565        .unwrap();
3566
3567        match &filter {
3568            BacktraceFilter::Specific(version) => {
3569                params.push(Box::new(i64::from(version.0))); // $2
3570                let p_ver_idx = format!("${}", params.len()); // $2
3571                write!(
3572                    &mut sql,
3573                    " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
3574                )
3575                .unwrap();
3576            }
3577            BacktraceFilter::First => {
3578                sql.push_str(" ORDER BY version_min_including LIMIT 1");
3579            }
3580            BacktraceFilter::Last => {
3581                sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
3582            }
3583        }
3584
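        // For reference, the three query shapes produced above (with `$1` = execution_id):
        //   Specific(v): ... WHERE execution_id = $1
        //                    AND version_min_including <= $2 AND version_max_excluding > $2
        //   First:       ... WHERE execution_id = $1 ORDER BY version_min_including LIMIT 1
        //   Last:        ... WHERE execution_id = $1 ORDER BY version_min_including DESC LIMIT 1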
3585        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
3586            params.iter().map(|p| p.as_ref() as _).collect();
3587
3588        let row = tx
3589            .query_one(&sql, &params_refs)
3590            .await
3591            .map_err(DbErrorRead::from)?;
3592
3593        let component_id: Json<ComponentId> = get!(row, "component_id");
3594        let component_id = component_id.0;
3595
3596        let version_min_including = Version::try_from(get!(row, "version_min_including", i64))
3597            .map_err(|_| consistency_db_err("version must be non-negative"))?;
3598
3599        let version_max_excluding = Version::try_from(get!(row, "version_max_excluding", i64))
3600            .map_err(|_| consistency_db_err("version must be non-negative"))?;
3601
3602        // wasm_backtrace stored as JSONB
3603        let wasm_backtrace: Json<WasmBacktrace> = get!(row, "wasm_backtrace");
3604        let wasm_backtrace = wasm_backtrace.0;
3605
3606        tx.commit().await.map_err(DbErrorRead::from)?;
3607
3608        Ok(BacktraceInfo {
3609            execution_id: execution_id.clone(),
3610            component_id,
3611            version_min_including,
3612            version_max_excluding,
3613            wasm_backtrace,
3614        })
3615    }
3616
3617    async fn list_executions(
3618        &self,
3619        ffqn: Option<FunctionFqn>,
3620        top_level_only: bool,
3621        pagination: ExecutionListPagination,
3622    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3623        let mut client_guard = self.client.lock().await;
3624        let tx = client_guard
3625            .transaction()
3626            .await
3627            .map_err(DbErrorGeneric::from)?;
3628
3629        let result = list_executions(&tx, ffqn.as_ref(), top_level_only, &pagination).await?;
3630
3631        tx.commit().await.map_err(DbErrorGeneric::from)?;
3632        Ok(result)
3633    }
3634
3635    #[instrument(level = Level::DEBUG, skip(self))]
3636    async fn list_execution_events(
3637        &self,
3638        execution_id: &ExecutionId,
3639        since: &Version,
3640        max_length: VersionType,
3641        include_backtrace_id: bool,
3642    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3643        let mut client_guard = self.client.lock().await;
3644        let tx = client_guard
3645            .transaction()
3646            .await
3647            .map_err(DbErrorRead::from)?;
3648
3649        let result = list_execution_events(
3650            &tx,
3651            execution_id,
3652            since.0,
3653            since.0 + max_length,
3654            include_backtrace_id,
3655        )
3656        .await?;
3657
3658        tx.commit().await.map_err(DbErrorRead::from)?;
3659        Ok(result)
3660    }
3661
3662    async fn list_responses(
3663        &self,
3664        execution_id: &ExecutionId,
3665        pagination: Pagination<u32>,
3666    ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3667        let mut client_guard = self.client.lock().await;
3668        let tx = client_guard
3669            .transaction()
3670            .await
3671            .map_err(DbErrorRead::from)?;
3672
3673        let result = list_responses(&tx, execution_id, Some(pagination)).await?;
3674
3675        tx.commit().await.map_err(DbErrorRead::from)?;
3676        Ok(result)
3677    }
3678
3679    async fn upgrade_execution_component(
3680        &self,
3681        execution_id: &ExecutionId,
3682        old: &InputContentDigest,
3683        new: &InputContentDigest,
3684    ) -> Result<(), DbErrorWrite> {
3685        let mut client_guard = self.client.lock().await;
3686        let tx = client_guard
3687            .transaction()
3688            .await
3689            .map_err(DbErrorGeneric::from)?;
3690
3691        upgrade_execution_component(&tx, execution_id, old, new).await?;
3692
3693        tx.commit().await.map_err(DbErrorGeneric::from)?;
3694        Ok(())
3695    }
3696}
3697
3698#[async_trait]
3699impl DbPoolCloseable for PostgresPool {
3700    async fn close(&self) {
3701        self.pool.close();
3702    }
3703}
3704
3705#[cfg(feature = "test")]
3706#[async_trait]
3707impl concepts::storage::DbConnectionTest for PostgresConnection {
3708    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
3709    async fn append_response(
3710        &self,
3711        created_at: DateTime<Utc>,
3712        execution_id: ExecutionId,
3713        response_event: JoinSetResponseEvent,
3714    ) -> Result<(), DbErrorWrite> {
3715        debug!("append_response");
3716        let event = JoinSetResponseEventOuter {
3717            created_at,
3718            event: response_event,
3719        };
3720
3721        let mut client_guard = self.client.lock().await;
3722        let tx = client_guard
3723            .transaction()
3724            .await
3725            .map_err(DbErrorGeneric::from)?;
3726
3727        let notifier = append_response(&tx, &execution_id, event).await?;
3728
3729        tx.commit().await.map_err(DbErrorGeneric::from)?;
3730        drop(client_guard);
3731
3732        self.notify_all(vec![notifier], created_at);
3733        Ok(())
3734    }
3735
3736    #[instrument(level = Level::DEBUG, skip(self))]
3737    async fn get(
3738        &self,
3739        execution_id: &ExecutionId,
3740    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3741        trace!("get");
3742        let mut client_guard = self.client.lock().await;
3743        let tx = client_guard
3744            .transaction()
3745            .await
3746            .map_err(DbErrorRead::from)?;
3747
3748        let res = get(&tx, execution_id).await?;
3749
3750        tx.commit().await.map_err(DbErrorRead::from)?;
3751        Ok(res)
3752    }
3753}
3754
3755#[cfg(feature = "test")]
3756impl PostgresPool {
3757    pub async fn drop_database(&self) {
3758        let mut cfg = Config::new();
3759        cfg.host = Some(self.config.host.clone());
3760        cfg.user = Some(self.config.user.clone());
3761        cfg.password = Some(self.config.password.clone());
3762        cfg.dbname = Some(ADMIN_DB_NAME.into());
3763        cfg.manager = Some(ManagerConfig {
3764            recycling_method: RecyclingMethod::Fast,
3765        });
3766
3767        let pool = cfg
3768            .create_pool(None, NoTls)
3769            .map_err(|err| {
3770                error!("Cannot create the default pool - {err:?}");
3771                InitializationError
3772            })
3773            .unwrap();
3774
3775        let client = pool
3776            .get()
3777            .await
3778            .map_err(|err| {
3779                error!("Cannot get a connection from the default pool - {err:?}");
3780                InitializationError
3781            })
3782            .unwrap();
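        // DROP DATABASE fails while other sessions are still connected to the database,
        // which is the most likely reason an attempt needs to be retried here.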
3783        for _ in 0..3 {
3784            let res = client
3785                .execute(&format!("DROP DATABASE {}", self.config.db_name), &[])
3786                .await; // A failed attempt itself takes about a second, so no extra sleep is needed between retries.
3787            if res.is_ok() {
3788                debug!("Database '{}' dropped.", self.config.db_name);
3789                return;
3790            }
3791            debug!("Dropping the database failed - {res:?}");
3792        }
3793        warn!("Did not drop database {}", self.config.db_name);
3794    }
3795}