Skip to main content

obeli_sk_db_postgres/
postgres_dao.rs

1use crate::postgres_dao::ddl::ADMIN_DB_NAME;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5    ComponentId, ComponentRetryConfig, ComponentType, ExecutionId, FunctionFqn, JoinSetId,
6    StrVariant, SupportedFunctionReturnValue,
7    component_id::{ComponentDigest, Digest},
8    prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9    storage::{
10        AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11        AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo,
12        ComponentMetadataRecord, CreateRequest, DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection,
13        DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorStubResponse, DbErrorWrite,
14        DbErrorWriteNonRetriable, DbExecutor, DbExternalApi, DbPool, DbPoolCloseable,
15        DeploymentComponentDetail, DeploymentComponentRecord, DeploymentRecord, DeploymentState,
16        DeploymentStatus, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
17        ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock,
18        ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
19        JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionEventsResponse,
20        ListExecutionsFilter, ListLogsResponse, ListResponsesResponse, LockPendingResponse, Locked,
21        LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter, LogInfoAppendRow, LogLevel,
22        LogStreamType, Pagination, PendingState, PendingStateBlockedByJoinSet,
23        PendingStateFinishedResultKind, PendingStateMergedPause, ResponseCursor,
24        ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED,
25        STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
26    },
27};
28use db_common::{
29    AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
30    PendingFfqnSubscribersHolder,
31};
32use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
33use hashbrown::HashMap;
34use secrecy::{ExposeSecret as _, SecretString};
35use sha2::{Digest as _, Sha256};
36use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
37use std::{fmt::Write as _, panic::Location};
38use strum::IntoEnumIterator as _;
39use tokio::sync::{mpsc, oneshot};
40use tokio_postgres::{
41    NoTls, Row, Transaction,
42    row::RowIndex,
43    types::{FromSql, Json, ToSql},
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46use tracing_error::SpanTrace;
47
48#[track_caller]
49fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
50    row: &'a Row,
51    name: I,
52) -> Result<T, DbErrorGeneric> {
53    match row.try_get(name) {
54        Ok(ok) => Ok(ok),
55        Err(err) => {
56            // no map_err, cannot attach `track_caller`
57            Err(consistency_db_err(format!(
58                "Failed to retrieve column '{name}': {err:?}"
59            )))
60        }
61    }
62}
63
/// Database-level constants.
mod ddl {
    /// Name of the database that `create_database` connects to for its administrative
    /// connection.
    pub const ADMIN_DB_NAME: &str = "postgres";
}
67
/// Migrations embedded at compile time by `refinery::embed_migrations!` from the
/// `migrations` path; run through `embedded::migrations::runner()`.
mod embedded {
    refinery::embed_migrations!("migrations");
}
71
/// Connection settings used to build the `deadpool_postgres` pools in this module.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Host passed to the pool configuration.
    pub host: String,
    /// User passed to the pool configuration.
    pub user: String,
    /// Password; exposed only when it is copied into the pool configuration.
    pub password: SecretString,
    /// Name of the application database (looked up, optionally created, then connected to).
    pub db_name: String,
}
79
/// Opaque error returned when database or pool initialization fails.
///
/// It carries no details; in this module the underlying cause is logged at the place
/// where the error is created.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
83
84async fn create_database(
85    config: &PostgresConfig,
86    provision_policy: ProvisionPolicy,
87) -> Result<DbInitialzationOutcome, InitializationError> {
88    let mut admin_cfg = deadpool_postgres::Config::new();
89    admin_cfg.host = Some(config.host.clone());
90    admin_cfg.user = Some(config.user.clone());
91    admin_cfg.password = Some(config.password.expose_secret().to_string());
92    admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
93    admin_cfg.manager = Some(ManagerConfig {
94        recycling_method: RecyclingMethod::Fast,
95    });
96
97    let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
98        error!("Cannot create the default pool - {err:?}");
99        InitializationError
100    })?;
101
102    let client = admin_pool.get().await.map_err(|err| {
103        error!("Cannot get a connection from the default pool - {err:?}");
104        InitializationError
105    })?;
106
107    let row = client
108        .query_opt(
109            &format!(
110                "SELECT 1 FROM pg_database WHERE datname = '{}'",
111                config.db_name
112            ),
113            &[],
114        )
115        .await
116        .map_err(|err| {
117            error!("Cannot select from the default database - {err:?}");
118            InitializationError
119        })?;
120
121    match (row, provision_policy) {
122        (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
123            client
124                .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
125                .await
126                .map_err(|err| {
127                    error!("Cannot create the database - {err:?}");
128                    InitializationError
129                })?;
130            info!("Database '{}' created.", config.db_name);
131            Ok(DbInitialzationOutcome::Created)
132        }
133        (Some(_), ProvisionPolicy::Auto) => {
134            info!("Database '{}' exists.", config.db_name);
135            Ok(DbInitialzationOutcome::Existing)
136        }
137        (Some(_), ProvisionPolicy::MustCreate) => {
138            warn!("Database '{}' already exists.", config.db_name);
139            Err(InitializationError)
140        }
141        (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
142    }
143}
144
// All mutexes here have a very short critical section completely controlled by this module, thus using std mutex.
/// Shared map from execution id to a oneshot sender of `ResponseWithCursor`, paired with
/// a `u64`.
// NOTE(review): the `u64` presumably identifies the subscriber; confirm at the use sites.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
/// Shared, mutex-guarded `PendingFfqnSubscribersHolder`.
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Mutex-guarded map from execution id to oneshot senders of the execution's
/// `SupportedFunctionReturnValue`, keyed by a `u64`.
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
152
/// Connection pool plus the subscriber registries that every connection created from
/// it shares.
pub struct PostgresPool {
    // Underlying `deadpool_postgres` pool.
    pool: Pool,
    // Registries cloned into each `PostgresConnection` handed out by this pool.
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    // only for tests.
    pub config: PostgresConfig,
}
161
162#[async_trait]
163impl DbPool for PostgresPool {
164    async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
165        let client = self.pool.get().await?;
166
167        Ok(Box::new(PostgresConnection {
168            client: tokio::sync::Mutex::new(client),
169            response_subscribers: self.response_subscribers.clone(),
170            pending_subscribers: self.pending_subscribers.clone(),
171            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
172        }))
173    }
174
175    async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
176        let client = self.pool.get().await?;
177
178        Ok(Box::new(PostgresConnection {
179            client: tokio::sync::Mutex::new(client),
180            response_subscribers: self.response_subscribers.clone(),
181            pending_subscribers: self.pending_subscribers.clone(),
182            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
183        }))
184    }
185
186    async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
187        let client = self.pool.get().await?;
188
189        Ok(Box::new(PostgresConnection {
190            client: tokio::sync::Mutex::new(client),
191            response_subscribers: self.response_subscribers.clone(),
192            pending_subscribers: self.pending_subscribers.clone(),
193            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
194        }))
195    }
196
197    #[cfg(feature = "test")]
198    async fn connection_test(
199        &self,
200    ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
201        let client = self.pool.get().await?;
202
203        Ok(Box::new(PostgresConnection {
204            client: tokio::sync::Mutex::new(client),
205            response_subscribers: self.response_subscribers.clone(),
206            pending_subscribers: self.pending_subscribers.clone(),
207            execution_finished_subscribers: self.execution_finished_subscribers.clone(),
208        }))
209    }
210}
211
/// A pooled client together with handles to the subscriber registries shared with the
/// `PostgresPool` that created it.
pub struct PostgresConnection {
    client: tokio::sync::Mutex<Client>, // Callers should not hold onto a connection for too long but it is not controlled by this module, thus tokio mutex.
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
218
/// Controls whether `PostgresPool::new_with_outcome` may create the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Never create the database: the existence check is skipped and the outcome is
    /// reported as `Existing`.
    NeverCreate,
    /// Create database if it does not exist.
    Auto,
    /// Only for tests: Fail if database already exists.
    MustCreate,
}
227
/// Tells the caller whether initialization created the database or found it present.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// The database was created during initialization.
    Created,
    /// The database already existed, or creation was skipped under
    /// `ProvisionPolicy::NeverCreate`.
    Existing,
}
233
234impl PostgresPool {
235    #[instrument(skip_all, name = "postgres_new")]
236    pub async fn new(
237        config: PostgresConfig,
238        provision_policy: ProvisionPolicy,
239    ) -> Result<PostgresPool, InitializationError> {
240        Self::new_with_outcome(config, provision_policy)
241            .await
242            .map(|(db, _)| db)
243    }
244
245    pub async fn new_with_outcome(
246        config: PostgresConfig,
247        provision_policy: ProvisionPolicy,
248    ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
249        let outcome = if matches!(
250            provision_policy,
251            ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
252        ) {
253            create_database(&config, provision_policy).await?
254        } else {
255            DbInitialzationOutcome::Existing
256        };
257        let mut cfg = deadpool_postgres::Config::new();
258        cfg.host = Some(config.host.clone());
259        cfg.user = Some(config.user.clone());
260        cfg.password = Some(config.password.expose_secret().to_string());
261        cfg.dbname = Some(config.db_name.clone());
262        cfg.manager = Some(ManagerConfig {
263            recycling_method: RecyclingMethod::Fast,
264        });
265
266        let pool = cfg.create_pool(None, NoTls).map_err(|err| {
267            error!("Cannot create the database pool - {err:?}");
268            InitializationError
269        })?;
270        let mut client = pool.get().await.map_err(|err| {
271            error!("Cannot get a connection from the database pool - {err:?}");
272            InitializationError
273        })?;
274
275        embedded::migrations::runner()
276            .run_async(&mut **client)
277            .await
278            .map_err(|err| {
279                error!("Cannot run migrations - {err:?}");
280                InitializationError
281            })?;
282
283        debug!("Database schema initialized.");
284
285        Ok((
286            PostgresPool {
287                pool,
288                execution_finished_subscribers: Arc::default(),
289                pending_subscribers: Arc::default(),
290                response_subscribers: Arc::default(),
291                config,
292            },
293            outcome,
294        ))
295    }
296}
297
298fn deployment_record_from_pg_row(row: &Row) -> Result<DeploymentRecord, DbErrorRead> {
299    let deployment_id_str: String = get(row, "deployment_id")?;
300    let deployment_id = deployment_id_str.parse::<DeploymentId>().map_err(|e| {
301        DbErrorRead::Generic(consistency_db_err(format!("invalid deployment_id: {e}")))
302    })?;
303    let status_str: String = get(row, "status")?;
304    let status = status_str
305        .parse::<DeploymentStatus>()
306        .map_err(|e| DbErrorRead::Generic(consistency_db_err(format!("invalid status: {e}"))))?;
307    Ok(DeploymentRecord {
308        deployment_id,
309        created_at: get(row, "created_at")?,
310        last_active_at: get(row, "last_active_at")?,
311        status,
312        config_json: get(row, "config_json")?,
313        obelisk_version: get(row, "obelisk_version")?,
314        created_by: get(row, "created_by")?,
315    })
316}
317
318fn deployment_component_detail_from_pg_row(
319    row: &Row,
320) -> Result<DeploymentComponentDetail, DbErrorRead> {
321    let component_name: String = get(row, "component_name")?;
322    let component_type: ComponentType =
323        get::<String, _>(row, "component_type")?
324            .parse()
325            .map_err(|err| {
326                DbErrorRead::Generic(consistency_db_err(format!("invalid component_type: {err}")))
327            })?;
328    let component_digest = ComponentDigest(Digest(
329        get::<Vec<u8>, _>(row, "component_digest")?
330            .try_into()
331            .map_err(|_| {
332                DbErrorRead::Generic(consistency_db_err("invalid component_digest length"))
333            })?,
334    ));
335    let component_id = ComponentId::new(
336        component_type,
337        StrVariant::from(component_name),
338        component_digest,
339    )
340    .map_err(|err| DbErrorRead::Generic(consistency_db_err(err.to_string())))?;
341    let imports =
342        get::<Json<Vec<concepts::storage::PersistedFunctionMetadata>>, _>(row, "imports_json")?.0;
343    let exports =
344        get::<Json<Vec<concepts::storage::PersistedFunctionMetadata>>, _>(row, "exports_json")?.0;
345    let wit: String = get(row, "wit")?;
346    Ok(DeploymentComponentDetail {
347        component_id,
348        imports,
349        exports,
350        wit,
351    })
352}
353
354#[track_caller]
355fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
356    DbErrorGeneric::Uncategorized {
357        reason: reason.into(),
358        context: SpanTrace::capture(),
359        source: None,
360        loc: Location::caller(),
361    }
362}
363#[track_caller]
364fn consistency_db_err_src(
365    reason: impl Into<StrVariant>,
366    source: Arc<dyn std::error::Error + Send + Sync>,
367) -> DbErrorGeneric {
368    DbErrorGeneric::Uncategorized {
369        reason: reason.into(),
370        context: SpanTrace::capture(),
371        source: Some(source),
372        loc: Location::caller(),
373    }
374}
375
/// Values for a row inserted into `t_delay` by `bump_state_next_version`.
#[derive(Debug, Clone)]
struct DelayReq {
    // Stored in the `join_set_id` column.
    join_set_id: JoinSetId,
    // Stored in the `delay_id` column.
    delay_id: DelayId,
    // Stored in the `expires_at` column.
    expires_at: DateTime<Utc>,
    // Stored in the `is_paused` column.
    paused: bool,
}
383
384async fn fetch_created_event(
385    tx: &Transaction<'_>,
386    execution_id: &ExecutionId,
387) -> Result<CreateRequest, DbErrorRead> {
388    let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
389                execution_id = $1 AND version = 0";
390
391    let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
392
393    let created_at = get(&row, "created_at")?;
394    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
395    let event = event.0;
396
397    if let ExecutionRequest::Created {
398        ffqn,
399        params,
400        parent,
401        scheduled_at,
402        component_id,
403        deployment_id,
404        metadata,
405        scheduled_by,
406    } = event
407    {
408        Ok(CreateRequest {
409            created_at,
410            execution_id: execution_id.clone(),
411            ffqn,
412            params,
413            parent,
414            scheduled_at,
415            component_id,
416            deployment_id,
417            metadata,
418            scheduled_by,
419            paused: false,
420        })
421    } else {
422        error!("Row with version=0 must be a `Created` event - {event:?}");
423        Err(consistency_db_err("expected `Created` event").into())
424    }
425}
426
427fn check_expected_next_and_appending_version(
428    expected_version: &Version,
429    appending_version: &Version,
430) -> Result<(), DbErrorWrite> {
431    if *expected_version != *appending_version {
432        debug!(
433            "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
434        );
435        return Err(DbErrorWrite::NonRetriable(
436            DbErrorWriteNonRetriable::VersionConflict {
437                expected: expected_version.clone(),
438                requested: appending_version.clone(),
439            },
440        ));
441    }
442    Ok(())
443}
444
445#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
446async fn create_inner(
447    tx: &Transaction<'_>,
448    req: CreateRequest,
449) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
450    trace!("create_inner");
451
452    let version = Version::default();
453    let execution_id = req.execution_id.clone();
454    let execution_id_str = execution_id.to_string();
455    let ffqn = req.ffqn.clone();
456    let created_at = req.created_at;
457    let scheduled_at = req.scheduled_at;
458    let component_id = req.component_id.clone();
459    let deployment_id = req.deployment_id;
460    let paused = req.paused;
461
462    let event = ExecutionRequest::from(req);
463    let event = Json(event);
464
465    tx.execute(
466        "INSERT INTO t_execution_log (
467            execution_id, created_at, version, json_value, variant, join_set_id
468        ) VALUES ($1, $2, $3, $4, $5, $6)",
469        &[
470            &execution_id_str,
471            &created_at,
472            &i64::from(version.0), // BIGINT
473            &event,
474            &event.0.variant(),
475            &event.0.join_set_id().map(std::string::ToString::to_string),
476        ],
477    )
478    .await?;
479
480    let pending_at = {
481        debug!("Creating with `Pending(`{scheduled_at:?}`)");
482
483        tx.execute(
484            r"
485            INSERT INTO t_state (
486                execution_id,
487                is_top_level,
488                corresponding_version,
489                pending_expires_finished,
490                ffqn,
491                state,
492                created_at,
493                component_id_input_digest,
494                component_type,
495                deployment_id,
496                first_scheduled_at,
497                updated_at,
498                intermittent_event_count,
499                is_paused
500            ) VALUES (
501                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, $12
502            )",
503            &[
504                &execution_id_str,
505                &execution_id.is_top_level(),
506                &i64::from(version.0),
507                &scheduled_at,
508                &ffqn.to_string(),
509                &STATE_PENDING_AT,
510                &created_at,
511                &component_id.component_digest.as_slice(),
512                &component_id.component_type.to_string(),
513                &deployment_id.to_string(),
514                &scheduled_at,
515                &false,
516            ], // paused set to false here to avoid "execution is already paused" error.
517        )
518        .await?;
519
520        AppendNotifier {
521            pending_at: if paused {
522                None
523            } else {
524                Some(NotifierPendingAt {
525                    scheduled_at,
526                    ffqn: ffqn.clone(),
527                    component_input_digest: component_id.component_digest,
528                })
529            },
530            execution_finished: None,
531            response: None,
532        }
533    };
534
535    let mut next_version = Version::new(version.0 + 1);
536    if paused {
537        let (v, _) = append(
538            tx,
539            &execution_id,
540            AppendRequest {
541                created_at,
542                event: ExecutionRequest::Paused,
543            },
544            next_version,
545        )
546        .await?;
547        next_version = v;
548    }
549    Ok((next_version, pending_at))
550}
551
552#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
553async fn update_state_pending_after_response_appended(
554    tx: &Transaction<'_>,
555    execution_id: &ExecutionId,
556    scheduled_at: DateTime<Utc>, // Changing to state PendingAt
557    component_input_digest: ComponentDigest,
558) -> Result<AppendNotifier, DbErrorWrite> {
559    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
560
561    // Convert types for Postgres arguments
562    let execution_id_str = execution_id.to_string();
563
564    let updated = tx
565        .execute(
566            r"
567            UPDATE t_state
568            SET
569                pending_expires_finished = $1,
570                state = $2,
571                updated_at = CURRENT_TIMESTAMP,
572
573                max_retries = NULL,
574                retry_exp_backoff_millis = NULL,
575                last_lock_version = NULL,
576
577                join_set_id = NULL,
578                join_set_closing = NULL,
579
580                result_kind = NULL
581            WHERE execution_id = $3
582            ",
583            &[
584                &scheduled_at,     // $1
585                &STATE_PENDING_AT, // $2
586                &execution_id_str, // $3
587            ],
588        )
589        .await?;
590
591    if updated == 0 {
592        return Err(DbErrorWrite::NotFound);
593    }
594
595    Ok(AppendNotifier {
596        pending_at: Some(NotifierPendingAt {
597            scheduled_at,
598            ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
599            component_input_digest,
600        }),
601        execution_finished: None,
602        response: None,
603    })
604}
605
606#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
607async fn update_state_pending_after_event_appended(
608    tx: &Transaction<'_>,
609    execution_id: &ExecutionId,
610    appending_version: &Version,
611    scheduled_at: DateTime<Utc>,
612    intermittent_failure: bool,
613    component_input_digest: ComponentDigest,
614) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
615    debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
616
617    let intermittent_delta = i64::from(intermittent_failure); // 0 or 1
618
619    let updated = tx
620        .execute(
621            r"
622            UPDATE t_state
623            SET
624                corresponding_version = $1,
625                pending_expires_finished = $2,
626                state = $3,
627                updated_at = CURRENT_TIMESTAMP,
628                intermittent_event_count = intermittent_event_count + $4,
629
630                max_retries = NULL,
631                retry_exp_backoff_millis = NULL,
632                last_lock_version = NULL,
633
634                join_set_id = NULL,
635                join_set_closing = NULL,
636
637                result_kind = NULL
638            WHERE execution_id = $5;
639            ",
640            &[
641                &i64::from(appending_version.0), // $1
642                &scheduled_at,                   // $2
643                &STATE_PENDING_AT,               // $3
644                &intermittent_delta,             // $4
645                &execution_id.to_string(),       // $5
646            ],
647        )
648        .await?;
649
650    if updated != 1 {
651        return Err(DbErrorWrite::NotFound);
652    }
653
654    Ok((
655        appending_version.increment(),
656        AppendNotifier {
657            pending_at: Some(NotifierPendingAt {
658                scheduled_at,
659                ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
660                component_input_digest,
661            }),
662            execution_finished: None,
663            response: None,
664        },
665    ))
666}
667
668#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
669#[expect(clippy::too_many_arguments)]
670async fn update_state_locked_get_intermittent_event_count(
671    tx: &Transaction<'_>,
672    execution_id: &ExecutionId,
673    deployment_id: DeploymentId,
674    component_digest: &ComponentDigest,
675    executor_id: ExecutorId,
676    run_id: RunId,
677    lock_expires_at: DateTime<Utc>,
678    appending_version: &Version,
679    retry_config: ComponentRetryConfig,
680) -> Result<u32, DbErrorWrite> {
681    debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
682    let backoff_millis =
683        i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
684            // BIGINT = i64
685            DbErrorGeneric::Uncategorized {
686                reason: "backoff too big".into(),
687                context: SpanTrace::capture(),
688                source: Some(Arc::new(err)),
689                loc: Location::caller(),
690            }
691        })?;
692
693    let execution_id_str = execution_id.to_string();
694
695    let updated = tx
696        .execute(
697            r"
698            UPDATE t_state
699            SET
700                corresponding_version = $1,
701                pending_expires_finished = $2,
702                state = $3,
703                updated_at = CURRENT_TIMESTAMP,
704                deployment_id = $4,
705                component_id_input_digest = $5,
706
707                max_retries = $6,
708                retry_exp_backoff_millis = $7,
709                last_lock_version = $1,
710                executor_id = $8,
711                run_id = $9,
712
713                join_set_id = NULL,
714                join_set_closing = NULL,
715
716                result_kind = NULL
717            WHERE execution_id = $10
718            AND is_paused = false
719            ",
720            &[
721                &i64::from(appending_version.0),
722                &lock_expires_at,
723                &STATE_LOCKED,
724                &deployment_id.to_string(),
725                &component_digest,
726                &retry_config.max_retries.map(i64::from),
727                &backoff_millis,
728                &executor_id.to_string(),
729                &run_id.to_string(),
730                &execution_id_str,
731            ],
732        )
733        .await?;
734
735    if updated != 1 {
736        return Err(DbErrorWrite::NotFound);
737    }
738
739    // fetch intermittent event count
740    let row = tx
741        .query_one(
742            "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
743            &[&execution_id_str],
744        )
745        .await
746        .map_err(DbErrorGeneric::from)?;
747
748    let count: i64 = get(&row, "intermittent_event_count")?; // Postgres BIGINT
749    let count = u32::try_from(count)
750        .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
751    Ok(count)
752}
753
754#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
755async fn update_state_blocked(
756    tx: &Transaction<'_>,
757    execution_id: &ExecutionId,
758    appending_version: &Version,
759    join_set_id: &JoinSetId,
760    lock_expires_at: DateTime<Utc>,
761    join_set_closing: bool,
762) -> Result<AppendResponse, DbErrorWrite> {
763    debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
764
765    let updated = tx
766        .execute(
767            r"
768            UPDATE t_state
769            SET
770                corresponding_version = $1,
771                pending_expires_finished = $2,
772                state = $3,
773                updated_at = CURRENT_TIMESTAMP,
774
775                max_retries = NULL,
776                retry_exp_backoff_millis = NULL,
777                last_lock_version = NULL,
778
779                join_set_id = $4,
780                join_set_closing = $5,
781
782                result_kind = NULL
783            WHERE execution_id = $6
784            ",
785            &[
786                &i64::from(appending_version.0), // $1
787                &lock_expires_at,                // $2
788                &STATE_BLOCKED_BY_JOIN_SET,      // $3
789                &join_set_id.to_string(),        // $4
790                &join_set_closing,               // $5 (BOOLEAN)
791                &execution_id.to_string(),       // $6
792            ],
793        )
794        .await?;
795
796    if updated != 1 {
797        return Err(DbErrorWrite::NotFound);
798    }
799    Ok(appending_version.increment())
800}
801
802#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
803async fn update_state_finished(
804    tx: &Transaction<'_>,
805    execution_id: &ExecutionId,
806    appending_version: &Version,
807    finished_at: DateTime<Utc>,
808    result_kind: PendingStateFinishedResultKind,
809) -> Result<(), DbErrorWrite> {
810    debug!("Setting t_state to Finished");
811
812    let result_kind_json = Json(result_kind);
813
814    let updated = tx
815        .execute(
816            r"
817            UPDATE t_state
818            SET
819                corresponding_version = $1,
820                pending_expires_finished = $2,
821                state = $3,
822                updated_at = CURRENT_TIMESTAMP,
823
824                max_retries = NULL,
825                retry_exp_backoff_millis = NULL,
826                last_lock_version = NULL,
827                executor_id = NULL,
828                run_id = NULL,
829
830                join_set_id = NULL,
831                join_set_closing = NULL,
832
833                is_paused = false,
834                result_kind = $4
835            WHERE execution_id = $5
836            ",
837            &[
838                &i64::from(appending_version.0), // $1
839                &finished_at,                    // $2
840                &STATE_FINISHED,                 // $3
841                &result_kind_json,               // $4
842                &execution_id.to_string(),       // $5
843            ],
844        )
845        .await?;
846
847    if updated != 1 {
848        return Err(DbErrorWrite::NotFound);
849    }
850    Ok(())
851}
852
853#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
854async fn update_state_paused(
855    tx: &Transaction<'_>,
856    execution_id: &ExecutionId,
857    appending_version: &Version,
858    is_paused: bool,
859) -> Result<AppendResponse, DbErrorWrite> {
860    debug!(
861        "Setting t_state to {}",
862        if is_paused { "paused" } else { "unpaused" }
863    );
864
865    let updated = tx
866        .execute(
867            r"
868            UPDATE t_state
869            SET
870                corresponding_version = $1,
871                is_paused = $2,
872                updated_at = CURRENT_TIMESTAMP
873            WHERE execution_id = $3
874            ",
875            &[
876                &i64::from(appending_version.0), // $1
877                &is_paused,                      // $2
878                &execution_id.to_string(),       // $3
879            ],
880        )
881        .await?;
882
883    if updated != 1 {
884        return Err(DbErrorWrite::NotFound);
885    }
886    Ok(appending_version.increment())
887}
888
889#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
890async fn bump_state_next_version(
891    tx: &Transaction<'_>,
892    execution_id: &ExecutionId,
893    appending_version: &Version,
894    delay_req: Option<DelayReq>,
895) -> Result<AppendResponse, DbErrorWrite> {
896    debug!("update_index_version");
897    let execution_id_str = execution_id.to_string();
898
899    let updated = tx
900        .execute(
901            r"
902            UPDATE t_state
903            SET
904                corresponding_version = $1,
905                updated_at = CURRENT_TIMESTAMP
906            WHERE execution_id = $2
907            ",
908            &[
909                &i64::from(appending_version.0), // $1
910                &execution_id_str,               // $2
911            ],
912        )
913        .await?;
914
915    if updated != 1 {
916        return Err(DbErrorWrite::NotFound);
917    }
918
919    if let Some(DelayReq {
920        join_set_id,
921        delay_id,
922        expires_at,
923        paused,
924    }) = delay_req
925    {
926        debug!("Inserting delay to `t_delay`");
927        tx.execute(
928            "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at, is_paused) VALUES ($1, $2, $3, $4, $5)",
929            &[
930                &execution_id_str,
931                &join_set_id.to_string(),
932                &delay_id.to_string(),
933                &expires_at,
934                &paused,
935            ],
936        )
937        .await?;
938    }
939    Ok(appending_version.increment())
940}
941
942async fn get_combined_state(
943    tx: &Transaction<'_>,
944    execution_id: &ExecutionId,
945) -> Result<CombinedState, DbErrorRead> {
946    let row = tx
947        .query_one(
948            r"
949            SELECT
950                created_at, first_scheduled_at,
951                state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
952                last_lock_version, executor_id, run_id,
953                join_set_id, join_set_closing,
954                result_kind, is_paused
955            FROM t_state
956            WHERE execution_id = $1
957            ",
958            &[&execution_id.to_string()],
959        )
960        .await
961        .map_err(DbErrorRead::from)?;
962
963    // Parsing columns
964
965    let created_at: DateTime<Utc> = get(&row, "created_at")?;
966    let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
967
968    let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
969    let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
970        consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
971    })?;
972    let component_digest = ComponentDigest(digest);
973
974    let component_type: String = get(&row, "component_type")?;
975    let component_type = ComponentType::from_str(&component_type)
976        .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;
977
978    let deployment_id: String = get(&row, "deployment_id")?;
979    let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
980
981    let state: String = get(&row, "state")?;
982    let ffqn: String = get(&row, "ffqn")?;
983    let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
984        consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
985    })?;
986
987    let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
988
989    let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
990    let last_lock_version = last_lock_version_raw
991        .map(Version::try_from)
992        .transpose()
993        .map_err(|_| consistency_db_err("version must be non-negative"))?;
994
995    let executor_id_raw: Option<String> = get(&row, "executor_id")?;
996    let executor_id = executor_id_raw
997        .map(|id| ExecutorId::from_str(&id))
998        .transpose()
999        .map_err(DbErrorGeneric::from)?;
1000
1001    let run_id_raw: Option<String> = get(&row, "run_id")?;
1002    let run_id = run_id_raw
1003        .map(|id| RunId::from_str(&id))
1004        .transpose()
1005        .map_err(DbErrorGeneric::from)?;
1006
1007    let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1008    let join_set_id = join_set_id_raw
1009        .map(|id| JoinSetId::from_str(&id))
1010        .transpose()
1011        .map_err(DbErrorGeneric::from)?;
1012
1013    let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1014
1015    let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1016    let result_kind = result_kind.map(|it| it.0);
1017
1018    let is_paused: bool = get(&row, "is_paused")?;
1019
1020    let corresponding_version: i64 = get(&row, "corresponding_version")?;
1021    let corresponding_version = Version::new(
1022        VersionType::try_from(corresponding_version)
1023            .map_err(|_| consistency_db_err("version must be non-negative"))?,
1024    );
1025
1026    let dto = CombinedStateDTO {
1027        execution_id: execution_id.clone(),
1028        created_at,
1029        first_scheduled_at,
1030        state,
1031        ffqn,
1032        component_digest,
1033        component_type,
1034        deployment_id,
1035        pending_expires_finished,
1036        last_lock_version,
1037        executor_id,
1038        run_id,
1039        join_set_id,
1040        join_set_closing,
1041        result_kind,
1042        is_paused,
1043    };
1044    CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1045}
1046
1047async fn list_executions(
1048    read_tx: &Transaction<'_>,
1049    filter: ListExecutionsFilter,
1050    pagination: &ExecutionListPagination,
1051) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1052    // Helper to manage dynamic WHERE clauses and positional parameters ($1, $2...)
1053    struct QueryBuilder {
1054        where_clauses: Vec<String>,
1055        params: Vec<Box<dyn ToSql + Send + Sync>>,
1056    }
1057
1058    impl QueryBuilder {
1059        fn new() -> Self {
1060            Self {
1061                where_clauses: Vec::new(),
1062                params: Vec::new(),
1063            }
1064        }
1065
1066        fn add_param<T>(&mut self, param: T) -> String
1067        where
1068            T: ToSql + Sync + Send + 'static,
1069        {
1070            self.params.push(Box::new(param));
1071            format!("${}", self.params.len())
1072        }
1073
1074        fn add_where(&mut self, clause: String) {
1075            self.where_clauses.push(clause);
1076        }
1077    }
1078
1079    let mut qb = QueryBuilder::new();
1080
1081    // Pagination Logic
1082    let (limit, limit_desc) = match pagination {
1083        ExecutionListPagination::CreatedBy(p) => {
1084            let limit = p.length();
1085            let is_desc = p.is_desc();
1086            if let Some(cursor) = p.cursor() {
1087                let placeholder = qb.add_param(*cursor);
1088                qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1089            }
1090            (limit, is_desc)
1091        }
1092        ExecutionListPagination::ExecutionId(p) => {
1093            let limit = p.length();
1094            let is_desc = p.is_desc();
1095            if let Some(cursor) = p.cursor() {
1096                let placeholder = qb.add_param(cursor.to_string());
1097                qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1098            }
1099            (limit, is_desc)
1100        }
1101    };
1102
1103    if !filter.show_derived {
1104        qb.add_where("is_top_level = true".to_string());
1105    }
1106    if let Some(function_name_filter) = filter.function_name_filter {
1107        let placeholder = qb.add_param(function_name_filter.like_pattern());
1108        qb.add_where(format!("ffqn LIKE {placeholder}"));
1109    }
1110    let like = |value: String| format!("{value}%");
1111    if filter.hide_finished {
1112        qb.add_where(format!("state != '{STATE_FINISHED}'"));
1113    }
1114    if let Some(prefix) = filter.execution_id_prefix {
1115        let placeholder = qb.add_param(like(prefix));
1116        qb.add_where(format!("execution_id LIKE {placeholder}"));
1117    }
1118    if let Some(component_digest) = filter.component_digest {
1119        let placeholder = qb.add_param(component_digest);
1120        qb.add_where(format!("component_id_input_digest = {placeholder}"));
1121    }
1122    if let Some(deployment_id) = filter.deployment_id {
1123        let placeholder = qb.add_param(deployment_id.to_string());
1124        qb.add_where(format!("deployment_id = {placeholder}"));
1125    }
1126
1127    let where_str = if qb.where_clauses.is_empty() {
1128        String::new()
1129    } else {
1130        format!("WHERE {}", qb.where_clauses.join(" AND "))
1131    };
1132
1133    let order_col = match pagination {
1134        ExecutionListPagination::CreatedBy(_) => "created_at",
1135        ExecutionListPagination::ExecutionId(_) => "execution_id",
1136    };
1137
1138    // Inner query: fetch rows with cursor-based ordering
1139    // Outer query: always return results in descending order
1140    let (inner_order, outer_order) = if limit_desc {
1141        ("DESC", "")
1142    } else {
1143        ("", "DESC")
1144    };
1145
1146    let inner_sql = format!(
1147        r"SELECT created_at, first_scheduled_at, component_id_input_digest, component_type, deployment_id,
1148            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1149            last_lock_version, executor_id, run_id,
1150            join_set_id, join_set_closing,
1151            result_kind, is_paused
1152            FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1153    );
1154
1155    let sql = if outer_order.is_empty() {
1156        inner_sql
1157    } else {
1158        format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1159    };
1160
1161    let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1162        .params
1163        .iter()
1164        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1165        .collect();
1166
1167    let rows = read_tx.query(&sql, &params_refs).await?;
1168
1169    let mut vec = Vec::with_capacity(rows.len());
1170
1171    for row in rows {
1172        // If parsing of the row fails, log and skip it.
1173        let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1174            let execution_id_str: String = get(&row, "execution_id")?;
1175            let execution_id = ExecutionId::from_str(&execution_id_str)
1176                .map_err(|err| consistency_db_err(err.to_string()))?;
1177
1178            let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1179            let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1180                consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1181            })?;
1182            let component_digest = ComponentDigest(digest);
1183
1184            let component_type: String = get(&row, "component_type")?;
1185            let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1186                consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1187            })?;
1188
1189            let deployment_id: String = get(&row, "deployment_id")?;
1190            let deployment_id =
1191                DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1192
1193            let created_at: DateTime<Utc> = get(&row, "created_at")?;
1194            let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1195
1196            let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1197                get(&row, "result_kind")?;
1198            let result_kind = result_kind.map(|it| it.0);
1199
1200            let is_paused: bool = get(&row, "is_paused")?;
1201
1202            let corresponding_version: i64 = get(&row, "corresponding_version")?;
1203            let corresponding_version = Version::try_from(corresponding_version)
1204                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1205
1206            let executor_id_str: Option<String> = get(&row, "executor_id")?;
1207            let executor_id = executor_id_str
1208                .map(|id| ExecutorId::from_str(&id))
1209                .transpose()?;
1210
1211            let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1212            let last_lock_version = last_lock_version_raw
1213                .map(Version::try_from)
1214                .transpose()
1215                .map_err(|_| consistency_db_err("version must be non-negative"))?;
1216
1217            let run_id_str: Option<String> = get(&row, "run_id")?;
1218            let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1219
1220            let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1221            let join_set_id = join_set_id_str
1222                .map(|id| JoinSetId::from_str(&id))
1223                .transpose()?;
1224
1225            let ffqn: String = get(&row, "ffqn")?;
1226            let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1227                error!("Error parsing ffqn - {parse_err:?}");
1228                consistency_db_err("invalid ffqn value in `t_state`")
1229            })?;
1230
1231            let combined_state_dto = CombinedStateDTO {
1232                execution_id,
1233                created_at,
1234                first_scheduled_at,
1235                component_digest,
1236                component_type,
1237                deployment_id,
1238                state: get(&row, "state")?,
1239                ffqn,
1240                pending_expires_finished: get(&row, "pending_expires_finished")?,
1241                executor_id,
1242                last_lock_version,
1243                run_id,
1244                join_set_id,
1245                join_set_closing: get(&row, "join_set_closing")?,
1246                result_kind,
1247                is_paused,
1248            };
1249
1250            let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1251
1252            Ok(combined_state.execution_with_state)
1253        };
1254
1255        match unpack() {
1256            Ok(execution) => vec.push(execution),
1257            Err(err) => {
1258                warn!("Skipping corrupted row in t_state: {err:?}");
1259            }
1260        }
1261    }
1262
1263    Ok(vec)
1264}
1265
1266async fn list_responses(
1267    tx: &Transaction<'_>,
1268    execution_id: &ExecutionId,
1269    pagination: Option<Pagination<u32>>,
1270) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1271    // Helper to manage params dynamically
1272    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1273    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1274        params.push(p);
1275        format!("${}", params.len())
1276    };
1277
1278    // 1. Base Query
1279    let p_execution_id = add_param(Box::new(execution_id.to_string()));
1280
1281    let mut sql = format!(
1282        "SELECT \
1283            r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1284            FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1285            WHERE \
1286            r.execution_id = {p_execution_id} \
1287            AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1288    );
1289
1290    // 2. Pagination Logic
1291    let limit = match &pagination {
1292        Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1293            // Postgres BIGINT is i64.
1294            let p_cursor = add_param(Box::new(i64::from(*cursor)));
1295
1296            // Add WHERE clause for cursor
1297            write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1298
1299            Some(p.length())
1300        }
1301        None => None,
1302    };
1303
1304    // 3. Ordering
1305    sql.push_str(" ORDER BY r.id");
1306    let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1307    if is_desc {
1308        sql.push_str(" DESC");
1309    }
1310
1311    // 4. Limit
1312    if let Some(limit) = limit {
1313        // Postgres limit expects i64
1314        let p_limit = add_param(Box::new(i64::from(limit)));
1315        write!(sql, " LIMIT {p_limit}").unwrap();
1316    }
1317
1318    // Re-order to ascending for consistent oldest-to-newest results
1319    if is_desc {
1320        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1321    }
1322
1323    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1324        .iter()
1325        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1326        .collect();
1327
1328    let rows = tx
1329        .query(&sql, &params_refs)
1330        .await
1331        .map_err(DbErrorRead::from)?;
1332
1333    let mut results = Vec::with_capacity(rows.len());
1334    for row in rows {
1335        results.push(parse_response_with_cursor(&row)?);
1336    }
1337
1338    Ok(results)
1339}
1340
1341async fn list_logs_tx(
1342    tx: &Transaction<'_>,
1343    execution_id: &ExecutionId,
1344    show_derived: bool,
1345    filter: &LogFilter,
1346    pagination: &Pagination<DateTime<Utc>>,
1347) -> Result<ListLogsResponse, DbErrorRead> {
1348    let mut param_index = 1;
1349    let exec_id_str = execution_id.to_string();
1350    let exec_id_filter = if show_derived {
1351        format!("execution_id LIKE ${param_index} || '%'")
1352    } else {
1353        format!("execution_id = ${param_index}")
1354    };
1355    let mut query = format!(
1356        "SELECT id, run_id, created_at, level, message, stream_type, payload, execution_id
1357         FROM t_log
1358         WHERE {exec_id_filter}"
1359    );
1360    let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&exec_id_str];
1361    param_index += 1;
1362
1363    // Logs and streams filter
1364    let level_filter = if filter.should_show_logs() {
1365        let levels_str = if !filter.levels().is_empty() {
1366            filter
1367                .levels()
1368                .iter()
1369                .map(|lvl| (*lvl as u8).to_string())
1370                .collect::<Vec<_>>()
1371                .join(",")
1372        } else {
1373            LogLevel::iter()
1374                .map(|lvl| (lvl as u8).to_string())
1375                .collect::<Vec<_>>()
1376                .join(",")
1377        };
1378        Some(format!(" level IN ({levels_str})"))
1379    } else {
1380        None
1381    };
1382    let stream_filter = if filter.should_show_streams() {
1383        let streams_str = if !filter.stream_types().is_empty() {
1384            filter
1385                .stream_types()
1386                .iter()
1387                .map(|st| (*st as u8).to_string())
1388                .collect::<Vec<_>>()
1389                .join(",")
1390        } else {
1391            LogStreamType::iter()
1392                .map(|st| (st as u8).to_string())
1393                .collect::<Vec<_>>()
1394                .join(",")
1395        };
1396        Some(format!(" stream_type IN ({streams_str})"))
1397    } else {
1398        None
1399    };
1400    match (level_filter, stream_filter) {
1401        (Some(level_filter), Some(stream_filter)) => {
1402            write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1403                .expect("writing to string");
1404        }
1405        (Some(level_filter), None) => {
1406            write!(&mut query, " AND {level_filter}").expect("writing to string");
1407        }
1408        (None, Some(stream_filter)) => {
1409            write!(&mut query, " AND {stream_filter}").expect("writing to string");
1410        }
1411        (None, None) => unreachable!("guarded by constructor"),
1412    }
1413
1414    // Pagination
1415    write!(
1416        &mut query,
1417        " AND created_at {} ${param_index}",
1418        pagination.rel()
1419    )
1420    .expect("writing to string");
1421    let cursor_val = pagination.cursor();
1422    params.push(cursor_val);
1423    param_index += 1;
1424
1425    // Ordering and limit
1426    let dir = if pagination.is_desc() { "DESC" } else { "ASC" };
1427    write!(
1428        &mut query,
1429        " ORDER BY created_at {dir}, id {dir} LIMIT ${param_index}",
1430    )
1431    .expect("writing to string");
1432    let length_val: i64 = i64::from(pagination.length());
1433    params.push(&length_val);
1434
1435    let rows = tx.query(&query, &params[..]).await?;
1436
1437    let mut items = Vec::with_capacity(rows.len());
1438
1439    for row in rows {
1440        let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1441        let run_id: String = get(&row, "run_id")?;
1442        let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1443            consistency_db_err_src(
1444                format!("cannot convert RunId {run_id}"),
1445                Arc::from(parse_err),
1446            )
1447        })?;
1448        let execution_id_str: String = get(&row, "execution_id")?;
1449        let execution_id = ExecutionId::from_str(&execution_id_str).map_err(|parse_err| {
1450            consistency_db_err_src(
1451                format!("cannot convert ExecutionId {execution_id_str}"),
1452                Arc::from(parse_err),
1453            )
1454        })?;
1455
1456        let level: Option<i32> = get(&row, "level")?;
1457        let message: Option<String> = get(&row, "message")?;
1458        let stream_type: Option<i32> = get(&row, "stream_type")?;
1459        let payload: Option<Vec<u8>> = get(&row, "payload")?;
1460
1461        let log_entry = match (level, message, stream_type, payload) {
1462            (Some(lvl), Some(msg), None, None) => {
1463                let map_err =
1464                    |err| consistency_db_err_src(format!("cannot convert {lvl} to LogLevel"), err);
1465                LogEntry::Log {
1466                    created_at,
1467                    level: u8::try_from(lvl)
1468                        .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1469                        .map_err(|err| map_err(Arc::from(err)))??,
1470                    message: msg,
1471                }
1472            }
1473            (None, None, Some(stype), Some(pl)) => {
1474                let map_err = |err| {
1475                    consistency_db_err_src(format!("cannot convert {stype} to LogStreamType"), err)
1476                };
1477                LogEntry::Stream {
1478                    created_at,
1479                    stream_type: u8::try_from(stype)
1480                        .map(|stype| {
1481                            LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1482                        })
1483                        .map_err(|err| map_err(Arc::from(err)))??,
1484                    payload: pl,
1485                }
1486            }
1487            _ => {
1488                return Err(consistency_db_err("invalid t_log row".to_string()).into());
1489            }
1490        };
1491
1492        items.push(LogEntryRow {
1493            cursor: created_at,
1494            run_id,
1495            log_entry,
1496            execution_id,
1497        });
1498    }
1499
1500    Ok(ListLogsResponse {
1501        next_page: items
1502            .last()
1503            .map(|item| Pagination::NewerThan {
1504                length: pagination.length(),
1505                cursor: item.cursor,
1506                including_cursor: false,
1507            })
1508            .unwrap_or(if pagination.is_asc() {
1509                *pagination // no new results, keep the same cursor
1510            } else {
1511                // no prev results, let's start from beginning
1512                Pagination::NewerThan {
1513                    length: pagination.length(),
1514                    cursor: DateTime::<Utc>::UNIX_EPOCH,
1515                    including_cursor: false,
1516                }
1517            }),
1518        prev_page: match items.first() {
1519            Some(item) => Some(Pagination::OlderThan {
1520                length: pagination.length(),
1521                cursor: item.cursor,
1522                including_cursor: false,
1523            }),
1524            None if pagination.is_asc() && pagination.cursor() > &DateTime::<Utc>::UNIX_EPOCH => {
1525                // asked for a next page that does not exists (yet).
1526                Some(pagination.invert())
1527            }
1528            None => None,
1529        },
1530        items,
1531    })
1532}
1533
1534async fn list_deployment_states(
1535    tx: &Transaction<'_>,
1536    current_time: DateTime<Utc>,
1537    pagination: Pagination<Option<DeploymentId>>,
1538    include_config_json: bool,
1539) -> Result<Vec<DeploymentState>, DbErrorRead> {
1540    // Helper for numbered params ($1, $2, ...)
1541    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1542    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1543        params.push(p);
1544        format!("${}", params.len())
1545    };
1546
1547    // Base params
1548    let p_now = add_param(Box::new(current_time));
1549
1550    let config_json_col = if include_config_json {
1551        "d.config_json"
1552    } else {
1553        "NULL::TEXT AS config_json"
1554    };
1555
1556    let mut sql = format!(
1557        "
1558        SELECT
1559            d.deployment_id,
1560
1561            COUNT(*) FILTER (WHERE s.state = '{STATE_LOCKED}') AS locked,
1562
1563            COUNT(*) FILTER (
1564                WHERE s.state = '{STATE_PENDING_AT}'
1565                  AND s.pending_expires_finished <= {p_now}
1566            ) AS pending,
1567
1568            COUNT(*) FILTER (
1569                WHERE s.state = '{STATE_PENDING_AT}'
1570                  AND s.pending_expires_finished > {p_now}
1571            ) AS scheduled,
1572
1573            COUNT(*) FILTER (WHERE s.state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1574
1575            COUNT(*) FILTER (WHERE s.state = '{STATE_FINISHED}') AS finished,
1576
1577            {config_json_col},
1578            d.created_at,
1579            d.last_active_at,
1580            d.status
1581        FROM t_deployment d
1582        LEFT JOIN t_state s ON s.deployment_id = d.deployment_id"
1583    );
1584
1585    // Pagination
1586    if let Some(cursor) = pagination.cursor() {
1587        let p_cursor = add_param(Box::new(cursor.to_string()));
1588        write!(
1589            sql,
1590            " WHERE d.deployment_id {rel} {p_cursor}",
1591            rel = pagination.rel()
1592        )
1593        .expect("writing to string");
1594    }
1595
1596    // Grouping + ordering
1597    // Inner query: fetch rows with cursor-based ordering
1598    // Outer query: always return results in descending order
1599    let (inner_order, outer_order) = if pagination.is_desc() {
1600        ("DESC", "")
1601    } else {
1602        ("ASC", "DESC")
1603    };
1604
1605    write!(
1606        sql,
1607        " GROUP BY d.deployment_id, d.config_json, d.created_at, d.last_active_at, d.status ORDER BY d.deployment_id {inner_order} LIMIT {}",
1608        pagination.length()
1609    )
1610    .expect("writing to string");
1611
1612    let final_sql = if outer_order.is_empty() {
1613        sql
1614    } else {
1615        format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1616    };
1617
1618    let params_refs: Vec<&(dyn ToSql + Sync)> = params
1619        .iter()
1620        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1621        .collect();
1622
1623    let rows = tx
1624        .query(&final_sql, &params_refs)
1625        .await
1626        .map_err(DbErrorRead::from)?;
1627
1628    let mut result = Vec::with_capacity(rows.len());
1629    for row in rows {
1630        let deployment_id: String = get(&row, "deployment_id")?;
1631        let status_str: String = get::<String, _>(&row, "status")?;
1632        let status = status_str
1633            .parse::<DeploymentStatus>()
1634            .map_err(|e| consistency_db_err(format!("unknown deployment status: {e}")))?;
1635        result.push(DeploymentState {
1636            deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1637            locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1638            pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1639                .expect("count is never negative"),
1640            scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1641                .expect("count is never negative"),
1642            blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1643                .expect("count is never negative"),
1644            finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1645                .expect("count is never negative"),
1646            config_json: get::<Option<String>, _>(&row, "config_json")?,
1647            created_at: get::<DateTime<Utc>, _>(&row, "created_at")?,
1648            last_active_at: get::<Option<DateTime<Utc>>, _>(&row, "last_active_at")?,
1649            status,
1650        });
1651    }
1652
1653    Ok(result)
1654}
1655
1656fn parse_response_with_cursor(
1657    row: &tokio_postgres::Row,
1658) -> Result<ResponseWithCursor, DbErrorRead> {
1659    // Postgres BIGINT = i64.
1660    let id = u32::try_from(get::<i64, _>(row, "id")?)
1661        .map_err(|_| consistency_db_err("id must not be negative"))?;
1662
1663    let created_at: DateTime<Utc> = get(row, "created_at")?;
1664    let join_set_id_str: String = get(row, "join_set_id")?;
1665    let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1666
1667    // Extract Optionals
1668    let delay_id: Option<String> = get(row, "delay_id")?;
1669    let delay_id = delay_id
1670        .map(|id| DelayId::from_str(&id))
1671        .transpose()
1672        .map_err(DbErrorGeneric::from)?;
1673    let delay_success: Option<bool> = get(row, "delay_success")?;
1674    let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1675    let child_execution_id = child_execution_id
1676        .map(|id| ExecutionIdDerived::from_str(&id))
1677        .transpose()
1678        .map_err(DbErrorGeneric::from)?;
1679    let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1680        .map(Version::try_from)
1681        .transpose()
1682        .map_err(|_| consistency_db_err("version must be non-negative"))?;
1683    let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1684    let json_value = json_value.map(|it| it.0);
1685
1686    let event = match (
1687        delay_id,
1688        delay_success,
1689        child_execution_id,
1690        finished_version,
1691        json_value,
1692    ) {
1693        (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1694            delay_id,
1695            result: delay_success.then_some(()).ok_or(()),
1696        },
1697        (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1698            if let ExecutionRequest::Finished { retval: result, .. } = json_val {
1699                JoinSetResponse::ChildExecutionFinished {
1700                    child_execution_id,
1701                    finished_version,
1702                    result,
1703                }
1704            } else {
1705                error!("Joined log entry must be 'Finished'");
1706                return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1707            }
1708        }
1709        (delay, delay_success, child, finished, result) => {
1710            error!(
1711                "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1712            );
1713            return Err(consistency_db_err("invalid row in t_join_set_response").into());
1714        }
1715    };
1716
1717    Ok(ResponseWithCursor {
1718        cursor: ResponseCursor(id),
1719        event: JoinSetResponseEventOuter {
1720            event: JoinSetResponseEvent { join_set_id, event },
1721            created_at,
1722        },
1723    })
1724}
1725
1726#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1727#[expect(clippy::too_many_arguments)]
1728async fn lock_single_execution(
1729    tx: &Transaction<'_>,
1730    created_at: DateTime<Utc>,
1731    component_id: &ComponentId,
1732    deployment_id: DeploymentId,
1733    execution_id: &ExecutionId,
1734    run_id: RunId,
1735    appending_version: &Version,
1736    executor_id: ExecutorId,
1737    lock_expires_at: DateTime<Utc>,
1738    retry_config: ComponentRetryConfig,
1739) -> Result<LockedExecution, DbErrorWrite> {
1740    trace!("lock_single_execution");
1741
1742    // Check State
1743    let combined_state = get_combined_state(tx, execution_id).await?;
1744    combined_state
1745        .execution_with_state
1746        .pending_state
1747        .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
1748    let expected_version = combined_state.get_next_version_assert_not_finished();
1749    check_expected_next_and_appending_version(&expected_version, appending_version)?;
1750
1751    // Prepare Event
1752    let locked_event = Locked {
1753        component_id: component_id.clone(),
1754        deployment_id,
1755        executor_id,
1756        lock_expires_at,
1757        run_id,
1758        retry_config,
1759    };
1760    let event = ExecutionRequest::Locked(locked_event.clone());
1761
1762    let event = Json(event);
1763
1764    // Append to execution_log
1765    tx.execute(
1766        "INSERT INTO t_execution_log \
1767            (execution_id, created_at, json_value, version, variant) \
1768            VALUES ($1, $2, $3, $4, $5)",
1769        &[
1770            &execution_id.to_string(),
1771            &created_at,
1772            &event,
1773            &i64::from(appending_version.0),
1774            &event.0.variant(),
1775        ],
1776    )
1777    .await
1778    .map_err(|err| {
1779        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
1780            reason: "cannot lock".into(),
1781            context: SpanTrace::capture(),
1782            source: Some(Arc::new(err)),
1783            loc: Location::caller(),
1784        })
1785    })?;
1786
1787    let responses = list_responses(tx, execution_id, None).await?;
1788    trace!("Responses: {responses:?}");
1789
1790    // Update t_state
1791    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
1792        tx,
1793        execution_id,
1794        deployment_id,
1795        &component_id.component_digest,
1796        executor_id,
1797        run_id,
1798        lock_expires_at,
1799        appending_version,
1800        retry_config,
1801    )
1802    .await?;
1803
1804    // Fetch History
1805    // Fetch event_history and `Created` event.
1806    let rows = tx
1807        .query(
1808            "SELECT json_value, version FROM t_execution_log WHERE \
1809                execution_id = $1 AND (variant = $2 OR variant = $3) \
1810                ORDER BY version",
1811            &[
1812                &execution_id.to_string(),
1813                &DUMMY_CREATED.variant(),
1814                &DUMMY_HISTORY_EVENT.variant(),
1815            ],
1816        )
1817        .await
1818        .map_err(DbErrorGeneric::from)?;
1819
1820    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
1821
1822    for row in rows {
1823        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
1824        let event = event.0;
1825
1826        let version: i64 = get(&row, "version")?;
1827        let version = Version::try_from(version)
1828            .map_err(|_| consistency_db_err("version must be non-negative"))?;
1829
1830        events.push_back(ExecutionEvent {
1831            created_at: DateTime::from_timestamp_nanos(0), // not used, only the inner event and version
1832            event,
1833            backtrace_id: None,
1834            version,
1835        });
1836    }
1837
1838    // Extract Created Event
1839    let Some(ExecutionRequest::Created {
1840        ffqn,
1841        params,
1842        parent,
1843        metadata,
1844        ..
1845    }) = events.pop_front().map(|outer| outer.event)
1846    else {
1847        error!("Execution log must contain at least `Created` event");
1848        return Err(consistency_db_err("execution log must contain `Created` event").into());
1849    };
1850
1851    // Extract History Events
1852    let mut event_history = Vec::new();
1853    for ExecutionEvent { event, version, .. } in events {
1854        if let ExecutionRequest::HistoryEvent { event } = event {
1855            event_history.push((event, version));
1856        } else {
1857            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1858            return Err(consistency_db_err(
1859                "rows can only contain `Created` and `HistoryEvent` event kinds",
1860            )
1861            .into());
1862        }
1863    }
1864
1865    Ok(LockedExecution {
1866        execution_id: execution_id.clone(),
1867        metadata,
1868        next_version: appending_version.increment(),
1869        ffqn,
1870        params,
1871        event_history,
1872        responses,
1873        parent,
1874        intermittent_event_count,
1875        locked_event,
1876    })
1877}
1878
1879async fn count_join_next(
1880    tx: &Transaction<'_>,
1881    execution_id: &ExecutionId,
1882    join_set_id: &JoinSetId,
1883) -> Result<u32, DbErrorRead> {
1884    let row = tx
1885            .query_one(
1886                "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
1887                AND history_event_type = $3",
1888                &[
1889                    &execution_id.to_string(),
1890                    &join_set_id.to_string(),
1891                    &HISTORY_EVENT_TYPE_JOIN_NEXT,
1892                ],
1893            )
1894            .await
1895            .map_err(DbErrorRead::from)?;
1896
1897    let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
1898    Ok(count)
1899}
1900
1901async fn nth_response(
1902    tx: &Transaction<'_>,
1903    execution_id: &ExecutionId,
1904    join_set_id: &JoinSetId,
1905    skip_rows: u32,
1906) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
1907    let row = tx
1908            .query_opt(
1909                "SELECT r.id, r.created_at, r.join_set_id, \
1910                 r.delay_id, r.delay_success, \
1911                 r.child_execution_id, r.finished_version, l.json_value \
1912                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1913                 WHERE \
1914                 r.execution_id = $1 AND r.join_set_id = $2 AND \
1915                 ( \
1916                 r.finished_version = l.version \
1917                 OR \
1918                 r.child_execution_id IS NULL \
1919                 ) \
1920                 ORDER BY id \
1921                 LIMIT 1 OFFSET $3",
1922                 &[
1923                     &execution_id.to_string(),
1924                     &join_set_id.to_string(),
1925                     &i64::from(skip_rows),
1926                 ]
1927            )
1928            .await
1929            .map_err(DbErrorRead::from)?;
1930
1931    match row {
1932        Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
1933        None => Ok(None),
1934    }
1935}
1936
1937#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1938async fn append(
1939    tx: &Transaction<'_>,
1940    execution_id: &ExecutionId,
1941    req: AppendRequest,
1942    appending_version: Version,
1943) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1944    if matches!(req.event, ExecutionRequest::Created { .. }) {
1945        return Err(DbErrorWrite::NonRetriable(
1946            DbErrorWriteNonRetriable::ValidationFailed(
1947                "cannot append `Created` event - use `create` instead".into(),
1948            ),
1949        ));
1950    }
1951
1952    if let AppendRequest {
1953        event:
1954            ExecutionRequest::Locked(Locked {
1955                component_id,
1956                deployment_id,
1957                executor_id,
1958                run_id,
1959                lock_expires_at,
1960                retry_config,
1961            }),
1962        created_at,
1963    } = req
1964    {
1965        return lock_single_execution(
1966            tx,
1967            created_at,
1968            &component_id,
1969            deployment_id,
1970            execution_id,
1971            run_id,
1972            &appending_version,
1973            executor_id,
1974            lock_expires_at,
1975            retry_config,
1976        )
1977        .await
1978        .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1979    }
1980
1981    let combined_state = get_combined_state(tx, execution_id).await?;
1982    if combined_state
1983        .execution_with_state
1984        .pending_state
1985        .is_finished()
1986    {
1987        debug!("Execution is already finished");
1988        return Err(DbErrorWrite::NonRetriable(
1989            DbErrorWriteNonRetriable::AlreadyFinished,
1990        ));
1991    }
1992
1993    check_expected_next_and_appending_version(
1994        &combined_state.get_next_version_assert_not_finished(),
1995        &appending_version,
1996    )?;
1997
1998    let event = Json(req.event);
1999
2000    // Insert into t_execution_log
2001    tx.execute(
2002            "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2003             VALUES ($1, $2, $3, $4, $5, $6)",
2004            &[
2005                &execution_id.to_string(),
2006                &req.created_at,
2007                &event,
2008                &i64::from(appending_version.0),
2009                &event.0.variant(),
2010                &event.0.join_set_id().map(std::string::ToString::to_string),
2011            ],
2012        )
2013        .await?;
2014
2015    // Calculate current pending state
2016    match &event.0 {
2017        ExecutionRequest::Created { .. } => {
2018            unreachable!("handled in the caller")
2019        }
2020
2021        ExecutionRequest::Locked { .. } => {
2022            unreachable!("handled above")
2023        }
2024
2025        ExecutionRequest::TemporarilyFailed {
2026            backoff_expires_at, ..
2027        }
2028        | ExecutionRequest::TemporarilyTimedOut {
2029            backoff_expires_at, ..
2030        } => {
2031            let (next_version, notifier) = update_state_pending_after_event_appended(
2032                tx,
2033                execution_id,
2034                &appending_version,
2035                *backoff_expires_at,
2036                true, // an intermittent failure
2037                combined_state.execution_with_state.component_digest,
2038            )
2039            .await?;
2040            return Ok((next_version, notifier));
2041        }
2042
2043        ExecutionRequest::Unlocked {
2044            backoff_expires_at, ..
2045        } => {
2046            let (next_version, notifier) = update_state_pending_after_event_appended(
2047                tx,
2048                execution_id,
2049                &appending_version,
2050                *backoff_expires_at,
2051                false, // not an intermittent failure
2052                combined_state.execution_with_state.component_digest,
2053            )
2054            .await?;
2055            return Ok((next_version, notifier));
2056        }
2057
2058        ExecutionRequest::Paused => {
2059            match &combined_state.execution_with_state.pending_state {
2060                PendingState::Finished { .. } => {
2061                    unreachable!("handled above");
2062                }
2063                PendingState::Locked(..) => {
2064                    return Err(DbErrorWriteNonRetriable::IllegalState {
2065                        reason:
2066                            "cannot append Paused event when execution is locked; use pause_execution"
2067                                .into(),
2068                        context: SpanTrace::capture(),
2069                        source: None,
2070                        loc: Location::caller(),
2071                    }
2072                    .into());
2073                }
2074                PendingState::Paused(..) => {
2075                    return Err(DbErrorWriteNonRetriable::IllegalState {
2076                        reason: "cannot pause, execution is already paused".into(),
2077                        context: SpanTrace::capture(),
2078                        source: None,
2079                        loc: Location::caller(),
2080                    }
2081                    .into());
2082                }
2083                _ => {}
2084            }
2085            let next_version =
2086                update_state_paused(tx, execution_id, &appending_version, true).await?;
2087            return Ok((next_version, AppendNotifier::default()));
2088        }
2089
2090        ExecutionRequest::Unpaused => {
2091            if !combined_state
2092                .execution_with_state
2093                .pending_state
2094                .is_paused()
2095            {
2096                return Err(DbErrorWriteNonRetriable::IllegalState {
2097                    reason: "cannot unpause, execution is not paused".into(),
2098                    context: SpanTrace::capture(),
2099                    source: None,
2100                    loc: Location::caller(),
2101                }
2102                .into());
2103            }
2104            let next_version =
2105                update_state_paused(tx, execution_id, &appending_version, false).await?;
2106            return Ok((next_version, AppendNotifier::default()));
2107        }
2108
2109        ExecutionRequest::Finished { retval, .. } => {
2110            update_state_finished(
2111                tx,
2112                execution_id,
2113                &appending_version,
2114                req.created_at,
2115                PendingStateFinishedResultKind::from(retval),
2116            )
2117            .await?;
2118            return Ok((
2119                appending_version,
2120                AppendNotifier {
2121                    pending_at: None,
2122                    execution_finished: Some(NotifierExecutionFinished {
2123                        execution_id: execution_id.clone(),
2124                        retval: retval.clone(),
2125                    }),
2126                    response: None,
2127                },
2128            ));
2129        }
2130
2131        ExecutionRequest::HistoryEvent {
2132            event:
2133                HistoryEvent::JoinSetCreate { .. }
2134                | HistoryEvent::JoinSetRequest {
2135                    request: JoinSetRequest::ChildExecutionRequest { .. },
2136                    ..
2137                }
2138                | HistoryEvent::Persist { .. }
2139                | HistoryEvent::Schedule { .. }
2140                | HistoryEvent::Stub { .. }
2141                | HistoryEvent::JoinNextTooMany { .. }
2142                | HistoryEvent::JoinNextTry { .. },
2143        } => {
2144            return Ok((
2145                bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2146                AppendNotifier::default(),
2147            ));
2148        }
2149
2150        ExecutionRequest::HistoryEvent {
2151            event:
2152                HistoryEvent::JoinSetRequest {
2153                    join_set_id,
2154                    request:
2155                        JoinSetRequest::DelayRequest {
2156                            delay_id,
2157                            expires_at,
2158                            paused,
2159                            ..
2160                        },
2161                },
2162        } => {
2163            return Ok((
2164                bump_state_next_version(
2165                    tx,
2166                    execution_id,
2167                    &appending_version,
2168                    Some(DelayReq {
2169                        join_set_id: join_set_id.clone(),
2170                        delay_id: delay_id.clone(),
2171                        expires_at: *expires_at,
2172                        paused: *paused,
2173                    }),
2174                )
2175                .await?,
2176                AppendNotifier::default(),
2177            ));
2178        }
2179
2180        ExecutionRequest::HistoryEvent {
2181            event:
2182                HistoryEvent::JoinNext {
2183                    join_set_id,
2184                    run_expires_at,
2185                    closing,
2186                    requested_ffqn: _,
2187                },
2188        } => {
2189            // Did the response arrive already?
2190            let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2191
2192            // Fetch the response corresponding to this JoinNext (skip n-1)
2193            let nth_response =
2194                nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2195
2196            trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2197            assert!(join_next_count > 0);
2198
2199            if let Some(ResponseWithCursor {
2200                event:
2201                    JoinSetResponseEventOuter {
2202                        created_at: nth_created_at,
2203                        ..
2204                    },
2205                cursor: _,
2206            }) = nth_response
2207            {
2208                let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2209                let (next_version, notifier) = update_state_pending_after_event_appended(
2210                    tx,
2211                    execution_id,
2212                    &appending_version,
2213                    scheduled_at,
2214                    false, // not an intermittent failure
2215                    combined_state.execution_with_state.component_digest,
2216                )
2217                .await?;
2218                return Ok((next_version, notifier));
2219            }
2220
2221            return Ok((
2222                update_state_blocked(
2223                    tx,
2224                    execution_id,
2225                    &appending_version,
2226                    join_set_id,
2227                    *run_expires_at,
2228                    *closing,
2229                )
2230                .await?,
2231                AppendNotifier::default(),
2232            ));
2233        }
2234    }
2235}
2236
2237async fn append_response(
2238    tx: &Transaction<'_>,
2239    execution_id: &ExecutionId,
2240    event: JoinSetResponseEventOuter,
2241) -> Result<AppendNotifier, DbErrorWrite> {
2242    let join_set_id = &event.event.join_set_id;
2243
2244    let (delay_id, delay_success) = match &event.event.event {
2245        JoinSetResponse::DelayFinished { delay_id, result } => {
2246            (Some(delay_id.to_string()), Some(result.is_ok()))
2247        }
2248        JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2249    };
2250
2251    let (child_execution_id, finished_version) = match &event.event.event {
2252        JoinSetResponse::ChildExecutionFinished {
2253            child_execution_id,
2254            finished_version,
2255            result: _,
2256        } => (
2257            Some(child_execution_id.to_string()),
2258            Some(i64::from(finished_version.0)),
2259        ),
2260        JoinSetResponse::DelayFinished { .. } => (None, None),
2261    };
2262
2263    let row = tx.query_one(
2264            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2265             VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2266             &[
2267                 &execution_id.to_string(),
2268                 &event.created_at,
2269                 &join_set_id.to_string(),
2270                 &delay_id,
2271                 &delay_success,
2272                 &child_execution_id,
2273                 &finished_version,
2274             ]
2275        ).await?;
2276    let cursor = ResponseCursor(
2277        u32::try_from(get::<i64, _>(&row, 0)?)
2278            .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2279    );
2280    // if the execution is going to be unblocked by this response...
2281    let combined_state = get_combined_state(tx, execution_id).await?;
2282    debug!("previous_pending_state: {combined_state:?}");
2283
2284    let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2285        state:
2286            PendingStateBlockedByJoinSet {
2287                join_set_id: found_join_set_id,
2288                lock_expires_at, // Set to a future time if the worker is keeping the execution warm waiting for the result.
2289                closing: _,
2290            },
2291        paused: _,
2292    } =
2293        PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2294        && *join_set_id == found_join_set_id
2295    {
2296        let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2297        // Unblock the state.
2298        update_state_pending_after_response_appended(
2299            tx,
2300            execution_id,
2301            scheduled_at,
2302            combined_state.execution_with_state.component_digest,
2303        )
2304        .await?
2305    } else {
2306        AppendNotifier::default()
2307    };
2308
2309    if let JoinSetResponseEvent {
2310        join_set_id,
2311        event:
2312            JoinSetResponse::DelayFinished {
2313                delay_id,
2314                result: _,
2315            },
2316    } = &event.event
2317    {
2318        debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2319        tx.execute(
2320            "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2321            &[
2322                &execution_id.to_string(),
2323                &join_set_id.to_string(),
2324                &delay_id.to_string(),
2325            ],
2326        )
2327        .await?;
2328    }
2329
2330    notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2331    Ok(notifier)
2332}
2333
2334async fn append_backtrace(
2335    tx: &Transaction<'_>,
2336    backtrace_info: &BacktraceInfo,
2337) -> Result<(), DbErrorWrite> {
2338    // Compute hash for deduplication
2339    let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2340
2341    // Insert into t_wasm_backtrace if not already present
2342    tx.execute(
2343        "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2344         VALUES ($1, $2) \
2345         ON CONFLICT (backtrace_hash) DO NOTHING",
2346        &[
2347            &backtrace_hash.as_slice(),
2348            &Json(&backtrace_info.wasm_backtrace),
2349        ],
2350    )
2351    .await?;
2352
2353    // Insert into t_execution_backtrace referencing the hash
2354    tx.execute(
2355        "INSERT INTO t_execution_backtrace \
2356         (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2357         VALUES ($1, $2, $3, $4, $5)",
2358        &[
2359            &backtrace_info.execution_id.to_string(),
2360            &Json(&backtrace_info.component_id),
2361            &i64::from(backtrace_info.version_min_including.0),
2362            &i64::from(backtrace_info.version_max_excluding.0),
2363            &backtrace_hash.as_slice(),
2364        ],
2365    )
2366    .await?;
2367
2368    Ok(())
2369}
2370
2371async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2372    let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2373        LogEntry::Log {
2374            created_at,
2375            level,
2376            message,
2377        } => (
2378            Some(*level as i32),
2379            Some(message.as_str()),
2380            None::<i32>,
2381            None::<&[u8]>,
2382            created_at,
2383        ),
2384        LogEntry::Stream {
2385            created_at,
2386            payload,
2387            stream_type,
2388        } => (
2389            None::<i32>,
2390            None::<&str>,
2391            Some(*stream_type as i32),
2392            Some(payload.as_slice()),
2393            created_at,
2394        ),
2395    };
2396
2397    tx.execute(
2398        "INSERT INTO t_log (
2399            execution_id,
2400            run_id,
2401            created_at,
2402            level,
2403            message,
2404            stream_type,
2405            payload
2406        ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2407        &[
2408            &row.execution_id.to_string(),
2409            &row.run_id.to_string(),
2410            &created_at,
2411            &level,
2412            &message,
2413            &stream_type,
2414            &payload,
2415        ],
2416    )
2417    .await?;
2418
2419    Ok(())
2420}
2421
2422async fn get_execution_log(
2423    tx: &Transaction<'_>,
2424    execution_id: &ExecutionId,
2425) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2426    let rows = tx
2427        .query(
2428            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2429                 execution_id = $1 ORDER BY version",
2430            &[&execution_id.to_string()],
2431        )
2432        .await
2433        .map_err(DbErrorRead::from)?;
2434
2435    if rows.is_empty() {
2436        return Err(DbErrorRead::NotFound);
2437    }
2438
2439    let mut events = Vec::with_capacity(rows.len());
2440    for row in rows {
2441        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2442        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2443        let event = event.0;
2444        let version: i64 = get(&row, "version")?;
2445        let version = Version::try_from(version)
2446            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2447
2448        events.push(ExecutionEvent {
2449            created_at,
2450            event,
2451            backtrace_id: None,
2452            version,
2453        });
2454    }
2455
2456    let combined_state = get_combined_state(tx, execution_id).await?;
2457    let responses = list_responses(tx, execution_id, None).await?;
2458
2459    Ok(concepts::storage::ExecutionLog {
2460        execution_id: execution_id.clone(),
2461        events,
2462        responses,
2463        next_version: combined_state.get_next_version_or_finished(),
2464        pending_state: combined_state.execution_with_state.pending_state,
2465        component_digest: combined_state.execution_with_state.component_digest,
2466        component_type: combined_state.execution_with_state.component_type,
2467        deployment_id: combined_state.execution_with_state.deployment_id,
2468    })
2469}
2470
2471async fn get_max_version(
2472    tx: &Transaction<'_>,
2473    execution_id: &ExecutionId,
2474) -> Result<Version, DbErrorRead> {
2475    let row = tx
2476        .query_one(
2477            "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2478            &[&execution_id.to_string()],
2479        )
2480        .await?;
2481    let max_version: i64 = get(&row, "version")?;
2482    let max_version = Version::try_from(max_version)
2483        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2484    Ok(max_version)
2485}
2486
2487async fn get_max_response_cursor(
2488    tx: &Transaction<'_>,
2489    execution_id: &ExecutionId,
2490) -> Result<ResponseCursor, DbErrorRead> {
2491    let row = tx
2492        .query_one(
2493            "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2494            &[&execution_id.to_string()],
2495        )
2496        .await?;
2497    // Assume the execution exists and has no responses
2498    let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2499    let max_cursor = ResponseCursor(
2500        u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2501    );
2502    Ok(max_cursor)
2503}
2504
2505async fn list_execution_events(
2506    tx: &Transaction<'_>,
2507    execution_id: &ExecutionId,
2508    pagination: Pagination<VersionType>,
2509    include_backtrace_id: bool,
2510) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2511    let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2512    let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2513        params.push(p);
2514        format!("${}", params.len())
2515    };
2516
2517    let p_execution_id = add_param(Box::new(execution_id.to_string()));
2518
2519    let (cursor, length, rel, is_desc) = match &pagination {
2520        Pagination::NewerThan {
2521            cursor,
2522            length,
2523            including_cursor,
2524        } => (
2525            *cursor,
2526            *length,
2527            if *including_cursor { ">=" } else { ">" },
2528            false,
2529        ),
2530        Pagination::OlderThan {
2531            cursor,
2532            length,
2533            including_cursor,
2534        } => (
2535            *cursor,
2536            *length,
2537            if *including_cursor { "<=" } else { "<" },
2538            true,
2539        ),
2540    };
2541    let p_cursor = add_param(Box::new(i64::from(cursor)));
2542    let p_limit = add_param(Box::new(i64::from(length)));
2543
2544    let base_select = if include_backtrace_id {
2545        format!(
2546            "SELECT
2547                log.created_at,
2548                log.json_value,
2549                log.version,
2550                bt.version_min_including AS backtrace_id
2551            FROM
2552                t_execution_log AS log
2553            LEFT OUTER JOIN
2554                t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2555                                AND log.version >= bt.version_min_including
2556                                AND log.version < bt.version_max_excluding
2557            WHERE
2558                log.execution_id = {p_execution_id}
2559                AND log.version {rel} {p_cursor}"
2560        )
2561    } else {
2562        format!(
2563            "SELECT
2564                created_at, json_value, NULL::BIGINT as backtrace_id, version
2565            FROM t_execution_log WHERE
2566                execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2567        )
2568    };
2569
2570    let order = if is_desc { "DESC" } else { "ASC" };
2571    let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2572
2573    // Re-order to ascending for consistent oldest-to-newest results
2574    if is_desc {
2575        sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2576    }
2577
2578    let params_refs: Vec<&(dyn ToSql + Sync)> = params
2579        .iter()
2580        .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2581        .collect();
2582
2583    let rows = tx
2584        .query(&sql, &params_refs)
2585        .await
2586        .map_err(DbErrorRead::from)?;
2587
2588    let mut events = Vec::with_capacity(rows.len());
2589    for row in rows {
2590        let created_at: DateTime<Utc> = get(&row, "created_at")?;
2591        let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2592            .map(Version::try_from)
2593            .transpose()
2594            .map_err(|_| consistency_db_err("version must be non-negative"))?;
2595
2596        let version = get::<i64, _>(&row, "version")?;
2597        let version = Version::new(
2598            VersionType::try_from(version)
2599                .map_err(|_| consistency_db_err("version must be non-negative"))?,
2600        );
2601        let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2602        let event_req = event_req.0;
2603
2604        events.push(ExecutionEvent {
2605            created_at,
2606            event: event_req,
2607            backtrace_id,
2608            version,
2609        });
2610    }
2611    Ok(events)
2612}
2613
2614async fn get_execution_event(
2615    tx: &Transaction<'_>,
2616    execution_id: &ExecutionId,
2617    version: VersionType,
2618) -> Result<ExecutionEvent, DbErrorRead> {
2619    let row = tx
2620        .query_one(
2621            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2622                 execution_id = $1 AND version = $2",
2623            &[&execution_id.to_string(), &i64::from(version)],
2624        )
2625        .await?;
2626
2627    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2628    let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2629    let version = get::<i64, _>(&row, "version")?;
2630    let version = Version::try_from(version)
2631        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2632    let event = json_val.0;
2633
2634    Ok(ExecutionEvent {
2635        created_at,
2636        event,
2637        backtrace_id: None,
2638        version,
2639    })
2640}
2641
2642async fn get_last_execution_event(
2643    tx: &Transaction<'_>,
2644    execution_id: &ExecutionId,
2645) -> Result<ExecutionEvent, DbErrorRead> {
2646    let row = tx
2647        .query_one(
2648            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2649                 execution_id = $1 ORDER BY version DESC LIMIT 1",
2650            &[&execution_id.to_string()],
2651        )
2652        .await?;
2653
2654    let created_at: DateTime<Utc> = get(&row, "created_at")?;
2655    let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2656    let event = event.0;
2657    let version: i64 = get(&row, "version")?;
2658    let version = Version::try_from(version)
2659        .map_err(|_| consistency_db_err("version must be non-negative"))?;
2660
2661    Ok(ExecutionEvent {
2662        created_at,
2663        event,
2664        backtrace_id: None,
2665        version,
2666    })
2667}
2668
2669async fn delay_response(
2670    tx: &Transaction<'_>,
2671    execution_id: &ExecutionId,
2672    delay_id: &DelayId,
2673) -> Result<Option<bool>, DbErrorRead> {
2674    let row = tx
2675        .query_opt(
2676            "SELECT delay_success \
2677                 FROM t_join_set_response \
2678                 WHERE \
2679                 execution_id = $1 AND delay_id = $2",
2680            &[&execution_id.to_string(), &delay_id.to_string()],
2681        )
2682        .await?;
2683
2684    match row {
2685        Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2686        None => Ok(None),
2687    }
2688}
2689
2690#[instrument(level = Level::TRACE, skip_all)]
2691async fn get_responses_after(
2692    tx: &Transaction<'_>,
2693    execution_id: &ExecutionId,
2694    last_response: ResponseCursor,
2695) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2696    let rows = tx
2697            .query(
2698                "SELECT r.id, r.created_at, r.join_set_id, \
2699                 r.delay_id, r.delay_success, \
2700                 r.child_execution_id, r.finished_version, child.json_value \
2701                 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2702                 WHERE \
2703                 r.id > $1 AND \
2704                 r.execution_id = $2 AND \
2705                 ( \
2706                 r.finished_version = child.version \
2707                 OR \
2708                 r.child_execution_id IS NULL \
2709                 ) \
2710                 ORDER BY id \
2711                 ",
2712                 &[
2713                     &i64::from(last_response.0),
2714                     &execution_id.to_string(),
2715                 ]
2716            )
2717            .await?;
2718
2719    let mut results = Vec::with_capacity(rows.len());
2720    for row in rows {
2721        let resp = parse_response_with_cursor(&row)?;
2722        results.push(resp);
2723    }
2724    Ok(results)
2725}
2726
2727async fn get_pending_of_single_ffqn(
2728    tx: &Transaction<'_>,
2729    batch_size: u32,
2730    pending_at_or_sooner: DateTime<Utc>,
2731    ffqn: &FunctionFqn,
2732    select_strategy: SelectStrategy,
2733) -> Result<Vec<(ExecutionId, Version)>, ()> {
2734    let rows = tx
2735        .query(
2736            &format!(
2737                r"
2738                SELECT execution_id, corresponding_version FROM t_state
2739                WHERE
2740                state = '{STATE_PENDING_AT}' AND
2741                pending_expires_finished <= $1 AND ffqn = $2
2742                AND is_paused = false
2743                ORDER BY pending_expires_finished
2744                {}
2745                LIMIT $3
2746                ",
2747                if select_strategy == SelectStrategy::LockForUpdate {
2748                    "FOR UPDATE SKIP LOCKED"
2749                } else {
2750                    ""
2751                }
2752            ),
2753            &[
2754                &pending_at_or_sooner,
2755                &ffqn.to_string(),
2756                &(i64::from(batch_size)),
2757            ],
2758        )
2759        .await
2760        .map_err(|err| {
2761            warn!("Ignoring consistency error {err:?}");
2762        })?;
2763
2764    let mut result = Vec::with_capacity(rows.len());
2765    for row in rows {
2766        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2767            let eid_str: String = get(&row, "execution_id")?;
2768            let corresponding_version: i64 = get(&row, "corresponding_version")?;
2769            let corresponding_version = Version::try_from(corresponding_version)
2770                .map_err(|_| consistency_db_err("version must be non-negative"))?;
2771
2772            if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2773                return Ok((eid, corresponding_version.increment()));
2774            }
2775            Err(consistency_db_err("invalid execution_id"))
2776        };
2777
2778        match unpack() {
2779            Ok(val) => result.push(val),
2780            Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2781        }
2782    }
2783    Ok(result)
2784}
2785
2786/// Get executions and their next versions
2787async fn get_pending_by_ffqns(
2788    tx: &Transaction<'_>,
2789    batch_size: u32,
2790    pending_at_or_sooner: DateTime<Utc>,
2791    ffqns: &[FunctionFqn],
2792    select_strategy: SelectStrategy,
2793) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2794    let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2795    let mut execution_ids_versions = Vec::with_capacity(batch_size);
2796
2797    for ffqn in ffqns {
2798        let needed = batch_size - execution_ids_versions.len();
2799        if needed == 0 {
2800            break;
2801        }
2802        let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
2803        if let Ok(execs) =
2804            get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2805                .await
2806        {
2807            execution_ids_versions.extend(execs);
2808        }
2809    }
2810
2811    Ok(execution_ids_versions)
2812}
2813
/// How the pending-execution queries select their rows.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SelectStrategy {
    /// Plain read; no locking clause is added to the query.
    Read,
    /// The query is issued with `FOR UPDATE SKIP LOCKED`.
    LockForUpdate,
}
2819
2820async fn get_pending_by_component_input_digest(
2821    tx: &Transaction<'_>,
2822    batch_size: u32,
2823    pending_at_or_sooner: DateTime<Utc>,
2824    input_digest: &ComponentDigest,
2825    select_strategy: SelectStrategy,
2826) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2827    let rows = tx
2828        .query(
2829            &format!(
2830                r"
2831                SELECT execution_id, corresponding_version FROM t_state WHERE
2832                state = '{STATE_PENDING_AT}' AND
2833                pending_expires_finished <= $1 AND
2834                component_id_input_digest = $2
2835                AND is_paused = false
2836                ORDER BY pending_expires_finished
2837                {}
2838                LIMIT $3
2839                ",
2840                if select_strategy == SelectStrategy::LockForUpdate {
2841                    "FOR UPDATE SKIP LOCKED"
2842                } else {
2843                    ""
2844                }
2845            ),
2846            &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
2847        )
2848        .await?;
2849
2850    let mut result = Vec::with_capacity(rows.len());
2851    for row in rows {
2852        let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2853            let eid_str: String = get(&row, "execution_id")?;
2854            let corresponding_version: i64 = get(&row, "corresponding_version")?;
2855            let corresponding_version = Version::try_from(corresponding_version)
2856                .map_err(|_| consistency_db_err("version must be non-negative"))?;
2857
2858            let eid = ExecutionId::from_str(&eid_str)
2859                .map_err(|err| consistency_db_err(err.to_string()))?;
2860            Ok((eid, corresponding_version.increment()))
2861        };
2862
2863        match unpack() {
2864            Ok(val) => result.push(val),
2865            Err(err) => {
2866                warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
2867            }
2868        }
2869    }
2870
2871    Ok(result)
2872}
2873
2874fn notify_pending_locked(
2875    notifier: &NotifierPendingAt,
2876    current_time: DateTime<Utc>,
2877    ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2878) {
2879    if notifier.scheduled_at <= current_time {
2880        ffqn_to_pending_subscription.notify(notifier);
2881    }
2882}
2883
2884async fn upgrade_execution_component(
2885    tx: &Transaction<'_>,
2886    execution_id: &ExecutionId,
2887    old: &ComponentDigest,
2888    new: &ComponentDigest,
2889) -> Result<(), DbErrorWrite> {
2890    debug!("Updating t_state to component {new}");
2891
2892    let updated = tx
2893        .execute(
2894            r"
2895                UPDATE t_state
2896                SET
2897                    updated_at = CURRENT_TIMESTAMP,
2898                    component_id_input_digest = $1
2899                WHERE
2900                    execution_id = $2 AND
2901                    component_id_input_digest = $3
2902                ",
2903            &[
2904                &new.as_slice(),           // $1: BYTEA
2905                &execution_id.to_string(), // $2: TEXT
2906                &old.as_slice(),           // $3: BYTEA
2907            ],
2908        )
2909        .await?;
2910
2911    if updated != 1 {
2912        return Err(DbErrorWrite::NotFound);
2913    }
2914    Ok(())
2915}
2916
2917impl PostgresConnection {
2918    // Must be called after write transaction commit for a correct happens-before relationship.
2919    #[instrument(level = Level::TRACE, skip_all)]
2920    fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2921        let (pending_ats, finished_execs, responses) = {
2922            let (mut pending_ats, mut finished_execs, mut responses) =
2923                (Vec::new(), Vec::new(), Vec::new());
2924            for notifier in notifiers {
2925                if let Some(pending_at) = notifier.pending_at {
2926                    pending_ats.push(pending_at);
2927                }
2928                if let Some(finished) = notifier.execution_finished {
2929                    finished_execs.push(finished);
2930                }
2931                if let Some(response) = notifier.response {
2932                    responses.push(response);
2933                }
2934            }
2935            (pending_ats, finished_execs, responses)
2936        };
2937
2938        // Notify pending_at subscribers.
2939        if !pending_ats.is_empty() {
2940            let guard = self.pending_subscribers.lock().unwrap();
2941            for pending_at in pending_ats {
2942                notify_pending_locked(&pending_at, current_time, &guard);
2943            }
2944        }
2945        // Notify execution finished subscribers.
2946        if !finished_execs.is_empty() {
2947            let mut guard = self.execution_finished_subscribers.lock().unwrap();
2948            for finished in finished_execs {
2949                if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2950                    for (_tag, sender) in listeners_of_exe_id {
2951                        let _ = sender.send(finished.retval.clone());
2952                    }
2953                }
2954            }
2955        }
2956        // Notify response subscribers.
2957        if !responses.is_empty() {
2958            let mut guard = self.response_subscribers.lock().unwrap();
2959            for (execution_id, response) in responses {
2960                if let Some((sender, _)) = guard.remove(&execution_id) {
2961                    let _ = sender.send(response);
2962                }
2963            }
2964        }
2965    }
2966}
2967
2968#[async_trait]
2969impl DbExecutor for PostgresConnection {
2970    #[instrument(level = Level::TRACE, skip(self))]
2971    async fn lock_pending_by_ffqns(
2972        &self,
2973        batch_size: u32,
2974        pending_at_or_sooner: DateTime<Utc>,
2975        ffqns: Arc<[FunctionFqn]>,
2976        created_at: DateTime<Utc>,
2977        component_id: ComponentId,
2978        deployment_id: DeploymentId,
2979        executor_id: ExecutorId,
2980        lock_expires_at: DateTime<Utc>,
2981        run_id: RunId,
2982        retry_config: ComponentRetryConfig,
2983    ) -> Result<LockPendingResponse, DbErrorWrite> {
2984        let mut client_guard = self.client.lock().await;
2985        let tx = client_guard.transaction().await?;
2986
2987        let execution_ids_versions = get_pending_by_ffqns(
2988            &tx,
2989            batch_size,
2990            pending_at_or_sooner,
2991            &ffqns,
2992            SelectStrategy::LockForUpdate,
2993        )
2994        .await?;
2995
2996        if execution_ids_versions.is_empty() {
2997            // Commit is required to release the connection state cleanly,
2998            // though rollback/drop works too for read-only.
2999            tx.commit().await?;
3000            return Ok(vec![]);
3001        }
3002
3003        debug!("Locking {execution_ids_versions:?}");
3004
3005        // Lock using the same transaction
3006        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3007        for (execution_id, version) in execution_ids_versions {
3008            match lock_single_execution(
3009                &tx,
3010                created_at,
3011                &component_id,
3012                deployment_id,
3013                &execution_id,
3014                run_id,
3015                &version,
3016                executor_id,
3017                lock_expires_at,
3018                retry_config,
3019            )
3020            .await
3021            {
3022                Ok(locked) => locked_execs.push(locked),
3023                Err(err) => {
3024                    tx.rollback().await?; // lock_single_execution performs multiple writes
3025                    debug!("Locking row {execution_id} failed - {err:?}");
3026                    return Err(err);
3027                }
3028            }
3029        }
3030
3031        tx.commit().await?;
3032
3033        Ok(locked_execs)
3034    }
3035
3036    #[instrument(level = Level::TRACE, skip(self))]
3037    async fn lock_pending_by_component_digest(
3038        &self,
3039        batch_size: u32,
3040        pending_at_or_sooner: DateTime<Utc>,
3041        component_id: &ComponentId,
3042        deployment_id: DeploymentId,
3043        created_at: DateTime<Utc>,
3044        executor_id: ExecutorId,
3045        lock_expires_at: DateTime<Utc>,
3046        run_id: RunId,
3047        retry_config: ComponentRetryConfig,
3048    ) -> Result<LockPendingResponse, DbErrorWrite> {
3049        let mut client_guard = self.client.lock().await;
3050        let tx = client_guard.transaction().await?;
3051
3052        let execution_ids_versions = get_pending_by_component_input_digest(
3053            &tx,
3054            batch_size,
3055            pending_at_or_sooner,
3056            &component_id.component_digest,
3057            SelectStrategy::LockForUpdate,
3058        )
3059        .await?;
3060
3061        if execution_ids_versions.is_empty() {
3062            tx.commit().await?;
3063            return Ok(vec![]);
3064        }
3065
3066        debug!("Locking {execution_ids_versions:?}");
3067
3068        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3069        for (execution_id, version) in execution_ids_versions {
3070            match lock_single_execution(
3071                &tx,
3072                created_at,
3073                component_id,
3074                deployment_id,
3075                &execution_id,
3076                run_id,
3077                &version,
3078                executor_id,
3079                lock_expires_at,
3080                retry_config,
3081            )
3082            .await
3083            {
3084                Ok(locked) => locked_execs.push(locked),
3085                Err(err) => {
3086                    tx.rollback().await?; // lock_single_execution performs multiple writes
3087                    debug!("Locking row {execution_id} failed - {err:?}");
3088                    return Err(err);
3089                }
3090            }
3091        }
3092
3093        tx.commit().await?;
3094        Ok(locked_execs)
3095    }
3096
3097    #[cfg(feature = "test")]
3098    #[instrument(level = Level::DEBUG, skip(self))]
3099    async fn lock_one(
3100        &self,
3101        created_at: DateTime<Utc>,
3102        component_id: ComponentId,
3103        deployment_id: DeploymentId,
3104        execution_id: &ExecutionId,
3105        run_id: RunId,
3106        version: Version,
3107        executor_id: ExecutorId,
3108        lock_expires_at: DateTime<Utc>,
3109        retry_config: ComponentRetryConfig,
3110    ) -> Result<LockedExecution, DbErrorWrite> {
3111        debug!(%execution_id, "lock_one");
3112        let mut client_guard = self.client.lock().await;
3113        let tx = client_guard.transaction().await?;
3114
3115        let res = lock_single_execution(
3116            &tx,
3117            created_at,
3118            &component_id,
3119            deployment_id,
3120            execution_id,
3121            run_id,
3122            &version,
3123            executor_id,
3124            lock_expires_at,
3125            retry_config,
3126        )
3127        .await?;
3128
3129        tx.commit().await?;
3130        Ok(res)
3131    }
3132
3133    #[instrument(level = Level::DEBUG, skip(self, req))]
3134    async fn append(
3135        &self,
3136        execution_id: ExecutionId,
3137        version: Version,
3138        req: AppendRequest,
3139    ) -> Result<AppendResponse, DbErrorWrite> {
3140        debug!(%req, "append");
3141        trace!(?req, "append");
3142        let created_at = req.created_at;
3143
3144        let mut client_guard = self.client.lock().await;
3145        let tx = client_guard.transaction().await?;
3146
3147        let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3148
3149        tx.commit().await?;
3150
3151        // Explicitly drop guard (optional, happens at end of scope anyway)
3152        drop(client_guard);
3153
3154        self.notify_all(vec![notifier], created_at);
3155        Ok(new_version)
3156    }
3157
3158    #[instrument(level = Level::DEBUG, skip_all)]
3159    async fn append_batch_respond_to_parent(
3160        &self,
3161        events: AppendEventsToExecution,
3162        response: AppendResponseToExecution,
3163        current_time: DateTime<Utc>,
3164    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3165        debug!("append_batch_respond_to_parent");
3166        if events.execution_id == response.parent_execution_id {
3167            return Err(DbErrorWrite::NonRetriable(
3168                DbErrorWriteNonRetriable::ValidationFailed(
3169                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3170                ),
3171            ));
3172        }
3173        if events.batch.is_empty() {
3174            return Err(DbErrorWrite::NonRetriable(
3175                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3176            ));
3177        }
3178
3179        let mut client_guard = self.client.lock().await;
3180        let tx = client_guard.transaction().await?;
3181
3182        let mut version = events.version;
3183        let mut notifiers = Vec::new();
3184
3185        for append_request in events.batch {
3186            let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3187            version = v;
3188            notifiers.push(n);
3189        }
3190
3191        let pending_at_parent = append_response(
3192            &tx,
3193            &response.parent_execution_id,
3194            JoinSetResponseEventOuter {
3195                created_at: response.created_at,
3196                event: JoinSetResponseEvent {
3197                    join_set_id: response.join_set_id,
3198                    event: JoinSetResponse::ChildExecutionFinished {
3199                        child_execution_id: response.child_execution_id,
3200                        finished_version: response.finished_version,
3201                        result: response.result,
3202                    },
3203                },
3204            },
3205        )
3206        .await?;
3207        notifiers.push(pending_at_parent);
3208
3209        tx.commit().await?;
3210        drop(client_guard);
3211
3212        self.notify_all(notifiers, current_time);
3213        Ok(version)
3214    }
3215
    /// Waits until there may be a pending execution for any of `ffqns`.
    ///
    /// Returns as soon as one of the following happens:
    /// - the database already contains a matching pending execution that is due
    ///   at `pending_at_or_sooner`,
    /// - a notification arrives on the channel registered for the ffqns,
    /// - `timeout_fut` completes.
    ///
    /// Failures to open the transaction or to run the check query are ignored;
    /// the function then falls through to waiting.
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // The random tag identifies this call's subscription during cleanup.
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1);
        // Subscribe before querying the database.
        // NOTE(review): presumably so that a notification sent while the check
        // below is running is not missed - confirm.
        {
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }

        async {
            let mut db_has_pending = false;
            {
                // Scope the lock so we don't hold it while waiting for timeout
                let mut client_guard = self.client.lock().await;
                // Read-only transaction check
                if let Ok(tx) = client_guard.transaction().await {
                    // A single row is enough to know that there is work to do.
                    if let Ok(res) = get_pending_by_ffqns(
                        &tx,
                        1,
                        pending_at_or_sooner,
                        &ffqns,
                        SelectStrategy::Read,
                    )
                    .await
                        && !res.is_empty()
                    {
                        db_has_pending = true;
                    }
                    // Commit/Rollback read transaction
                    let _ = tx.commit().await;
                }
            }

            if db_has_pending {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }

            // Whichever completes first ends the wait.
            tokio::select! {
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }
        .await;

        // Cleanup
        // Remove the entry of each ffqn; an entry carrying a different tag is not
        // ours and is put back.
        {
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match pending_subscribers.remove_ffqn(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {}
                    Some(other) => {
                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
                    }
                    None => {}
                }
            }
        }
    }
3285
    /// Waits until there may be a pending execution for `component_digest`.
    ///
    /// Returns as soon as one of the following happens:
    /// - the database already contains a matching pending execution that is due
    ///   at `pending_at_or_sooner`,
    /// - a notification arrives on the channel registered for the digest,
    /// - `timeout_fut` completes.
    ///
    /// Failures to open the transaction or to run the check query are ignored;
    /// the function then falls through to waiting.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn wait_for_pending_by_component_digest(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_digest: &ComponentDigest,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // The random tag identifies this call's subscription during cleanup.
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1);
        // Subscribe before querying the database.
        // NOTE(review): presumably so that a notification sent while the check
        // below is running is not missed - confirm.
        {
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            pending_subscribers
                .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
        }

        async {
            let mut db_has_pending = false;
            {
                // The client guard is scoped to this block and released before waiting.
                let mut client_guard = self.client.lock().await;
                if let Ok(tx) = client_guard.transaction().await {
                    // A single row is enough to know that there is work to do.
                    if let Ok(res) = get_pending_by_component_input_digest(
                        &tx,
                        1,
                        pending_at_or_sooner,
                        component_digest,
                        SelectStrategy::Read,
                    )
                    .await
                        && !res.is_empty()
                    {
                        db_has_pending = true;
                    }
                    // The outcome of finishing the read-only transaction is ignored.
                    let _ = tx.commit().await;
                }
            }

            if db_has_pending {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }

            // Whichever completes first ends the wait.
            tokio::select! {
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }
        .await;

        // Cleanup
        // Remove the entry of this digest; an entry carrying a different tag is
        // not ours and is put back.
        {
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            match pending_subscribers.remove_by_component(component_digest) {
                Some((_, tag)) if tag == unique_tag => {}
                Some(other) => {
                    pending_subscribers.insert_by_component(component_digest.clone(), other);
                }
                None => {}
            }
        }
    }
3349
3350    async fn get_last_execution_event(
3351        &self,
3352        execution_id: &ExecutionId,
3353    ) -> Result<ExecutionEvent, DbErrorRead> {
3354        let mut client_guard = self.client.lock().await;
3355        let tx = client_guard.transaction().await?;
3356
3357        let event = get_last_execution_event(&tx, execution_id).await?;
3358
3359        tx.commit().await?;
3360        Ok(event)
3361    }
3362}
3363#[async_trait]
3364impl DbConnection for PostgresConnection {
3365    #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3366    async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3367        debug!("create");
3368        trace!(?req, "create");
3369        let created_at = req.created_at;
3370
3371        let mut client_guard = self.client.lock().await;
3372        let tx = client_guard.transaction().await?;
3373
3374        let (version, notifier) = create_inner(&tx, req.clone()).await?;
3375
3376        tx.commit().await?;
3377        drop(client_guard); // Release DB lock before notifying
3378
3379        self.notify_all(vec![notifier], created_at);
3380        Ok(version)
3381    }
3382
3383    #[instrument(level = Level::DEBUG, skip(self))]
3384    async fn get(
3385        &self,
3386        execution_id: &ExecutionId,
3387    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3388        trace!("get");
3389        let mut client_guard = self.client.lock().await;
3390        let tx = client_guard.transaction().await?;
3391
3392        let res = get_execution_log(&tx, execution_id).await?;
3393
3394        tx.commit().await?;
3395        Ok(res)
3396    }
3397
3398    #[instrument(level = Level::DEBUG, skip(self, batch))]
3399    async fn append_batch(
3400        &self,
3401        current_time: DateTime<Utc>,
3402        batch: Vec<AppendRequest>,
3403        execution_id: ExecutionId,
3404        version: Version,
3405    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3406        debug!("append_batch");
3407        trace!(?batch, "append_batch");
3408        assert!(!batch.is_empty(), "Empty batch request");
3409
3410        let mut client_guard = self.client.lock().await;
3411        let tx = client_guard.transaction().await?;
3412
3413        let mut version = version;
3414        let mut notifier = None;
3415
3416        for append_request in batch {
3417            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3418            version = v;
3419            notifier = Some(n);
3420        }
3421
3422        tx.commit().await?;
3423        drop(client_guard);
3424
3425        self.notify_all(
3426            vec![notifier.expect("checked that the batch is not empty")],
3427            current_time,
3428        );
3429        Ok(version)
3430    }
3431
3432    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3433    async fn append_batch_create_new_execution(
3434        &self,
3435        current_time: DateTime<Utc>,
3436        batch: Vec<AppendRequest>,
3437        execution_id: ExecutionId,
3438        version: Version,
3439        child_req: Vec<CreateRequest>,
3440        backtraces: Vec<BacktraceInfo>,
3441    ) -> Result<AppendBatchResponse, DbErrorWrite> {
3442        debug!("append_batch_create_new_execution");
3443        trace!(?batch, ?child_req, "append_batch_create_new_execution");
3444        assert!(!batch.is_empty(), "Empty batch request");
3445
3446        let mut client_guard = self.client.lock().await;
3447        let tx = client_guard.transaction().await?;
3448
3449        let mut version = version;
3450        let mut notifier = None;
3451
3452        for append_request in batch {
3453            let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3454            version = v;
3455            notifier = Some(n);
3456        }
3457
3458        let mut notifiers = Vec::new();
3459        notifiers.push(notifier.expect("checked that the batch is not empty"));
3460
3461        for req in child_req {
3462            let (_, n) = create_inner(&tx, req).await?;
3463            notifiers.push(n);
3464        }
3465        for backtrace in backtraces {
3466            append_backtrace(&tx, &backtrace).await?;
3467        }
3468        tx.commit().await?;
3469        drop(client_guard);
3470
3471        self.notify_all(notifiers, current_time);
3472        Ok(version)
3473    }
3474
3475    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3476    async fn subscribe_to_next_responses(
3477        &self,
3478        execution_id: &ExecutionId,
3479        last_response: ResponseCursor,
3480        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3481    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3482        debug!("next_responses");
3483        let unique_tag: u64 = rand::random();
3484        let execution_id_clone = execution_id.clone();
3485
3486        let cleanup = || {
3487            let mut guard = self.response_subscribers.lock().unwrap();
3488            match guard.remove(&execution_id_clone) {
3489                Some((_, tag)) if tag == unique_tag => {}
3490                Some(other) => {
3491                    guard.insert(execution_id_clone.clone(), other);
3492                }
3493                None => {}
3494            }
3495        };
3496
3497        let receiver = {
3498            let mut client_guard = self.client.lock().await;
3499            let tx = client_guard.transaction().await?;
3500
3501            // Register listener before fetching from database.
3502            // This is a best-effort mechanism that shortens the polling time, if it
3503            // does not detect the response it will sleep using `timeout_fut`. Consumers
3504            // are expected to poll this function in a loop.
3505            // Currently the notification mechanism only works on a single node deployment.
3506            let (sender, receiver) = oneshot::channel();
3507            self.response_subscribers
3508                .lock()
3509                .unwrap()
3510                .insert(execution_id.clone(), (sender, unique_tag));
3511
3512            let responses = get_responses_after(&tx, execution_id, last_response).await?;
3513
3514            if responses.is_empty() {
3515                // Commit read transaction
3516                tx.commit().await.map_err(|err| {
3517                    cleanup(); // Remove the just inserted subscriber.
3518                    DbErrorRead::from(err)
3519                })?;
3520                receiver
3521            } else {
3522                cleanup(); // Remove the just inserted subscriber as we already have the answer.
3523                tx.commit().await?;
3524                return Ok(responses);
3525            }
3526        };
3527
3528        let res = tokio::select! {
3529            resp = receiver => {
3530                match resp {
3531                    Ok(resp) => Ok(vec![resp]),
3532                    Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3533                }
3534            }
3535            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3536        };
3537
3538        cleanup();
3539        res
3540    }
3541
3542    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3543    async fn wait_for_finished_result(
3544        &self,
3545        execution_id: &ExecutionId,
3546        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3547    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3548        let unique_tag: u64 = rand::random();
3549        let execution_id_clone = execution_id.clone();
3550
3551        let cleanup = || {
3552            let mut guard = self.execution_finished_subscribers.lock().unwrap();
3553            if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3554                subscribers.remove(&unique_tag);
3555            }
3556        };
3557
3558        let receiver = {
3559            let mut client_guard = self.client.lock().await;
3560            let tx = client_guard.transaction().await?;
3561
3562            // Register listener
3563            let (sender, receiver) = oneshot::channel();
3564            {
3565                let mut guard = self.execution_finished_subscribers.lock().unwrap();
3566                guard
3567                    .entry(execution_id.clone())
3568                    .or_default()
3569                    .insert(unique_tag, sender);
3570            }
3571
3572            let pending_state = get_combined_state(&tx, execution_id)
3573                .await?
3574                .execution_with_state
3575                .pending_state;
3576
3577            if let PendingState::Finished(finished) = pending_state {
3578                let event = get_execution_event(&tx, execution_id, finished.version).await?;
3579                tx.commit().await?;
3580                cleanup();
3581
3582                if let ExecutionRequest::Finished { retval, .. } = event.event {
3583                    return Ok(retval);
3584                }
3585                error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3586                return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3587                    "cannot get finished event based on t_state version",
3588                )));
3589            }
3590            tx.commit().await?;
3591            receiver
3592        };
3593
3594        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3595        let res = tokio::select! {
3596            resp = receiver => {
3597                match resp {
3598                    Ok(retval) => Ok(retval),
3599                    Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3600                }
3601            }
3602            outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3603        };
3604
3605        cleanup();
3606        res
3607    }
3608
3609    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3610    async fn append_delay_response(
3611        &self,
3612        created_at: DateTime<Utc>,
3613        execution_id: ExecutionId,
3614        join_set_id: JoinSetId,
3615        delay_id: DelayId,
3616        result: Result<(), ()>,
3617    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3618        debug!("append_delay_response");
3619        let event = JoinSetResponseEventOuter {
3620            created_at,
3621            event: JoinSetResponseEvent {
3622                join_set_id,
3623                event: JoinSetResponse::DelayFinished {
3624                    delay_id: delay_id.clone(),
3625                    result,
3626                },
3627            },
3628        };
3629
3630        let mut client_guard = self.client.lock().await;
3631        let tx = client_guard.transaction().await?;
3632
3633        let res = append_response(&tx, &execution_id, event).await;
3634
3635        match res {
3636            Ok(notifier) => {
3637                tx.commit().await?;
3638                drop(client_guard);
3639                self.notify_all(vec![notifier], created_at);
3640                Ok(AppendDelayResponseOutcome::Success)
3641            }
3642            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3643                // Check if already finished
3644                // Roll back the failed tx, start new read tx.
3645                tx.rollback().await?;
3646
3647                // Start new tx for check
3648                let tx = client_guard.transaction().await?;
3649                let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3650                tx.commit().await?;
3651
3652                match delay_success {
3653                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3654                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3655                    None => Err(DbErrorWrite::Generic(consistency_db_err(
3656                        "insert failed yet select did not find the response",
3657                    ))),
3658                }
3659            }
3660            Err(err) => {
3661                let _ = tx.rollback().await; // cleanup
3662                Err(err)
3663            }
3664        }
3665    }
3666
3667    #[instrument(level = Level::DEBUG, skip_all)]
3668    async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3669        debug!("append_backtrace");
3670        let mut client_guard = self.client.lock().await;
3671        let tx = client_guard.transaction().await?;
3672
3673        append_backtrace(&tx, &append).await?;
3674
3675        tx.commit().await?;
3676        Ok(())
3677    }
3678
3679    #[instrument(level = Level::DEBUG, skip_all)]
3680    async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3681        debug!("append_backtrace_batch");
3682        let mut client_guard = self.client.lock().await;
3683        let tx = client_guard.transaction().await?;
3684
3685        for append in batch {
3686            append_backtrace(&tx, &append).await?;
3687        }
3688
3689        tx.commit().await?;
3690        Ok(())
3691    }
3692
3693    #[instrument(level = Level::DEBUG, skip_all)]
3694    async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3695        trace!("append_log");
3696        let mut client_guard = self.client.lock().await;
3697        let tx = client_guard.transaction().await?;
3698        append_log(&tx, &row).await?;
3699        tx.commit().await?;
3700
3701        Ok(())
3702    }
3703
3704    #[instrument(level = Level::DEBUG, skip_all)]
3705    async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3706        trace!("append_log_batch");
3707        let mut client_guard = self.client.lock().await;
3708        let tx = client_guard.transaction().await?;
3709        for row in batch {
3710            append_log(&tx, row).await?;
3711        }
3712        tx.commit().await?;
3713        Ok(())
3714    }
3715
3716    /// Get currently expired delays and locks.
3717    #[instrument(level = Level::TRACE, skip(self))]
3718    async fn get_expired_timers(
3719        &self,
3720        at: DateTime<Utc>,
3721    ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3722        let mut client_guard = self.client.lock().await;
3723        let tx = client_guard.transaction().await?;
3724
3725        // Expired Delays
3726        let rows = tx
3727            .query(
3728                "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1 AND NOT is_paused",
3729                &[&at],
3730            )
3731            .await?;
3732
3733        let mut expired_timers = Vec::with_capacity(rows.len());
3734        for row in rows {
3735            let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3736                let execution_id: String = get(&row, "execution_id")?;
3737                let execution_id = ExecutionId::from_str(&execution_id)?;
3738                let join_set_id: String = get(&row, "join_set_id")?;
3739                let join_set_id = JoinSetId::from_str(&join_set_id)?;
3740                let delay_id: String = get(&row, "delay_id")?;
3741                let delay_id = DelayId::from_str(&delay_id)?;
3742
3743                Ok(ExpiredTimer::Delay(ExpiredDelay {
3744                    execution_id,
3745                    join_set_id,
3746                    delay_id,
3747                }))
3748            };
3749
3750            match unpack() {
3751                Ok(timer) => expired_timers.push(timer),
3752                Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3753            }
3754        }
3755
3756        // Expired Locks
3757        let rows = tx.query(
3758            &format!(
3759                "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id, is_paused \
3760                 FROM t_state \
3761                 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3762            ),
3763            &[&at]
3764        ).await?;
3765
3766        for row in rows {
3767            let unpack = || -> Result<Option<ExpiredTimer>, DbErrorGeneric> {
3768                let execution_id: String = get(&row, "execution_id")?;
3769                let execution_id = ExecutionId::from_str(&execution_id)?;
3770                let is_paused: bool = get(&row, "is_paused")?;
3771                if is_paused {
3772                    error!(%execution_id, "encountered invalid paused locked execution while scanning expired locks");
3773                    return Ok(None);
3774                }
3775                let last_lock_version: i64 = get(&row, "last_lock_version")?;
3776                let last_lock_version = Version::try_from(last_lock_version)?;
3777
3778                let corresponding_version: i64 = get(&row, "corresponding_version")?;
3779                let corresponding_version = Version::try_from(corresponding_version)?;
3780
3781                let intermittent_event_count =
3782                    u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
3783                        |_| consistency_db_err("`intermittent_event_count` must not be negative"),
3784                    )?;
3785
3786                let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
3787                    .map(u32::try_from)
3788                    .transpose()
3789                    .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3790                let retry_exp_backoff_millis =
3791                    u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
3792                        |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
3793                    )?;
3794                let executor_id: String = get(&row, "executor_id")?;
3795                let executor_id = ExecutorId::from_str(&executor_id)?;
3796                let run_id: String = get(&row, "run_id")?;
3797                let run_id = RunId::from_str(&run_id)?;
3798
3799                Ok(Some(ExpiredTimer::Lock(ExpiredLock {
3800                    execution_id,
3801                    locked_at_version: last_lock_version,
3802                    next_version: corresponding_version.increment(),
3803                    intermittent_event_count,
3804                    max_retries,
3805                    retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3806                    locked_by: LockedBy {
3807                        executor_id,
3808                        run_id,
3809                    },
3810                })))
3811            };
3812
3813            match unpack() {
3814                Ok(Some(timer)) => expired_timers.push(timer),
3815                Ok(None) => {}
3816                Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3817            }
3818        }
3819
3820        tx.commit().await?;
3821
3822        if !expired_timers.is_empty() {
3823            debug!("get_expired_timers found {expired_timers:?}");
3824        }
3825        Ok(expired_timers)
3826    }
3827
3828    async fn get_execution_event(
3829        &self,
3830        execution_id: &ExecutionId,
3831        version: &Version,
3832    ) -> Result<ExecutionEvent, DbErrorRead> {
3833        let mut client_guard = self.client.lock().await;
3834        let tx = client_guard.transaction().await?;
3835
3836        let event = get_execution_event(&tx, execution_id, version.0).await?;
3837
3838        tx.commit().await?;
3839        Ok(event)
3840    }
3841
3842    #[instrument(level = Level::DEBUG, skip_all)]
3843    async fn upsert_stub_response(
3844        &self,
3845        execution_id: ExecutionIdDerived,
3846        version: Version,
3847        req: AppendRequest,
3848        response: AppendResponseToExecution,
3849        current_time: DateTime<Utc>,
3850    ) -> Result<(), DbErrorStubResponse> {
3851        debug!("upsert_stub_response");
3852        #[cfg(debug_assertions)]
3853        {
3854            let (expected_parent, expected_join_set) = execution_id.split_to_parts();
3855            debug_assert_eq!(expected_parent, response.parent_execution_id);
3856            debug_assert_eq!(expected_join_set, response.join_set_id);
3857            debug_assert_eq!(execution_id, response.child_execution_id);
3858        }
3859        let execution_id = ExecutionId::Derived(execution_id);
3860        let expected_retval = response.result.clone();
3861        let version_for_read = version.0;
3862
3863        let mut client_guard = self.client.lock().await;
3864        let tx = client_guard.transaction().await?;
3865
3866        let notifiers = match append(&tx, &execution_id, req, version).await {
3867            Ok((_next_version, notifier_of_child)) => {
3868                let pending_at_parent = append_response(
3869                    &tx,
3870                    &response.parent_execution_id,
3871                    JoinSetResponseEventOuter {
3872                        created_at: response.created_at,
3873                        event: JoinSetResponseEvent {
3874                            join_set_id: response.join_set_id,
3875                            event: JoinSetResponse::ChildExecutionFinished {
3876                                child_execution_id: response.child_execution_id,
3877                                finished_version: response.finished_version,
3878                                result: response.result,
3879                            },
3880                        },
3881                    },
3882                )
3883                .await
3884                .map_err(DbErrorStubResponse::Write)?;
3885                Some(vec![notifier_of_child, pending_at_parent])
3886            }
3887            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::AlreadyFinished)) => {
3888                // Idempotency check: read the existing event and compare retval.
3889                let found = get_execution_event(&tx, &execution_id, version_for_read)
3890                    .await
3891                    .map_err(|_| DbErrorStubResponse::StubConflict)?;
3892                match found.event {
3893                    ExecutionRequest::Finished { retval, .. } if retval == expected_retval => None,
3894                    _ => return Err(DbErrorStubResponse::StubConflict),
3895                }
3896            }
3897            Err(other) => return Err(DbErrorStubResponse::Write(other)),
3898        };
3899        tx.commit().await?;
3900        drop(client_guard);
3901        if let Some(notifiers) = notifiers {
3902            self.notify_all(notifiers, current_time);
3903        }
3904        Ok(())
3905    }
3906
3907    async fn get_pending_state(
3908        &self,
3909        execution_id: &ExecutionId,
3910    ) -> Result<ExecutionWithState, DbErrorRead> {
3911        let mut client_guard = self.client.lock().await;
3912        let tx = client_guard.transaction().await?;
3913
3914        let combined_state = get_combined_state(&tx, execution_id).await?;
3915
3916        tx.commit().await?;
3917        Ok(combined_state.execution_with_state)
3918    }
3919}
3920
3921#[async_trait]
3922impl DbExternalApi for PostgresConnection {
3923    #[instrument(skip(self))]
3924    async fn get_backtrace(
3925        &self,
3926        execution_id: &ExecutionId,
3927        filter: BacktraceFilter,
3928    ) -> Result<BacktraceInfo, DbErrorRead> {
3929        debug!("get_backtrace");
3930
3931        let mut client_guard = self.client.lock().await;
3932        let tx = client_guard.transaction().await?;
3933
3934        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
3935
3936        params.push(Box::new(execution_id.to_string())); // $1
3937        let p_execution_id_idx = format!("${}", params.len()); // $1
3938
3939        let mut sql = String::new();
3940        write!(
3941            &mut sql,
3942            "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
3943            FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
3944            WHERE execution_id = {p_execution_id_idx}"
3945        )
3946        .unwrap();
3947
3948        match &filter {
3949            BacktraceFilter::Specific(version) => {
3950                params.push(Box::new(i64::from(version.0))); // $2
3951                let p_ver_idx = format!("${}", params.len()); // $2
3952                write!(
3953                    &mut sql,
3954                    " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
3955                )
3956                .unwrap();
3957            }
3958            BacktraceFilter::First => {
3959                sql.push_str(" ORDER BY version_min_including LIMIT 1");
3960            }
3961            BacktraceFilter::Last => {
3962                sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
3963            }
3964        }
3965
3966        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
3967            params.iter().map(|p| p.as_ref() as _).collect();
3968
3969        let row = tx.query_one(&sql, &params_refs).await?;
3970
3971        let component_id: Json<ComponentId> = get(&row, "component_id")?;
3972        let component_id = component_id.0;
3973
3974        let version_min_including =
3975            Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
3976
3977        let version_max_excluding =
3978            Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
3979
3980        // wasm_backtrace stored as JSONB
3981        let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
3982        let wasm_backtrace = wasm_backtrace.0;
3983
3984        tx.commit().await?;
3985
3986        Ok(BacktraceInfo {
3987            execution_id: execution_id.clone(),
3988            component_id,
3989            version_min_including,
3990            version_max_excluding,
3991            wasm_backtrace,
3992        })
3993    }
3994
3995    #[instrument(skip_all)]
3996    async fn upsert_source_file(
3997        &self,
3998        component_digest: &ComponentDigest,
3999        frame_key: &str,
4000        is_suffix: bool,
4001        content: &str,
4002    ) -> Result<(), DbErrorWrite> {
4003        let content_hash: [u8; 32] = Sha256::digest(content.as_bytes()).into();
4004        let mut client_guard = self.client.lock().await;
4005        let tx = client_guard.transaction().await?;
4006        tx.execute(
4007            "INSERT INTO t_source_file (content_hash, content) \
4008             VALUES ($1, $2) \
4009             ON CONFLICT (content_hash) DO NOTHING",
4010            &[&content_hash.as_slice(), &content],
4011        )
4012        .await?;
4013        tx.execute(
4014            "INSERT INTO t_component_source \
4015             (component_digest, frame_key, is_suffix, content_hash) \
4016             VALUES ($1, $2, $3, $4) \
4017             ON CONFLICT (component_digest, frame_key, is_suffix) DO NOTHING",
4018            &[
4019                &component_digest.as_slice(),
4020                &frame_key,
4021                &is_suffix,
4022                &content_hash.as_slice(),
4023            ],
4024        )
4025        .await?;
4026        tx.commit().await?;
4027        Ok(())
4028    }
4029
4030    #[instrument(skip_all)]
4031    async fn get_source_file(
4032        &self,
4033        component_digest: &ComponentDigest,
4034        file: &str,
4035    ) -> Result<Option<String>, DbErrorRead> {
4036        let mut client_guard = self.client.lock().await;
4037        let tx = client_guard.transaction().await?;
4038        let rows = tx
4039            .query(
4040                "SELECT s.content \
4041                 FROM t_component_source cs \
4042                 JOIN t_source_file s ON cs.content_hash = s.content_hash \
4043                 WHERE cs.component_digest = $1 \
4044                   AND ( \
4045                       (NOT cs.is_suffix AND cs.frame_key = $2) \
4046                    OR (cs.is_suffix AND right($2, length(cs.frame_key)) = cs.frame_key) \
4047                   )",
4048                &[&component_digest.as_slice(), &file],
4049            )
4050            .await?;
4051        tx.commit().await?;
4052        match rows.len() {
4053            0 => Ok(None),
4054            1 => Ok(Some(get::<String, _>(&rows[0], "content")?)),
4055            _ => {
4056                warn!("Multiple suffix matches for '{file}', returning None");
4057                Ok(None)
4058            }
4059        }
4060    }
4061
4062    #[instrument(skip_all)]
4063    async fn upsert_component_metadata(
4064        &self,
4065        records: Vec<ComponentMetadataRecord>,
4066    ) -> Result<(), DbErrorWrite> {
4067        let mut client_guard = self.client.lock().await;
4068        let tx = client_guard.transaction().await?;
4069        for record in records {
4070            tx.execute(
4071                "INSERT INTO t_component_metadata \
4072                 (component_digest, imports_json, exports_json, wit, wit_origin) \
4073                 VALUES ($1, $2, $3, $4, $5) \
4074                 ON CONFLICT (component_digest) DO NOTHING",
4075                &[
4076                    &record.component_digest.as_slice(),
4077                    &Json(&record.imports),
4078                    &Json(&record.exports),
4079                    &record.wit,
4080                    &record.wit_origin,
4081                ],
4082            )
4083            .await?;
4084        }
4085        tx.commit().await?;
4086        Ok(())
4087    }
4088
4089    #[instrument(skip_all)]
4090    async fn insert_deployment_components(
4091        &self,
4092        deployment_id: DeploymentId,
4093        records: Vec<DeploymentComponentRecord>,
4094    ) -> Result<(), DbErrorWrite> {
4095        let mut client_guard = self.client.lock().await;
4096        let tx = client_guard.transaction().await?;
4097        for record in records {
4098            debug_assert_eq!(record.deployment_id, deployment_id);
4099            tx.execute(
4100                "INSERT INTO t_deployment_component \
4101                 (deployment_id, component_name, component_type, component_digest) \
4102                 VALUES ($1, $2, $3, $4) \
4103                 ON CONFLICT (deployment_id, component_name) DO NOTHING",
4104                &[
4105                    &deployment_id.to_string(),
4106                    &record.component_name.to_string(),
4107                    &record.component_type.to_string(),
4108                    &record.component_digest.as_slice(),
4109                ],
4110            )
4111            .await?;
4112        }
4113        tx.commit().await?;
4114        Ok(())
4115    }
4116
4117    #[instrument(skip_all)]
4118    async fn list_deployment_components(
4119        &self,
4120        deployment_id: DeploymentId,
4121    ) -> Result<Vec<DeploymentComponentDetail>, DbErrorRead> {
4122        let mut client_guard = self.client.lock().await;
4123        let tx = client_guard.transaction().await?;
4124        let rows = tx
4125            .query(
4126                "SELECT dc.component_name, dc.component_type, dc.component_digest, \
4127                        cm.imports_json, cm.exports_json, cm.wit \
4128                 FROM t_deployment_component dc \
4129                 JOIN t_component_metadata cm ON dc.component_digest = cm.component_digest \
4130                 WHERE dc.deployment_id = $1 \
4131                 ORDER BY dc.component_type, dc.component_name",
4132                &[&deployment_id.to_string()],
4133            )
4134            .await?;
4135        tx.commit().await?;
4136        rows.iter()
4137            .map(deployment_component_detail_from_pg_row)
4138            .collect()
4139    }
4140
4141    #[instrument(skip_all)]
4142    async fn get_deployment_component_wit(
4143        &self,
4144        deployment_id: DeploymentId,
4145        component_digest: &ComponentDigest,
4146    ) -> Result<Option<String>, DbErrorRead> {
4147        let mut client_guard = self.client.lock().await;
4148        let tx = client_guard.transaction().await?;
4149        let row = tx
4150            .query_opt(
4151                "SELECT cm.wit \
4152                 FROM t_deployment_component dc \
4153                 JOIN t_component_metadata cm ON dc.component_digest = cm.component_digest \
4154                 WHERE dc.deployment_id = $1 AND dc.component_digest = $2 \
4155                 LIMIT 1",
4156                &[&deployment_id.to_string(), &component_digest.as_slice()],
4157            )
4158            .await?;
4159        tx.commit().await?;
4160        row.map(|row| get::<String, _>(&row, "wit").map_err(DbErrorRead::Generic))
4161            .transpose()
4162    }
4163
4164    #[instrument(skip(self))]
4165    async fn list_executions(
4166        &self,
4167        filter: ListExecutionsFilter,
4168        pagination: ExecutionListPagination,
4169    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4170        let mut client_guard = self.client.lock().await;
4171        let tx = client_guard.transaction().await?;
4172
4173        let result = list_executions(&tx, filter, &pagination).await?;
4174
4175        tx.commit().await?;
4176        Ok(result)
4177    }
4178
4179    #[instrument(skip(self))]
4180    async fn list_execution_events(
4181        &self,
4182        execution_id: &ExecutionId,
4183        pagination: Pagination<VersionType>,
4184        include_backtrace_id: bool,
4185    ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4186        let mut client_guard = self.client.lock().await;
4187        let tx = client_guard.transaction().await?;
4188
4189        let events =
4190            list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4191        let max_version = get_max_version(&tx, execution_id).await?;
4192
4193        tx.commit().await?;
4194        Ok(ListExecutionEventsResponse {
4195            events,
4196            max_version,
4197        })
4198    }
4199
4200    #[instrument(skip(self))]
4201    async fn list_responses(
4202        &self,
4203        execution_id: &ExecutionId,
4204        pagination: Pagination<u32>,
4205    ) -> Result<ListResponsesResponse, DbErrorRead> {
4206        let mut client_guard = self.client.lock().await;
4207        let tx = client_guard.transaction().await?;
4208
4209        let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4210        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4211
4212        tx.commit().await?;
4213        Ok(ListResponsesResponse {
4214            responses,
4215            max_cursor,
4216        })
4217    }
4218
4219    #[instrument(skip(self))]
4220    async fn list_execution_events_responses(
4221        &self,
4222        execution_id: &ExecutionId,
4223        req_since: &Version,
4224        req_max_length: VersionType,
4225        req_include_backtrace_id: bool,
4226        resp_pagination: Pagination<u32>,
4227    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4228        let mut client_guard = self.client.lock().await;
4229        let tx = client_guard.transaction().await?;
4230
4231        let combined_state = get_combined_state(&tx, execution_id).await?;
4232
4233        let events = list_execution_events(
4234            &tx,
4235            execution_id,
4236            Pagination::NewerThan {
4237                length: req_max_length
4238                    .try_into()
4239                    .expect("req_max_length fits in u16"),
4240                cursor: req_since.0,
4241                including_cursor: true,
4242            },
4243            req_include_backtrace_id,
4244        )
4245        .await?;
4246
4247        let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4248        let max_version = get_max_version(&tx, execution_id).await?;
4249        let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4250
4251        tx.commit().await?;
4252
4253        Ok(ExecutionWithStateRequestsResponses {
4254            execution_with_state: combined_state.execution_with_state,
4255            events,
4256            responses,
4257            max_version,
4258            max_cursor,
4259        })
4260    }
4261
4262    #[instrument(skip(self))]
4263    async fn upgrade_execution_component(
4264        &self,
4265        execution_id: &ExecutionId,
4266        old: &ComponentDigest,
4267        new: &ComponentDigest,
4268    ) -> Result<(), DbErrorWrite> {
4269        let mut client_guard = self.client.lock().await;
4270        let tx = client_guard.transaction().await?;
4271
4272        upgrade_execution_component(&tx, execution_id, old, new).await?;
4273
4274        tx.commit().await?;
4275        Ok(())
4276    }
4277
4278    #[instrument(skip(self))]
4279    async fn list_logs(
4280        &self,
4281        execution_id: &ExecutionId,
4282        show_derived: bool,
4283        filter: LogFilter,
4284        pagination: Pagination<DateTime<Utc>>,
4285    ) -> Result<ListLogsResponse, DbErrorRead> {
4286        let mut client_guard = self.client.lock().await;
4287        let tx = client_guard.transaction().await?;
4288        let responses = list_logs_tx(&tx, execution_id, show_derived, &filter, &pagination).await?;
4289        tx.commit().await?;
4290        Ok(responses)
4291    }
4292
4293    #[instrument(skip(self))]
4294    async fn list_deployment_states(
4295        &self,
4296        current_time: DateTime<Utc>,
4297        pagination: Pagination<Option<DeploymentId>>,
4298        include_config_json: bool,
4299    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4300        let mut client_guard = self.client.lock().await;
4301        let tx = client_guard.transaction().await?;
4302        let deployments =
4303            list_deployment_states(&tx, current_time, pagination, include_config_json).await?;
4304        tx.commit().await?;
4305        Ok(deployments)
4306    }
4307
4308    #[instrument(skip(self))]
4309    async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite> {
4310        assert_eq!(
4311            record.status,
4312            DeploymentStatus::Inactive,
4313            "insert_deployment requires Inactive status"
4314        );
4315        assert!(
4316            record.last_active_at.is_none(),
4317            "insert_deployment requires last_active_at == None"
4318        );
4319        let mut client_guard = self.client.lock().await;
4320        let tx = client_guard.transaction().await?;
4321        tx.execute(
4322            "INSERT INTO t_deployment \
4323             (deployment_id, created_at, status, config_json, obelisk_version, created_by) \
4324             VALUES ($1, $2, $3, $4, $5, $6)",
4325            &[
4326                &record.deployment_id.to_string(), // $1
4327                &record.created_at,                // $2
4328                &record.status.as_str(),           // $3
4329                &record.config_json,               // $4
4330                &record.obelisk_version,           // $5
4331                &record.created_by,                // $6
4332            ],
4333        )
4334        .await?;
4335        tx.commit().await?;
4336        Ok(())
4337    }
4338
4339    #[instrument(skip(self))]
4340    async fn activate_deployment(
4341        &self,
4342        deployment_id: DeploymentId,
4343        now: DateTime<Utc>,
4344    ) -> Result<(), DbErrorWrite> {
4345        let mut client_guard = self.client.lock().await;
4346        let tx = client_guard.transaction().await?;
4347        // Demote currently active or enqueued deployment to inactive.
4348        tx.execute(
4349            "UPDATE t_deployment SET status = 'inactive' WHERE status IN ('active', 'enqueued')",
4350            &[],
4351        )
4352        .await?;
4353        // Set target deployment to active, recording activation time.
4354        let rows = tx
4355            .execute(
4356                "UPDATE t_deployment SET status = 'active', last_active_at = $1 WHERE deployment_id = $2",
4357                &[&now, &deployment_id.to_string()],
4358            )
4359            .await?;
4360        tx.commit().await?;
4361        if rows == 0 {
4362            return Err(DbErrorWrite::NotFound);
4363        }
4364        Ok(())
4365    }
4366
4367    async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite> {
4368        let mut client_guard = self.client.lock().await;
4369        let tx = client_guard.transaction().await?;
4370        // Guard: reject if target deployment is currently active.
4371        let status_opt = tx
4372            .query_opt(
4373                "SELECT status FROM t_deployment WHERE deployment_id = $1",
4374                &[&deployment_id.to_string()],
4375            )
4376            .await?;
4377        match status_opt.as_ref().map(|r| r.get::<_, &str>("status")) {
4378            None => return Err(DbErrorWrite::NotFound),
4379            Some("active") => return Err(DbErrorWriteNonRetriable::Conflict.into()),
4380            _ => {}
4381        }
4382        // Demote any previously enqueued deployment to inactive.
4383        tx.execute(
4384            "UPDATE t_deployment SET status = 'inactive' WHERE status = 'enqueued'",
4385            &[],
4386        )
4387        .await?;
4388        // Set target deployment to enqueued.
4389        let rows = tx
4390            .execute(
4391                "UPDATE t_deployment SET status = 'enqueued' WHERE deployment_id = $1",
4392                &[&deployment_id.to_string()],
4393            )
4394            .await?;
4395        tx.commit().await?;
4396        if rows == 0 {
4397            return Err(DbErrorWrite::NotFound);
4398        }
4399        Ok(())
4400    }
4401
4402    #[instrument(skip(self))]
4403    async fn get_deployment(
4404        &self,
4405        deployment_id: DeploymentId,
4406    ) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4407        let mut client_guard = self.client.lock().await;
4408        let tx = client_guard.transaction().await?;
4409        let row = tx
4410            .query_opt(
4411                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4412                 FROM t_deployment WHERE deployment_id = $1",
4413                &[&deployment_id.to_string()],
4414            )
4415            .await?;
4416        tx.commit().await?;
4417        match row {
4418            None => Ok(None),
4419            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4420        }
4421    }
4422
4423    #[instrument(skip(self))]
4424    #[cfg(feature = "test")]
4425    async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4426        let mut client_guard = self.client.lock().await;
4427        let tx = client_guard.transaction().await?;
4428        let row = tx
4429            .query_opt(
4430                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4431                 FROM t_deployment WHERE status = 'active' LIMIT 1",
4432                &[],
4433            )
4434            .await?;
4435        tx.commit().await?;
4436        match row {
4437            None => Ok(None),
4438            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4439        }
4440    }
4441
4442    async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4443        let mut client_guard = self.client.lock().await;
4444        let tx = client_guard.transaction().await?;
4445        let row = tx
4446            .query_opt(
4447                "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4448                 FROM t_deployment WHERE status IN ('enqueued', 'active') \
4449                 ORDER BY CASE status WHEN 'enqueued' THEN 0 ELSE 1 END LIMIT 1",
4450                &[],
4451            )
4452            .await?;
4453        tx.commit().await?;
4454        match row {
4455            None => Ok(None),
4456            Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4457        }
4458    }
4459
4460    #[instrument(skip(self))]
4461    async fn list_deployments(
4462        &self,
4463        pagination: Pagination<Option<DeploymentId>>,
4464    ) -> Result<Vec<DeploymentRecord>, DbErrorRead> {
4465        let mut client_guard = self.client.lock().await;
4466        let tx = client_guard.transaction().await?;
4467        let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4468        let mut add_param = |p: Box<dyn tokio_postgres::types::ToSql + Sync + Send>| {
4469            params.push(p);
4470            format!("${}", params.len())
4471        };
4472
4473        let mut sql = String::from(
4474            "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4475             FROM t_deployment",
4476        );
4477
4478        if let Some(cursor) = pagination.cursor() {
4479            let p_cursor = add_param(Box::new(cursor.to_string()));
4480            write!(
4481                sql,
4482                " WHERE deployment_id {rel} {p_cursor}",
4483                rel = pagination.rel()
4484            )
4485            .expect("writing to string");
4486        }
4487
4488        let (inner_order, outer_order) = if pagination.is_desc() {
4489            ("DESC", "")
4490        } else {
4491            ("ASC", "DESC")
4492        };
4493
4494        write!(
4495            sql,
4496            " ORDER BY deployment_id {inner_order} LIMIT {limit}",
4497            limit = pagination.length()
4498        )
4499        .expect("writing to string");
4500
4501        let final_sql = if outer_order.is_empty() {
4502            sql
4503        } else {
4504            format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
4505        };
4506
4507        let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4508            params.iter().map(|p| p.as_ref() as _).collect();
4509
4510        let rows = tx.query(&final_sql, &params_refs).await?;
4511        tx.commit().await?;
4512
4513        rows.iter()
4514            .map(deployment_record_from_pg_row)
4515            .collect::<Result<Vec<_>, _>>()
4516    }
4517
4518    #[instrument(skip(self))]
4519    async fn pause_execution(
4520        &self,
4521        execution_id: &ExecutionId,
4522        paused_at: DateTime<Utc>,
4523    ) -> Result<AppendResponse, DbErrorWrite> {
4524        let mut client_guard = self.client.lock().await;
4525        let tx = client_guard.transaction().await?;
4526
4527        let combined_state = get_combined_state(&tx, execution_id).await?;
4528        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4529        debug!("Pausing with {appending_version}");
4530        let next_version = if matches!(
4531            combined_state.execution_with_state.pending_state,
4532            PendingState::Locked(_)
4533        ) {
4534            let (next_version, _notifier) = append(
4535                &tx,
4536                execution_id,
4537                AppendRequest {
4538                    created_at: paused_at,
4539                    event: ExecutionRequest::Unlocked {
4540                        backoff_expires_at: paused_at,
4541                        reason: StrVariant::Static("paused"),
4542                    },
4543                },
4544                appending_version,
4545            )
4546            .await?;
4547            next_version
4548        } else {
4549            appending_version
4550        };
4551        let (next_version, _notifier) = append(
4552            &tx,
4553            execution_id,
4554            AppendRequest {
4555                created_at: paused_at,
4556                event: ExecutionRequest::Paused,
4557            },
4558            next_version,
4559        )
4560        .await?;
4561        tx.commit().await?;
4562        Ok(next_version)
4563    }
4564
4565    #[instrument(skip(self))]
4566    async fn unpause_execution(
4567        &self,
4568        execution_id: &ExecutionId,
4569        unpaused_at: DateTime<Utc>,
4570    ) -> Result<AppendResponse, DbErrorWrite> {
4571        let mut client_guard = self.client.lock().await;
4572        let tx = client_guard.transaction().await?;
4573
4574        let combined_state = get_combined_state(&tx, execution_id).await?;
4575        let appending_version = combined_state.get_next_version_fail_if_finished()?;
4576        debug!("Unpausing with {appending_version}");
4577        let (next_version, _) = append(
4578            &tx,
4579            execution_id,
4580            AppendRequest {
4581                created_at: unpaused_at,
4582                event: ExecutionRequest::Unpaused,
4583            },
4584            appending_version,
4585        )
4586        .await?;
4587
4588        tx.commit().await?;
4589        Ok(next_version)
4590    }
4591
4592    #[instrument(skip(self))]
4593    async fn pause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite> {
4594        let (execution_id, join_set_id) = delay_id.split_to_parts();
4595        let client_guard = self.client.lock().await;
4596        let rows_modified = client_guard
4597            .execute(
4598                "UPDATE t_delay SET is_paused = TRUE \
4599                 WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
4600                &[
4601                    &execution_id.to_string(),
4602                    &join_set_id.to_string(),
4603                    &delay_id.to_string(),
4604                ],
4605            )
4606            .await?;
4607        if rows_modified == 0 {
4608            return Err(DbErrorWrite::NotFound);
4609        }
4610        Ok(())
4611    }
4612
4613    #[instrument(skip(self))]
4614    async fn unpause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite> {
4615        let (execution_id, join_set_id) = delay_id.split_to_parts();
4616        let client_guard = self.client.lock().await;
4617        let rows_modified = client_guard
4618            .execute(
4619                "UPDATE t_delay SET is_paused = FALSE \
4620                 WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
4621                &[
4622                    &execution_id.to_string(),
4623                    &join_set_id.to_string(),
4624                    &delay_id.to_string(),
4625                ],
4626            )
4627            .await?;
4628        if rows_modified == 0 {
4629            return Err(DbErrorWrite::NotFound);
4630        }
4631        Ok(())
4632    }
4633}
4634
4635#[async_trait]
4636impl DbPoolCloseable for PostgresPool {
4637    async fn close(&self) {
4638        self.pool.close();
4639    }
4640}
4641
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test helper: appends `response_event` (stamped with `created_at`) to
    /// `execution_id` in its own transaction, then passes the resulting
    /// notifier to `notify_all` once the connection lock has been released.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");

        // The inner scope releases the connection lock before notifying.
        let notifier = {
            let mut conn = self.client.lock().await;
            let txn = conn.transaction().await?;
            let outer_event = JoinSetResponseEventOuter {
                created_at,
                event: response_event,
            };
            let notifier = append_response(&txn, &execution_id, outer_event).await?;
            txn.commit().await?;
            notifier
        };

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4670
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test-only helper that tries to drop this pool's database.
    ///
    /// A separate single-purpose pool is opened against `ADMIN_DB_NAME` using
    /// the host, user and password from `self.config`, and `DROP DATABASE` is
    /// attempted up to three times. If every attempt fails a warning is logged
    /// and the function returns normally.
    ///
    /// # Panics
    ///
    /// Panics if the admin pool cannot be created or no connection can be
    /// obtained from it.
    pub async fn drop_database(&self) {
        let manager = ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        };
        let mut cfg = deadpool_postgres::Config::new();
        cfg.dbname = Some(ADMIN_DB_NAME.into());
        cfg.host = Some(self.config.host.clone());
        cfg.user = Some(self.config.user.clone());
        cfg.password = Some(self.config.password.expose_secret().to_string());
        cfg.manager = Some(manager);

        let pool_result = cfg.create_pool(None, NoTls).map_err(|err| {
            error!("Cannot create the default pool - {err:?}");
            InitializationError
        });
        let pool = pool_result.unwrap();

        let client_result = pool.get().await.map_err(|err| {
            error!("Cannot get a connection from the default pool - {err:?}");
            InitializationError
        });
        let client = client_result.unwrap();

        let drop_sql = format!("DROP DATABASE {}", self.config.db_name);
        for _attempt in 0..3 {
            // NOTE(review): the original author's comment says a failed attempt
            // already waits about 1s, so no extra sleep is added between
            // retries - not verifiable from this block.
            let res = client.execute(&drop_sql, &[]).await;
            if res.is_err() {
                debug!("Dropping db failed - {res:?}",);
                continue;
            }
            debug!("Database '{}' dropped.", self.config.db_name);
            return;
        }
        warn!("Did not drop database {}", self.config.db_name);
    }
}
4711}