1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ContentDigest, ExecutionId, FunctionFqn, JoinSetId,
6 StrVariant, SupportedFunctionReturnValue,
7 component_id::{Digest, InputContentDigest},
8 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14 DbPool, DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
15 ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock,
16 ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
17 JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionsFilter, LockPendingResponse,
18 Locked, LockedBy, LockedExecution, Pagination, PendingState, PendingStateFinished,
19 PendingStateFinishedResultKind, PendingStateLocked, ResponseWithCursor,
20 STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome,
21 Version, VersionType, WasmBacktrace,
22 },
23};
24use deadpool_postgres::{Client, Config, ManagerConfig, Pool, RecyclingMethod};
25use hashbrown::HashMap;
26use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
27use std::{fmt::Write as _, panic::Location};
28use tokio::sync::{mpsc, oneshot};
29use tokio_postgres::{
30 NoTls, Row, Transaction,
31 types::{FromSql, Json, ToSql},
32};
33use tracing::{Level, debug, error, info, instrument, trace, warn};
34use tracing_error::SpanTrace;
35
36fn get<'a, T: FromSql<'a>>(row: &'a Row, name: &str) -> Result<T, DbErrorGeneric> {
37 row.try_get(name)
38 .map_err(|err| consistency_db_err(format!("Failed to retrieve column '{name}': {err:?}")))
39}
40
/// SQL DDL for the PostgreSQL backend: connection constants, the expected schema
/// version, and the `CREATE TABLE` / `CREATE INDEX` statements.
///
/// Every statement uses `IF NOT EXISTS`, so re-running them is harmless. Note that
/// `IF NOT EXISTS` only checks the object name: editing a definition here does not
/// alter an object that already exists in a deployed database.
mod ddl {
    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;

    /// Database connected to when checking for / creating the configured database.
    pub const ADMIN_DB_NAME: &str = "postgres";

    /// Schema version this code expects to find in `t_metadata`; initialization
    /// fails on a mismatch.
    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 1;

    /// Schema-version bookkeeping; the latest row (highest `id`) is the current version.
    pub const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    schema_version INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);
";

    /// Execution event log, one row per `(execution_id, version)`.
    /// `history_event_type` is a stored generated column extracted from the JSON
    /// path `history_event.event.type` of `json_value`.
    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    json_value JSON NOT NULL,
    version BIGINT NOT NULL CHECK (version >= 0),
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{history_event,event,type}') STORED,
    PRIMARY KEY (execution_id, version)
);
";

    // NOTE(review): this index has the same columns as the primary key of
    // `t_execution_log`, which PostgreSQL already backs with a unique index; it looks
    // redundant and only adds write/storage cost. Consider dropping it.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";

    /// Lookup of log rows by execution and event variant.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

    /// Partial index over join-next history events only (the `WHERE` clause is built
    /// at compile time from `HISTORY_EVENT_TYPE_JOIN_NEXT`).
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
        HISTORY_EVENT_TYPE_JOIN_NEXT
    );

    /// Responses delivered to join sets. Judging by the columns, a row is presumably
    /// either a delay outcome (`delay_id`, `delay_success`) or a finished child
    /// (`child_execution_id`, `finished_version`) — TODO confirm.
    // NOTE(review): the UNIQUE constraint spans nullable columns. PostgreSQL treats
    // NULLs as distinct, so rows where `delay_id` or `child_execution_id` is NULL
    // never conflict under it. Uniqueness is effectively enforced by the two partial
    // unique indexes below, not by this constraint.
    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success BOOLEAN,

    child_execution_id TEXT,
    finished_version BIGINT CHECK (finished_version >= 0),

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
);
";

    /// Responses of one execution in insertion (`id`) order.
    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";

    /// At most one response row per child execution.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";

    /// At most one response row per delay.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";

    /// Current-state projection, one row per execution. `pending_expires_finished`
    /// holds the scheduled time, lock expiry or finish time depending on `state`.
    pub const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level BOOLEAN NOT NULL,
    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
    ffqn TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    component_id_input_digest BYTEA NOT NULL,
    first_scheduled_at TIMESTAMPTZ NOT NULL,

    pending_expires_finished TIMESTAMPTZ NOT NULL,
    state TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),

    max_retries BIGINT CHECK (max_retries >= 0),
    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
    last_lock_version BIGINT CHECK (last_lock_version >= 0),
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing BOOLEAN,

    result_kind JSONB,

    PRIMARY KEY (execution_id)
);
";

    /// Filters by state and time, then ffqn.
    pub const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";

    /// Time-ordered partial index over rows that have an executor assigned.
    pub const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";

    // NOTE(review): the SQL index name is `idx_t_state_execution_id_is_root`, which
    // differs from this constant's name; keep that in mind when searching.
    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";

    pub const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";

    pub const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";

    /// Delay requests with their expiry, keyed by `(execution_id, join_set_id, delay_id)`.
    pub const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
);
";

    /// WASM backtraces covering the version range `[version_min_including, version_max_excluding)`.
    pub const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id JSONB NOT NULL,
    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
    wasm_backtrace JSONB NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
);
";

    // NOTE(review): same columns, same order as the primary key of `t_backtrace`,
    // so this index appears redundant.
    pub const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
}
197
198#[derive(derive_more::Debug, Clone)]
199pub struct PostgresConfig {
200 pub host: String,
201 pub user: String,
202 #[debug(skip)]
203 pub password: String,
204 pub db_name: String,
205}
206
207#[derive(Debug, thiserror::Error)]
208#[error("initialization error")]
209pub struct InitializationError;
210
211async fn create_database(
212 config: &PostgresConfig,
213) -> Result<DbInitialzationOutcome, InitializationError> {
214 let mut cfg = Config::new();
215 cfg.host = Some(config.host.clone());
216 cfg.user = Some(config.user.clone());
217 cfg.password = Some(config.password.clone());
218 cfg.dbname = Some(ADMIN_DB_NAME.into());
219 cfg.manager = Some(ManagerConfig {
220 recycling_method: RecyclingMethod::Fast,
221 });
222
223 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
224 error!("Cannot create the default pool - {err:?}");
225 InitializationError
226 })?;
227
228 let client = pool.get().await.map_err(|err| {
229 error!("Cannot get a connection from the default pool - {err:?}");
230 InitializationError
231 })?;
232
233 let row = client
234 .query_opt(
235 &format!(
236 "SELECT 1 FROM pg_database WHERE datname = '{}'",
237 config.db_name
238 ),
239 &[],
240 )
241 .await
242 .map_err(|err| {
243 error!("Cannot select from the default database - {err:?}");
244 InitializationError
245 })?;
246
247 let outcome = if row.is_none() {
248 client
249 .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
250 .await
251 .map_err(|err| {
252 error!("Cannot create the database - {err:?}");
253 InitializationError
254 })?;
255
256 DbInitialzationOutcome::Created
257 } else {
258 DbInitialzationOutcome::Existing
259 };
260 match outcome {
261 DbInitialzationOutcome::Created => info!("Database '{}' created.", config.db_name),
262 DbInitialzationOutcome::Existing => info!("Database '{}' exists.", config.db_name),
263 }
264 Ok(outcome)
265}
266
267type ResponseSubscribers =
269 Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
270type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
271type ExecutionFinishedSubscribers = std::sync::Mutex<
272 HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
273>;
274
275pub struct PostgresPool {
276 pool: Pool,
277 response_subscribers: ResponseSubscribers,
278 pending_subscribers: PendingSubscribers,
279 execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
280 #[allow(dead_code)] config: PostgresConfig,
282}
283
284#[async_trait]
285impl DbPool for PostgresPool {
286 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
287 let client = self.pool.get().await?;
288
289 Ok(Box::new(PostgresConnection {
290 client: tokio::sync::Mutex::new(client),
291 response_subscribers: self.response_subscribers.clone(),
292 pending_subscribers: self.pending_subscribers.clone(),
293 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
294 }))
295 }
296
297 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
298 let client = self.pool.get().await?;
299
300 Ok(Box::new(PostgresConnection {
301 client: tokio::sync::Mutex::new(client),
302 response_subscribers: self.response_subscribers.clone(),
303 pending_subscribers: self.pending_subscribers.clone(),
304 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
305 }))
306 }
307
308 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
309 let client = self.pool.get().await?;
310
311 Ok(Box::new(PostgresConnection {
312 client: tokio::sync::Mutex::new(client),
313 response_subscribers: self.response_subscribers.clone(),
314 pending_subscribers: self.pending_subscribers.clone(),
315 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
316 }))
317 }
318
319 #[cfg(feature = "test")]
320 async fn connection_test(
321 &self,
322 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
323 let client = self.pool.get().await?;
324
325 Ok(Box::new(PostgresConnection {
326 client: tokio::sync::Mutex::new(client),
327 response_subscribers: self.response_subscribers.clone(),
328 pending_subscribers: self.pending_subscribers.clone(),
329 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
330 }))
331 }
332}
333
334pub struct PostgresConnection {
335 client: tokio::sync::Mutex<Client>, response_subscribers: ResponseSubscribers,
337 pending_subscribers: PendingSubscribers,
338 execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
339}
340
341#[derive(Default)]
342struct PendingFfqnSubscribersHolder {
343 by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
344 by_component: HashMap<InputContentDigest , (mpsc::Sender<()>, u64)>,
345}
346impl PendingFfqnSubscribersHolder {
347 fn notify(&self, notifier: &NotifierPendingAt) {
348 if let Some((subscription, _)) = self.by_ffqns.get(¬ifier.ffqn) {
349 debug!("Notifying pending subscriber by ffqn");
350 let _ = subscription.try_send(());
352 }
353 if let Some((subscription, _)) = self.by_component.get(¬ifier.component_input_digest) {
354 debug!("Notifying pending subscriber by component");
355 let _ = subscription.try_send(());
357 }
358 }
359
360 fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
361 self.by_ffqns.insert(ffqn, value);
362 }
363
364 fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
365 self.by_ffqns.remove(ffqn)
366 }
367
368 fn insert_by_component(
369 &mut self,
370 input_content_digest: InputContentDigest,
371 value: (mpsc::Sender<()>, u64),
372 ) {
373 self.by_component.insert(input_content_digest, value);
374 }
375
376 fn remove_by_component(
377 &mut self,
378 input_content_digest: &InputContentDigest,
379 ) -> Option<(mpsc::Sender<()>, u64)> {
380 self.by_component.remove(input_content_digest)
381 }
382}
383
/// Whether [`PostgresPool::new`] may create the configured database.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProvisionPolicy {
    /// Expect the database to exist already.
    Never,
    /// Create the database when it is missing.
    Auto,
}

/// Whether the database was created during initialization or already existed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DbInitialzationOutcome {
    /// Created by this initialization.
    Created,
    /// Already present (or provisioning was not attempted).
    Existing,
}
396
397impl PostgresPool {
398 #[instrument(skip_all, name = "postgres_new")]
399 pub async fn new(
400 config: PostgresConfig,
401 provision_policy: ProvisionPolicy,
402 ) -> Result<PostgresPool, InitializationError> {
403 Self::new_with_outcome(config, provision_policy)
404 .await
405 .map(|(db, _)| db)
406 }
407
408 pub async fn new_with_outcome(
409 config: PostgresConfig,
410 provision_policy: ProvisionPolicy,
411 ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
412 let outcome = if provision_policy == ProvisionPolicy::Auto {
413 create_database(&config).await?
414 } else {
415 DbInitialzationOutcome::Existing
416 };
417 let mut cfg = Config::new();
418 cfg.host = Some(config.host.clone());
419 cfg.user = Some(config.user.clone());
420 cfg.password = Some(config.password.clone());
421 cfg.dbname = Some(config.db_name.clone());
422 cfg.manager = Some(ManagerConfig {
423 recycling_method: RecyclingMethod::Fast,
424 });
425
426 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
427 error!("Cannot create the database pool - {err:?}");
428 InitializationError
429 })?;
430 let client = pool.get().await.map_err(|err| {
431 error!("Cannot get a connection from the database pool - {err:?}");
432 InitializationError
433 })?;
434
435 let statements = vec![
436 ddl::CREATE_TABLE_T_METADATA,
437 ddl::CREATE_TABLE_T_EXECUTION_LOG,
438 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
439 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
440 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
441 ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
442 ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
443 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
444 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
445 ddl::CREATE_TABLE_T_STATE,
446 ddl::IDX_T_STATE_LOCK_PENDING,
447 ddl::IDX_T_STATE_EXPIRED_TIMERS,
448 ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
449 ddl::IDX_T_STATE_FFQN,
450 ddl::IDX_T_STATE_CREATED_AT,
451 ddl::CREATE_TABLE_T_DELAY,
452 ddl::CREATE_TABLE_T_BACKTRACE,
453 ddl::IDX_T_BACKTRACE_EXECUTION_ID_VERSION,
454 ];
455
456 let batch_sql = statements.join("\n");
458 client.batch_execute(&batch_sql).await.map_err(|err| {
459 error!("Cannot run the DDL import - {err:?}");
460 InitializationError
461 })?;
462
463 let row = client
464 .query_opt(
465 "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
466 &[],
467 )
468 .await
469 .map_err(|err| {
470 error!("Cannot select schema version - {err:?}");
471 InitializationError
472 })?;
473
474 let actual_version = match row {
476 Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
477 error!("Failed to get schema_version column: {e}");
478 InitializationError
479 })?),
480 None => None,
481 };
482
483 match actual_version {
484 None => {
485 client
486 .execute(
487 "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
488 &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
489 )
490 .await
491 .map_err(|err| {
492 error!("Cannot insert schema version - {err:?}");
493 InitializationError
494 })?;
495 }
496 Some(actual_version) => {
497 if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
499 error!(
500 "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
501 );
502 return Err(InitializationError);
503 }
504 }
505 }
506
507 debug!("Database schema initialized.");
508
509 Ok((
510 PostgresPool {
511 pool,
512 execution_finished_subscribers: Arc::default(),
513 pending_subscribers: Arc::default(),
514 response_subscribers: Arc::default(),
515 config,
516 },
517 outcome,
518 ))
519 }
520}
521
522#[track_caller]
523fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
524 DbErrorGeneric::Uncategorized {
525 reason: reason.into(),
526 context: SpanTrace::capture(),
527 source: None,
528 loc: Location::caller(),
529 }
530}
531
532#[derive(Debug)]
533struct CombinedStateDTO {
534 execution_id: ExecutionId,
535 state: String,
536 ffqn: FunctionFqn,
537 component_digest: InputContentDigest,
538 created_at: DateTime<Utc>,
539 first_scheduled_at: DateTime<Utc>,
540 pending_expires_finished: DateTime<Utc>,
541 last_lock_version: Option<Version>,
543 executor_id: Option<ExecutorId>,
544 run_id: Option<RunId>,
545 join_set_id: Option<JoinSetId>,
547 join_set_closing: Option<bool>,
548 result_kind: Option<PendingStateFinishedResultKind>,
550}
551#[derive(Debug)]
552struct CombinedState {
553 execution_with_state: ExecutionWithState,
554 corresponding_version: Version,
555}
556
557impl CombinedState {
558 fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, DbErrorGeneric> {
559 let execution_with_state = match dto {
560 CombinedStateDTO {
562 execution_id,
563 created_at,
564 first_scheduled_at,
565 state,
566 ffqn,
567 component_digest,
568 pending_expires_finished: scheduled_at,
569 last_lock_version: None,
570 executor_id: None,
571 run_id: None,
572 join_set_id: None,
573 join_set_closing: None,
574 result_kind: None,
575 } if state == STATE_PENDING_AT => ExecutionWithState {
576 component_digest,
577 execution_id,
578 ffqn,
579 created_at,
580 first_scheduled_at,
581 pending_state: PendingState::PendingAt {
582 scheduled_at,
583 last_lock: None,
584 },
585 },
586 CombinedStateDTO {
588 execution_id,
589 created_at,
590 first_scheduled_at,
591 state,
592 ffqn,
593 component_digest,
594 pending_expires_finished: scheduled_at,
595 last_lock_version: None,
596 executor_id: Some(executor_id),
597 run_id: Some(run_id),
598 join_set_id: None,
599 join_set_closing: None,
600 result_kind: None,
601 } if state == STATE_PENDING_AT => ExecutionWithState {
602 component_digest,
603 execution_id,
604 ffqn,
605 created_at,
606 first_scheduled_at,
607 pending_state: PendingState::PendingAt {
608 scheduled_at,
609 last_lock: Some(LockedBy {
610 executor_id,
611 run_id,
612 }),
613 },
614 },
615 CombinedStateDTO {
616 execution_id,
617 created_at,
618 first_scheduled_at,
619 state,
620 ffqn,
621 component_digest,
622 pending_expires_finished: lock_expires_at,
623 last_lock_version: Some(_),
624 executor_id: Some(executor_id),
625 run_id: Some(run_id),
626 join_set_id: None,
627 join_set_closing: None,
628 result_kind: None,
629 } if state == STATE_LOCKED => ExecutionWithState {
630 component_digest,
631 execution_id,
632 ffqn,
633 created_at,
634 first_scheduled_at,
635 pending_state: PendingState::Locked(PendingStateLocked {
636 locked_by: LockedBy {
637 executor_id,
638 run_id,
639 },
640 lock_expires_at,
641 }),
642 },
643 CombinedStateDTO {
644 execution_id,
645 created_at,
646 first_scheduled_at,
647 state,
648 ffqn,
649 component_digest,
650 pending_expires_finished: lock_expires_at,
651 last_lock_version: None,
652 executor_id: _,
653 run_id: _,
654 join_set_id: Some(join_set_id),
655 join_set_closing: Some(join_set_closing),
656 result_kind: None,
657 } if state == STATE_BLOCKED_BY_JOIN_SET => ExecutionWithState {
658 component_digest,
659 execution_id,
660 ffqn,
661 created_at,
662 first_scheduled_at,
663 pending_state: PendingState::BlockedByJoinSet {
664 join_set_id: join_set_id.clone(),
665 closing: join_set_closing,
666 lock_expires_at,
667 },
668 },
669 CombinedStateDTO {
670 execution_id,
671 created_at,
672 first_scheduled_at,
673 state,
674 ffqn,
675 component_digest,
676 pending_expires_finished: finished_at,
677 last_lock_version: None,
678 executor_id: None,
679 run_id: None,
680 join_set_id: None,
681 join_set_closing: None,
682 result_kind: Some(result_kind),
683 } if state == STATE_FINISHED => ExecutionWithState {
684 component_digest,
685 execution_id,
686 ffqn,
687 created_at,
688 first_scheduled_at,
689 pending_state: PendingState::Finished {
690 finished: PendingStateFinished {
691 finished_at,
692 version: corresponding_version.0,
693 result_kind,
694 },
695 },
696 },
697 _ => {
698 error!("Cannot deserialize pending state from {dto:?}");
699 return Err(consistency_db_err("invalid `t_state`"));
700 }
701 };
702 Ok(Self {
703 execution_with_state,
704 corresponding_version,
705 })
706 }
707
708 fn get_next_version_assert_not_finished(&self) -> Version {
709 assert!(!self.execution_with_state.pending_state.is_finished());
710 self.corresponding_version.increment()
711 }
712
713 #[cfg(feature = "test")]
714 fn get_next_version_or_finished(&self) -> Version {
715 if self.execution_with_state.pending_state.is_finished() {
716 self.corresponding_version.clone()
717 } else {
718 self.corresponding_version.increment()
719 }
720 }
721}
722
723#[derive(Debug)]
724struct NotifierPendingAt {
725 scheduled_at: DateTime<Utc>,
726 ffqn: FunctionFqn,
727 component_input_digest: InputContentDigest,
728}
729
730#[derive(Debug)]
731struct NotifierExecutionFinished {
732 execution_id: ExecutionId,
733 retval: SupportedFunctionReturnValue,
734}
735
736#[derive(Debug, Default)]
737struct AppendNotifier {
738 pending_at: Option<NotifierPendingAt>,
739 execution_finished: Option<NotifierExecutionFinished>,
740 response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
741}
742
743#[derive(Debug, Clone)]
744struct DelayReq {
745 join_set_id: JoinSetId,
746 delay_id: DelayId,
747 expires_at: DateTime<Utc>,
748}
749
750async fn fetch_created_event(
751 tx: &Transaction<'_>,
752 execution_id: &ExecutionId,
753) -> Result<CreateRequest, DbErrorRead> {
754 let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
755 execution_id = $1 AND version = 0";
756
757 let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
758
759 let created_at = get(&row, "created_at")?;
760 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
761 let event = event.0;
762
763 if let ExecutionRequest::Created {
764 ffqn,
765 params,
766 parent,
767 scheduled_at,
768 component_id,
769 metadata,
770 scheduled_by,
771 } = event
772 {
773 Ok(CreateRequest {
774 created_at,
775 execution_id: execution_id.clone(),
776 ffqn,
777 params,
778 parent,
779 scheduled_at,
780 component_id,
781 metadata,
782 scheduled_by,
783 })
784 } else {
785 error!("Row with version=0 must be a `Created` event - {event:?}");
786 Err(consistency_db_err("expected `Created` event").into())
787 }
788}
789
790fn check_expected_next_and_appending_version(
791 expected_version: &Version,
792 appending_version: &Version,
793) -> Result<(), DbErrorWrite> {
794 if *expected_version != *appending_version {
795 debug!(
796 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
797 );
798 return Err(DbErrorWrite::NonRetriable(
799 DbErrorWriteNonRetriable::VersionConflict {
800 expected: expected_version.clone(),
801 requested: appending_version.clone(),
802 },
803 ));
804 }
805 Ok(())
806}
807
808#[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
809async fn create_inner(
810 tx: &Transaction<'_>,
811 req: CreateRequest,
812) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
813 trace!("create_inner");
814
815 let version = Version::default();
816 let execution_id = req.execution_id.clone();
817 let execution_id_str = execution_id.to_string();
818 let ffqn = req.ffqn.clone();
819 let created_at = req.created_at;
820 let scheduled_at = req.scheduled_at;
821 let component_id = req.component_id.clone();
822
823 let event = ExecutionRequest::from(req);
824 let event = Json(event);
825
826 tx.execute(
827 "INSERT INTO t_execution_log (
828 execution_id, created_at, version, json_value, variant, join_set_id
829 ) VALUES ($1, $2, $3, $4, $5, $6)",
830 &[
831 &execution_id_str,
832 &created_at,
833 &i64::from(version.0), &event,
835 &event.0.variant(),
836 &event.0.join_set_id().map(std::string::ToString::to_string),
837 ],
838 )
839 .await?;
840
841 let pending_at = {
842 debug!("Creating with `Pending(`{scheduled_at:?}`)");
843
844 tx.execute(
845 r"
846 INSERT INTO t_state (
847 execution_id,
848 is_top_level,
849 corresponding_version,
850 pending_expires_finished,
851 ffqn,
852 state,
853 created_at,
854 component_id_input_digest,
855 updated_at,
856 first_scheduled_at,
857 intermittent_event_count
858 ) VALUES (
859 $1, $2, $3, $4, $5, $6, $7, $8, CURRENT_TIMESTAMP, $9, 0
860 )",
861 &[
862 &execution_id_str,
863 &execution_id.is_top_level(),
864 &i64::from(version.0),
865 &scheduled_at,
866 &ffqn.to_string(),
867 &STATE_PENDING_AT,
868 &created_at,
869 &component_id.input_digest.as_slice(),
870 &scheduled_at,
871 ],
872 )
873 .await?;
874
875 AppendNotifier {
876 pending_at: Some(NotifierPendingAt {
877 scheduled_at,
878 ffqn,
879 component_input_digest: component_id.input_digest,
880 }),
881 execution_finished: None,
882 response: None,
883 }
884 };
885
886 let next_version = Version::new(version.0 + 1);
887 Ok((next_version, pending_at))
888}
889
890#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
891async fn update_state_pending_after_response_appended(
892 tx: &Transaction<'_>,
893 execution_id: &ExecutionId,
894 scheduled_at: DateTime<Utc>, corresponding_version: &Version, component_input_digest: InputContentDigest,
897) -> Result<AppendNotifier, DbErrorWrite> {
898 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
899
900 let execution_id_str = execution_id.to_string();
902 let version = i64::from(corresponding_version.0);
903
904 let updated = tx
905 .execute(
906 r"
907 UPDATE t_state
908 SET
909 corresponding_version = $1,
910 pending_expires_finished = $2,
911 state = $3,
912 updated_at = CURRENT_TIMESTAMP,
913
914 last_lock_version = NULL,
915
916 join_set_id = NULL,
917 join_set_closing = NULL,
918
919 result_kind = NULL
920 WHERE execution_id = $4
921 ",
922 &[
923 &version, &scheduled_at, &STATE_PENDING_AT, &execution_id_str, ],
928 )
929 .await?;
930
931 if updated == 0 {
932 return Err(DbErrorWrite::NotFound);
933 }
934
935 Ok(AppendNotifier {
936 pending_at: Some(NotifierPendingAt {
937 scheduled_at,
938 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
939 component_input_digest,
940 }),
941 execution_finished: None,
942 response: None,
943 })
944}
945
946#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
947async fn update_state_pending_after_event_appended(
948 tx: &Transaction<'_>,
949 execution_id: &ExecutionId,
950 appending_version: &Version,
951 scheduled_at: DateTime<Utc>,
952 intermittent_failure: bool,
953 component_input_digest: InputContentDigest,
954) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
955 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
956
957 let intermittent_delta = i64::from(intermittent_failure); let updated = tx
960 .execute(
961 r"
962 UPDATE t_state
963 SET
964 corresponding_version = $1,
965 pending_expires_finished = $2,
966 state = $3,
967 updated_at = CURRENT_TIMESTAMP,
968 intermittent_event_count = intermittent_event_count + $4,
969
970 last_lock_version = NULL,
971
972 join_set_id = NULL,
973 join_set_closing = NULL,
974
975 result_kind = NULL
976 WHERE execution_id = $5;
977 ",
978 &[
979 &i64::from(appending_version.0), &scheduled_at, &STATE_PENDING_AT, &intermittent_delta, &execution_id.to_string(), ],
985 )
986 .await?;
987
988 if updated != 1 {
989 return Err(DbErrorWrite::NotFound);
990 }
991
992 Ok((
993 appending_version.increment(),
994 AppendNotifier {
995 pending_at: Some(NotifierPendingAt {
996 scheduled_at,
997 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
998 component_input_digest,
999 }),
1000 execution_finished: None,
1001 response: None,
1002 },
1003 ))
1004}
1005
1006async fn update_state_locked_get_intermittent_event_count(
1007 tx: &Transaction<'_>,
1008 execution_id: &ExecutionId,
1009 executor_id: ExecutorId,
1010 run_id: RunId,
1011 lock_expires_at: DateTime<Utc>,
1012 appending_version: &Version,
1013 retry_config: ComponentRetryConfig,
1014) -> Result<u32, DbErrorWrite> {
1015 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
1016 let backoff_millis =
1017 i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
1018 DbErrorGeneric::Uncategorized {
1020 reason: "backoff too big".into(),
1021 context: SpanTrace::capture(),
1022 source: Some(Arc::new(err)),
1023 loc: Location::caller(),
1024 }
1025 })?;
1026
1027 let execution_id_str = execution_id.to_string();
1028
1029 let updated = tx
1030 .execute(
1031 r"
1032 UPDATE t_state
1033 SET
1034 corresponding_version = $1,
1035 pending_expires_finished = $2,
1036 state = $3,
1037 updated_at = CURRENT_TIMESTAMP,
1038
1039 max_retries = $4,
1040 retry_exp_backoff_millis = $5,
1041 last_lock_version = $1, -- appending_version again
1042 executor_id = $6,
1043 run_id = $7,
1044
1045 join_set_id = NULL,
1046 join_set_closing = NULL,
1047
1048 result_kind = NULL
1049 WHERE execution_id = $8
1050 ",
1051 &[
1052 &i64::from(appending_version.0), &lock_expires_at, &STATE_LOCKED, &retry_config.max_retries.map(i64::from), &backoff_millis, &executor_id.to_string(), &run_id.to_string(), &execution_id_str, ],
1061 )
1062 .await?;
1063
1064 if updated != 1 {
1065 return Err(DbErrorWrite::NotFound);
1066 }
1067
1068 let row = tx
1070 .query_one(
1071 "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
1072 &[&execution_id_str],
1073 )
1074 .await
1075 .map_err(DbErrorGeneric::from)?;
1076
1077 let count: i64 = get(&row, "intermittent_event_count")?; let count = u32::try_from(count)
1079 .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
1080 Ok(count)
1081}
1082
1083async fn update_state_blocked(
1084 tx: &Transaction<'_>,
1085 execution_id: &ExecutionId,
1086 appending_version: &Version,
1087 join_set_id: &JoinSetId,
1088 lock_expires_at: DateTime<Utc>,
1089 join_set_closing: bool,
1090) -> Result<AppendResponse, DbErrorWrite> {
1091 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1092
1093 let updated = tx
1094 .execute(
1095 r"
1096 UPDATE t_state
1097 SET
1098 corresponding_version = $1,
1099 pending_expires_finished = $2,
1100 state = $3,
1101 updated_at = CURRENT_TIMESTAMP,
1102
1103 last_lock_version = NULL,
1104
1105 join_set_id = $4,
1106 join_set_closing = $5,
1107
1108 result_kind = NULL
1109 WHERE execution_id = $6
1110 ",
1111 &[
1112 &i64::from(appending_version.0), &lock_expires_at, &STATE_BLOCKED_BY_JOIN_SET, &join_set_id.to_string(), &join_set_closing, &execution_id.to_string(), ],
1119 )
1120 .await?;
1121
1122 if updated != 1 {
1123 return Err(DbErrorWrite::NotFound);
1124 }
1125 Ok(appending_version.increment())
1126}
1127
1128async fn update_state_finished(
1129 tx: &Transaction<'_>,
1130 execution_id: &ExecutionId,
1131 appending_version: &Version,
1132 finished_at: DateTime<Utc>,
1133 result_kind: PendingStateFinishedResultKind,
1134) -> Result<(), DbErrorWrite> {
1135 debug!("Setting t_state to Finished");
1136
1137 let result_kind_json = Json(result_kind);
1138
1139 let updated = tx
1140 .execute(
1141 r"
1142 UPDATE t_state
1143 SET
1144 corresponding_version = $1,
1145 pending_expires_finished = $2,
1146 state = $3,
1147 updated_at = CURRENT_TIMESTAMP,
1148
1149 last_lock_version = NULL,
1150 executor_id = NULL,
1151 run_id = NULL,
1152
1153 join_set_id = NULL,
1154 join_set_closing = NULL,
1155
1156 result_kind = $4
1157 WHERE execution_id = $5
1158 ",
1159 &[
1160 &i64::from(appending_version.0), &finished_at, &STATE_FINISHED, &result_kind_json, &execution_id.to_string(), ],
1166 )
1167 .await?;
1168
1169 if updated != 1 {
1170 return Err(DbErrorWrite::NotFound);
1171 }
1172 Ok(())
1173}
1174
1175#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1176async fn bump_state_next_version(
1177 tx: &Transaction<'_>,
1178 execution_id: &ExecutionId,
1179 appending_version: &Version,
1180 delay_req: Option<DelayReq>,
1181) -> Result<AppendResponse, DbErrorWrite> {
1182 debug!("update_index_version");
1183 let execution_id_str = execution_id.to_string();
1184
1185 let updated = tx
1186 .execute(
1187 r"
1188 UPDATE t_state
1189 SET
1190 corresponding_version = $1,
1191 updated_at = CURRENT_TIMESTAMP
1192 WHERE execution_id = $2
1193 ",
1194 &[
1195 &i64::from(appending_version.0), &execution_id_str, ],
1198 )
1199 .await?;
1200
1201 if updated != 1 {
1202 return Err(DbErrorWrite::NotFound);
1203 }
1204
1205 if let Some(DelayReq {
1206 join_set_id,
1207 delay_id,
1208 expires_at,
1209 }) = delay_req
1210 {
1211 debug!("Inserting delay to `t_delay`");
1212 tx.execute(
1213 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1214 &[
1215 &execution_id_str,
1216 &join_set_id.to_string(),
1217 &delay_id.to_string(),
1218 &expires_at,
1219 ],
1220 )
1221 .await?;
1222 }
1223 Ok(appending_version.increment())
1224}
1225
1226async fn get_combined_state(
1227 tx: &Transaction<'_>,
1228 execution_id: &ExecutionId,
1229) -> Result<CombinedState, DbErrorRead> {
1230 let row = tx
1231 .query_one(
1232 r"
1233 SELECT
1234 created_at, first_scheduled_at,
1235 state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
1236 last_lock_version, executor_id, run_id,
1237 join_set_id, join_set_closing,
1238 result_kind
1239 FROM t_state
1240 WHERE execution_id = $1
1241 ",
1242 &[&execution_id.to_string()],
1243 )
1244 .await
1245 .map_err(DbErrorRead::from)?;
1246
1247 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1250 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1251
1252 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1253 let digest = Digest::try_from(digest_bytes.as_slice())
1254 .map_err(|err| consistency_db_err(err.to_string()))?;
1255 let component_id_input_digest = InputContentDigest(ContentDigest(digest));
1256
1257 let state: String = get(&row, "state")?;
1258 let ffqn: String = get(&row, "ffqn")?;
1259 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1260 consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
1261 })?;
1262
1263 let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;
1264
1265 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1266 let last_lock_version = last_lock_version_raw
1267 .map(Version::try_from)
1268 .transpose()
1269 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1270
1271 let executor_id_raw: Option<String> = get(&row, "executor_id")?;
1272 let executor_id = executor_id_raw
1273 .map(|id| ExecutorId::from_str(&id))
1274 .transpose()
1275 .map_err(DbErrorGeneric::from)?;
1276
1277 let run_id_raw: Option<String> = get(&row, "run_id")?;
1278 let run_id = run_id_raw
1279 .map(|id| RunId::from_str(&id))
1280 .transpose()
1281 .map_err(DbErrorGeneric::from)?;
1282
1283 let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
1284 let join_set_id = join_set_id_raw
1285 .map(|id| JoinSetId::from_str(&id))
1286 .transpose()
1287 .map_err(DbErrorGeneric::from)?;
1288
1289 let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;
1290
1291 let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
1292 let result_kind = result_kind.map(|it| it.0);
1293
1294 let corresponding_version = get::<i64>(&row, "corresponding_version")?;
1295 let corresponding_version = Version::new(
1296 VersionType::try_from(corresponding_version)
1297 .map_err(|_| consistency_db_err("version must be non-negative"))?,
1298 );
1299
1300 let dto = CombinedStateDTO {
1301 execution_id: execution_id.clone(),
1302 created_at,
1303 first_scheduled_at,
1304 state,
1305 ffqn,
1306 component_digest: component_id_input_digest,
1307 pending_expires_finished,
1308 last_lock_version,
1309 executor_id,
1310 run_id,
1311 join_set_id,
1312 join_set_closing,
1313 result_kind,
1314 };
1315 CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1316}
1317
1318async fn list_executions(
1319 read_tx: &Transaction<'_>,
1320 filter: ListExecutionsFilter,
1321 pagination: &ExecutionListPagination,
1322) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1323 struct QueryBuilder {
1325 where_clauses: Vec<String>,
1326 params: Vec<Box<dyn ToSql + Send + Sync>>,
1327 }
1328
1329 impl QueryBuilder {
1330 fn new() -> Self {
1331 Self {
1332 where_clauses: Vec::new(),
1333 params: Vec::new(),
1334 }
1335 }
1336
1337 fn add_param<T>(&mut self, param: T) -> String
1338 where
1339 T: ToSql + Sync + Send + 'static,
1340 {
1341 self.params.push(Box::new(param));
1342 format!("${}", self.params.len())
1343 }
1344
1345 fn add_where(&mut self, clause: String) {
1346 self.where_clauses.push(clause);
1347 }
1348 }
1349
1350 let mut qb = QueryBuilder::new();
1351
1352 let (limit, limit_desc) = match pagination {
1354 ExecutionListPagination::CreatedBy(p) => {
1355 let limit = p.length();
1356 let is_desc = p.is_desc();
1357 if let Some(cursor) = p.cursor() {
1358 let placeholder = qb.add_param(*cursor);
1359 qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1360 }
1361 (limit, is_desc)
1362 }
1363 ExecutionListPagination::ExecutionId(p) => {
1364 let limit = p.length();
1365 let is_desc = p.is_desc();
1366 if let Some(cursor) = p.cursor() {
1367 let placeholder = qb.add_param(cursor.to_string());
1368 qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1369 }
1370 (limit, is_desc)
1371 }
1372 };
1373
1374 if !filter.show_derived {
1375 qb.add_where("is_top_level = true".to_string());
1376 }
1377
1378 if let Some(ffqn_prefix) = filter.ffqn_prefix {
1379 let placeholder = qb.add_param(format!("{ffqn_prefix}%")); qb.add_where(format!("ffqn LIKE {placeholder}"));
1381 }
1382 if filter.hide_finished {
1383 qb.add_where(format!("state != '{STATE_FINISHED}'"));
1384 }
1385 if let Some(prefix) = filter.execution_id_prefix {
1386 let placeholder = qb.add_param(format!("{prefix}%")); qb.add_where(format!("execution_id LIKE {placeholder}"));
1388 }
1389
1390 let where_str = if qb.where_clauses.is_empty() {
1391 String::new()
1392 } else {
1393 format!("WHERE {}", qb.where_clauses.join(" AND "))
1394 };
1395
1396 let order_col = match pagination {
1397 ExecutionListPagination::CreatedBy(_) => "created_at",
1398 ExecutionListPagination::ExecutionId(_) => "execution_id",
1399 };
1400
1401 let desc_str = if limit_desc { "DESC" } else { "" };
1402
1403 let sql = format!(
1404 r"
1405 SELECT created_at, first_scheduled_at, component_id_input_digest,
1406 state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1407 last_lock_version, executor_id, run_id,
1408 join_set_id, join_set_closing,
1409 result_kind
1410 FROM t_state {where_str} ORDER BY {order_col} {desc_str} LIMIT {limit}
1411 "
1412 );
1413
1414 let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1415 .params
1416 .iter()
1417 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1418 .collect();
1419
1420 let rows = read_tx.query(&sql, ¶ms_refs).await?;
1421
1422 let mut vec = Vec::with_capacity(rows.len());
1423
1424 for row in rows {
1425 let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1427 let execution_id_str: String = get(&row, "execution_id")?;
1428 let execution_id = ExecutionId::from_str(&execution_id_str)
1429 .map_err(|err| consistency_db_err(err.to_string()))?;
1430
1431 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1432 let digest = Digest::try_from(digest_bytes.as_slice())
1433 .map_err(|err| consistency_db_err(err.to_string()))?;
1434 let component_id_input_digest = InputContentDigest(ContentDigest(digest));
1435
1436 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1437 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1438
1439 let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1440 get(&row, "result_kind")?;
1441 let result_kind = result_kind.map(|it| it.0);
1442
1443 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1444 let corresponding_version = Version::try_from(corresponding_version)
1445 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1446
1447 let executor_id_str: Option<String> = get(&row, "executor_id")?;
1448 let executor_id = executor_id_str
1449 .map(|id| ExecutorId::from_str(&id))
1450 .transpose()?;
1451
1452 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1453 let last_lock_version = last_lock_version_raw
1454 .map(Version::try_from)
1455 .transpose()
1456 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1457
1458 let run_id_str: Option<String> = get(&row, "run_id")?;
1459 let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1460
1461 let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1462 let join_set_id = join_set_id_str
1463 .map(|id| JoinSetId::from_str(&id))
1464 .transpose()?;
1465
1466 let ffqn: String = get(&row, "ffqn")?;
1467 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1468 error!("Error parsing ffqn - {parse_err:?}");
1469 consistency_db_err("invalid ffqn value in `t_state`")
1470 })?;
1471
1472 let combined_state_dto = CombinedStateDTO {
1473 execution_id,
1474 created_at,
1475 first_scheduled_at,
1476 component_digest: component_id_input_digest,
1477 state: get(&row, "state")?,
1478 ffqn,
1479 pending_expires_finished: get(&row, "pending_expires_finished")?,
1480 executor_id,
1481 last_lock_version,
1482 run_id,
1483 join_set_id,
1484 join_set_closing: get(&row, "join_set_closing")?,
1485 result_kind,
1486 };
1487
1488 let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1489
1490 Ok(combined_state.execution_with_state)
1491 };
1492
1493 match unpack() {
1494 Ok(execution) => vec.push(execution),
1495 Err(err) => {
1496 warn!("Skipping corrupted row in t_state: {err:?}");
1497 }
1498 }
1499 }
1500
1501 if !limit_desc {
1502 vec.reverse();
1504 }
1505 Ok(vec)
1506}
1507
1508async fn list_responses(
1509 tx: &Transaction<'_>,
1510 execution_id: &ExecutionId,
1511 pagination: Option<Pagination<u32>>,
1512) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1513 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1515 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1516 params.push(p);
1517 format!("${}", params.len())
1518 };
1519
1520 let p_execution_id = add_param(Box::new(execution_id.to_string()));
1522
1523 let mut sql = format!(
1524 "SELECT \
1525 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1526 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1527 WHERE \
1528 r.execution_id = {p_execution_id} \
1529 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1530 );
1531
1532 let limit = match &pagination {
1534 Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1535 let p_cursor = add_param(Box::new(i64::from(*cursor)));
1537
1538 write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1540
1541 Some(p.length())
1542 }
1543 None => None,
1544 };
1545
1546 sql.push_str(" ORDER BY r.id");
1548 if pagination.as_ref().is_some_and(Pagination::is_desc) {
1549 sql.push_str(" DESC");
1550 }
1551
1552 if let Some(limit) = limit {
1554 let p_limit = add_param(Box::new(i64::from(limit)));
1556 write!(sql, " LIMIT {p_limit}").unwrap();
1557 }
1558
1559 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1560 .iter()
1561 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1562 .collect();
1563
1564 let rows = tx
1565 .query(&sql, ¶ms_refs)
1566 .await
1567 .map_err(DbErrorRead::from)?;
1568
1569 let mut results = Vec::with_capacity(rows.len());
1570 for row in rows {
1571 results.push(parse_response_with_cursor(&row)?);
1572 }
1573
1574 Ok(results)
1575}
1576
1577fn parse_response_with_cursor(
1578 row: &tokio_postgres::Row,
1579) -> Result<ResponseWithCursor, DbErrorRead> {
1580 let id = u32::try_from(get::<i64>(row, "id")?)
1582 .map_err(|_| consistency_db_err("id must not be negative"))?;
1583
1584 let created_at: DateTime<Utc> = get(row, "created_at")?;
1585 let join_set_id_str: String = get(row, "join_set_id")?;
1586 let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1587
1588 let delay_id: Option<String> = get(row, "delay_id")?;
1590 let delay_id = delay_id
1591 .map(|id| DelayId::from_str(&id))
1592 .transpose()
1593 .map_err(DbErrorGeneric::from)?;
1594 let delay_success: Option<bool> = get(row, "delay_success")?;
1595 let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1596 let child_execution_id = child_execution_id
1597 .map(|id| ExecutionIdDerived::from_str(&id))
1598 .transpose()
1599 .map_err(DbErrorGeneric::from)?;
1600 let finished_version = get::<Option<i64>>(row, "finished_version")?
1601 .map(Version::try_from)
1602 .transpose()
1603 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1604 let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1605 let json_value = json_value.map(|it| it.0);
1606
1607 let event = match (
1608 delay_id,
1609 delay_success,
1610 child_execution_id,
1611 finished_version,
1612 json_value,
1613 ) {
1614 (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1615 delay_id,
1616 result: delay_success.then_some(()).ok_or(()),
1617 },
1618 (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1619 if let ExecutionRequest::Finished { result, .. } = json_val {
1620 JoinSetResponse::ChildExecutionFinished {
1621 child_execution_id,
1622 finished_version,
1623 result,
1624 }
1625 } else {
1626 error!("Joined log entry must be 'Finished'");
1627 return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1628 }
1629 }
1630 (delay, delay_success, child, finished, result) => {
1631 error!(
1632 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1633 );
1634 return Err(consistency_db_err("invalid row in t_join_set_response").into());
1635 }
1636 };
1637
1638 Ok(ResponseWithCursor {
1639 cursor: id,
1640 event: JoinSetResponseEventOuter {
1641 event: JoinSetResponseEvent { join_set_id, event },
1642 created_at,
1643 },
1644 })
1645}
1646
1647#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1648#[expect(clippy::too_many_arguments)]
1649async fn lock_single_execution(
1650 tx: &Transaction<'_>,
1651 created_at: DateTime<Utc>,
1652 component_id: ComponentId,
1653 execution_id: &ExecutionId,
1654 run_id: RunId,
1655 appending_version: &Version,
1656 executor_id: ExecutorId,
1657 lock_expires_at: DateTime<Utc>,
1658 retry_config: ComponentRetryConfig,
1659) -> Result<LockedExecution, DbErrorWrite> {
1660 debug!("lock_single_execution");
1661
1662 let combined_state = get_combined_state(tx, execution_id).await?;
1664 combined_state
1665 .execution_with_state
1666 .pending_state
1667 .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
1668 let expected_version = combined_state.get_next_version_assert_not_finished();
1669 check_expected_next_and_appending_version(&expected_version, appending_version)?;
1670
1671 let locked_event = Locked {
1673 component_id,
1674 executor_id,
1675 lock_expires_at,
1676 run_id,
1677 retry_config,
1678 };
1679 let event = ExecutionRequest::Locked(locked_event.clone());
1680
1681 let event = Json(event);
1682
1683 tx.execute(
1685 "INSERT INTO t_execution_log \
1686 (execution_id, created_at, json_value, version, variant) \
1687 VALUES ($1, $2, $3, $4, $5)",
1688 &[
1689 &execution_id.to_string(),
1690 &created_at,
1691 &event,
1692 &i64::from(appending_version.0),
1693 &event.0.variant(),
1694 ],
1695 )
1696 .await
1697 .map_err(|err| {
1698 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
1699 reason: "cannot lock".into(),
1700 context: SpanTrace::capture(),
1701 source: Some(Arc::new(err)),
1702 loc: Location::caller(),
1703 })
1704 })?;
1705
1706 let responses_dto = list_responses(tx, execution_id, None).await?;
1708 let responses = responses_dto.into_iter().map(|resp| resp.event).collect();
1709 trace!("Responses: {responses:?}");
1710
1711 let intermittent_event_count = update_state_locked_get_intermittent_event_count(
1712 tx,
1713 execution_id,
1714 executor_id,
1715 run_id,
1716 lock_expires_at,
1717 appending_version,
1718 retry_config,
1719 )
1720 .await?;
1721
1722 let rows = tx
1725 .query(
1726 "SELECT json_value, version FROM t_execution_log WHERE \
1727 execution_id = $1 AND (variant = $2 OR variant = $3) \
1728 ORDER BY version",
1729 &[
1730 &execution_id.to_string(),
1731 &DUMMY_CREATED.variant(),
1732 &DUMMY_HISTORY_EVENT.variant(),
1733 ],
1734 )
1735 .await
1736 .map_err(DbErrorGeneric::from)?;
1737
1738 let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
1739
1740 for row in rows {
1741 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
1742 let event = event.0;
1743
1744 let version: i64 = get(&row, "version")?;
1745 let version = Version::try_from(version)
1746 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1747
1748 events.push_back(ExecutionEvent {
1749 created_at: DateTime::from_timestamp_nanos(0), event,
1751 backtrace_id: None,
1752 version,
1753 });
1754 }
1755
1756 let Some(ExecutionRequest::Created {
1758 ffqn,
1759 params,
1760 parent,
1761 metadata,
1762 ..
1763 }) = events.pop_front().map(|outer| outer.event)
1764 else {
1765 error!("Execution log must contain at least `Created` event");
1766 return Err(consistency_db_err("execution log must contain `Created` event").into());
1767 };
1768
1769 let mut event_history = Vec::new();
1771 for ExecutionEvent { event, version, .. } in events {
1772 if let ExecutionRequest::HistoryEvent { event } = event {
1773 event_history.push((event, version));
1774 } else {
1775 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1776 return Err(consistency_db_err(
1777 "rows can only contain `Created` and `HistoryEvent` event kinds",
1778 )
1779 .into());
1780 }
1781 }
1782
1783 Ok(LockedExecution {
1784 execution_id: execution_id.clone(),
1785 metadata,
1786 next_version: appending_version.increment(),
1787 ffqn,
1788 params,
1789 event_history,
1790 responses,
1791 parent,
1792 intermittent_event_count,
1793 locked_event,
1794 })
1795}
1796
1797async fn count_join_next(
1798 tx: &Transaction<'_>,
1799 execution_id: &ExecutionId,
1800 join_set_id: &JoinSetId,
1801) -> Result<u32, DbErrorRead> {
1802 let row = tx
1803 .query_one(
1804 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
1805 AND history_event_type = $3",
1806 &[
1807 &execution_id.to_string(),
1808 &join_set_id.to_string(),
1809 &HISTORY_EVENT_TYPE_JOIN_NEXT,
1810 ],
1811 )
1812 .await
1813 .map_err(DbErrorRead::from)?;
1814
1815 let count = u32::try_from(get::<i64>(&row, "count")?).expect("COUNT cannot be negative");
1816 Ok(count)
1817}
1818
1819async fn nth_response(
1820 tx: &Transaction<'_>,
1821 execution_id: &ExecutionId,
1822 join_set_id: &JoinSetId,
1823 skip_rows: u32,
1824) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
1825 let row = tx
1826 .query_opt(
1827 "SELECT r.id, r.created_at, r.join_set_id, \
1828 r.delay_id, r.delay_success, \
1829 r.child_execution_id, r.finished_version, l.json_value \
1830 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1831 WHERE \
1832 r.execution_id = $1 AND r.join_set_id = $2 AND \
1833 ( \
1834 r.finished_version = l.version \
1835 OR \
1836 r.child_execution_id IS NULL \
1837 ) \
1838 ORDER BY id \
1839 LIMIT 1 OFFSET $3",
1840 &[
1841 &execution_id.to_string(),
1842 &join_set_id.to_string(),
1843 &i64::from(skip_rows),
1844 ]
1845 )
1846 .await
1847 .map_err(DbErrorRead::from)?;
1848
1849 match row {
1850 Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
1851 None => Ok(None),
1852 }
1853}
1854
1855#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1856async fn append(
1857 tx: &Transaction<'_>,
1858 execution_id: &ExecutionId,
1859 req: AppendRequest,
1860 appending_version: Version,
1861) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1862 if matches!(req.event, ExecutionRequest::Created { .. }) {
1863 return Err(DbErrorWrite::NonRetriable(
1864 DbErrorWriteNonRetriable::ValidationFailed(
1865 "cannot append `Created` event - use `create` instead".into(),
1866 ),
1867 ));
1868 }
1869
1870 if let AppendRequest {
1871 event:
1872 ExecutionRequest::Locked(Locked {
1873 component_id,
1874 executor_id,
1875 run_id,
1876 lock_expires_at,
1877 retry_config,
1878 }),
1879 created_at,
1880 } = req
1881 {
1882 return lock_single_execution(
1883 tx,
1884 created_at,
1885 component_id,
1886 execution_id,
1887 run_id,
1888 &appending_version,
1889 executor_id,
1890 lock_expires_at,
1891 retry_config,
1892 )
1893 .await
1894 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1895 }
1896
1897 let combined_state = get_combined_state(tx, execution_id).await?;
1898 if combined_state
1899 .execution_with_state
1900 .pending_state
1901 .is_finished()
1902 {
1903 debug!("Execution is already finished");
1904 return Err(DbErrorWrite::NonRetriable(
1905 DbErrorWriteNonRetriable::IllegalState {
1906 reason: "already finished".into(),
1907 context: SpanTrace::capture(),
1908 source: None,
1909 loc: Location::caller(),
1910 },
1911 ));
1912 }
1913
1914 check_expected_next_and_appending_version(
1915 &combined_state.get_next_version_assert_not_finished(),
1916 &appending_version,
1917 )?;
1918
1919 let event = Json(req.event);
1920
1921 tx.execute(
1923 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1924 VALUES ($1, $2, $3, $4, $5, $6)",
1925 &[
1926 &execution_id.to_string(),
1927 &req.created_at,
1928 &event,
1929 &i64::from(appending_version.0),
1930 &event.0.variant(),
1931 &event.0.join_set_id().map(std::string::ToString::to_string),
1932 ],
1933 )
1934 .await?;
1935
1936 match &event.0 {
1938 ExecutionRequest::Created { .. } => {
1939 unreachable!("handled in the caller")
1940 }
1941
1942 ExecutionRequest::Locked { .. } => {
1943 unreachable!("handled above")
1944 }
1945
1946 ExecutionRequest::TemporarilyFailed {
1947 backoff_expires_at, ..
1948 }
1949 | ExecutionRequest::TemporarilyTimedOut {
1950 backoff_expires_at, ..
1951 } => {
1952 let (next_version, notifier) = update_state_pending_after_event_appended(
1953 tx,
1954 execution_id,
1955 &appending_version,
1956 *backoff_expires_at,
1957 true, combined_state.execution_with_state.component_digest,
1959 )
1960 .await?;
1961 return Ok((next_version, notifier));
1962 }
1963
1964 ExecutionRequest::Unlocked {
1965 backoff_expires_at, ..
1966 } => {
1967 let (next_version, notifier) = update_state_pending_after_event_appended(
1968 tx,
1969 execution_id,
1970 &appending_version,
1971 *backoff_expires_at,
1972 false, combined_state.execution_with_state.component_digest,
1974 )
1975 .await?;
1976 return Ok((next_version, notifier));
1977 }
1978
1979 ExecutionRequest::Finished { result, .. } => {
1980 update_state_finished(
1981 tx,
1982 execution_id,
1983 &appending_version,
1984 req.created_at,
1985 PendingStateFinishedResultKind::from(result),
1986 )
1987 .await?;
1988 return Ok((
1989 appending_version,
1990 AppendNotifier {
1991 pending_at: None,
1992 execution_finished: Some(NotifierExecutionFinished {
1993 execution_id: execution_id.clone(),
1994 retval: result.clone(),
1995 }),
1996 response: None,
1997 },
1998 ));
1999 }
2000
2001 ExecutionRequest::HistoryEvent {
2002 event:
2003 HistoryEvent::JoinSetCreate { .. }
2004 | HistoryEvent::JoinSetRequest {
2005 request: JoinSetRequest::ChildExecutionRequest { .. },
2006 ..
2007 }
2008 | HistoryEvent::Persist { .. }
2009 | HistoryEvent::Schedule { .. }
2010 | HistoryEvent::Stub { .. }
2011 | HistoryEvent::JoinNextTooMany { .. },
2012 } => {
2013 return Ok((
2014 bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2015 AppendNotifier::default(),
2016 ));
2017 }
2018
2019 ExecutionRequest::HistoryEvent {
2020 event:
2021 HistoryEvent::JoinSetRequest {
2022 join_set_id,
2023 request:
2024 JoinSetRequest::DelayRequest {
2025 delay_id,
2026 expires_at,
2027 ..
2028 },
2029 },
2030 } => {
2031 return Ok((
2032 bump_state_next_version(
2033 tx,
2034 execution_id,
2035 &appending_version,
2036 Some(DelayReq {
2037 join_set_id: join_set_id.clone(),
2038 delay_id: delay_id.clone(),
2039 expires_at: *expires_at,
2040 }),
2041 )
2042 .await?,
2043 AppendNotifier::default(),
2044 ));
2045 }
2046
2047 ExecutionRequest::HistoryEvent {
2048 event:
2049 HistoryEvent::JoinNext {
2050 join_set_id,
2051 run_expires_at,
2052 closing,
2053 requested_ffqn: _,
2054 },
2055 } => {
2056 let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2058
2059 let nth_response =
2061 nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2062
2063 trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2064 assert!(join_next_count > 0);
2065
2066 if let Some(ResponseWithCursor {
2067 event:
2068 JoinSetResponseEventOuter {
2069 created_at: nth_created_at,
2070 ..
2071 },
2072 cursor: _,
2073 }) = nth_response
2074 {
2075 let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2076 let (next_version, notifier) = update_state_pending_after_event_appended(
2077 tx,
2078 execution_id,
2079 &appending_version,
2080 scheduled_at,
2081 false, combined_state.execution_with_state.component_digest,
2083 )
2084 .await?;
2085 return Ok((next_version, notifier));
2086 }
2087
2088 return Ok((
2089 update_state_blocked(
2090 tx,
2091 execution_id,
2092 &appending_version,
2093 join_set_id,
2094 *run_expires_at,
2095 *closing,
2096 )
2097 .await?,
2098 AppendNotifier::default(),
2099 ));
2100 }
2101 }
2102}
2103
2104async fn append_response(
2105 tx: &Transaction<'_>,
2106 execution_id: &ExecutionId,
2107 response_outer: JoinSetResponseEventOuter,
2108) -> Result<AppendNotifier, DbErrorWrite> {
2109 let join_set_id = &response_outer.event.join_set_id;
2110
2111 let (delay_id, delay_success) = match &response_outer.event.event {
2112 JoinSetResponse::DelayFinished { delay_id, result } => {
2113 (Some(delay_id.to_string()), Some(result.is_ok()))
2114 }
2115 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2116 };
2117
2118 let (child_execution_id, finished_version) = match &response_outer.event.event {
2119 JoinSetResponse::ChildExecutionFinished {
2120 child_execution_id,
2121 finished_version,
2122 result: _,
2123 } => (
2124 Some(child_execution_id.to_string()),
2125 Some(i64::from(finished_version.0)),
2126 ),
2127 JoinSetResponse::DelayFinished { .. } => (None, None),
2128 };
2129
2130 tx.execute(
2131 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2132 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2133 &[
2134 &execution_id.to_string(),
2135 &response_outer.created_at,
2136 &join_set_id.to_string(),
2137 &delay_id,
2138 &delay_success,
2139 &child_execution_id,
2140 &finished_version,
2141 ]
2142 ).await?;
2143
2144 let combined_state = get_combined_state(tx, execution_id).await?;
2146 debug!("previous_pending_state: {combined_state:?}");
2147
2148 let mut notifier = if let PendingState::BlockedByJoinSet {
2149 join_set_id: found_join_set_id,
2150 lock_expires_at,
2151 closing: _,
2152 } = combined_state.execution_with_state.pending_state
2153 && *join_set_id == found_join_set_id
2154 {
2155 let scheduled_at = std::cmp::max(lock_expires_at, response_outer.created_at);
2156 update_state_pending_after_response_appended(
2158 tx,
2159 execution_id,
2160 scheduled_at,
2161 &combined_state.corresponding_version,
2162 combined_state.execution_with_state.component_digest,
2163 )
2164 .await?
2165 } else {
2166 AppendNotifier::default()
2167 };
2168
2169 if let JoinSetResponseEvent {
2170 join_set_id,
2171 event:
2172 JoinSetResponse::DelayFinished {
2173 delay_id,
2174 result: _,
2175 },
2176 } = &response_outer.event
2177 {
2178 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2179 tx.execute(
2180 "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2181 &[
2182 &execution_id.to_string(),
2183 &join_set_id.to_string(),
2184 &delay_id.to_string(),
2185 ],
2186 )
2187 .await?;
2188 }
2189
2190 notifier.response = Some((execution_id.clone(), response_outer));
2191 Ok(notifier)
2192}
2193
2194async fn append_backtrace(
2195 tx: &Transaction<'_>,
2196 backtrace_info: &BacktraceInfo,
2197) -> Result<(), DbErrorWrite> {
2198 let backtrace_json = Json(&backtrace_info.wasm_backtrace);
2199
2200 tx.execute(
2201 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2202 VALUES ($1, $2, $3, $4, $5)",
2203 &[
2204 &backtrace_info.execution_id.to_string(),
2205 &Json(&backtrace_info.component_id),
2206 &i64::from(backtrace_info.version_min_including.0),
2207 &i64::from(backtrace_info.version_max_excluding.0),
2208 &backtrace_json,
2209 ],
2210 )
2211 .await?;
2212
2213 Ok(())
2214}
2215
/// Loads the complete execution log of `execution_id`: all events ordered by version,
/// all join-set responses, and the current state from `t_state`.
///
/// Only compiled with the `test` feature.
///
/// # Errors
/// - [`DbErrorRead::NotFound`] when the execution log has no rows.
/// - Database, decoding and consistency errors.
#[cfg(feature = "test")]
async fn get_execution_log(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let rows = tx
        .query(
            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
            execution_id = $1 ORDER BY version",
            &[&execution_id.to_string()],
        )
        .await
        .map_err(DbErrorRead::from)?;
    if rows.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    let events = rows
        .iter()
        .map(|row| -> Result<ExecutionEvent, DbErrorRead> {
            let created_at: DateTime<Utc> = get(row, "created_at")?;
            let Json(event) = get::<Json<ExecutionRequest>>(row, "json_value")?;
            let version = Version::try_from(get::<i64>(row, "version")?)
                .map_err(|_| consistency_db_err("version must be non-negative"))?;
            Ok(ExecutionEvent {
                created_at,
                event,
                backtrace_id: None,
                version,
            })
        })
        .collect::<Result<Vec<_>, _>>()?;

    let combined_state = get_combined_state(tx, execution_id).await?;
    let responses = list_responses(tx, execution_id, None)
        .await?
        .into_iter()
        .map(|resp| resp.event)
        .collect();

    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version: combined_state.get_next_version_or_finished(),
        pending_state: combined_state.execution_with_state.pending_state,
        component_digest: combined_state.execution_with_state.component_digest,
    })
}
2264
2265async fn list_execution_events(
2266 tx: &Transaction<'_>,
2267 execution_id: &ExecutionId,
2268 version_min: VersionType,
2269 version_max_excluding: VersionType,
2270 include_backtrace_id: bool,
2271) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2272 let sql = if include_backtrace_id {
2273 "SELECT
2274 log.created_at,
2275 log.json_value,
2276 log.version,
2277 bt.version_min_including AS backtrace_id
2278 FROM
2279 t_execution_log AS log
2280 LEFT OUTER JOIN
2281 t_backtrace AS bt ON log.execution_id = bt.execution_id
2282 AND log.version >= bt.version_min_including
2283 AND log.version < bt.version_max_excluding
2284 WHERE
2285 log.execution_id = $1
2286 AND log.version >= $2
2287 AND log.version < $3
2288 ORDER BY
2289 log.version"
2290 } else {
2291 "SELECT
2292 created_at, json_value, NULL::BIGINT as backtrace_id, version
2293 FROM t_execution_log WHERE
2294 execution_id = $1 AND version >= $2 AND version < $3
2295 ORDER BY version"
2296 };
2297
2298 let rows = tx
2299 .query(
2300 sql,
2301 &[
2302 &execution_id.to_string(),
2303 &i64::from(version_min),
2304 &i64::from(version_max_excluding),
2305 ],
2306 )
2307 .await
2308 .map_err(DbErrorRead::from)?;
2309
2310 let mut events = Vec::with_capacity(rows.len());
2311 for row in rows {
2312 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2313 let backtrace_id = get::<Option<i64>>(&row, "backtrace_id")?
2314 .map(Version::try_from)
2315 .transpose()
2316 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2317
2318 let version = get::<i64>(&row, "version")?;
2319 let version = Version::new(
2320 VersionType::try_from(version)
2321 .map_err(|_| consistency_db_err("version must be non-negative"))?,
2322 );
2323 let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2324 let event_req = event_req.0;
2325
2326 events.push(ExecutionEvent {
2327 created_at,
2328 event: event_req,
2329 backtrace_id,
2330 version,
2331 });
2332 }
2333 Ok(events)
2334}
2335
2336async fn get_execution_event(
2337 tx: &Transaction<'_>,
2338 execution_id: &ExecutionId,
2339 version: VersionType,
2340) -> Result<ExecutionEvent, DbErrorRead> {
2341 let row = tx
2342 .query_one(
2343 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2344 execution_id = $1 AND version = $2",
2345 &[&execution_id.to_string(), &i64::from(version)],
2346 )
2347 .await?;
2348
2349 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2350 let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2351 let version = get::<i64>(&row, "version")?;
2352 let version = Version::try_from(version)
2353 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2354 let event = json_val.0;
2355
2356 Ok(ExecutionEvent {
2357 created_at,
2358 event,
2359 backtrace_id: None,
2360 version,
2361 })
2362}
2363
2364async fn get_last_execution_event(
2365 tx: &Transaction<'_>,
2366 execution_id: &ExecutionId,
2367) -> Result<ExecutionEvent, DbErrorRead> {
2368 let row = tx
2369 .query_one(
2370 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2371 execution_id = $1 ORDER BY version DESC LIMIT 1",
2372 &[&execution_id.to_string()],
2373 )
2374 .await?;
2375
2376 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2377 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2378 let event = event.0;
2379 let version = get::<i64>(&row, "version")?;
2380 let version = Version::try_from(version)
2381 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2382
2383 Ok(ExecutionEvent {
2384 created_at,
2385 event,
2386 backtrace_id: None,
2387 version,
2388 })
2389}
2390
2391async fn delay_response(
2392 tx: &Transaction<'_>,
2393 execution_id: &ExecutionId,
2394 delay_id: &DelayId,
2395) -> Result<Option<bool>, DbErrorRead> {
2396 let row = tx
2397 .query_opt(
2398 "SELECT delay_success \
2399 FROM t_join_set_response \
2400 WHERE \
2401 execution_id = $1 AND delay_id = $2",
2402 &[&execution_id.to_string(), &delay_id.to_string()],
2403 )
2404 .await?;
2405
2406 match row {
2407 Some(r) => Ok(Some(get::<bool>(&r, "delay_success")?)),
2408 None => Ok(None),
2409 }
2410}
2411
2412#[instrument(level = Level::TRACE, skip_all)]
2413async fn get_responses_with_offset(
2414 tx: &Transaction<'_>,
2415 execution_id: &ExecutionId,
2416 skip_rows: u32,
2417) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
2418 let rows = tx
2419 .query(
2420 "SELECT r.id, r.created_at, r.join_set_id, \
2421 r.delay_id, r.delay_success, \
2422 r.child_execution_id, r.finished_version, l.json_value \
2423 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2424 WHERE \
2425 r.execution_id = $1 AND \
2426 ( \
2427 r.finished_version = l.version \
2428 OR r.child_execution_id IS NULL \
2429 ) \
2430 ORDER BY id \
2431 OFFSET $2",
2432 &[
2433 &execution_id.to_string(),
2434 &(i64::from(skip_rows)),
2435 ]
2436 )
2437 .await?;
2438
2439 let mut results = Vec::with_capacity(rows.len());
2440 for row in rows {
2441 let resp = parse_response_with_cursor(&row)?;
2442 results.push(resp.event);
2443 }
2444 Ok(results)
2445}
2446
2447async fn get_pending_of_single_ffqn(
2448 tx: &Transaction<'_>,
2449 batch_size: u32,
2450 pending_at_or_sooner: DateTime<Utc>,
2451 ffqn: &FunctionFqn,
2452 select_strategy: SelectStrategy,
2453) -> Result<Vec<(ExecutionId, Version)>, ()> {
2454 let rows = tx
2455 .query(
2456 &format!(
2457 "SELECT execution_id, corresponding_version FROM t_state \
2458 WHERE \
2459 state = '{STATE_PENDING_AT}' AND \
2460 pending_expires_finished <= $1 AND ffqn = $2 \
2461 ORDER BY pending_expires_finished \
2462 {} \
2463 LIMIT $3",
2464 if select_strategy == SelectStrategy::LockForUpdate {
2465 "FOR UPDATE SKIP LOCKED"
2466 } else {
2467 ""
2468 }
2469 ),
2470 &[
2471 &pending_at_or_sooner,
2472 &ffqn.to_string(),
2473 &(i64::from(batch_size)),
2474 ],
2475 )
2476 .await
2477 .map_err(|err| {
2478 warn!("Ignoring consistency error {err:?}");
2479 })?;
2480
2481 let mut result = Vec::with_capacity(rows.len());
2482 for row in rows {
2483 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2484 let eid_str: String = get(&row, "execution_id")?;
2485 let corresponding_version: i64 = get(&row, "corresponding_version")?;
2486 let corresponding_version = Version::try_from(corresponding_version)
2487 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2488
2489 if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2490 return Ok((eid, corresponding_version.increment()));
2491 }
2492 Err(consistency_db_err("invalid execution_id"))
2493 };
2494
2495 match unpack() {
2496 Ok(val) => result.push(val),
2497 Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2498 }
2499 }
2500 Ok(result)
2501}
2502
2503async fn get_pending_by_ffqns(
2505 tx: &Transaction<'_>,
2506 batch_size: u32,
2507 pending_at_or_sooner: DateTime<Utc>,
2508 ffqns: &[FunctionFqn],
2509 select_strategy: SelectStrategy,
2510) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2511 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2512 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2513
2514 for ffqn in ffqns {
2515 let needed = batch_size - execution_ids_versions.len();
2516 if needed == 0 {
2517 break;
2518 }
2519 let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
2520 if let Ok(execs) =
2521 get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2522 .await
2523 {
2524 execution_ids_versions.extend(execs);
2525 }
2526 }
2527
2528 Ok(execution_ids_versions)
2529}
2530
/// How the pending-execution queries select `t_state` rows.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SelectStrategy {
    /// Plain `SELECT`; the returned rows are not locked.
    Read,
    /// Append `FOR UPDATE SKIP LOCKED`, locking the returned rows and skipping rows already
    /// locked by another transaction.
    LockForUpdate,
}
2536
2537async fn get_pending_by_component_input_digest(
2538 tx: &Transaction<'_>,
2539 batch_size: u32,
2540 pending_at_or_sooner: DateTime<Utc>,
2541 input_digest: &InputContentDigest,
2542 select_strategy: SelectStrategy,
2543) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2544 let rows = tx
2545 .query(
2546 &format!(
2547 "SELECT execution_id, corresponding_version FROM t_state WHERE \
2548 state = '{STATE_PENDING_AT}' AND \
2549 pending_expires_finished <= $1 AND \
2550 component_id_input_digest = $2 \
2551 ORDER BY pending_expires_finished \
2552 {} \
2553 LIMIT $3",
2554 if select_strategy == SelectStrategy::LockForUpdate {
2555 "FOR UPDATE SKIP LOCKED"
2556 } else {
2557 ""
2558 }
2559 ),
2560 &[
2561 &pending_at_or_sooner,
2562 &input_digest.as_slice(), &i64::from(batch_size),
2564 ],
2565 )
2566 .await?;
2567
2568 let mut result = Vec::with_capacity(rows.len());
2569 for row in rows {
2570 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2571 let eid_str: String = get(&row, "execution_id")?;
2572 let corresponding_version: i64 = get(&row, "corresponding_version")?;
2573 let corresponding_version = Version::try_from(corresponding_version)
2574 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2575
2576 let eid = ExecutionId::from_str(&eid_str)
2577 .map_err(|err| consistency_db_err(err.to_string()))?;
2578 Ok((eid, corresponding_version.increment()))
2579 };
2580
2581 match unpack() {
2582 Ok(val) => result.push(val),
2583 Err(err) => {
2584 warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
2585 }
2586 }
2587 }
2588
2589 Ok(result)
2590}
2591
2592fn notify_pending_locked(
2593 notifier: &NotifierPendingAt,
2594 current_time: DateTime<Utc>,
2595 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2596) {
2597 if notifier.scheduled_at <= current_time {
2598 ffqn_to_pending_subscription.notify(notifier);
2599 }
2600}
2601
2602async fn upgrade_execution_component(
2603 tx: &Transaction<'_>,
2604 execution_id: &ExecutionId,
2605 old: &InputContentDigest,
2606 new: &InputContentDigest,
2607) -> Result<(), DbErrorWrite> {
2608 debug!("Updating t_state to component {new}");
2609
2610 let updated = tx
2611 .execute(
2612 r"
2613 UPDATE t_state
2614 SET
2615 updated_at = CURRENT_TIMESTAMP,
2616 component_id_input_digest = $1
2617 WHERE
2618 execution_id = $2 AND
2619 component_id_input_digest = $3
2620 ",
2621 &[
2622 &new.as_slice(), &execution_id.to_string(), &old.as_slice(), ],
2626 )
2627 .await?;
2628
2629 if updated != 1 {
2630 return Err(DbErrorWrite::NotFound);
2631 }
2632 Ok(())
2633}
2634
2635impl PostgresConnection {
2636 #[instrument(level = Level::TRACE, skip_all)]
2638 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2639 let (pending_ats, finished_execs, responses) = {
2640 let (mut pending_ats, mut finished_execs, mut responses) =
2641 (Vec::new(), Vec::new(), Vec::new());
2642 for notifier in notifiers {
2643 if let Some(pending_at) = notifier.pending_at {
2644 pending_ats.push(pending_at);
2645 }
2646 if let Some(finished) = notifier.execution_finished {
2647 finished_execs.push(finished);
2648 }
2649 if let Some(response) = notifier.response {
2650 responses.push(response);
2651 }
2652 }
2653 (pending_ats, finished_execs, responses)
2654 };
2655
2656 if !pending_ats.is_empty() {
2658 let guard = self.pending_subscribers.lock().unwrap();
2659 for pending_at in pending_ats {
2660 notify_pending_locked(&pending_at, current_time, &guard);
2661 }
2662 }
2663 if !finished_execs.is_empty() {
2665 let mut guard = self.execution_finished_subscribers.lock().unwrap();
2666 for finished in finished_execs {
2667 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2668 for (_tag, sender) in listeners_of_exe_id {
2669 let _ = sender.send(finished.retval.clone());
2670 }
2671 }
2672 }
2673 }
2674 if !responses.is_empty() {
2676 let mut guard = self.response_subscribers.lock().unwrap();
2677 for (execution_id, response) in responses {
2678 if let Some((sender, _)) = guard.remove(&execution_id) {
2679 let _ = sender.send(response);
2680 }
2681 }
2682 }
2683 }
2684}
2685
2686#[async_trait]
2687impl DbExecutor for PostgresConnection {
2688 #[instrument(level = Level::TRACE, skip(self))]
2689 async fn lock_pending_by_ffqns(
2690 &self,
2691 batch_size: u32,
2692 pending_at_or_sooner: DateTime<Utc>,
2693 ffqns: Arc<[FunctionFqn]>,
2694 created_at: DateTime<Utc>,
2695 component_id: ComponentId,
2696 executor_id: ExecutorId,
2697 lock_expires_at: DateTime<Utc>,
2698 run_id: RunId,
2699 retry_config: ComponentRetryConfig,
2700 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2701 let mut client_guard = self.client.lock().await;
2702 let tx = client_guard.transaction().await?;
2703
2704 let execution_ids_versions = get_pending_by_ffqns(
2705 &tx,
2706 batch_size,
2707 pending_at_or_sooner,
2708 &ffqns,
2709 SelectStrategy::LockForUpdate,
2710 )
2711 .await?;
2712
2713 if execution_ids_versions.is_empty() {
2714 tx.commit().await?;
2717 return Ok(vec![]);
2718 }
2719
2720 debug!("Locking {execution_ids_versions:?}");
2721
2722 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2724 for (execution_id, version) in execution_ids_versions {
2725 match lock_single_execution(
2726 &tx,
2727 created_at,
2728 component_id.clone(),
2729 &execution_id,
2730 run_id,
2731 &version,
2732 executor_id,
2733 lock_expires_at,
2734 retry_config,
2735 )
2736 .await
2737 {
2738 Ok(locked) => locked_execs.push(locked),
2739 Err(err) => {
2740 warn!("Locking row {execution_id} failed - {err:?}");
2741 }
2742 }
2743 }
2744
2745 tx.commit().await?;
2746
2747 Ok(locked_execs)
2748 }
2749
2750 #[instrument(level = Level::TRACE, skip(self))]
2751 async fn lock_pending_by_component_id(
2752 &self,
2753 batch_size: u32,
2754 pending_at_or_sooner: DateTime<Utc>,
2755 component_id: &ComponentId,
2756 created_at: DateTime<Utc>,
2757 executor_id: ExecutorId,
2758 lock_expires_at: DateTime<Utc>,
2759 run_id: RunId,
2760 retry_config: ComponentRetryConfig,
2761 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2762 let mut client_guard = self.client.lock().await;
2763 let tx = client_guard.transaction().await?;
2764
2765 let execution_ids_versions = get_pending_by_component_input_digest(
2766 &tx,
2767 batch_size,
2768 pending_at_or_sooner,
2769 &component_id.input_digest,
2770 SelectStrategy::LockForUpdate,
2771 )
2772 .await?;
2773
2774 if execution_ids_versions.is_empty() {
2775 tx.commit().await?;
2776 return Ok(vec![]);
2777 }
2778
2779 debug!("Locking {execution_ids_versions:?}");
2780
2781 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2782 for (execution_id, version) in execution_ids_versions {
2783 match lock_single_execution(
2784 &tx,
2785 created_at,
2786 component_id.clone(),
2787 &execution_id,
2788 run_id,
2789 &version,
2790 executor_id,
2791 lock_expires_at,
2792 retry_config,
2793 )
2794 .await
2795 {
2796 Ok(locked) => locked_execs.push(locked),
2797 Err(err) => {
2798 warn!("Locking row {execution_id} failed - {err:?}");
2799 }
2800 }
2801 }
2802
2803 tx.commit().await?;
2804 Ok(locked_execs)
2805 }
2806
2807 #[instrument(level = Level::DEBUG, skip(self))]
2808 async fn lock_one(
2809 &self,
2810 created_at: DateTime<Utc>,
2811 component_id: ComponentId,
2812 execution_id: &ExecutionId,
2813 run_id: RunId,
2814 version: Version,
2815 executor_id: ExecutorId,
2816 lock_expires_at: DateTime<Utc>,
2817 retry_config: ComponentRetryConfig,
2818 ) -> Result<LockedExecution, DbErrorWrite> {
2819 debug!(%execution_id, "lock_one");
2820 let mut client_guard = self.client.lock().await;
2821 let tx = client_guard.transaction().await?;
2822
2823 let res = lock_single_execution(
2824 &tx,
2825 created_at,
2826 component_id,
2827 execution_id,
2828 run_id,
2829 &version,
2830 executor_id,
2831 lock_expires_at,
2832 retry_config,
2833 )
2834 .await?;
2835
2836 tx.commit().await?;
2837 Ok(res)
2838 }
2839
2840 #[instrument(level = Level::DEBUG, skip(self, req))]
2841 async fn append(
2842 &self,
2843 execution_id: ExecutionId,
2844 version: Version,
2845 req: AppendRequest,
2846 ) -> Result<AppendResponse, DbErrorWrite> {
2847 debug!(%req, "append");
2848 trace!(?req, "append");
2849 let created_at = req.created_at;
2850
2851 let mut client_guard = self.client.lock().await;
2852 let tx = client_guard.transaction().await?;
2853
2854 let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
2855
2856 tx.commit().await?;
2857
2858 drop(client_guard);
2860
2861 self.notify_all(vec![notifier], created_at);
2862 Ok(new_version)
2863 }
2864
2865 #[instrument(level = Level::DEBUG, skip_all)]
2866 async fn append_batch_respond_to_parent(
2867 &self,
2868 events: AppendEventsToExecution,
2869 response: AppendResponseToExecution,
2870 current_time: DateTime<Utc>,
2871 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2872 debug!("append_batch_respond_to_parent");
2873 if events.execution_id == response.parent_execution_id {
2874 return Err(DbErrorWrite::NonRetriable(
2875 DbErrorWriteNonRetriable::ValidationFailed(
2876 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2877 ),
2878 ));
2879 }
2880 if events.batch.is_empty() {
2881 return Err(DbErrorWrite::NonRetriable(
2882 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2883 ));
2884 }
2885
2886 let mut client_guard = self.client.lock().await;
2887 let tx = client_guard.transaction().await?;
2888
2889 let mut version = events.version;
2890 let mut notifiers = Vec::new();
2891
2892 for append_request in events.batch {
2893 let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
2894 version = v;
2895 notifiers.push(n);
2896 }
2897
2898 let pending_at_parent = append_response(
2899 &tx,
2900 &response.parent_execution_id,
2901 JoinSetResponseEventOuter {
2902 created_at: response.created_at,
2903 event: JoinSetResponseEvent {
2904 join_set_id: response.join_set_id,
2905 event: JoinSetResponse::ChildExecutionFinished {
2906 child_execution_id: response.child_execution_id,
2907 finished_version: response.finished_version,
2908 result: response.result,
2909 },
2910 },
2911 },
2912 )
2913 .await?;
2914 notifiers.push(pending_at_parent);
2915
2916 tx.commit().await?;
2917 drop(client_guard);
2918
2919 self.notify_all(notifiers, current_time);
2920 Ok(version)
2921 }
2922
2923 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
2924 async fn wait_for_pending_by_ffqn(
2925 &self,
2926 pending_at_or_sooner: DateTime<Utc>,
2927 ffqns: Arc<[FunctionFqn]>,
2928 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2929 ) {
2930 let unique_tag: u64 = rand::random();
2931 let (sender, mut receiver) = mpsc::channel(1);
2932 {
2933 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
2934 for ffqn in ffqns.as_ref() {
2935 pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
2936 }
2937 }
2938
2939 async {
2940 let mut db_has_pending = false;
2941 {
2942 let mut client_guard = self.client.lock().await;
2944 if let Ok(tx) = client_guard.transaction().await {
2946 if let Ok(res) = get_pending_by_ffqns(
2947 &tx,
2948 1,
2949 pending_at_or_sooner,
2950 &ffqns,
2951 SelectStrategy::Read,
2952 )
2953 .await
2954 && !res.is_empty()
2955 {
2956 db_has_pending = true;
2957 }
2958 let _ = tx.commit().await;
2960 }
2961 }
2962
2963 if db_has_pending {
2964 trace!("Not waiting, database already contains new pending executions");
2965 return;
2966 }
2967
2968 tokio::select! {
2969 _ = receiver.recv() => {
2970 trace!("Received a notification");
2971 }
2972 () = timeout_fut => {
2973 }
2974 }
2975 }
2976 .await;
2977
2978 {
2980 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
2981 for ffqn in ffqns.as_ref() {
2982 match pending_subscribers.remove_ffqn(ffqn) {
2983 Some((_, tag)) if tag == unique_tag => {}
2984 Some(other) => {
2985 pending_subscribers.insert_ffqn(ffqn.clone(), other);
2986 }
2987 None => {}
2988 }
2989 }
2990 }
2991 }
2992
2993 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
2994 async fn wait_for_pending_by_component_id(
2995 &self,
2996 pending_at_or_sooner: DateTime<Utc>,
2997 component_id: &ComponentId,
2998 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
2999 ) {
3000 let unique_tag: u64 = rand::random();
3001 let (sender, mut receiver) = mpsc::channel(1);
3002 {
3003 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3004 pending_subscribers.insert_by_component(
3005 component_id.input_digest.clone(),
3006 (sender.clone(), unique_tag),
3007 );
3008 }
3009
3010 async {
3011 let mut db_has_pending = false;
3012 {
3013 let mut client_guard = self.client.lock().await;
3014 if let Ok(tx) = client_guard.transaction().await {
3015 if let Ok(res) = get_pending_by_component_input_digest(
3016 &tx,
3017 1,
3018 pending_at_or_sooner,
3019 &component_id.input_digest,
3020 SelectStrategy::Read,
3021 )
3022 .await
3023 && !res.is_empty()
3024 {
3025 db_has_pending = true;
3026 }
3027 let _ = tx.commit().await;
3028 }
3029 }
3030
3031 if db_has_pending {
3032 trace!("Not waiting, database already contains new pending executions");
3033 return;
3034 }
3035
3036 tokio::select! {
3037 _ = receiver.recv() => {
3038 trace!("Received a notification");
3039 }
3040 () = timeout_fut => {
3041 }
3042 }
3043 }
3044 .await;
3045
3046 {
3048 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3049 match pending_subscribers.remove_by_component(&component_id.input_digest) {
3050 Some((_, tag)) if tag == unique_tag => {}
3051 Some(other) => {
3052 pending_subscribers
3053 .insert_by_component(component_id.input_digest.clone(), other);
3054 }
3055 None => {}
3056 }
3057 }
3058 }
3059
3060 async fn get_last_execution_event(
3061 &self,
3062 execution_id: &ExecutionId,
3063 ) -> Result<ExecutionEvent, DbErrorRead> {
3064 let mut client_guard = self.client.lock().await;
3065 let tx = client_guard.transaction().await?;
3066
3067 let event = get_last_execution_event(&tx, execution_id).await?;
3068
3069 tx.commit().await?;
3070 Ok(event)
3071 }
3072}
3073#[async_trait]
3074impl DbConnection for PostgresConnection {
3075 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3076 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3077 debug!("create");
3078 trace!(?req, "create");
3079 let created_at = req.created_at;
3080
3081 let mut client_guard = self.client.lock().await;
3082 let tx = client_guard.transaction().await?;
3083
3084 let (version, notifier) = create_inner(&tx, req.clone()).await?;
3085
3086 tx.commit().await?;
3087 drop(client_guard); self.notify_all(vec![notifier], created_at);
3090 Ok(version)
3091 }
3092
3093 #[instrument(level = Level::DEBUG, skip(self, batch))]
3094 async fn append_batch(
3095 &self,
3096 current_time: DateTime<Utc>,
3097 batch: Vec<AppendRequest>,
3098 execution_id: ExecutionId,
3099 version: Version,
3100 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3101 debug!("append_batch");
3102 trace!(?batch, "append_batch");
3103 assert!(!batch.is_empty(), "Empty batch request");
3104
3105 let mut client_guard = self.client.lock().await;
3106 let tx = client_guard.transaction().await?;
3107
3108 let mut version = version;
3109 let mut notifier = None;
3110
3111 for append_request in batch {
3112 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3113 version = v;
3114 notifier = Some(n);
3115 }
3116
3117 tx.commit().await?;
3118 drop(client_guard);
3119
3120 self.notify_all(
3121 vec![notifier.expect("checked that the batch is not empty")],
3122 current_time,
3123 );
3124 Ok(version)
3125 }
3126
3127 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3128 async fn append_batch_create_new_execution(
3129 &self,
3130 current_time: DateTime<Utc>,
3131 batch: Vec<AppendRequest>,
3132 execution_id: ExecutionId,
3133 version: Version,
3134 child_req: Vec<CreateRequest>,
3135 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3136 debug!("append_batch_create_new_execution");
3137 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3138 assert!(!batch.is_empty(), "Empty batch request");
3139
3140 let mut client_guard = self.client.lock().await;
3141 let tx = client_guard.transaction().await?;
3142
3143 let mut version = version;
3144 let mut notifier = None;
3145
3146 for append_request in batch {
3147 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3148 version = v;
3149 notifier = Some(n);
3150 }
3151
3152 let mut notifiers = Vec::new();
3153 notifiers.push(notifier.expect("checked that the batch is not empty"));
3154
3155 for req in child_req {
3156 let (_, n) = create_inner(&tx, req).await?;
3157 notifiers.push(n);
3158 }
3159
3160 tx.commit().await?;
3161 drop(client_guard);
3162
3163 self.notify_all(notifiers, current_time);
3164 Ok(version)
3165 }
3166
3167 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3168 async fn subscribe_to_next_responses(
3169 &self,
3170 execution_id: &ExecutionId,
3171 start_idx: u32,
3172 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3173 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3174 debug!("next_responses");
3175 let unique_tag: u64 = rand::random();
3176 let execution_id_clone = execution_id.clone();
3177
3178 let cleanup = || {
3179 let mut guard = self.response_subscribers.lock().unwrap();
3180 match guard.remove(&execution_id_clone) {
3181 Some((_, tag)) if tag == unique_tag => {}
3182 Some(other) => {
3183 guard.insert(execution_id_clone.clone(), other);
3184 }
3185 None => {}
3186 }
3187 };
3188
3189 let receiver = {
3190 let mut client_guard = self.client.lock().await;
3191 let tx = client_guard.transaction().await?;
3192
3193 let (sender, receiver) = oneshot::channel();
3199 self.response_subscribers
3200 .lock()
3201 .unwrap()
3202 .insert(execution_id.clone(), (sender, unique_tag));
3203
3204 let responses = get_responses_with_offset(&tx, execution_id, start_idx).await?;
3205
3206 if responses.is_empty() {
3207 tx.commit().await.map_err(|err| {
3209 cleanup(); DbErrorRead::from(err)
3211 })?;
3212 receiver
3213 } else {
3214 cleanup(); tx.commit().await?;
3216 return Ok(responses);
3217 }
3218 };
3219
3220 let res = tokio::select! {
3221 resp = receiver => {
3222 match resp {
3223 Ok(resp) => Ok(vec![resp]),
3224 Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3225 }
3226 }
3227 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3228 };
3229
3230 cleanup();
3231 res
3232 }
3233
3234 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3235 async fn wait_for_finished_result(
3236 &self,
3237 execution_id: &ExecutionId,
3238 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3239 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3240 let unique_tag: u64 = rand::random();
3241 let execution_id_clone = execution_id.clone();
3242
3243 let cleanup = || {
3244 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3245 if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3246 subscribers.remove(&unique_tag);
3247 }
3248 };
3249
3250 let receiver = {
3251 let mut client_guard = self.client.lock().await;
3252 let tx = client_guard.transaction().await?;
3253
3254 let (sender, receiver) = oneshot::channel();
3256 {
3257 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3258 guard
3259 .entry(execution_id.clone())
3260 .or_default()
3261 .insert(unique_tag, sender);
3262 }
3263
3264 let pending_state = get_combined_state(&tx, execution_id)
3265 .await?
3266 .execution_with_state
3267 .pending_state;
3268
3269 if let PendingState::Finished { finished, .. } = pending_state {
3270 let event = get_execution_event(&tx, execution_id, finished.version).await?;
3271 tx.commit().await?;
3272 cleanup();
3273
3274 if let ExecutionRequest::Finished { result, .. } = event.event {
3275 return Ok(result);
3276 }
3277 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3278 return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3279 "cannot get finished event based on t_state version",
3280 )));
3281 }
3282 tx.commit().await?;
3283 receiver
3284 };
3285
3286 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3287 let res = tokio::select! {
3288 resp = receiver => {
3289 match resp {
3290 Ok(retval) => Ok(retval),
3291 Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3292 }
3293 }
3294 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3295 };
3296
3297 cleanup();
3298 res
3299 }
3300
3301 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3302 async fn append_delay_response(
3303 &self,
3304 created_at: DateTime<Utc>,
3305 execution_id: ExecutionId,
3306 join_set_id: JoinSetId,
3307 delay_id: DelayId,
3308 result: Result<(), ()>,
3309 ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3310 debug!("append_delay_response");
3311 let event = JoinSetResponseEventOuter {
3312 created_at,
3313 event: JoinSetResponseEvent {
3314 join_set_id,
3315 event: JoinSetResponse::DelayFinished {
3316 delay_id: delay_id.clone(),
3317 result,
3318 },
3319 },
3320 };
3321
3322 let mut client_guard = self.client.lock().await;
3323 let tx = client_guard.transaction().await?;
3324
3325 let res = append_response(&tx, &execution_id, event).await;
3326
3327 match res {
3328 Ok(notifier) => {
3329 tx.commit().await?;
3330 drop(client_guard);
3331 self.notify_all(vec![notifier], created_at);
3332 Ok(AppendDelayResponseOutcome::Success)
3333 }
3334 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3335 tx.rollback().await?;
3338
3339 let tx = client_guard.transaction().await?;
3341 let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3342 tx.commit().await?;
3343
3344 match delay_success {
3345 Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3346 Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3347 None => Err(DbErrorWrite::Generic(consistency_db_err(
3348 "insert failed yet select did not find the response",
3349 ))),
3350 }
3351 }
3352 Err(err) => {
3353 let _ = tx.rollback().await; Err(err)
3355 }
3356 }
3357 }
3358
3359 #[instrument(level = Level::DEBUG, skip_all)]
3360 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3361 debug!("append_backtrace");
3362 let mut client_guard = self.client.lock().await;
3363 let tx = client_guard.transaction().await?;
3364
3365 append_backtrace(&tx, &append).await?;
3366
3367 tx.commit().await?;
3368 Ok(())
3369 }
3370
3371 #[instrument(level = Level::DEBUG, skip_all)]
3372 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3373 debug!("append_backtrace_batch");
3374 let mut client_guard = self.client.lock().await;
3375 let tx = client_guard.transaction().await?;
3376
3377 for append in batch {
3378 append_backtrace(&tx, &append).await?;
3379 }
3380
3381 tx.commit().await?;
3382 Ok(())
3383 }
3384
3385 #[instrument(level = Level::TRACE, skip(self))]
3387 async fn get_expired_timers(
3388 &self,
3389 at: DateTime<Utc>,
3390 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3391 let mut client_guard = self.client.lock().await;
3392 let tx = client_guard.transaction().await?;
3393
3394 let rows = tx
3396 .query(
3397 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3398 &[&at],
3399 )
3400 .await?;
3401
3402 let mut expired_timers = Vec::with_capacity(rows.len());
3403 for row in rows {
3404 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3405 let execution_id: String = get(&row, "execution_id")?;
3406 let execution_id = ExecutionId::from_str(&execution_id)?;
3407 let join_set_id: String = get(&row, "join_set_id")?;
3408 let join_set_id = JoinSetId::from_str(&join_set_id)?;
3409 let delay_id: String = get(&row, "delay_id")?;
3410 let delay_id = DelayId::from_str(&delay_id)?;
3411
3412 Ok(ExpiredTimer::Delay(ExpiredDelay {
3413 execution_id,
3414 join_set_id,
3415 delay_id,
3416 }))
3417 };
3418
3419 match unpack() {
3420 Ok(timer) => expired_timers.push(timer),
3421 Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3422 }
3423 }
3424
3425 let rows = tx.query(
3427 &format!(
3428 "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3429 FROM t_state \
3430 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3431 ),
3432 &[&at]
3433 ).await?;
3434
3435 for row in rows {
3436 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3437 let execution_id: String = get(&row, "execution_id")?;
3438 let execution_id = ExecutionId::from_str(&execution_id)?;
3439 let last_lock_version: i64 = get(&row, "last_lock_version")?;
3440 let last_lock_version = Version::try_from(last_lock_version)?;
3441
3442 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3443 let corresponding_version = Version::try_from(corresponding_version)?;
3444
3445 let intermittent_event_count =
3446 u32::try_from(get::<i64>(&row, "intermittent_event_count")?).map_err(|_| {
3447 consistency_db_err("`intermittent_event_count` must not be negative")
3448 })?;
3449
3450 let max_retries = get::<Option<i64>>(&row, "max_retries")?
3451 .map(u32::try_from)
3452 .transpose()
3453 .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3454 let retry_exp_backoff_millis =
3455 u32::try_from(get::<i64>(&row, "retry_exp_backoff_millis")?).map_err(|_| {
3456 consistency_db_err("`retry_exp_backoff_millis` must not be negative")
3457 })?;
3458 let executor_id: String = get(&row, "executor_id")?;
3459 let executor_id = ExecutorId::from_str(&executor_id)?;
3460 let run_id: String = get(&row, "run_id")?;
3461 let run_id = RunId::from_str(&run_id)?;
3462
3463 Ok(ExpiredTimer::Lock(ExpiredLock {
3464 execution_id,
3465 locked_at_version: last_lock_version,
3466 next_version: corresponding_version.increment(),
3467 intermittent_event_count,
3468 max_retries,
3469 retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3470 locked_by: LockedBy {
3471 executor_id,
3472 run_id,
3473 },
3474 }))
3475 };
3476
3477 match unpack() {
3478 Ok(timer) => expired_timers.push(timer),
3479 Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3480 }
3481 }
3482
3483 tx.commit().await?;
3484
3485 if !expired_timers.is_empty() {
3486 debug!("get_expired_timers found {expired_timers:?}");
3487 }
3488 Ok(expired_timers)
3489 }
3490
3491 async fn get_execution_event(
3492 &self,
3493 execution_id: &ExecutionId,
3494 version: &Version,
3495 ) -> Result<ExecutionEvent, DbErrorRead> {
3496 let mut client_guard = self.client.lock().await;
3497 let tx = client_guard.transaction().await?;
3498
3499 let event = get_execution_event(&tx, execution_id, version.0).await?;
3500
3501 tx.commit().await?;
3502 Ok(event)
3503 }
3504
3505 async fn get_pending_state(
3506 &self,
3507 execution_id: &ExecutionId,
3508 ) -> Result<ExecutionWithState, DbErrorRead> {
3509 let mut client_guard = self.client.lock().await;
3510 let tx = client_guard.transaction().await?;
3511
3512 let combined_state = get_combined_state(&tx, execution_id).await?;
3513
3514 tx.commit().await?;
3515 Ok(combined_state.execution_with_state)
3516 }
3517}
3518
3519#[async_trait]
3520impl DbExternalApi for PostgresConnection {
3521 #[instrument(skip(self))]
3522 async fn get_backtrace(
3523 &self,
3524 execution_id: &ExecutionId,
3525 filter: BacktraceFilter,
3526 ) -> Result<BacktraceInfo, DbErrorRead> {
3527 debug!("get_backtrace");
3528
3529 let mut client_guard = self.client.lock().await;
3530 let tx = client_guard.transaction().await?;
3531
3532 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
3533
3534 params.push(Box::new(execution_id.to_string())); let p_execution_id_idx = format!("${}", params.len()); let mut sql = String::new();
3538 write!(
3539 &mut sql,
3540 "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
3541 FROM t_backtrace WHERE execution_id = {p_execution_id_idx}"
3542 )
3543 .unwrap();
3544
3545 match &filter {
3546 BacktraceFilter::Specific(version) => {
3547 params.push(Box::new(i64::from(version.0))); let p_ver_idx = format!("${}", params.len()); write!(
3550 &mut sql,
3551 " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
3552 )
3553 .unwrap();
3554 }
3555 BacktraceFilter::First => {
3556 sql.push_str(" ORDER BY version_min_including LIMIT 1");
3557 }
3558 BacktraceFilter::Last => {
3559 sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
3560 }
3561 }
3562
3563 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
3564 params.iter().map(|p| p.as_ref() as _).collect();
3565
3566 let row = tx.query_one(&sql, ¶ms_refs).await?;
3567
3568 let component_id: Json<ComponentId> = get(&row, "component_id")?;
3569 let component_id = component_id.0;
3570
3571 let version_min_including = Version::try_from(get::<i64>(&row, "version_min_including")?)?;
3572
3573 let version_max_excluding = Version::try_from(get::<i64>(&row, "version_max_excluding")?)?;
3574
3575 let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
3577 let wasm_backtrace = wasm_backtrace.0;
3578
3579 tx.commit().await?;
3580
3581 Ok(BacktraceInfo {
3582 execution_id: execution_id.clone(),
3583 component_id,
3584 version_min_including,
3585 version_max_excluding,
3586 wasm_backtrace,
3587 })
3588 }
3589
3590 #[instrument(skip(self))]
3591 async fn list_executions(
3592 &self,
3593 filter: ListExecutionsFilter,
3594 pagination: ExecutionListPagination,
3595 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3596 let mut client_guard = self.client.lock().await;
3597 let tx = client_guard.transaction().await?;
3598
3599 let result = list_executions(&tx, filter, &pagination).await?;
3600
3601 tx.commit().await?;
3602 Ok(result)
3603 }
3604
3605 #[instrument(skip(self))]
3606 async fn list_execution_events(
3607 &self,
3608 execution_id: &ExecutionId,
3609 since: &Version,
3610 max_length: VersionType,
3611 include_backtrace_id: bool,
3612 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3613 let mut client_guard = self.client.lock().await;
3614 let tx = client_guard.transaction().await?;
3615
3616 let execution_events = list_execution_events(
3617 &tx,
3618 execution_id,
3619 since.0,
3620 since.0 + max_length,
3621 include_backtrace_id,
3622 )
3623 .await?;
3624
3625 tx.commit().await?;
3626 Ok(execution_events)
3627 }
3628
3629 #[instrument(skip(self))]
3630 async fn list_responses(
3631 &self,
3632 execution_id: &ExecutionId,
3633 pagination: Pagination<u32>,
3634 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3635 let mut client_guard = self.client.lock().await;
3636 let tx = client_guard.transaction().await?;
3637
3638 let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
3639
3640 tx.commit().await?;
3641 Ok(responses)
3642 }
3643
3644 #[instrument(skip(self))]
3645 async fn list_execution_events_responses(
3646 &self,
3647 execution_id: &ExecutionId,
3648 req_since: &Version,
3649 req_max_length: VersionType,
3650 req_include_backtrace_id: bool,
3651 resp_pagination: Pagination<u32>,
3652 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
3653 let mut client_guard = self.client.lock().await;
3654 let tx = client_guard.transaction().await?;
3655
3656 let combined_state = get_combined_state(&tx, execution_id).await?;
3657
3658 let events = list_execution_events(
3659 &tx,
3660 execution_id,
3661 req_since.0,
3662 req_since.0 + req_max_length,
3663 req_include_backtrace_id,
3664 )
3665 .await?;
3666
3667 let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
3668
3669 tx.commit().await?;
3670
3671 Ok(ExecutionWithStateRequestsResponses {
3672 execution_with_state: combined_state.execution_with_state,
3673 events,
3674 responses,
3675 })
3676 }
3677
3678 #[instrument(skip(self))]
3679 async fn upgrade_execution_component(
3680 &self,
3681 execution_id: &ExecutionId,
3682 old: &InputContentDigest,
3683 new: &InputContentDigest,
3684 ) -> Result<(), DbErrorWrite> {
3685 let mut client_guard = self.client.lock().await;
3686 let tx = client_guard.transaction().await?;
3687
3688 upgrade_execution_component(&tx, execution_id, old, new).await?;
3689
3690 tx.commit().await?;
3691 Ok(())
3692 }
3693}
3694
3695#[async_trait]
3696impl DbPoolCloseable for PostgresPool {
3697 async fn close(&self) {
3698 self.pool.close();
3699 }
3700}
3701
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test helper: stores `response_event` for `execution_id` and wakes up listeners.
    ///
    /// The client lock is released before `notify_all` is called.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let outer_event = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };

        let mut guard = self.client.lock().await;
        let notifier = {
            let transaction = guard.transaction().await?;
            let notifier = append_response(&transaction, &execution_id, outer_event).await?;
            transaction.commit().await?;
            notifier
        };
        // Release the connection before notifying so listeners never wait on this lock.
        drop(guard);

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }

    /// Test helper: reads the full execution log of `execution_id`.
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn get(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
        trace!("get");
        let mut guard = self.client.lock().await;
        let transaction = guard.transaction().await?;
        let execution_log = get_execution_log(&transaction, execution_id).await?;
        transaction.commit().await?;
        Ok(execution_log)
    }
}
3745
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test helper: drops the database named in `self.config.db_name`.
    ///
    /// Connects to the admin database (`ADMIN_DB_NAME`) with the host, user and password
    /// taken from `self.config`, then issues `DROP DATABASE` up to three times, returning
    /// on the first success. If every attempt fails, only a warning is logged.
    ///
    /// # Panics
    /// Panics if the admin pool cannot be created or no connection can be obtained from it.
    pub async fn drop_database(&self) {
        let mut admin_cfg = Config::new();
        admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
        admin_cfg.host = Some(self.config.host.clone());
        admin_cfg.user = Some(self.config.user.clone());
        admin_cfg.password = Some(self.config.password.clone());
        admin_cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let db_name = &self.config.db_name;
        let drop_statement = format!("DROP DATABASE {db_name}");
        for _attempt in 1..=3 {
            let res = admin_client.execute(&drop_statement, &[]).await;
            if res.is_ok() {
                debug!("Database '{db_name}' dropped.");
                return;
            }
            debug!("Dropping db failed - {res:?}");
        }
        warn!("Did not drop database {db_name}");
    }
}