1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ComponentType, ContentDigest, ExecutionId, FunctionFqn,
6 JoinSetId, StrVariant, SupportedFunctionReturnValue,
7 component_id::{Digest, InputContentDigest},
8 prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14 DbPool, DbPoolCloseable, DeploymentState, ExecutionEvent, ExecutionListPagination,
15 ExecutionRequest, ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay,
16 ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest,
17 JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter,
18 ListExecutionEventsResponse, ListExecutionsFilter, ListLogsResponse, ListResponsesResponse,
19 LockPendingResponse, Locked, LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter,
20 LogInfoAppendRow, LogLevel, LogStreamType, Pagination, PendingState,
21 PendingStateBlockedByJoinSet, PendingStateFinishedResultKind, PendingStateMergedPause,
22 ResponseCursor, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
23 STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
24 },
25};
26use db_common::{
27 AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
28 PendingFfqnSubscribersHolder,
29};
30use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
31use hashbrown::HashMap;
32use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
33use std::{fmt::Write as _, panic::Location};
34use strum::IntoEnumIterator as _;
35use tokio::sync::{mpsc, oneshot};
36use tokio_postgres::{
37 NoTls, Row, Transaction,
38 row::RowIndex,
39 types::{FromSql, Json, ToSql},
40};
41use tracing::{Level, debug, error, info, instrument, trace, warn};
42use tracing_error::SpanTrace;
43
44#[track_caller]
45fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
46 row: &'a Row,
47 name: I,
48) -> Result<T, DbErrorGeneric> {
49 match row.try_get(name) {
50 Ok(ok) => Ok(ok),
51 Err(err) => {
52 Err(consistency_db_err(format!(
54 "Failed to retrieve column '{name}': {err:?}"
55 )))
56 }
57 }
58}
59
60mod ddl {
61 use super::{STATE_LOCKED, STATE_PENDING_AT};
62 use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;
63 use const_format::formatcp;
64
65 pub const ADMIN_DB_NAME: &str = "postgres";
66
67 pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 2;
68
69 pub const CREATE_TABLE_T_METADATA: &str = r"
71CREATE TABLE IF NOT EXISTS t_metadata (
72 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
73 schema_version INTEGER NOT NULL,
74 created_at TIMESTAMPTZ NOT NULL
75);
76";
77
78 pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
80CREATE TABLE IF NOT EXISTS t_execution_log (
81 execution_id TEXT NOT NULL,
82 created_at TIMESTAMPTZ NOT NULL,
83 json_value JSON NOT NULL,
84 version BIGINT NOT NULL CHECK (version >= 0),
85 variant TEXT NOT NULL,
86 join_set_id TEXT,
87 history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
88 PRIMARY KEY (execution_id, version)
89);
90";
91
92 pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
94CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
95";
96
97 pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
98CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
99";
100
101 pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
102 "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log
103 (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
104 HISTORY_EVENT_TYPE_JOIN_NEXT
105 );
106
107 pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
109CREATE TABLE IF NOT EXISTS t_join_set_response (
110 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
111 created_at TIMESTAMPTZ NOT NULL,
112 execution_id TEXT NOT NULL,
113 join_set_id TEXT NOT NULL,
114
115 delay_id TEXT,
116 delay_success BOOLEAN,
117
118 child_execution_id TEXT,
119 finished_version BIGINT CHECK (finished_version >= 0),
120
121 UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
122);
123";
124
125 pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
127CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
128";
129
130 pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
131CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
132ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
133";
134
135 pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
136CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
137ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
138";
139
140 pub const CREATE_TABLE_T_STATE: &str = r"
142CREATE TABLE IF NOT EXISTS t_state (
143 execution_id TEXT NOT NULL,
144 is_top_level BOOLEAN NOT NULL,
145 corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
146 ffqn TEXT NOT NULL,
147 created_at TIMESTAMPTZ NOT NULL,
148 component_id_input_digest BYTEA NOT NULL,
149 component_type TEXT NOT NULL,
150 first_scheduled_at TIMESTAMPTZ NOT NULL,
151 deployment_id TEXT NOT NULL,
152 is_paused BOOLEAN NOT NULL,
153
154 pending_expires_finished TIMESTAMPTZ NOT NULL,
155 state TEXT NOT NULL,
156 updated_at TIMESTAMPTZ NOT NULL,
157 intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),
158
159 max_retries BIGINT CHECK (max_retries >= 0),
160 retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
161 last_lock_version BIGINT CHECK (last_lock_version >= 0),
162 executor_id TEXT,
163 run_id TEXT,
164
165 join_set_id TEXT,
166 join_set_closing BOOLEAN,
167
168 result_kind JSONB,
169
170 PRIMARY KEY (execution_id)
171);
172";
173
174 pub const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
177 "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (state, pending_expires_finished, ffqn) WHERE state = '{}';",
178 STATE_PENDING_AT
179 );
180 pub const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
182 "CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (state, pending_expires_finished, component_id_input_digest) WHERE state = '{}';",
183 STATE_PENDING_AT
184 );
185
186 pub const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
187 "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
188 STATE_LOCKED
189 );
190
191 pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
192CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
193";
194
195 pub const IDX_T_STATE_FFQN: &str = r"
196CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
197";
198
199 pub const IDX_T_STATE_CREATED_AT: &str = r"
200CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
201";
202 pub const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
204CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
205";
206
207 pub const CREATE_TABLE_T_DELAY: &str = r"
209CREATE TABLE IF NOT EXISTS t_delay (
210 execution_id TEXT NOT NULL,
211 join_set_id TEXT NOT NULL,
212 delay_id TEXT NOT NULL,
213 expires_at TIMESTAMPTZ NOT NULL,
214 PRIMARY KEY (execution_id, join_set_id, delay_id)
215);
216";
217
218 pub const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
221CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
222 backtrace_hash BYTEA PRIMARY KEY,
223 wasm_backtrace JSONB NOT NULL
224);
225";
226
227 pub const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
228CREATE TABLE IF NOT EXISTS t_execution_backtrace (
229 execution_id TEXT NOT NULL,
230 component_id JSONB NOT NULL,
231 version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
232 version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
233 backtrace_hash BYTEA NOT NULL,
234 PRIMARY KEY (execution_id, version_min_including, version_max_excluding),
235 FOREIGN KEY (backtrace_hash) REFERENCES t_wasm_backtrace(backtrace_hash)
236);
237";
238
239 pub const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
240CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version ON t_execution_backtrace (execution_id, version_min_including, version_max_excluding);
241";
242
243 pub const CREATE_TABLE_T_LOG: &str = r"
245CREATE TABLE IF NOT EXISTS t_log (
246 id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
247 execution_id TEXT NOT NULL,
248 run_id TEXT NOT NULL,
249 created_at TIMESTAMPTZ NOT NULL,
250 level INTEGER,
251 message TEXT,
252 stream_type INTEGER,
253 payload BYTEA
254);
255";
256 pub const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
257CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
258";
259 pub const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
260CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
261";
262}
263
264#[derive(derive_more::Debug, Clone)]
265pub struct PostgresConfig {
266 pub host: String,
267 pub user: String,
268 #[debug(skip)]
269 pub password: String,
270 pub db_name: String,
271}
272
273#[derive(Debug, thiserror::Error)]
274#[error("initialization error")]
275pub struct InitializationError;
276
277async fn create_database(
278 config: &PostgresConfig,
279 provision_policy: ProvisionPolicy,
280) -> Result<DbInitialzationOutcome, InitializationError> {
281 let mut admin_cfg = deadpool_postgres::Config::new();
282 admin_cfg.host = Some(config.host.clone());
283 admin_cfg.user = Some(config.user.clone());
284 admin_cfg.password = Some(config.password.clone());
285 admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
286 admin_cfg.manager = Some(ManagerConfig {
287 recycling_method: RecyclingMethod::Fast,
288 });
289
290 let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
291 error!("Cannot create the default pool - {err:?}");
292 InitializationError
293 })?;
294
295 let client = admin_pool.get().await.map_err(|err| {
296 error!("Cannot get a connection from the default pool - {err:?}");
297 InitializationError
298 })?;
299
300 let row = client
301 .query_opt(
302 &format!(
303 "SELECT 1 FROM pg_database WHERE datname = '{}'",
304 config.db_name
305 ),
306 &[],
307 )
308 .await
309 .map_err(|err| {
310 error!("Cannot select from the default database - {err:?}");
311 InitializationError
312 })?;
313
314 match (row, provision_policy) {
315 (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
316 client
317 .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
318 .await
319 .map_err(|err| {
320 error!("Cannot create the database - {err:?}");
321 InitializationError
322 })?;
323 info!("Database '{}' created.", config.db_name);
324 Ok(DbInitialzationOutcome::Created)
325 }
326 (Some(_), ProvisionPolicy::Auto) => {
327 info!("Database '{}' exists.", config.db_name);
328 Ok(DbInitialzationOutcome::Existing)
329 }
330 (Some(_), ProvisionPolicy::MustCreate) => {
331 warn!("Database '{}' already exists.", config.db_name);
332 Err(InitializationError)
333 }
334 (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
335 }
336}
337
338type ResponseSubscribers =
340 Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
341type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
342type ExecutionFinishedSubscribers = std::sync::Mutex<
343 HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
344>;
345
346pub struct PostgresPool {
347 pool: Pool,
348 response_subscribers: ResponseSubscribers,
349 pending_subscribers: PendingSubscribers,
350 execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
351 pub config: PostgresConfig,
353}
354
355#[async_trait]
356impl DbPool for PostgresPool {
357 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
358 let client = self.pool.get().await?;
359
360 Ok(Box::new(PostgresConnection {
361 client: tokio::sync::Mutex::new(client),
362 response_subscribers: self.response_subscribers.clone(),
363 pending_subscribers: self.pending_subscribers.clone(),
364 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
365 }))
366 }
367
368 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
369 let client = self.pool.get().await?;
370
371 Ok(Box::new(PostgresConnection {
372 client: tokio::sync::Mutex::new(client),
373 response_subscribers: self.response_subscribers.clone(),
374 pending_subscribers: self.pending_subscribers.clone(),
375 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
376 }))
377 }
378
379 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
380 let client = self.pool.get().await?;
381
382 Ok(Box::new(PostgresConnection {
383 client: tokio::sync::Mutex::new(client),
384 response_subscribers: self.response_subscribers.clone(),
385 pending_subscribers: self.pending_subscribers.clone(),
386 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
387 }))
388 }
389
390 #[cfg(feature = "test")]
391 async fn connection_test(
392 &self,
393 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
394 let client = self.pool.get().await?;
395
396 Ok(Box::new(PostgresConnection {
397 client: tokio::sync::Mutex::new(client),
398 response_subscribers: self.response_subscribers.clone(),
399 pending_subscribers: self.pending_subscribers.clone(),
400 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
401 }))
402 }
403}
404
405pub struct PostgresConnection {
406 client: tokio::sync::Mutex<Client>, response_subscribers: ResponseSubscribers,
408 pending_subscribers: PendingSubscribers,
409 execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
410}
411
412#[derive(Debug, Clone, Copy, PartialEq, Eq)]
413pub enum ProvisionPolicy {
414 NeverCreate,
415 Auto,
417 MustCreate,
419}
420
421#[derive(Debug, Clone, Copy, PartialEq, Eq)]
422pub enum DbInitialzationOutcome {
423 Created,
424 Existing,
425}
426
427impl PostgresPool {
428 #[instrument(skip_all, name = "postgres_new")]
429 pub async fn new(
430 config: PostgresConfig,
431 provision_policy: ProvisionPolicy,
432 ) -> Result<PostgresPool, InitializationError> {
433 Self::new_with_outcome(config, provision_policy)
434 .await
435 .map(|(db, _)| db)
436 }
437
438 pub async fn new_with_outcome(
439 config: PostgresConfig,
440 provision_policy: ProvisionPolicy,
441 ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
442 let outcome = if matches!(
443 provision_policy,
444 ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
445 ) {
446 create_database(&config, provision_policy).await?
447 } else {
448 DbInitialzationOutcome::Existing
449 };
450 let mut cfg = deadpool_postgres::Config::new();
451 cfg.host = Some(config.host.clone());
452 cfg.user = Some(config.user.clone());
453 cfg.password = Some(config.password.clone());
454 cfg.dbname = Some(config.db_name.clone());
455 cfg.manager = Some(ManagerConfig {
456 recycling_method: RecyclingMethod::Fast,
457 });
458
459 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
460 error!("Cannot create the database pool - {err:?}");
461 InitializationError
462 })?;
463 let client = pool.get().await.map_err(|err| {
464 error!("Cannot get a connection from the database pool - {err:?}");
465 InitializationError
466 })?;
467
468 let statements = vec![
469 ddl::CREATE_TABLE_T_METADATA,
470 ddl::CREATE_TABLE_T_EXECUTION_LOG,
471 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
472 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
473 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
474 ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
475 ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
476 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
477 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
478 ddl::CREATE_TABLE_T_STATE,
479 ddl::IDX_T_STATE_LOCK_PENDING_BY_FFQN,
480 ddl::IDX_T_STATE_LOCK_PENDING_BY_COMPONENT,
481 ddl::IDX_T_STATE_EXPIRED_LOCKS,
482 ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
483 ddl::IDX_T_STATE_FFQN,
484 ddl::IDX_T_STATE_CREATED_AT,
485 ddl::IDX_T_STATE_DEPLOYMENT_STATE,
486 ddl::CREATE_TABLE_T_DELAY,
487 ddl::CREATE_TABLE_T_WASM_BACKTRACE,
488 ddl::CREATE_TABLE_T_EXECUTION_BACKTRACE,
489 ddl::IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION,
490 ddl::CREATE_TABLE_T_LOG,
491 ddl::IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT,
492 ddl::IDX_T_LOG_EXECUTION_ID_CREATED_AT,
493 ];
494
495 let batch_sql = statements.join("\n");
497 client.batch_execute(&batch_sql).await.map_err(|err| {
498 error!("Cannot run the DDL import - {err:?}");
499 InitializationError
500 })?;
501
502 let row = client
503 .query_opt(
504 "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
505 &[],
506 )
507 .await
508 .map_err(|err| {
509 error!("Cannot select schema version - {err:?}");
510 InitializationError
511 })?;
512
513 let actual_version = match row {
515 Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
516 error!("Failed to get schema_version column: {e}");
517 InitializationError
518 })?),
519 None => None,
520 };
521
522 match actual_version {
523 None => {
524 client
525 .execute(
526 "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
527 &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
528 )
529 .await
530 .map_err(|err| {
531 error!("Cannot insert schema version - {err:?}");
532 InitializationError
533 })?;
534 }
535 Some(actual_version) => {
536 if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
538 error!(
539 "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
540 );
541 return Err(InitializationError);
542 }
543 }
544 }
545
546 debug!("Database schema initialized.");
547
548 Ok((
549 PostgresPool {
550 pool,
551 execution_finished_subscribers: Arc::default(),
552 pending_subscribers: Arc::default(),
553 response_subscribers: Arc::default(),
554 config,
555 },
556 outcome,
557 ))
558 }
559}
560
561#[track_caller]
562fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
563 DbErrorGeneric::Uncategorized {
564 reason: reason.into(),
565 context: SpanTrace::capture(),
566 source: None,
567 loc: Location::caller(),
568 }
569}
570#[track_caller]
571fn consistency_db_err_src(
572 reason: impl Into<StrVariant>,
573 source: Arc<dyn std::error::Error + Send + Sync>,
574) -> DbErrorGeneric {
575 DbErrorGeneric::Uncategorized {
576 reason: reason.into(),
577 context: SpanTrace::capture(),
578 source: Some(source),
579 loc: Location::caller(),
580 }
581}
582
583#[derive(Debug, Clone)]
584struct DelayReq {
585 join_set_id: JoinSetId,
586 delay_id: DelayId,
587 expires_at: DateTime<Utc>,
588}
589
590async fn fetch_created_event(
591 tx: &Transaction<'_>,
592 execution_id: &ExecutionId,
593) -> Result<CreateRequest, DbErrorRead> {
594 let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
595 execution_id = $1 AND version = 0";
596
597 let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
598
599 let created_at = get(&row, "created_at")?;
600 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
601 let event = event.0;
602
603 if let ExecutionRequest::Created {
604 ffqn,
605 params,
606 parent,
607 scheduled_at,
608 component_id,
609 deployment_id,
610 metadata,
611 scheduled_by,
612 } = event
613 {
614 Ok(CreateRequest {
615 created_at,
616 execution_id: execution_id.clone(),
617 ffqn,
618 params,
619 parent,
620 scheduled_at,
621 component_id,
622 deployment_id,
623 metadata,
624 scheduled_by,
625 })
626 } else {
627 error!("Row with version=0 must be a `Created` event - {event:?}");
628 Err(consistency_db_err("expected `Created` event").into())
629 }
630}
631
632fn check_expected_next_and_appending_version(
633 expected_version: &Version,
634 appending_version: &Version,
635) -> Result<(), DbErrorWrite> {
636 if *expected_version != *appending_version {
637 debug!(
638 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
639 );
640 return Err(DbErrorWrite::NonRetriable(
641 DbErrorWriteNonRetriable::VersionConflict {
642 expected: expected_version.clone(),
643 requested: appending_version.clone(),
644 },
645 ));
646 }
647 Ok(())
648}
649
650#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
651async fn create_inner(
652 tx: &Transaction<'_>,
653 req: CreateRequest,
654) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
655 trace!("create_inner");
656
657 let version = Version::default();
658 let execution_id = req.execution_id.clone();
659 let execution_id_str = execution_id.to_string();
660 let ffqn = req.ffqn.clone();
661 let created_at = req.created_at;
662 let scheduled_at = req.scheduled_at;
663 let component_id = req.component_id.clone();
664 let deployment_id = req.deployment_id;
665
666 let event = ExecutionRequest::from(req);
667 let event = Json(event);
668
669 tx.execute(
670 "INSERT INTO t_execution_log (
671 execution_id, created_at, version, json_value, variant, join_set_id
672 ) VALUES ($1, $2, $3, $4, $5, $6)",
673 &[
674 &execution_id_str,
675 &created_at,
676 &i64::from(version.0), &event,
678 &event.0.variant(),
679 &event.0.join_set_id().map(std::string::ToString::to_string),
680 ],
681 )
682 .await?;
683
684 let pending_at = {
685 debug!("Creating with `Pending(`{scheduled_at:?}`)");
686
687 tx.execute(
688 r"
689 INSERT INTO t_state (
690 execution_id,
691 is_top_level,
692 corresponding_version,
693 pending_expires_finished,
694 ffqn,
695 state,
696 created_at,
697 component_id_input_digest,
698 component_type,
699 deployment_id,
700 first_scheduled_at,
701 updated_at,
702 intermittent_event_count,
703 is_paused
704 ) VALUES (
705 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, false
706 )",
707 &[
708 &execution_id_str,
709 &execution_id.is_top_level(),
710 &i64::from(version.0),
711 &scheduled_at,
712 &ffqn.to_string(),
713 &STATE_PENDING_AT,
714 &created_at,
715 &component_id.input_digest.as_slice(),
716 &component_id.component_type.to_string(),
717 &deployment_id.to_string(),
718 &scheduled_at,
719 ],
720 )
721 .await?;
722
723 AppendNotifier {
724 pending_at: Some(NotifierPendingAt {
725 scheduled_at,
726 ffqn,
727 component_input_digest: component_id.input_digest,
728 }),
729 execution_finished: None,
730 response: None,
731 }
732 };
733
734 let next_version = Version::new(version.0 + 1);
735 Ok((next_version, pending_at))
736}
737
738#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
739async fn update_state_pending_after_response_appended(
740 tx: &Transaction<'_>,
741 execution_id: &ExecutionId,
742 scheduled_at: DateTime<Utc>, component_input_digest: InputContentDigest,
744) -> Result<AppendNotifier, DbErrorWrite> {
745 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
746
747 let execution_id_str = execution_id.to_string();
749
750 let updated = tx
751 .execute(
752 r"
753 UPDATE t_state
754 SET
755 pending_expires_finished = $1,
756 state = $2,
757 updated_at = CURRENT_TIMESTAMP,
758
759 max_retries = NULL,
760 retry_exp_backoff_millis = NULL,
761 last_lock_version = NULL,
762
763 join_set_id = NULL,
764 join_set_closing = NULL,
765
766 result_kind = NULL
767 WHERE execution_id = $3
768 ",
769 &[
770 &scheduled_at, &STATE_PENDING_AT, &execution_id_str, ],
774 )
775 .await?;
776
777 if updated == 0 {
778 return Err(DbErrorWrite::NotFound);
779 }
780
781 Ok(AppendNotifier {
782 pending_at: Some(NotifierPendingAt {
783 scheduled_at,
784 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
785 component_input_digest,
786 }),
787 execution_finished: None,
788 response: None,
789 })
790}
791
792#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
793async fn update_state_pending_after_event_appended(
794 tx: &Transaction<'_>,
795 execution_id: &ExecutionId,
796 appending_version: &Version,
797 scheduled_at: DateTime<Utc>,
798 intermittent_failure: bool,
799 component_input_digest: InputContentDigest,
800) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
801 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
802
803 let intermittent_delta = i64::from(intermittent_failure); let updated = tx
806 .execute(
807 r"
808 UPDATE t_state
809 SET
810 corresponding_version = $1,
811 pending_expires_finished = $2,
812 state = $3,
813 updated_at = CURRENT_TIMESTAMP,
814 intermittent_event_count = intermittent_event_count + $4,
815
816 max_retries = NULL,
817 retry_exp_backoff_millis = NULL,
818 last_lock_version = NULL,
819
820 join_set_id = NULL,
821 join_set_closing = NULL,
822
823 result_kind = NULL
824 WHERE execution_id = $5;
825 ",
826 &[
827 &i64::from(appending_version.0), &scheduled_at, &STATE_PENDING_AT, &intermittent_delta, &execution_id.to_string(), ],
833 )
834 .await?;
835
836 if updated != 1 {
837 return Err(DbErrorWrite::NotFound);
838 }
839
840 Ok((
841 appending_version.increment(),
842 AppendNotifier {
843 pending_at: Some(NotifierPendingAt {
844 scheduled_at,
845 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
846 component_input_digest,
847 }),
848 execution_finished: None,
849 response: None,
850 },
851 ))
852}
853
854#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
855#[expect(clippy::too_many_arguments)]
856async fn update_state_locked_get_intermittent_event_count(
857 tx: &Transaction<'_>,
858 execution_id: &ExecutionId,
859 deployment_id: DeploymentId,
860 component_digest: &InputContentDigest,
861 executor_id: ExecutorId,
862 run_id: RunId,
863 lock_expires_at: DateTime<Utc>,
864 appending_version: &Version,
865 retry_config: ComponentRetryConfig,
866) -> Result<u32, DbErrorWrite> {
867 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
868 let backoff_millis =
869 i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
870 DbErrorGeneric::Uncategorized {
872 reason: "backoff too big".into(),
873 context: SpanTrace::capture(),
874 source: Some(Arc::new(err)),
875 loc: Location::caller(),
876 }
877 })?;
878
879 let execution_id_str = execution_id.to_string();
880
881 let updated = tx
882 .execute(
883 r"
884 UPDATE t_state
885 SET
886 corresponding_version = $1,
887 pending_expires_finished = $2,
888 state = $3,
889 updated_at = CURRENT_TIMESTAMP,
890 deployment_id = $4,
891 component_id_input_digest = $5,
892
893 max_retries = $6,
894 retry_exp_backoff_millis = $7,
895 last_lock_version = $1,
896 executor_id = $8,
897 run_id = $9,
898
899 join_set_id = NULL,
900 join_set_closing = NULL,
901
902 result_kind = NULL
903 WHERE execution_id = $10
904 AND is_paused = false
905 ",
906 &[
907 &i64::from(appending_version.0),
908 &lock_expires_at,
909 &STATE_LOCKED,
910 &deployment_id.to_string(),
911 &component_digest,
912 &retry_config.max_retries.map(i64::from),
913 &backoff_millis,
914 &executor_id.to_string(),
915 &run_id.to_string(),
916 &execution_id_str,
917 ],
918 )
919 .await?;
920
921 if updated != 1 {
922 return Err(DbErrorWrite::NotFound);
923 }
924
925 let row = tx
927 .query_one(
928 "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
929 &[&execution_id_str],
930 )
931 .await
932 .map_err(DbErrorGeneric::from)?;
933
934 let count: i64 = get(&row, "intermittent_event_count")?; let count = u32::try_from(count)
936 .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
937 Ok(count)
938}
939
940#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
941async fn update_state_blocked(
942 tx: &Transaction<'_>,
943 execution_id: &ExecutionId,
944 appending_version: &Version,
945 join_set_id: &JoinSetId,
946 lock_expires_at: DateTime<Utc>,
947 join_set_closing: bool,
948) -> Result<AppendResponse, DbErrorWrite> {
949 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
950
951 let updated = tx
952 .execute(
953 r"
954 UPDATE t_state
955 SET
956 corresponding_version = $1,
957 pending_expires_finished = $2,
958 state = $3,
959 updated_at = CURRENT_TIMESTAMP,
960
961 max_retries = NULL,
962 retry_exp_backoff_millis = NULL,
963 last_lock_version = NULL,
964
965 join_set_id = $4,
966 join_set_closing = $5,
967
968 result_kind = NULL
969 WHERE execution_id = $6
970 ",
971 &[
972 &i64::from(appending_version.0), &lock_expires_at, &STATE_BLOCKED_BY_JOIN_SET, &join_set_id.to_string(), &join_set_closing, &execution_id.to_string(), ],
979 )
980 .await?;
981
982 if updated != 1 {
983 return Err(DbErrorWrite::NotFound);
984 }
985 Ok(appending_version.increment())
986}
987
988#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
989async fn update_state_finished(
990 tx: &Transaction<'_>,
991 execution_id: &ExecutionId,
992 appending_version: &Version,
993 finished_at: DateTime<Utc>,
994 result_kind: PendingStateFinishedResultKind,
995) -> Result<(), DbErrorWrite> {
996 debug!("Setting t_state to Finished");
997
998 let result_kind_json = Json(result_kind);
999
1000 let updated = tx
1001 .execute(
1002 r"
1003 UPDATE t_state
1004 SET
1005 corresponding_version = $1,
1006 pending_expires_finished = $2,
1007 state = $3,
1008 updated_at = CURRENT_TIMESTAMP,
1009
1010 max_retries = NULL,
1011 retry_exp_backoff_millis = NULL,
1012 last_lock_version = NULL,
1013 executor_id = NULL,
1014 run_id = NULL,
1015
1016 join_set_id = NULL,
1017 join_set_closing = NULL,
1018
1019 result_kind = $4
1020 WHERE execution_id = $5
1021 ",
1022 &[
1023 &i64::from(appending_version.0), &finished_at, &STATE_FINISHED, &result_kind_json, &execution_id.to_string(), ],
1029 )
1030 .await?;
1031
1032 if updated != 1 {
1033 return Err(DbErrorWrite::NotFound);
1034 }
1035 Ok(())
1036}
1037
1038#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1039async fn update_state_paused(
1040 tx: &Transaction<'_>,
1041 execution_id: &ExecutionId,
1042 appending_version: &Version,
1043 is_paused: bool,
1044) -> Result<AppendResponse, DbErrorWrite> {
1045 debug!(
1046 "Setting t_state to {}",
1047 if is_paused { "paused" } else { "unpaused" }
1048 );
1049
1050 let updated = tx
1051 .execute(
1052 r"
1053 UPDATE t_state
1054 SET
1055 corresponding_version = $1,
1056 is_paused = $2,
1057 updated_at = CURRENT_TIMESTAMP
1058 WHERE execution_id = $3
1059 ",
1060 &[
1061 &i64::from(appending_version.0), &is_paused, &execution_id.to_string(), ],
1065 )
1066 .await?;
1067
1068 if updated != 1 {
1069 return Err(DbErrorWrite::NotFound);
1070 }
1071 Ok(appending_version.increment())
1072}
1073
1074#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1075async fn bump_state_next_version(
1076 tx: &Transaction<'_>,
1077 execution_id: &ExecutionId,
1078 appending_version: &Version,
1079 delay_req: Option<DelayReq>,
1080) -> Result<AppendResponse, DbErrorWrite> {
1081 debug!("update_index_version");
1082 let execution_id_str = execution_id.to_string();
1083
1084 let updated = tx
1085 .execute(
1086 r"
1087 UPDATE t_state
1088 SET
1089 corresponding_version = $1,
1090 updated_at = CURRENT_TIMESTAMP
1091 WHERE execution_id = $2
1092 ",
1093 &[
1094 &i64::from(appending_version.0), &execution_id_str, ],
1097 )
1098 .await?;
1099
1100 if updated != 1 {
1101 return Err(DbErrorWrite::NotFound);
1102 }
1103
1104 if let Some(DelayReq {
1105 join_set_id,
1106 delay_id,
1107 expires_at,
1108 }) = delay_req
1109 {
1110 debug!("Inserting delay to `t_delay`");
1111 tx.execute(
1112 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1113 &[
1114 &execution_id_str,
1115 &join_set_id.to_string(),
1116 &delay_id.to_string(),
1117 &expires_at,
1118 ],
1119 )
1120 .await?;
1121 }
1122 Ok(appending_version.increment())
1123}
1124
1125async fn get_combined_state(
1126 tx: &Transaction<'_>,
1127 execution_id: &ExecutionId,
1128) -> Result<CombinedState, DbErrorRead> {
1129 let row = tx
1130 .query_one(
1131 r"
1132 SELECT
1133 created_at, first_scheduled_at,
1134 state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
1135 last_lock_version, executor_id, run_id,
1136 join_set_id, join_set_closing,
1137 result_kind, is_paused
1138 FROM t_state
1139 WHERE execution_id = $1
1140 ",
1141 &[&execution_id.to_string()],
1142 )
1143 .await
1144 .map_err(DbErrorRead::from)?;
1145
1146 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1149 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1150
1151 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1152 let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1153 consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1154 })?;
1155 let component_digest = InputContentDigest(ContentDigest(digest));
1156
1157 let component_type: String = get(&row, "component_type")?;
1158 let component_type = ComponentType::from_str(&component_type)
1159 .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;
1160
1161 let deployment_id: String = get(&row, "deployment_id")?;
1162 let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1163
1164 let state: String = get(&row, "state")?;
1165 let ffqn: String = get(&row, "ffqn")?;
1166 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1167 consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1168 })?;
1169
1170 let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1171
1172 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1173 let last_lock_version = last_lock_version_raw
1174 .map(Version::try_from)
1175 .transpose()
1176 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1177
1178 let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1179 let executor_id = executor_id_raw
1180 .map(|id| ExecutorId::from_str(&id))
1181 .transpose()
1182 .map_err(DbErrorGeneric::from)?;
1183
1184 let run_id_raw: Option<String> = get(&row, "run_id")?;
1185 let run_id = run_id_raw
1186 .map(|id| RunId::from_str(&id))
1187 .transpose()
1188 .map_err(DbErrorGeneric::from)?;
1189
1190 let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1191 let join_set_id = join_set_id_raw
1192 .map(|id| JoinSetId::from_str(&id))
1193 .transpose()
1194 .map_err(DbErrorGeneric::from)?;
1195
1196 let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1197
1198 let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1199 let result_kind = result_kind.map(|it| it.0);
1200
1201 let is_paused: bool = get(&row, "is_paused")?;
1202
1203 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1204 let corresponding_version = Version::new(
1205 VersionType::try_from(corresponding_version)
1206 .map_err(|_| consistency_db_err("version must be non-negative"))?,
1207 );
1208
1209 let dto = CombinedStateDTO {
1210 execution_id: execution_id.clone(),
1211 created_at,
1212 first_scheduled_at,
1213 state,
1214 ffqn,
1215 component_digest,
1216 component_type,
1217 deployment_id,
1218 pending_expires_finished,
1219 last_lock_version,
1220 executor_id,
1221 run_id,
1222 join_set_id,
1223 join_set_closing,
1224 result_kind,
1225 is_paused,
1226 };
1227 CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1228}
1229
1230async fn list_executions(
1231 read_tx: &Transaction<'_>,
1232 filter: ListExecutionsFilter,
1233 pagination: &ExecutionListPagination,
1234) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1235 struct QueryBuilder {
1237 where_clauses: Vec<String>,
1238 params: Vec<Box<dyn ToSql + Send + Sync>>,
1239 }
1240
1241 impl QueryBuilder {
1242 fn new() -> Self {
1243 Self {
1244 where_clauses: Vec::new(),
1245 params: Vec::new(),
1246 }
1247 }
1248
1249 fn add_param<T>(&mut self, param: T) -> String
1250 where
1251 T: ToSql + Sync + Send + 'static,
1252 {
1253 self.params.push(Box::new(param));
1254 format!("${}", self.params.len())
1255 }
1256
1257 fn add_where(&mut self, clause: String) {
1258 self.where_clauses.push(clause);
1259 }
1260 }
1261
1262 let mut qb = QueryBuilder::new();
1263
1264 let (limit, limit_desc) = match pagination {
1266 ExecutionListPagination::CreatedBy(p) => {
1267 let limit = p.length();
1268 let is_desc = p.is_desc();
1269 if let Some(cursor) = p.cursor() {
1270 let placeholder = qb.add_param(*cursor);
1271 qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1272 }
1273 (limit, is_desc)
1274 }
1275 ExecutionListPagination::ExecutionId(p) => {
1276 let limit = p.length();
1277 let is_desc = p.is_desc();
1278 if let Some(cursor) = p.cursor() {
1279 let placeholder = qb.add_param(cursor.to_string());
1280 qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1281 }
1282 (limit, is_desc)
1283 }
1284 };
1285
1286 if !filter.show_derived {
1287 qb.add_where("is_top_level = true".to_string());
1288 }
1289 let like = |str| format!("{str}%");
1290 if let Some(ffqn_prefix) = filter.ffqn_prefix {
1291 let placeholder = qb.add_param(like(ffqn_prefix));
1292 qb.add_where(format!("ffqn LIKE {placeholder}"));
1293 }
1294 if filter.hide_finished {
1295 qb.add_where(format!("state != '{STATE_FINISHED}'"));
1296 }
1297 if let Some(prefix) = filter.execution_id_prefix {
1298 let placeholder = qb.add_param(like(prefix));
1299 qb.add_where(format!("execution_id LIKE {placeholder}"));
1300 }
1301 if let Some(component_digest) = filter.component_digest {
1302 let placeholder = qb.add_param(component_digest);
1303 qb.add_where(format!("component_id_input_digest = {placeholder}"));
1304 }
1305 if let Some(deployment_id) = filter.deployment_id {
1306 let placeholder = qb.add_param(deployment_id.to_string());
1307 qb.add_where(format!("deployment_id = {placeholder}"));
1308 }
1309
1310 let where_str = if qb.where_clauses.is_empty() {
1311 String::new()
1312 } else {
1313 format!("WHERE {}", qb.where_clauses.join(" AND "))
1314 };
1315
1316 let order_col = match pagination {
1317 ExecutionListPagination::CreatedBy(_) => "created_at",
1318 ExecutionListPagination::ExecutionId(_) => "execution_id",
1319 };
1320
1321 let (inner_order, outer_order) = if limit_desc {
1324 ("DESC", "")
1325 } else {
1326 ("", "DESC")
1327 };
1328
1329 let inner_sql = format!(
1330 r"SELECT created_at, first_scheduled_at, component_id_input_digest, deployment_id,
1331 state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1332 last_lock_version, executor_id, run_id,
1333 join_set_id, join_set_closing,
1334 result_kind, is_paused
1335 FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1336 );
1337
1338 let sql = if outer_order.is_empty() {
1339 inner_sql
1340 } else {
1341 format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1342 };
1343
1344 let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1345 .params
1346 .iter()
1347 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1348 .collect();
1349
1350 let rows = read_tx.query(&sql, ¶ms_refs).await?;
1351
1352 let mut vec = Vec::with_capacity(rows.len());
1353
1354 for row in rows {
1355 let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1357 let execution_id_str: String = get(&row, "execution_id")?;
1358 let execution_id = ExecutionId::from_str(&execution_id_str)
1359 .map_err(|err| consistency_db_err(err.to_string()))?;
1360
1361 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1362 let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1363 consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1364 })?;
1365 let component_digest = InputContentDigest(ContentDigest(digest));
1366
1367 let component_type: String = get(&row, "component_type")?;
1368 let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1369 consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1370 })?;
1371
1372 let deployment_id: String = get(&row, "deployment_id")?;
1373 let deployment_id =
1374 DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1375
1376 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1377 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1378
1379 let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1380 get(&row, "result_kind")?;
1381 let result_kind = result_kind.map(|it| it.0);
1382
1383 let is_paused: bool = get(&row, "is_paused")?;
1384
1385 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1386 let corresponding_version = Version::try_from(corresponding_version)
1387 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1388
1389 let executor_id_str: Option<String> = get(&row, "executor_id")?;
1390 let executor_id = executor_id_str
1391 .map(|id| ExecutorId::from_str(&id))
1392 .transpose()?;
1393
1394 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1395 let last_lock_version = last_lock_version_raw
1396 .map(Version::try_from)
1397 .transpose()
1398 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1399
1400 let run_id_str: Option<String> = get(&row, "run_id")?;
1401 let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1402
1403 let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1404 let join_set_id = join_set_id_str
1405 .map(|id| JoinSetId::from_str(&id))
1406 .transpose()?;
1407
1408 let ffqn: String = get(&row, "ffqn")?;
1409 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1410 error!("Error parsing ffqn - {parse_err:?}");
1411 consistency_db_err("invalid ffqn value in `t_state`")
1412 })?;
1413
1414 let combined_state_dto = CombinedStateDTO {
1415 execution_id,
1416 created_at,
1417 first_scheduled_at,
1418 component_digest,
1419 component_type,
1420 deployment_id,
1421 state: get(&row, "state")?,
1422 ffqn,
1423 pending_expires_finished: get(&row, "pending_expires_finished")?,
1424 executor_id,
1425 last_lock_version,
1426 run_id,
1427 join_set_id,
1428 join_set_closing: get(&row, "join_set_closing")?,
1429 result_kind,
1430 is_paused,
1431 };
1432
1433 let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1434
1435 Ok(combined_state.execution_with_state)
1436 };
1437
1438 match unpack() {
1439 Ok(execution) => vec.push(execution),
1440 Err(err) => {
1441 warn!("Skipping corrupted row in t_state: {err:?}");
1442 }
1443 }
1444 }
1445
1446 Ok(vec)
1447}
1448
1449async fn list_responses(
1450 tx: &Transaction<'_>,
1451 execution_id: &ExecutionId,
1452 pagination: Option<Pagination<u32>>,
1453) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1454 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1456 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1457 params.push(p);
1458 format!("${}", params.len())
1459 };
1460
1461 let p_execution_id = add_param(Box::new(execution_id.to_string()));
1463
1464 let mut sql = format!(
1465 "SELECT \
1466 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1467 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1468 WHERE \
1469 r.execution_id = {p_execution_id} \
1470 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1471 );
1472
1473 let limit = match &pagination {
1475 Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1476 let p_cursor = add_param(Box::new(i64::from(*cursor)));
1478
1479 write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1481
1482 Some(p.length())
1483 }
1484 None => None,
1485 };
1486
1487 sql.push_str(" ORDER BY r.id");
1489 let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1490 if is_desc {
1491 sql.push_str(" DESC");
1492 }
1493
1494 if let Some(limit) = limit {
1496 let p_limit = add_param(Box::new(i64::from(limit)));
1498 write!(sql, " LIMIT {p_limit}").unwrap();
1499 }
1500
1501 if is_desc {
1503 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1504 }
1505
1506 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1507 .iter()
1508 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1509 .collect();
1510
1511 let rows = tx
1512 .query(&sql, ¶ms_refs)
1513 .await
1514 .map_err(DbErrorRead::from)?;
1515
1516 let mut results = Vec::with_capacity(rows.len());
1517 for row in rows {
1518 results.push(parse_response_with_cursor(&row)?);
1519 }
1520
1521 Ok(results)
1522}
1523
1524async fn list_logs_tx(
1525 tx: &Transaction<'_>,
1526 execution_id: &ExecutionId,
1527 filter: &LogFilter,
1528 pagination: &Pagination<u32>,
1529) -> Result<ListLogsResponse, DbErrorRead> {
1530 let mut param_index = 1;
1531 let mut query = format!(
1532 "SELECT id, run_id, created_at, level, message, stream_type, payload
1533 FROM t_log
1534 WHERE execution_id = ${param_index}",
1535 );
1536 param_index += 1;
1537
1538 let execution_id = execution_id.to_string();
1539 let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&execution_id];
1540
1541 let level_filter = if filter.should_show_logs() {
1543 let levels_str = if !filter.levels().is_empty() {
1544 filter
1545 .levels()
1546 .iter()
1547 .map(|lvl| (*lvl as u8).to_string())
1548 .collect::<Vec<_>>()
1549 .join(",")
1550 } else {
1551 LogLevel::iter()
1552 .map(|lvl| (lvl as u8).to_string())
1553 .collect::<Vec<_>>()
1554 .join(",")
1555 };
1556 Some(format!(" level IN ({levels_str})"))
1557 } else {
1558 None
1559 };
1560 let stream_filter = if filter.should_show_streams() {
1561 let streams_str = if !filter.stream_types().is_empty() {
1562 filter
1563 .stream_types()
1564 .iter()
1565 .map(|st| (*st as u8).to_string())
1566 .collect::<Vec<_>>()
1567 .join(",")
1568 } else {
1569 LogStreamType::iter()
1570 .map(|st| (st as u8).to_string())
1571 .collect::<Vec<_>>()
1572 .join(",")
1573 };
1574 Some(format!(" stream_type IN ({streams_str})"))
1575 } else {
1576 None
1577 };
1578 match (level_filter, stream_filter) {
1579 (Some(level_filter), Some(stream_filter)) => {
1580 write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1581 .expect("writing to string");
1582 }
1583 (Some(level_filter), None) => {
1584 write!(&mut query, " AND {level_filter}").expect("writing to string");
1585 }
1586 (None, Some(stream_filter)) => {
1587 write!(&mut query, " AND {stream_filter}").expect("writing to string");
1588 }
1589 (None, None) => unreachable!("guarded by constructor"),
1590 }
1591
1592 write!(&mut query, " AND id {} ${param_index}", pagination.rel()).expect("writing to string");
1594 let cursor_val: i64 = (*pagination.cursor()).into();
1595 params.push(&cursor_val);
1596 param_index += 1;
1597
1598 write!(
1600 &mut query,
1601 " ORDER BY id {} LIMIT ${}",
1602 if pagination.is_desc() { "DESC" } else { "ASC" },
1603 param_index
1604 )
1605 .expect("writing to string");
1606 let length_val: i64 = i64::from(pagination.length());
1607 params.push(&length_val);
1608
1609 let rows = tx.query(&query, ¶ms[..]).await?;
1610
1611 let mut items = Vec::with_capacity(rows.len());
1612
1613 for row in rows {
1614 let cursor = u32::try_from(get::<i64, _>(&row, "id")?)
1615 .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?;
1616 let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1617 let run_id: String = get(&row, "run_id")?;
1618 let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1619 consistency_db_err_src(
1620 format!("cannot convert RunId {run_id}, id: {cursor}"),
1621 Arc::from(parse_err),
1622 )
1623 })?;
1624
1625 let level: Option<i32> = get(&row, "level")?;
1626 let message: Option<String> = get(&row, "message")?;
1627 let stream_type: Option<i32> = get(&row, "stream_type")?;
1628 let payload: Option<Vec<u8>> = get(&row, "payload")?;
1629
1630 let log_entry = match (level, message, stream_type, payload) {
1631 (Some(lvl), Some(msg), None, None) => {
1632 let map_err = |err| {
1633 consistency_db_err_src(
1634 format!("cannot convert {lvl} to LogLevel , id: {cursor}"),
1635 err,
1636 )
1637 };
1638 LogEntry::Log {
1639 created_at,
1640 level: u8::try_from(lvl)
1641 .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1642 .map_err(|err| map_err(Arc::from(err)))??,
1643 message: msg,
1644 }
1645 }
1646 (None, None, Some(stype), Some(pl)) => {
1647 let map_err = |err| {
1648 consistency_db_err_src(
1649 format!("cannot convert {stype} to LogStreamType , id: {cursor}"),
1650 err,
1651 )
1652 };
1653 LogEntry::Stream {
1654 created_at,
1655 stream_type: u8::try_from(stype)
1656 .map(|stype| {
1657 LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1658 })
1659 .map_err(|err| map_err(Arc::from(err)))??,
1660 payload: pl,
1661 }
1662 }
1663 _ => {
1664 return Err(consistency_db_err(format!("invalid t_log row id:{cursor}")).into());
1665 }
1666 };
1667
1668 items.push(LogEntryRow {
1669 cursor,
1670 run_id,
1671 log_entry,
1672 });
1673 }
1674
1675 Ok(ListLogsResponse {
1676 next_page: items
1677 .last()
1678 .map(|item| Pagination::NewerThan {
1679 length: pagination.length(),
1680 cursor: item.cursor,
1681 including_cursor: false,
1682 })
1683 .unwrap_or(if pagination.is_asc() {
1684 *pagination } else {
1686 Pagination::NewerThan {
1688 length: pagination.length(),
1689 cursor: 0,
1690 including_cursor: false, }
1692 }),
1693 prev_page: match items.first() {
1694 Some(item) => Some(Pagination::OlderThan {
1695 length: pagination.length(),
1696 cursor: item.cursor,
1697 including_cursor: false,
1698 }),
1699 None if pagination.is_asc() && *pagination.cursor() > 0 => {
1700 Some(pagination.invert())
1702 }
1703 None => None,
1704 },
1705 items,
1706 })
1707}
1708
1709async fn list_deployment_states(
1710 tx: &Transaction<'_>,
1711 current_time: DateTime<Utc>,
1712 pagination: Pagination<Option<DeploymentId>>,
1713) -> Result<Vec<DeploymentState>, DbErrorRead> {
1714 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1716 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1717 params.push(p);
1718 format!("${}", params.len())
1719 };
1720
1721 let p_now = add_param(Box::new(current_time));
1723
1724 let mut sql = format!(
1725 "
1726 SELECT
1727 deployment_id,
1728
1729 COUNT(*) FILTER (WHERE state = '{STATE_LOCKED}') AS locked,
1730
1731 COUNT(*) FILTER (
1732 WHERE state = '{STATE_PENDING_AT}'
1733 AND pending_expires_finished <= {p_now}
1734 ) AS pending,
1735
1736 COUNT(*) FILTER (
1737 WHERE state = '{STATE_PENDING_AT}'
1738 AND pending_expires_finished > {p_now}
1739 ) AS scheduled,
1740
1741 COUNT(*) FILTER (WHERE state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1742
1743 COUNT(*) FILTER (WHERE state = '{STATE_FINISHED}') AS finished
1744 FROM t_state"
1745 );
1746
1747 if let Some(cursor) = pagination.cursor() {
1749 let p_cursor = add_param(Box::new(cursor.to_string()));
1750 write!(
1751 sql,
1752 " WHERE deployment_id {rel} {p_cursor}",
1753 rel = pagination.rel()
1754 )
1755 .expect("writing to string");
1756 }
1757
1758 let (inner_order, outer_order) = if pagination.is_desc() {
1762 ("DESC", "")
1763 } else {
1764 ("ASC", "DESC")
1765 };
1766
1767 write!(
1768 sql,
1769 " GROUP BY deployment_id ORDER BY deployment_id {inner_order} LIMIT {}",
1770 pagination.length()
1771 )
1772 .expect("writing to string");
1773
1774 let final_sql = if outer_order.is_empty() {
1775 sql
1776 } else {
1777 format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1778 };
1779
1780 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1781 .iter()
1782 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1783 .collect();
1784
1785 let rows = tx
1786 .query(&final_sql, ¶ms_refs)
1787 .await
1788 .map_err(DbErrorRead::from)?;
1789
1790 let mut result = Vec::with_capacity(rows.len());
1791 for row in rows {
1792 let deployment_id: String = get(&row, "deployment_id")?;
1793 result.push(DeploymentState {
1794 deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1795 locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1796 pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1797 .expect("count is never negative"),
1798 scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1799 .expect("count is never negative"),
1800 blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1801 .expect("count is never negative"),
1802 finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1803 .expect("count is never negative"),
1804 });
1805 }
1806
1807 Ok(result)
1808}
1809
1810fn parse_response_with_cursor(
1811 row: &tokio_postgres::Row,
1812) -> Result<ResponseWithCursor, DbErrorRead> {
1813 let id = u32::try_from(get::<i64, _>(row, "id")?)
1815 .map_err(|_| consistency_db_err("id must not be negative"))?;
1816
1817 let created_at: DateTime<Utc> = get(row, "created_at")?;
1818 let join_set_id_str: String = get(row, "join_set_id")?;
1819 let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1820
1821 let delay_id: Option<String> = get(row, "delay_id")?;
1823 let delay_id = delay_id
1824 .map(|id| DelayId::from_str(&id))
1825 .transpose()
1826 .map_err(DbErrorGeneric::from)?;
1827 let delay_success: Option<bool> = get(row, "delay_success")?;
1828 let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1829 let child_execution_id = child_execution_id
1830 .map(|id| ExecutionIdDerived::from_str(&id))
1831 .transpose()
1832 .map_err(DbErrorGeneric::from)?;
1833 let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1834 .map(Version::try_from)
1835 .transpose()
1836 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1837 let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1838 let json_value = json_value.map(|it| it.0);
1839
1840 let event = match (
1841 delay_id,
1842 delay_success,
1843 child_execution_id,
1844 finished_version,
1845 json_value,
1846 ) {
1847 (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1848 delay_id,
1849 result: delay_success.then_some(()).ok_or(()),
1850 },
1851 (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1852 if let ExecutionRequest::Finished { result, .. } = json_val {
1853 JoinSetResponse::ChildExecutionFinished {
1854 child_execution_id,
1855 finished_version,
1856 result,
1857 }
1858 } else {
1859 error!("Joined log entry must be 'Finished'");
1860 return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1861 }
1862 }
1863 (delay, delay_success, child, finished, result) => {
1864 error!(
1865 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1866 );
1867 return Err(consistency_db_err("invalid row in t_join_set_response").into());
1868 }
1869 };
1870
1871 Ok(ResponseWithCursor {
1872 cursor: ResponseCursor(id),
1873 event: JoinSetResponseEventOuter {
1874 event: JoinSetResponseEvent { join_set_id, event },
1875 created_at,
1876 },
1877 })
1878}
1879
1880#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1881#[expect(clippy::too_many_arguments)]
1882async fn lock_single_execution(
1883 tx: &Transaction<'_>,
1884 created_at: DateTime<Utc>,
1885 component_id: &ComponentId,
1886 deployment_id: DeploymentId,
1887 execution_id: &ExecutionId,
1888 run_id: RunId,
1889 appending_version: &Version,
1890 executor_id: ExecutorId,
1891 lock_expires_at: DateTime<Utc>,
1892 retry_config: ComponentRetryConfig,
1893) -> Result<LockedExecution, DbErrorWrite> {
1894 debug!("lock_single_execution");
1895
1896 let combined_state = get_combined_state(tx, execution_id).await?;
1898 combined_state
1899 .execution_with_state
1900 .pending_state
1901 .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
1902 let expected_version = combined_state.get_next_version_assert_not_finished();
1903 check_expected_next_and_appending_version(&expected_version, appending_version)?;
1904
1905 let locked_event = Locked {
1907 component_id: component_id.clone(),
1908 deployment_id,
1909 executor_id,
1910 lock_expires_at,
1911 run_id,
1912 retry_config,
1913 };
1914 let event = ExecutionRequest::Locked(locked_event.clone());
1915
1916 let event = Json(event);
1917
1918 tx.execute(
1920 "INSERT INTO t_execution_log \
1921 (execution_id, created_at, json_value, version, variant) \
1922 VALUES ($1, $2, $3, $4, $5)",
1923 &[
1924 &execution_id.to_string(),
1925 &created_at,
1926 &event,
1927 &i64::from(appending_version.0),
1928 &event.0.variant(),
1929 ],
1930 )
1931 .await
1932 .map_err(|err| {
1933 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
1934 reason: "cannot lock".into(),
1935 context: SpanTrace::capture(),
1936 source: Some(Arc::new(err)),
1937 loc: Location::caller(),
1938 })
1939 })?;
1940
1941 let responses = list_responses(tx, execution_id, None).await?;
1942 trace!("Responses: {responses:?}");
1943
1944 let intermittent_event_count = update_state_locked_get_intermittent_event_count(
1946 tx,
1947 execution_id,
1948 deployment_id,
1949 &component_id.input_digest,
1950 executor_id,
1951 run_id,
1952 lock_expires_at,
1953 appending_version,
1954 retry_config,
1955 )
1956 .await?;
1957
1958 let rows = tx
1961 .query(
1962 "SELECT json_value, version FROM t_execution_log WHERE \
1963 execution_id = $1 AND (variant = $2 OR variant = $3) \
1964 ORDER BY version",
1965 &[
1966 &execution_id.to_string(),
1967 &DUMMY_CREATED.variant(),
1968 &DUMMY_HISTORY_EVENT.variant(),
1969 ],
1970 )
1971 .await
1972 .map_err(DbErrorGeneric::from)?;
1973
1974 let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
1975
1976 for row in rows {
1977 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
1978 let event = event.0;
1979
1980 let version: i64 = get(&row, "version")?;
1981 let version = Version::try_from(version)
1982 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1983
1984 events.push_back(ExecutionEvent {
1985 created_at: DateTime::from_timestamp_nanos(0), event,
1987 backtrace_id: None,
1988 version,
1989 });
1990 }
1991
1992 let Some(ExecutionRequest::Created {
1994 ffqn,
1995 params,
1996 parent,
1997 metadata,
1998 ..
1999 }) = events.pop_front().map(|outer| outer.event)
2000 else {
2001 error!("Execution log must contain at least `Created` event");
2002 return Err(consistency_db_err("execution log must contain `Created` event").into());
2003 };
2004
2005 let mut event_history = Vec::new();
2007 for ExecutionEvent { event, version, .. } in events {
2008 if let ExecutionRequest::HistoryEvent { event } = event {
2009 event_history.push((event, version));
2010 } else {
2011 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
2012 return Err(consistency_db_err(
2013 "rows can only contain `Created` and `HistoryEvent` event kinds",
2014 )
2015 .into());
2016 }
2017 }
2018
2019 Ok(LockedExecution {
2020 execution_id: execution_id.clone(),
2021 metadata,
2022 next_version: appending_version.increment(),
2023 ffqn,
2024 params,
2025 event_history,
2026 responses,
2027 parent,
2028 intermittent_event_count,
2029 locked_event,
2030 })
2031}
2032
2033async fn count_join_next(
2034 tx: &Transaction<'_>,
2035 execution_id: &ExecutionId,
2036 join_set_id: &JoinSetId,
2037) -> Result<u32, DbErrorRead> {
2038 let row = tx
2039 .query_one(
2040 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
2041 AND history_event_type = $3",
2042 &[
2043 &execution_id.to_string(),
2044 &join_set_id.to_string(),
2045 &HISTORY_EVENT_TYPE_JOIN_NEXT,
2046 ],
2047 )
2048 .await
2049 .map_err(DbErrorRead::from)?;
2050
2051 let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
2052 Ok(count)
2053}
2054
2055async fn nth_response(
2056 tx: &Transaction<'_>,
2057 execution_id: &ExecutionId,
2058 join_set_id: &JoinSetId,
2059 skip_rows: u32,
2060) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2061 let row = tx
2062 .query_opt(
2063 "SELECT r.id, r.created_at, r.join_set_id, \
2064 r.delay_id, r.delay_success, \
2065 r.child_execution_id, r.finished_version, l.json_value \
2066 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2067 WHERE \
2068 r.execution_id = $1 AND r.join_set_id = $2 AND \
2069 ( \
2070 r.finished_version = l.version \
2071 OR \
2072 r.child_execution_id IS NULL \
2073 ) \
2074 ORDER BY id \
2075 LIMIT 1 OFFSET $3",
2076 &[
2077 &execution_id.to_string(),
2078 &join_set_id.to_string(),
2079 &i64::from(skip_rows),
2080 ]
2081 )
2082 .await
2083 .map_err(DbErrorRead::from)?;
2084
2085 match row {
2086 Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
2087 None => Ok(None),
2088 }
2089}
2090
2091#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
2092async fn append(
2093 tx: &Transaction<'_>,
2094 execution_id: &ExecutionId,
2095 req: AppendRequest,
2096 appending_version: Version,
2097) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2098 if matches!(req.event, ExecutionRequest::Created { .. }) {
2099 return Err(DbErrorWrite::NonRetriable(
2100 DbErrorWriteNonRetriable::ValidationFailed(
2101 "cannot append `Created` event - use `create` instead".into(),
2102 ),
2103 ));
2104 }
2105
2106 if let AppendRequest {
2107 event:
2108 ExecutionRequest::Locked(Locked {
2109 component_id,
2110 deployment_id,
2111 executor_id,
2112 run_id,
2113 lock_expires_at,
2114 retry_config,
2115 }),
2116 created_at,
2117 } = req
2118 {
2119 return lock_single_execution(
2120 tx,
2121 created_at,
2122 &component_id,
2123 deployment_id,
2124 execution_id,
2125 run_id,
2126 &appending_version,
2127 executor_id,
2128 lock_expires_at,
2129 retry_config,
2130 )
2131 .await
2132 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2133 }
2134
2135 let combined_state = get_combined_state(tx, execution_id).await?;
2136 if combined_state
2137 .execution_with_state
2138 .pending_state
2139 .is_finished()
2140 {
2141 debug!("Execution is already finished");
2142 return Err(DbErrorWrite::NonRetriable(
2143 DbErrorWriteNonRetriable::IllegalState {
2144 reason: "already finished".into(),
2145 context: SpanTrace::capture(),
2146 source: None,
2147 loc: Location::caller(),
2148 },
2149 ));
2150 }
2151
2152 check_expected_next_and_appending_version(
2153 &combined_state.get_next_version_assert_not_finished(),
2154 &appending_version,
2155 )?;
2156
2157 let event = Json(req.event);
2158
2159 tx.execute(
2161 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2162 VALUES ($1, $2, $3, $4, $5, $6)",
2163 &[
2164 &execution_id.to_string(),
2165 &req.created_at,
2166 &event,
2167 &i64::from(appending_version.0),
2168 &event.0.variant(),
2169 &event.0.join_set_id().map(std::string::ToString::to_string),
2170 ],
2171 )
2172 .await?;
2173
2174 match &event.0 {
2176 ExecutionRequest::Created { .. } => {
2177 unreachable!("handled in the caller")
2178 }
2179
2180 ExecutionRequest::Locked { .. } => {
2181 unreachable!("handled above")
2182 }
2183
2184 ExecutionRequest::TemporarilyFailed {
2185 backoff_expires_at, ..
2186 }
2187 | ExecutionRequest::TemporarilyTimedOut {
2188 backoff_expires_at, ..
2189 } => {
2190 let (next_version, notifier) = update_state_pending_after_event_appended(
2191 tx,
2192 execution_id,
2193 &appending_version,
2194 *backoff_expires_at,
2195 true, combined_state.execution_with_state.component_digest,
2197 )
2198 .await?;
2199 return Ok((next_version, notifier));
2200 }
2201
2202 ExecutionRequest::Unlocked {
2203 backoff_expires_at, ..
2204 } => {
2205 let (next_version, notifier) = update_state_pending_after_event_appended(
2206 tx,
2207 execution_id,
2208 &appending_version,
2209 *backoff_expires_at,
2210 false, combined_state.execution_with_state.component_digest,
2212 )
2213 .await?;
2214 return Ok((next_version, notifier));
2215 }
2216
2217 ExecutionRequest::Paused => {
2218 match &combined_state.execution_with_state.pending_state {
2219 PendingState::Finished { .. } => {
2220 unreachable!("handled above");
2221 }
2222 PendingState::Paused(..) => {
2223 return Err(DbErrorWriteNonRetriable::IllegalState {
2224 reason: "cannot pause, execution is already paused".into(),
2225 context: SpanTrace::capture(),
2226 source: None,
2227 loc: Location::caller(),
2228 }
2229 .into());
2230 }
2231 _ => {}
2232 }
2233 let next_version =
2234 update_state_paused(tx, execution_id, &appending_version, true).await?;
2235 return Ok((next_version, AppendNotifier::default()));
2236 }
2237
2238 ExecutionRequest::Unpaused => {
2239 if !combined_state
2240 .execution_with_state
2241 .pending_state
2242 .is_paused()
2243 {
2244 return Err(DbErrorWriteNonRetriable::IllegalState {
2245 reason: "cannot unpause, execution is not paused".into(),
2246 context: SpanTrace::capture(),
2247 source: None,
2248 loc: Location::caller(),
2249 }
2250 .into());
2251 }
2252 let next_version =
2253 update_state_paused(tx, execution_id, &appending_version, false).await?;
2254 return Ok((next_version, AppendNotifier::default()));
2255 }
2256
2257 ExecutionRequest::Finished { result, .. } => {
2258 update_state_finished(
2259 tx,
2260 execution_id,
2261 &appending_version,
2262 req.created_at,
2263 PendingStateFinishedResultKind::from(result),
2264 )
2265 .await?;
2266 return Ok((
2267 appending_version,
2268 AppendNotifier {
2269 pending_at: None,
2270 execution_finished: Some(NotifierExecutionFinished {
2271 execution_id: execution_id.clone(),
2272 retval: result.clone(),
2273 }),
2274 response: None,
2275 },
2276 ));
2277 }
2278
2279 ExecutionRequest::HistoryEvent {
2280 event:
2281 HistoryEvent::JoinSetCreate { .. }
2282 | HistoryEvent::JoinSetRequest {
2283 request: JoinSetRequest::ChildExecutionRequest { .. },
2284 ..
2285 }
2286 | HistoryEvent::Persist { .. }
2287 | HistoryEvent::Schedule { .. }
2288 | HistoryEvent::Stub { .. }
2289 | HistoryEvent::JoinNextTooMany { .. }
2290 | HistoryEvent::JoinNextTry { .. },
2291 } => {
2292 return Ok((
2293 bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2294 AppendNotifier::default(),
2295 ));
2296 }
2297
2298 ExecutionRequest::HistoryEvent {
2299 event:
2300 HistoryEvent::JoinSetRequest {
2301 join_set_id,
2302 request:
2303 JoinSetRequest::DelayRequest {
2304 delay_id,
2305 expires_at,
2306 ..
2307 },
2308 },
2309 } => {
2310 return Ok((
2311 bump_state_next_version(
2312 tx,
2313 execution_id,
2314 &appending_version,
2315 Some(DelayReq {
2316 join_set_id: join_set_id.clone(),
2317 delay_id: delay_id.clone(),
2318 expires_at: *expires_at,
2319 }),
2320 )
2321 .await?,
2322 AppendNotifier::default(),
2323 ));
2324 }
2325
2326 ExecutionRequest::HistoryEvent {
2327 event:
2328 HistoryEvent::JoinNext {
2329 join_set_id,
2330 run_expires_at,
2331 closing,
2332 requested_ffqn: _,
2333 },
2334 } => {
2335 let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2337
2338 let nth_response =
2340 nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2341
2342 trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2343 assert!(join_next_count > 0);
2344
2345 if let Some(ResponseWithCursor {
2346 event:
2347 JoinSetResponseEventOuter {
2348 created_at: nth_created_at,
2349 ..
2350 },
2351 cursor: _,
2352 }) = nth_response
2353 {
2354 let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2355 let (next_version, notifier) = update_state_pending_after_event_appended(
2356 tx,
2357 execution_id,
2358 &appending_version,
2359 scheduled_at,
2360 false, combined_state.execution_with_state.component_digest,
2362 )
2363 .await?;
2364 return Ok((next_version, notifier));
2365 }
2366
2367 return Ok((
2368 update_state_blocked(
2369 tx,
2370 execution_id,
2371 &appending_version,
2372 join_set_id,
2373 *run_expires_at,
2374 *closing,
2375 )
2376 .await?,
2377 AppendNotifier::default(),
2378 ));
2379 }
2380 }
2381}
2382
2383async fn append_response(
2384 tx: &Transaction<'_>,
2385 execution_id: &ExecutionId,
2386 event: JoinSetResponseEventOuter,
2387) -> Result<AppendNotifier, DbErrorWrite> {
2388 let join_set_id = &event.event.join_set_id;
2389
2390 let (delay_id, delay_success) = match &event.event.event {
2391 JoinSetResponse::DelayFinished { delay_id, result } => {
2392 (Some(delay_id.to_string()), Some(result.is_ok()))
2393 }
2394 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2395 };
2396
2397 let (child_execution_id, finished_version) = match &event.event.event {
2398 JoinSetResponse::ChildExecutionFinished {
2399 child_execution_id,
2400 finished_version,
2401 result: _,
2402 } => (
2403 Some(child_execution_id.to_string()),
2404 Some(i64::from(finished_version.0)),
2405 ),
2406 JoinSetResponse::DelayFinished { .. } => (None, None),
2407 };
2408
2409 let row = tx.query_one(
2410 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2411 VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2412 &[
2413 &execution_id.to_string(),
2414 &event.created_at,
2415 &join_set_id.to_string(),
2416 &delay_id,
2417 &delay_success,
2418 &child_execution_id,
2419 &finished_version,
2420 ]
2421 ).await?;
2422 let cursor = ResponseCursor(
2423 u32::try_from(get::<i64, _>(&row, 0)?)
2424 .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2425 );
2426 let combined_state = get_combined_state(tx, execution_id).await?;
2428 debug!("previous_pending_state: {combined_state:?}");
2429
2430 let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2431 state:
2432 PendingStateBlockedByJoinSet {
2433 join_set_id: found_join_set_id,
2434 lock_expires_at, closing: _,
2436 },
2437 paused: _,
2438 } =
2439 PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2440 && *join_set_id == found_join_set_id
2441 {
2442 let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2443 update_state_pending_after_response_appended(
2445 tx,
2446 execution_id,
2447 scheduled_at,
2448 combined_state.execution_with_state.component_digest,
2449 )
2450 .await?
2451 } else {
2452 AppendNotifier::default()
2453 };
2454
2455 if let JoinSetResponseEvent {
2456 join_set_id,
2457 event:
2458 JoinSetResponse::DelayFinished {
2459 delay_id,
2460 result: _,
2461 },
2462 } = &event.event
2463 {
2464 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2465 tx.execute(
2466 "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2467 &[
2468 &execution_id.to_string(),
2469 &join_set_id.to_string(),
2470 &delay_id.to_string(),
2471 ],
2472 )
2473 .await?;
2474 }
2475
2476 notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2477 Ok(notifier)
2478}
2479
2480async fn append_backtrace(
2481 tx: &Transaction<'_>,
2482 backtrace_info: &BacktraceInfo,
2483) -> Result<(), DbErrorWrite> {
2484 let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2486
2487 tx.execute(
2489 "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2490 VALUES ($1, $2) \
2491 ON CONFLICT (backtrace_hash) DO NOTHING",
2492 &[
2493 &backtrace_hash.as_slice(),
2494 &Json(&backtrace_info.wasm_backtrace),
2495 ],
2496 )
2497 .await?;
2498
2499 tx.execute(
2501 "INSERT INTO t_execution_backtrace \
2502 (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2503 VALUES ($1, $2, $3, $4, $5)",
2504 &[
2505 &backtrace_info.execution_id.to_string(),
2506 &Json(&backtrace_info.component_id),
2507 &i64::from(backtrace_info.version_min_including.0),
2508 &i64::from(backtrace_info.version_max_excluding.0),
2509 &backtrace_hash.as_slice(),
2510 ],
2511 )
2512 .await?;
2513
2514 Ok(())
2515}
2516
2517async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2518 let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2519 LogEntry::Log {
2520 created_at,
2521 level,
2522 message,
2523 } => (
2524 Some(*level as i32),
2525 Some(message.as_str()),
2526 None::<i32>,
2527 None::<&[u8]>,
2528 created_at,
2529 ),
2530 LogEntry::Stream {
2531 created_at,
2532 payload,
2533 stream_type,
2534 } => (
2535 None::<i32>,
2536 None::<&str>,
2537 Some(*stream_type as i32),
2538 Some(payload.as_slice()),
2539 created_at,
2540 ),
2541 };
2542
2543 tx.execute(
2544 "INSERT INTO t_log (
2545 execution_id,
2546 run_id,
2547 created_at,
2548 level,
2549 message,
2550 stream_type,
2551 payload
2552 ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2553 &[
2554 &row.execution_id.to_string(),
2555 &row.run_id.to_string(),
2556 &created_at,
2557 &level,
2558 &message,
2559 &stream_type,
2560 &payload,
2561 ],
2562 )
2563 .await?;
2564
2565 Ok(())
2566}
2567
2568async fn get_execution_log(
2569 tx: &Transaction<'_>,
2570 execution_id: &ExecutionId,
2571) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2572 let rows = tx
2573 .query(
2574 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2575 execution_id = $1 ORDER BY version",
2576 &[&execution_id.to_string()],
2577 )
2578 .await
2579 .map_err(DbErrorRead::from)?;
2580
2581 if rows.is_empty() {
2582 return Err(DbErrorRead::NotFound);
2583 }
2584
2585 let mut events = Vec::with_capacity(rows.len());
2586 for row in rows {
2587 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2588 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2589 let event = event.0;
2590 let version: i64 = get(&row, "version")?;
2591 let version = Version::try_from(version)
2592 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2593
2594 events.push(ExecutionEvent {
2595 created_at,
2596 event,
2597 backtrace_id: None,
2598 version,
2599 });
2600 }
2601
2602 let combined_state = get_combined_state(tx, execution_id).await?;
2603 let responses = list_responses(tx, execution_id, None).await?;
2604
2605 Ok(concepts::storage::ExecutionLog {
2606 execution_id: execution_id.clone(),
2607 events,
2608 responses,
2609 next_version: combined_state.get_next_version_or_finished(),
2610 pending_state: combined_state.execution_with_state.pending_state,
2611 component_digest: combined_state.execution_with_state.component_digest,
2612 component_type: combined_state.execution_with_state.component_type,
2613 deployment_id: combined_state.execution_with_state.deployment_id,
2614 })
2615}
2616
2617async fn get_max_version(
2618 tx: &Transaction<'_>,
2619 execution_id: &ExecutionId,
2620) -> Result<Version, DbErrorRead> {
2621 let row = tx
2622 .query_one(
2623 "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2624 &[&execution_id.to_string()],
2625 )
2626 .await?;
2627 let max_version: i64 = get(&row, "version")?;
2628 let max_version = Version::try_from(max_version)
2629 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2630 Ok(max_version)
2631}
2632
2633async fn get_max_response_cursor(
2634 tx: &Transaction<'_>,
2635 execution_id: &ExecutionId,
2636) -> Result<ResponseCursor, DbErrorRead> {
2637 let row = tx
2638 .query_one(
2639 "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2640 &[&execution_id.to_string()],
2641 )
2642 .await?;
2643 let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2645 let max_cursor = ResponseCursor(
2646 u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2647 );
2648 Ok(max_cursor)
2649}
2650
2651async fn list_execution_events(
2652 tx: &Transaction<'_>,
2653 execution_id: &ExecutionId,
2654 pagination: Pagination<VersionType>,
2655 include_backtrace_id: bool,
2656) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2657 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2658 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2659 params.push(p);
2660 format!("${}", params.len())
2661 };
2662
2663 let p_execution_id = add_param(Box::new(execution_id.to_string()));
2664
2665 let (cursor, length, rel, is_desc) = match &pagination {
2666 Pagination::NewerThan {
2667 cursor,
2668 length,
2669 including_cursor,
2670 } => (
2671 *cursor,
2672 *length,
2673 if *including_cursor { ">=" } else { ">" },
2674 false,
2675 ),
2676 Pagination::OlderThan {
2677 cursor,
2678 length,
2679 including_cursor,
2680 } => (
2681 *cursor,
2682 *length,
2683 if *including_cursor { "<=" } else { "<" },
2684 true,
2685 ),
2686 };
2687 let p_cursor = add_param(Box::new(i64::from(cursor)));
2688 let p_limit = add_param(Box::new(i64::from(length)));
2689
2690 let base_select = if include_backtrace_id {
2691 format!(
2692 "SELECT
2693 log.created_at,
2694 log.json_value,
2695 log.version,
2696 bt.version_min_including AS backtrace_id
2697 FROM
2698 t_execution_log AS log
2699 LEFT OUTER JOIN
2700 t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2701 AND log.version >= bt.version_min_including
2702 AND log.version < bt.version_max_excluding
2703 WHERE
2704 log.execution_id = {p_execution_id}
2705 AND log.version {rel} {p_cursor}"
2706 )
2707 } else {
2708 format!(
2709 "SELECT
2710 created_at, json_value, NULL::BIGINT as backtrace_id, version
2711 FROM t_execution_log WHERE
2712 execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2713 )
2714 };
2715
2716 let order = if is_desc { "DESC" } else { "ASC" };
2717 let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2718
2719 if is_desc {
2721 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2722 }
2723
2724 let params_refs: Vec<&(dyn ToSql + Sync)> = params
2725 .iter()
2726 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2727 .collect();
2728
2729 let rows = tx
2730 .query(&sql, ¶ms_refs)
2731 .await
2732 .map_err(DbErrorRead::from)?;
2733
2734 let mut events = Vec::with_capacity(rows.len());
2735 for row in rows {
2736 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2737 let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2738 .map(Version::try_from)
2739 .transpose()
2740 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2741
2742 let version = get::<i64, _>(&row, "version")?;
2743 let version = Version::new(
2744 VersionType::try_from(version)
2745 .map_err(|_| consistency_db_err("version must be non-negative"))?,
2746 );
2747 let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2748 let event_req = event_req.0;
2749
2750 events.push(ExecutionEvent {
2751 created_at,
2752 event: event_req,
2753 backtrace_id,
2754 version,
2755 });
2756 }
2757 Ok(events)
2758}
2759
2760async fn get_execution_event(
2761 tx: &Transaction<'_>,
2762 execution_id: &ExecutionId,
2763 version: VersionType,
2764) -> Result<ExecutionEvent, DbErrorRead> {
2765 let row = tx
2766 .query_one(
2767 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2768 execution_id = $1 AND version = $2",
2769 &[&execution_id.to_string(), &i64::from(version)],
2770 )
2771 .await?;
2772
2773 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2774 let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2775 let version = get::<i64, _>(&row, "version")?;
2776 let version = Version::try_from(version)
2777 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2778 let event = json_val.0;
2779
2780 Ok(ExecutionEvent {
2781 created_at,
2782 event,
2783 backtrace_id: None,
2784 version,
2785 })
2786}
2787
2788async fn get_last_execution_event(
2789 tx: &Transaction<'_>,
2790 execution_id: &ExecutionId,
2791) -> Result<ExecutionEvent, DbErrorRead> {
2792 let row = tx
2793 .query_one(
2794 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2795 execution_id = $1 ORDER BY version DESC LIMIT 1",
2796 &[&execution_id.to_string()],
2797 )
2798 .await?;
2799
2800 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2801 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2802 let event = event.0;
2803 let version: i64 = get(&row, "version")?;
2804 let version = Version::try_from(version)
2805 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2806
2807 Ok(ExecutionEvent {
2808 created_at,
2809 event,
2810 backtrace_id: None,
2811 version,
2812 })
2813}
2814
2815async fn delay_response(
2816 tx: &Transaction<'_>,
2817 execution_id: &ExecutionId,
2818 delay_id: &DelayId,
2819) -> Result<Option<bool>, DbErrorRead> {
2820 let row = tx
2821 .query_opt(
2822 "SELECT delay_success \
2823 FROM t_join_set_response \
2824 WHERE \
2825 execution_id = $1 AND delay_id = $2",
2826 &[&execution_id.to_string(), &delay_id.to_string()],
2827 )
2828 .await?;
2829
2830 match row {
2831 Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2832 None => Ok(None),
2833 }
2834}
2835
2836#[instrument(level = Level::TRACE, skip_all)]
2837async fn get_responses_after(
2838 tx: &Transaction<'_>,
2839 execution_id: &ExecutionId,
2840 last_response: ResponseCursor,
2841) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2842 let rows = tx
2843 .query(
2844 "SELECT r.id, r.created_at, r.join_set_id, \
2845 r.delay_id, r.delay_success, \
2846 r.child_execution_id, r.finished_version, child.json_value \
2847 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2848 WHERE \
2849 r.id > $1 AND \
2850 r.execution_id = $2 AND \
2851 ( \
2852 r.finished_version = child.version \
2853 OR \
2854 r.child_execution_id IS NULL \
2855 ) \
2856 ORDER BY id \
2857 ",
2858 &[
2859 &i64::from(last_response.0),
2860 &execution_id.to_string(),
2861 ]
2862 )
2863 .await?;
2864
2865 let mut results = Vec::with_capacity(rows.len());
2866 for row in rows {
2867 let resp = parse_response_with_cursor(&row)?;
2868 results.push(resp);
2869 }
2870 Ok(results)
2871}
2872
2873async fn get_pending_of_single_ffqn(
2874 tx: &Transaction<'_>,
2875 batch_size: u32,
2876 pending_at_or_sooner: DateTime<Utc>,
2877 ffqn: &FunctionFqn,
2878 select_strategy: SelectStrategy,
2879) -> Result<Vec<(ExecutionId, Version)>, ()> {
2880 let rows = tx
2881 .query(
2882 &format!(
2883 r"
2884 SELECT execution_id, corresponding_version FROM t_state
2885 WHERE
2886 state = '{STATE_PENDING_AT}' AND
2887 pending_expires_finished <= $1 AND ffqn = $2
2888 AND is_paused = false
2889 ORDER BY pending_expires_finished
2890 {}
2891 LIMIT $3
2892 ",
2893 if select_strategy == SelectStrategy::LockForUpdate {
2894 "FOR UPDATE SKIP LOCKED"
2895 } else {
2896 ""
2897 }
2898 ),
2899 &[
2900 &pending_at_or_sooner,
2901 &ffqn.to_string(),
2902 &(i64::from(batch_size)),
2903 ],
2904 )
2905 .await
2906 .map_err(|err| {
2907 warn!("Ignoring consistency error {err:?}");
2908 })?;
2909
2910 let mut result = Vec::with_capacity(rows.len());
2911 for row in rows {
2912 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2913 let eid_str: String = get(&row, "execution_id")?;
2914 let corresponding_version: i64 = get(&row, "corresponding_version")?;
2915 let corresponding_version = Version::try_from(corresponding_version)
2916 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2917
2918 if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2919 return Ok((eid, corresponding_version.increment()));
2920 }
2921 Err(consistency_db_err("invalid execution_id"))
2922 };
2923
2924 match unpack() {
2925 Ok(val) => result.push(val),
2926 Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2927 }
2928 }
2929 Ok(result)
2930}
2931
2932async fn get_pending_by_ffqns(
2934 tx: &Transaction<'_>,
2935 batch_size: u32,
2936 pending_at_or_sooner: DateTime<Utc>,
2937 ffqns: &[FunctionFqn],
2938 select_strategy: SelectStrategy,
2939) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2940 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2941 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2942
2943 for ffqn in ffqns {
2944 let needed = batch_size - execution_ids_versions.len();
2945 if needed == 0 {
2946 break;
2947 }
2948 let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
2949 if let Ok(execs) =
2950 get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2951 .await
2952 {
2953 execution_ids_versions.extend(execs);
2954 }
2955 }
2956
2957 Ok(execution_ids_versions)
2958}
2959
2960#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2961enum SelectStrategy {
2962 Read,
2963 LockForUpdate,
2964}
2965
2966async fn get_pending_by_component_input_digest(
2967 tx: &Transaction<'_>,
2968 batch_size: u32,
2969 pending_at_or_sooner: DateTime<Utc>,
2970 input_digest: &InputContentDigest,
2971 select_strategy: SelectStrategy,
2972) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2973 let rows = tx
2974 .query(
2975 &format!(
2976 r"
2977 SELECT execution_id, corresponding_version FROM t_state WHERE
2978 state = '{STATE_PENDING_AT}' AND
2979 pending_expires_finished <= $1 AND
2980 component_id_input_digest = $2
2981 AND is_paused = false
2982 ORDER BY pending_expires_finished
2983 {}
2984 LIMIT $3
2985 ",
2986 if select_strategy == SelectStrategy::LockForUpdate {
2987 "FOR UPDATE SKIP LOCKED"
2988 } else {
2989 ""
2990 }
2991 ),
2992 &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
2993 )
2994 .await?;
2995
2996 let mut result = Vec::with_capacity(rows.len());
2997 for row in rows {
2998 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2999 let eid_str: String = get(&row, "execution_id")?;
3000 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3001 let corresponding_version = Version::try_from(corresponding_version)
3002 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3003
3004 let eid = ExecutionId::from_str(&eid_str)
3005 .map_err(|err| consistency_db_err(err.to_string()))?;
3006 Ok((eid, corresponding_version.increment()))
3007 };
3008
3009 match unpack() {
3010 Ok(val) => result.push(val),
3011 Err(err) => {
3012 warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
3013 }
3014 }
3015 }
3016
3017 Ok(result)
3018}
3019
3020fn notify_pending_locked(
3021 notifier: &NotifierPendingAt,
3022 current_time: DateTime<Utc>,
3023 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3024) {
3025 if notifier.scheduled_at <= current_time {
3026 ffqn_to_pending_subscription.notify(notifier);
3027 }
3028}
3029
3030async fn upgrade_execution_component(
3031 tx: &Transaction<'_>,
3032 execution_id: &ExecutionId,
3033 old: &InputContentDigest,
3034 new: &InputContentDigest,
3035) -> Result<(), DbErrorWrite> {
3036 debug!("Updating t_state to component {new}");
3037
3038 let updated = tx
3039 .execute(
3040 r"
3041 UPDATE t_state
3042 SET
3043 updated_at = CURRENT_TIMESTAMP,
3044 component_id_input_digest = $1
3045 WHERE
3046 execution_id = $2 AND
3047 component_id_input_digest = $3
3048 ",
3049 &[
3050 &new.as_slice(), &execution_id.to_string(), &old.as_slice(), ],
3054 )
3055 .await?;
3056
3057 if updated != 1 {
3058 return Err(DbErrorWrite::NotFound);
3059 }
3060 Ok(())
3061}
3062
3063impl PostgresConnection {
3064 #[instrument(level = Level::TRACE, skip_all)]
3066 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3067 let (pending_ats, finished_execs, responses) = {
3068 let (mut pending_ats, mut finished_execs, mut responses) =
3069 (Vec::new(), Vec::new(), Vec::new());
3070 for notifier in notifiers {
3071 if let Some(pending_at) = notifier.pending_at {
3072 pending_ats.push(pending_at);
3073 }
3074 if let Some(finished) = notifier.execution_finished {
3075 finished_execs.push(finished);
3076 }
3077 if let Some(response) = notifier.response {
3078 responses.push(response);
3079 }
3080 }
3081 (pending_ats, finished_execs, responses)
3082 };
3083
3084 if !pending_ats.is_empty() {
3086 let guard = self.pending_subscribers.lock().unwrap();
3087 for pending_at in pending_ats {
3088 notify_pending_locked(&pending_at, current_time, &guard);
3089 }
3090 }
3091 if !finished_execs.is_empty() {
3093 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3094 for finished in finished_execs {
3095 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3096 for (_tag, sender) in listeners_of_exe_id {
3097 let _ = sender.send(finished.retval.clone());
3098 }
3099 }
3100 }
3101 }
3102 if !responses.is_empty() {
3104 let mut guard = self.response_subscribers.lock().unwrap();
3105 for (execution_id, response) in responses {
3106 if let Some((sender, _)) = guard.remove(&execution_id) {
3107 let _ = sender.send(response);
3108 }
3109 }
3110 }
3111 }
3112}
3113
3114#[async_trait]
3115impl DbExecutor for PostgresConnection {
3116 #[instrument(level = Level::TRACE, skip(self))]
3117 async fn lock_pending_by_ffqns(
3118 &self,
3119 batch_size: u32,
3120 pending_at_or_sooner: DateTime<Utc>,
3121 ffqns: Arc<[FunctionFqn]>,
3122 created_at: DateTime<Utc>,
3123 component_id: ComponentId,
3124 deployment_id: DeploymentId,
3125 executor_id: ExecutorId,
3126 lock_expires_at: DateTime<Utc>,
3127 run_id: RunId,
3128 retry_config: ComponentRetryConfig,
3129 ) -> Result<LockPendingResponse, DbErrorWrite> {
3130 let mut client_guard = self.client.lock().await;
3131 let tx = client_guard.transaction().await?;
3132
3133 let execution_ids_versions = get_pending_by_ffqns(
3134 &tx,
3135 batch_size,
3136 pending_at_or_sooner,
3137 &ffqns,
3138 SelectStrategy::LockForUpdate,
3139 )
3140 .await?;
3141
3142 if execution_ids_versions.is_empty() {
3143 tx.commit().await?;
3146 return Ok(vec![]);
3147 }
3148
3149 debug!("Locking {execution_ids_versions:?}");
3150
3151 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3153 for (execution_id, version) in execution_ids_versions {
3154 match lock_single_execution(
3155 &tx,
3156 created_at,
3157 &component_id,
3158 deployment_id,
3159 &execution_id,
3160 run_id,
3161 &version,
3162 executor_id,
3163 lock_expires_at,
3164 retry_config,
3165 )
3166 .await
3167 {
3168 Ok(locked) => locked_execs.push(locked),
3169 Err(err) => {
3170 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3172 return Err(err);
3173 }
3174 }
3175 }
3176
3177 tx.commit().await?;
3178
3179 Ok(locked_execs)
3180 }
3181
3182 #[instrument(level = Level::TRACE, skip(self))]
3183 async fn lock_pending_by_component_digest(
3184 &self,
3185 batch_size: u32,
3186 pending_at_or_sooner: DateTime<Utc>,
3187 component_id: &ComponentId,
3188 deployment_id: DeploymentId,
3189 created_at: DateTime<Utc>,
3190 executor_id: ExecutorId,
3191 lock_expires_at: DateTime<Utc>,
3192 run_id: RunId,
3193 retry_config: ComponentRetryConfig,
3194 ) -> Result<LockPendingResponse, DbErrorWrite> {
3195 let mut client_guard = self.client.lock().await;
3196 let tx = client_guard.transaction().await?;
3197
3198 let execution_ids_versions = get_pending_by_component_input_digest(
3199 &tx,
3200 batch_size,
3201 pending_at_or_sooner,
3202 &component_id.input_digest,
3203 SelectStrategy::LockForUpdate,
3204 )
3205 .await?;
3206
3207 if execution_ids_versions.is_empty() {
3208 tx.commit().await?;
3209 return Ok(vec![]);
3210 }
3211
3212 debug!("Locking {execution_ids_versions:?}");
3213
3214 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3215 for (execution_id, version) in execution_ids_versions {
3216 match lock_single_execution(
3217 &tx,
3218 created_at,
3219 component_id,
3220 deployment_id,
3221 &execution_id,
3222 run_id,
3223 &version,
3224 executor_id,
3225 lock_expires_at,
3226 retry_config,
3227 )
3228 .await
3229 {
3230 Ok(locked) => locked_execs.push(locked),
3231 Err(err) => {
3232 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3234 return Err(err);
3235 }
3236 }
3237 }
3238
3239 tx.commit().await?;
3240 Ok(locked_execs)
3241 }
3242
3243 #[cfg(feature = "test")]
3244 #[instrument(level = Level::DEBUG, skip(self))]
3245 async fn lock_one(
3246 &self,
3247 created_at: DateTime<Utc>,
3248 component_id: ComponentId,
3249 deployment_id: DeploymentId,
3250 execution_id: &ExecutionId,
3251 run_id: RunId,
3252 version: Version,
3253 executor_id: ExecutorId,
3254 lock_expires_at: DateTime<Utc>,
3255 retry_config: ComponentRetryConfig,
3256 ) -> Result<LockedExecution, DbErrorWrite> {
3257 debug!(%execution_id, "lock_one");
3258 let mut client_guard = self.client.lock().await;
3259 let tx = client_guard.transaction().await?;
3260
3261 let res = lock_single_execution(
3262 &tx,
3263 created_at,
3264 &component_id,
3265 deployment_id,
3266 execution_id,
3267 run_id,
3268 &version,
3269 executor_id,
3270 lock_expires_at,
3271 retry_config,
3272 )
3273 .await?;
3274
3275 tx.commit().await?;
3276 Ok(res)
3277 }
3278
3279 #[instrument(level = Level::DEBUG, skip(self, req))]
3280 async fn append(
3281 &self,
3282 execution_id: ExecutionId,
3283 version: Version,
3284 req: AppendRequest,
3285 ) -> Result<AppendResponse, DbErrorWrite> {
3286 debug!(%req, "append");
3287 trace!(?req, "append");
3288 let created_at = req.created_at;
3289
3290 let mut client_guard = self.client.lock().await;
3291 let tx = client_guard.transaction().await?;
3292
3293 let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3294
3295 tx.commit().await?;
3296
3297 drop(client_guard);
3299
3300 self.notify_all(vec![notifier], created_at);
3301 Ok(new_version)
3302 }
3303
3304 #[instrument(level = Level::DEBUG, skip_all)]
3305 async fn append_batch_respond_to_parent(
3306 &self,
3307 events: AppendEventsToExecution,
3308 response: AppendResponseToExecution,
3309 current_time: DateTime<Utc>,
3310 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3311 debug!("append_batch_respond_to_parent");
3312 if events.execution_id == response.parent_execution_id {
3313 return Err(DbErrorWrite::NonRetriable(
3314 DbErrorWriteNonRetriable::ValidationFailed(
3315 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3316 ),
3317 ));
3318 }
3319 if events.batch.is_empty() {
3320 return Err(DbErrorWrite::NonRetriable(
3321 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3322 ));
3323 }
3324
3325 let mut client_guard = self.client.lock().await;
3326 let tx = client_guard.transaction().await?;
3327
3328 let mut version = events.version;
3329 let mut notifiers = Vec::new();
3330
3331 for append_request in events.batch {
3332 let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3333 version = v;
3334 notifiers.push(n);
3335 }
3336
3337 let pending_at_parent = append_response(
3338 &tx,
3339 &response.parent_execution_id,
3340 JoinSetResponseEventOuter {
3341 created_at: response.created_at,
3342 event: JoinSetResponseEvent {
3343 join_set_id: response.join_set_id,
3344 event: JoinSetResponse::ChildExecutionFinished {
3345 child_execution_id: response.child_execution_id,
3346 finished_version: response.finished_version,
3347 result: response.result,
3348 },
3349 },
3350 },
3351 )
3352 .await?;
3353 notifiers.push(pending_at_parent);
3354
3355 tx.commit().await?;
3356 drop(client_guard);
3357
3358 self.notify_all(notifiers, current_time);
3359 Ok(version)
3360 }
3361
3362 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3363 async fn wait_for_pending_by_ffqn(
3364 &self,
3365 pending_at_or_sooner: DateTime<Utc>,
3366 ffqns: Arc<[FunctionFqn]>,
3367 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3368 ) {
3369 let unique_tag: u64 = rand::random();
3370 let (sender, mut receiver) = mpsc::channel(1);
3371 {
3372 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3373 for ffqn in ffqns.as_ref() {
3374 pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3375 }
3376 }
3377
3378 async {
3379 let mut db_has_pending = false;
3380 {
3381 let mut client_guard = self.client.lock().await;
3383 if let Ok(tx) = client_guard.transaction().await {
3385 if let Ok(res) = get_pending_by_ffqns(
3386 &tx,
3387 1,
3388 pending_at_or_sooner,
3389 &ffqns,
3390 SelectStrategy::Read,
3391 )
3392 .await
3393 && !res.is_empty()
3394 {
3395 db_has_pending = true;
3396 }
3397 let _ = tx.commit().await;
3399 }
3400 }
3401
3402 if db_has_pending {
3403 trace!("Not waiting, database already contains new pending executions");
3404 return;
3405 }
3406
3407 tokio::select! {
3408 _ = receiver.recv() => {
3409 trace!("Received a notification");
3410 }
3411 () = timeout_fut => {
3412 }
3413 }
3414 }
3415 .await;
3416
3417 {
3419 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3420 for ffqn in ffqns.as_ref() {
3421 match pending_subscribers.remove_ffqn(ffqn) {
3422 Some((_, tag)) if tag == unique_tag => {}
3423 Some(other) => {
3424 pending_subscribers.insert_ffqn(ffqn.clone(), other);
3425 }
3426 None => {}
3427 }
3428 }
3429 }
3430 }
3431
3432 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3433 async fn wait_for_pending_by_component_digest(
3434 &self,
3435 pending_at_or_sooner: DateTime<Utc>,
3436 component_digest: &InputContentDigest,
3437 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3438 ) {
3439 let unique_tag: u64 = rand::random();
3440 let (sender, mut receiver) = mpsc::channel(1);
3441 {
3442 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3443 pending_subscribers
3444 .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
3445 }
3446
3447 async {
3448 let mut db_has_pending = false;
3449 {
3450 let mut client_guard = self.client.lock().await;
3451 if let Ok(tx) = client_guard.transaction().await {
3452 if let Ok(res) = get_pending_by_component_input_digest(
3453 &tx,
3454 1,
3455 pending_at_or_sooner,
3456 component_digest,
3457 SelectStrategy::Read,
3458 )
3459 .await
3460 && !res.is_empty()
3461 {
3462 db_has_pending = true;
3463 }
3464 let _ = tx.commit().await;
3465 }
3466 }
3467
3468 if db_has_pending {
3469 trace!("Not waiting, database already contains new pending executions");
3470 return;
3471 }
3472
3473 tokio::select! {
3474 _ = receiver.recv() => {
3475 trace!("Received a notification");
3476 }
3477 () = timeout_fut => {
3478 }
3479 }
3480 }
3481 .await;
3482
3483 {
3485 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3486 match pending_subscribers.remove_by_component(component_digest) {
3487 Some((_, tag)) if tag == unique_tag => {}
3488 Some(other) => {
3489 pending_subscribers.insert_by_component(component_digest.clone(), other);
3490 }
3491 None => {}
3492 }
3493 }
3494 }
3495
3496 async fn get_last_execution_event(
3497 &self,
3498 execution_id: &ExecutionId,
3499 ) -> Result<ExecutionEvent, DbErrorRead> {
3500 let mut client_guard = self.client.lock().await;
3501 let tx = client_guard.transaction().await?;
3502
3503 let event = get_last_execution_event(&tx, execution_id).await?;
3504
3505 tx.commit().await?;
3506 Ok(event)
3507 }
3508}
3509#[async_trait]
3510impl DbConnection for PostgresConnection {
3511 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3512 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3513 debug!("create");
3514 trace!(?req, "create");
3515 let created_at = req.created_at;
3516
3517 let mut client_guard = self.client.lock().await;
3518 let tx = client_guard.transaction().await?;
3519
3520 let (version, notifier) = create_inner(&tx, req.clone()).await?;
3521
3522 tx.commit().await?;
3523 drop(client_guard); self.notify_all(vec![notifier], created_at);
3526 Ok(version)
3527 }
3528
3529 #[instrument(level = Level::DEBUG, skip(self))]
3530 async fn get(
3531 &self,
3532 execution_id: &ExecutionId,
3533 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3534 trace!("get");
3535 let mut client_guard = self.client.lock().await;
3536 let tx = client_guard.transaction().await?;
3537
3538 let res = get_execution_log(&tx, execution_id).await?;
3539
3540 tx.commit().await?;
3541 Ok(res)
3542 }
3543
3544 #[instrument(level = Level::DEBUG, skip(self, batch))]
3545 async fn append_batch(
3546 &self,
3547 current_time: DateTime<Utc>,
3548 batch: Vec<AppendRequest>,
3549 execution_id: ExecutionId,
3550 version: Version,
3551 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3552 debug!("append_batch");
3553 trace!(?batch, "append_batch");
3554 assert!(!batch.is_empty(), "Empty batch request");
3555
3556 let mut client_guard = self.client.lock().await;
3557 let tx = client_guard.transaction().await?;
3558
3559 let mut version = version;
3560 let mut notifier = None;
3561
3562 for append_request in batch {
3563 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3564 version = v;
3565 notifier = Some(n);
3566 }
3567
3568 tx.commit().await?;
3569 drop(client_guard);
3570
3571 self.notify_all(
3572 vec![notifier.expect("checked that the batch is not empty")],
3573 current_time,
3574 );
3575 Ok(version)
3576 }
3577
3578 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3579 async fn append_batch_create_new_execution(
3580 &self,
3581 current_time: DateTime<Utc>,
3582 batch: Vec<AppendRequest>,
3583 execution_id: ExecutionId,
3584 version: Version,
3585 child_req: Vec<CreateRequest>,
3586 backtraces: Vec<BacktraceInfo>,
3587 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3588 debug!("append_batch_create_new_execution");
3589 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3590 assert!(!batch.is_empty(), "Empty batch request");
3591
3592 let mut client_guard = self.client.lock().await;
3593 let tx = client_guard.transaction().await?;
3594
3595 let mut version = version;
3596 let mut notifier = None;
3597
3598 for append_request in batch {
3599 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3600 version = v;
3601 notifier = Some(n);
3602 }
3603
3604 let mut notifiers = Vec::new();
3605 notifiers.push(notifier.expect("checked that the batch is not empty"));
3606
3607 for req in child_req {
3608 let (_, n) = create_inner(&tx, req).await?;
3609 notifiers.push(n);
3610 }
3611 for backtrace in backtraces {
3612 append_backtrace(&tx, &backtrace).await?;
3613 }
3614 tx.commit().await?;
3615 drop(client_guard);
3616
3617 self.notify_all(notifiers, current_time);
3618 Ok(version)
3619 }
3620
3621 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3622 async fn subscribe_to_next_responses(
3623 &self,
3624 execution_id: &ExecutionId,
3625 last_response: ResponseCursor,
3626 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3627 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3628 debug!("next_responses");
3629 let unique_tag: u64 = rand::random();
3630 let execution_id_clone = execution_id.clone();
3631
3632 let cleanup = || {
3633 let mut guard = self.response_subscribers.lock().unwrap();
3634 match guard.remove(&execution_id_clone) {
3635 Some((_, tag)) if tag == unique_tag => {}
3636 Some(other) => {
3637 guard.insert(execution_id_clone.clone(), other);
3638 }
3639 None => {}
3640 }
3641 };
3642
3643 let receiver = {
3644 let mut client_guard = self.client.lock().await;
3645 let tx = client_guard.transaction().await?;
3646
3647 let (sender, receiver) = oneshot::channel();
3653 self.response_subscribers
3654 .lock()
3655 .unwrap()
3656 .insert(execution_id.clone(), (sender, unique_tag));
3657
3658 let responses = get_responses_after(&tx, execution_id, last_response).await?;
3659
3660 if responses.is_empty() {
3661 tx.commit().await.map_err(|err| {
3663 cleanup(); DbErrorRead::from(err)
3665 })?;
3666 receiver
3667 } else {
3668 cleanup(); tx.commit().await?;
3670 return Ok(responses);
3671 }
3672 };
3673
3674 let res = tokio::select! {
3675 resp = receiver => {
3676 match resp {
3677 Ok(resp) => Ok(vec![resp]),
3678 Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3679 }
3680 }
3681 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3682 };
3683
3684 cleanup();
3685 res
3686 }
3687
3688 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3689 async fn wait_for_finished_result(
3690 &self,
3691 execution_id: &ExecutionId,
3692 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3693 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3694 let unique_tag: u64 = rand::random();
3695 let execution_id_clone = execution_id.clone();
3696
3697 let cleanup = || {
3698 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3699 if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3700 subscribers.remove(&unique_tag);
3701 }
3702 };
3703
3704 let receiver = {
3705 let mut client_guard = self.client.lock().await;
3706 let tx = client_guard.transaction().await?;
3707
3708 let (sender, receiver) = oneshot::channel();
3710 {
3711 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3712 guard
3713 .entry(execution_id.clone())
3714 .or_default()
3715 .insert(unique_tag, sender);
3716 }
3717
3718 let pending_state = get_combined_state(&tx, execution_id)
3719 .await?
3720 .execution_with_state
3721 .pending_state;
3722
3723 if let PendingState::Finished(finished) = pending_state {
3724 let event = get_execution_event(&tx, execution_id, finished.version).await?;
3725 tx.commit().await?;
3726 cleanup();
3727
3728 if let ExecutionRequest::Finished { result, .. } = event.event {
3729 return Ok(result);
3730 }
3731 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3732 return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3733 "cannot get finished event based on t_state version",
3734 )));
3735 }
3736 tx.commit().await?;
3737 receiver
3738 };
3739
3740 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3741 let res = tokio::select! {
3742 resp = receiver => {
3743 match resp {
3744 Ok(retval) => Ok(retval),
3745 Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3746 }
3747 }
3748 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3749 };
3750
3751 cleanup();
3752 res
3753 }
3754
3755 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3756 async fn append_delay_response(
3757 &self,
3758 created_at: DateTime<Utc>,
3759 execution_id: ExecutionId,
3760 join_set_id: JoinSetId,
3761 delay_id: DelayId,
3762 result: Result<(), ()>,
3763 ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3764 debug!("append_delay_response");
3765 let event = JoinSetResponseEventOuter {
3766 created_at,
3767 event: JoinSetResponseEvent {
3768 join_set_id,
3769 event: JoinSetResponse::DelayFinished {
3770 delay_id: delay_id.clone(),
3771 result,
3772 },
3773 },
3774 };
3775
3776 let mut client_guard = self.client.lock().await;
3777 let tx = client_guard.transaction().await?;
3778
3779 let res = append_response(&tx, &execution_id, event).await;
3780
3781 match res {
3782 Ok(notifier) => {
3783 tx.commit().await?;
3784 drop(client_guard);
3785 self.notify_all(vec![notifier], created_at);
3786 Ok(AppendDelayResponseOutcome::Success)
3787 }
3788 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3789 tx.rollback().await?;
3792
3793 let tx = client_guard.transaction().await?;
3795 let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3796 tx.commit().await?;
3797
3798 match delay_success {
3799 Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3800 Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3801 None => Err(DbErrorWrite::Generic(consistency_db_err(
3802 "insert failed yet select did not find the response",
3803 ))),
3804 }
3805 }
3806 Err(err) => {
3807 let _ = tx.rollback().await; Err(err)
3809 }
3810 }
3811 }
3812
3813 #[instrument(level = Level::DEBUG, skip_all)]
3814 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3815 debug!("append_backtrace");
3816 let mut client_guard = self.client.lock().await;
3817 let tx = client_guard.transaction().await?;
3818
3819 append_backtrace(&tx, &append).await?;
3820
3821 tx.commit().await?;
3822 Ok(())
3823 }
3824
3825 #[instrument(level = Level::DEBUG, skip_all)]
3826 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3827 debug!("append_backtrace_batch");
3828 let mut client_guard = self.client.lock().await;
3829 let tx = client_guard.transaction().await?;
3830
3831 for append in batch {
3832 append_backtrace(&tx, &append).await?;
3833 }
3834
3835 tx.commit().await?;
3836 Ok(())
3837 }
3838
3839 #[instrument(level = Level::DEBUG, skip_all)]
3840 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3841 trace!("append_log");
3842 let mut client_guard = self.client.lock().await;
3843 let tx = client_guard.transaction().await?;
3844 append_log(&tx, &row).await?;
3845 tx.commit().await?;
3846
3847 Ok(())
3848 }
3849
3850 #[instrument(level = Level::DEBUG, skip_all)]
3851 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3852 trace!("append_log_batch");
3853 let mut client_guard = self.client.lock().await;
3854 let tx = client_guard.transaction().await?;
3855 for row in batch {
3856 append_log(&tx, row).await?;
3857 }
3858 tx.commit().await?;
3859 Ok(())
3860 }
3861
3862 #[instrument(level = Level::TRACE, skip(self))]
3864 async fn get_expired_timers(
3865 &self,
3866 at: DateTime<Utc>,
3867 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3868 let mut client_guard = self.client.lock().await;
3869 let tx = client_guard.transaction().await?;
3870
3871 let rows = tx
3873 .query(
3874 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3875 &[&at],
3876 )
3877 .await?;
3878
3879 let mut expired_timers = Vec::with_capacity(rows.len());
3880 for row in rows {
3881 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3882 let execution_id: String = get(&row, "execution_id")?;
3883 let execution_id = ExecutionId::from_str(&execution_id)?;
3884 let join_set_id: String = get(&row, "join_set_id")?;
3885 let join_set_id = JoinSetId::from_str(&join_set_id)?;
3886 let delay_id: String = get(&row, "delay_id")?;
3887 let delay_id = DelayId::from_str(&delay_id)?;
3888
3889 Ok(ExpiredTimer::Delay(ExpiredDelay {
3890 execution_id,
3891 join_set_id,
3892 delay_id,
3893 }))
3894 };
3895
3896 match unpack() {
3897 Ok(timer) => expired_timers.push(timer),
3898 Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3899 }
3900 }
3901
3902 let rows = tx.query(
3904 &format!(
3905 "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3906 FROM t_state \
3907 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3908 ),
3909 &[&at]
3910 ).await?;
3911
3912 for row in rows {
3913 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3914 let execution_id: String = get(&row, "execution_id")?;
3915 let execution_id = ExecutionId::from_str(&execution_id)?;
3916 let last_lock_version: i64 = get(&row, "last_lock_version")?;
3917 let last_lock_version = Version::try_from(last_lock_version)?;
3918
3919 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3920 let corresponding_version = Version::try_from(corresponding_version)?;
3921
3922 let intermittent_event_count =
3923 u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
3924 |_| consistency_db_err("`intermittent_event_count` must not be negative"),
3925 )?;
3926
3927 let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
3928 .map(u32::try_from)
3929 .transpose()
3930 .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3931 let retry_exp_backoff_millis =
3932 u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
3933 |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
3934 )?;
3935 let executor_id: String = get(&row, "executor_id")?;
3936 let executor_id = ExecutorId::from_str(&executor_id)?;
3937 let run_id: String = get(&row, "run_id")?;
3938 let run_id = RunId::from_str(&run_id)?;
3939
3940 Ok(ExpiredTimer::Lock(ExpiredLock {
3941 execution_id,
3942 locked_at_version: last_lock_version,
3943 next_version: corresponding_version.increment(),
3944 intermittent_event_count,
3945 max_retries,
3946 retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3947 locked_by: LockedBy {
3948 executor_id,
3949 run_id,
3950 },
3951 }))
3952 };
3953
3954 match unpack() {
3955 Ok(timer) => expired_timers.push(timer),
3956 Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3957 }
3958 }
3959
3960 tx.commit().await?;
3961
3962 if !expired_timers.is_empty() {
3963 debug!("get_expired_timers found {expired_timers:?}");
3964 }
3965 Ok(expired_timers)
3966 }
3967
3968 async fn get_execution_event(
3969 &self,
3970 execution_id: &ExecutionId,
3971 version: &Version,
3972 ) -> Result<ExecutionEvent, DbErrorRead> {
3973 let mut client_guard = self.client.lock().await;
3974 let tx = client_guard.transaction().await?;
3975
3976 let event = get_execution_event(&tx, execution_id, version.0).await?;
3977
3978 tx.commit().await?;
3979 Ok(event)
3980 }
3981
3982 async fn get_pending_state(
3983 &self,
3984 execution_id: &ExecutionId,
3985 ) -> Result<ExecutionWithState, DbErrorRead> {
3986 let mut client_guard = self.client.lock().await;
3987 let tx = client_guard.transaction().await?;
3988
3989 let combined_state = get_combined_state(&tx, execution_id).await?;
3990
3991 tx.commit().await?;
3992 Ok(combined_state.execution_with_state)
3993 }
3994}
3995
3996#[async_trait]
3997impl DbExternalApi for PostgresConnection {
3998 #[instrument(skip(self))]
3999 async fn get_backtrace(
4000 &self,
4001 execution_id: &ExecutionId,
4002 filter: BacktraceFilter,
4003 ) -> Result<BacktraceInfo, DbErrorRead> {
4004 debug!("get_backtrace");
4005
4006 let mut client_guard = self.client.lock().await;
4007 let tx = client_guard.transaction().await?;
4008
4009 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4010
4011 params.push(Box::new(execution_id.to_string())); let p_execution_id_idx = format!("${}", params.len()); let mut sql = String::new();
4015 write!(
4016 &mut sql,
4017 "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
4018 FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
4019 WHERE execution_id = {p_execution_id_idx}"
4020 )
4021 .unwrap();
4022
4023 match &filter {
4024 BacktraceFilter::Specific(version) => {
4025 params.push(Box::new(i64::from(version.0))); let p_ver_idx = format!("${}", params.len()); write!(
4028 &mut sql,
4029 " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
4030 )
4031 .unwrap();
4032 }
4033 BacktraceFilter::First => {
4034 sql.push_str(" ORDER BY version_min_including LIMIT 1");
4035 }
4036 BacktraceFilter::Last => {
4037 sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
4038 }
4039 }
4040
4041 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4042 params.iter().map(|p| p.as_ref() as _).collect();
4043
4044 let row = tx.query_one(&sql, ¶ms_refs).await?;
4045
4046 let component_id: Json<ComponentId> = get(&row, "component_id")?;
4047 let component_id = component_id.0;
4048
4049 let version_min_including =
4050 Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
4051
4052 let version_max_excluding =
4053 Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
4054
4055 let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
4057 let wasm_backtrace = wasm_backtrace.0;
4058
4059 tx.commit().await?;
4060
4061 Ok(BacktraceInfo {
4062 execution_id: execution_id.clone(),
4063 component_id,
4064 version_min_including,
4065 version_max_excluding,
4066 wasm_backtrace,
4067 })
4068 }
4069
4070 #[instrument(skip(self))]
4071 async fn list_executions(
4072 &self,
4073 filter: ListExecutionsFilter,
4074 pagination: ExecutionListPagination,
4075 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4076 let mut client_guard = self.client.lock().await;
4077 let tx = client_guard.transaction().await?;
4078
4079 let result = list_executions(&tx, filter, &pagination).await?;
4080
4081 tx.commit().await?;
4082 Ok(result)
4083 }
4084
4085 #[instrument(skip(self))]
4086 async fn list_execution_events(
4087 &self,
4088 execution_id: &ExecutionId,
4089 pagination: Pagination<VersionType>,
4090 include_backtrace_id: bool,
4091 ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4092 let mut client_guard = self.client.lock().await;
4093 let tx = client_guard.transaction().await?;
4094
4095 let events =
4096 list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4097 let max_version = get_max_version(&tx, execution_id).await?;
4098
4099 tx.commit().await?;
4100 Ok(ListExecutionEventsResponse {
4101 events,
4102 max_version,
4103 })
4104 }
4105
4106 #[instrument(skip(self))]
4107 async fn list_responses(
4108 &self,
4109 execution_id: &ExecutionId,
4110 pagination: Pagination<u32>,
4111 ) -> Result<ListResponsesResponse, DbErrorRead> {
4112 let mut client_guard = self.client.lock().await;
4113 let tx = client_guard.transaction().await?;
4114
4115 let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4116 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4117
4118 tx.commit().await?;
4119 Ok(ListResponsesResponse {
4120 responses,
4121 max_cursor,
4122 })
4123 }
4124
4125 #[instrument(skip(self))]
4126 async fn list_execution_events_responses(
4127 &self,
4128 execution_id: &ExecutionId,
4129 req_since: &Version,
4130 req_max_length: VersionType,
4131 req_include_backtrace_id: bool,
4132 resp_pagination: Pagination<u32>,
4133 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4134 let mut client_guard = self.client.lock().await;
4135 let tx = client_guard.transaction().await?;
4136
4137 let combined_state = get_combined_state(&tx, execution_id).await?;
4138
4139 let events = list_execution_events(
4140 &tx,
4141 execution_id,
4142 Pagination::NewerThan {
4143 length: req_max_length
4144 .try_into()
4145 .expect("req_max_length fits in u16"),
4146 cursor: req_since.0,
4147 including_cursor: true,
4148 },
4149 req_include_backtrace_id,
4150 )
4151 .await?;
4152
4153 let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4154 let max_version = get_max_version(&tx, execution_id).await?;
4155 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4156
4157 tx.commit().await?;
4158
4159 Ok(ExecutionWithStateRequestsResponses {
4160 execution_with_state: combined_state.execution_with_state,
4161 events,
4162 responses,
4163 max_version,
4164 max_cursor,
4165 })
4166 }
4167
4168 #[instrument(skip(self))]
4169 async fn upgrade_execution_component(
4170 &self,
4171 execution_id: &ExecutionId,
4172 old: &InputContentDigest,
4173 new: &InputContentDigest,
4174 ) -> Result<(), DbErrorWrite> {
4175 let mut client_guard = self.client.lock().await;
4176 let tx = client_guard.transaction().await?;
4177
4178 upgrade_execution_component(&tx, execution_id, old, new).await?;
4179
4180 tx.commit().await?;
4181 Ok(())
4182 }
4183
4184 #[instrument(skip(self))]
4185 async fn list_logs(
4186 &self,
4187 execution_id: &ExecutionId,
4188 filter: LogFilter,
4189 pagination: Pagination<u32>,
4190 ) -> Result<ListLogsResponse, DbErrorRead> {
4191 let mut client_guard = self.client.lock().await;
4192 let tx = client_guard.transaction().await?;
4193 let responses = list_logs_tx(&tx, execution_id, &filter, &pagination).await?;
4194 tx.commit().await?;
4195 Ok(responses)
4196 }
4197
4198 #[instrument(skip(self))]
4199 async fn list_deployment_states(
4200 &self,
4201 current_time: DateTime<Utc>,
4202 pagination: Pagination<Option<DeploymentId>>,
4203 ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4204 let mut client_guard = self.client.lock().await;
4205 let tx = client_guard.transaction().await?;
4206 let deployments = list_deployment_states(&tx, current_time, pagination).await?;
4207 tx.commit().await?;
4208 Ok(deployments)
4209 }
4210
4211 #[instrument(skip(self))]
4212 async fn pause_execution(
4213 &self,
4214 execution_id: &ExecutionId,
4215 paused_at: DateTime<Utc>,
4216 ) -> Result<AppendResponse, DbErrorWrite> {
4217 let mut client_guard = self.client.lock().await;
4218 let tx = client_guard.transaction().await?;
4219
4220 let combined_state = get_combined_state(&tx, execution_id).await?;
4221 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4222 debug!("Pausing with {appending_version}");
4223 let (next_version, _) = append(
4224 &tx,
4225 execution_id,
4226 AppendRequest {
4227 created_at: paused_at,
4228 event: ExecutionRequest::Paused,
4229 },
4230 appending_version,
4231 )
4232 .await?;
4233
4234 tx.commit().await?;
4235 Ok(next_version)
4236 }
4237
4238 #[instrument(skip(self))]
4239 async fn unpause_execution(
4240 &self,
4241 execution_id: &ExecutionId,
4242 unpaused_at: DateTime<Utc>,
4243 ) -> Result<AppendResponse, DbErrorWrite> {
4244 let mut client_guard = self.client.lock().await;
4245 let tx = client_guard.transaction().await?;
4246
4247 let combined_state = get_combined_state(&tx, execution_id).await?;
4248 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4249 debug!("Unpausing with {appending_version}");
4250 let (next_version, _) = append(
4251 &tx,
4252 execution_id,
4253 AppendRequest {
4254 created_at: unpaused_at,
4255 event: ExecutionRequest::Unpaused,
4256 },
4257 appending_version,
4258 )
4259 .await?;
4260
4261 tx.commit().await?;
4262 Ok(next_version)
4263 }
4264}
4265
4266#[async_trait]
4267impl DbPoolCloseable for PostgresPool {
4268 async fn close(&self) {
4269 self.pool.close();
4270 }
4271}
4272
4273#[cfg(feature = "test")]
4274#[async_trait]
4275impl concepts::storage::DbConnectionTest for PostgresConnection {
4276 #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
4277 async fn append_response(
4278 &self,
4279 created_at: DateTime<Utc>,
4280 execution_id: ExecutionId,
4281 response_event: JoinSetResponseEvent,
4282 ) -> Result<(), DbErrorWrite> {
4283 debug!("append_response");
4284 let event = JoinSetResponseEventOuter {
4285 created_at,
4286 event: response_event,
4287 };
4288
4289 let mut client_guard = self.client.lock().await;
4290 let tx = client_guard.transaction().await?;
4291
4292 let notifier = append_response(&tx, &execution_id, event).await?;
4293
4294 tx.commit().await?;
4295 drop(client_guard);
4296
4297 self.notify_all(vec![notifier], created_at);
4298 Ok(())
4299 }
4300}
4301
4302#[cfg(feature = "test")]
4303impl PostgresPool {
4304 pub async fn drop_database(&self) {
4305 let mut cfg = deadpool_postgres::Config::new();
4306 cfg.host = Some(self.config.host.clone());
4307 cfg.user = Some(self.config.user.clone());
4308 cfg.password = Some(self.config.password.clone());
4309 cfg.dbname = Some(ADMIN_DB_NAME.into());
4310 cfg.manager = Some(ManagerConfig {
4311 recycling_method: RecyclingMethod::Fast,
4312 });
4313
4314 let pool = cfg
4315 .create_pool(None, NoTls)
4316 .map_err(|err| {
4317 error!("Cannot create the default pool - {err:?}");
4318 InitializationError
4319 })
4320 .unwrap();
4321
4322 let client = pool
4323 .get()
4324 .await
4325 .map_err(|err| {
4326 error!("Cannot get a connection from the default pool - {err:?}");
4327 InitializationError
4328 })
4329 .unwrap();
4330 for _ in 0..3 {
4331 let res = client
4332 .execute(&format!("DROP DATABASE {}", self.config.db_name), &[])
4333 .await; if res.is_ok() {
4335 debug!("Database '{}' dropped.", self.config.db_name);
4336 return;
4337 }
4338 debug!("Dropping db failed - {res:?}",);
4339 }
4340 warn!("Did not drop database {}", self.config.db_name);
4341 }
4342}