1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ComponentType, ExecutionId, FunctionFqn, JoinSetId,
6 StrVariant, SupportedFunctionReturnValue,
7 component_id::{ComponentDigest, Digest},
8 prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14 DbPool, DbPoolCloseable, DeploymentRecord, DeploymentState, DeploymentStatus,
15 ExecutionEvent, ExecutionListPagination, ExecutionRequest, ExecutionWithState,
16 ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock, ExpiredTimer,
17 HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
18 JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionEventsResponse,
19 ListExecutionsFilter, ListLogsResponse, ListResponsesResponse, LockPendingResponse, Locked,
20 LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter, LogInfoAppendRow, LogLevel,
21 LogStreamType, Pagination, PendingState, PendingStateBlockedByJoinSet,
22 PendingStateFinishedResultKind, PendingStateMergedPause, ResponseCursor,
23 ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED,
24 STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
25 },
26};
27use db_common::{
28 AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
29 PendingFfqnSubscribersHolder,
30};
31use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
32use hashbrown::HashMap;
33use secrecy::{ExposeSecret as _, SecretString};
34use sha2::{Digest as _, Sha256};
35use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
36use std::{fmt::Write as _, panic::Location};
37use strum::IntoEnumIterator as _;
38use tokio::sync::{mpsc, oneshot};
39use tokio_postgres::{
40 NoTls, Row, Transaction,
41 row::RowIndex,
42 types::{FromSql, Json, ToSql},
43};
44use tracing::{Level, debug, error, info, instrument, trace, warn};
45use tracing_error::SpanTrace;
46
47#[track_caller]
48fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
49 row: &'a Row,
50 name: I,
51) -> Result<T, DbErrorGeneric> {
52 match row.try_get(name) {
53 Ok(ok) => Ok(ok),
54 Err(err) => {
55 Err(consistency_db_err(format!(
57 "Failed to retrieve column '{name}': {err:?}"
58 )))
59 }
60 }
61}
62
63mod ddl {
64 use super::{STATE_LOCKED, STATE_PENDING_AT};
65 use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;
66 use const_format::formatcp;
67
68 pub const ADMIN_DB_NAME: &str = "postgres";
69
70 pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 3;
71
72 pub const CREATE_TABLE_T_METADATA: &str = r"
74CREATE TABLE IF NOT EXISTS t_metadata (
75 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
76 schema_version INTEGER NOT NULL,
77 created_at TIMESTAMPTZ NOT NULL
78);
79";
80
81 pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
83CREATE TABLE IF NOT EXISTS t_execution_log (
84 execution_id TEXT NOT NULL,
85 created_at TIMESTAMPTZ NOT NULL,
86 json_value JSON NOT NULL,
87 version BIGINT NOT NULL CHECK (version >= 0),
88 variant TEXT NOT NULL,
89 join_set_id TEXT,
90 history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
91 PRIMARY KEY (execution_id, version)
92);
93";
94
95 pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
97CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
98";
99
100 pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
101CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
102";
103
104 pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
105 "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log
106 (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
107 HISTORY_EVENT_TYPE_JOIN_NEXT
108 );
109
110 pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
112CREATE TABLE IF NOT EXISTS t_join_set_response (
113 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
114 created_at TIMESTAMPTZ NOT NULL,
115 execution_id TEXT NOT NULL,
116 join_set_id TEXT NOT NULL,
117
118 delay_id TEXT,
119 delay_success BOOLEAN,
120
121 child_execution_id TEXT,
122 finished_version BIGINT CHECK (finished_version >= 0),
123
124 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
125);
126";
127
128 pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
130CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
131";
132
133 pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
134CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
135ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
136";
137
138 pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
139CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
140ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
141";
142
143 pub const CREATE_TABLE_T_STATE: &str = r"
145CREATE TABLE IF NOT EXISTS t_state (
146 execution_id TEXT NOT NULL,
147 is_top_level BOOLEAN NOT NULL,
148 corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
149 ffqn TEXT NOT NULL,
150 created_at TIMESTAMPTZ NOT NULL,
151 component_id_input_digest BYTEA NOT NULL,
152 component_type TEXT NOT NULL,
153 first_scheduled_at TIMESTAMPTZ NOT NULL,
154 deployment_id TEXT NOT NULL,
155 is_paused BOOLEAN NOT NULL,
156
157 pending_expires_finished TIMESTAMPTZ NOT NULL,
158 state TEXT NOT NULL,
159 updated_at TIMESTAMPTZ NOT NULL,
160 intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),
161
162 max_retries BIGINT CHECK (max_retries >= 0),
163 retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
164 last_lock_version BIGINT CHECK (last_lock_version >= 0),
165 executor_id TEXT,
166 run_id TEXT,
167
168 join_set_id TEXT,
169 join_set_closing BOOLEAN,
170
171 result_kind JSONB,
172
173 PRIMARY KEY (execution_id)
174);
175";
176
177 pub const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
180 "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (state, pending_expires_finished, ffqn) WHERE state = '{}';",
181 STATE_PENDING_AT
182 );
183 pub const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
185 "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (state, pending_expires_finished, component_id_input_digest) WHERE state = '{}';",
186 STATE_PENDING_AT
187 );
188
189 pub const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
190 "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
191 STATE_LOCKED
192 );
193
194 pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
195CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
196";
197
198 pub const IDX_T_STATE_FFQN: &str = r"
199CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
200";
201
202 pub const IDX_T_STATE_CREATED_AT: &str = r"
203CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
204";
205 pub const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
207CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
208";
209
210 pub const CREATE_TABLE_T_DELAY: &str = r"
212CREATE TABLE IF NOT EXISTS t_delay (
213 execution_id TEXT NOT NULL,
214 join_set_id TEXT NOT NULL,
215 delay_id TEXT NOT NULL,
216 expires_at TIMESTAMPTZ NOT NULL,
217 PRIMARY KEY (execution_id, join_set_id, delay_id)
218);
219";
220
221 pub const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
224CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
225 backtrace_hash BYTEA PRIMARY KEY,
226 wasm_backtrace JSONB NOT NULL
227);
228";
229
230 pub const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
231CREATE TABLE IF NOT EXISTS t_execution_backtrace (
232 execution_id TEXT NOT NULL,
233 component_id JSONB NOT NULL,
234 version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
235 version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
236 backtrace_hash BYTEA NOT NULL,
237 PRIMARY KEY (execution_id, version_min_including, version_max_excluding),
238 FOREIGN KEY (backtrace_hash) REFERENCES t_wasm_backtrace(backtrace_hash)
239);
240";
241
242 pub const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
243CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version ON t_execution_backtrace (execution_id, version_min_including, version_max_excluding);
244";
245
246 pub const CREATE_TABLE_T_SOURCE_FILE: &str = r"
248CREATE TABLE IF NOT EXISTS t_source_file (
249 content_hash BYTEA PRIMARY KEY,
250 content TEXT NOT NULL
251);
252";
253 pub const CREATE_TABLE_T_COMPONENT_SOURCE: &str = r"
254CREATE TABLE IF NOT EXISTS t_component_source (
255 component_digest BYTEA NOT NULL,
256 frame_key TEXT NOT NULL,
257 is_suffix BOOLEAN NOT NULL,
258 content_hash BYTEA NOT NULL,
259 PRIMARY KEY (component_digest, frame_key, is_suffix),
260 FOREIGN KEY (content_hash) REFERENCES t_source_file(content_hash)
261);
262";
263
264 pub const CREATE_TABLE_T_LOG: &str = r"
266CREATE TABLE IF NOT EXISTS t_log (
267 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
268 execution_id TEXT NOT NULL,
269 run_id TEXT NOT NULL,
270 created_at TIMESTAMPTZ NOT NULL,
271 level INTEGER,
272 message TEXT,
273 stream_type INTEGER,
274 payload BYTEA
275);
276";
277 pub const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
278CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
279";
280 pub const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
281CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
282";
283
284 pub const CREATE_TABLE_T_DEPLOYMENT: &str = r"
286CREATE TABLE IF NOT EXISTS t_deployment (
287 deployment_id TEXT NOT NULL PRIMARY KEY,
288 created_at TIMESTAMPTZ NOT NULL,
289 last_active_at TIMESTAMPTZ,
290 status TEXT NOT NULL DEFAULT 'inactive',
291 config_json TEXT NOT NULL,
292 obelisk_version TEXT NOT NULL,
293 created_by TEXT
294);
295";
296 pub const IDX_T_DEPLOYMENT_STATUS: &str = r"
297CREATE INDEX IF NOT EXISTS idx_t_deployment_status ON t_deployment (status);
298";
299 pub const IDX_T_DEPLOYMENT_SINGLE_ACTIVE: &str = r"
301CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_active ON t_deployment ((1)) WHERE status = 'active';
302";
303 pub const IDX_T_DEPLOYMENT_SINGLE_ENQUEUED: &str = r"
305CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_enqueued ON t_deployment ((1)) WHERE status = 'enqueued';
306";
307}
308
/// Connection settings for the Postgres backend.
///
/// The same host and credentials are used both for the administrative connection (database
/// provisioning) and for the application connection pool.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Server host name or address.
    pub host: String,
    /// User name to authenticate as.
    pub user: String,
    /// Password; only exposed when building the pool configuration.
    pub password: SecretString,
    /// Name of the application database: looked up in `pg_database`, possibly created, and
    /// then connected to by the pool.
    pub db_name: String,
}
316
/// Opaque error returned when setting up the Postgres database or pool fails.
///
/// It carries no details. The cause is logged (via `error!` / `warn!`) where the failure is
/// detected.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
320
321async fn create_database(
322 config: &PostgresConfig,
323 provision_policy: ProvisionPolicy,
324) -> Result<DbInitialzationOutcome, InitializationError> {
325 let mut admin_cfg = deadpool_postgres::Config::new();
326 admin_cfg.host = Some(config.host.clone());
327 admin_cfg.user = Some(config.user.clone());
328 admin_cfg.password = Some(config.password.expose_secret().to_string());
329 admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
330 admin_cfg.manager = Some(ManagerConfig {
331 recycling_method: RecyclingMethod::Fast,
332 });
333
334 let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
335 error!("Cannot create the default pool - {err:?}");
336 InitializationError
337 })?;
338
339 let client = admin_pool.get().await.map_err(|err| {
340 error!("Cannot get a connection from the default pool - {err:?}");
341 InitializationError
342 })?;
343
344 let row = client
345 .query_opt(
346 &format!(
347 "SELECT 1 FROM pg_database WHERE datname = '{}'",
348 config.db_name
349 ),
350 &[],
351 )
352 .await
353 .map_err(|err| {
354 error!("Cannot select from the default database - {err:?}");
355 InitializationError
356 })?;
357
358 match (row, provision_policy) {
359 (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
360 client
361 .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
362 .await
363 .map_err(|err| {
364 error!("Cannot create the database - {err:?}");
365 InitializationError
366 })?;
367 info!("Database '{}' created.", config.db_name);
368 Ok(DbInitialzationOutcome::Created)
369 }
370 (Some(_), ProvisionPolicy::Auto) => {
371 info!("Database '{}' exists.", config.db_name);
372 Ok(DbInitialzationOutcome::Existing)
373 }
374 (Some(_), ProvisionPolicy::MustCreate) => {
375 warn!("Database '{}' already exists.", config.db_name);
376 Err(InitializationError)
377 }
378 (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
379 }
380}
381
/// Per-execution waiters for join-set responses: a one-shot sender plus a `u64` tag.
/// NOTE(review): the meaning of the `u64` is not visible here (presumably a subscriber or
/// cursor identifier). Confirm.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
/// Registry of waiters for pending executions, shared by the pool and all of its connections.
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Waiters for execution results, keyed by execution id and then by a `u64` key.
/// NOTE(review): the semantics of the inner `u64` key are not visible here. Confirm.
/// Not `Arc`-wrapped itself; holders store it as `Arc<ExecutionFinishedSubscribers>`.
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
389
/// Postgres connection pool plus the in-process subscriber registries.
///
/// Every connection handed out by this pool receives clones of the same registry handles, so
/// notifications are shared across connections.
pub struct PostgresPool {
    // Pool of connections to `config.db_name`.
    pool: Pool,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Configuration the pool was created from.
    pub config: PostgresConfig,
}
398
399#[async_trait]
400impl DbPool for PostgresPool {
401 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
402 let client = self.pool.get().await?;
403
404 Ok(Box::new(PostgresConnection {
405 client: tokio::sync::Mutex::new(client),
406 response_subscribers: self.response_subscribers.clone(),
407 pending_subscribers: self.pending_subscribers.clone(),
408 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
409 }))
410 }
411
412 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
413 let client = self.pool.get().await?;
414
415 Ok(Box::new(PostgresConnection {
416 client: tokio::sync::Mutex::new(client),
417 response_subscribers: self.response_subscribers.clone(),
418 pending_subscribers: self.pending_subscribers.clone(),
419 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
420 }))
421 }
422
423 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
424 let client = self.pool.get().await?;
425
426 Ok(Box::new(PostgresConnection {
427 client: tokio::sync::Mutex::new(client),
428 response_subscribers: self.response_subscribers.clone(),
429 pending_subscribers: self.pending_subscribers.clone(),
430 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
431 }))
432 }
433
434 #[cfg(feature = "test")]
435 async fn connection_test(
436 &self,
437 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
438 let client = self.pool.get().await?;
439
440 Ok(Box::new(PostgresConnection {
441 client: tokio::sync::Mutex::new(client),
442 response_subscribers: self.response_subscribers.clone(),
443 pending_subscribers: self.pending_subscribers.clone(),
444 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
445 }))
446 }
447}
448
/// One pooled Postgres client, together with the subscriber registries shared with the
/// owning `PostgresPool`.
pub struct PostgresConnection {
    // The client sits behind an async (tokio) mutex.
    // NOTE(review): presumably so methods taking `&self` can use it across `.await`. Confirm.
    client: tokio::sync::Mutex<Client>,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
455
/// Whether `PostgresPool::new` may create the application database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Never create; the database is assumed to exist (no existence check is made).
    NeverCreate,
    /// Create the database if it is missing, otherwise use the existing one.
    Auto,
    /// Create the database; fail if it already exists.
    MustCreate,
}
464
/// Result of database provisioning during pool creation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// The database was created during this initialization.
    Created,
    /// The database already existed (also reported for `ProvisionPolicy::NeverCreate`).
    Existing,
}
470
471impl PostgresPool {
472 #[instrument(skip_all, name = "postgres_new")]
473 pub async fn new(
474 config: PostgresConfig,
475 provision_policy: ProvisionPolicy,
476 ) -> Result<PostgresPool, InitializationError> {
477 Self::new_with_outcome(config, provision_policy)
478 .await
479 .map(|(db, _)| db)
480 }
481
482 pub async fn new_with_outcome(
483 config: PostgresConfig,
484 provision_policy: ProvisionPolicy,
485 ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
486 let outcome = if matches!(
487 provision_policy,
488 ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
489 ) {
490 create_database(&config, provision_policy).await?
491 } else {
492 DbInitialzationOutcome::Existing
493 };
494 let mut cfg = deadpool_postgres::Config::new();
495 cfg.host = Some(config.host.clone());
496 cfg.user = Some(config.user.clone());
497 cfg.password = Some(config.password.expose_secret().to_string());
498 cfg.dbname = Some(config.db_name.clone());
499 cfg.manager = Some(ManagerConfig {
500 recycling_method: RecyclingMethod::Fast,
501 });
502
503 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
504 error!("Cannot create the database pool - {err:?}");
505 InitializationError
506 })?;
507 let client = pool.get().await.map_err(|err| {
508 error!("Cannot get a connection from the database pool - {err:?}");
509 InitializationError
510 })?;
511
512 let statements = vec![
513 ddl::CREATE_TABLE_T_METADATA,
514 ddl::CREATE_TABLE_T_EXECUTION_LOG,
515 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
516 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
517 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
518 ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
519 ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
520 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
521 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
522 ddl::CREATE_TABLE_T_STATE,
523 ddl::IDX_T_STATE_LOCK_PENDING_BY_FFQN,
524 ddl::IDX_T_STATE_LOCK_PENDING_BY_COMPONENT,
525 ddl::IDX_T_STATE_EXPIRED_LOCKS,
526 ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
527 ddl::IDX_T_STATE_FFQN,
528 ddl::IDX_T_STATE_CREATED_AT,
529 ddl::IDX_T_STATE_DEPLOYMENT_STATE,
530 ddl::CREATE_TABLE_T_DELAY,
531 ddl::CREATE_TABLE_T_WASM_BACKTRACE,
532 ddl::CREATE_TABLE_T_EXECUTION_BACKTRACE,
533 ddl::IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION,
534 ddl::CREATE_TABLE_T_SOURCE_FILE,
535 ddl::CREATE_TABLE_T_COMPONENT_SOURCE,
536 ddl::CREATE_TABLE_T_LOG,
537 ddl::IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT,
538 ddl::IDX_T_LOG_EXECUTION_ID_CREATED_AT,
539 ddl::CREATE_TABLE_T_DEPLOYMENT,
540 ddl::IDX_T_DEPLOYMENT_STATUS,
541 ddl::IDX_T_DEPLOYMENT_SINGLE_ACTIVE,
542 ddl::IDX_T_DEPLOYMENT_SINGLE_ENQUEUED,
543 ];
544
545 let batch_sql = statements.join("\n");
547 client.batch_execute(&batch_sql).await.map_err(|err| {
548 error!("Cannot run the DDL import - {err:?}");
549 InitializationError
550 })?;
551
552 let row = client
553 .query_opt(
554 "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
555 &[],
556 )
557 .await
558 .map_err(|err| {
559 error!("Cannot select schema version - {err:?}");
560 InitializationError
561 })?;
562
563 let actual_version = match row {
565 Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
566 error!("Failed to get schema_version column: {e}");
567 InitializationError
568 })?),
569 None => None,
570 };
571
572 match actual_version {
573 None => {
574 client
575 .execute(
576 "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
577 &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
578 )
579 .await
580 .map_err(|err| {
581 error!("Cannot insert schema version - {err:?}");
582 InitializationError
583 })?;
584 }
585 Some(actual_version) => {
586 if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
588 error!(
589 "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
590 );
591 return Err(InitializationError);
592 }
593 }
594 }
595
596 debug!("Database schema initialized.");
597
598 Ok((
599 PostgresPool {
600 pool,
601 execution_finished_subscribers: Arc::default(),
602 pending_subscribers: Arc::default(),
603 response_subscribers: Arc::default(),
604 config,
605 },
606 outcome,
607 ))
608 }
609}
610
611fn deployment_record_from_pg_row(row: &Row) -> Result<DeploymentRecord, DbErrorRead> {
612 let deployment_id_str: String = get(row, "deployment_id")?;
613 let deployment_id = deployment_id_str.parse::<DeploymentId>().map_err(|e| {
614 DbErrorRead::Generic(consistency_db_err(format!("invalid deployment_id: {e}")))
615 })?;
616 let status_str: String = get(row, "status")?;
617 let status = status_str
618 .parse::<DeploymentStatus>()
619 .map_err(|e| DbErrorRead::Generic(consistency_db_err(format!("invalid status: {e}"))))?;
620 Ok(DeploymentRecord {
621 deployment_id,
622 created_at: get(row, "created_at")?,
623 last_active_at: get(row, "last_active_at")?,
624 status,
625 config_json: get(row, "config_json")?,
626 obelisk_version: get(row, "obelisk_version")?,
627 created_by: get(row, "created_by")?,
628 })
629}
630
631#[track_caller]
632fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
633 DbErrorGeneric::Uncategorized {
634 reason: reason.into(),
635 context: SpanTrace::capture(),
636 source: None,
637 loc: Location::caller(),
638 }
639}
640#[track_caller]
641fn consistency_db_err_src(
642 reason: impl Into<StrVariant>,
643 source: Arc<dyn std::error::Error + Send + Sync>,
644) -> DbErrorGeneric {
645 DbErrorGeneric::Uncategorized {
646 reason: reason.into(),
647 context: SpanTrace::capture(),
648 source: Some(source),
649 loc: Location::caller(),
650 }
651}
652
/// A delay belonging to a join set, together with the instant it expires.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
659
660async fn fetch_created_event(
661 tx: &Transaction<'_>,
662 execution_id: &ExecutionId,
663) -> Result<CreateRequest, DbErrorRead> {
664 let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
665 execution_id = $1 AND version = 0";
666
667 let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
668
669 let created_at = get(&row, "created_at")?;
670 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
671 let event = event.0;
672
673 if let ExecutionRequest::Created {
674 ffqn,
675 params,
676 parent,
677 scheduled_at,
678 component_id,
679 deployment_id,
680 metadata,
681 scheduled_by,
682 } = event
683 {
684 Ok(CreateRequest {
685 created_at,
686 execution_id: execution_id.clone(),
687 ffqn,
688 params,
689 parent,
690 scheduled_at,
691 component_id,
692 deployment_id,
693 metadata,
694 scheduled_by,
695 })
696 } else {
697 error!("Row with version=0 must be a `Created` event - {event:?}");
698 Err(consistency_db_err("expected `Created` event").into())
699 }
700}
701
702fn check_expected_next_and_appending_version(
703 expected_version: &Version,
704 appending_version: &Version,
705) -> Result<(), DbErrorWrite> {
706 if *expected_version != *appending_version {
707 debug!(
708 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
709 );
710 return Err(DbErrorWrite::NonRetriable(
711 DbErrorWriteNonRetriable::VersionConflict {
712 expected: expected_version.clone(),
713 requested: appending_version.clone(),
714 },
715 ));
716 }
717 Ok(())
718}
719
720#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
721async fn create_inner(
722 tx: &Transaction<'_>,
723 req: CreateRequest,
724) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
725 trace!("create_inner");
726
727 let version = Version::default();
728 let execution_id = req.execution_id.clone();
729 let execution_id_str = execution_id.to_string();
730 let ffqn = req.ffqn.clone();
731 let created_at = req.created_at;
732 let scheduled_at = req.scheduled_at;
733 let component_id = req.component_id.clone();
734 let deployment_id = req.deployment_id;
735
736 let event = ExecutionRequest::from(req);
737 let event = Json(event);
738
739 tx.execute(
740 "INSERT INTO t_execution_log (
741 execution_id, created_at, version, json_value, variant, join_set_id
742 ) VALUES ($1, $2, $3, $4, $5, $6)",
743 &[
744 &execution_id_str,
745 &created_at,
746 &i64::from(version.0), &event,
748 &event.0.variant(),
749 &event.0.join_set_id().map(std::string::ToString::to_string),
750 ],
751 )
752 .await?;
753
754 let pending_at = {
755 debug!("Creating with `Pending(`{scheduled_at:?}`)");
756
757 tx.execute(
758 r"
759 INSERT INTO t_state (
760 execution_id,
761 is_top_level,
762 corresponding_version,
763 pending_expires_finished,
764 ffqn,
765 state,
766 created_at,
767 component_id_input_digest,
768 component_type,
769 deployment_id,
770 first_scheduled_at,
771 updated_at,
772 intermittent_event_count,
773 is_paused
774 ) VALUES (
775 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, false
776 )",
777 &[
778 &execution_id_str,
779 &execution_id.is_top_level(),
780 &i64::from(version.0),
781 &scheduled_at,
782 &ffqn.to_string(),
783 &STATE_PENDING_AT,
784 &created_at,
785 &component_id.component_digest.as_slice(),
786 &component_id.component_type.to_string(),
787 &deployment_id.to_string(),
788 &scheduled_at,
789 ],
790 )
791 .await?;
792
793 AppendNotifier {
794 pending_at: Some(NotifierPendingAt {
795 scheduled_at,
796 ffqn,
797 component_input_digest: component_id.component_digest,
798 }),
799 execution_finished: None,
800 response: None,
801 }
802 };
803
804 let next_version = Version::new(version.0 + 1);
805 Ok((next_version, pending_at))
806}
807
808#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
809async fn update_state_pending_after_response_appended(
810 tx: &Transaction<'_>,
811 execution_id: &ExecutionId,
812 scheduled_at: DateTime<Utc>, component_input_digest: ComponentDigest,
814) -> Result<AppendNotifier, DbErrorWrite> {
815 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
816
817 let execution_id_str = execution_id.to_string();
819
820 let updated = tx
821 .execute(
822 r"
823 UPDATE t_state
824 SET
825 pending_expires_finished = $1,
826 state = $2,
827 updated_at = CURRENT_TIMESTAMP,
828
829 max_retries = NULL,
830 retry_exp_backoff_millis = NULL,
831 last_lock_version = NULL,
832
833 join_set_id = NULL,
834 join_set_closing = NULL,
835
836 result_kind = NULL
837 WHERE execution_id = $3
838 ",
839 &[
840 &scheduled_at, &STATE_PENDING_AT, &execution_id_str, ],
844 )
845 .await?;
846
847 if updated == 0 {
848 return Err(DbErrorWrite::NotFound);
849 }
850
851 Ok(AppendNotifier {
852 pending_at: Some(NotifierPendingAt {
853 scheduled_at,
854 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
855 component_input_digest,
856 }),
857 execution_finished: None,
858 response: None,
859 })
860}
861
862#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
863async fn update_state_pending_after_event_appended(
864 tx: &Transaction<'_>,
865 execution_id: &ExecutionId,
866 appending_version: &Version,
867 scheduled_at: DateTime<Utc>,
868 intermittent_failure: bool,
869 component_input_digest: ComponentDigest,
870) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
871 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
872
873 let intermittent_delta = i64::from(intermittent_failure); let updated = tx
876 .execute(
877 r"
878 UPDATE t_state
879 SET
880 corresponding_version = $1,
881 pending_expires_finished = $2,
882 state = $3,
883 updated_at = CURRENT_TIMESTAMP,
884 intermittent_event_count = intermittent_event_count + $4,
885
886 max_retries = NULL,
887 retry_exp_backoff_millis = NULL,
888 last_lock_version = NULL,
889
890 join_set_id = NULL,
891 join_set_closing = NULL,
892
893 result_kind = NULL
894 WHERE execution_id = $5;
895 ",
896 &[
897 &i64::from(appending_version.0), &scheduled_at, &STATE_PENDING_AT, &intermittent_delta, &execution_id.to_string(), ],
903 )
904 .await?;
905
906 if updated != 1 {
907 return Err(DbErrorWrite::NotFound);
908 }
909
910 Ok((
911 appending_version.increment(),
912 AppendNotifier {
913 pending_at: Some(NotifierPendingAt {
914 scheduled_at,
915 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
916 component_input_digest,
917 }),
918 execution_finished: None,
919 response: None,
920 },
921 ))
922}
923
924#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
925#[expect(clippy::too_many_arguments)]
926async fn update_state_locked_get_intermittent_event_count(
927 tx: &Transaction<'_>,
928 execution_id: &ExecutionId,
929 deployment_id: DeploymentId,
930 component_digest: &ComponentDigest,
931 executor_id: ExecutorId,
932 run_id: RunId,
933 lock_expires_at: DateTime<Utc>,
934 appending_version: &Version,
935 retry_config: ComponentRetryConfig,
936) -> Result<u32, DbErrorWrite> {
937 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
938 let backoff_millis =
939 i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
940 DbErrorGeneric::Uncategorized {
942 reason: "backoff too big".into(),
943 context: SpanTrace::capture(),
944 source: Some(Arc::new(err)),
945 loc: Location::caller(),
946 }
947 })?;
948
949 let execution_id_str = execution_id.to_string();
950
951 let updated = tx
952 .execute(
953 r"
954 UPDATE t_state
955 SET
956 corresponding_version = $1,
957 pending_expires_finished = $2,
958 state = $3,
959 updated_at = CURRENT_TIMESTAMP,
960 deployment_id = $4,
961 component_id_input_digest = $5,
962
963 max_retries = $6,
964 retry_exp_backoff_millis = $7,
965 last_lock_version = $1,
966 executor_id = $8,
967 run_id = $9,
968
969 join_set_id = NULL,
970 join_set_closing = NULL,
971
972 result_kind = NULL
973 WHERE execution_id = $10
974 AND is_paused = false
975 ",
976 &[
977 &i64::from(appending_version.0),
978 &lock_expires_at,
979 &STATE_LOCKED,
980 &deployment_id.to_string(),
981 &component_digest,
982 &retry_config.max_retries.map(i64::from),
983 &backoff_millis,
984 &executor_id.to_string(),
985 &run_id.to_string(),
986 &execution_id_str,
987 ],
988 )
989 .await?;
990
991 if updated != 1 {
992 return Err(DbErrorWrite::NotFound);
993 }
994
995 let row = tx
997 .query_one(
998 "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
999 &[&execution_id_str],
1000 )
1001 .await
1002 .map_err(DbErrorGeneric::from)?;
1003
1004 let count: i64 = get(&row, "intermittent_event_count")?; let count = u32::try_from(count)
1006 .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
1007 Ok(count)
1008}
1009
1010#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1011async fn update_state_blocked(
1012 tx: &Transaction<'_>,
1013 execution_id: &ExecutionId,
1014 appending_version: &Version,
1015 join_set_id: &JoinSetId,
1016 lock_expires_at: DateTime<Utc>,
1017 join_set_closing: bool,
1018) -> Result<AppendResponse, DbErrorWrite> {
1019 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1020
1021 let updated = tx
1022 .execute(
1023 r"
1024 UPDATE t_state
1025 SET
1026 corresponding_version = $1,
1027 pending_expires_finished = $2,
1028 state = $3,
1029 updated_at = CURRENT_TIMESTAMP,
1030
1031 max_retries = NULL,
1032 retry_exp_backoff_millis = NULL,
1033 last_lock_version = NULL,
1034
1035 join_set_id = $4,
1036 join_set_closing = $5,
1037
1038 result_kind = NULL
1039 WHERE execution_id = $6
1040 ",
1041 &[
1042 &i64::from(appending_version.0), &lock_expires_at, &STATE_BLOCKED_BY_JOIN_SET, &join_set_id.to_string(), &join_set_closing, &execution_id.to_string(), ],
1049 )
1050 .await?;
1051
1052 if updated != 1 {
1053 return Err(DbErrorWrite::NotFound);
1054 }
1055 Ok(appending_version.increment())
1056}
1057
1058#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1059async fn update_state_finished(
1060 tx: &Transaction<'_>,
1061 execution_id: &ExecutionId,
1062 appending_version: &Version,
1063 finished_at: DateTime<Utc>,
1064 result_kind: PendingStateFinishedResultKind,
1065) -> Result<(), DbErrorWrite> {
1066 debug!("Setting t_state to Finished");
1067
1068 let result_kind_json = Json(result_kind);
1069
1070 let updated = tx
1071 .execute(
1072 r"
1073 UPDATE t_state
1074 SET
1075 corresponding_version = $1,
1076 pending_expires_finished = $2,
1077 state = $3,
1078 updated_at = CURRENT_TIMESTAMP,
1079
1080 max_retries = NULL,
1081 retry_exp_backoff_millis = NULL,
1082 last_lock_version = NULL,
1083 executor_id = NULL,
1084 run_id = NULL,
1085
1086 join_set_id = NULL,
1087 join_set_closing = NULL,
1088
1089 result_kind = $4
1090 WHERE execution_id = $5
1091 ",
1092 &[
1093 &i64::from(appending_version.0), &finished_at, &STATE_FINISHED, &result_kind_json, &execution_id.to_string(), ],
1099 )
1100 .await?;
1101
1102 if updated != 1 {
1103 return Err(DbErrorWrite::NotFound);
1104 }
1105 Ok(())
1106}
1107
1108#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1109async fn update_state_paused(
1110 tx: &Transaction<'_>,
1111 execution_id: &ExecutionId,
1112 appending_version: &Version,
1113 is_paused: bool,
1114) -> Result<AppendResponse, DbErrorWrite> {
1115 debug!(
1116 "Setting t_state to {}",
1117 if is_paused { "paused" } else { "unpaused" }
1118 );
1119
1120 let updated = tx
1121 .execute(
1122 r"
1123 UPDATE t_state
1124 SET
1125 corresponding_version = $1,
1126 is_paused = $2,
1127 updated_at = CURRENT_TIMESTAMP
1128 WHERE execution_id = $3
1129 ",
1130 &[
1131 &i64::from(appending_version.0), &is_paused, &execution_id.to_string(), ],
1135 )
1136 .await?;
1137
1138 if updated != 1 {
1139 return Err(DbErrorWrite::NotFound);
1140 }
1141 Ok(appending_version.increment())
1142}
1143
1144#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1145async fn bump_state_next_version(
1146 tx: &Transaction<'_>,
1147 execution_id: &ExecutionId,
1148 appending_version: &Version,
1149 delay_req: Option<DelayReq>,
1150) -> Result<AppendResponse, DbErrorWrite> {
1151 debug!("update_index_version");
1152 let execution_id_str = execution_id.to_string();
1153
1154 let updated = tx
1155 .execute(
1156 r"
1157 UPDATE t_state
1158 SET
1159 corresponding_version = $1,
1160 updated_at = CURRENT_TIMESTAMP
1161 WHERE execution_id = $2
1162 ",
1163 &[
1164 &i64::from(appending_version.0), &execution_id_str, ],
1167 )
1168 .await?;
1169
1170 if updated != 1 {
1171 return Err(DbErrorWrite::NotFound);
1172 }
1173
1174 if let Some(DelayReq {
1175 join_set_id,
1176 delay_id,
1177 expires_at,
1178 }) = delay_req
1179 {
1180 debug!("Inserting delay to `t_delay`");
1181 tx.execute(
1182 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1183 &[
1184 &execution_id_str,
1185 &join_set_id.to_string(),
1186 &delay_id.to_string(),
1187 &expires_at,
1188 ],
1189 )
1190 .await?;
1191 }
1192 Ok(appending_version.increment())
1193}
1194
1195async fn get_combined_state(
1196 tx: &Transaction<'_>,
1197 execution_id: &ExecutionId,
1198) -> Result<CombinedState, DbErrorRead> {
1199 let row = tx
1200 .query_one(
1201 r"
1202 SELECT
1203 created_at, first_scheduled_at,
1204 state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
1205 last_lock_version, executor_id, run_id,
1206 join_set_id, join_set_closing,
1207 result_kind, is_paused
1208 FROM t_state
1209 WHERE execution_id = $1
1210 ",
1211 &[&execution_id.to_string()],
1212 )
1213 .await
1214 .map_err(DbErrorRead::from)?;
1215
1216 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1219 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1220
1221 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1222 let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1223 consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1224 })?;
1225 let component_digest = ComponentDigest(digest);
1226
1227 let component_type: String = get(&row, "component_type")?;
1228 let component_type = ComponentType::from_str(&component_type)
1229 .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;
1230
1231 let deployment_id: String = get(&row, "deployment_id")?;
1232 let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1233
1234 let state: String = get(&row, "state")?;
1235 let ffqn: String = get(&row, "ffqn")?;
1236 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1237 consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1238 })?;
1239
1240 let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1241
1242 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1243 let last_lock_version = last_lock_version_raw
1244 .map(Version::try_from)
1245 .transpose()
1246 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1247
1248 let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1249 let executor_id = executor_id_raw
1250 .map(|id| ExecutorId::from_str(&id))
1251 .transpose()
1252 .map_err(DbErrorGeneric::from)?;
1253
1254 let run_id_raw: Option<String> = get(&row, "run_id")?;
1255 let run_id = run_id_raw
1256 .map(|id| RunId::from_str(&id))
1257 .transpose()
1258 .map_err(DbErrorGeneric::from)?;
1259
1260 let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1261 let join_set_id = join_set_id_raw
1262 .map(|id| JoinSetId::from_str(&id))
1263 .transpose()
1264 .map_err(DbErrorGeneric::from)?;
1265
1266 let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1267
1268 let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1269 let result_kind = result_kind.map(|it| it.0);
1270
1271 let is_paused: bool = get(&row, "is_paused")?;
1272
1273 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1274 let corresponding_version = Version::new(
1275 VersionType::try_from(corresponding_version)
1276 .map_err(|_| consistency_db_err("version must be non-negative"))?,
1277 );
1278
1279 let dto = CombinedStateDTO {
1280 execution_id: execution_id.clone(),
1281 created_at,
1282 first_scheduled_at,
1283 state,
1284 ffqn,
1285 component_digest,
1286 component_type,
1287 deployment_id,
1288 pending_expires_finished,
1289 last_lock_version,
1290 executor_id,
1291 run_id,
1292 join_set_id,
1293 join_set_closing,
1294 result_kind,
1295 is_paused,
1296 };
1297 CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1298}
1299
1300async fn list_executions(
1301 read_tx: &Transaction<'_>,
1302 filter: ListExecutionsFilter,
1303 pagination: &ExecutionListPagination,
1304) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1305 struct QueryBuilder {
1307 where_clauses: Vec<String>,
1308 params: Vec<Box<dyn ToSql + Send + Sync>>,
1309 }
1310
1311 impl QueryBuilder {
1312 fn new() -> Self {
1313 Self {
1314 where_clauses: Vec::new(),
1315 params: Vec::new(),
1316 }
1317 }
1318
1319 fn add_param<T>(&mut self, param: T) -> String
1320 where
1321 T: ToSql + Sync + Send + 'static,
1322 {
1323 self.params.push(Box::new(param));
1324 format!("${}", self.params.len())
1325 }
1326
1327 fn add_where(&mut self, clause: String) {
1328 self.where_clauses.push(clause);
1329 }
1330 }
1331
1332 let mut qb = QueryBuilder::new();
1333
1334 let (limit, limit_desc) = match pagination {
1336 ExecutionListPagination::CreatedBy(p) => {
1337 let limit = p.length();
1338 let is_desc = p.is_desc();
1339 if let Some(cursor) = p.cursor() {
1340 let placeholder = qb.add_param(*cursor);
1341 qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1342 }
1343 (limit, is_desc)
1344 }
1345 ExecutionListPagination::ExecutionId(p) => {
1346 let limit = p.length();
1347 let is_desc = p.is_desc();
1348 if let Some(cursor) = p.cursor() {
1349 let placeholder = qb.add_param(cursor.to_string());
1350 qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1351 }
1352 (limit, is_desc)
1353 }
1354 };
1355
1356 if !filter.show_derived {
1357 qb.add_where("is_top_level = true".to_string());
1358 }
1359 let like = |str| format!("{str}%");
1360 if let Some(ffqn_prefix) = filter.ffqn_prefix {
1361 let placeholder = qb.add_param(like(ffqn_prefix));
1362 qb.add_where(format!("ffqn LIKE {placeholder}"));
1363 }
1364 if filter.hide_finished {
1365 qb.add_where(format!("state != '{STATE_FINISHED}'"));
1366 }
1367 if let Some(prefix) = filter.execution_id_prefix {
1368 let placeholder = qb.add_param(like(prefix));
1369 qb.add_where(format!("execution_id LIKE {placeholder}"));
1370 }
1371 if let Some(component_digest) = filter.component_digest {
1372 let placeholder = qb.add_param(component_digest);
1373 qb.add_where(format!("component_id_input_digest = {placeholder}"));
1374 }
1375 if let Some(deployment_id) = filter.deployment_id {
1376 let placeholder = qb.add_param(deployment_id.to_string());
1377 qb.add_where(format!("deployment_id = {placeholder}"));
1378 }
1379
1380 let where_str = if qb.where_clauses.is_empty() {
1381 String::new()
1382 } else {
1383 format!("WHERE {}", qb.where_clauses.join(" AND "))
1384 };
1385
1386 let order_col = match pagination {
1387 ExecutionListPagination::CreatedBy(_) => "created_at",
1388 ExecutionListPagination::ExecutionId(_) => "execution_id",
1389 };
1390
1391 let (inner_order, outer_order) = if limit_desc {
1394 ("DESC", "")
1395 } else {
1396 ("", "DESC")
1397 };
1398
1399 let inner_sql = format!(
1400 r"SELECT created_at, first_scheduled_at, component_id_input_digest, deployment_id,
1401 state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1402 last_lock_version, executor_id, run_id,
1403 join_set_id, join_set_closing,
1404 result_kind, is_paused
1405 FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1406 );
1407
1408 let sql = if outer_order.is_empty() {
1409 inner_sql
1410 } else {
1411 format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1412 };
1413
1414 let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1415 .params
1416 .iter()
1417 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1418 .collect();
1419
1420 let rows = read_tx.query(&sql, ¶ms_refs).await?;
1421
1422 let mut vec = Vec::with_capacity(rows.len());
1423
1424 for row in rows {
1425 let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1427 let execution_id_str: String = get(&row, "execution_id")?;
1428 let execution_id = ExecutionId::from_str(&execution_id_str)
1429 .map_err(|err| consistency_db_err(err.to_string()))?;
1430
1431 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1432 let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1433 consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1434 })?;
1435 let component_digest = ComponentDigest(digest);
1436
1437 let component_type: String = get(&row, "component_type")?;
1438 let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1439 consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1440 })?;
1441
1442 let deployment_id: String = get(&row, "deployment_id")?;
1443 let deployment_id =
1444 DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1445
1446 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1447 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1448
1449 let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1450 get(&row, "result_kind")?;
1451 let result_kind = result_kind.map(|it| it.0);
1452
1453 let is_paused: bool = get(&row, "is_paused")?;
1454
1455 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1456 let corresponding_version = Version::try_from(corresponding_version)
1457 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1458
1459 let executor_id_str: Option<String> = get(&row, "executor_id")?;
1460 let executor_id = executor_id_str
1461 .map(|id| ExecutorId::from_str(&id))
1462 .transpose()?;
1463
1464 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1465 let last_lock_version = last_lock_version_raw
1466 .map(Version::try_from)
1467 .transpose()
1468 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1469
1470 let run_id_str: Option<String> = get(&row, "run_id")?;
1471 let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1472
1473 let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1474 let join_set_id = join_set_id_str
1475 .map(|id| JoinSetId::from_str(&id))
1476 .transpose()?;
1477
1478 let ffqn: String = get(&row, "ffqn")?;
1479 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1480 error!("Error parsing ffqn - {parse_err:?}");
1481 consistency_db_err("invalid ffqn value in `t_state`")
1482 })?;
1483
1484 let combined_state_dto = CombinedStateDTO {
1485 execution_id,
1486 created_at,
1487 first_scheduled_at,
1488 component_digest,
1489 component_type,
1490 deployment_id,
1491 state: get(&row, "state")?,
1492 ffqn,
1493 pending_expires_finished: get(&row, "pending_expires_finished")?,
1494 executor_id,
1495 last_lock_version,
1496 run_id,
1497 join_set_id,
1498 join_set_closing: get(&row, "join_set_closing")?,
1499 result_kind,
1500 is_paused,
1501 };
1502
1503 let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1504
1505 Ok(combined_state.execution_with_state)
1506 };
1507
1508 match unpack() {
1509 Ok(execution) => vec.push(execution),
1510 Err(err) => {
1511 warn!("Skipping corrupted row in t_state: {err:?}");
1512 }
1513 }
1514 }
1515
1516 Ok(vec)
1517}
1518
1519async fn list_responses(
1520 tx: &Transaction<'_>,
1521 execution_id: &ExecutionId,
1522 pagination: Option<Pagination<u32>>,
1523) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1524 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1526 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1527 params.push(p);
1528 format!("${}", params.len())
1529 };
1530
1531 let p_execution_id = add_param(Box::new(execution_id.to_string()));
1533
1534 let mut sql = format!(
1535 "SELECT \
1536 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1537 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1538 WHERE \
1539 r.execution_id = {p_execution_id} \
1540 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1541 );
1542
1543 let limit = match &pagination {
1545 Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1546 let p_cursor = add_param(Box::new(i64::from(*cursor)));
1548
1549 write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1551
1552 Some(p.length())
1553 }
1554 None => None,
1555 };
1556
1557 sql.push_str(" ORDER BY r.id");
1559 let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1560 if is_desc {
1561 sql.push_str(" DESC");
1562 }
1563
1564 if let Some(limit) = limit {
1566 let p_limit = add_param(Box::new(i64::from(limit)));
1568 write!(sql, " LIMIT {p_limit}").unwrap();
1569 }
1570
1571 if is_desc {
1573 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1574 }
1575
1576 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1577 .iter()
1578 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1579 .collect();
1580
1581 let rows = tx
1582 .query(&sql, ¶ms_refs)
1583 .await
1584 .map_err(DbErrorRead::from)?;
1585
1586 let mut results = Vec::with_capacity(rows.len());
1587 for row in rows {
1588 results.push(parse_response_with_cursor(&row)?);
1589 }
1590
1591 Ok(results)
1592}
1593
1594async fn list_logs_tx(
1595 tx: &Transaction<'_>,
1596 execution_id: &ExecutionId,
1597 filter: &LogFilter,
1598 pagination: &Pagination<u32>,
1599) -> Result<ListLogsResponse, DbErrorRead> {
1600 let mut param_index = 1;
1601 let mut query = format!(
1602 "SELECT id, run_id, created_at, level, message, stream_type, payload
1603 FROM t_log
1604 WHERE execution_id = ${param_index}",
1605 );
1606 param_index += 1;
1607
1608 let execution_id = execution_id.to_string();
1609 let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&execution_id];
1610
1611 let level_filter = if filter.should_show_logs() {
1613 let levels_str = if !filter.levels().is_empty() {
1614 filter
1615 .levels()
1616 .iter()
1617 .map(|lvl| (*lvl as u8).to_string())
1618 .collect::<Vec<_>>()
1619 .join(",")
1620 } else {
1621 LogLevel::iter()
1622 .map(|lvl| (lvl as u8).to_string())
1623 .collect::<Vec<_>>()
1624 .join(",")
1625 };
1626 Some(format!(" level IN ({levels_str})"))
1627 } else {
1628 None
1629 };
1630 let stream_filter = if filter.should_show_streams() {
1631 let streams_str = if !filter.stream_types().is_empty() {
1632 filter
1633 .stream_types()
1634 .iter()
1635 .map(|st| (*st as u8).to_string())
1636 .collect::<Vec<_>>()
1637 .join(",")
1638 } else {
1639 LogStreamType::iter()
1640 .map(|st| (st as u8).to_string())
1641 .collect::<Vec<_>>()
1642 .join(",")
1643 };
1644 Some(format!(" stream_type IN ({streams_str})"))
1645 } else {
1646 None
1647 };
1648 match (level_filter, stream_filter) {
1649 (Some(level_filter), Some(stream_filter)) => {
1650 write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1651 .expect("writing to string");
1652 }
1653 (Some(level_filter), None) => {
1654 write!(&mut query, " AND {level_filter}").expect("writing to string");
1655 }
1656 (None, Some(stream_filter)) => {
1657 write!(&mut query, " AND {stream_filter}").expect("writing to string");
1658 }
1659 (None, None) => unreachable!("guarded by constructor"),
1660 }
1661
1662 write!(&mut query, " AND id {} ${param_index}", pagination.rel()).expect("writing to string");
1664 let cursor_val: i64 = (*pagination.cursor()).into();
1665 params.push(&cursor_val);
1666 param_index += 1;
1667
1668 write!(
1670 &mut query,
1671 " ORDER BY id {} LIMIT ${}",
1672 if pagination.is_desc() { "DESC" } else { "ASC" },
1673 param_index
1674 )
1675 .expect("writing to string");
1676 let length_val: i64 = i64::from(pagination.length());
1677 params.push(&length_val);
1678
1679 let rows = tx.query(&query, ¶ms[..]).await?;
1680
1681 let mut items = Vec::with_capacity(rows.len());
1682
1683 for row in rows {
1684 let cursor = u32::try_from(get::<i64, _>(&row, "id")?)
1685 .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?;
1686 let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1687 let run_id: String = get(&row, "run_id")?;
1688 let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1689 consistency_db_err_src(
1690 format!("cannot convert RunId {run_id}, id: {cursor}"),
1691 Arc::from(parse_err),
1692 )
1693 })?;
1694
1695 let level: Option<i32> = get(&row, "level")?;
1696 let message: Option<String> = get(&row, "message")?;
1697 let stream_type: Option<i32> = get(&row, "stream_type")?;
1698 let payload: Option<Vec<u8>> = get(&row, "payload")?;
1699
1700 let log_entry = match (level, message, stream_type, payload) {
1701 (Some(lvl), Some(msg), None, None) => {
1702 let map_err = |err| {
1703 consistency_db_err_src(
1704 format!("cannot convert {lvl} to LogLevel , id: {cursor}"),
1705 err,
1706 )
1707 };
1708 LogEntry::Log {
1709 created_at,
1710 level: u8::try_from(lvl)
1711 .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1712 .map_err(|err| map_err(Arc::from(err)))??,
1713 message: msg,
1714 }
1715 }
1716 (None, None, Some(stype), Some(pl)) => {
1717 let map_err = |err| {
1718 consistency_db_err_src(
1719 format!("cannot convert {stype} to LogStreamType , id: {cursor}"),
1720 err,
1721 )
1722 };
1723 LogEntry::Stream {
1724 created_at,
1725 stream_type: u8::try_from(stype)
1726 .map(|stype| {
1727 LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1728 })
1729 .map_err(|err| map_err(Arc::from(err)))??,
1730 payload: pl,
1731 }
1732 }
1733 _ => {
1734 return Err(consistency_db_err(format!("invalid t_log row id:{cursor}")).into());
1735 }
1736 };
1737
1738 items.push(LogEntryRow {
1739 cursor,
1740 run_id,
1741 log_entry,
1742 });
1743 }
1744
1745 Ok(ListLogsResponse {
1746 next_page: items
1747 .last()
1748 .map(|item| Pagination::NewerThan {
1749 length: pagination.length(),
1750 cursor: item.cursor,
1751 including_cursor: false,
1752 })
1753 .unwrap_or(if pagination.is_asc() {
1754 *pagination } else {
1756 Pagination::NewerThan {
1758 length: pagination.length(),
1759 cursor: 0,
1760 including_cursor: false, }
1762 }),
1763 prev_page: match items.first() {
1764 Some(item) => Some(Pagination::OlderThan {
1765 length: pagination.length(),
1766 cursor: item.cursor,
1767 including_cursor: false,
1768 }),
1769 None if pagination.is_asc() && *pagination.cursor() > 0 => {
1770 Some(pagination.invert())
1772 }
1773 None => None,
1774 },
1775 items,
1776 })
1777}
1778
1779async fn list_deployment_states(
1780 tx: &Transaction<'_>,
1781 current_time: DateTime<Utc>,
1782 pagination: Pagination<Option<DeploymentId>>,
1783 include_config_json: bool,
1784) -> Result<Vec<DeploymentState>, DbErrorRead> {
1785 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1787 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1788 params.push(p);
1789 format!("${}", params.len())
1790 };
1791
1792 let p_now = add_param(Box::new(current_time));
1794
1795 let config_json_col = if include_config_json {
1796 "d.config_json"
1797 } else {
1798 "NULL::TEXT AS config_json"
1799 };
1800
1801 let mut sql = format!(
1802 "
1803 SELECT
1804 d.deployment_id,
1805
1806 COUNT(*) FILTER (WHERE s.state = '{STATE_LOCKED}') AS locked,
1807
1808 COUNT(*) FILTER (
1809 WHERE s.state = '{STATE_PENDING_AT}'
1810 AND s.pending_expires_finished <= {p_now}
1811 ) AS pending,
1812
1813 COUNT(*) FILTER (
1814 WHERE s.state = '{STATE_PENDING_AT}'
1815 AND s.pending_expires_finished > {p_now}
1816 ) AS scheduled,
1817
1818 COUNT(*) FILTER (WHERE s.state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1819
1820 COUNT(*) FILTER (WHERE s.state = '{STATE_FINISHED}') AS finished,
1821
1822 {config_json_col},
1823 d.created_at,
1824 d.last_active_at,
1825 d.status
1826 FROM t_deployment d
1827 LEFT JOIN t_state s ON s.deployment_id = d.deployment_id"
1828 );
1829
1830 if let Some(cursor) = pagination.cursor() {
1832 let p_cursor = add_param(Box::new(cursor.to_string()));
1833 write!(
1834 sql,
1835 " WHERE d.deployment_id {rel} {p_cursor}",
1836 rel = pagination.rel()
1837 )
1838 .expect("writing to string");
1839 }
1840
1841 let (inner_order, outer_order) = if pagination.is_desc() {
1845 ("DESC", "")
1846 } else {
1847 ("ASC", "DESC")
1848 };
1849
1850 write!(
1851 sql,
1852 " GROUP BY d.deployment_id, d.config_json, d.created_at, d.last_active_at, d.status ORDER BY d.deployment_id {inner_order} LIMIT {}",
1853 pagination.length()
1854 )
1855 .expect("writing to string");
1856
1857 let final_sql = if outer_order.is_empty() {
1858 sql
1859 } else {
1860 format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1861 };
1862
1863 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1864 .iter()
1865 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1866 .collect();
1867
1868 let rows = tx
1869 .query(&final_sql, ¶ms_refs)
1870 .await
1871 .map_err(DbErrorRead::from)?;
1872
1873 let mut result = Vec::with_capacity(rows.len());
1874 for row in rows {
1875 let deployment_id: String = get(&row, "deployment_id")?;
1876 let status_str: String = get::<String, _>(&row, "status")?;
1877 let status = status_str
1878 .parse::<DeploymentStatus>()
1879 .map_err(|e| consistency_db_err(format!("unknown deployment status: {e}")))?;
1880 result.push(DeploymentState {
1881 deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1882 locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1883 pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1884 .expect("count is never negative"),
1885 scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1886 .expect("count is never negative"),
1887 blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1888 .expect("count is never negative"),
1889 finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1890 .expect("count is never negative"),
1891 config_json: get::<Option<String>, _>(&row, "config_json")?,
1892 created_at: get::<DateTime<Utc>, _>(&row, "created_at")?,
1893 last_active_at: get::<Option<DateTime<Utc>>, _>(&row, "last_active_at")?,
1894 status,
1895 });
1896 }
1897
1898 Ok(result)
1899}
1900
1901fn parse_response_with_cursor(
1902 row: &tokio_postgres::Row,
1903) -> Result<ResponseWithCursor, DbErrorRead> {
1904 let id = u32::try_from(get::<i64, _>(row, "id")?)
1906 .map_err(|_| consistency_db_err("id must not be negative"))?;
1907
1908 let created_at: DateTime<Utc> = get(row, "created_at")?;
1909 let join_set_id_str: String = get(row, "join_set_id")?;
1910 let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1911
1912 let delay_id: Option<String> = get(row, "delay_id")?;
1914 let delay_id = delay_id
1915 .map(|id| DelayId::from_str(&id))
1916 .transpose()
1917 .map_err(DbErrorGeneric::from)?;
1918 let delay_success: Option<bool> = get(row, "delay_success")?;
1919 let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1920 let child_execution_id = child_execution_id
1921 .map(|id| ExecutionIdDerived::from_str(&id))
1922 .transpose()
1923 .map_err(DbErrorGeneric::from)?;
1924 let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1925 .map(Version::try_from)
1926 .transpose()
1927 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1928 let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1929 let json_value = json_value.map(|it| it.0);
1930
1931 let event = match (
1932 delay_id,
1933 delay_success,
1934 child_execution_id,
1935 finished_version,
1936 json_value,
1937 ) {
1938 (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1939 delay_id,
1940 result: delay_success.then_some(()).ok_or(()),
1941 },
1942 (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1943 if let ExecutionRequest::Finished { retval: result, .. } = json_val {
1944 JoinSetResponse::ChildExecutionFinished {
1945 child_execution_id,
1946 finished_version,
1947 result,
1948 }
1949 } else {
1950 error!("Joined log entry must be 'Finished'");
1951 return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1952 }
1953 }
1954 (delay, delay_success, child, finished, result) => {
1955 error!(
1956 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1957 );
1958 return Err(consistency_db_err("invalid row in t_join_set_response").into());
1959 }
1960 };
1961
1962 Ok(ResponseWithCursor {
1963 cursor: ResponseCursor(id),
1964 event: JoinSetResponseEventOuter {
1965 event: JoinSetResponseEvent { join_set_id, event },
1966 created_at,
1967 },
1968 })
1969}
1970
1971#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1972#[expect(clippy::too_many_arguments)]
1973async fn lock_single_execution(
1974 tx: &Transaction<'_>,
1975 created_at: DateTime<Utc>,
1976 component_id: &ComponentId,
1977 deployment_id: DeploymentId,
1978 execution_id: &ExecutionId,
1979 run_id: RunId,
1980 appending_version: &Version,
1981 executor_id: ExecutorId,
1982 lock_expires_at: DateTime<Utc>,
1983 retry_config: ComponentRetryConfig,
1984) -> Result<LockedExecution, DbErrorWrite> {
1985 trace!("lock_single_execution");
1986
1987 let combined_state = get_combined_state(tx, execution_id).await?;
1989 combined_state
1990 .execution_with_state
1991 .pending_state
1992 .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
1993 let expected_version = combined_state.get_next_version_assert_not_finished();
1994 check_expected_next_and_appending_version(&expected_version, appending_version)?;
1995
1996 let locked_event = Locked {
1998 component_id: component_id.clone(),
1999 deployment_id,
2000 executor_id,
2001 lock_expires_at,
2002 run_id,
2003 retry_config,
2004 };
2005 let event = ExecutionRequest::Locked(locked_event.clone());
2006
2007 let event = Json(event);
2008
2009 tx.execute(
2011 "INSERT INTO t_execution_log \
2012 (execution_id, created_at, json_value, version, variant) \
2013 VALUES ($1, $2, $3, $4, $5)",
2014 &[
2015 &execution_id.to_string(),
2016 &created_at,
2017 &event,
2018 &i64::from(appending_version.0),
2019 &event.0.variant(),
2020 ],
2021 )
2022 .await
2023 .map_err(|err| {
2024 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
2025 reason: "cannot lock".into(),
2026 context: SpanTrace::capture(),
2027 source: Some(Arc::new(err)),
2028 loc: Location::caller(),
2029 })
2030 })?;
2031
2032 let responses = list_responses(tx, execution_id, None).await?;
2033 trace!("Responses: {responses:?}");
2034
2035 let intermittent_event_count = update_state_locked_get_intermittent_event_count(
2037 tx,
2038 execution_id,
2039 deployment_id,
2040 &component_id.component_digest,
2041 executor_id,
2042 run_id,
2043 lock_expires_at,
2044 appending_version,
2045 retry_config,
2046 )
2047 .await?;
2048
2049 let rows = tx
2052 .query(
2053 "SELECT json_value, version FROM t_execution_log WHERE \
2054 execution_id = $1 AND (variant = $2 OR variant = $3) \
2055 ORDER BY version",
2056 &[
2057 &execution_id.to_string(),
2058 &DUMMY_CREATED.variant(),
2059 &DUMMY_HISTORY_EVENT.variant(),
2060 ],
2061 )
2062 .await
2063 .map_err(DbErrorGeneric::from)?;
2064
2065 let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
2066
2067 for row in rows {
2068 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2069 let event = event.0;
2070
2071 let version: i64 = get(&row, "version")?;
2072 let version = Version::try_from(version)
2073 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2074
2075 events.push_back(ExecutionEvent {
2076 created_at: DateTime::from_timestamp_nanos(0), event,
2078 backtrace_id: None,
2079 version,
2080 });
2081 }
2082
2083 let Some(ExecutionRequest::Created {
2085 ffqn,
2086 params,
2087 parent,
2088 metadata,
2089 ..
2090 }) = events.pop_front().map(|outer| outer.event)
2091 else {
2092 error!("Execution log must contain at least `Created` event");
2093 return Err(consistency_db_err("execution log must contain `Created` event").into());
2094 };
2095
2096 let mut event_history = Vec::new();
2098 for ExecutionEvent { event, version, .. } in events {
2099 if let ExecutionRequest::HistoryEvent { event } = event {
2100 event_history.push((event, version));
2101 } else {
2102 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
2103 return Err(consistency_db_err(
2104 "rows can only contain `Created` and `HistoryEvent` event kinds",
2105 )
2106 .into());
2107 }
2108 }
2109
2110 Ok(LockedExecution {
2111 execution_id: execution_id.clone(),
2112 metadata,
2113 next_version: appending_version.increment(),
2114 ffqn,
2115 params,
2116 event_history,
2117 responses,
2118 parent,
2119 intermittent_event_count,
2120 locked_event,
2121 })
2122}
2123
2124async fn count_join_next(
2125 tx: &Transaction<'_>,
2126 execution_id: &ExecutionId,
2127 join_set_id: &JoinSetId,
2128) -> Result<u32, DbErrorRead> {
2129 let row = tx
2130 .query_one(
2131 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
2132 AND history_event_type = $3",
2133 &[
2134 &execution_id.to_string(),
2135 &join_set_id.to_string(),
2136 &HISTORY_EVENT_TYPE_JOIN_NEXT,
2137 ],
2138 )
2139 .await
2140 .map_err(DbErrorRead::from)?;
2141
2142 let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
2143 Ok(count)
2144}
2145
2146async fn nth_response(
2147 tx: &Transaction<'_>,
2148 execution_id: &ExecutionId,
2149 join_set_id: &JoinSetId,
2150 skip_rows: u32,
2151) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2152 let row = tx
2153 .query_opt(
2154 "SELECT r.id, r.created_at, r.join_set_id, \
2155 r.delay_id, r.delay_success, \
2156 r.child_execution_id, r.finished_version, l.json_value \
2157 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2158 WHERE \
2159 r.execution_id = $1 AND r.join_set_id = $2 AND \
2160 ( \
2161 r.finished_version = l.version \
2162 OR \
2163 r.child_execution_id IS NULL \
2164 ) \
2165 ORDER BY id \
2166 LIMIT 1 OFFSET $3",
2167 &[
2168 &execution_id.to_string(),
2169 &join_set_id.to_string(),
2170 &i64::from(skip_rows),
2171 ]
2172 )
2173 .await
2174 .map_err(DbErrorRead::from)?;
2175
2176 match row {
2177 Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
2178 None => Ok(None),
2179 }
2180}
2181
2182#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2183async fn append(
2184 tx: &Transaction<'_>,
2185 execution_id: &ExecutionId,
2186 req: AppendRequest,
2187 appending_version: Version,
2188) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2189 if matches!(req.event, ExecutionRequest::Created { .. }) {
2190 return Err(DbErrorWrite::NonRetriable(
2191 DbErrorWriteNonRetriable::ValidationFailed(
2192 "cannot append `Created` event - use `create` instead".into(),
2193 ),
2194 ));
2195 }
2196
2197 if let AppendRequest {
2198 event:
2199 ExecutionRequest::Locked(Locked {
2200 component_id,
2201 deployment_id,
2202 executor_id,
2203 run_id,
2204 lock_expires_at,
2205 retry_config,
2206 }),
2207 created_at,
2208 } = req
2209 {
2210 return lock_single_execution(
2211 tx,
2212 created_at,
2213 &component_id,
2214 deployment_id,
2215 execution_id,
2216 run_id,
2217 &appending_version,
2218 executor_id,
2219 lock_expires_at,
2220 retry_config,
2221 )
2222 .await
2223 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2224 }
2225
2226 let combined_state = get_combined_state(tx, execution_id).await?;
2227 if combined_state
2228 .execution_with_state
2229 .pending_state
2230 .is_finished()
2231 {
2232 debug!("Execution is already finished");
2233 return Err(DbErrorWrite::NonRetriable(
2234 DbErrorWriteNonRetriable::AlreadyFinished,
2235 ));
2236 }
2237
2238 check_expected_next_and_appending_version(
2239 &combined_state.get_next_version_assert_not_finished(),
2240 &appending_version,
2241 )?;
2242
2243 let event = Json(req.event);
2244
2245 tx.execute(
2247 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2248 VALUES ($1, $2, $3, $4, $5, $6)",
2249 &[
2250 &execution_id.to_string(),
2251 &req.created_at,
2252 &event,
2253 &i64::from(appending_version.0),
2254 &event.0.variant(),
2255 &event.0.join_set_id().map(std::string::ToString::to_string),
2256 ],
2257 )
2258 .await?;
2259
2260 match &event.0 {
2262 ExecutionRequest::Created { .. } => {
2263 unreachable!("handled in the caller")
2264 }
2265
2266 ExecutionRequest::Locked { .. } => {
2267 unreachable!("handled above")
2268 }
2269
2270 ExecutionRequest::TemporarilyFailed {
2271 backoff_expires_at, ..
2272 }
2273 | ExecutionRequest::TemporarilyTimedOut {
2274 backoff_expires_at, ..
2275 } => {
2276 let (next_version, notifier) = update_state_pending_after_event_appended(
2277 tx,
2278 execution_id,
2279 &appending_version,
2280 *backoff_expires_at,
2281 true, combined_state.execution_with_state.component_digest,
2283 )
2284 .await?;
2285 return Ok((next_version, notifier));
2286 }
2287
2288 ExecutionRequest::Unlocked {
2289 backoff_expires_at, ..
2290 } => {
2291 let (next_version, notifier) = update_state_pending_after_event_appended(
2292 tx,
2293 execution_id,
2294 &appending_version,
2295 *backoff_expires_at,
2296 false, combined_state.execution_with_state.component_digest,
2298 )
2299 .await?;
2300 return Ok((next_version, notifier));
2301 }
2302
2303 ExecutionRequest::Paused => {
2304 match &combined_state.execution_with_state.pending_state {
2305 PendingState::Finished { .. } => {
2306 unreachable!("handled above");
2307 }
2308 PendingState::Paused(..) => {
2309 return Err(DbErrorWriteNonRetriable::IllegalState {
2310 reason: "cannot pause, execution is already paused".into(),
2311 context: SpanTrace::capture(),
2312 source: None,
2313 loc: Location::caller(),
2314 }
2315 .into());
2316 }
2317 _ => {}
2318 }
2319 let next_version =
2320 update_state_paused(tx, execution_id, &appending_version, true).await?;
2321 return Ok((next_version, AppendNotifier::default()));
2322 }
2323
2324 ExecutionRequest::Unpaused => {
2325 if !combined_state
2326 .execution_with_state
2327 .pending_state
2328 .is_paused()
2329 {
2330 return Err(DbErrorWriteNonRetriable::IllegalState {
2331 reason: "cannot unpause, execution is not paused".into(),
2332 context: SpanTrace::capture(),
2333 source: None,
2334 loc: Location::caller(),
2335 }
2336 .into());
2337 }
2338 let next_version =
2339 update_state_paused(tx, execution_id, &appending_version, false).await?;
2340 return Ok((next_version, AppendNotifier::default()));
2341 }
2342
2343 ExecutionRequest::Finished { retval, .. } => {
2344 update_state_finished(
2345 tx,
2346 execution_id,
2347 &appending_version,
2348 req.created_at,
2349 PendingStateFinishedResultKind::from(retval),
2350 )
2351 .await?;
2352 return Ok((
2353 appending_version,
2354 AppendNotifier {
2355 pending_at: None,
2356 execution_finished: Some(NotifierExecutionFinished {
2357 execution_id: execution_id.clone(),
2358 retval: retval.clone(),
2359 }),
2360 response: None,
2361 },
2362 ));
2363 }
2364
2365 ExecutionRequest::HistoryEvent {
2366 event:
2367 HistoryEvent::JoinSetCreate { .. }
2368 | HistoryEvent::JoinSetRequest {
2369 request: JoinSetRequest::ChildExecutionRequest { .. },
2370 ..
2371 }
2372 | HistoryEvent::Persist { .. }
2373 | HistoryEvent::Schedule { .. }
2374 | HistoryEvent::Stub { .. }
2375 | HistoryEvent::JoinNextTooMany { .. }
2376 | HistoryEvent::JoinNextTry { .. },
2377 } => {
2378 return Ok((
2379 bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2380 AppendNotifier::default(),
2381 ));
2382 }
2383
2384 ExecutionRequest::HistoryEvent {
2385 event:
2386 HistoryEvent::JoinSetRequest {
2387 join_set_id,
2388 request:
2389 JoinSetRequest::DelayRequest {
2390 delay_id,
2391 expires_at,
2392 ..
2393 },
2394 },
2395 } => {
2396 return Ok((
2397 bump_state_next_version(
2398 tx,
2399 execution_id,
2400 &appending_version,
2401 Some(DelayReq {
2402 join_set_id: join_set_id.clone(),
2403 delay_id: delay_id.clone(),
2404 expires_at: *expires_at,
2405 }),
2406 )
2407 .await?,
2408 AppendNotifier::default(),
2409 ));
2410 }
2411
2412 ExecutionRequest::HistoryEvent {
2413 event:
2414 HistoryEvent::JoinNext {
2415 join_set_id,
2416 run_expires_at,
2417 closing,
2418 requested_ffqn: _,
2419 },
2420 } => {
2421 let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2423
2424 let nth_response =
2426 nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2427
2428 trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2429 assert!(join_next_count > 0);
2430
2431 if let Some(ResponseWithCursor {
2432 event:
2433 JoinSetResponseEventOuter {
2434 created_at: nth_created_at,
2435 ..
2436 },
2437 cursor: _,
2438 }) = nth_response
2439 {
2440 let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2441 let (next_version, notifier) = update_state_pending_after_event_appended(
2442 tx,
2443 execution_id,
2444 &appending_version,
2445 scheduled_at,
2446 false, combined_state.execution_with_state.component_digest,
2448 )
2449 .await?;
2450 return Ok((next_version, notifier));
2451 }
2452
2453 return Ok((
2454 update_state_blocked(
2455 tx,
2456 execution_id,
2457 &appending_version,
2458 join_set_id,
2459 *run_expires_at,
2460 *closing,
2461 )
2462 .await?,
2463 AppendNotifier::default(),
2464 ));
2465 }
2466 }
2467}
2468
2469async fn append_response(
2470 tx: &Transaction<'_>,
2471 execution_id: &ExecutionId,
2472 event: JoinSetResponseEventOuter,
2473) -> Result<AppendNotifier, DbErrorWrite> {
2474 let join_set_id = &event.event.join_set_id;
2475
2476 let (delay_id, delay_success) = match &event.event.event {
2477 JoinSetResponse::DelayFinished { delay_id, result } => {
2478 (Some(delay_id.to_string()), Some(result.is_ok()))
2479 }
2480 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2481 };
2482
2483 let (child_execution_id, finished_version) = match &event.event.event {
2484 JoinSetResponse::ChildExecutionFinished {
2485 child_execution_id,
2486 finished_version,
2487 result: _,
2488 } => (
2489 Some(child_execution_id.to_string()),
2490 Some(i64::from(finished_version.0)),
2491 ),
2492 JoinSetResponse::DelayFinished { .. } => (None, None),
2493 };
2494
2495 let row = tx.query_one(
2496 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2497 VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2498 &[
2499 &execution_id.to_string(),
2500 &event.created_at,
2501 &join_set_id.to_string(),
2502 &delay_id,
2503 &delay_success,
2504 &child_execution_id,
2505 &finished_version,
2506 ]
2507 ).await?;
2508 let cursor = ResponseCursor(
2509 u32::try_from(get::<i64, _>(&row, 0)?)
2510 .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2511 );
2512 let combined_state = get_combined_state(tx, execution_id).await?;
2514 debug!("previous_pending_state: {combined_state:?}");
2515
2516 let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2517 state:
2518 PendingStateBlockedByJoinSet {
2519 join_set_id: found_join_set_id,
2520 lock_expires_at, closing: _,
2522 },
2523 paused: _,
2524 } =
2525 PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2526 && *join_set_id == found_join_set_id
2527 {
2528 let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2529 update_state_pending_after_response_appended(
2531 tx,
2532 execution_id,
2533 scheduled_at,
2534 combined_state.execution_with_state.component_digest,
2535 )
2536 .await?
2537 } else {
2538 AppendNotifier::default()
2539 };
2540
2541 if let JoinSetResponseEvent {
2542 join_set_id,
2543 event:
2544 JoinSetResponse::DelayFinished {
2545 delay_id,
2546 result: _,
2547 },
2548 } = &event.event
2549 {
2550 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2551 tx.execute(
2552 "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2553 &[
2554 &execution_id.to_string(),
2555 &join_set_id.to_string(),
2556 &delay_id.to_string(),
2557 ],
2558 )
2559 .await?;
2560 }
2561
2562 notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2563 Ok(notifier)
2564}
2565
2566async fn append_backtrace(
2567 tx: &Transaction<'_>,
2568 backtrace_info: &BacktraceInfo,
2569) -> Result<(), DbErrorWrite> {
2570 let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2572
2573 tx.execute(
2575 "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2576 VALUES ($1, $2) \
2577 ON CONFLICT (backtrace_hash) DO NOTHING",
2578 &[
2579 &backtrace_hash.as_slice(),
2580 &Json(&backtrace_info.wasm_backtrace),
2581 ],
2582 )
2583 .await?;
2584
2585 tx.execute(
2587 "INSERT INTO t_execution_backtrace \
2588 (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2589 VALUES ($1, $2, $3, $4, $5)",
2590 &[
2591 &backtrace_info.execution_id.to_string(),
2592 &Json(&backtrace_info.component_id),
2593 &i64::from(backtrace_info.version_min_including.0),
2594 &i64::from(backtrace_info.version_max_excluding.0),
2595 &backtrace_hash.as_slice(),
2596 ],
2597 )
2598 .await?;
2599
2600 Ok(())
2601}
2602
2603async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2604 let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2605 LogEntry::Log {
2606 created_at,
2607 level,
2608 message,
2609 } => (
2610 Some(*level as i32),
2611 Some(message.as_str()),
2612 None::<i32>,
2613 None::<&[u8]>,
2614 created_at,
2615 ),
2616 LogEntry::Stream {
2617 created_at,
2618 payload,
2619 stream_type,
2620 } => (
2621 None::<i32>,
2622 None::<&str>,
2623 Some(*stream_type as i32),
2624 Some(payload.as_slice()),
2625 created_at,
2626 ),
2627 };
2628
2629 tx.execute(
2630 "INSERT INTO t_log (
2631 execution_id,
2632 run_id,
2633 created_at,
2634 level,
2635 message,
2636 stream_type,
2637 payload
2638 ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2639 &[
2640 &row.execution_id.to_string(),
2641 &row.run_id.to_string(),
2642 &created_at,
2643 &level,
2644 &message,
2645 &stream_type,
2646 &payload,
2647 ],
2648 )
2649 .await?;
2650
2651 Ok(())
2652}
2653
2654async fn get_execution_log(
2655 tx: &Transaction<'_>,
2656 execution_id: &ExecutionId,
2657) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2658 let rows = tx
2659 .query(
2660 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2661 execution_id = $1 ORDER BY version",
2662 &[&execution_id.to_string()],
2663 )
2664 .await
2665 .map_err(DbErrorRead::from)?;
2666
2667 if rows.is_empty() {
2668 return Err(DbErrorRead::NotFound);
2669 }
2670
2671 let mut events = Vec::with_capacity(rows.len());
2672 for row in rows {
2673 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2674 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2675 let event = event.0;
2676 let version: i64 = get(&row, "version")?;
2677 let version = Version::try_from(version)
2678 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2679
2680 events.push(ExecutionEvent {
2681 created_at,
2682 event,
2683 backtrace_id: None,
2684 version,
2685 });
2686 }
2687
2688 let combined_state = get_combined_state(tx, execution_id).await?;
2689 let responses = list_responses(tx, execution_id, None).await?;
2690
2691 Ok(concepts::storage::ExecutionLog {
2692 execution_id: execution_id.clone(),
2693 events,
2694 responses,
2695 next_version: combined_state.get_next_version_or_finished(),
2696 pending_state: combined_state.execution_with_state.pending_state,
2697 component_digest: combined_state.execution_with_state.component_digest,
2698 component_type: combined_state.execution_with_state.component_type,
2699 deployment_id: combined_state.execution_with_state.deployment_id,
2700 })
2701}
2702
2703async fn get_max_version(
2704 tx: &Transaction<'_>,
2705 execution_id: &ExecutionId,
2706) -> Result<Version, DbErrorRead> {
2707 let row = tx
2708 .query_one(
2709 "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2710 &[&execution_id.to_string()],
2711 )
2712 .await?;
2713 let max_version: i64 = get(&row, "version")?;
2714 let max_version = Version::try_from(max_version)
2715 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2716 Ok(max_version)
2717}
2718
2719async fn get_max_response_cursor(
2720 tx: &Transaction<'_>,
2721 execution_id: &ExecutionId,
2722) -> Result<ResponseCursor, DbErrorRead> {
2723 let row = tx
2724 .query_one(
2725 "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2726 &[&execution_id.to_string()],
2727 )
2728 .await?;
2729 let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2731 let max_cursor = ResponseCursor(
2732 u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2733 );
2734 Ok(max_cursor)
2735}
2736
2737async fn list_execution_events(
2738 tx: &Transaction<'_>,
2739 execution_id: &ExecutionId,
2740 pagination: Pagination<VersionType>,
2741 include_backtrace_id: bool,
2742) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2743 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2744 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2745 params.push(p);
2746 format!("${}", params.len())
2747 };
2748
2749 let p_execution_id = add_param(Box::new(execution_id.to_string()));
2750
2751 let (cursor, length, rel, is_desc) = match &pagination {
2752 Pagination::NewerThan {
2753 cursor,
2754 length,
2755 including_cursor,
2756 } => (
2757 *cursor,
2758 *length,
2759 if *including_cursor { ">=" } else { ">" },
2760 false,
2761 ),
2762 Pagination::OlderThan {
2763 cursor,
2764 length,
2765 including_cursor,
2766 } => (
2767 *cursor,
2768 *length,
2769 if *including_cursor { "<=" } else { "<" },
2770 true,
2771 ),
2772 };
2773 let p_cursor = add_param(Box::new(i64::from(cursor)));
2774 let p_limit = add_param(Box::new(i64::from(length)));
2775
2776 let base_select = if include_backtrace_id {
2777 format!(
2778 "SELECT
2779 log.created_at,
2780 log.json_value,
2781 log.version,
2782 bt.version_min_including AS backtrace_id
2783 FROM
2784 t_execution_log AS log
2785 LEFT OUTER JOIN
2786 t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2787 AND log.version >= bt.version_min_including
2788 AND log.version < bt.version_max_excluding
2789 WHERE
2790 log.execution_id = {p_execution_id}
2791 AND log.version {rel} {p_cursor}"
2792 )
2793 } else {
2794 format!(
2795 "SELECT
2796 created_at, json_value, NULL::BIGINT as backtrace_id, version
2797 FROM t_execution_log WHERE
2798 execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2799 )
2800 };
2801
2802 let order = if is_desc { "DESC" } else { "ASC" };
2803 let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2804
2805 if is_desc {
2807 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2808 }
2809
2810 let params_refs: Vec<&(dyn ToSql + Sync)> = params
2811 .iter()
2812 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2813 .collect();
2814
2815 let rows = tx
2816 .query(&sql, ¶ms_refs)
2817 .await
2818 .map_err(DbErrorRead::from)?;
2819
2820 let mut events = Vec::with_capacity(rows.len());
2821 for row in rows {
2822 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2823 let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2824 .map(Version::try_from)
2825 .transpose()
2826 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2827
2828 let version = get::<i64, _>(&row, "version")?;
2829 let version = Version::new(
2830 VersionType::try_from(version)
2831 .map_err(|_| consistency_db_err("version must be non-negative"))?,
2832 );
2833 let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2834 let event_req = event_req.0;
2835
2836 events.push(ExecutionEvent {
2837 created_at,
2838 event: event_req,
2839 backtrace_id,
2840 version,
2841 });
2842 }
2843 Ok(events)
2844}
2845
2846async fn get_execution_event(
2847 tx: &Transaction<'_>,
2848 execution_id: &ExecutionId,
2849 version: VersionType,
2850) -> Result<ExecutionEvent, DbErrorRead> {
2851 let row = tx
2852 .query_one(
2853 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2854 execution_id = $1 AND version = $2",
2855 &[&execution_id.to_string(), &i64::from(version)],
2856 )
2857 .await?;
2858
2859 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2860 let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2861 let version = get::<i64, _>(&row, "version")?;
2862 let version = Version::try_from(version)
2863 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2864 let event = json_val.0;
2865
2866 Ok(ExecutionEvent {
2867 created_at,
2868 event,
2869 backtrace_id: None,
2870 version,
2871 })
2872}
2873
2874async fn get_last_execution_event(
2875 tx: &Transaction<'_>,
2876 execution_id: &ExecutionId,
2877) -> Result<ExecutionEvent, DbErrorRead> {
2878 let row = tx
2879 .query_one(
2880 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2881 execution_id = $1 ORDER BY version DESC LIMIT 1",
2882 &[&execution_id.to_string()],
2883 )
2884 .await?;
2885
2886 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2887 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2888 let event = event.0;
2889 let version: i64 = get(&row, "version")?;
2890 let version = Version::try_from(version)
2891 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2892
2893 Ok(ExecutionEvent {
2894 created_at,
2895 event,
2896 backtrace_id: None,
2897 version,
2898 })
2899}
2900
2901async fn delay_response(
2902 tx: &Transaction<'_>,
2903 execution_id: &ExecutionId,
2904 delay_id: &DelayId,
2905) -> Result<Option<bool>, DbErrorRead> {
2906 let row = tx
2907 .query_opt(
2908 "SELECT delay_success \
2909 FROM t_join_set_response \
2910 WHERE \
2911 execution_id = $1 AND delay_id = $2",
2912 &[&execution_id.to_string(), &delay_id.to_string()],
2913 )
2914 .await?;
2915
2916 match row {
2917 Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2918 None => Ok(None),
2919 }
2920}
2921
2922#[instrument(level = Level::TRACE, skip_all)]
2923async fn get_responses_after(
2924 tx: &Transaction<'_>,
2925 execution_id: &ExecutionId,
2926 last_response: ResponseCursor,
2927) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2928 let rows = tx
2929 .query(
2930 "SELECT r.id, r.created_at, r.join_set_id, \
2931 r.delay_id, r.delay_success, \
2932 r.child_execution_id, r.finished_version, child.json_value \
2933 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2934 WHERE \
2935 r.id > $1 AND \
2936 r.execution_id = $2 AND \
2937 ( \
2938 r.finished_version = child.version \
2939 OR \
2940 r.child_execution_id IS NULL \
2941 ) \
2942 ORDER BY id \
2943 ",
2944 &[
2945 &i64::from(last_response.0),
2946 &execution_id.to_string(),
2947 ]
2948 )
2949 .await?;
2950
2951 let mut results = Vec::with_capacity(rows.len());
2952 for row in rows {
2953 let resp = parse_response_with_cursor(&row)?;
2954 results.push(resp);
2955 }
2956 Ok(results)
2957}
2958
2959async fn get_pending_of_single_ffqn(
2960 tx: &Transaction<'_>,
2961 batch_size: u32,
2962 pending_at_or_sooner: DateTime<Utc>,
2963 ffqn: &FunctionFqn,
2964 select_strategy: SelectStrategy,
2965) -> Result<Vec<(ExecutionId, Version)>, ()> {
2966 let rows = tx
2967 .query(
2968 &format!(
2969 r"
2970 SELECT execution_id, corresponding_version FROM t_state
2971 WHERE
2972 state = '{STATE_PENDING_AT}' AND
2973 pending_expires_finished <= $1 AND ffqn = $2
2974 AND is_paused = false
2975 ORDER BY pending_expires_finished
2976 {}
2977 LIMIT $3
2978 ",
2979 if select_strategy == SelectStrategy::LockForUpdate {
2980 "FOR UPDATE SKIP LOCKED"
2981 } else {
2982 ""
2983 }
2984 ),
2985 &[
2986 &pending_at_or_sooner,
2987 &ffqn.to_string(),
2988 &(i64::from(batch_size)),
2989 ],
2990 )
2991 .await
2992 .map_err(|err| {
2993 warn!("Ignoring consistency error {err:?}");
2994 })?;
2995
2996 let mut result = Vec::with_capacity(rows.len());
2997 for row in rows {
2998 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2999 let eid_str: String = get(&row, "execution_id")?;
3000 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3001 let corresponding_version = Version::try_from(corresponding_version)
3002 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3003
3004 if let Ok(eid) = ExecutionId::from_str(&eid_str) {
3005 return Ok((eid, corresponding_version.increment()));
3006 }
3007 Err(consistency_db_err("invalid execution_id"))
3008 };
3009
3010 match unpack() {
3011 Ok(val) => result.push(val),
3012 Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
3013 }
3014 }
3015 Ok(result)
3016}
3017
3018async fn get_pending_by_ffqns(
3020 tx: &Transaction<'_>,
3021 batch_size: u32,
3022 pending_at_or_sooner: DateTime<Utc>,
3023 ffqns: &[FunctionFqn],
3024 select_strategy: SelectStrategy,
3025) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3026 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
3027 let mut execution_ids_versions = Vec::with_capacity(batch_size);
3028
3029 for ffqn in ffqns {
3030 let needed = batch_size - execution_ids_versions.len();
3031 if needed == 0 {
3032 break;
3033 }
3034 let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
3035 if let Ok(execs) =
3036 get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
3037 .await
3038 {
3039 execution_ids_versions.extend(execs);
3040 }
3041 }
3042
3043 Ok(execution_ids_versions)
3044}
3045
/// How pending `t_state` rows are selected: read only, or additionally locked
/// for the rest of the transaction with `FOR UPDATE SKIP LOCKED`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SelectStrategy {
    /// Plain `SELECT`; rows are not locked.
    Read,
    /// Append `FOR UPDATE SKIP LOCKED`, skipping rows locked by other transactions.
    LockForUpdate,
}
3051
3052async fn get_pending_by_component_input_digest(
3053 tx: &Transaction<'_>,
3054 batch_size: u32,
3055 pending_at_or_sooner: DateTime<Utc>,
3056 input_digest: &ComponentDigest,
3057 select_strategy: SelectStrategy,
3058) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3059 let rows = tx
3060 .query(
3061 &format!(
3062 r"
3063 SELECT execution_id, corresponding_version FROM t_state WHERE
3064 state = '{STATE_PENDING_AT}' AND
3065 pending_expires_finished <= $1 AND
3066 component_id_input_digest = $2
3067 AND is_paused = false
3068 ORDER BY pending_expires_finished
3069 {}
3070 LIMIT $3
3071 ",
3072 if select_strategy == SelectStrategy::LockForUpdate {
3073 "FOR UPDATE SKIP LOCKED"
3074 } else {
3075 ""
3076 }
3077 ),
3078 &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
3079 )
3080 .await?;
3081
3082 let mut result = Vec::with_capacity(rows.len());
3083 for row in rows {
3084 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
3085 let eid_str: String = get(&row, "execution_id")?;
3086 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3087 let corresponding_version = Version::try_from(corresponding_version)
3088 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3089
3090 let eid = ExecutionId::from_str(&eid_str)
3091 .map_err(|err| consistency_db_err(err.to_string()))?;
3092 Ok((eid, corresponding_version.increment()))
3093 };
3094
3095 match unpack() {
3096 Ok(val) => result.push(val),
3097 Err(err) => {
3098 warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
3099 }
3100 }
3101 }
3102
3103 Ok(result)
3104}
3105
3106fn notify_pending_locked(
3107 notifier: &NotifierPendingAt,
3108 current_time: DateTime<Utc>,
3109 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3110) {
3111 if notifier.scheduled_at <= current_time {
3112 ffqn_to_pending_subscription.notify(notifier);
3113 }
3114}
3115
3116async fn upgrade_execution_component(
3117 tx: &Transaction<'_>,
3118 execution_id: &ExecutionId,
3119 old: &ComponentDigest,
3120 new: &ComponentDigest,
3121) -> Result<(), DbErrorWrite> {
3122 debug!("Updating t_state to component {new}");
3123
3124 let updated = tx
3125 .execute(
3126 r"
3127 UPDATE t_state
3128 SET
3129 updated_at = CURRENT_TIMESTAMP,
3130 component_id_input_digest = $1
3131 WHERE
3132 execution_id = $2 AND
3133 component_id_input_digest = $3
3134 ",
3135 &[
3136 &new.as_slice(), &execution_id.to_string(), &old.as_slice(), ],
3140 )
3141 .await?;
3142
3143 if updated != 1 {
3144 return Err(DbErrorWrite::NotFound);
3145 }
3146 Ok(())
3147}
3148
3149impl PostgresConnection {
3150 #[instrument(level = Level::TRACE, skip_all)]
3152 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3153 let (pending_ats, finished_execs, responses) = {
3154 let (mut pending_ats, mut finished_execs, mut responses) =
3155 (Vec::new(), Vec::new(), Vec::new());
3156 for notifier in notifiers {
3157 if let Some(pending_at) = notifier.pending_at {
3158 pending_ats.push(pending_at);
3159 }
3160 if let Some(finished) = notifier.execution_finished {
3161 finished_execs.push(finished);
3162 }
3163 if let Some(response) = notifier.response {
3164 responses.push(response);
3165 }
3166 }
3167 (pending_ats, finished_execs, responses)
3168 };
3169
3170 if !pending_ats.is_empty() {
3172 let guard = self.pending_subscribers.lock().unwrap();
3173 for pending_at in pending_ats {
3174 notify_pending_locked(&pending_at, current_time, &guard);
3175 }
3176 }
3177 if !finished_execs.is_empty() {
3179 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3180 for finished in finished_execs {
3181 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3182 for (_tag, sender) in listeners_of_exe_id {
3183 let _ = sender.send(finished.retval.clone());
3184 }
3185 }
3186 }
3187 }
3188 if !responses.is_empty() {
3190 let mut guard = self.response_subscribers.lock().unwrap();
3191 for (execution_id, response) in responses {
3192 if let Some((sender, _)) = guard.remove(&execution_id) {
3193 let _ = sender.send(response);
3194 }
3195 }
3196 }
3197 }
3198}
3199
3200#[async_trait]
3201impl DbExecutor for PostgresConnection {
3202 #[instrument(level = Level::TRACE, skip(self))]
3203 async fn lock_pending_by_ffqns(
3204 &self,
3205 batch_size: u32,
3206 pending_at_or_sooner: DateTime<Utc>,
3207 ffqns: Arc<[FunctionFqn]>,
3208 created_at: DateTime<Utc>,
3209 component_id: ComponentId,
3210 deployment_id: DeploymentId,
3211 executor_id: ExecutorId,
3212 lock_expires_at: DateTime<Utc>,
3213 run_id: RunId,
3214 retry_config: ComponentRetryConfig,
3215 ) -> Result<LockPendingResponse, DbErrorWrite> {
3216 let mut client_guard = self.client.lock().await;
3217 let tx = client_guard.transaction().await?;
3218
3219 let execution_ids_versions = get_pending_by_ffqns(
3220 &tx,
3221 batch_size,
3222 pending_at_or_sooner,
3223 &ffqns,
3224 SelectStrategy::LockForUpdate,
3225 )
3226 .await?;
3227
3228 if execution_ids_versions.is_empty() {
3229 tx.commit().await?;
3232 return Ok(vec![]);
3233 }
3234
3235 debug!("Locking {execution_ids_versions:?}");
3236
3237 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3239 for (execution_id, version) in execution_ids_versions {
3240 match lock_single_execution(
3241 &tx,
3242 created_at,
3243 &component_id,
3244 deployment_id,
3245 &execution_id,
3246 run_id,
3247 &version,
3248 executor_id,
3249 lock_expires_at,
3250 retry_config,
3251 )
3252 .await
3253 {
3254 Ok(locked) => locked_execs.push(locked),
3255 Err(err) => {
3256 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3258 return Err(err);
3259 }
3260 }
3261 }
3262
3263 tx.commit().await?;
3264
3265 Ok(locked_execs)
3266 }
3267
3268 #[instrument(level = Level::TRACE, skip(self))]
3269 async fn lock_pending_by_component_digest(
3270 &self,
3271 batch_size: u32,
3272 pending_at_or_sooner: DateTime<Utc>,
3273 component_id: &ComponentId,
3274 deployment_id: DeploymentId,
3275 created_at: DateTime<Utc>,
3276 executor_id: ExecutorId,
3277 lock_expires_at: DateTime<Utc>,
3278 run_id: RunId,
3279 retry_config: ComponentRetryConfig,
3280 ) -> Result<LockPendingResponse, DbErrorWrite> {
3281 let mut client_guard = self.client.lock().await;
3282 let tx = client_guard.transaction().await?;
3283
3284 let execution_ids_versions = get_pending_by_component_input_digest(
3285 &tx,
3286 batch_size,
3287 pending_at_or_sooner,
3288 &component_id.component_digest,
3289 SelectStrategy::LockForUpdate,
3290 )
3291 .await?;
3292
3293 if execution_ids_versions.is_empty() {
3294 tx.commit().await?;
3295 return Ok(vec![]);
3296 }
3297
3298 debug!("Locking {execution_ids_versions:?}");
3299
3300 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3301 for (execution_id, version) in execution_ids_versions {
3302 match lock_single_execution(
3303 &tx,
3304 created_at,
3305 component_id,
3306 deployment_id,
3307 &execution_id,
3308 run_id,
3309 &version,
3310 executor_id,
3311 lock_expires_at,
3312 retry_config,
3313 )
3314 .await
3315 {
3316 Ok(locked) => locked_execs.push(locked),
3317 Err(err) => {
3318 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3320 return Err(err);
3321 }
3322 }
3323 }
3324
3325 tx.commit().await?;
3326 Ok(locked_execs)
3327 }
3328
3329 #[cfg(feature = "test")]
3330 #[instrument(level = Level::DEBUG, skip(self))]
3331 async fn lock_one(
3332 &self,
3333 created_at: DateTime<Utc>,
3334 component_id: ComponentId,
3335 deployment_id: DeploymentId,
3336 execution_id: &ExecutionId,
3337 run_id: RunId,
3338 version: Version,
3339 executor_id: ExecutorId,
3340 lock_expires_at: DateTime<Utc>,
3341 retry_config: ComponentRetryConfig,
3342 ) -> Result<LockedExecution, DbErrorWrite> {
3343 debug!(%execution_id, "lock_one");
3344 let mut client_guard = self.client.lock().await;
3345 let tx = client_guard.transaction().await?;
3346
3347 let res = lock_single_execution(
3348 &tx,
3349 created_at,
3350 &component_id,
3351 deployment_id,
3352 execution_id,
3353 run_id,
3354 &version,
3355 executor_id,
3356 lock_expires_at,
3357 retry_config,
3358 )
3359 .await?;
3360
3361 tx.commit().await?;
3362 Ok(res)
3363 }
3364
3365 #[instrument(level = Level::DEBUG, skip(self, req))]
3366 async fn append(
3367 &self,
3368 execution_id: ExecutionId,
3369 version: Version,
3370 req: AppendRequest,
3371 ) -> Result<AppendResponse, DbErrorWrite> {
3372 debug!(%req, "append");
3373 trace!(?req, "append");
3374 let created_at = req.created_at;
3375
3376 let mut client_guard = self.client.lock().await;
3377 let tx = client_guard.transaction().await?;
3378
3379 let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3380
3381 tx.commit().await?;
3382
3383 drop(client_guard);
3385
3386 self.notify_all(vec![notifier], created_at);
3387 Ok(new_version)
3388 }
3389
3390 #[instrument(level = Level::DEBUG, skip_all)]
3391 async fn append_batch_respond_to_parent(
3392 &self,
3393 events: AppendEventsToExecution,
3394 response: AppendResponseToExecution,
3395 current_time: DateTime<Utc>,
3396 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3397 debug!("append_batch_respond_to_parent");
3398 if events.execution_id == response.parent_execution_id {
3399 return Err(DbErrorWrite::NonRetriable(
3400 DbErrorWriteNonRetriable::ValidationFailed(
3401 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3402 ),
3403 ));
3404 }
3405 if events.batch.is_empty() {
3406 return Err(DbErrorWrite::NonRetriable(
3407 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3408 ));
3409 }
3410
3411 let mut client_guard = self.client.lock().await;
3412 let tx = client_guard.transaction().await?;
3413
3414 let mut version = events.version;
3415 let mut notifiers = Vec::new();
3416
3417 for append_request in events.batch {
3418 let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3419 version = v;
3420 notifiers.push(n);
3421 }
3422
3423 let pending_at_parent = append_response(
3424 &tx,
3425 &response.parent_execution_id,
3426 JoinSetResponseEventOuter {
3427 created_at: response.created_at,
3428 event: JoinSetResponseEvent {
3429 join_set_id: response.join_set_id,
3430 event: JoinSetResponse::ChildExecutionFinished {
3431 child_execution_id: response.child_execution_id,
3432 finished_version: response.finished_version,
3433 result: response.result,
3434 },
3435 },
3436 },
3437 )
3438 .await?;
3439 notifiers.push(pending_at_parent);
3440
3441 tx.commit().await?;
3442 drop(client_guard);
3443
3444 self.notify_all(notifiers, current_time);
3445 Ok(version)
3446 }
3447
3448 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3449 async fn wait_for_pending_by_ffqn(
3450 &self,
3451 pending_at_or_sooner: DateTime<Utc>,
3452 ffqns: Arc<[FunctionFqn]>,
3453 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3454 ) {
3455 let unique_tag: u64 = rand::random();
3456 let (sender, mut receiver) = mpsc::channel(1);
3457 {
3458 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3459 for ffqn in ffqns.as_ref() {
3460 pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3461 }
3462 }
3463
3464 async {
3465 let mut db_has_pending = false;
3466 {
3467 let mut client_guard = self.client.lock().await;
3469 if let Ok(tx) = client_guard.transaction().await {
3471 if let Ok(res) = get_pending_by_ffqns(
3472 &tx,
3473 1,
3474 pending_at_or_sooner,
3475 &ffqns,
3476 SelectStrategy::Read,
3477 )
3478 .await
3479 && !res.is_empty()
3480 {
3481 db_has_pending = true;
3482 }
3483 let _ = tx.commit().await;
3485 }
3486 }
3487
3488 if db_has_pending {
3489 trace!("Not waiting, database already contains new pending executions");
3490 return;
3491 }
3492
3493 tokio::select! {
3494 _ = receiver.recv() => {
3495 trace!("Received a notification");
3496 }
3497 () = timeout_fut => {
3498 }
3499 }
3500 }
3501 .await;
3502
3503 {
3505 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3506 for ffqn in ffqns.as_ref() {
3507 match pending_subscribers.remove_ffqn(ffqn) {
3508 Some((_, tag)) if tag == unique_tag => {}
3509 Some(other) => {
3510 pending_subscribers.insert_ffqn(ffqn.clone(), other);
3511 }
3512 None => {}
3513 }
3514 }
3515 }
3516 }
3517
3518 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3519 async fn wait_for_pending_by_component_digest(
3520 &self,
3521 pending_at_or_sooner: DateTime<Utc>,
3522 component_digest: &ComponentDigest,
3523 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3524 ) {
3525 let unique_tag: u64 = rand::random();
3526 let (sender, mut receiver) = mpsc::channel(1);
3527 {
3528 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3529 pending_subscribers
3530 .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
3531 }
3532
3533 async {
3534 let mut db_has_pending = false;
3535 {
3536 let mut client_guard = self.client.lock().await;
3537 if let Ok(tx) = client_guard.transaction().await {
3538 if let Ok(res) = get_pending_by_component_input_digest(
3539 &tx,
3540 1,
3541 pending_at_or_sooner,
3542 component_digest,
3543 SelectStrategy::Read,
3544 )
3545 .await
3546 && !res.is_empty()
3547 {
3548 db_has_pending = true;
3549 }
3550 let _ = tx.commit().await;
3551 }
3552 }
3553
3554 if db_has_pending {
3555 trace!("Not waiting, database already contains new pending executions");
3556 return;
3557 }
3558
3559 tokio::select! {
3560 _ = receiver.recv() => {
3561 trace!("Received a notification");
3562 }
3563 () = timeout_fut => {
3564 }
3565 }
3566 }
3567 .await;
3568
3569 {
3571 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3572 match pending_subscribers.remove_by_component(component_digest) {
3573 Some((_, tag)) if tag == unique_tag => {}
3574 Some(other) => {
3575 pending_subscribers.insert_by_component(component_digest.clone(), other);
3576 }
3577 None => {}
3578 }
3579 }
3580 }
3581
3582 async fn get_last_execution_event(
3583 &self,
3584 execution_id: &ExecutionId,
3585 ) -> Result<ExecutionEvent, DbErrorRead> {
3586 let mut client_guard = self.client.lock().await;
3587 let tx = client_guard.transaction().await?;
3588
3589 let event = get_last_execution_event(&tx, execution_id).await?;
3590
3591 tx.commit().await?;
3592 Ok(event)
3593 }
3594}
3595#[async_trait]
3596impl DbConnection for PostgresConnection {
3597 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3598 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3599 debug!("create");
3600 trace!(?req, "create");
3601 let created_at = req.created_at;
3602
3603 let mut client_guard = self.client.lock().await;
3604 let tx = client_guard.transaction().await?;
3605
3606 let (version, notifier) = create_inner(&tx, req.clone()).await?;
3607
3608 tx.commit().await?;
3609 drop(client_guard); self.notify_all(vec![notifier], created_at);
3612 Ok(version)
3613 }
3614
3615 #[instrument(level = Level::DEBUG, skip(self))]
3616 async fn get(
3617 &self,
3618 execution_id: &ExecutionId,
3619 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3620 trace!("get");
3621 let mut client_guard = self.client.lock().await;
3622 let tx = client_guard.transaction().await?;
3623
3624 let res = get_execution_log(&tx, execution_id).await?;
3625
3626 tx.commit().await?;
3627 Ok(res)
3628 }
3629
3630 #[instrument(level = Level::DEBUG, skip(self, batch))]
3631 async fn append_batch(
3632 &self,
3633 current_time: DateTime<Utc>,
3634 batch: Vec<AppendRequest>,
3635 execution_id: ExecutionId,
3636 version: Version,
3637 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3638 debug!("append_batch");
3639 trace!(?batch, "append_batch");
3640 assert!(!batch.is_empty(), "Empty batch request");
3641
3642 let mut client_guard = self.client.lock().await;
3643 let tx = client_guard.transaction().await?;
3644
3645 let mut version = version;
3646 let mut notifier = None;
3647
3648 for append_request in batch {
3649 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3650 version = v;
3651 notifier = Some(n);
3652 }
3653
3654 tx.commit().await?;
3655 drop(client_guard);
3656
3657 self.notify_all(
3658 vec![notifier.expect("checked that the batch is not empty")],
3659 current_time,
3660 );
3661 Ok(version)
3662 }
3663
3664 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3665 async fn append_batch_create_new_execution(
3666 &self,
3667 current_time: DateTime<Utc>,
3668 batch: Vec<AppendRequest>,
3669 execution_id: ExecutionId,
3670 version: Version,
3671 child_req: Vec<CreateRequest>,
3672 backtraces: Vec<BacktraceInfo>,
3673 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3674 debug!("append_batch_create_new_execution");
3675 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3676 assert!(!batch.is_empty(), "Empty batch request");
3677
3678 let mut client_guard = self.client.lock().await;
3679 let tx = client_guard.transaction().await?;
3680
3681 let mut version = version;
3682 let mut notifier = None;
3683
3684 for append_request in batch {
3685 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3686 version = v;
3687 notifier = Some(n);
3688 }
3689
3690 let mut notifiers = Vec::new();
3691 notifiers.push(notifier.expect("checked that the batch is not empty"));
3692
3693 for req in child_req {
3694 let (_, n) = create_inner(&tx, req).await?;
3695 notifiers.push(n);
3696 }
3697 for backtrace in backtraces {
3698 append_backtrace(&tx, &backtrace).await?;
3699 }
3700 tx.commit().await?;
3701 drop(client_guard);
3702
3703 self.notify_all(notifiers, current_time);
3704 Ok(version)
3705 }
3706
3707 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3708 async fn subscribe_to_next_responses(
3709 &self,
3710 execution_id: &ExecutionId,
3711 last_response: ResponseCursor,
3712 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3713 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3714 debug!("next_responses");
3715 let unique_tag: u64 = rand::random();
3716 let execution_id_clone = execution_id.clone();
3717
3718 let cleanup = || {
3719 let mut guard = self.response_subscribers.lock().unwrap();
3720 match guard.remove(&execution_id_clone) {
3721 Some((_, tag)) if tag == unique_tag => {}
3722 Some(other) => {
3723 guard.insert(execution_id_clone.clone(), other);
3724 }
3725 None => {}
3726 }
3727 };
3728
3729 let receiver = {
3730 let mut client_guard = self.client.lock().await;
3731 let tx = client_guard.transaction().await?;
3732
3733 let (sender, receiver) = oneshot::channel();
3739 self.response_subscribers
3740 .lock()
3741 .unwrap()
3742 .insert(execution_id.clone(), (sender, unique_tag));
3743
3744 let responses = get_responses_after(&tx, execution_id, last_response).await?;
3745
3746 if responses.is_empty() {
3747 tx.commit().await.map_err(|err| {
3749 cleanup(); DbErrorRead::from(err)
3751 })?;
3752 receiver
3753 } else {
3754 cleanup(); tx.commit().await?;
3756 return Ok(responses);
3757 }
3758 };
3759
3760 let res = tokio::select! {
3761 resp = receiver => {
3762 match resp {
3763 Ok(resp) => Ok(vec![resp]),
3764 Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3765 }
3766 }
3767 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3768 };
3769
3770 cleanup();
3771 res
3772 }
3773
3774 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3775 async fn wait_for_finished_result(
3776 &self,
3777 execution_id: &ExecutionId,
3778 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3779 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3780 let unique_tag: u64 = rand::random();
3781 let execution_id_clone = execution_id.clone();
3782
3783 let cleanup = || {
3784 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3785 if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3786 subscribers.remove(&unique_tag);
3787 }
3788 };
3789
3790 let receiver = {
3791 let mut client_guard = self.client.lock().await;
3792 let tx = client_guard.transaction().await?;
3793
3794 let (sender, receiver) = oneshot::channel();
3796 {
3797 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3798 guard
3799 .entry(execution_id.clone())
3800 .or_default()
3801 .insert(unique_tag, sender);
3802 }
3803
3804 let pending_state = get_combined_state(&tx, execution_id)
3805 .await?
3806 .execution_with_state
3807 .pending_state;
3808
3809 if let PendingState::Finished(finished) = pending_state {
3810 let event = get_execution_event(&tx, execution_id, finished.version).await?;
3811 tx.commit().await?;
3812 cleanup();
3813
3814 if let ExecutionRequest::Finished { retval, .. } = event.event {
3815 return Ok(retval);
3816 }
3817 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3818 return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3819 "cannot get finished event based on t_state version",
3820 )));
3821 }
3822 tx.commit().await?;
3823 receiver
3824 };
3825
3826 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3827 let res = tokio::select! {
3828 resp = receiver => {
3829 match resp {
3830 Ok(retval) => Ok(retval),
3831 Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3832 }
3833 }
3834 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3835 };
3836
3837 cleanup();
3838 res
3839 }
3840
3841 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3842 async fn append_delay_response(
3843 &self,
3844 created_at: DateTime<Utc>,
3845 execution_id: ExecutionId,
3846 join_set_id: JoinSetId,
3847 delay_id: DelayId,
3848 result: Result<(), ()>,
3849 ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3850 debug!("append_delay_response");
3851 let event = JoinSetResponseEventOuter {
3852 created_at,
3853 event: JoinSetResponseEvent {
3854 join_set_id,
3855 event: JoinSetResponse::DelayFinished {
3856 delay_id: delay_id.clone(),
3857 result,
3858 },
3859 },
3860 };
3861
3862 let mut client_guard = self.client.lock().await;
3863 let tx = client_guard.transaction().await?;
3864
3865 let res = append_response(&tx, &execution_id, event).await;
3866
3867 match res {
3868 Ok(notifier) => {
3869 tx.commit().await?;
3870 drop(client_guard);
3871 self.notify_all(vec![notifier], created_at);
3872 Ok(AppendDelayResponseOutcome::Success)
3873 }
3874 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3875 tx.rollback().await?;
3878
3879 let tx = client_guard.transaction().await?;
3881 let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3882 tx.commit().await?;
3883
3884 match delay_success {
3885 Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3886 Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3887 None => Err(DbErrorWrite::Generic(consistency_db_err(
3888 "insert failed yet select did not find the response",
3889 ))),
3890 }
3891 }
3892 Err(err) => {
3893 let _ = tx.rollback().await; Err(err)
3895 }
3896 }
3897 }
3898
3899 #[instrument(level = Level::DEBUG, skip_all)]
3900 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3901 debug!("append_backtrace");
3902 let mut client_guard = self.client.lock().await;
3903 let tx = client_guard.transaction().await?;
3904
3905 append_backtrace(&tx, &append).await?;
3906
3907 tx.commit().await?;
3908 Ok(())
3909 }
3910
3911 #[instrument(level = Level::DEBUG, skip_all)]
3912 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3913 debug!("append_backtrace_batch");
3914 let mut client_guard = self.client.lock().await;
3915 let tx = client_guard.transaction().await?;
3916
3917 for append in batch {
3918 append_backtrace(&tx, &append).await?;
3919 }
3920
3921 tx.commit().await?;
3922 Ok(())
3923 }
3924
3925 #[instrument(level = Level::DEBUG, skip_all)]
3926 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3927 trace!("append_log");
3928 let mut client_guard = self.client.lock().await;
3929 let tx = client_guard.transaction().await?;
3930 append_log(&tx, &row).await?;
3931 tx.commit().await?;
3932
3933 Ok(())
3934 }
3935
3936 #[instrument(level = Level::DEBUG, skip_all)]
3937 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3938 trace!("append_log_batch");
3939 let mut client_guard = self.client.lock().await;
3940 let tx = client_guard.transaction().await?;
3941 for row in batch {
3942 append_log(&tx, row).await?;
3943 }
3944 tx.commit().await?;
3945 Ok(())
3946 }
3947
3948 #[instrument(level = Level::TRACE, skip(self))]
3950 async fn get_expired_timers(
3951 &self,
3952 at: DateTime<Utc>,
3953 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3954 let mut client_guard = self.client.lock().await;
3955 let tx = client_guard.transaction().await?;
3956
3957 let rows = tx
3959 .query(
3960 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3961 &[&at],
3962 )
3963 .await?;
3964
3965 let mut expired_timers = Vec::with_capacity(rows.len());
3966 for row in rows {
3967 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3968 let execution_id: String = get(&row, "execution_id")?;
3969 let execution_id = ExecutionId::from_str(&execution_id)?;
3970 let join_set_id: String = get(&row, "join_set_id")?;
3971 let join_set_id = JoinSetId::from_str(&join_set_id)?;
3972 let delay_id: String = get(&row, "delay_id")?;
3973 let delay_id = DelayId::from_str(&delay_id)?;
3974
3975 Ok(ExpiredTimer::Delay(ExpiredDelay {
3976 execution_id,
3977 join_set_id,
3978 delay_id,
3979 }))
3980 };
3981
3982 match unpack() {
3983 Ok(timer) => expired_timers.push(timer),
3984 Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3985 }
3986 }
3987
3988 let rows = tx.query(
3990 &format!(
3991 "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3992 FROM t_state \
3993 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3994 ),
3995 &[&at]
3996 ).await?;
3997
3998 for row in rows {
3999 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
4000 let execution_id: String = get(&row, "execution_id")?;
4001 let execution_id = ExecutionId::from_str(&execution_id)?;
4002 let last_lock_version: i64 = get(&row, "last_lock_version")?;
4003 let last_lock_version = Version::try_from(last_lock_version)?;
4004
4005 let corresponding_version: i64 = get(&row, "corresponding_version")?;
4006 let corresponding_version = Version::try_from(corresponding_version)?;
4007
4008 let intermittent_event_count =
4009 u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
4010 |_| consistency_db_err("`intermittent_event_count` must not be negative"),
4011 )?;
4012
4013 let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
4014 .map(u32::try_from)
4015 .transpose()
4016 .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
4017 let retry_exp_backoff_millis =
4018 u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
4019 |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
4020 )?;
4021 let executor_id: String = get(&row, "executor_id")?;
4022 let executor_id = ExecutorId::from_str(&executor_id)?;
4023 let run_id: String = get(&row, "run_id")?;
4024 let run_id = RunId::from_str(&run_id)?;
4025
4026 Ok(ExpiredTimer::Lock(ExpiredLock {
4027 execution_id,
4028 locked_at_version: last_lock_version,
4029 next_version: corresponding_version.increment(),
4030 intermittent_event_count,
4031 max_retries,
4032 retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
4033 locked_by: LockedBy {
4034 executor_id,
4035 run_id,
4036 },
4037 }))
4038 };
4039
4040 match unpack() {
4041 Ok(timer) => expired_timers.push(timer),
4042 Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
4043 }
4044 }
4045
4046 tx.commit().await?;
4047
4048 if !expired_timers.is_empty() {
4049 debug!("get_expired_timers found {expired_timers:?}");
4050 }
4051 Ok(expired_timers)
4052 }
4053
4054 async fn get_execution_event(
4055 &self,
4056 execution_id: &ExecutionId,
4057 version: &Version,
4058 ) -> Result<ExecutionEvent, DbErrorRead> {
4059 let mut client_guard = self.client.lock().await;
4060 let tx = client_guard.transaction().await?;
4061
4062 let event = get_execution_event(&tx, execution_id, version.0).await?;
4063
4064 tx.commit().await?;
4065 Ok(event)
4066 }
4067
4068 async fn get_pending_state(
4069 &self,
4070 execution_id: &ExecutionId,
4071 ) -> Result<ExecutionWithState, DbErrorRead> {
4072 let mut client_guard = self.client.lock().await;
4073 let tx = client_guard.transaction().await?;
4074
4075 let combined_state = get_combined_state(&tx, execution_id).await?;
4076
4077 tx.commit().await?;
4078 Ok(combined_state.execution_with_state)
4079 }
4080}
4081
4082#[async_trait]
4083impl DbExternalApi for PostgresConnection {
4084 #[instrument(skip(self))]
4085 async fn get_backtrace(
4086 &self,
4087 execution_id: &ExecutionId,
4088 filter: BacktraceFilter,
4089 ) -> Result<BacktraceInfo, DbErrorRead> {
4090 debug!("get_backtrace");
4091
4092 let mut client_guard = self.client.lock().await;
4093 let tx = client_guard.transaction().await?;
4094
4095 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4096
4097 params.push(Box::new(execution_id.to_string())); let p_execution_id_idx = format!("${}", params.len()); let mut sql = String::new();
4101 write!(
4102 &mut sql,
4103 "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
4104 FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
4105 WHERE execution_id = {p_execution_id_idx}"
4106 )
4107 .unwrap();
4108
4109 match &filter {
4110 BacktraceFilter::Specific(version) => {
4111 params.push(Box::new(i64::from(version.0))); let p_ver_idx = format!("${}", params.len()); write!(
4114 &mut sql,
4115 " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
4116 )
4117 .unwrap();
4118 }
4119 BacktraceFilter::First => {
4120 sql.push_str(" ORDER BY version_min_including LIMIT 1");
4121 }
4122 BacktraceFilter::Last => {
4123 sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
4124 }
4125 }
4126
4127 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4128 params.iter().map(|p| p.as_ref() as _).collect();
4129
4130 let row = tx.query_one(&sql, ¶ms_refs).await?;
4131
4132 let component_id: Json<ComponentId> = get(&row, "component_id")?;
4133 let component_id = component_id.0;
4134
4135 let version_min_including =
4136 Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
4137
4138 let version_max_excluding =
4139 Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
4140
4141 let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
4143 let wasm_backtrace = wasm_backtrace.0;
4144
4145 tx.commit().await?;
4146
4147 Ok(BacktraceInfo {
4148 execution_id: execution_id.clone(),
4149 component_id,
4150 version_min_including,
4151 version_max_excluding,
4152 wasm_backtrace,
4153 })
4154 }
4155
4156 #[instrument(skip_all)]
4157 async fn upsert_source_file(
4158 &self,
4159 component_digest: &ComponentDigest,
4160 frame_key: &str,
4161 is_suffix: bool,
4162 content: &str,
4163 ) -> Result<(), DbErrorWrite> {
4164 let content_hash: [u8; 32] = Sha256::digest(content.as_bytes()).into();
4165 let mut client_guard = self.client.lock().await;
4166 let tx = client_guard.transaction().await?;
4167 tx.execute(
4168 "INSERT INTO t_source_file (content_hash, content) \
4169 VALUES ($1, $2) \
4170 ON CONFLICT (content_hash) DO NOTHING",
4171 &[&content_hash.as_slice(), &content],
4172 )
4173 .await?;
4174 tx.execute(
4175 "INSERT INTO t_component_source \
4176 (component_digest, frame_key, is_suffix, content_hash) \
4177 VALUES ($1, $2, $3, $4) \
4178 ON CONFLICT (component_digest, frame_key, is_suffix) DO NOTHING",
4179 &[
4180 &component_digest.as_slice(),
4181 &frame_key,
4182 &is_suffix,
4183 &content_hash.as_slice(),
4184 ],
4185 )
4186 .await?;
4187 tx.commit().await?;
4188 Ok(())
4189 }
4190
4191 #[instrument(skip_all)]
4192 async fn get_source_file(
4193 &self,
4194 component_digest: &ComponentDigest,
4195 file: &str,
4196 ) -> Result<Option<String>, DbErrorRead> {
4197 let mut client_guard = self.client.lock().await;
4198 let tx = client_guard.transaction().await?;
4199 let rows = tx
4200 .query(
4201 "SELECT s.content \
4202 FROM t_component_source cs \
4203 JOIN t_source_file s ON cs.content_hash = s.content_hash \
4204 WHERE cs.component_digest = $1 \
4205 AND ( \
4206 (NOT cs.is_suffix AND cs.frame_key = $2) \
4207 OR (cs.is_suffix AND right($2, length(cs.frame_key)) = cs.frame_key) \
4208 )",
4209 &[&component_digest.as_slice(), &file],
4210 )
4211 .await?;
4212 tx.commit().await?;
4213 match rows.len() {
4214 0 => Ok(None),
4215 1 => Ok(Some(get::<String, _>(&rows[0], "content")?)),
4216 _ => {
4217 warn!("Multiple suffix matches for '{file}', returning None");
4218 Ok(None)
4219 }
4220 }
4221 }
4222
4223 #[instrument(skip(self))]
4224 async fn list_executions(
4225 &self,
4226 filter: ListExecutionsFilter,
4227 pagination: ExecutionListPagination,
4228 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4229 let mut client_guard = self.client.lock().await;
4230 let tx = client_guard.transaction().await?;
4231
4232 let result = list_executions(&tx, filter, &pagination).await?;
4233
4234 tx.commit().await?;
4235 Ok(result)
4236 }
4237
4238 #[instrument(skip(self))]
4239 async fn list_execution_events(
4240 &self,
4241 execution_id: &ExecutionId,
4242 pagination: Pagination<VersionType>,
4243 include_backtrace_id: bool,
4244 ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4245 let mut client_guard = self.client.lock().await;
4246 let tx = client_guard.transaction().await?;
4247
4248 let events =
4249 list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4250 let max_version = get_max_version(&tx, execution_id).await?;
4251
4252 tx.commit().await?;
4253 Ok(ListExecutionEventsResponse {
4254 events,
4255 max_version,
4256 })
4257 }
4258
4259 #[instrument(skip(self))]
4260 async fn list_responses(
4261 &self,
4262 execution_id: &ExecutionId,
4263 pagination: Pagination<u32>,
4264 ) -> Result<ListResponsesResponse, DbErrorRead> {
4265 let mut client_guard = self.client.lock().await;
4266 let tx = client_guard.transaction().await?;
4267
4268 let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4269 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4270
4271 tx.commit().await?;
4272 Ok(ListResponsesResponse {
4273 responses,
4274 max_cursor,
4275 })
4276 }
4277
4278 #[instrument(skip(self))]
4279 async fn list_execution_events_responses(
4280 &self,
4281 execution_id: &ExecutionId,
4282 req_since: &Version,
4283 req_max_length: VersionType,
4284 req_include_backtrace_id: bool,
4285 resp_pagination: Pagination<u32>,
4286 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4287 let mut client_guard = self.client.lock().await;
4288 let tx = client_guard.transaction().await?;
4289
4290 let combined_state = get_combined_state(&tx, execution_id).await?;
4291
4292 let events = list_execution_events(
4293 &tx,
4294 execution_id,
4295 Pagination::NewerThan {
4296 length: req_max_length
4297 .try_into()
4298 .expect("req_max_length fits in u16"),
4299 cursor: req_since.0,
4300 including_cursor: true,
4301 },
4302 req_include_backtrace_id,
4303 )
4304 .await?;
4305
4306 let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4307 let max_version = get_max_version(&tx, execution_id).await?;
4308 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4309
4310 tx.commit().await?;
4311
4312 Ok(ExecutionWithStateRequestsResponses {
4313 execution_with_state: combined_state.execution_with_state,
4314 events,
4315 responses,
4316 max_version,
4317 max_cursor,
4318 })
4319 }
4320
4321 #[instrument(skip(self))]
4322 async fn upgrade_execution_component(
4323 &self,
4324 execution_id: &ExecutionId,
4325 old: &ComponentDigest,
4326 new: &ComponentDigest,
4327 ) -> Result<(), DbErrorWrite> {
4328 let mut client_guard = self.client.lock().await;
4329 let tx = client_guard.transaction().await?;
4330
4331 upgrade_execution_component(&tx, execution_id, old, new).await?;
4332
4333 tx.commit().await?;
4334 Ok(())
4335 }
4336
4337 #[instrument(skip(self))]
4338 async fn list_logs(
4339 &self,
4340 execution_id: &ExecutionId,
4341 filter: LogFilter,
4342 pagination: Pagination<u32>,
4343 ) -> Result<ListLogsResponse, DbErrorRead> {
4344 let mut client_guard = self.client.lock().await;
4345 let tx = client_guard.transaction().await?;
4346 let responses = list_logs_tx(&tx, execution_id, &filter, &pagination).await?;
4347 tx.commit().await?;
4348 Ok(responses)
4349 }
4350
4351 #[instrument(skip(self))]
4352 async fn list_deployment_states(
4353 &self,
4354 current_time: DateTime<Utc>,
4355 pagination: Pagination<Option<DeploymentId>>,
4356 include_config_json: bool,
4357 ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4358 let mut client_guard = self.client.lock().await;
4359 let tx = client_guard.transaction().await?;
4360 let deployments =
4361 list_deployment_states(&tx, current_time, pagination, include_config_json).await?;
4362 tx.commit().await?;
4363 Ok(deployments)
4364 }
4365
4366 #[instrument(skip(self))]
4367 async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite> {
4368 assert_eq!(
4369 record.status,
4370 DeploymentStatus::Inactive,
4371 "insert_deployment requires Inactive status"
4372 );
4373 assert!(
4374 record.last_active_at.is_none(),
4375 "insert_deployment requires last_active_at == None"
4376 );
4377 let mut client_guard = self.client.lock().await;
4378 let tx = client_guard.transaction().await?;
4379 tx.execute(
4380 "INSERT INTO t_deployment \
4381 (deployment_id, created_at, status, config_json, obelisk_version, created_by) \
4382 VALUES ($1, $2, $3, $4, $5, $6)",
4383 &[
4384 &record.deployment_id.to_string(), &record.created_at, &record.status.as_str(), &record.config_json, &record.obelisk_version, &record.created_by, ],
4391 )
4392 .await?;
4393 tx.commit().await?;
4394 Ok(())
4395 }
4396
4397 #[instrument(skip(self))]
4398 async fn activate_deployment(
4399 &self,
4400 deployment_id: DeploymentId,
4401 now: DateTime<Utc>,
4402 ) -> Result<(), DbErrorWrite> {
4403 let mut client_guard = self.client.lock().await;
4404 let tx = client_guard.transaction().await?;
4405 tx.execute(
4407 "UPDATE t_deployment SET status = 'inactive' WHERE status IN ('active', 'enqueued')",
4408 &[],
4409 )
4410 .await?;
4411 let rows = tx
4413 .execute(
4414 "UPDATE t_deployment SET status = 'active', last_active_at = $1 WHERE deployment_id = $2",
4415 &[&now, &deployment_id.to_string()],
4416 )
4417 .await?;
4418 tx.commit().await?;
4419 if rows == 0 {
4420 return Err(DbErrorWrite::NotFound);
4421 }
4422 Ok(())
4423 }
4424
4425 async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite> {
4426 let mut client_guard = self.client.lock().await;
4427 let tx = client_guard.transaction().await?;
4428 let status_opt = tx
4430 .query_opt(
4431 "SELECT status FROM t_deployment WHERE deployment_id = $1",
4432 &[&deployment_id.to_string()],
4433 )
4434 .await?;
4435 match status_opt.as_ref().map(|r| r.get::<_, &str>("status")) {
4436 None => return Err(DbErrorWrite::NotFound),
4437 Some("active") => return Err(DbErrorWriteNonRetriable::Conflict.into()),
4438 _ => {}
4439 }
4440 tx.execute(
4442 "UPDATE t_deployment SET status = 'inactive' WHERE status = 'enqueued'",
4443 &[],
4444 )
4445 .await?;
4446 let rows = tx
4448 .execute(
4449 "UPDATE t_deployment SET status = 'enqueued' WHERE deployment_id = $1",
4450 &[&deployment_id.to_string()],
4451 )
4452 .await?;
4453 tx.commit().await?;
4454 if rows == 0 {
4455 return Err(DbErrorWrite::NotFound);
4456 }
4457 Ok(())
4458 }
4459
4460 #[instrument(skip(self))]
4461 async fn get_deployment(
4462 &self,
4463 deployment_id: DeploymentId,
4464 ) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4465 let mut client_guard = self.client.lock().await;
4466 let tx = client_guard.transaction().await?;
4467 let row = tx
4468 .query_opt(
4469 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4470 FROM t_deployment WHERE deployment_id = $1",
4471 &[&deployment_id.to_string()],
4472 )
4473 .await?;
4474 tx.commit().await?;
4475 match row {
4476 None => Ok(None),
4477 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4478 }
4479 }
4480
4481 #[instrument(skip(self))]
4482 #[cfg(feature = "test")]
4483 async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4484 let mut client_guard = self.client.lock().await;
4485 let tx = client_guard.transaction().await?;
4486 let row = tx
4487 .query_opt(
4488 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4489 FROM t_deployment WHERE status = 'active' LIMIT 1",
4490 &[],
4491 )
4492 .await?;
4493 tx.commit().await?;
4494 match row {
4495 None => Ok(None),
4496 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4497 }
4498 }
4499
4500 async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4501 let mut client_guard = self.client.lock().await;
4502 let tx = client_guard.transaction().await?;
4503 let row = tx
4504 .query_opt(
4505 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4506 FROM t_deployment WHERE status IN ('enqueued', 'active') \
4507 ORDER BY CASE status WHEN 'enqueued' THEN 0 ELSE 1 END LIMIT 1",
4508 &[],
4509 )
4510 .await?;
4511 tx.commit().await?;
4512 match row {
4513 None => Ok(None),
4514 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4515 }
4516 }
4517
4518 #[instrument(skip(self))]
4519 async fn list_deployments(
4520 &self,
4521 pagination: Pagination<Option<DeploymentId>>,
4522 ) -> Result<Vec<DeploymentRecord>, DbErrorRead> {
4523 let mut client_guard = self.client.lock().await;
4524 let tx = client_guard.transaction().await?;
4525 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4526 let mut add_param = |p: Box<dyn tokio_postgres::types::ToSql + Sync + Send>| {
4527 params.push(p);
4528 format!("${}", params.len())
4529 };
4530
4531 let mut sql = String::from(
4532 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4533 FROM t_deployment",
4534 );
4535
4536 if let Some(cursor) = pagination.cursor() {
4537 let p_cursor = add_param(Box::new(cursor.to_string()));
4538 write!(
4539 sql,
4540 " WHERE deployment_id {rel} {p_cursor}",
4541 rel = pagination.rel()
4542 )
4543 .expect("writing to string");
4544 }
4545
4546 let (inner_order, outer_order) = if pagination.is_desc() {
4547 ("DESC", "")
4548 } else {
4549 ("ASC", "DESC")
4550 };
4551
4552 write!(
4553 sql,
4554 " ORDER BY deployment_id {inner_order} LIMIT {limit}",
4555 limit = pagination.length()
4556 )
4557 .expect("writing to string");
4558
4559 let final_sql = if outer_order.is_empty() {
4560 sql
4561 } else {
4562 format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
4563 };
4564
4565 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4566 params.iter().map(|p| p.as_ref() as _).collect();
4567
4568 let rows = tx.query(&final_sql, ¶ms_refs).await?;
4569 tx.commit().await?;
4570
4571 rows.iter()
4572 .map(deployment_record_from_pg_row)
4573 .collect::<Result<Vec<_>, _>>()
4574 }
4575
4576 #[instrument(skip(self))]
4577 async fn pause_execution(
4578 &self,
4579 execution_id: &ExecutionId,
4580 paused_at: DateTime<Utc>,
4581 ) -> Result<AppendResponse, DbErrorWrite> {
4582 let mut client_guard = self.client.lock().await;
4583 let tx = client_guard.transaction().await?;
4584
4585 let combined_state = get_combined_state(&tx, execution_id).await?;
4586 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4587 debug!("Pausing with {appending_version}");
4588 let (next_version, _) = append(
4589 &tx,
4590 execution_id,
4591 AppendRequest {
4592 created_at: paused_at,
4593 event: ExecutionRequest::Paused,
4594 },
4595 appending_version,
4596 )
4597 .await?;
4598
4599 tx.commit().await?;
4600 Ok(next_version)
4601 }
4602
4603 #[instrument(skip(self))]
4604 async fn unpause_execution(
4605 &self,
4606 execution_id: &ExecutionId,
4607 unpaused_at: DateTime<Utc>,
4608 ) -> Result<AppendResponse, DbErrorWrite> {
4609 let mut client_guard = self.client.lock().await;
4610 let tx = client_guard.transaction().await?;
4611
4612 let combined_state = get_combined_state(&tx, execution_id).await?;
4613 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4614 debug!("Unpausing with {appending_version}");
4615 let (next_version, _) = append(
4616 &tx,
4617 execution_id,
4618 AppendRequest {
4619 created_at: unpaused_at,
4620 event: ExecutionRequest::Unpaused,
4621 },
4622 appending_version,
4623 )
4624 .await?;
4625
4626 tx.commit().await?;
4627 Ok(next_version)
4628 }
4629}
4630
4631#[async_trait]
4632impl DbPoolCloseable for PostgresPool {
4633 async fn close(&self) {
4634 self.pool.close();
4635 }
4636}
4637
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test-only: appends `response_event` (stamped with `created_at`) to
    /// `execution_id`'s responses, then calls `notify_all` with the resulting
    /// notifier.
    ///
    /// The write is committed in its own transaction, and the connection lock is
    /// released before the notification is sent.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let response = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };

        // Scoping the guard here releases the connection lock before notifying.
        let notifier = {
            let mut guard = self.client.lock().await;
            let transaction = guard.transaction().await?;
            let notifier = append_response(&transaction, &execution_id, response).await?;
            transaction.commit().await?;
            notifier
        };

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4666
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test-only: drops this pool's database by connecting to the admin database
    /// (`ADMIN_DB_NAME`) with the same host and credentials.
    ///
    /// `DROP DATABASE` is attempted up to three times. Each failure is logged at
    /// debug level, and a warning is emitted if every attempt fails.
    ///
    /// # Panics
    /// Panics if the admin pool cannot be created or a connection cannot be
    /// obtained from it. The cause is logged with `error!` first.
    pub async fn drop_database(&self) {
        let db_name = &self.config.db_name;

        let mut admin_cfg = deadpool_postgres::Config::new();
        admin_cfg.host = Some(self.config.host.clone());
        admin_cfg.user = Some(self.config.user.clone());
        admin_cfg.password = Some(self.config.password.expose_secret().to_string());
        admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
        admin_cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let drop_statement = format!("DROP DATABASE {db_name}");
        for _ in 0..3 {
            match admin_client.execute(&drop_statement, &[]).await {
                Ok(_) => {
                    debug!("Database '{db_name}' dropped.");
                    return;
                }
                res @ Err(_) => debug!("Dropping db failed - {res:?}"),
            }
        }
        warn!("Did not drop database {db_name}");
    }
}