1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ComponentType, ExecutionId, FunctionFqn, JoinSetId,
6 StrVariant, SupportedFunctionReturnValue,
7 component_id::{ComponentDigest, Digest},
8 prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14 DbPool, DbPoolCloseable, DeploymentRecord, DeploymentState, DeploymentStatus,
15 ExecutionEvent, ExecutionListPagination, ExecutionRequest, ExecutionWithState,
16 ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock, ExpiredTimer,
17 HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
18 JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionEventsResponse,
19 ListExecutionsFilter, ListLogsResponse, ListResponsesResponse, LockPendingResponse, Locked,
20 LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter, LogInfoAppendRow, LogLevel,
21 LogStreamType, Pagination, PendingState, PendingStateBlockedByJoinSet,
22 PendingStateFinishedResultKind, PendingStateMergedPause, ResponseCursor,
23 ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED,
24 STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
25 },
26};
27use db_common::{
28 AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
29 PendingFfqnSubscribersHolder,
30};
31use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
32use hashbrown::HashMap;
33use secrecy::{ExposeSecret as _, SecretString};
34use sha2::{Digest as _, Sha256};
35use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
36use std::{fmt::Write as _, panic::Location};
37use strum::IntoEnumIterator as _;
38use tokio::sync::{mpsc, oneshot};
39use tokio_postgres::{
40 NoTls, Row, Transaction,
41 row::RowIndex,
42 types::{FromSql, Json, ToSql},
43};
44use tracing::{Level, debug, error, info, instrument, trace, warn};
45use tracing_error::SpanTrace;
46
47#[track_caller]
48fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
49 row: &'a Row,
50 name: I,
51) -> Result<T, DbErrorGeneric> {
52 match row.try_get(name) {
53 Ok(ok) => Ok(ok),
54 Err(err) => {
55 Err(consistency_db_err(format!(
57 "Failed to retrieve column '{name}': {err:?}"
58 )))
59 }
60 }
61}
62
/// DDL for the PostgreSQL backend.
///
/// Every statement is guarded by `IF NOT EXISTS` and terminated with `;`, so the constants can
/// be joined and executed together as one batch when the pool is initialized.
mod ddl {
    use super::{STATE_LOCKED, STATE_PENDING_AT};
    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;
    use const_format::formatcp;

    /// Maintenance database used to check for, and create, the application database.
    pub const ADMIN_DB_NAME: &str = "postgres";

    /// Schema version this build expects to find in `t_metadata`; a different stored version
    /// aborts initialization.
    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 3;

    /// Schema-version bookkeeping; the row with the highest `id` is treated as current
    /// (the reader uses `ORDER BY id DESC LIMIT 1`).
    pub const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    schema_version INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);
";

    /// Event log of every execution, one row per `(execution_id, version)`; version 0 holds
    /// the `Created` request. `history_event_type` is extracted from the JSON payload and
    /// stored so it can be indexed.
    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    json_value JSON NOT NULL,
    version BIGINT NOT NULL CHECK (version >= 0),
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
    PRIMARY KEY (execution_id, version)
);
";

    // NOTE(review): this index has the same columns as the PRIMARY KEY of `t_execution_log`,
    // whose implicit unique index already serves these lookups; it looks redundant and only
    // adds write overhead. Dropping it also requires removing it from the DDL list used at
    // pool initialization.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";

    /// Lookup of log entries by execution and event variant.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

    /// Partial index covering only `join_next` history events, per execution and join set.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log
        (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
        HISTORY_EVENT_TYPE_JOIN_NEXT
    );

    /// Responses delivered to an execution's join sets. Judging by the columns, a row is
    /// either a delay outcome (`delay_id`, `delay_success`) or a finished child
    /// (`child_execution_id`, `finished_version`) — NOTE(review): confirm against writers.
    ///
    /// PostgreSQL treats NULLs as distinct in UNIQUE constraints, so the composite UNIQUE
    /// below does not reject duplicates when `delay_id` or `child_execution_id` is NULL; the
    /// partial unique indexes that follow enforce uniqueness per delay / child id.
    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success BOOLEAN,

    child_execution_id TEXT,
    finished_version BIGINT CHECK (finished_version >= 0),

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
);
";

    /// Ordered scan of responses per execution (by the identity `id`).
    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";

    /// At most one response row per child execution.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";

    /// At most one response row per delay.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";

    /// Current state of each execution, one row per `execution_id`.
    /// `pending_expires_finished` holds the scheduled time, lock expiry or finish time,
    /// depending on `state`.
    pub const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level BOOLEAN NOT NULL,
    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
    ffqn TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    component_id_input_digest BYTEA NOT NULL,
    component_type TEXT NOT NULL,
    first_scheduled_at TIMESTAMPTZ NOT NULL,
    deployment_id TEXT NOT NULL,
    is_paused BOOLEAN NOT NULL,

    pending_expires_finished TIMESTAMPTZ NOT NULL,
    state TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),

    max_retries BIGINT CHECK (max_retries >= 0),
    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
    last_lock_version BIGINT CHECK (last_lock_version >= 0),
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing BOOLEAN,

    result_kind JSONB,

    PRIMARY KEY (execution_id)
);
";

    /// Partial index over pending executions, ordered by scheduled time, filterable by ffqn.
    pub const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (state, pending_expires_finished, ffqn) WHERE state = '{}';",
        STATE_PENDING_AT
    );
    /// Partial index over pending executions, ordered by scheduled time, filterable by
    /// component digest.
    pub const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (state, pending_expires_finished, component_id_input_digest) WHERE state = '{}';",
        STATE_PENDING_AT
    );

    /// Partial index over locked executions by lock expiry.
    pub const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
        STATE_LOCKED
    );

    // NOTE(review): the constant says IS_TOP_LEVEL while the index is named `..._is_root`;
    // `execution_id` alone is already the primary key.
    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";

    /// Lookup of executions by function name.
    pub const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";

    /// Ordering/filtering of executions by creation time.
    pub const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";
    /// Lookup of executions by deployment and state.
    pub const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
";

    /// Delay timers, one per `(execution_id, join_set_id, delay_id)`, with their expiry.
    pub const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
);
";

    /// WASM backtraces stored once per content hash.
    pub const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
    backtrace_hash BYTEA PRIMARY KEY,
    wasm_backtrace JSONB NOT NULL
);
";

    /// Associates a version range `[version_min_including, version_max_excluding)` of an
    /// execution with a stored backtrace.
    pub const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_backtrace (
    execution_id TEXT NOT NULL,
    component_id JSONB NOT NULL,
    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
    backtrace_hash BYTEA NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding),
    FOREIGN KEY (backtrace_hash) REFERENCES t_wasm_backtrace(backtrace_hash)
);
";

    // NOTE(review): same columns and order as the PRIMARY KEY above; possibly redundant.
    pub const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version ON t_execution_backtrace (execution_id, version_min_including, version_max_excluding);
";

    /// Source file contents stored once per content hash.
    pub const CREATE_TABLE_T_SOURCE_FILE: &str = r"
CREATE TABLE IF NOT EXISTS t_source_file (
    content_hash BYTEA PRIMARY KEY,
    content TEXT NOT NULL
);
";
    /// Maps `(component_digest, frame_key, is_suffix)` to a stored source file.
    pub const CREATE_TABLE_T_COMPONENT_SOURCE: &str = r"
CREATE TABLE IF NOT EXISTS t_component_source (
    component_digest BYTEA NOT NULL,
    frame_key TEXT NOT NULL,
    is_suffix BOOLEAN NOT NULL,
    content_hash BYTEA NOT NULL,
    PRIMARY KEY (component_digest, frame_key, is_suffix),
    FOREIGN KEY (content_hash) REFERENCES t_source_file(content_hash)
);
";

    /// Log entries per execution run; the nullable columns suggest a row is either a log
    /// message (`level`, `message`) or a stream chunk (`stream_type`, `payload`) —
    /// NOTE(review): confirm against writers.
    pub const CREATE_TABLE_T_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_log (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    execution_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    level INTEGER,
    message TEXT,
    stream_type INTEGER,
    payload BYTEA
);
";
    /// Log lookup per execution run, ordered by time.
    pub const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
";
    /// Log lookup per execution (all runs), ordered by time.
    pub const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
";

    /// Deployments; `status` defaults to `'inactive'`.
    pub const CREATE_TABLE_T_DEPLOYMENT: &str = r"
CREATE TABLE IF NOT EXISTS t_deployment (
    deployment_id TEXT NOT NULL PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    last_active_at TIMESTAMPTZ,
    status TEXT NOT NULL DEFAULT 'inactive',
    config_json TEXT NOT NULL,
    obelisk_version TEXT NOT NULL,
    created_by TEXT
);
";
    /// Lookup of deployments by status.
    pub const IDX_T_DEPLOYMENT_STATUS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_deployment_status ON t_deployment (status);
";
    /// Unique index on a constant expression restricted to `status = 'active'`: at most one
    /// deployment can be active at a time.
    pub const IDX_T_DEPLOYMENT_SINGLE_ACTIVE: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_active ON t_deployment ((1)) WHERE status = 'active';
";
    /// Same trick as above: at most one deployment can be `enqueued` at a time.
    pub const IDX_T_DEPLOYMENT_SINGLE_ENQUEUED: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_t_deployment_single_enqueued ON t_deployment ((1)) WHERE status = 'enqueued';
";
}
308
/// Connection parameters for the PostgreSQL server, used both for the admin connection
/// (database provisioning) and for the application pool.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
    /// Server host passed to the pool configuration.
    pub host: String,
    /// Role used to connect.
    pub user: String,
    /// Password for `user`; only exposed when building the pool configuration.
    pub password: SecretString,
    /// Name of the application database.
    pub db_name: String,
}
316
/// Opaque failure of pool or schema initialization.
///
/// Carries no details: the underlying cause is logged at the point of failure before this
/// error is returned.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
320
321async fn create_database(
322 config: &PostgresConfig,
323 provision_policy: ProvisionPolicy,
324) -> Result<DbInitialzationOutcome, InitializationError> {
325 let mut admin_cfg = deadpool_postgres::Config::new();
326 admin_cfg.host = Some(config.host.clone());
327 admin_cfg.user = Some(config.user.clone());
328 admin_cfg.password = Some(config.password.expose_secret().to_string());
329 admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
330 admin_cfg.manager = Some(ManagerConfig {
331 recycling_method: RecyclingMethod::Fast,
332 });
333
334 let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
335 error!("Cannot create the default pool - {err:?}");
336 InitializationError
337 })?;
338
339 let client = admin_pool.get().await.map_err(|err| {
340 error!("Cannot get a connection from the default pool - {err:?}");
341 InitializationError
342 })?;
343
344 let row = client
345 .query_opt(
346 &format!(
347 "SELECT 1 FROM pg_database WHERE datname = '{}'",
348 config.db_name
349 ),
350 &[],
351 )
352 .await
353 .map_err(|err| {
354 error!("Cannot select from the default database - {err:?}");
355 InitializationError
356 })?;
357
358 match (row, provision_policy) {
359 (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
360 client
361 .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
362 .await
363 .map_err(|err| {
364 error!("Cannot create the database - {err:?}");
365 InitializationError
366 })?;
367 info!("Database '{}' created.", config.db_name);
368 Ok(DbInitialzationOutcome::Created)
369 }
370 (Some(_), ProvisionPolicy::Auto) => {
371 info!("Database '{}' exists.", config.db_name);
372 Ok(DbInitialzationOutcome::Existing)
373 }
374 (Some(_), ProvisionPolicy::MustCreate) => {
375 warn!("Database '{}' already exists.", config.db_name);
376 Err(InitializationError)
377 }
378 (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
379 }
380}
381
/// Per-execution waiter for a join-set response: a one-shot sender plus a `u64`
/// (NOTE(review): presumably a subscription tag — confirm against its users).
/// Shared between the pool and every connection it hands out.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
/// Shared registry of pending-execution subscribers (see `PendingFfqnSubscribersHolder`).
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Waiters for an execution's final return value, keyed by execution and then by a `u64`
/// (NOTE(review): presumably a per-waiter id — confirm).
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
389
/// PostgreSQL-backed [`DbPool`]: a connection pool plus in-process subscriber registries that
/// are cloned (by `Arc`) into every connection handed out.
pub struct PostgresPool {
    pool: Pool,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Configuration this pool was created from.
    pub config: PostgresConfig,
}
398
399#[async_trait]
400impl DbPool for PostgresPool {
401 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
402 let client = self.pool.get().await?;
403
404 Ok(Box::new(PostgresConnection {
405 client: tokio::sync::Mutex::new(client),
406 response_subscribers: self.response_subscribers.clone(),
407 pending_subscribers: self.pending_subscribers.clone(),
408 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
409 }))
410 }
411
412 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
413 let client = self.pool.get().await?;
414
415 Ok(Box::new(PostgresConnection {
416 client: tokio::sync::Mutex::new(client),
417 response_subscribers: self.response_subscribers.clone(),
418 pending_subscribers: self.pending_subscribers.clone(),
419 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
420 }))
421 }
422
423 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
424 let client = self.pool.get().await?;
425
426 Ok(Box::new(PostgresConnection {
427 client: tokio::sync::Mutex::new(client),
428 response_subscribers: self.response_subscribers.clone(),
429 pending_subscribers: self.pending_subscribers.clone(),
430 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
431 }))
432 }
433
434 #[cfg(feature = "test")]
435 async fn connection_test(
436 &self,
437 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
438 let client = self.pool.get().await?;
439
440 Ok(Box::new(PostgresConnection {
441 client: tokio::sync::Mutex::new(client),
442 response_subscribers: self.response_subscribers.clone(),
443 pending_subscribers: self.pending_subscribers.clone(),
444 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
445 }))
446 }
447}
448
449pub struct PostgresConnection {
450 client: tokio::sync::Mutex<Client>, response_subscribers: ResponseSubscribers,
452 pending_subscribers: PendingSubscribers,
453 execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
454}
455
/// How pool initialization treats the existence of the target database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Never create the database; connect assuming it already exists.
    NeverCreate,
    /// Create the database if it is missing, otherwise use the existing one.
    Auto,
    /// Create the database; fail if it already exists.
    MustCreate,
}
464
/// Whether pool initialization created the database or found an existing one.
// NOTE(review): the type name misspells "Initialization"; renaming would break public callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// The database was created during this initialization.
    Created,
    /// The database already existed (always reported for `ProvisionPolicy::NeverCreate`).
    Existing,
}
470
471impl PostgresPool {
472 #[instrument(skip_all, name = "postgres_new")]
473 pub async fn new(
474 config: PostgresConfig,
475 provision_policy: ProvisionPolicy,
476 ) -> Result<PostgresPool, InitializationError> {
477 Self::new_with_outcome(config, provision_policy)
478 .await
479 .map(|(db, _)| db)
480 }
481
482 pub async fn new_with_outcome(
483 config: PostgresConfig,
484 provision_policy: ProvisionPolicy,
485 ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
486 let outcome = if matches!(
487 provision_policy,
488 ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
489 ) {
490 create_database(&config, provision_policy).await?
491 } else {
492 DbInitialzationOutcome::Existing
493 };
494 let mut cfg = deadpool_postgres::Config::new();
495 cfg.host = Some(config.host.clone());
496 cfg.user = Some(config.user.clone());
497 cfg.password = Some(config.password.expose_secret().to_string());
498 cfg.dbname = Some(config.db_name.clone());
499 cfg.manager = Some(ManagerConfig {
500 recycling_method: RecyclingMethod::Fast,
501 });
502
503 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
504 error!("Cannot create the database pool - {err:?}");
505 InitializationError
506 })?;
507 let client = pool.get().await.map_err(|err| {
508 error!("Cannot get a connection from the database pool - {err:?}");
509 InitializationError
510 })?;
511
512 let statements = vec![
513 ddl::CREATE_TABLE_T_METADATA,
514 ddl::CREATE_TABLE_T_EXECUTION_LOG,
515 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
516 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
517 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
518 ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
519 ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
520 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
521 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
522 ddl::CREATE_TABLE_T_STATE,
523 ddl::IDX_T_STATE_LOCK_PENDING_BY_FFQN,
524 ddl::IDX_T_STATE_LOCK_PENDING_BY_COMPONENT,
525 ddl::IDX_T_STATE_EXPIRED_LOCKS,
526 ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
527 ddl::IDX_T_STATE_FFQN,
528 ddl::IDX_T_STATE_CREATED_AT,
529 ddl::IDX_T_STATE_DEPLOYMENT_STATE,
530 ddl::CREATE_TABLE_T_DELAY,
531 ddl::CREATE_TABLE_T_WASM_BACKTRACE,
532 ddl::CREATE_TABLE_T_EXECUTION_BACKTRACE,
533 ddl::IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION,
534 ddl::CREATE_TABLE_T_SOURCE_FILE,
535 ddl::CREATE_TABLE_T_COMPONENT_SOURCE,
536 ddl::CREATE_TABLE_T_LOG,
537 ddl::IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT,
538 ddl::IDX_T_LOG_EXECUTION_ID_CREATED_AT,
539 ddl::CREATE_TABLE_T_DEPLOYMENT,
540 ddl::IDX_T_DEPLOYMENT_STATUS,
541 ddl::IDX_T_DEPLOYMENT_SINGLE_ACTIVE,
542 ddl::IDX_T_DEPLOYMENT_SINGLE_ENQUEUED,
543 ];
544
545 let batch_sql = statements.join("\n");
547 client.batch_execute(&batch_sql).await.map_err(|err| {
548 error!("Cannot run the DDL import - {err:?}");
549 InitializationError
550 })?;
551
552 let row = client
553 .query_opt(
554 "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
555 &[],
556 )
557 .await
558 .map_err(|err| {
559 error!("Cannot select schema version - {err:?}");
560 InitializationError
561 })?;
562
563 let actual_version = match row {
565 Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
566 error!("Failed to get schema_version column: {e}");
567 InitializationError
568 })?),
569 None => None,
570 };
571
572 match actual_version {
573 None => {
574 client
575 .execute(
576 "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
577 &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
578 )
579 .await
580 .map_err(|err| {
581 error!("Cannot insert schema version - {err:?}");
582 InitializationError
583 })?;
584 }
585 Some(actual_version) => {
586 if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
588 error!(
589 "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
590 );
591 return Err(InitializationError);
592 }
593 }
594 }
595
596 debug!("Database schema initialized.");
597
598 Ok((
599 PostgresPool {
600 pool,
601 execution_finished_subscribers: Arc::default(),
602 pending_subscribers: Arc::default(),
603 response_subscribers: Arc::default(),
604 config,
605 },
606 outcome,
607 ))
608 }
609}
610
611fn deployment_record_from_pg_row(row: &Row) -> Result<DeploymentRecord, DbErrorRead> {
612 let deployment_id_str: String = get(row, "deployment_id")?;
613 let deployment_id = deployment_id_str.parse::<DeploymentId>().map_err(|e| {
614 DbErrorRead::Generic(consistency_db_err(format!("invalid deployment_id: {e}")))
615 })?;
616 let status_str: String = get(row, "status")?;
617 let status = status_str
618 .parse::<DeploymentStatus>()
619 .map_err(|e| DbErrorRead::Generic(consistency_db_err(format!("invalid status: {e}"))))?;
620 Ok(DeploymentRecord {
621 deployment_id,
622 created_at: get(row, "created_at")?,
623 last_active_at: get(row, "last_active_at")?,
624 status,
625 config_json: get(row, "config_json")?,
626 obelisk_version: get(row, "obelisk_version")?,
627 created_by: get(row, "created_by")?,
628 })
629}
630
631#[track_caller]
632fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
633 DbErrorGeneric::Uncategorized {
634 reason: reason.into(),
635 context: SpanTrace::capture(),
636 source: None,
637 loc: Location::caller(),
638 }
639}
640#[track_caller]
641fn consistency_db_err_src(
642 reason: impl Into<StrVariant>,
643 source: Arc<dyn std::error::Error + Send + Sync>,
644) -> DbErrorGeneric {
645 DbErrorGeneric::Uncategorized {
646 reason: reason.into(),
647 context: SpanTrace::capture(),
648 source: Some(source),
649 loc: Location::caller(),
650 }
651}
652
/// A delay request: the join set it belongs to, its id and when it expires.
// NOTE(review): fields mirror the `t_delay` columns (join_set_id, delay_id, expires_at);
// confirm usage in the parts of the file not shown here.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
659
660async fn fetch_created_event(
661 tx: &Transaction<'_>,
662 execution_id: &ExecutionId,
663) -> Result<CreateRequest, DbErrorRead> {
664 let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
665 execution_id = $1 AND version = 0";
666
667 let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
668
669 let created_at = get(&row, "created_at")?;
670 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
671 let event = event.0;
672
673 if let ExecutionRequest::Created {
674 ffqn,
675 params,
676 parent,
677 scheduled_at,
678 component_id,
679 deployment_id,
680 metadata,
681 scheduled_by,
682 } = event
683 {
684 Ok(CreateRequest {
685 created_at,
686 execution_id: execution_id.clone(),
687 ffqn,
688 params,
689 parent,
690 scheduled_at,
691 component_id,
692 deployment_id,
693 metadata,
694 scheduled_by,
695 })
696 } else {
697 error!("Row with version=0 must be a `Created` event - {event:?}");
698 Err(consistency_db_err("expected `Created` event").into())
699 }
700}
701
702fn check_expected_next_and_appending_version(
703 expected_version: &Version,
704 appending_version: &Version,
705) -> Result<(), DbErrorWrite> {
706 if *expected_version != *appending_version {
707 debug!(
708 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
709 );
710 return Err(DbErrorWrite::NonRetriable(
711 DbErrorWriteNonRetriable::VersionConflict {
712 expected: expected_version.clone(),
713 requested: appending_version.clone(),
714 },
715 ));
716 }
717 Ok(())
718}
719
720#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
721async fn create_inner(
722 tx: &Transaction<'_>,
723 req: CreateRequest,
724) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
725 trace!("create_inner");
726
727 let version = Version::default();
728 let execution_id = req.execution_id.clone();
729 let execution_id_str = execution_id.to_string();
730 let ffqn = req.ffqn.clone();
731 let created_at = req.created_at;
732 let scheduled_at = req.scheduled_at;
733 let component_id = req.component_id.clone();
734 let deployment_id = req.deployment_id;
735
736 let event = ExecutionRequest::from(req);
737 let event = Json(event);
738
739 tx.execute(
740 "INSERT INTO t_execution_log (
741 execution_id, created_at, version, json_value, variant, join_set_id
742 ) VALUES ($1, $2, $3, $4, $5, $6)",
743 &[
744 &execution_id_str,
745 &created_at,
746 &i64::from(version.0), &event,
748 &event.0.variant(),
749 &event.0.join_set_id().map(std::string::ToString::to_string),
750 ],
751 )
752 .await?;
753
754 let pending_at = {
755 debug!("Creating with `Pending(`{scheduled_at:?}`)");
756
757 tx.execute(
758 r"
759 INSERT INTO t_state (
760 execution_id,
761 is_top_level,
762 corresponding_version,
763 pending_expires_finished,
764 ffqn,
765 state,
766 created_at,
767 component_id_input_digest,
768 component_type,
769 deployment_id,
770 first_scheduled_at,
771 updated_at,
772 intermittent_event_count,
773 is_paused
774 ) VALUES (
775 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, false
776 )",
777 &[
778 &execution_id_str,
779 &execution_id.is_top_level(),
780 &i64::from(version.0),
781 &scheduled_at,
782 &ffqn.to_string(),
783 &STATE_PENDING_AT,
784 &created_at,
785 &component_id.component_digest.as_slice(),
786 &component_id.component_type.to_string(),
787 &deployment_id.to_string(),
788 &scheduled_at,
789 ],
790 )
791 .await?;
792
793 AppendNotifier {
794 pending_at: Some(NotifierPendingAt {
795 scheduled_at,
796 ffqn,
797 component_input_digest: component_id.component_digest,
798 }),
799 execution_finished: None,
800 response: None,
801 }
802 };
803
804 let next_version = Version::new(version.0 + 1);
805 Ok((next_version, pending_at))
806}
807
808#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
809async fn update_state_pending_after_response_appended(
810 tx: &Transaction<'_>,
811 execution_id: &ExecutionId,
812 scheduled_at: DateTime<Utc>, component_input_digest: ComponentDigest,
814) -> Result<AppendNotifier, DbErrorWrite> {
815 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
816
817 let execution_id_str = execution_id.to_string();
819
820 let updated = tx
821 .execute(
822 r"
823 UPDATE t_state
824 SET
825 pending_expires_finished = $1,
826 state = $2,
827 updated_at = CURRENT_TIMESTAMP,
828
829 max_retries = NULL,
830 retry_exp_backoff_millis = NULL,
831 last_lock_version = NULL,
832
833 join_set_id = NULL,
834 join_set_closing = NULL,
835
836 result_kind = NULL
837 WHERE execution_id = $3
838 ",
839 &[
840 &scheduled_at, &STATE_PENDING_AT, &execution_id_str, ],
844 )
845 .await?;
846
847 if updated == 0 {
848 return Err(DbErrorWrite::NotFound);
849 }
850
851 Ok(AppendNotifier {
852 pending_at: Some(NotifierPendingAt {
853 scheduled_at,
854 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
855 component_input_digest,
856 }),
857 execution_finished: None,
858 response: None,
859 })
860}
861
862#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
863async fn update_state_pending_after_event_appended(
864 tx: &Transaction<'_>,
865 execution_id: &ExecutionId,
866 appending_version: &Version,
867 scheduled_at: DateTime<Utc>,
868 intermittent_failure: bool,
869 component_input_digest: ComponentDigest,
870) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
871 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
872
873 let intermittent_delta = i64::from(intermittent_failure); let updated = tx
876 .execute(
877 r"
878 UPDATE t_state
879 SET
880 corresponding_version = $1,
881 pending_expires_finished = $2,
882 state = $3,
883 updated_at = CURRENT_TIMESTAMP,
884 intermittent_event_count = intermittent_event_count + $4,
885
886 max_retries = NULL,
887 retry_exp_backoff_millis = NULL,
888 last_lock_version = NULL,
889
890 join_set_id = NULL,
891 join_set_closing = NULL,
892
893 result_kind = NULL
894 WHERE execution_id = $5;
895 ",
896 &[
897 &i64::from(appending_version.0), &scheduled_at, &STATE_PENDING_AT, &intermittent_delta, &execution_id.to_string(), ],
903 )
904 .await?;
905
906 if updated != 1 {
907 return Err(DbErrorWrite::NotFound);
908 }
909
910 Ok((
911 appending_version.increment(),
912 AppendNotifier {
913 pending_at: Some(NotifierPendingAt {
914 scheduled_at,
915 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
916 component_input_digest,
917 }),
918 execution_finished: None,
919 response: None,
920 },
921 ))
922}
923
924#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
925#[expect(clippy::too_many_arguments)]
926async fn update_state_locked_get_intermittent_event_count(
927 tx: &Transaction<'_>,
928 execution_id: &ExecutionId,
929 deployment_id: DeploymentId,
930 component_digest: &ComponentDigest,
931 executor_id: ExecutorId,
932 run_id: RunId,
933 lock_expires_at: DateTime<Utc>,
934 appending_version: &Version,
935 retry_config: ComponentRetryConfig,
936) -> Result<u32, DbErrorWrite> {
937 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
938 let backoff_millis =
939 i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
940 DbErrorGeneric::Uncategorized {
942 reason: "backoff too big".into(),
943 context: SpanTrace::capture(),
944 source: Some(Arc::new(err)),
945 loc: Location::caller(),
946 }
947 })?;
948
949 let execution_id_str = execution_id.to_string();
950
951 let updated = tx
952 .execute(
953 r"
954 UPDATE t_state
955 SET
956 corresponding_version = $1,
957 pending_expires_finished = $2,
958 state = $3,
959 updated_at = CURRENT_TIMESTAMP,
960 deployment_id = $4,
961 component_id_input_digest = $5,
962
963 max_retries = $6,
964 retry_exp_backoff_millis = $7,
965 last_lock_version = $1,
966 executor_id = $8,
967 run_id = $9,
968
969 join_set_id = NULL,
970 join_set_closing = NULL,
971
972 result_kind = NULL
973 WHERE execution_id = $10
974 AND is_paused = false
975 ",
976 &[
977 &i64::from(appending_version.0),
978 &lock_expires_at,
979 &STATE_LOCKED,
980 &deployment_id.to_string(),
981 &component_digest,
982 &retry_config.max_retries.map(i64::from),
983 &backoff_millis,
984 &executor_id.to_string(),
985 &run_id.to_string(),
986 &execution_id_str,
987 ],
988 )
989 .await?;
990
991 if updated != 1 {
992 return Err(DbErrorWrite::NotFound);
993 }
994
995 let row = tx
997 .query_one(
998 "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
999 &[&execution_id_str],
1000 )
1001 .await
1002 .map_err(DbErrorGeneric::from)?;
1003
1004 let count: i64 = get(&row, "intermittent_event_count")?; let count = u32::try_from(count)
1006 .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
1007 Ok(count)
1008}
1009
1010#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1011async fn update_state_blocked(
1012 tx: &Transaction<'_>,
1013 execution_id: &ExecutionId,
1014 appending_version: &Version,
1015 join_set_id: &JoinSetId,
1016 lock_expires_at: DateTime<Utc>,
1017 join_set_closing: bool,
1018) -> Result<AppendResponse, DbErrorWrite> {
1019 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1020
1021 let updated = tx
1022 .execute(
1023 r"
1024 UPDATE t_state
1025 SET
1026 corresponding_version = $1,
1027 pending_expires_finished = $2,
1028 state = $3,
1029 updated_at = CURRENT_TIMESTAMP,
1030
1031 max_retries = NULL,
1032 retry_exp_backoff_millis = NULL,
1033 last_lock_version = NULL,
1034
1035 join_set_id = $4,
1036 join_set_closing = $5,
1037
1038 result_kind = NULL
1039 WHERE execution_id = $6
1040 ",
1041 &[
1042 &i64::from(appending_version.0), &lock_expires_at, &STATE_BLOCKED_BY_JOIN_SET, &join_set_id.to_string(), &join_set_closing, &execution_id.to_string(), ],
1049 )
1050 .await?;
1051
1052 if updated != 1 {
1053 return Err(DbErrorWrite::NotFound);
1054 }
1055 Ok(appending_version.increment())
1056}
1057
1058#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1059async fn update_state_finished(
1060 tx: &Transaction<'_>,
1061 execution_id: &ExecutionId,
1062 appending_version: &Version,
1063 finished_at: DateTime<Utc>,
1064 result_kind: PendingStateFinishedResultKind,
1065) -> Result<(), DbErrorWrite> {
1066 debug!("Setting t_state to Finished");
1067
1068 let result_kind_json = Json(result_kind);
1069
1070 let updated = tx
1071 .execute(
1072 r"
1073 UPDATE t_state
1074 SET
1075 corresponding_version = $1,
1076 pending_expires_finished = $2,
1077 state = $3,
1078 updated_at = CURRENT_TIMESTAMP,
1079
1080 max_retries = NULL,
1081 retry_exp_backoff_millis = NULL,
1082 last_lock_version = NULL,
1083 executor_id = NULL,
1084 run_id = NULL,
1085
1086 join_set_id = NULL,
1087 join_set_closing = NULL,
1088
1089 result_kind = $4
1090 WHERE execution_id = $5
1091 ",
1092 &[
1093 &i64::from(appending_version.0), &finished_at, &STATE_FINISHED, &result_kind_json, &execution_id.to_string(), ],
1099 )
1100 .await?;
1101
1102 if updated != 1 {
1103 return Err(DbErrorWrite::NotFound);
1104 }
1105 Ok(())
1106}
1107
1108#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1109async fn update_state_paused(
1110 tx: &Transaction<'_>,
1111 execution_id: &ExecutionId,
1112 appending_version: &Version,
1113 is_paused: bool,
1114) -> Result<AppendResponse, DbErrorWrite> {
1115 debug!(
1116 "Setting t_state to {}",
1117 if is_paused { "paused" } else { "unpaused" }
1118 );
1119
1120 let updated = tx
1121 .execute(
1122 r"
1123 UPDATE t_state
1124 SET
1125 corresponding_version = $1,
1126 is_paused = $2,
1127 updated_at = CURRENT_TIMESTAMP
1128 WHERE execution_id = $3
1129 ",
1130 &[
1131 &i64::from(appending_version.0), &is_paused, &execution_id.to_string(), ],
1135 )
1136 .await?;
1137
1138 if updated != 1 {
1139 return Err(DbErrorWrite::NotFound);
1140 }
1141 Ok(appending_version.increment())
1142}
1143
1144#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1145async fn bump_state_next_version(
1146 tx: &Transaction<'_>,
1147 execution_id: &ExecutionId,
1148 appending_version: &Version,
1149 delay_req: Option<DelayReq>,
1150) -> Result<AppendResponse, DbErrorWrite> {
1151 debug!("update_index_version");
1152 let execution_id_str = execution_id.to_string();
1153
1154 let updated = tx
1155 .execute(
1156 r"
1157 UPDATE t_state
1158 SET
1159 corresponding_version = $1,
1160 updated_at = CURRENT_TIMESTAMP
1161 WHERE execution_id = $2
1162 ",
1163 &[
1164 &i64::from(appending_version.0), &execution_id_str, ],
1167 )
1168 .await?;
1169
1170 if updated != 1 {
1171 return Err(DbErrorWrite::NotFound);
1172 }
1173
1174 if let Some(DelayReq {
1175 join_set_id,
1176 delay_id,
1177 expires_at,
1178 }) = delay_req
1179 {
1180 debug!("Inserting delay to `t_delay`");
1181 tx.execute(
1182 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1183 &[
1184 &execution_id_str,
1185 &join_set_id.to_string(),
1186 &delay_id.to_string(),
1187 &expires_at,
1188 ],
1189 )
1190 .await?;
1191 }
1192 Ok(appending_version.increment())
1193}
1194
1195async fn get_combined_state(
1196 tx: &Transaction<'_>,
1197 execution_id: &ExecutionId,
1198) -> Result<CombinedState, DbErrorRead> {
1199 let row = tx
1200 .query_one(
1201 r"
1202 SELECT
1203 created_at, first_scheduled_at,
1204 state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
1205 last_lock_version, executor_id, run_id,
1206 join_set_id, join_set_closing,
1207 result_kind, is_paused
1208 FROM t_state
1209 WHERE execution_id = $1
1210 ",
1211 &[&execution_id.to_string()],
1212 )
1213 .await
1214 .map_err(DbErrorRead::from)?;
1215
1216 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1219 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1220
1221 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1222 let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1223 consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1224 })?;
1225 let component_digest = ComponentDigest(digest);
1226
1227 let component_type: String = get(&row, "component_type")?;
1228 let component_type = ComponentType::from_str(&component_type)
1229 .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;
1230
1231 let deployment_id: String = get(&row, "deployment_id")?;
1232 let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1233
1234 let state: String = get(&row, "state")?;
1235 let ffqn: String = get(&row, "ffqn")?;
1236 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1237 consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1238 })?;
1239
1240 let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1241
1242 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1243 let last_lock_version = last_lock_version_raw
1244 .map(Version::try_from)
1245 .transpose()
1246 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1247
1248 let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1249 let executor_id = executor_id_raw
1250 .map(|id| ExecutorId::from_str(&id))
1251 .transpose()
1252 .map_err(DbErrorGeneric::from)?;
1253
1254 let run_id_raw: Option<String> = get(&row, "run_id")?;
1255 let run_id = run_id_raw
1256 .map(|id| RunId::from_str(&id))
1257 .transpose()
1258 .map_err(DbErrorGeneric::from)?;
1259
1260 let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1261 let join_set_id = join_set_id_raw
1262 .map(|id| JoinSetId::from_str(&id))
1263 .transpose()
1264 .map_err(DbErrorGeneric::from)?;
1265
1266 let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1267
1268 let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1269 let result_kind = result_kind.map(|it| it.0);
1270
1271 let is_paused: bool = get(&row, "is_paused")?;
1272
1273 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1274 let corresponding_version = Version::new(
1275 VersionType::try_from(corresponding_version)
1276 .map_err(|_| consistency_db_err("version must be non-negative"))?,
1277 );
1278
1279 let dto = CombinedStateDTO {
1280 execution_id: execution_id.clone(),
1281 created_at,
1282 first_scheduled_at,
1283 state,
1284 ffqn,
1285 component_digest,
1286 component_type,
1287 deployment_id,
1288 pending_expires_finished,
1289 last_lock_version,
1290 executor_id,
1291 run_id,
1292 join_set_id,
1293 join_set_closing,
1294 result_kind,
1295 is_paused,
1296 };
1297 CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1298}
1299
1300async fn list_executions(
1301 read_tx: &Transaction<'_>,
1302 filter: ListExecutionsFilter,
1303 pagination: &ExecutionListPagination,
1304) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1305 struct QueryBuilder {
1307 where_clauses: Vec<String>,
1308 params: Vec<Box<dyn ToSql + Send + Sync>>,
1309 }
1310
1311 impl QueryBuilder {
1312 fn new() -> Self {
1313 Self {
1314 where_clauses: Vec::new(),
1315 params: Vec::new(),
1316 }
1317 }
1318
1319 fn add_param<T>(&mut self, param: T) -> String
1320 where
1321 T: ToSql + Sync + Send + 'static,
1322 {
1323 self.params.push(Box::new(param));
1324 format!("${}", self.params.len())
1325 }
1326
1327 fn add_where(&mut self, clause: String) {
1328 self.where_clauses.push(clause);
1329 }
1330 }
1331
1332 let mut qb = QueryBuilder::new();
1333
1334 let (limit, limit_desc) = match pagination {
1336 ExecutionListPagination::CreatedBy(p) => {
1337 let limit = p.length();
1338 let is_desc = p.is_desc();
1339 if let Some(cursor) = p.cursor() {
1340 let placeholder = qb.add_param(*cursor);
1341 qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1342 }
1343 (limit, is_desc)
1344 }
1345 ExecutionListPagination::ExecutionId(p) => {
1346 let limit = p.length();
1347 let is_desc = p.is_desc();
1348 if let Some(cursor) = p.cursor() {
1349 let placeholder = qb.add_param(cursor.to_string());
1350 qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1351 }
1352 (limit, is_desc)
1353 }
1354 };
1355
1356 if !filter.show_derived {
1357 qb.add_where("is_top_level = true".to_string());
1358 }
1359 let like = |str| format!("{str}%");
1360 if let Some(ffqn_prefix) = filter.ffqn_prefix {
1361 let placeholder = qb.add_param(like(ffqn_prefix));
1362 qb.add_where(format!("ffqn LIKE {placeholder}"));
1363 }
1364 if filter.hide_finished {
1365 qb.add_where(format!("state != '{STATE_FINISHED}'"));
1366 }
1367 if let Some(prefix) = filter.execution_id_prefix {
1368 let placeholder = qb.add_param(like(prefix));
1369 qb.add_where(format!("execution_id LIKE {placeholder}"));
1370 }
1371 if let Some(component_digest) = filter.component_digest {
1372 let placeholder = qb.add_param(component_digest);
1373 qb.add_where(format!("component_id_input_digest = {placeholder}"));
1374 }
1375 if let Some(deployment_id) = filter.deployment_id {
1376 let placeholder = qb.add_param(deployment_id.to_string());
1377 qb.add_where(format!("deployment_id = {placeholder}"));
1378 }
1379
1380 let where_str = if qb.where_clauses.is_empty() {
1381 String::new()
1382 } else {
1383 format!("WHERE {}", qb.where_clauses.join(" AND "))
1384 };
1385
1386 let order_col = match pagination {
1387 ExecutionListPagination::CreatedBy(_) => "created_at",
1388 ExecutionListPagination::ExecutionId(_) => "execution_id",
1389 };
1390
1391 let (inner_order, outer_order) = if limit_desc {
1394 ("DESC", "")
1395 } else {
1396 ("", "DESC")
1397 };
1398
1399 let inner_sql = format!(
1400 r"SELECT created_at, first_scheduled_at, component_id_input_digest, deployment_id,
1401 state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1402 last_lock_version, executor_id, run_id,
1403 join_set_id, join_set_closing,
1404 result_kind, is_paused
1405 FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1406 );
1407
1408 let sql = if outer_order.is_empty() {
1409 inner_sql
1410 } else {
1411 format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1412 };
1413
1414 let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1415 .params
1416 .iter()
1417 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1418 .collect();
1419
1420 let rows = read_tx.query(&sql, ¶ms_refs).await?;
1421
1422 let mut vec = Vec::with_capacity(rows.len());
1423
1424 for row in rows {
1425 let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1427 let execution_id_str: String = get(&row, "execution_id")?;
1428 let execution_id = ExecutionId::from_str(&execution_id_str)
1429 .map_err(|err| consistency_db_err(err.to_string()))?;
1430
1431 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1432 let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1433 consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1434 })?;
1435 let component_digest = ComponentDigest(digest);
1436
1437 let component_type: String = get(&row, "component_type")?;
1438 let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1439 consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1440 })?;
1441
1442 let deployment_id: String = get(&row, "deployment_id")?;
1443 let deployment_id =
1444 DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1445
1446 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1447 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1448
1449 let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1450 get(&row, "result_kind")?;
1451 let result_kind = result_kind.map(|it| it.0);
1452
1453 let is_paused: bool = get(&row, "is_paused")?;
1454
1455 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1456 let corresponding_version = Version::try_from(corresponding_version)
1457 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1458
1459 let executor_id_str: Option<String> = get(&row, "executor_id")?;
1460 let executor_id = executor_id_str
1461 .map(|id| ExecutorId::from_str(&id))
1462 .transpose()?;
1463
1464 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1465 let last_lock_version = last_lock_version_raw
1466 .map(Version::try_from)
1467 .transpose()
1468 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1469
1470 let run_id_str: Option<String> = get(&row, "run_id")?;
1471 let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1472
1473 let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1474 let join_set_id = join_set_id_str
1475 .map(|id| JoinSetId::from_str(&id))
1476 .transpose()?;
1477
1478 let ffqn: String = get(&row, "ffqn")?;
1479 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1480 error!("Error parsing ffqn - {parse_err:?}");
1481 consistency_db_err("invalid ffqn value in `t_state`")
1482 })?;
1483
1484 let combined_state_dto = CombinedStateDTO {
1485 execution_id,
1486 created_at,
1487 first_scheduled_at,
1488 component_digest,
1489 component_type,
1490 deployment_id,
1491 state: get(&row, "state")?,
1492 ffqn,
1493 pending_expires_finished: get(&row, "pending_expires_finished")?,
1494 executor_id,
1495 last_lock_version,
1496 run_id,
1497 join_set_id,
1498 join_set_closing: get(&row, "join_set_closing")?,
1499 result_kind,
1500 is_paused,
1501 };
1502
1503 let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1504
1505 Ok(combined_state.execution_with_state)
1506 };
1507
1508 match unpack() {
1509 Ok(execution) => vec.push(execution),
1510 Err(err) => {
1511 warn!("Skipping corrupted row in t_state: {err:?}");
1512 }
1513 }
1514 }
1515
1516 Ok(vec)
1517}
1518
1519async fn list_responses(
1520 tx: &Transaction<'_>,
1521 execution_id: &ExecutionId,
1522 pagination: Option<Pagination<u32>>,
1523) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1524 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1526 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1527 params.push(p);
1528 format!("${}", params.len())
1529 };
1530
1531 let p_execution_id = add_param(Box::new(execution_id.to_string()));
1533
1534 let mut sql = format!(
1535 "SELECT \
1536 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1537 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1538 WHERE \
1539 r.execution_id = {p_execution_id} \
1540 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1541 );
1542
1543 let limit = match &pagination {
1545 Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1546 let p_cursor = add_param(Box::new(i64::from(*cursor)));
1548
1549 write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1551
1552 Some(p.length())
1553 }
1554 None => None,
1555 };
1556
1557 sql.push_str(" ORDER BY r.id");
1559 let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1560 if is_desc {
1561 sql.push_str(" DESC");
1562 }
1563
1564 if let Some(limit) = limit {
1566 let p_limit = add_param(Box::new(i64::from(limit)));
1568 write!(sql, " LIMIT {p_limit}").unwrap();
1569 }
1570
1571 if is_desc {
1573 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1574 }
1575
1576 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1577 .iter()
1578 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1579 .collect();
1580
1581 let rows = tx
1582 .query(&sql, ¶ms_refs)
1583 .await
1584 .map_err(DbErrorRead::from)?;
1585
1586 let mut results = Vec::with_capacity(rows.len());
1587 for row in rows {
1588 results.push(parse_response_with_cursor(&row)?);
1589 }
1590
1591 Ok(results)
1592}
1593
1594async fn list_logs_tx(
1595 tx: &Transaction<'_>,
1596 execution_id: &ExecutionId,
1597 show_derived: bool,
1598 filter: &LogFilter,
1599 pagination: &Pagination<DateTime<Utc>>,
1600) -> Result<ListLogsResponse, DbErrorRead> {
1601 let mut param_index = 1;
1602 let exec_id_str = execution_id.to_string();
1603 let exec_id_filter = if show_derived {
1604 format!("execution_id LIKE ${param_index} || '%'")
1605 } else {
1606 format!("execution_id = ${param_index}")
1607 };
1608 let mut query = format!(
1609 "SELECT id, run_id, created_at, level, message, stream_type, payload, execution_id
1610 FROM t_log
1611 WHERE {exec_id_filter}"
1612 );
1613 let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&exec_id_str];
1614 param_index += 1;
1615
1616 let level_filter = if filter.should_show_logs() {
1618 let levels_str = if !filter.levels().is_empty() {
1619 filter
1620 .levels()
1621 .iter()
1622 .map(|lvl| (*lvl as u8).to_string())
1623 .collect::<Vec<_>>()
1624 .join(",")
1625 } else {
1626 LogLevel::iter()
1627 .map(|lvl| (lvl as u8).to_string())
1628 .collect::<Vec<_>>()
1629 .join(",")
1630 };
1631 Some(format!(" level IN ({levels_str})"))
1632 } else {
1633 None
1634 };
1635 let stream_filter = if filter.should_show_streams() {
1636 let streams_str = if !filter.stream_types().is_empty() {
1637 filter
1638 .stream_types()
1639 .iter()
1640 .map(|st| (*st as u8).to_string())
1641 .collect::<Vec<_>>()
1642 .join(",")
1643 } else {
1644 LogStreamType::iter()
1645 .map(|st| (st as u8).to_string())
1646 .collect::<Vec<_>>()
1647 .join(",")
1648 };
1649 Some(format!(" stream_type IN ({streams_str})"))
1650 } else {
1651 None
1652 };
1653 match (level_filter, stream_filter) {
1654 (Some(level_filter), Some(stream_filter)) => {
1655 write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1656 .expect("writing to string");
1657 }
1658 (Some(level_filter), None) => {
1659 write!(&mut query, " AND {level_filter}").expect("writing to string");
1660 }
1661 (None, Some(stream_filter)) => {
1662 write!(&mut query, " AND {stream_filter}").expect("writing to string");
1663 }
1664 (None, None) => unreachable!("guarded by constructor"),
1665 }
1666
1667 write!(
1669 &mut query,
1670 " AND created_at {} ${param_index}",
1671 pagination.rel()
1672 )
1673 .expect("writing to string");
1674 let cursor_val = pagination.cursor();
1675 params.push(cursor_val);
1676 param_index += 1;
1677
1678 let dir = if pagination.is_desc() { "DESC" } else { "ASC" };
1680 write!(
1681 &mut query,
1682 " ORDER BY created_at {dir}, id {dir} LIMIT ${param_index}",
1683 )
1684 .expect("writing to string");
1685 let length_val: i64 = i64::from(pagination.length());
1686 params.push(&length_val);
1687
1688 let rows = tx.query(&query, ¶ms[..]).await?;
1689
1690 let mut items = Vec::with_capacity(rows.len());
1691
1692 for row in rows {
1693 let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1694 let run_id: String = get(&row, "run_id")?;
1695 let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1696 consistency_db_err_src(
1697 format!("cannot convert RunId {run_id}"),
1698 Arc::from(parse_err),
1699 )
1700 })?;
1701 let execution_id_str: String = get(&row, "execution_id")?;
1702 let execution_id = ExecutionId::from_str(&execution_id_str).map_err(|parse_err| {
1703 consistency_db_err_src(
1704 format!("cannot convert ExecutionId {execution_id_str}"),
1705 Arc::from(parse_err),
1706 )
1707 })?;
1708
1709 let level: Option<i32> = get(&row, "level")?;
1710 let message: Option<String> = get(&row, "message")?;
1711 let stream_type: Option<i32> = get(&row, "stream_type")?;
1712 let payload: Option<Vec<u8>> = get(&row, "payload")?;
1713
1714 let log_entry = match (level, message, stream_type, payload) {
1715 (Some(lvl), Some(msg), None, None) => {
1716 let map_err =
1717 |err| consistency_db_err_src(format!("cannot convert {lvl} to LogLevel"), err);
1718 LogEntry::Log {
1719 created_at,
1720 level: u8::try_from(lvl)
1721 .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1722 .map_err(|err| map_err(Arc::from(err)))??,
1723 message: msg,
1724 }
1725 }
1726 (None, None, Some(stype), Some(pl)) => {
1727 let map_err = |err| {
1728 consistency_db_err_src(format!("cannot convert {stype} to LogStreamType"), err)
1729 };
1730 LogEntry::Stream {
1731 created_at,
1732 stream_type: u8::try_from(stype)
1733 .map(|stype| {
1734 LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1735 })
1736 .map_err(|err| map_err(Arc::from(err)))??,
1737 payload: pl,
1738 }
1739 }
1740 _ => {
1741 return Err(consistency_db_err("invalid t_log row".to_string()).into());
1742 }
1743 };
1744
1745 items.push(LogEntryRow {
1746 cursor: created_at,
1747 run_id,
1748 log_entry,
1749 execution_id,
1750 });
1751 }
1752
1753 Ok(ListLogsResponse {
1754 next_page: items
1755 .last()
1756 .map(|item| Pagination::NewerThan {
1757 length: pagination.length(),
1758 cursor: item.cursor,
1759 including_cursor: false,
1760 })
1761 .unwrap_or(if pagination.is_asc() {
1762 *pagination } else {
1764 Pagination::NewerThan {
1766 length: pagination.length(),
1767 cursor: DateTime::<Utc>::UNIX_EPOCH,
1768 including_cursor: false,
1769 }
1770 }),
1771 prev_page: match items.first() {
1772 Some(item) => Some(Pagination::OlderThan {
1773 length: pagination.length(),
1774 cursor: item.cursor,
1775 including_cursor: false,
1776 }),
1777 None if pagination.is_asc() && pagination.cursor() > &DateTime::<Utc>::UNIX_EPOCH => {
1778 Some(pagination.invert())
1780 }
1781 None => None,
1782 },
1783 items,
1784 })
1785}
1786
1787async fn list_deployment_states(
1788 tx: &Transaction<'_>,
1789 current_time: DateTime<Utc>,
1790 pagination: Pagination<Option<DeploymentId>>,
1791 include_config_json: bool,
1792) -> Result<Vec<DeploymentState>, DbErrorRead> {
1793 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1795 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1796 params.push(p);
1797 format!("${}", params.len())
1798 };
1799
1800 let p_now = add_param(Box::new(current_time));
1802
1803 let config_json_col = if include_config_json {
1804 "d.config_json"
1805 } else {
1806 "NULL::TEXT AS config_json"
1807 };
1808
1809 let mut sql = format!(
1810 "
1811 SELECT
1812 d.deployment_id,
1813
1814 COUNT(*) FILTER (WHERE s.state = '{STATE_LOCKED}') AS locked,
1815
1816 COUNT(*) FILTER (
1817 WHERE s.state = '{STATE_PENDING_AT}'
1818 AND s.pending_expires_finished <= {p_now}
1819 ) AS pending,
1820
1821 COUNT(*) FILTER (
1822 WHERE s.state = '{STATE_PENDING_AT}'
1823 AND s.pending_expires_finished > {p_now}
1824 ) AS scheduled,
1825
1826 COUNT(*) FILTER (WHERE s.state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1827
1828 COUNT(*) FILTER (WHERE s.state = '{STATE_FINISHED}') AS finished,
1829
1830 {config_json_col},
1831 d.created_at,
1832 d.last_active_at,
1833 d.status
1834 FROM t_deployment d
1835 LEFT JOIN t_state s ON s.deployment_id = d.deployment_id"
1836 );
1837
1838 if let Some(cursor) = pagination.cursor() {
1840 let p_cursor = add_param(Box::new(cursor.to_string()));
1841 write!(
1842 sql,
1843 " WHERE d.deployment_id {rel} {p_cursor}",
1844 rel = pagination.rel()
1845 )
1846 .expect("writing to string");
1847 }
1848
1849 let (inner_order, outer_order) = if pagination.is_desc() {
1853 ("DESC", "")
1854 } else {
1855 ("ASC", "DESC")
1856 };
1857
1858 write!(
1859 sql,
1860 " GROUP BY d.deployment_id, d.config_json, d.created_at, d.last_active_at, d.status ORDER BY d.deployment_id {inner_order} LIMIT {}",
1861 pagination.length()
1862 )
1863 .expect("writing to string");
1864
1865 let final_sql = if outer_order.is_empty() {
1866 sql
1867 } else {
1868 format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1869 };
1870
1871 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1872 .iter()
1873 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1874 .collect();
1875
1876 let rows = tx
1877 .query(&final_sql, ¶ms_refs)
1878 .await
1879 .map_err(DbErrorRead::from)?;
1880
1881 let mut result = Vec::with_capacity(rows.len());
1882 for row in rows {
1883 let deployment_id: String = get(&row, "deployment_id")?;
1884 let status_str: String = get::<String, _>(&row, "status")?;
1885 let status = status_str
1886 .parse::<DeploymentStatus>()
1887 .map_err(|e| consistency_db_err(format!("unknown deployment status: {e}")))?;
1888 result.push(DeploymentState {
1889 deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1890 locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1891 pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1892 .expect("count is never negative"),
1893 scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1894 .expect("count is never negative"),
1895 blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1896 .expect("count is never negative"),
1897 finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1898 .expect("count is never negative"),
1899 config_json: get::<Option<String>, _>(&row, "config_json")?,
1900 created_at: get::<DateTime<Utc>, _>(&row, "created_at")?,
1901 last_active_at: get::<Option<DateTime<Utc>>, _>(&row, "last_active_at")?,
1902 status,
1903 });
1904 }
1905
1906 Ok(result)
1907}
1908
1909fn parse_response_with_cursor(
1910 row: &tokio_postgres::Row,
1911) -> Result<ResponseWithCursor, DbErrorRead> {
1912 let id = u32::try_from(get::<i64, _>(row, "id")?)
1914 .map_err(|_| consistency_db_err("id must not be negative"))?;
1915
1916 let created_at: DateTime<Utc> = get(row, "created_at")?;
1917 let join_set_id_str: String = get(row, "join_set_id")?;
1918 let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1919
1920 let delay_id: Option<String> = get(row, "delay_id")?;
1922 let delay_id = delay_id
1923 .map(|id| DelayId::from_str(&id))
1924 .transpose()
1925 .map_err(DbErrorGeneric::from)?;
1926 let delay_success: Option<bool> = get(row, "delay_success")?;
1927 let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1928 let child_execution_id = child_execution_id
1929 .map(|id| ExecutionIdDerived::from_str(&id))
1930 .transpose()
1931 .map_err(DbErrorGeneric::from)?;
1932 let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1933 .map(Version::try_from)
1934 .transpose()
1935 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1936 let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1937 let json_value = json_value.map(|it| it.0);
1938
1939 let event = match (
1940 delay_id,
1941 delay_success,
1942 child_execution_id,
1943 finished_version,
1944 json_value,
1945 ) {
1946 (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1947 delay_id,
1948 result: delay_success.then_some(()).ok_or(()),
1949 },
1950 (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1951 if let ExecutionRequest::Finished { retval: result, .. } = json_val {
1952 JoinSetResponse::ChildExecutionFinished {
1953 child_execution_id,
1954 finished_version,
1955 result,
1956 }
1957 } else {
1958 error!("Joined log entry must be 'Finished'");
1959 return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1960 }
1961 }
1962 (delay, delay_success, child, finished, result) => {
1963 error!(
1964 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1965 );
1966 return Err(consistency_db_err("invalid row in t_join_set_response").into());
1967 }
1968 };
1969
1970 Ok(ResponseWithCursor {
1971 cursor: ResponseCursor(id),
1972 event: JoinSetResponseEventOuter {
1973 event: JoinSetResponseEvent { join_set_id, event },
1974 created_at,
1975 },
1976 })
1977}
1978
/// Appends a `Locked` event for `execution_id` at `appending_version` and gathers what an
/// executor needs to run the execution.
///
/// Steps, all performed within `tx` and in this order:
/// 1. Load the current state, check that a lock may be appended (`can_append_lock`), and
///    check that `appending_version` is the expected next version.
/// 2. Insert the `Locked` event into `t_execution_log`.
/// 3. Read all join-set responses of the execution.
/// 4. Update `t_state` to the locked state and obtain the intermittent event count.
/// 5. Read the `Created` event and the history events from the log, ordered by version.
///
/// Returns a [`LockedExecution`] whose `next_version` is the version after
/// `appending_version`.
///
/// # Errors
/// - Errors from loading the state, the lock precondition, the version check, listing
///   responses and updating `t_state` are propagated.
/// - Any failure of the log insert is reported as a non-retriable `IllegalState`
///   ("cannot lock").
/// - A log whose first selected event is not `Created`, or that contains other event kinds,
///   yields a consistency error.
#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
#[expect(clippy::too_many_arguments)]
async fn lock_single_execution(
    tx: &Transaction<'_>,
    created_at: DateTime<Utc>,
    component_id: &ComponentId,
    deployment_id: DeploymentId,
    execution_id: &ExecutionId,
    run_id: RunId,
    appending_version: &Version,
    executor_id: ExecutorId,
    lock_expires_at: DateTime<Utc>,
    retry_config: ComponentRetryConfig,
) -> Result<LockedExecution, DbErrorWrite> {
    trace!("lock_single_execution");

    // Validate the lock against the current state before writing anything.
    let combined_state = get_combined_state(tx, execution_id).await?;
    combined_state
        .execution_with_state
        .pending_state
        .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
    let expected_version = combined_state.get_next_version_assert_not_finished();
    check_expected_next_and_appending_version(&expected_version, appending_version)?;

    let locked_event = Locked {
        component_id: component_id.clone(),
        deployment_id,
        executor_id,
        lock_expires_at,
        run_id,
        retry_config,
    };
    let event = ExecutionRequest::Locked(locked_event.clone());

    let event = Json(event);

    // Every insert failure is mapped to a non-retriable `IllegalState` error.
    tx.execute(
        "INSERT INTO t_execution_log \
        (execution_id, created_at, json_value, version, variant) \
        VALUES ($1, $2, $3, $4, $5)",
        &[
            &execution_id.to_string(),
            &created_at,
            &event,
            &i64::from(appending_version.0),
            &event.0.variant(),
        ],
    )
    .await
    .map_err(|err| {
        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
            reason: "cannot lock".into(),
            context: SpanTrace::capture(),
            source: Some(Arc::new(err)),
            loc: Location::caller(),
        })
    })?;

    let responses = list_responses(tx, execution_id, None).await?;
    trace!("Responses: {responses:?}");

    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
        tx,
        execution_id,
        deployment_id,
        &component_id.component_digest,
        executor_id,
        run_id,
        lock_expires_at,
        appending_version,
        retry_config,
    )
    .await?;

    // Select only the rows whose `variant` matches the `Created` / `HistoryEvent` dummies,
    // in version order; the first row is expected to be `Created`.
    let rows = tx
        .query(
            "SELECT json_value, version FROM t_execution_log WHERE \
            execution_id = $1 AND (variant = $2 OR variant = $3) \
            ORDER BY version",
            &[
                &execution_id.to_string(),
                &DUMMY_CREATED.variant(),
                &DUMMY_HISTORY_EVENT.variant(),
            ],
        )
        .await
        .map_err(DbErrorGeneric::from)?;

    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();

    for row in rows {
        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
        let event = event.0;

        let version: i64 = get(&row, "version")?;
        let version = Version::try_from(version)
            .map_err(|_| consistency_db_err("version must be non-negative"))?;

        // `created_at` is not selected by this query; a placeholder is stored because only
        // `event` and `version` are consumed below.
        events.push_back(ExecutionEvent {
            created_at: DateTime::from_timestamp_nanos(0),
            event,
            backtrace_id: None,
            version,
        });
    }

    let Some(ExecutionRequest::Created {
        ffqn,
        params,
        parent,
        metadata,
        ..
    }) = events.pop_front().map(|outer| outer.event)
    else {
        error!("Execution log must contain at least `Created` event");
        return Err(consistency_db_err("execution log must contain `Created` event").into());
    };

    // Every remaining row must be a history event.
    let mut event_history = Vec::new();
    for ExecutionEvent { event, version, .. } in events {
        if let ExecutionRequest::HistoryEvent { event } = event {
            event_history.push((event, version));
        } else {
            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
            return Err(consistency_db_err(
                "rows can only contain `Created` and `HistoryEvent` event kinds",
            )
            .into());
        }
    }

    Ok(LockedExecution {
        execution_id: execution_id.clone(),
        metadata,
        next_version: appending_version.increment(),
        ffqn,
        params,
        event_history,
        responses,
        parent,
        intermittent_event_count,
        locked_event,
    })
}
2131
2132async fn count_join_next(
2133 tx: &Transaction<'_>,
2134 execution_id: &ExecutionId,
2135 join_set_id: &JoinSetId,
2136) -> Result<u32, DbErrorRead> {
2137 let row = tx
2138 .query_one(
2139 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
2140 AND history_event_type = $3",
2141 &[
2142 &execution_id.to_string(),
2143 &join_set_id.to_string(),
2144 &HISTORY_EVENT_TYPE_JOIN_NEXT,
2145 ],
2146 )
2147 .await
2148 .map_err(DbErrorRead::from)?;
2149
2150 let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
2151 Ok(count)
2152}
2153
2154async fn nth_response(
2155 tx: &Transaction<'_>,
2156 execution_id: &ExecutionId,
2157 join_set_id: &JoinSetId,
2158 skip_rows: u32,
2159) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2160 let row = tx
2161 .query_opt(
2162 "SELECT r.id, r.created_at, r.join_set_id, \
2163 r.delay_id, r.delay_success, \
2164 r.child_execution_id, r.finished_version, l.json_value \
2165 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2166 WHERE \
2167 r.execution_id = $1 AND r.join_set_id = $2 AND \
2168 ( \
2169 r.finished_version = l.version \
2170 OR \
2171 r.child_execution_id IS NULL \
2172 ) \
2173 ORDER BY id \
2174 LIMIT 1 OFFSET $3",
2175 &[
2176 &execution_id.to_string(),
2177 &join_set_id.to_string(),
2178 &i64::from(skip_rows),
2179 ]
2180 )
2181 .await
2182 .map_err(DbErrorRead::from)?;
2183
2184 match row {
2185 Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
2186 None => Ok(None),
2187 }
2188}
2189
2190#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2191async fn append(
2192 tx: &Transaction<'_>,
2193 execution_id: &ExecutionId,
2194 req: AppendRequest,
2195 appending_version: Version,
2196) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2197 if matches!(req.event, ExecutionRequest::Created { .. }) {
2198 return Err(DbErrorWrite::NonRetriable(
2199 DbErrorWriteNonRetriable::ValidationFailed(
2200 "cannot append `Created` event - use `create` instead".into(),
2201 ),
2202 ));
2203 }
2204
2205 if let AppendRequest {
2206 event:
2207 ExecutionRequest::Locked(Locked {
2208 component_id,
2209 deployment_id,
2210 executor_id,
2211 run_id,
2212 lock_expires_at,
2213 retry_config,
2214 }),
2215 created_at,
2216 } = req
2217 {
2218 return lock_single_execution(
2219 tx,
2220 created_at,
2221 &component_id,
2222 deployment_id,
2223 execution_id,
2224 run_id,
2225 &appending_version,
2226 executor_id,
2227 lock_expires_at,
2228 retry_config,
2229 )
2230 .await
2231 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2232 }
2233
2234 let combined_state = get_combined_state(tx, execution_id).await?;
2235 if combined_state
2236 .execution_with_state
2237 .pending_state
2238 .is_finished()
2239 {
2240 debug!("Execution is already finished");
2241 return Err(DbErrorWrite::NonRetriable(
2242 DbErrorWriteNonRetriable::AlreadyFinished,
2243 ));
2244 }
2245
2246 check_expected_next_and_appending_version(
2247 &combined_state.get_next_version_assert_not_finished(),
2248 &appending_version,
2249 )?;
2250
2251 let event = Json(req.event);
2252
2253 tx.execute(
2255 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2256 VALUES ($1, $2, $3, $4, $5, $6)",
2257 &[
2258 &execution_id.to_string(),
2259 &req.created_at,
2260 &event,
2261 &i64::from(appending_version.0),
2262 &event.0.variant(),
2263 &event.0.join_set_id().map(std::string::ToString::to_string),
2264 ],
2265 )
2266 .await?;
2267
2268 match &event.0 {
2270 ExecutionRequest::Created { .. } => {
2271 unreachable!("handled in the caller")
2272 }
2273
2274 ExecutionRequest::Locked { .. } => {
2275 unreachable!("handled above")
2276 }
2277
2278 ExecutionRequest::TemporarilyFailed {
2279 backoff_expires_at, ..
2280 }
2281 | ExecutionRequest::TemporarilyTimedOut {
2282 backoff_expires_at, ..
2283 } => {
2284 let (next_version, notifier) = update_state_pending_after_event_appended(
2285 tx,
2286 execution_id,
2287 &appending_version,
2288 *backoff_expires_at,
2289 true, combined_state.execution_with_state.component_digest,
2291 )
2292 .await?;
2293 return Ok((next_version, notifier));
2294 }
2295
2296 ExecutionRequest::Unlocked {
2297 backoff_expires_at, ..
2298 } => {
2299 let (next_version, notifier) = update_state_pending_after_event_appended(
2300 tx,
2301 execution_id,
2302 &appending_version,
2303 *backoff_expires_at,
2304 false, combined_state.execution_with_state.component_digest,
2306 )
2307 .await?;
2308 return Ok((next_version, notifier));
2309 }
2310
2311 ExecutionRequest::Paused => {
2312 match &combined_state.execution_with_state.pending_state {
2313 PendingState::Finished { .. } => {
2314 unreachable!("handled above");
2315 }
2316 PendingState::Paused(..) => {
2317 return Err(DbErrorWriteNonRetriable::IllegalState {
2318 reason: "cannot pause, execution is already paused".into(),
2319 context: SpanTrace::capture(),
2320 source: None,
2321 loc: Location::caller(),
2322 }
2323 .into());
2324 }
2325 _ => {}
2326 }
2327 let next_version =
2328 update_state_paused(tx, execution_id, &appending_version, true).await?;
2329 return Ok((next_version, AppendNotifier::default()));
2330 }
2331
2332 ExecutionRequest::Unpaused => {
2333 if !combined_state
2334 .execution_with_state
2335 .pending_state
2336 .is_paused()
2337 {
2338 return Err(DbErrorWriteNonRetriable::IllegalState {
2339 reason: "cannot unpause, execution is not paused".into(),
2340 context: SpanTrace::capture(),
2341 source: None,
2342 loc: Location::caller(),
2343 }
2344 .into());
2345 }
2346 let next_version =
2347 update_state_paused(tx, execution_id, &appending_version, false).await?;
2348 return Ok((next_version, AppendNotifier::default()));
2349 }
2350
2351 ExecutionRequest::Finished { retval, .. } => {
2352 update_state_finished(
2353 tx,
2354 execution_id,
2355 &appending_version,
2356 req.created_at,
2357 PendingStateFinishedResultKind::from(retval),
2358 )
2359 .await?;
2360 return Ok((
2361 appending_version,
2362 AppendNotifier {
2363 pending_at: None,
2364 execution_finished: Some(NotifierExecutionFinished {
2365 execution_id: execution_id.clone(),
2366 retval: retval.clone(),
2367 }),
2368 response: None,
2369 },
2370 ));
2371 }
2372
2373 ExecutionRequest::HistoryEvent {
2374 event:
2375 HistoryEvent::JoinSetCreate { .. }
2376 | HistoryEvent::JoinSetRequest {
2377 request: JoinSetRequest::ChildExecutionRequest { .. },
2378 ..
2379 }
2380 | HistoryEvent::Persist { .. }
2381 | HistoryEvent::Schedule { .. }
2382 | HistoryEvent::Stub { .. }
2383 | HistoryEvent::JoinNextTooMany { .. }
2384 | HistoryEvent::JoinNextTry { .. },
2385 } => {
2386 return Ok((
2387 bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2388 AppendNotifier::default(),
2389 ));
2390 }
2391
2392 ExecutionRequest::HistoryEvent {
2393 event:
2394 HistoryEvent::JoinSetRequest {
2395 join_set_id,
2396 request:
2397 JoinSetRequest::DelayRequest {
2398 delay_id,
2399 expires_at,
2400 ..
2401 },
2402 },
2403 } => {
2404 return Ok((
2405 bump_state_next_version(
2406 tx,
2407 execution_id,
2408 &appending_version,
2409 Some(DelayReq {
2410 join_set_id: join_set_id.clone(),
2411 delay_id: delay_id.clone(),
2412 expires_at: *expires_at,
2413 }),
2414 )
2415 .await?,
2416 AppendNotifier::default(),
2417 ));
2418 }
2419
2420 ExecutionRequest::HistoryEvent {
2421 event:
2422 HistoryEvent::JoinNext {
2423 join_set_id,
2424 run_expires_at,
2425 closing,
2426 requested_ffqn: _,
2427 },
2428 } => {
2429 let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2431
2432 let nth_response =
2434 nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2435
2436 trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2437 assert!(join_next_count > 0);
2438
2439 if let Some(ResponseWithCursor {
2440 event:
2441 JoinSetResponseEventOuter {
2442 created_at: nth_created_at,
2443 ..
2444 },
2445 cursor: _,
2446 }) = nth_response
2447 {
2448 let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2449 let (next_version, notifier) = update_state_pending_after_event_appended(
2450 tx,
2451 execution_id,
2452 &appending_version,
2453 scheduled_at,
2454 false, combined_state.execution_with_state.component_digest,
2456 )
2457 .await?;
2458 return Ok((next_version, notifier));
2459 }
2460
2461 return Ok((
2462 update_state_blocked(
2463 tx,
2464 execution_id,
2465 &appending_version,
2466 join_set_id,
2467 *run_expires_at,
2468 *closing,
2469 )
2470 .await?,
2471 AppendNotifier::default(),
2472 ));
2473 }
2474 }
2475}
2476
2477async fn append_response(
2478 tx: &Transaction<'_>,
2479 execution_id: &ExecutionId,
2480 event: JoinSetResponseEventOuter,
2481) -> Result<AppendNotifier, DbErrorWrite> {
2482 let join_set_id = &event.event.join_set_id;
2483
2484 let (delay_id, delay_success) = match &event.event.event {
2485 JoinSetResponse::DelayFinished { delay_id, result } => {
2486 (Some(delay_id.to_string()), Some(result.is_ok()))
2487 }
2488 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2489 };
2490
2491 let (child_execution_id, finished_version) = match &event.event.event {
2492 JoinSetResponse::ChildExecutionFinished {
2493 child_execution_id,
2494 finished_version,
2495 result: _,
2496 } => (
2497 Some(child_execution_id.to_string()),
2498 Some(i64::from(finished_version.0)),
2499 ),
2500 JoinSetResponse::DelayFinished { .. } => (None, None),
2501 };
2502
2503 let row = tx.query_one(
2504 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2505 VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2506 &[
2507 &execution_id.to_string(),
2508 &event.created_at,
2509 &join_set_id.to_string(),
2510 &delay_id,
2511 &delay_success,
2512 &child_execution_id,
2513 &finished_version,
2514 ]
2515 ).await?;
2516 let cursor = ResponseCursor(
2517 u32::try_from(get::<i64, _>(&row, 0)?)
2518 .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2519 );
2520 let combined_state = get_combined_state(tx, execution_id).await?;
2522 debug!("previous_pending_state: {combined_state:?}");
2523
2524 let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2525 state:
2526 PendingStateBlockedByJoinSet {
2527 join_set_id: found_join_set_id,
2528 lock_expires_at, closing: _,
2530 },
2531 paused: _,
2532 } =
2533 PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2534 && *join_set_id == found_join_set_id
2535 {
2536 let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2537 update_state_pending_after_response_appended(
2539 tx,
2540 execution_id,
2541 scheduled_at,
2542 combined_state.execution_with_state.component_digest,
2543 )
2544 .await?
2545 } else {
2546 AppendNotifier::default()
2547 };
2548
2549 if let JoinSetResponseEvent {
2550 join_set_id,
2551 event:
2552 JoinSetResponse::DelayFinished {
2553 delay_id,
2554 result: _,
2555 },
2556 } = &event.event
2557 {
2558 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2559 tx.execute(
2560 "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2561 &[
2562 &execution_id.to_string(),
2563 &join_set_id.to_string(),
2564 &delay_id.to_string(),
2565 ],
2566 )
2567 .await?;
2568 }
2569
2570 notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2571 Ok(notifier)
2572}
2573
2574async fn append_backtrace(
2575 tx: &Transaction<'_>,
2576 backtrace_info: &BacktraceInfo,
2577) -> Result<(), DbErrorWrite> {
2578 let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2580
2581 tx.execute(
2583 "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2584 VALUES ($1, $2) \
2585 ON CONFLICT (backtrace_hash) DO NOTHING",
2586 &[
2587 &backtrace_hash.as_slice(),
2588 &Json(&backtrace_info.wasm_backtrace),
2589 ],
2590 )
2591 .await?;
2592
2593 tx.execute(
2595 "INSERT INTO t_execution_backtrace \
2596 (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2597 VALUES ($1, $2, $3, $4, $5)",
2598 &[
2599 &backtrace_info.execution_id.to_string(),
2600 &Json(&backtrace_info.component_id),
2601 &i64::from(backtrace_info.version_min_including.0),
2602 &i64::from(backtrace_info.version_max_excluding.0),
2603 &backtrace_hash.as_slice(),
2604 ],
2605 )
2606 .await?;
2607
2608 Ok(())
2609}
2610
2611async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2612 let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2613 LogEntry::Log {
2614 created_at,
2615 level,
2616 message,
2617 } => (
2618 Some(*level as i32),
2619 Some(message.as_str()),
2620 None::<i32>,
2621 None::<&[u8]>,
2622 created_at,
2623 ),
2624 LogEntry::Stream {
2625 created_at,
2626 payload,
2627 stream_type,
2628 } => (
2629 None::<i32>,
2630 None::<&str>,
2631 Some(*stream_type as i32),
2632 Some(payload.as_slice()),
2633 created_at,
2634 ),
2635 };
2636
2637 tx.execute(
2638 "INSERT INTO t_log (
2639 execution_id,
2640 run_id,
2641 created_at,
2642 level,
2643 message,
2644 stream_type,
2645 payload
2646 ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2647 &[
2648 &row.execution_id.to_string(),
2649 &row.run_id.to_string(),
2650 &created_at,
2651 &level,
2652 &message,
2653 &stream_type,
2654 &payload,
2655 ],
2656 )
2657 .await?;
2658
2659 Ok(())
2660}
2661
2662async fn get_execution_log(
2663 tx: &Transaction<'_>,
2664 execution_id: &ExecutionId,
2665) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2666 let rows = tx
2667 .query(
2668 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2669 execution_id = $1 ORDER BY version",
2670 &[&execution_id.to_string()],
2671 )
2672 .await
2673 .map_err(DbErrorRead::from)?;
2674
2675 if rows.is_empty() {
2676 return Err(DbErrorRead::NotFound);
2677 }
2678
2679 let mut events = Vec::with_capacity(rows.len());
2680 for row in rows {
2681 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2682 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2683 let event = event.0;
2684 let version: i64 = get(&row, "version")?;
2685 let version = Version::try_from(version)
2686 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2687
2688 events.push(ExecutionEvent {
2689 created_at,
2690 event,
2691 backtrace_id: None,
2692 version,
2693 });
2694 }
2695
2696 let combined_state = get_combined_state(tx, execution_id).await?;
2697 let responses = list_responses(tx, execution_id, None).await?;
2698
2699 Ok(concepts::storage::ExecutionLog {
2700 execution_id: execution_id.clone(),
2701 events,
2702 responses,
2703 next_version: combined_state.get_next_version_or_finished(),
2704 pending_state: combined_state.execution_with_state.pending_state,
2705 component_digest: combined_state.execution_with_state.component_digest,
2706 component_type: combined_state.execution_with_state.component_type,
2707 deployment_id: combined_state.execution_with_state.deployment_id,
2708 })
2709}
2710
2711async fn get_max_version(
2712 tx: &Transaction<'_>,
2713 execution_id: &ExecutionId,
2714) -> Result<Version, DbErrorRead> {
2715 let row = tx
2716 .query_one(
2717 "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2718 &[&execution_id.to_string()],
2719 )
2720 .await?;
2721 let max_version: i64 = get(&row, "version")?;
2722 let max_version = Version::try_from(max_version)
2723 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2724 Ok(max_version)
2725}
2726
2727async fn get_max_response_cursor(
2728 tx: &Transaction<'_>,
2729 execution_id: &ExecutionId,
2730) -> Result<ResponseCursor, DbErrorRead> {
2731 let row = tx
2732 .query_one(
2733 "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2734 &[&execution_id.to_string()],
2735 )
2736 .await?;
2737 let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2739 let max_cursor = ResponseCursor(
2740 u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2741 );
2742 Ok(max_cursor)
2743}
2744
2745async fn list_execution_events(
2746 tx: &Transaction<'_>,
2747 execution_id: &ExecutionId,
2748 pagination: Pagination<VersionType>,
2749 include_backtrace_id: bool,
2750) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2751 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2752 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2753 params.push(p);
2754 format!("${}", params.len())
2755 };
2756
2757 let p_execution_id = add_param(Box::new(execution_id.to_string()));
2758
2759 let (cursor, length, rel, is_desc) = match &pagination {
2760 Pagination::NewerThan {
2761 cursor,
2762 length,
2763 including_cursor,
2764 } => (
2765 *cursor,
2766 *length,
2767 if *including_cursor { ">=" } else { ">" },
2768 false,
2769 ),
2770 Pagination::OlderThan {
2771 cursor,
2772 length,
2773 including_cursor,
2774 } => (
2775 *cursor,
2776 *length,
2777 if *including_cursor { "<=" } else { "<" },
2778 true,
2779 ),
2780 };
2781 let p_cursor = add_param(Box::new(i64::from(cursor)));
2782 let p_limit = add_param(Box::new(i64::from(length)));
2783
2784 let base_select = if include_backtrace_id {
2785 format!(
2786 "SELECT
2787 log.created_at,
2788 log.json_value,
2789 log.version,
2790 bt.version_min_including AS backtrace_id
2791 FROM
2792 t_execution_log AS log
2793 LEFT OUTER JOIN
2794 t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2795 AND log.version >= bt.version_min_including
2796 AND log.version < bt.version_max_excluding
2797 WHERE
2798 log.execution_id = {p_execution_id}
2799 AND log.version {rel} {p_cursor}"
2800 )
2801 } else {
2802 format!(
2803 "SELECT
2804 created_at, json_value, NULL::BIGINT as backtrace_id, version
2805 FROM t_execution_log WHERE
2806 execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2807 )
2808 };
2809
2810 let order = if is_desc { "DESC" } else { "ASC" };
2811 let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2812
2813 if is_desc {
2815 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2816 }
2817
2818 let params_refs: Vec<&(dyn ToSql + Sync)> = params
2819 .iter()
2820 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2821 .collect();
2822
2823 let rows = tx
2824 .query(&sql, ¶ms_refs)
2825 .await
2826 .map_err(DbErrorRead::from)?;
2827
2828 let mut events = Vec::with_capacity(rows.len());
2829 for row in rows {
2830 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2831 let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2832 .map(Version::try_from)
2833 .transpose()
2834 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2835
2836 let version = get::<i64, _>(&row, "version")?;
2837 let version = Version::new(
2838 VersionType::try_from(version)
2839 .map_err(|_| consistency_db_err("version must be non-negative"))?,
2840 );
2841 let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2842 let event_req = event_req.0;
2843
2844 events.push(ExecutionEvent {
2845 created_at,
2846 event: event_req,
2847 backtrace_id,
2848 version,
2849 });
2850 }
2851 Ok(events)
2852}
2853
2854async fn get_execution_event(
2855 tx: &Transaction<'_>,
2856 execution_id: &ExecutionId,
2857 version: VersionType,
2858) -> Result<ExecutionEvent, DbErrorRead> {
2859 let row = tx
2860 .query_one(
2861 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2862 execution_id = $1 AND version = $2",
2863 &[&execution_id.to_string(), &i64::from(version)],
2864 )
2865 .await?;
2866
2867 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2868 let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2869 let version = get::<i64, _>(&row, "version")?;
2870 let version = Version::try_from(version)
2871 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2872 let event = json_val.0;
2873
2874 Ok(ExecutionEvent {
2875 created_at,
2876 event,
2877 backtrace_id: None,
2878 version,
2879 })
2880}
2881
2882async fn get_last_execution_event(
2883 tx: &Transaction<'_>,
2884 execution_id: &ExecutionId,
2885) -> Result<ExecutionEvent, DbErrorRead> {
2886 let row = tx
2887 .query_one(
2888 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2889 execution_id = $1 ORDER BY version DESC LIMIT 1",
2890 &[&execution_id.to_string()],
2891 )
2892 .await?;
2893
2894 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2895 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2896 let event = event.0;
2897 let version: i64 = get(&row, "version")?;
2898 let version = Version::try_from(version)
2899 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2900
2901 Ok(ExecutionEvent {
2902 created_at,
2903 event,
2904 backtrace_id: None,
2905 version,
2906 })
2907}
2908
2909async fn delay_response(
2910 tx: &Transaction<'_>,
2911 execution_id: &ExecutionId,
2912 delay_id: &DelayId,
2913) -> Result<Option<bool>, DbErrorRead> {
2914 let row = tx
2915 .query_opt(
2916 "SELECT delay_success \
2917 FROM t_join_set_response \
2918 WHERE \
2919 execution_id = $1 AND delay_id = $2",
2920 &[&execution_id.to_string(), &delay_id.to_string()],
2921 )
2922 .await?;
2923
2924 match row {
2925 Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2926 None => Ok(None),
2927 }
2928}
2929
2930#[instrument(level = Level::TRACE, skip_all)]
2931async fn get_responses_after(
2932 tx: &Transaction<'_>,
2933 execution_id: &ExecutionId,
2934 last_response: ResponseCursor,
2935) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2936 let rows = tx
2937 .query(
2938 "SELECT r.id, r.created_at, r.join_set_id, \
2939 r.delay_id, r.delay_success, \
2940 r.child_execution_id, r.finished_version, child.json_value \
2941 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2942 WHERE \
2943 r.id > $1 AND \
2944 r.execution_id = $2 AND \
2945 ( \
2946 r.finished_version = child.version \
2947 OR \
2948 r.child_execution_id IS NULL \
2949 ) \
2950 ORDER BY id \
2951 ",
2952 &[
2953 &i64::from(last_response.0),
2954 &execution_id.to_string(),
2955 ]
2956 )
2957 .await?;
2958
2959 let mut results = Vec::with_capacity(rows.len());
2960 for row in rows {
2961 let resp = parse_response_with_cursor(&row)?;
2962 results.push(resp);
2963 }
2964 Ok(results)
2965}
2966
2967async fn get_pending_of_single_ffqn(
2968 tx: &Transaction<'_>,
2969 batch_size: u32,
2970 pending_at_or_sooner: DateTime<Utc>,
2971 ffqn: &FunctionFqn,
2972 select_strategy: SelectStrategy,
2973) -> Result<Vec<(ExecutionId, Version)>, ()> {
2974 let rows = tx
2975 .query(
2976 &format!(
2977 r"
2978 SELECT execution_id, corresponding_version FROM t_state
2979 WHERE
2980 state = '{STATE_PENDING_AT}' AND
2981 pending_expires_finished <= $1 AND ffqn = $2
2982 AND is_paused = false
2983 ORDER BY pending_expires_finished
2984 {}
2985 LIMIT $3
2986 ",
2987 if select_strategy == SelectStrategy::LockForUpdate {
2988 "FOR UPDATE SKIP LOCKED"
2989 } else {
2990 ""
2991 }
2992 ),
2993 &[
2994 &pending_at_or_sooner,
2995 &ffqn.to_string(),
2996 &(i64::from(batch_size)),
2997 ],
2998 )
2999 .await
3000 .map_err(|err| {
3001 warn!("Ignoring consistency error {err:?}");
3002 })?;
3003
3004 let mut result = Vec::with_capacity(rows.len());
3005 for row in rows {
3006 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
3007 let eid_str: String = get(&row, "execution_id")?;
3008 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3009 let corresponding_version = Version::try_from(corresponding_version)
3010 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3011
3012 if let Ok(eid) = ExecutionId::from_str(&eid_str) {
3013 return Ok((eid, corresponding_version.increment()));
3014 }
3015 Err(consistency_db_err("invalid execution_id"))
3016 };
3017
3018 match unpack() {
3019 Ok(val) => result.push(val),
3020 Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
3021 }
3022 }
3023 Ok(result)
3024}
3025
3026async fn get_pending_by_ffqns(
3028 tx: &Transaction<'_>,
3029 batch_size: u32,
3030 pending_at_or_sooner: DateTime<Utc>,
3031 ffqns: &[FunctionFqn],
3032 select_strategy: SelectStrategy,
3033) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3034 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
3035 let mut execution_ids_versions = Vec::with_capacity(batch_size);
3036
3037 for ffqn in ffqns {
3038 let needed = batch_size - execution_ids_versions.len();
3039 if needed == 0 {
3040 break;
3041 }
3042 let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
3043 if let Ok(execs) =
3044 get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
3045 .await
3046 {
3047 execution_ids_versions.extend(execs);
3048 }
3049 }
3050
3051 Ok(execution_ids_versions)
3052}
3053
/// How the pending-execution queries select `t_state` rows.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SelectStrategy {
    /// Plain `SELECT`; the selected rows are not locked.
    Read,
    /// Appends `FOR UPDATE SKIP LOCKED` to the query, locking the selected rows.
    LockForUpdate,
}
3059
3060async fn get_pending_by_component_input_digest(
3061 tx: &Transaction<'_>,
3062 batch_size: u32,
3063 pending_at_or_sooner: DateTime<Utc>,
3064 input_digest: &ComponentDigest,
3065 select_strategy: SelectStrategy,
3066) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
3067 let rows = tx
3068 .query(
3069 &format!(
3070 r"
3071 SELECT execution_id, corresponding_version FROM t_state WHERE
3072 state = '{STATE_PENDING_AT}' AND
3073 pending_expires_finished <= $1 AND
3074 component_id_input_digest = $2
3075 AND is_paused = false
3076 ORDER BY pending_expires_finished
3077 {}
3078 LIMIT $3
3079 ",
3080 if select_strategy == SelectStrategy::LockForUpdate {
3081 "FOR UPDATE SKIP LOCKED"
3082 } else {
3083 ""
3084 }
3085 ),
3086 &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
3087 )
3088 .await?;
3089
3090 let mut result = Vec::with_capacity(rows.len());
3091 for row in rows {
3092 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
3093 let eid_str: String = get(&row, "execution_id")?;
3094 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3095 let corresponding_version = Version::try_from(corresponding_version)
3096 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3097
3098 let eid = ExecutionId::from_str(&eid_str)
3099 .map_err(|err| consistency_db_err(err.to_string()))?;
3100 Ok((eid, corresponding_version.increment()))
3101 };
3102
3103 match unpack() {
3104 Ok(val) => result.push(val),
3105 Err(err) => {
3106 warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
3107 }
3108 }
3109 }
3110
3111 Ok(result)
3112}
3113
3114fn notify_pending_locked(
3115 notifier: &NotifierPendingAt,
3116 current_time: DateTime<Utc>,
3117 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3118) {
3119 if notifier.scheduled_at <= current_time {
3120 ffqn_to_pending_subscription.notify(notifier);
3121 }
3122}
3123
3124async fn upgrade_execution_component(
3125 tx: &Transaction<'_>,
3126 execution_id: &ExecutionId,
3127 old: &ComponentDigest,
3128 new: &ComponentDigest,
3129) -> Result<(), DbErrorWrite> {
3130 debug!("Updating t_state to component {new}");
3131
3132 let updated = tx
3133 .execute(
3134 r"
3135 UPDATE t_state
3136 SET
3137 updated_at = CURRENT_TIMESTAMP,
3138 component_id_input_digest = $1
3139 WHERE
3140 execution_id = $2 AND
3141 component_id_input_digest = $3
3142 ",
3143 &[
3144 &new.as_slice(), &execution_id.to_string(), &old.as_slice(), ],
3148 )
3149 .await?;
3150
3151 if updated != 1 {
3152 return Err(DbErrorWrite::NotFound);
3153 }
3154 Ok(())
3155}
3156
3157impl PostgresConnection {
3158 #[instrument(level = Level::TRACE, skip_all)]
3160 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3161 let (pending_ats, finished_execs, responses) = {
3162 let (mut pending_ats, mut finished_execs, mut responses) =
3163 (Vec::new(), Vec::new(), Vec::new());
3164 for notifier in notifiers {
3165 if let Some(pending_at) = notifier.pending_at {
3166 pending_ats.push(pending_at);
3167 }
3168 if let Some(finished) = notifier.execution_finished {
3169 finished_execs.push(finished);
3170 }
3171 if let Some(response) = notifier.response {
3172 responses.push(response);
3173 }
3174 }
3175 (pending_ats, finished_execs, responses)
3176 };
3177
3178 if !pending_ats.is_empty() {
3180 let guard = self.pending_subscribers.lock().unwrap();
3181 for pending_at in pending_ats {
3182 notify_pending_locked(&pending_at, current_time, &guard);
3183 }
3184 }
3185 if !finished_execs.is_empty() {
3187 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3188 for finished in finished_execs {
3189 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3190 for (_tag, sender) in listeners_of_exe_id {
3191 let _ = sender.send(finished.retval.clone());
3192 }
3193 }
3194 }
3195 }
3196 if !responses.is_empty() {
3198 let mut guard = self.response_subscribers.lock().unwrap();
3199 for (execution_id, response) in responses {
3200 if let Some((sender, _)) = guard.remove(&execution_id) {
3201 let _ = sender.send(response);
3202 }
3203 }
3204 }
3205 }
3206}
3207
3208#[async_trait]
3209impl DbExecutor for PostgresConnection {
3210 #[instrument(level = Level::TRACE, skip(self))]
3211 async fn lock_pending_by_ffqns(
3212 &self,
3213 batch_size: u32,
3214 pending_at_or_sooner: DateTime<Utc>,
3215 ffqns: Arc<[FunctionFqn]>,
3216 created_at: DateTime<Utc>,
3217 component_id: ComponentId,
3218 deployment_id: DeploymentId,
3219 executor_id: ExecutorId,
3220 lock_expires_at: DateTime<Utc>,
3221 run_id: RunId,
3222 retry_config: ComponentRetryConfig,
3223 ) -> Result<LockPendingResponse, DbErrorWrite> {
3224 let mut client_guard = self.client.lock().await;
3225 let tx = client_guard.transaction().await?;
3226
3227 let execution_ids_versions = get_pending_by_ffqns(
3228 &tx,
3229 batch_size,
3230 pending_at_or_sooner,
3231 &ffqns,
3232 SelectStrategy::LockForUpdate,
3233 )
3234 .await?;
3235
3236 if execution_ids_versions.is_empty() {
3237 tx.commit().await?;
3240 return Ok(vec![]);
3241 }
3242
3243 debug!("Locking {execution_ids_versions:?}");
3244
3245 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3247 for (execution_id, version) in execution_ids_versions {
3248 match lock_single_execution(
3249 &tx,
3250 created_at,
3251 &component_id,
3252 deployment_id,
3253 &execution_id,
3254 run_id,
3255 &version,
3256 executor_id,
3257 lock_expires_at,
3258 retry_config,
3259 )
3260 .await
3261 {
3262 Ok(locked) => locked_execs.push(locked),
3263 Err(err) => {
3264 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3266 return Err(err);
3267 }
3268 }
3269 }
3270
3271 tx.commit().await?;
3272
3273 Ok(locked_execs)
3274 }
3275
3276 #[instrument(level = Level::TRACE, skip(self))]
3277 async fn lock_pending_by_component_digest(
3278 &self,
3279 batch_size: u32,
3280 pending_at_or_sooner: DateTime<Utc>,
3281 component_id: &ComponentId,
3282 deployment_id: DeploymentId,
3283 created_at: DateTime<Utc>,
3284 executor_id: ExecutorId,
3285 lock_expires_at: DateTime<Utc>,
3286 run_id: RunId,
3287 retry_config: ComponentRetryConfig,
3288 ) -> Result<LockPendingResponse, DbErrorWrite> {
3289 let mut client_guard = self.client.lock().await;
3290 let tx = client_guard.transaction().await?;
3291
3292 let execution_ids_versions = get_pending_by_component_input_digest(
3293 &tx,
3294 batch_size,
3295 pending_at_or_sooner,
3296 &component_id.component_digest,
3297 SelectStrategy::LockForUpdate,
3298 )
3299 .await?;
3300
3301 if execution_ids_versions.is_empty() {
3302 tx.commit().await?;
3303 return Ok(vec![]);
3304 }
3305
3306 debug!("Locking {execution_ids_versions:?}");
3307
3308 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3309 for (execution_id, version) in execution_ids_versions {
3310 match lock_single_execution(
3311 &tx,
3312 created_at,
3313 component_id,
3314 deployment_id,
3315 &execution_id,
3316 run_id,
3317 &version,
3318 executor_id,
3319 lock_expires_at,
3320 retry_config,
3321 )
3322 .await
3323 {
3324 Ok(locked) => locked_execs.push(locked),
3325 Err(err) => {
3326 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3328 return Err(err);
3329 }
3330 }
3331 }
3332
3333 tx.commit().await?;
3334 Ok(locked_execs)
3335 }
3336
3337 #[cfg(feature = "test")]
3338 #[instrument(level = Level::DEBUG, skip(self))]
3339 async fn lock_one(
3340 &self,
3341 created_at: DateTime<Utc>,
3342 component_id: ComponentId,
3343 deployment_id: DeploymentId,
3344 execution_id: &ExecutionId,
3345 run_id: RunId,
3346 version: Version,
3347 executor_id: ExecutorId,
3348 lock_expires_at: DateTime<Utc>,
3349 retry_config: ComponentRetryConfig,
3350 ) -> Result<LockedExecution, DbErrorWrite> {
3351 debug!(%execution_id, "lock_one");
3352 let mut client_guard = self.client.lock().await;
3353 let tx = client_guard.transaction().await?;
3354
3355 let res = lock_single_execution(
3356 &tx,
3357 created_at,
3358 &component_id,
3359 deployment_id,
3360 execution_id,
3361 run_id,
3362 &version,
3363 executor_id,
3364 lock_expires_at,
3365 retry_config,
3366 )
3367 .await?;
3368
3369 tx.commit().await?;
3370 Ok(res)
3371 }
3372
3373 #[instrument(level = Level::DEBUG, skip(self, req))]
3374 async fn append(
3375 &self,
3376 execution_id: ExecutionId,
3377 version: Version,
3378 req: AppendRequest,
3379 ) -> Result<AppendResponse, DbErrorWrite> {
3380 debug!(%req, "append");
3381 trace!(?req, "append");
3382 let created_at = req.created_at;
3383
3384 let mut client_guard = self.client.lock().await;
3385 let tx = client_guard.transaction().await?;
3386
3387 let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3388
3389 tx.commit().await?;
3390
3391 drop(client_guard);
3393
3394 self.notify_all(vec![notifier], created_at);
3395 Ok(new_version)
3396 }
3397
3398 #[instrument(level = Level::DEBUG, skip_all)]
3399 async fn append_batch_respond_to_parent(
3400 &self,
3401 events: AppendEventsToExecution,
3402 response: AppendResponseToExecution,
3403 current_time: DateTime<Utc>,
3404 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3405 debug!("append_batch_respond_to_parent");
3406 if events.execution_id == response.parent_execution_id {
3407 return Err(DbErrorWrite::NonRetriable(
3408 DbErrorWriteNonRetriable::ValidationFailed(
3409 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3410 ),
3411 ));
3412 }
3413 if events.batch.is_empty() {
3414 return Err(DbErrorWrite::NonRetriable(
3415 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3416 ));
3417 }
3418
3419 let mut client_guard = self.client.lock().await;
3420 let tx = client_guard.transaction().await?;
3421
3422 let mut version = events.version;
3423 let mut notifiers = Vec::new();
3424
3425 for append_request in events.batch {
3426 let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3427 version = v;
3428 notifiers.push(n);
3429 }
3430
3431 let pending_at_parent = append_response(
3432 &tx,
3433 &response.parent_execution_id,
3434 JoinSetResponseEventOuter {
3435 created_at: response.created_at,
3436 event: JoinSetResponseEvent {
3437 join_set_id: response.join_set_id,
3438 event: JoinSetResponse::ChildExecutionFinished {
3439 child_execution_id: response.child_execution_id,
3440 finished_version: response.finished_version,
3441 result: response.result,
3442 },
3443 },
3444 },
3445 )
3446 .await?;
3447 notifiers.push(pending_at_parent);
3448
3449 tx.commit().await?;
3450 drop(client_guard);
3451
3452 self.notify_all(notifiers, current_time);
3453 Ok(version)
3454 }
3455
3456 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3457 async fn wait_for_pending_by_ffqn(
3458 &self,
3459 pending_at_or_sooner: DateTime<Utc>,
3460 ffqns: Arc<[FunctionFqn]>,
3461 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3462 ) {
3463 let unique_tag: u64 = rand::random();
3464 let (sender, mut receiver) = mpsc::channel(1);
3465 {
3466 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3467 for ffqn in ffqns.as_ref() {
3468 pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3469 }
3470 }
3471
3472 async {
3473 let mut db_has_pending = false;
3474 {
3475 let mut client_guard = self.client.lock().await;
3477 if let Ok(tx) = client_guard.transaction().await {
3479 if let Ok(res) = get_pending_by_ffqns(
3480 &tx,
3481 1,
3482 pending_at_or_sooner,
3483 &ffqns,
3484 SelectStrategy::Read,
3485 )
3486 .await
3487 && !res.is_empty()
3488 {
3489 db_has_pending = true;
3490 }
3491 let _ = tx.commit().await;
3493 }
3494 }
3495
3496 if db_has_pending {
3497 trace!("Not waiting, database already contains new pending executions");
3498 return;
3499 }
3500
3501 tokio::select! {
3502 _ = receiver.recv() => {
3503 trace!("Received a notification");
3504 }
3505 () = timeout_fut => {
3506 }
3507 }
3508 }
3509 .await;
3510
3511 {
3513 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3514 for ffqn in ffqns.as_ref() {
3515 match pending_subscribers.remove_ffqn(ffqn) {
3516 Some((_, tag)) if tag == unique_tag => {}
3517 Some(other) => {
3518 pending_subscribers.insert_ffqn(ffqn.clone(), other);
3519 }
3520 None => {}
3521 }
3522 }
3523 }
3524 }
3525
3526 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3527 async fn wait_for_pending_by_component_digest(
3528 &self,
3529 pending_at_or_sooner: DateTime<Utc>,
3530 component_digest: &ComponentDigest,
3531 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3532 ) {
3533 let unique_tag: u64 = rand::random();
3534 let (sender, mut receiver) = mpsc::channel(1);
3535 {
3536 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3537 pending_subscribers
3538 .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
3539 }
3540
3541 async {
3542 let mut db_has_pending = false;
3543 {
3544 let mut client_guard = self.client.lock().await;
3545 if let Ok(tx) = client_guard.transaction().await {
3546 if let Ok(res) = get_pending_by_component_input_digest(
3547 &tx,
3548 1,
3549 pending_at_or_sooner,
3550 component_digest,
3551 SelectStrategy::Read,
3552 )
3553 .await
3554 && !res.is_empty()
3555 {
3556 db_has_pending = true;
3557 }
3558 let _ = tx.commit().await;
3559 }
3560 }
3561
3562 if db_has_pending {
3563 trace!("Not waiting, database already contains new pending executions");
3564 return;
3565 }
3566
3567 tokio::select! {
3568 _ = receiver.recv() => {
3569 trace!("Received a notification");
3570 }
3571 () = timeout_fut => {
3572 }
3573 }
3574 }
3575 .await;
3576
3577 {
3579 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3580 match pending_subscribers.remove_by_component(component_digest) {
3581 Some((_, tag)) if tag == unique_tag => {}
3582 Some(other) => {
3583 pending_subscribers.insert_by_component(component_digest.clone(), other);
3584 }
3585 None => {}
3586 }
3587 }
3588 }
3589
3590 async fn get_last_execution_event(
3591 &self,
3592 execution_id: &ExecutionId,
3593 ) -> Result<ExecutionEvent, DbErrorRead> {
3594 let mut client_guard = self.client.lock().await;
3595 let tx = client_guard.transaction().await?;
3596
3597 let event = get_last_execution_event(&tx, execution_id).await?;
3598
3599 tx.commit().await?;
3600 Ok(event)
3601 }
3602}
3603#[async_trait]
3604impl DbConnection for PostgresConnection {
3605 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3606 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3607 debug!("create");
3608 trace!(?req, "create");
3609 let created_at = req.created_at;
3610
3611 let mut client_guard = self.client.lock().await;
3612 let tx = client_guard.transaction().await?;
3613
3614 let (version, notifier) = create_inner(&tx, req.clone()).await?;
3615
3616 tx.commit().await?;
3617 drop(client_guard); self.notify_all(vec![notifier], created_at);
3620 Ok(version)
3621 }
3622
3623 #[instrument(level = Level::DEBUG, skip(self))]
3624 async fn get(
3625 &self,
3626 execution_id: &ExecutionId,
3627 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3628 trace!("get");
3629 let mut client_guard = self.client.lock().await;
3630 let tx = client_guard.transaction().await?;
3631
3632 let res = get_execution_log(&tx, execution_id).await?;
3633
3634 tx.commit().await?;
3635 Ok(res)
3636 }
3637
3638 #[instrument(level = Level::DEBUG, skip(self, batch))]
3639 async fn append_batch(
3640 &self,
3641 current_time: DateTime<Utc>,
3642 batch: Vec<AppendRequest>,
3643 execution_id: ExecutionId,
3644 version: Version,
3645 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3646 debug!("append_batch");
3647 trace!(?batch, "append_batch");
3648 assert!(!batch.is_empty(), "Empty batch request");
3649
3650 let mut client_guard = self.client.lock().await;
3651 let tx = client_guard.transaction().await?;
3652
3653 let mut version = version;
3654 let mut notifier = None;
3655
3656 for append_request in batch {
3657 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3658 version = v;
3659 notifier = Some(n);
3660 }
3661
3662 tx.commit().await?;
3663 drop(client_guard);
3664
3665 self.notify_all(
3666 vec![notifier.expect("checked that the batch is not empty")],
3667 current_time,
3668 );
3669 Ok(version)
3670 }
3671
3672 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3673 async fn append_batch_create_new_execution(
3674 &self,
3675 current_time: DateTime<Utc>,
3676 batch: Vec<AppendRequest>,
3677 execution_id: ExecutionId,
3678 version: Version,
3679 child_req: Vec<CreateRequest>,
3680 backtraces: Vec<BacktraceInfo>,
3681 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3682 debug!("append_batch_create_new_execution");
3683 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3684 assert!(!batch.is_empty(), "Empty batch request");
3685
3686 let mut client_guard = self.client.lock().await;
3687 let tx = client_guard.transaction().await?;
3688
3689 let mut version = version;
3690 let mut notifier = None;
3691
3692 for append_request in batch {
3693 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3694 version = v;
3695 notifier = Some(n);
3696 }
3697
3698 let mut notifiers = Vec::new();
3699 notifiers.push(notifier.expect("checked that the batch is not empty"));
3700
3701 for req in child_req {
3702 let (_, n) = create_inner(&tx, req).await?;
3703 notifiers.push(n);
3704 }
3705 for backtrace in backtraces {
3706 append_backtrace(&tx, &backtrace).await?;
3707 }
3708 tx.commit().await?;
3709 drop(client_guard);
3710
3711 self.notify_all(notifiers, current_time);
3712 Ok(version)
3713 }
3714
3715 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3716 async fn subscribe_to_next_responses(
3717 &self,
3718 execution_id: &ExecutionId,
3719 last_response: ResponseCursor,
3720 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3721 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3722 debug!("next_responses");
3723 let unique_tag: u64 = rand::random();
3724 let execution_id_clone = execution_id.clone();
3725
3726 let cleanup = || {
3727 let mut guard = self.response_subscribers.lock().unwrap();
3728 match guard.remove(&execution_id_clone) {
3729 Some((_, tag)) if tag == unique_tag => {}
3730 Some(other) => {
3731 guard.insert(execution_id_clone.clone(), other);
3732 }
3733 None => {}
3734 }
3735 };
3736
3737 let receiver = {
3738 let mut client_guard = self.client.lock().await;
3739 let tx = client_guard.transaction().await?;
3740
3741 let (sender, receiver) = oneshot::channel();
3747 self.response_subscribers
3748 .lock()
3749 .unwrap()
3750 .insert(execution_id.clone(), (sender, unique_tag));
3751
3752 let responses = get_responses_after(&tx, execution_id, last_response).await?;
3753
3754 if responses.is_empty() {
3755 tx.commit().await.map_err(|err| {
3757 cleanup(); DbErrorRead::from(err)
3759 })?;
3760 receiver
3761 } else {
3762 cleanup(); tx.commit().await?;
3764 return Ok(responses);
3765 }
3766 };
3767
3768 let res = tokio::select! {
3769 resp = receiver => {
3770 match resp {
3771 Ok(resp) => Ok(vec![resp]),
3772 Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3773 }
3774 }
3775 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3776 };
3777
3778 cleanup();
3779 res
3780 }
3781
3782 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3783 async fn wait_for_finished_result(
3784 &self,
3785 execution_id: &ExecutionId,
3786 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3787 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3788 let unique_tag: u64 = rand::random();
3789 let execution_id_clone = execution_id.clone();
3790
3791 let cleanup = || {
3792 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3793 if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3794 subscribers.remove(&unique_tag);
3795 }
3796 };
3797
3798 let receiver = {
3799 let mut client_guard = self.client.lock().await;
3800 let tx = client_guard.transaction().await?;
3801
3802 let (sender, receiver) = oneshot::channel();
3804 {
3805 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3806 guard
3807 .entry(execution_id.clone())
3808 .or_default()
3809 .insert(unique_tag, sender);
3810 }
3811
3812 let pending_state = get_combined_state(&tx, execution_id)
3813 .await?
3814 .execution_with_state
3815 .pending_state;
3816
3817 if let PendingState::Finished(finished) = pending_state {
3818 let event = get_execution_event(&tx, execution_id, finished.version).await?;
3819 tx.commit().await?;
3820 cleanup();
3821
3822 if let ExecutionRequest::Finished { retval, .. } = event.event {
3823 return Ok(retval);
3824 }
3825 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3826 return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3827 "cannot get finished event based on t_state version",
3828 )));
3829 }
3830 tx.commit().await?;
3831 receiver
3832 };
3833
3834 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3835 let res = tokio::select! {
3836 resp = receiver => {
3837 match resp {
3838 Ok(retval) => Ok(retval),
3839 Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3840 }
3841 }
3842 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3843 };
3844
3845 cleanup();
3846 res
3847 }
3848
3849 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3850 async fn append_delay_response(
3851 &self,
3852 created_at: DateTime<Utc>,
3853 execution_id: ExecutionId,
3854 join_set_id: JoinSetId,
3855 delay_id: DelayId,
3856 result: Result<(), ()>,
3857 ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3858 debug!("append_delay_response");
3859 let event = JoinSetResponseEventOuter {
3860 created_at,
3861 event: JoinSetResponseEvent {
3862 join_set_id,
3863 event: JoinSetResponse::DelayFinished {
3864 delay_id: delay_id.clone(),
3865 result,
3866 },
3867 },
3868 };
3869
3870 let mut client_guard = self.client.lock().await;
3871 let tx = client_guard.transaction().await?;
3872
3873 let res = append_response(&tx, &execution_id, event).await;
3874
3875 match res {
3876 Ok(notifier) => {
3877 tx.commit().await?;
3878 drop(client_guard);
3879 self.notify_all(vec![notifier], created_at);
3880 Ok(AppendDelayResponseOutcome::Success)
3881 }
3882 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3883 tx.rollback().await?;
3886
3887 let tx = client_guard.transaction().await?;
3889 let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3890 tx.commit().await?;
3891
3892 match delay_success {
3893 Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3894 Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3895 None => Err(DbErrorWrite::Generic(consistency_db_err(
3896 "insert failed yet select did not find the response",
3897 ))),
3898 }
3899 }
3900 Err(err) => {
3901 let _ = tx.rollback().await; Err(err)
3903 }
3904 }
3905 }
3906
3907 #[instrument(level = Level::DEBUG, skip_all)]
3908 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3909 debug!("append_backtrace");
3910 let mut client_guard = self.client.lock().await;
3911 let tx = client_guard.transaction().await?;
3912
3913 append_backtrace(&tx, &append).await?;
3914
3915 tx.commit().await?;
3916 Ok(())
3917 }
3918
3919 #[instrument(level = Level::DEBUG, skip_all)]
3920 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3921 debug!("append_backtrace_batch");
3922 let mut client_guard = self.client.lock().await;
3923 let tx = client_guard.transaction().await?;
3924
3925 for append in batch {
3926 append_backtrace(&tx, &append).await?;
3927 }
3928
3929 tx.commit().await?;
3930 Ok(())
3931 }
3932
3933 #[instrument(level = Level::DEBUG, skip_all)]
3934 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3935 trace!("append_log");
3936 let mut client_guard = self.client.lock().await;
3937 let tx = client_guard.transaction().await?;
3938 append_log(&tx, &row).await?;
3939 tx.commit().await?;
3940
3941 Ok(())
3942 }
3943
3944 #[instrument(level = Level::DEBUG, skip_all)]
3945 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3946 trace!("append_log_batch");
3947 let mut client_guard = self.client.lock().await;
3948 let tx = client_guard.transaction().await?;
3949 for row in batch {
3950 append_log(&tx, row).await?;
3951 }
3952 tx.commit().await?;
3953 Ok(())
3954 }
3955
3956 #[instrument(level = Level::TRACE, skip(self))]
3958 async fn get_expired_timers(
3959 &self,
3960 at: DateTime<Utc>,
3961 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3962 let mut client_guard = self.client.lock().await;
3963 let tx = client_guard.transaction().await?;
3964
3965 let rows = tx
3967 .query(
3968 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3969 &[&at],
3970 )
3971 .await?;
3972
3973 let mut expired_timers = Vec::with_capacity(rows.len());
3974 for row in rows {
3975 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3976 let execution_id: String = get(&row, "execution_id")?;
3977 let execution_id = ExecutionId::from_str(&execution_id)?;
3978 let join_set_id: String = get(&row, "join_set_id")?;
3979 let join_set_id = JoinSetId::from_str(&join_set_id)?;
3980 let delay_id: String = get(&row, "delay_id")?;
3981 let delay_id = DelayId::from_str(&delay_id)?;
3982
3983 Ok(ExpiredTimer::Delay(ExpiredDelay {
3984 execution_id,
3985 join_set_id,
3986 delay_id,
3987 }))
3988 };
3989
3990 match unpack() {
3991 Ok(timer) => expired_timers.push(timer),
3992 Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3993 }
3994 }
3995
3996 let rows = tx.query(
3998 &format!(
3999 "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
4000 FROM t_state \
4001 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
4002 ),
4003 &[&at]
4004 ).await?;
4005
4006 for row in rows {
4007 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
4008 let execution_id: String = get(&row, "execution_id")?;
4009 let execution_id = ExecutionId::from_str(&execution_id)?;
4010 let last_lock_version: i64 = get(&row, "last_lock_version")?;
4011 let last_lock_version = Version::try_from(last_lock_version)?;
4012
4013 let corresponding_version: i64 = get(&row, "corresponding_version")?;
4014 let corresponding_version = Version::try_from(corresponding_version)?;
4015
4016 let intermittent_event_count =
4017 u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
4018 |_| consistency_db_err("`intermittent_event_count` must not be negative"),
4019 )?;
4020
4021 let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
4022 .map(u32::try_from)
4023 .transpose()
4024 .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
4025 let retry_exp_backoff_millis =
4026 u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
4027 |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
4028 )?;
4029 let executor_id: String = get(&row, "executor_id")?;
4030 let executor_id = ExecutorId::from_str(&executor_id)?;
4031 let run_id: String = get(&row, "run_id")?;
4032 let run_id = RunId::from_str(&run_id)?;
4033
4034 Ok(ExpiredTimer::Lock(ExpiredLock {
4035 execution_id,
4036 locked_at_version: last_lock_version,
4037 next_version: corresponding_version.increment(),
4038 intermittent_event_count,
4039 max_retries,
4040 retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
4041 locked_by: LockedBy {
4042 executor_id,
4043 run_id,
4044 },
4045 }))
4046 };
4047
4048 match unpack() {
4049 Ok(timer) => expired_timers.push(timer),
4050 Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
4051 }
4052 }
4053
4054 tx.commit().await?;
4055
4056 if !expired_timers.is_empty() {
4057 debug!("get_expired_timers found {expired_timers:?}");
4058 }
4059 Ok(expired_timers)
4060 }
4061
4062 async fn get_execution_event(
4063 &self,
4064 execution_id: &ExecutionId,
4065 version: &Version,
4066 ) -> Result<ExecutionEvent, DbErrorRead> {
4067 let mut client_guard = self.client.lock().await;
4068 let tx = client_guard.transaction().await?;
4069
4070 let event = get_execution_event(&tx, execution_id, version.0).await?;
4071
4072 tx.commit().await?;
4073 Ok(event)
4074 }
4075
4076 async fn get_pending_state(
4077 &self,
4078 execution_id: &ExecutionId,
4079 ) -> Result<ExecutionWithState, DbErrorRead> {
4080 let mut client_guard = self.client.lock().await;
4081 let tx = client_guard.transaction().await?;
4082
4083 let combined_state = get_combined_state(&tx, execution_id).await?;
4084
4085 tx.commit().await?;
4086 Ok(combined_state.execution_with_state)
4087 }
4088}
4089
4090#[async_trait]
4091impl DbExternalApi for PostgresConnection {
4092 #[instrument(skip(self))]
4093 async fn get_backtrace(
4094 &self,
4095 execution_id: &ExecutionId,
4096 filter: BacktraceFilter,
4097 ) -> Result<BacktraceInfo, DbErrorRead> {
4098 debug!("get_backtrace");
4099
4100 let mut client_guard = self.client.lock().await;
4101 let tx = client_guard.transaction().await?;
4102
4103 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4104
4105 params.push(Box::new(execution_id.to_string())); let p_execution_id_idx = format!("${}", params.len()); let mut sql = String::new();
4109 write!(
4110 &mut sql,
4111 "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
4112 FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
4113 WHERE execution_id = {p_execution_id_idx}"
4114 )
4115 .unwrap();
4116
4117 match &filter {
4118 BacktraceFilter::Specific(version) => {
4119 params.push(Box::new(i64::from(version.0))); let p_ver_idx = format!("${}", params.len()); write!(
4122 &mut sql,
4123 " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
4124 )
4125 .unwrap();
4126 }
4127 BacktraceFilter::First => {
4128 sql.push_str(" ORDER BY version_min_including LIMIT 1");
4129 }
4130 BacktraceFilter::Last => {
4131 sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
4132 }
4133 }
4134
4135 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4136 params.iter().map(|p| p.as_ref() as _).collect();
4137
4138 let row = tx.query_one(&sql, ¶ms_refs).await?;
4139
4140 let component_id: Json<ComponentId> = get(&row, "component_id")?;
4141 let component_id = component_id.0;
4142
4143 let version_min_including =
4144 Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
4145
4146 let version_max_excluding =
4147 Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
4148
4149 let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
4151 let wasm_backtrace = wasm_backtrace.0;
4152
4153 tx.commit().await?;
4154
4155 Ok(BacktraceInfo {
4156 execution_id: execution_id.clone(),
4157 component_id,
4158 version_min_including,
4159 version_max_excluding,
4160 wasm_backtrace,
4161 })
4162 }
4163
4164 #[instrument(skip_all)]
4165 async fn upsert_source_file(
4166 &self,
4167 component_digest: &ComponentDigest,
4168 frame_key: &str,
4169 is_suffix: bool,
4170 content: &str,
4171 ) -> Result<(), DbErrorWrite> {
4172 let content_hash: [u8; 32] = Sha256::digest(content.as_bytes()).into();
4173 let mut client_guard = self.client.lock().await;
4174 let tx = client_guard.transaction().await?;
4175 tx.execute(
4176 "INSERT INTO t_source_file (content_hash, content) \
4177 VALUES ($1, $2) \
4178 ON CONFLICT (content_hash) DO NOTHING",
4179 &[&content_hash.as_slice(), &content],
4180 )
4181 .await?;
4182 tx.execute(
4183 "INSERT INTO t_component_source \
4184 (component_digest, frame_key, is_suffix, content_hash) \
4185 VALUES ($1, $2, $3, $4) \
4186 ON CONFLICT (component_digest, frame_key, is_suffix) DO NOTHING",
4187 &[
4188 &component_digest.as_slice(),
4189 &frame_key,
4190 &is_suffix,
4191 &content_hash.as_slice(),
4192 ],
4193 )
4194 .await?;
4195 tx.commit().await?;
4196 Ok(())
4197 }
4198
4199 #[instrument(skip_all)]
4200 async fn get_source_file(
4201 &self,
4202 component_digest: &ComponentDigest,
4203 file: &str,
4204 ) -> Result<Option<String>, DbErrorRead> {
4205 let mut client_guard = self.client.lock().await;
4206 let tx = client_guard.transaction().await?;
4207 let rows = tx
4208 .query(
4209 "SELECT s.content \
4210 FROM t_component_source cs \
4211 JOIN t_source_file s ON cs.content_hash = s.content_hash \
4212 WHERE cs.component_digest = $1 \
4213 AND ( \
4214 (NOT cs.is_suffix AND cs.frame_key = $2) \
4215 OR (cs.is_suffix AND right($2, length(cs.frame_key)) = cs.frame_key) \
4216 )",
4217 &[&component_digest.as_slice(), &file],
4218 )
4219 .await?;
4220 tx.commit().await?;
4221 match rows.len() {
4222 0 => Ok(None),
4223 1 => Ok(Some(get::<String, _>(&rows[0], "content")?)),
4224 _ => {
4225 warn!("Multiple suffix matches for '{file}', returning None");
4226 Ok(None)
4227 }
4228 }
4229 }
4230
4231 #[instrument(skip(self))]
4232 async fn list_executions(
4233 &self,
4234 filter: ListExecutionsFilter,
4235 pagination: ExecutionListPagination,
4236 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4237 let mut client_guard = self.client.lock().await;
4238 let tx = client_guard.transaction().await?;
4239
4240 let result = list_executions(&tx, filter, &pagination).await?;
4241
4242 tx.commit().await?;
4243 Ok(result)
4244 }
4245
4246 #[instrument(skip(self))]
4247 async fn list_execution_events(
4248 &self,
4249 execution_id: &ExecutionId,
4250 pagination: Pagination<VersionType>,
4251 include_backtrace_id: bool,
4252 ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4253 let mut client_guard = self.client.lock().await;
4254 let tx = client_guard.transaction().await?;
4255
4256 let events =
4257 list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4258 let max_version = get_max_version(&tx, execution_id).await?;
4259
4260 tx.commit().await?;
4261 Ok(ListExecutionEventsResponse {
4262 events,
4263 max_version,
4264 })
4265 }
4266
4267 #[instrument(skip(self))]
4268 async fn list_responses(
4269 &self,
4270 execution_id: &ExecutionId,
4271 pagination: Pagination<u32>,
4272 ) -> Result<ListResponsesResponse, DbErrorRead> {
4273 let mut client_guard = self.client.lock().await;
4274 let tx = client_guard.transaction().await?;
4275
4276 let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4277 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4278
4279 tx.commit().await?;
4280 Ok(ListResponsesResponse {
4281 responses,
4282 max_cursor,
4283 })
4284 }
4285
4286 #[instrument(skip(self))]
4287 async fn list_execution_events_responses(
4288 &self,
4289 execution_id: &ExecutionId,
4290 req_since: &Version,
4291 req_max_length: VersionType,
4292 req_include_backtrace_id: bool,
4293 resp_pagination: Pagination<u32>,
4294 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4295 let mut client_guard = self.client.lock().await;
4296 let tx = client_guard.transaction().await?;
4297
4298 let combined_state = get_combined_state(&tx, execution_id).await?;
4299
4300 let events = list_execution_events(
4301 &tx,
4302 execution_id,
4303 Pagination::NewerThan {
4304 length: req_max_length
4305 .try_into()
4306 .expect("req_max_length fits in u16"),
4307 cursor: req_since.0,
4308 including_cursor: true,
4309 },
4310 req_include_backtrace_id,
4311 )
4312 .await?;
4313
4314 let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4315 let max_version = get_max_version(&tx, execution_id).await?;
4316 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4317
4318 tx.commit().await?;
4319
4320 Ok(ExecutionWithStateRequestsResponses {
4321 execution_with_state: combined_state.execution_with_state,
4322 events,
4323 responses,
4324 max_version,
4325 max_cursor,
4326 })
4327 }
4328
4329 #[instrument(skip(self))]
4330 async fn upgrade_execution_component(
4331 &self,
4332 execution_id: &ExecutionId,
4333 old: &ComponentDigest,
4334 new: &ComponentDigest,
4335 ) -> Result<(), DbErrorWrite> {
4336 let mut client_guard = self.client.lock().await;
4337 let tx = client_guard.transaction().await?;
4338
4339 upgrade_execution_component(&tx, execution_id, old, new).await?;
4340
4341 tx.commit().await?;
4342 Ok(())
4343 }
4344
4345 #[instrument(skip(self))]
4346 async fn list_logs(
4347 &self,
4348 execution_id: &ExecutionId,
4349 show_derived: bool,
4350 filter: LogFilter,
4351 pagination: Pagination<DateTime<Utc>>,
4352 ) -> Result<ListLogsResponse, DbErrorRead> {
4353 let mut client_guard = self.client.lock().await;
4354 let tx = client_guard.transaction().await?;
4355 let responses = list_logs_tx(&tx, execution_id, show_derived, &filter, &pagination).await?;
4356 tx.commit().await?;
4357 Ok(responses)
4358 }
4359
4360 #[instrument(skip(self))]
4361 async fn list_deployment_states(
4362 &self,
4363 current_time: DateTime<Utc>,
4364 pagination: Pagination<Option<DeploymentId>>,
4365 include_config_json: bool,
4366 ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4367 let mut client_guard = self.client.lock().await;
4368 let tx = client_guard.transaction().await?;
4369 let deployments =
4370 list_deployment_states(&tx, current_time, pagination, include_config_json).await?;
4371 tx.commit().await?;
4372 Ok(deployments)
4373 }
4374
4375 #[instrument(skip(self))]
4376 async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite> {
4377 assert_eq!(
4378 record.status,
4379 DeploymentStatus::Inactive,
4380 "insert_deployment requires Inactive status"
4381 );
4382 assert!(
4383 record.last_active_at.is_none(),
4384 "insert_deployment requires last_active_at == None"
4385 );
4386 let mut client_guard = self.client.lock().await;
4387 let tx = client_guard.transaction().await?;
4388 tx.execute(
4389 "INSERT INTO t_deployment \
4390 (deployment_id, created_at, status, config_json, obelisk_version, created_by) \
4391 VALUES ($1, $2, $3, $4, $5, $6)",
4392 &[
4393 &record.deployment_id.to_string(), &record.created_at, &record.status.as_str(), &record.config_json, &record.obelisk_version, &record.created_by, ],
4400 )
4401 .await?;
4402 tx.commit().await?;
4403 Ok(())
4404 }
4405
4406 #[instrument(skip(self))]
4407 async fn activate_deployment(
4408 &self,
4409 deployment_id: DeploymentId,
4410 now: DateTime<Utc>,
4411 ) -> Result<(), DbErrorWrite> {
4412 let mut client_guard = self.client.lock().await;
4413 let tx = client_guard.transaction().await?;
4414 tx.execute(
4416 "UPDATE t_deployment SET status = 'inactive' WHERE status IN ('active', 'enqueued')",
4417 &[],
4418 )
4419 .await?;
4420 let rows = tx
4422 .execute(
4423 "UPDATE t_deployment SET status = 'active', last_active_at = $1 WHERE deployment_id = $2",
4424 &[&now, &deployment_id.to_string()],
4425 )
4426 .await?;
4427 tx.commit().await?;
4428 if rows == 0 {
4429 return Err(DbErrorWrite::NotFound);
4430 }
4431 Ok(())
4432 }
4433
4434 async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite> {
4435 let mut client_guard = self.client.lock().await;
4436 let tx = client_guard.transaction().await?;
4437 let status_opt = tx
4439 .query_opt(
4440 "SELECT status FROM t_deployment WHERE deployment_id = $1",
4441 &[&deployment_id.to_string()],
4442 )
4443 .await?;
4444 match status_opt.as_ref().map(|r| r.get::<_, &str>("status")) {
4445 None => return Err(DbErrorWrite::NotFound),
4446 Some("active") => return Err(DbErrorWriteNonRetriable::Conflict.into()),
4447 _ => {}
4448 }
4449 tx.execute(
4451 "UPDATE t_deployment SET status = 'inactive' WHERE status = 'enqueued'",
4452 &[],
4453 )
4454 .await?;
4455 let rows = tx
4457 .execute(
4458 "UPDATE t_deployment SET status = 'enqueued' WHERE deployment_id = $1",
4459 &[&deployment_id.to_string()],
4460 )
4461 .await?;
4462 tx.commit().await?;
4463 if rows == 0 {
4464 return Err(DbErrorWrite::NotFound);
4465 }
4466 Ok(())
4467 }
4468
4469 #[instrument(skip(self))]
4470 async fn get_deployment(
4471 &self,
4472 deployment_id: DeploymentId,
4473 ) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4474 let mut client_guard = self.client.lock().await;
4475 let tx = client_guard.transaction().await?;
4476 let row = tx
4477 .query_opt(
4478 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4479 FROM t_deployment WHERE deployment_id = $1",
4480 &[&deployment_id.to_string()],
4481 )
4482 .await?;
4483 tx.commit().await?;
4484 match row {
4485 None => Ok(None),
4486 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4487 }
4488 }
4489
4490 #[instrument(skip(self))]
4491 #[cfg(feature = "test")]
4492 async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4493 let mut client_guard = self.client.lock().await;
4494 let tx = client_guard.transaction().await?;
4495 let row = tx
4496 .query_opt(
4497 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4498 FROM t_deployment WHERE status = 'active' LIMIT 1",
4499 &[],
4500 )
4501 .await?;
4502 tx.commit().await?;
4503 match row {
4504 None => Ok(None),
4505 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4506 }
4507 }
4508
4509 async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4510 let mut client_guard = self.client.lock().await;
4511 let tx = client_guard.transaction().await?;
4512 let row = tx
4513 .query_opt(
4514 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4515 FROM t_deployment WHERE status IN ('enqueued', 'active') \
4516 ORDER BY CASE status WHEN 'enqueued' THEN 0 ELSE 1 END LIMIT 1",
4517 &[],
4518 )
4519 .await?;
4520 tx.commit().await?;
4521 match row {
4522 None => Ok(None),
4523 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4524 }
4525 }
4526
4527 #[instrument(skip(self))]
4528 async fn list_deployments(
4529 &self,
4530 pagination: Pagination<Option<DeploymentId>>,
4531 ) -> Result<Vec<DeploymentRecord>, DbErrorRead> {
4532 let mut client_guard = self.client.lock().await;
4533 let tx = client_guard.transaction().await?;
4534 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4535 let mut add_param = |p: Box<dyn tokio_postgres::types::ToSql + Sync + Send>| {
4536 params.push(p);
4537 format!("${}", params.len())
4538 };
4539
4540 let mut sql = String::from(
4541 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4542 FROM t_deployment",
4543 );
4544
4545 if let Some(cursor) = pagination.cursor() {
4546 let p_cursor = add_param(Box::new(cursor.to_string()));
4547 write!(
4548 sql,
4549 " WHERE deployment_id {rel} {p_cursor}",
4550 rel = pagination.rel()
4551 )
4552 .expect("writing to string");
4553 }
4554
4555 let (inner_order, outer_order) = if pagination.is_desc() {
4556 ("DESC", "")
4557 } else {
4558 ("ASC", "DESC")
4559 };
4560
4561 write!(
4562 sql,
4563 " ORDER BY deployment_id {inner_order} LIMIT {limit}",
4564 limit = pagination.length()
4565 )
4566 .expect("writing to string");
4567
4568 let final_sql = if outer_order.is_empty() {
4569 sql
4570 } else {
4571 format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
4572 };
4573
4574 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4575 params.iter().map(|p| p.as_ref() as _).collect();
4576
4577 let rows = tx.query(&final_sql, ¶ms_refs).await?;
4578 tx.commit().await?;
4579
4580 rows.iter()
4581 .map(deployment_record_from_pg_row)
4582 .collect::<Result<Vec<_>, _>>()
4583 }
4584
4585 #[instrument(skip(self))]
4586 async fn pause_execution(
4587 &self,
4588 execution_id: &ExecutionId,
4589 paused_at: DateTime<Utc>,
4590 ) -> Result<AppendResponse, DbErrorWrite> {
4591 let mut client_guard = self.client.lock().await;
4592 let tx = client_guard.transaction().await?;
4593
4594 let combined_state = get_combined_state(&tx, execution_id).await?;
4595 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4596 debug!("Pausing with {appending_version}");
4597 let (next_version, _) = append(
4598 &tx,
4599 execution_id,
4600 AppendRequest {
4601 created_at: paused_at,
4602 event: ExecutionRequest::Paused,
4603 },
4604 appending_version,
4605 )
4606 .await?;
4607
4608 tx.commit().await?;
4609 Ok(next_version)
4610 }
4611
4612 #[instrument(skip(self))]
4613 async fn unpause_execution(
4614 &self,
4615 execution_id: &ExecutionId,
4616 unpaused_at: DateTime<Utc>,
4617 ) -> Result<AppendResponse, DbErrorWrite> {
4618 let mut client_guard = self.client.lock().await;
4619 let tx = client_guard.transaction().await?;
4620
4621 let combined_state = get_combined_state(&tx, execution_id).await?;
4622 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4623 debug!("Unpausing with {appending_version}");
4624 let (next_version, _) = append(
4625 &tx,
4626 execution_id,
4627 AppendRequest {
4628 created_at: unpaused_at,
4629 event: ExecutionRequest::Unpaused,
4630 },
4631 appending_version,
4632 )
4633 .await?;
4634
4635 tx.commit().await?;
4636 Ok(next_version)
4637 }
4638}
4639
4640#[async_trait]
4641impl DbPoolCloseable for PostgresPool {
4642 async fn close(&self) {
4643 self.pool.close();
4644 }
4645}
4646
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test-only: appends `response_event`, stamped `created_at`, as a join-set response of
    /// `execution_id`.
    ///
    /// The append runs in its own committed transaction. The resulting notifier is passed to
    /// `notify_all` only after the connection lock has been released.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let notifier = {
            let mut conn = self.client.lock().await;
            let tx = conn.transaction().await?;
            let outer_event = JoinSetResponseEventOuter {
                created_at,
                event: response_event,
            };
            let notifier = append_response(&tx, &execution_id, outer_event).await?;
            tx.commit().await?;
            notifier
            // `conn` (the lock guard) is released at the end of this scope.
        };
        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4675
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test-only: drops the database named by `self.config.db_name`.
    ///
    /// Connects to `ADMIN_DB_NAME` using this pool's host, user and password, then issues
    /// `DROP DATABASE` up to three times, stopping at the first success.
    ///
    /// - Failed attempts are logged at debug level.
    /// - If every attempt fails, a warning is logged; no error is returned.
    ///
    /// # Panics
    /// If the admin pool cannot be created or no connection can be obtained from it.
    pub async fn drop_database(&self) {
        let mut admin_cfg = deadpool_postgres::Config::new();
        admin_cfg.host = Some(self.config.host.clone());
        admin_cfg.user = Some(self.config.user.clone());
        admin_cfg.password = Some(self.config.password.expose_secret().to_string());
        admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
        admin_cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let drop_stmt = format!("DROP DATABASE {}", self.config.db_name);
        for _ in 0..3 {
            match admin_client.execute(&drop_stmt, &[]).await {
                Ok(_) => {
                    debug!("Database '{}' dropped.", self.config.db_name);
                    return;
                }
                res @ Err(_) => debug!("Dropping db failed - {res:?}"),
            }
        }
        warn!("Did not drop database {}", self.config.db_name);
    }
}