1use crate::postgres_dao::ddl::{ADMIN_DB_NAME, T_METADATA_EXPECTED_SCHEMA_VERSION};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ContentDigest, ExecutionId, FunctionFqn, JoinSetId,
6 StrVariant, SupportedFunctionReturnValue,
7 component_id::{Digest, InputContentDigest},
8 prefixed_ulid::{DelayId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14 DbPool, DbPoolCloseable, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
15 ExecutionWithState, ExpiredDelay, ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT,
16 HistoryEvent, JoinSetRequest, JoinSetResponse, JoinSetResponseEvent,
17 JoinSetResponseEventOuter, LockPendingResponse, Locked, LockedBy, LockedExecution,
18 Pagination, PendingState, PendingStateFinished, PendingStateFinishedResultKind,
19 PendingStateLocked, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
20 STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
21 },
22};
23use deadpool_postgres::{Client, Config, ManagerConfig, Pool, RecyclingMethod};
24use hashbrown::HashMap;
25use std::fmt::Write as _;
26use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
27use tokio::sync::{mpsc, oneshot};
28use tokio_postgres::{
29 NoTls, Transaction,
30 types::{Json, ToSql},
31};
32use tracing::{Level, debug, error, info, instrument, trace, warn};
33
/// Reads a column out of a database row, leaving the enclosing function early (via `?`)
/// with a consistency error when the column cannot be retrieved or converted.
///
/// * `get!(row, "col")` - the target type is inferred from the surrounding code.
/// * `get!(row, "col", Type)` - the target type is spelled out explicitly.
macro_rules! get {
    ($record:expr, $column:expr) => {
        $record.try_get($column).map_err(|err| {
            consistency_db_err(format!("Failed to retrieve column '{}': {}", $column, err))
        })?
    };
    ($record:expr, $column:expr, $target:ty) => {
        $record.try_get::<_, $target>($column).map_err(|err| {
            consistency_db_err(format!("Failed to retrieve column '{}': {}", $column, err))
        })?
    };
}
47
/// Schema definition: the `CREATE TABLE` / `CREATE INDEX` statements and schema-related
/// constants. Every statement uses `IF NOT EXISTS`, so it can be re-run against an existing
/// database.
mod ddl {
    use concepts::storage::HISTORY_EVENT_TYPE_JOIN_NEXT;

    /// Database that `create_database` connects to in order to check for / create the target
    /// database.
    pub const ADMIN_DB_NAME: &str = "postgres";

    /// Schema version this code expects to find in `t_metadata`.
    pub const T_METADATA_EXPECTED_SCHEMA_VERSION: i32 = 1;

    /// Table holding schema version records; the row with the highest `id` is the one checked
    /// during pool initialization.
    pub const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    schema_version INTEGER NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);
";

    /// Per-execution event log, keyed by `(execution_id, version)`.
    /// `history_event_type` is a stored generated column extracted from `json_value`.
    pub const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    json_value JSON NOT NULL,
    version BIGINT NOT NULL CHECK (version >= 0),
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value #>> '{HistoryEvent,event,type}') STORED,
    PRIMARY KEY (execution_id, version)
);
";

    /// Index on `t_execution_log (execution_id, version)`.
    // NOTE(review): these are the same columns as the table's primary key - confirm whether
    // this index is still needed.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";

    /// Index on `t_execution_log (execution_id, variant)`.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";

    /// Partial index over log rows whose `history_event_type` equals
    /// `HISTORY_EVENT_TYPE_JOIN_NEXT`; the constant is spliced in at compile time.
    pub const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
        "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type='{}';",
        HISTORY_EVENT_TYPE_JOIN_NEXT
    );

    /// Responses delivered to a join set. A row carries either the delay columns
    /// (`delay_id`, `delay_success`) or the child columns (`child_execution_id`,
    /// `finished_version`); both groups are nullable.
    pub const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    created_at TIMESTAMPTZ NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success BOOLEAN,

    child_execution_id TEXT,
    finished_version BIGINT CHECK (finished_version >= 0),

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
);
";

    /// Index on `t_join_set_response (execution_id, id)`.
    pub const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";

    /// Unique partial index: a non-NULL `child_execution_id` may appear at most once.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";

    /// Unique partial index: a non-NULL `delay_id` may appear at most once.
    pub const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";

    /// Current state of each execution, one row per `execution_id`.
    /// `pending_expires_finished` is a single timestamp column whose meaning depends on `state`
    /// (scheduled-at, lock expiry or finished-at, as written by the `update_state_*` functions).
    pub const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level BOOLEAN NOT NULL,
    corresponding_version BIGINT NOT NULL CHECK (corresponding_version >= 0),
    ffqn TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    component_id_input_digest BYTEA NOT NULL,
    first_scheduled_at TIMESTAMPTZ NOT NULL,

    pending_expires_finished TIMESTAMPTZ NOT NULL,
    state TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    intermittent_event_count BIGINT NOT NULL CHECK (intermittent_event_count >=0),

    max_retries BIGINT CHECK (max_retries >= 0),
    retry_exp_backoff_millis BIGINT CHECK (retry_exp_backoff_millis >= 0),
    last_lock_version BIGINT CHECK (last_lock_version >= 0),
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing BOOLEAN,

    result_kind JSONB,

    PRIMARY KEY (execution_id)
);
";

    /// Index on `t_state (state, pending_expires_finished, ffqn)`.
    pub const IDX_T_STATE_LOCK_PENDING: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending ON t_state (state, pending_expires_finished, ffqn);
";

    /// Partial index on `pending_expires_finished` for rows that have an `executor_id`.
    pub const IDX_T_STATE_EXPIRED_TIMERS: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_expired_timers ON t_state (pending_expires_finished) WHERE executor_id IS NOT NULL;
";

    /// Index on `t_state (execution_id, is_top_level)`; note the index itself is named
    /// `idx_t_state_execution_id_is_root`.
    pub const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";

    /// Index on `t_state (ffqn)`.
    pub const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";

    /// Index on `t_state (created_at)`.
    pub const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";

    /// Delay requests with their expiry, keyed by `(execution_id, join_set_id, delay_id)`.
    pub const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
);
";

    /// Backtraces attached to a version range
    /// `[version_min_including, version_max_excluding)` of an execution.
    pub const CREATE_TABLE_T_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_backtrace (
    execution_id TEXT NOT NULL,
    component_id JSONB NOT NULL,
    version_min_including BIGINT NOT NULL CHECK (version_min_including >= 0),
    version_max_excluding BIGINT NOT NULL CHECK (version_max_excluding >= 0),
    wasm_backtrace JSONB NOT NULL,
    PRIMARY KEY (execution_id, version_min_including, version_max_excluding)
);
";

    /// Index on `t_backtrace (execution_id, version_min_including, version_max_excluding)`.
    // NOTE(review): these are the same columns as the table's primary key - confirm whether
    // this index is still needed.
    pub const IDX_T_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_backtrace_execution_id_version ON t_backtrace (execution_id, version_min_including, version_max_excluding);
";
}
204
/// Connection settings for the PostgreSQL server and the target database.
#[derive(derive_more::Debug, Clone)]
pub struct PostgresConfig {
    /// Server host handed to the pool configuration.
    pub host: String,
    /// User name used for the connection.
    pub user: String,
    /// Password used for the connection; skipped in the `Debug` output.
    #[debug(skip)]
    pub password: String,
    /// Name of the database the pool connects to.
    pub db_name: String,
}
213
/// Opaque error signalling that the database or its schema could not be initialized.
/// It carries no details; the initialization code logs the underlying cause before returning it.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
217
218async fn create_database(
219 config: &PostgresConfig,
220) -> Result<DbInitialzationOutcome, InitializationError> {
221 let mut cfg = Config::new();
222 cfg.host = Some(config.host.clone());
223 cfg.user = Some(config.user.clone());
224 cfg.password = Some(config.password.clone());
225 cfg.dbname = Some(ADMIN_DB_NAME.into());
226 cfg.manager = Some(ManagerConfig {
227 recycling_method: RecyclingMethod::Fast,
228 });
229
230 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
231 error!("Cannot create the default pool - {err:?}");
232 InitializationError
233 })?;
234
235 let client = pool.get().await.map_err(|err| {
236 error!("Cannot get a connection from the default pool - {err:?}");
237 InitializationError
238 })?;
239
240 let row = client
241 .query_opt(
242 &format!(
243 "SELECT 1 FROM pg_database WHERE datname = '{}'",
244 config.db_name
245 ),
246 &[],
247 )
248 .await
249 .map_err(|err| {
250 error!("Cannot select from the default database - {err:?}");
251 InitializationError
252 })?;
253
254 let outcome = if row.is_none() {
255 client
256 .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
257 .await
258 .map_err(|err| {
259 error!("Cannot create the database - {err:?}");
260 InitializationError
261 })?;
262
263 DbInitialzationOutcome::Created
264 } else {
265 DbInitialzationOutcome::Existing
266 };
267 match outcome {
268 DbInitialzationOutcome::Created => info!("Database '{}' created.", config.db_name),
269 DbInitialzationOutcome::Existing => info!("Database '{}' exists.", config.db_name),
270 }
271 Ok(outcome)
272}
273
/// Shared, mutex-guarded map from an `ExecutionId` to a one-shot sender of a
/// `JoinSetResponseEventOuter`, paired with a `u64`.
// NOTE(review): the `u64` is presumably a subscription token - confirm against the code that
// registers and removes subscribers.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<JoinSetResponseEventOuter>, u64)>>>;
/// Shared, mutex-guarded [`PendingFfqnSubscribersHolder`].
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Mutex-guarded map from an `ExecutionId` to its one-shot senders of the execution's
/// `SupportedFunctionReturnValue`, each stored under a `u64` key.
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
281
282fn map_pool_error(err: deadpool_postgres::PoolError) -> DbErrorGeneric {
283 match err {
284 deadpool_postgres::PoolError::Backend(err) => DbErrorGeneric::from(err),
285 deadpool_postgres::PoolError::Closed => DbErrorGeneric::Close,
286 other => DbErrorGeneric::Uncategorized(other.to_string().into()),
287 }
288}
289
/// PostgreSQL connection pool together with the subscriber registries that are shared with
/// every connection handed out by the pool.
pub struct PostgresPool {
    /// Underlying `deadpool_postgres` pool.
    pool: Pool,
    /// Registry cloned into each `PostgresConnection`.
    response_subscribers: ResponseSubscribers,
    /// Registry cloned into each `PostgresConnection`.
    pending_subscribers: PendingSubscribers,
    /// Registry cloned into each `PostgresConnection`.
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    // Configuration the pool was created from; exempt from the dead-code lint.
    #[allow(dead_code)]
    config: PostgresConfig,
}
298
299#[async_trait]
300impl DbPool for PostgresPool {
301 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
302 let client = self.pool.get().await.map_err(map_pool_error)?;
303
304 Ok(Box::new(PostgresConnection {
305 client: tokio::sync::Mutex::new(client),
306 response_subscribers: self.response_subscribers.clone(),
307 pending_subscribers: self.pending_subscribers.clone(),
308 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
309 }))
310 }
311
312 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
313 let client = self.pool.get().await.map_err(map_pool_error)?;
314
315 Ok(Box::new(PostgresConnection {
316 client: tokio::sync::Mutex::new(client),
317 response_subscribers: self.response_subscribers.clone(),
318 pending_subscribers: self.pending_subscribers.clone(),
319 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
320 }))
321 }
322
323 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
324 let client = self.pool.get().await.map_err(map_pool_error)?;
325
326 Ok(Box::new(PostgresConnection {
327 client: tokio::sync::Mutex::new(client),
328 response_subscribers: self.response_subscribers.clone(),
329 pending_subscribers: self.pending_subscribers.clone(),
330 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
331 }))
332 }
333
334 #[cfg(feature = "test")]
335 async fn connection_test(
336 &self,
337 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
338 let client = self.pool.get().await.map_err(map_pool_error)?;
339
340 Ok(Box::new(PostgresConnection {
341 client: tokio::sync::Mutex::new(client),
342 response_subscribers: self.response_subscribers.clone(),
343 pending_subscribers: self.pending_subscribers.clone(),
344 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
345 }))
346 }
347}
348
/// A single pooled database client plus handles to the subscriber registries owned by the
/// `PostgresPool` that created it.
pub struct PostgresConnection {
    /// Pooled client behind an async mutex.
    client: tokio::sync::Mutex<Client>,
    /// Handle to the pool-wide registry of response subscribers.
    response_subscribers: ResponseSubscribers,
    /// Handle to the pool-wide registry of pending-execution subscribers.
    pending_subscribers: PendingSubscribers,
    /// Handle to the pool-wide registry of execution-finished subscribers.
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
355
/// Registry of subscribers that want to be notified about pending executions, looked up
/// either by function name or by component input digest.
///
/// Each entry stores the notification channel's sender together with a `u64`.
// NOTE(review): the `u64` is presumably a subscription token - confirm against the callers.
#[derive(Default)]
struct PendingFfqnSubscribersHolder {
    /// Subscribers keyed by `FunctionFqn`.
    by_ffqns: HashMap<FunctionFqn, (mpsc::Sender<()>, u64)>,
    /// Subscribers keyed by `InputContentDigest`.
    by_component: HashMap<InputContentDigest, (mpsc::Sender<()>, u64)>,
}
361impl PendingFfqnSubscribersHolder {
362 fn notify(&self, notifier: &NotifierPendingAt) {
363 if let Some((subscription, _)) = self.by_ffqns.get(¬ifier.ffqn) {
364 debug!("Notifying pending subscriber by ffqn");
365 let _ = subscription.try_send(());
367 }
368 if let Some((subscription, _)) = self.by_component.get(¬ifier.component_input_digest) {
369 debug!("Notifying pending subscriber by component");
370 let _ = subscription.try_send(());
372 }
373 }
374
375 fn insert_ffqn(&mut self, ffqn: FunctionFqn, value: (mpsc::Sender<()>, u64)) {
376 self.by_ffqns.insert(ffqn, value);
377 }
378
379 fn remove_ffqn(&mut self, ffqn: &FunctionFqn) -> Option<(mpsc::Sender<()>, u64)> {
380 self.by_ffqns.remove(ffqn)
381 }
382
383 fn insert_by_component(
384 &mut self,
385 input_content_digest: InputContentDigest,
386 value: (mpsc::Sender<()>, u64),
387 ) {
388 self.by_component.insert(input_content_digest, value);
389 }
390
391 fn remove_by_component(
392 &mut self,
393 input_content_digest: &InputContentDigest,
394 ) -> Option<(mpsc::Sender<()>, u64)> {
395 self.by_component.remove(input_content_digest)
396 }
397}
398
/// Controls whether pool initialization may create the database itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProvisionPolicy {
    /// Never try to create the database; initialization treats it as existing.
    Never,
    /// Check for the database first and create it when it is missing.
    Auto,
}
405
/// Reports whether initialization had to create the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbInitialzationOutcome {
    /// The database did not exist and was created.
    Created,
    /// The database was already present (or creation was not attempted).
    Existing,
}
411
412impl PostgresPool {
413 #[instrument(skip_all, name = "postgres_new")]
414 pub async fn new(
415 config: PostgresConfig,
416 provision_policy: ProvisionPolicy,
417 ) -> Result<PostgresPool, InitializationError> {
418 Self::new_with_outcome(config, provision_policy)
419 .await
420 .map(|(db, _)| db)
421 }
422
423 pub async fn new_with_outcome(
424 config: PostgresConfig,
425 provision_policy: ProvisionPolicy,
426 ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
427 let outcome = if provision_policy == ProvisionPolicy::Auto {
428 create_database(&config).await?
429 } else {
430 DbInitialzationOutcome::Existing
431 };
432 let mut cfg = Config::new();
433 cfg.host = Some(config.host.clone());
434 cfg.user = Some(config.user.clone());
435 cfg.password = Some(config.password.clone());
436 cfg.dbname = Some(config.db_name.clone());
437 cfg.manager = Some(ManagerConfig {
438 recycling_method: RecyclingMethod::Fast,
439 });
440
441 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
442 error!("Cannot create the database pool - {err:?}");
443 InitializationError
444 })?;
445 let client = pool.get().await.map_err(|err| {
446 error!("Cannot get a connection from the database pool - {err:?}");
447 InitializationError
448 })?;
449
450 let statements = vec![
451 ddl::CREATE_TABLE_T_METADATA,
452 ddl::CREATE_TABLE_T_EXECUTION_LOG,
453 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
454 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
455 ddl::CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
456 ddl::CREATE_TABLE_T_JOIN_SET_RESPONSE,
457 ddl::CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
458 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
459 ddl::CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
460 ddl::CREATE_TABLE_T_STATE,
461 ddl::IDX_T_STATE_LOCK_PENDING,
462 ddl::IDX_T_STATE_EXPIRED_TIMERS,
463 ddl::IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL,
464 ddl::IDX_T_STATE_FFQN,
465 ddl::IDX_T_STATE_CREATED_AT,
466 ddl::CREATE_TABLE_T_DELAY,
467 ddl::CREATE_TABLE_T_BACKTRACE,
468 ddl::IDX_T_BACKTRACE_EXECUTION_ID_VERSION,
469 ];
470
471 let batch_sql = statements.join("\n");
473 client.batch_execute(&batch_sql).await.map_err(|err| {
474 error!("Cannot run the DDL import - {err:?}");
475 InitializationError
476 })?;
477
478 let row = client
479 .query_opt(
480 "SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1",
481 &[],
482 )
483 .await
484 .map_err(|err| {
485 error!("Cannot select schema version - {err:?}");
486 InitializationError
487 })?;
488
489 let actual_version = match row {
491 Some(r) => Some(r.try_get::<_, i32>("schema_version").map_err(|e| {
492 error!("Failed to get schema_version column: {e}");
493 InitializationError
494 })?),
495 None => None,
496 };
497
498 match actual_version {
499 None => {
500 client
501 .execute(
502 "INSERT INTO t_metadata (schema_version, created_at) VALUES ($1, $2)",
503 &[&(T_METADATA_EXPECTED_SCHEMA_VERSION), &Utc::now()],
504 )
505 .await
506 .map_err(|err| {
507 error!("Cannot insert schema version - {err:?}");
508 InitializationError
509 })?;
510 }
511 Some(actual_version) => {
512 if (actual_version) != T_METADATA_EXPECTED_SCHEMA_VERSION {
514 error!(
515 "Wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
516 );
517 return Err(InitializationError);
518 }
519 }
520 }
521
522 debug!("Database schema initialized.");
523
524 Ok((
525 PostgresPool {
526 pool,
527 execution_finished_subscribers: Arc::default(),
528 pending_subscribers: Arc::default(),
529 response_subscribers: Arc::default(),
530 config,
531 },
532 outcome,
533 ))
534 }
535}
536
537fn consistency_db_err(err: impl Into<StrVariant>) -> DbErrorGeneric {
538 let err = err.into();
539 warn!(
540 backtrace = %std::backtrace::Backtrace::capture(),
541 "Consistency error: {err}"
542 );
543 DbErrorGeneric::Uncategorized(err)
544}
545
/// Loosely typed view of an execution's state row, before it is validated and converted into
/// a [`CombinedState`]. Which optional fields must be set depends on `state`; see
/// `CombinedState::new` for the accepted combinations.
#[derive(Debug)]
struct CombinedStateDTO {
    /// State discriminator, compared against the `STATE_*` constants.
    state: String,
    /// Function name in its string form; parsed with `FunctionFqn::from_str`.
    ffqn: String,
    component_id_input_digest: InputContentDigest,
    /// Scheduled-at, lock-expiry or finished-at timestamp, depending on `state`.
    pending_expires_finished: DateTime<Utc>,
    /// Must be set for the locked state and unset for every other accepted state.
    last_lock_version: Option<Version>,
    executor_id: Option<ExecutorId>,
    run_id: Option<RunId>,
    /// Set only for the blocked-by-join-set state.
    join_set_id: Option<JoinSetId>,
    /// Set only for the blocked-by-join-set state.
    join_set_closing: Option<bool>,
    /// Set only for the finished state.
    result_kind: Option<PendingStateFinishedResultKind>,
}
/// Validated state of an execution: its function name, its `PendingState` and the version the
/// state corresponds to.
#[derive(Debug)]
struct CombinedState {
    ffqn: FunctionFqn,
    pending_state: PendingState,
    /// Version the state was recorded at.
    corresponding_version: Version,
}
568
569impl CombinedState {
570 fn new(dto: CombinedStateDTO, corresponding_version: Version) -> Result<Self, DbErrorGeneric> {
571 let ffqn = FunctionFqn::from_str(&dto.ffqn).map_err(|parse_err| {
572 error!("Error parsing ffqn of {dto:?} - {parse_err:?}");
573 consistency_db_err("invalid ffqn value in `t_state`")
574 })?;
575 let pending_state = match dto {
576 CombinedStateDTO {
578 state,
579 ffqn: _,
580 component_id_input_digest,
581 pending_expires_finished: scheduled_at,
582 last_lock_version: None,
583 executor_id: None,
584 run_id: None,
585 join_set_id: None,
586 join_set_closing: None,
587 result_kind: None,
588 } if state == STATE_PENDING_AT => PendingState::PendingAt {
589 scheduled_at,
590 last_lock: None,
591 component_id_input_digest,
592 },
593 CombinedStateDTO {
595 state,
596 ffqn: _,
597 component_id_input_digest,
598 pending_expires_finished: scheduled_at,
599 last_lock_version: None,
600 executor_id: Some(executor_id),
601 run_id: Some(run_id),
602 join_set_id: None,
603 join_set_closing: None,
604 result_kind: None,
605 } if state == STATE_PENDING_AT => PendingState::PendingAt {
606 scheduled_at,
607 last_lock: Some(LockedBy {
608 executor_id,
609 run_id,
610 }),
611 component_id_input_digest,
612 },
613 CombinedStateDTO {
614 state,
615 ffqn: _,
616 component_id_input_digest,
617 pending_expires_finished: lock_expires_at,
618 last_lock_version: Some(_),
619 executor_id: Some(executor_id),
620 run_id: Some(run_id),
621 join_set_id: None,
622 join_set_closing: None,
623 result_kind: None,
624 } if state == STATE_LOCKED => PendingState::Locked(PendingStateLocked {
625 locked_by: LockedBy {
626 executor_id,
627 run_id,
628 },
629 lock_expires_at,
630 component_id_input_digest,
631 }),
632 CombinedStateDTO {
633 state,
634 ffqn: _,
635 component_id_input_digest,
636 pending_expires_finished: lock_expires_at,
637 last_lock_version: None,
638 executor_id: _,
639 run_id: _,
640 join_set_id: Some(join_set_id),
641 join_set_closing: Some(join_set_closing),
642 result_kind: None,
643 } if state == STATE_BLOCKED_BY_JOIN_SET => PendingState::BlockedByJoinSet {
644 join_set_id: join_set_id.clone(),
645 closing: join_set_closing,
646 lock_expires_at,
647 component_id_input_digest,
648 },
649 CombinedStateDTO {
650 state,
651 ffqn: _,
652 component_id_input_digest,
653 pending_expires_finished: finished_at,
654 last_lock_version: None,
655 executor_id: None,
656 run_id: None,
657 join_set_id: None,
658 join_set_closing: None,
659 result_kind: Some(result_kind),
660 } if state == STATE_FINISHED => PendingState::Finished {
661 finished: PendingStateFinished {
662 finished_at,
663 version: corresponding_version.0,
664 result_kind,
665 },
666 component_id_input_digest,
667 },
668
669 _ => {
670 error!("Cannot deserialize pending state from {dto:?}");
671 return Err(consistency_db_err("invalid `t_state`"));
672 }
673 };
674 Ok(Self {
675 ffqn,
676 pending_state,
677 corresponding_version,
678 })
679 }
680
681 fn get_next_version_assert_not_finished(&self) -> Version {
682 assert!(!self.pending_state.is_finished());
683 self.corresponding_version.increment()
684 }
685
686 #[cfg(feature = "test")]
687 fn get_next_version_or_finished(&self) -> Version {
688 if self.pending_state.is_finished() {
689 self.corresponding_version.clone()
690 } else {
691 self.corresponding_version.increment()
692 }
693 }
694}
695
/// Describes an execution that has just been moved to the pending state; used to look up the
/// pending subscribers to wake by `ffqn` and by `component_input_digest`.
#[derive(Debug)]
struct NotifierPendingAt {
    /// Time the execution is scheduled for.
    scheduled_at: DateTime<Utc>,
    ffqn: FunctionFqn,
    component_input_digest: InputContentDigest,
}
702
/// Pairs a finished execution with its return value.
#[derive(Debug)]
struct NotifierExecutionFinished {
    execution_id: ExecutionId,
    retval: SupportedFunctionReturnValue,
}
708
/// Notifications gathered while writing to the database; every field is optional and the
/// default value carries no notification at all.
// NOTE(review): presumably dispatched to subscribers after the transaction commits - confirm
// against the call sites.
#[derive(Debug, Default)]
struct AppendNotifier {
    /// Set when an execution became pending.
    pending_at: Option<NotifierPendingAt>,
    /// Set when an execution finished.
    execution_finished: Option<NotifierExecutionFinished>,
    /// Set when a join set response was recorded for the given execution.
    response: Option<(ExecutionId, JoinSetResponseEventOuter)>,
}
715
/// A delay to be recorded in `t_delay`: the join set it belongs to, its id and its expiry.
#[derive(Debug, Clone)]
struct DelayReq {
    join_set_id: JoinSetId,
    delay_id: DelayId,
    expires_at: DateTime<Utc>,
}
722
723async fn fetch_created_event(
724 tx: &Transaction<'_>,
725 execution_id: &ExecutionId,
726) -> Result<CreateRequest, DbErrorRead> {
727 let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
728 execution_id = $1 AND version = 0";
729
730 let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
731
732 let created_at = get!(row, "created_at");
733 let event: Json<ExecutionRequest> = get!(row, "json_value");
734 let event = event.0;
735
736 if let ExecutionRequest::Created {
737 ffqn,
738 params,
739 parent,
740 scheduled_at,
741 component_id,
742 metadata,
743 scheduled_by,
744 } = event
745 {
746 Ok(CreateRequest {
747 created_at,
748 execution_id: execution_id.clone(),
749 ffqn,
750 params,
751 parent,
752 scheduled_at,
753 component_id,
754 metadata,
755 scheduled_by,
756 })
757 } else {
758 error!("Row with version=0 must be a `Created` event - {event:?}");
759 Err(consistency_db_err("expected `Created` event").into())
760 }
761}
762
763fn check_expected_next_and_appending_version(
764 expected_version: &Version,
765 appending_version: &Version,
766) -> Result<(), DbErrorWrite> {
767 if *expected_version != *appending_version {
768 debug!(
769 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
770 );
771 return Err(DbErrorWrite::NonRetriable(
772 DbErrorWriteNonRetriable::VersionConflict {
773 expected: expected_version.clone(),
774 requested: appending_version.clone(),
775 },
776 ));
777 }
778 Ok(())
779}
780
781#[instrument(level = Level::TRACE, skip_all, fields(execution_id = %req.execution_id))]
782async fn create_inner(
783 tx: &Transaction<'_>,
784 req: CreateRequest,
785) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
786 trace!("create_inner");
787
788 let version = Version::default();
789 let execution_id = req.execution_id.clone();
790 let execution_id_str = execution_id.to_string();
791 let ffqn = req.ffqn.clone();
792 let created_at = req.created_at;
793 let scheduled_at = req.scheduled_at;
794 let component_id = req.component_id.clone();
795
796 let event = ExecutionRequest::from(req);
797 let event = Json(event);
798
799 tx.execute(
800 "INSERT INTO t_execution_log (
801 execution_id, created_at, version, json_value, variant, join_set_id
802 ) VALUES ($1, $2, $3, $4, $5, $6)",
803 &[
804 &execution_id_str,
805 &created_at,
806 &i64::from(version.0), &event,
808 &event.0.variant(),
809 &event.0.join_set_id().map(std::string::ToString::to_string),
810 ],
811 )
812 .await?;
813
814 let pending_at = {
815 debug!("Creating with `Pending(`{scheduled_at:?}`)");
816
817 tx.execute(
818 r"
819 INSERT INTO t_state (
820 execution_id,
821 is_top_level,
822 corresponding_version,
823 pending_expires_finished,
824 ffqn,
825 state,
826 created_at,
827 component_id_input_digest,
828 updated_at,
829 first_scheduled_at,
830 intermittent_event_count
831 ) VALUES (
832 $1, $2, $3, $4, $5, $6, $7, $8, CURRENT_TIMESTAMP, $9, 0
833 )",
834 &[
835 &execution_id_str,
836 &execution_id.is_top_level(),
837 &i64::from(version.0),
838 &scheduled_at,
839 &ffqn.to_string(),
840 &STATE_PENDING_AT,
841 &created_at,
842 &component_id.input_digest.as_slice(),
843 &scheduled_at,
844 ],
845 )
846 .await?;
847
848 AppendNotifier {
849 pending_at: Some(NotifierPendingAt {
850 scheduled_at,
851 ffqn,
852 component_input_digest: component_id.input_digest,
853 }),
854 execution_finished: None,
855 response: None,
856 }
857 };
858
859 let next_version = Version::new(version.0 + 1);
860 Ok((next_version, pending_at))
861}
862
863#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %corresponding_version))]
864async fn update_state_pending_after_response_appended(
865 tx: &Transaction<'_>,
866 execution_id: &ExecutionId,
867 scheduled_at: DateTime<Utc>, corresponding_version: &Version, component_input_digest: InputContentDigest,
870) -> Result<AppendNotifier, DbErrorWrite> {
871 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
872
873 let execution_id_str = execution_id.to_string();
875 let version = i64::from(corresponding_version.0);
876
877 let updated = tx
878 .execute(
879 r"
880 UPDATE t_state
881 SET
882 corresponding_version = $1,
883 pending_expires_finished = $2,
884 state = $3,
885 updated_at = CURRENT_TIMESTAMP,
886
887 last_lock_version = NULL,
888
889 join_set_id = NULL,
890 join_set_closing = NULL,
891
892 result_kind = NULL
893 WHERE execution_id = $4
894 ",
895 &[
896 &version, &scheduled_at, &STATE_PENDING_AT, &execution_id_str, ],
901 )
902 .await?;
903
904 if updated == 0 {
905 return Err(DbErrorWrite::NotFound);
906 }
907
908 Ok(AppendNotifier {
909 pending_at: Some(NotifierPendingAt {
910 scheduled_at,
911 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
912 component_input_digest,
913 }),
914 execution_finished: None,
915 response: None,
916 })
917}
918
919#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
920async fn update_state_pending_after_event_appended(
921 tx: &Transaction<'_>,
922 execution_id: &ExecutionId,
923 appending_version: &Version,
924 scheduled_at: DateTime<Utc>,
925 intermittent_failure: bool,
926 component_input_digest: InputContentDigest,
927) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
928 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
929
930 let intermittent_delta = i64::from(intermittent_failure); let updated = tx
933 .execute(
934 r"
935 UPDATE t_state
936 SET
937 corresponding_version = $1,
938 pending_expires_finished = $2,
939 state = $3,
940 updated_at = CURRENT_TIMESTAMP,
941 intermittent_event_count = intermittent_event_count + $4,
942
943 last_lock_version = NULL,
944
945 join_set_id = NULL,
946 join_set_closing = NULL,
947
948 result_kind = NULL
949 WHERE execution_id = $5;
950 ",
951 &[
952 &i64::from(appending_version.0), &scheduled_at, &STATE_PENDING_AT, &intermittent_delta, &execution_id.to_string(), ],
958 )
959 .await?;
960
961 if updated != 1 {
962 return Err(DbErrorWrite::NotFound);
963 }
964
965 Ok((
966 appending_version.increment(),
967 AppendNotifier {
968 pending_at: Some(NotifierPendingAt {
969 scheduled_at,
970 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
971 component_input_digest,
972 }),
973 execution_finished: None,
974 response: None,
975 },
976 ))
977}
978
979async fn update_state_locked_get_intermittent_event_count(
980 tx: &Transaction<'_>,
981 execution_id: &ExecutionId,
982 executor_id: ExecutorId,
983 run_id: RunId,
984 lock_expires_at: DateTime<Utc>,
985 appending_version: &Version,
986 retry_config: ComponentRetryConfig,
987) -> Result<u32, DbErrorWrite> {
988 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
989 let backoff_millis = i64::try_from(retry_config.retry_exp_backoff.as_millis())
990 .map_err(|_| DbErrorGeneric::Uncategorized("backoff too big".into()))?; let execution_id_str = execution_id.to_string();
993
994 let updated = tx
995 .execute(
996 r"
997 UPDATE t_state
998 SET
999 corresponding_version = $1,
1000 pending_expires_finished = $2,
1001 state = $3,
1002 updated_at = CURRENT_TIMESTAMP,
1003
1004 max_retries = $4,
1005 retry_exp_backoff_millis = $5,
1006 last_lock_version = $1, -- appending_version again
1007 executor_id = $6,
1008 run_id = $7,
1009
1010 join_set_id = NULL,
1011 join_set_closing = NULL,
1012
1013 result_kind = NULL
1014 WHERE execution_id = $8
1015 ",
1016 &[
1017 &i64::from(appending_version.0), &lock_expires_at, &STATE_LOCKED, &retry_config.max_retries.map(i64::from), &backoff_millis, &executor_id.to_string(), &run_id.to_string(), &execution_id_str, ],
1026 )
1027 .await?;
1028
1029 if updated != 1 {
1030 return Err(DbErrorWrite::NotFound);
1031 }
1032
1033 let row = tx
1035 .query_one(
1036 "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
1037 &[&execution_id_str],
1038 )
1039 .await
1040 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1041
1042 let count: i64 = get!(row, "intermittent_event_count"); let count = u32::try_from(count)
1044 .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
1045 Ok(count)
1046}
1047
1048async fn update_state_blocked(
1049 tx: &Transaction<'_>,
1050 execution_id: &ExecutionId,
1051 appending_version: &Version,
1052 join_set_id: &JoinSetId,
1053 lock_expires_at: DateTime<Utc>,
1054 join_set_closing: bool,
1055) -> Result<AppendResponse, DbErrorWrite> {
1056 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1057
1058 let updated = tx
1059 .execute(
1060 r"
1061 UPDATE t_state
1062 SET
1063 corresponding_version = $1,
1064 pending_expires_finished = $2,
1065 state = $3,
1066 updated_at = CURRENT_TIMESTAMP,
1067
1068 last_lock_version = NULL,
1069
1070 join_set_id = $4,
1071 join_set_closing = $5,
1072
1073 result_kind = NULL
1074 WHERE execution_id = $6
1075 ",
1076 &[
1077 &i64::from(appending_version.0), &lock_expires_at, &STATE_BLOCKED_BY_JOIN_SET, &join_set_id.to_string(), &join_set_closing, &execution_id.to_string(), ],
1084 )
1085 .await?;
1086
1087 if updated != 1 {
1088 return Err(DbErrorWrite::NotFound);
1089 }
1090 Ok(appending_version.increment())
1091}
1092
1093async fn update_state_finished(
1094 tx: &Transaction<'_>,
1095 execution_id: &ExecutionId,
1096 appending_version: &Version,
1097 finished_at: DateTime<Utc>,
1098 result_kind: PendingStateFinishedResultKind,
1099) -> Result<(), DbErrorWrite> {
1100 debug!("Setting t_state to Finished");
1101
1102 let result_kind_json = Json(result_kind);
1103
1104 let updated = tx
1105 .execute(
1106 r"
1107 UPDATE t_state
1108 SET
1109 corresponding_version = $1,
1110 pending_expires_finished = $2,
1111 state = $3,
1112 updated_at = CURRENT_TIMESTAMP,
1113
1114 last_lock_version = NULL,
1115 executor_id = NULL,
1116 run_id = NULL,
1117
1118 join_set_id = NULL,
1119 join_set_closing = NULL,
1120
1121 result_kind = $4
1122 WHERE execution_id = $5
1123 ",
1124 &[
1125 &i64::from(appending_version.0), &finished_at, &STATE_FINISHED, &result_kind_json, &execution_id.to_string(), ],
1131 )
1132 .await?;
1133
1134 if updated != 1 {
1135 return Err(DbErrorWrite::NotFound);
1136 }
1137 Ok(())
1138}
1139
1140#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
1141async fn bump_state_next_version(
1142 tx: &Transaction<'_>,
1143 execution_id: &ExecutionId,
1144 appending_version: &Version,
1145 delay_req: Option<DelayReq>,
1146) -> Result<AppendResponse, DbErrorWrite> {
1147 debug!("update_index_version");
1148 let execution_id_str = execution_id.to_string();
1149
1150 let updated = tx
1151 .execute(
1152 r"
1153 UPDATE t_state
1154 SET
1155 corresponding_version = $1,
1156 updated_at = CURRENT_TIMESTAMP
1157 WHERE execution_id = $2
1158 ",
1159 &[
1160 &i64::from(appending_version.0), &execution_id_str, ],
1163 )
1164 .await?;
1165
1166 if updated != 1 {
1167 return Err(DbErrorWrite::NotFound);
1168 }
1169
1170 if let Some(DelayReq {
1171 join_set_id,
1172 delay_id,
1173 expires_at,
1174 }) = delay_req
1175 {
1176 debug!("Inserting delay to `t_delay`");
1177 tx.execute(
1178 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) VALUES ($1, $2, $3, $4)",
1179 &[
1180 &execution_id_str,
1181 &join_set_id.to_string(),
1182 &delay_id.to_string(),
1183 &expires_at,
1184 ],
1185 )
1186 .await?;
1187 }
1188 Ok(appending_version.increment())
1189}
1190
1191async fn get_combined_state(
1192 tx: &Transaction<'_>,
1193 execution_id: &ExecutionId,
1194) -> Result<CombinedState, DbErrorRead> {
1195 let row = tx
1196 .query_one(
1197 r"
1198 SELECT
1199 state, ffqn, component_id_input_digest, corresponding_version, pending_expires_finished,
1200 last_lock_version, executor_id, run_id,
1201 join_set_id, join_set_closing,
1202 result_kind
1203 FROM t_state
1204 WHERE execution_id = $1
1205 ",
1206 &[&execution_id.to_string()],
1207 )
1208 .await
1209 .map_err(DbErrorRead::from)?;
1210
1211 let digest_bytes: Vec<u8> = get!(row, "component_id_input_digest");
1213 let digest = Digest::try_from(digest_bytes.as_slice())
1214 .map_err(|err| consistency_db_err(err.to_string()))?;
1215 let component_id_input_digest = InputContentDigest(ContentDigest(digest));
1216
1217 let state: String = get!(row, "state");
1218 let ffqn: String = get!(row, "ffqn");
1219 let pending_expires_finished: DateTime<Utc> = get!(row, "pending_expires_finished");
1220
1221 let last_lock_version_raw: Option<i64> = get!(row, "last_lock_version");
1222 let last_lock_version = last_lock_version_raw
1223 .map(Version::try_from)
1224 .transpose()
1225 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1226
1227 let executor_id_raw: Option<String> = get!(row, "executor_id");
1228 let executor_id = executor_id_raw
1229 .map(|id| ExecutorId::from_str(&id))
1230 .transpose()
1231 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1232
1233 let run_id_raw: Option<String> = get!(row, "run_id");
1234 let run_id = run_id_raw
1235 .map(|id| RunId::from_str(&id))
1236 .transpose()
1237 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1238
1239 let join_set_id_raw: Option<String> = get!(row, "join_set_id");
1240 let join_set_id = join_set_id_raw
1241 .map(|id| JoinSetId::from_str(&id))
1242 .transpose()
1243 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1244
1245 let join_set_closing: Option<bool> = get!(row, "join_set_closing");
1246
1247 let result_kind: Option<Json<PendingStateFinishedResultKind>> = get!(row, "result_kind");
1248 let result_kind = result_kind.map(|it| it.0);
1249
1250 let corresponding_version = get!(row, "corresponding_version", i64);
1251 let corresponding_version = Version::new(
1252 VersionType::try_from(corresponding_version)
1253 .map_err(|_| consistency_db_err("version must be non-negative"))?,
1254 );
1255
1256 let dto = CombinedStateDTO {
1257 state,
1258 ffqn,
1259 component_id_input_digest,
1260 pending_expires_finished,
1261 last_lock_version,
1262 executor_id,
1263 run_id,
1264 join_set_id,
1265 join_set_closing,
1266 result_kind,
1267 };
1268 CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
1269}
1270
1271async fn list_executions(
1272 read_tx: &Transaction<'_>,
1273 ffqn: Option<&FunctionFqn>,
1274 top_level_only: bool,
1275 pagination: &ExecutionListPagination,
1276) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1277 struct QueryBuilder {
1279 where_clauses: Vec<String>,
1280 params: Vec<Box<dyn ToSql + Send + Sync>>,
1281 }
1282
1283 impl QueryBuilder {
1284 fn new() -> Self {
1285 Self {
1286 where_clauses: Vec::new(),
1287 params: Vec::new(),
1288 }
1289 }
1290
1291 fn add_param<T>(&mut self, param: T) -> String
1292 where
1293 T: ToSql + Sync + Send + 'static,
1294 {
1295 self.params.push(Box::new(param));
1296 format!("${}", self.params.len())
1297 }
1298
1299 fn add_where(&mut self, clause: String) {
1300 self.where_clauses.push(clause);
1301 }
1302 }
1303
1304 let mut qb = QueryBuilder::new();
1305
1306 let (limit, limit_desc) = match pagination {
1308 ExecutionListPagination::CreatedBy(p) => {
1309 let limit = p.length();
1310 let is_desc = p.is_desc();
1311 if let Some(cursor) = p.cursor() {
1312 let placeholder = qb.add_param(*cursor);
1313 qb.add_where(format!("created_at {} {}", p.rel(), placeholder));
1314 }
1315 (limit, is_desc)
1316 }
1317 ExecutionListPagination::ExecutionId(p) => {
1318 let limit = p.length();
1319 let is_desc = p.is_desc();
1320 if let Some(cursor) = p.cursor() {
1321 let placeholder = qb.add_param(cursor.to_string());
1322 qb.add_where(format!("execution_id {} {}", p.rel(), placeholder));
1323 }
1324 (limit, is_desc)
1325 }
1326 };
1327
1328 if top_level_only {
1330 qb.add_where("is_top_level = true".to_string());
1331 }
1332
1333 if let Some(ffqn) = ffqn {
1335 let placeholder = qb.add_param(ffqn.to_string());
1336 qb.add_where(format!("ffqn = {placeholder}"));
1337 }
1338
1339 let where_str = if qb.where_clauses.is_empty() {
1341 String::new()
1342 } else {
1343 format!("WHERE {}", qb.where_clauses.join(" AND "))
1344 };
1345
1346 let order_col = match pagination {
1347 ExecutionListPagination::CreatedBy(_) => "created_at",
1348 ExecutionListPagination::ExecutionId(_) => "execution_id",
1349 };
1350
1351 let desc_str = if limit_desc { "DESC" } else { "" };
1352
1353 let sql = format!(
1354 r"
1355 SELECT created_at, first_scheduled_at, component_id_input_digest,
1356 state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1357 last_lock_version, executor_id, run_id,
1358 join_set_id, join_set_closing,
1359 result_kind
1360 FROM t_state {where_str} ORDER BY {order_col} {desc_str} LIMIT {limit}
1361 "
1362 );
1363
1364 let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1365 .params
1366 .iter()
1367 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1368 .collect();
1369
1370 let rows = read_tx
1371 .query(&sql, ¶ms_refs)
1372 .await
1373 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1374
1375 let mut vec = Vec::with_capacity(rows.len());
1376
1377 for row in rows {
1378 let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1380 let execution_id_str: String = get!(row, "execution_id");
1381 let execution_id = ExecutionId::from_str(&execution_id_str)
1382 .map_err(|err| consistency_db_err(err.to_string()))?;
1383
1384 let digest_bytes: Vec<u8> = get!(row, "component_id_input_digest");
1385 let digest = Digest::try_from(digest_bytes.as_slice())
1386 .map_err(|err| consistency_db_err(err.to_string()))?;
1387 let component_id_input_digest = InputContentDigest(ContentDigest(digest));
1388
1389 let created_at: DateTime<Utc> = get!(row, "created_at");
1390 let first_scheduled_at: DateTime<Utc> = get!(row, "first_scheduled_at");
1391
1392 let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1393 get!(row, "result_kind");
1394 let result_kind = result_kind.map(|it| it.0);
1395
1396 let corresponding_version: i64 = get!(row, "corresponding_version");
1397 let corresponding_version = Version::try_from(corresponding_version)
1398 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1399
1400 let executor_id_str: Option<String> = get!(row, "executor_id");
1401 let executor_id = executor_id_str
1402 .map(|id| ExecutorId::from_str(&id))
1403 .transpose()
1404 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1405
1406 let last_lock_version_raw: Option<i64> = get!(row, "last_lock_version");
1407 let last_lock_version = last_lock_version_raw
1408 .map(Version::try_from)
1409 .transpose()
1410 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1411
1412 let run_id_str: Option<String> = get!(row, "run_id");
1413 let run_id = run_id_str
1414 .map(|id| RunId::from_str(&id))
1415 .transpose()
1416 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1417
1418 let join_set_id_str: Option<String> = get!(row, "join_set_id");
1419 let join_set_id = join_set_id_str
1420 .map(|id| JoinSetId::from_str(&id))
1421 .transpose()
1422 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1423
1424 let combined_state_dto = CombinedStateDTO {
1425 component_id_input_digest: component_id_input_digest.clone(),
1426 state: get!(row, "state"),
1427 ffqn: get!(row, "ffqn"),
1428 pending_expires_finished: get!(row, "pending_expires_finished"),
1429 executor_id,
1430 last_lock_version,
1431 run_id,
1432 join_set_id,
1433 join_set_closing: get!(row, "join_set_closing"),
1434 result_kind,
1435 };
1436
1437 let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1438
1439 Ok(ExecutionWithState {
1440 execution_id,
1441 ffqn: combined_state.ffqn,
1442 pending_state: combined_state.pending_state,
1443 created_at,
1444 first_scheduled_at,
1445 component_digest: component_id_input_digest,
1446 })
1447 };
1448
1449 match unpack() {
1450 Ok(execution) => vec.push(execution),
1451 Err(err) => {
1452 warn!("Skipping corrupted row in t_state: {err:?}");
1453 }
1454 }
1455 }
1456
1457 if !limit_desc {
1458 vec.reverse();
1460 }
1461 Ok(vec)
1462}
1463
1464async fn list_responses(
1465 tx: &Transaction<'_>,
1466 execution_id: &ExecutionId,
1467 pagination: Option<Pagination<u32>>,
1468) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1469 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1471 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1472 params.push(p);
1473 format!("${}", params.len())
1474 };
1475
1476 let p_execution_id = add_param(Box::new(execution_id.to_string()));
1478
1479 let mut sql = format!(
1480 "SELECT \
1481 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1482 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1483 WHERE \
1484 r.execution_id = {p_execution_id} \
1485 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1486 );
1487
1488 let limit = match &pagination {
1490 Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1491 let p_cursor = add_param(Box::new(i64::from(*cursor)));
1493
1494 write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1496
1497 Some(p.length())
1498 }
1499 None => None,
1500 };
1501
1502 sql.push_str(" ORDER BY r.id");
1504 if pagination.as_ref().is_some_and(Pagination::is_desc) {
1505 sql.push_str(" DESC");
1506 }
1507
1508 if let Some(limit) = limit {
1510 let p_limit = add_param(Box::new(i64::from(limit)));
1512 write!(sql, " LIMIT {p_limit}").unwrap();
1513 }
1514
1515 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1516 .iter()
1517 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1518 .collect();
1519
1520 let rows = tx
1521 .query(&sql, ¶ms_refs)
1522 .await
1523 .map_err(DbErrorRead::from)?;
1524
1525 let mut results = Vec::with_capacity(rows.len());
1526 for row in rows {
1527 results.push(parse_response_with_cursor(&row)?);
1528 }
1529
1530 Ok(results)
1531}
1532
1533fn parse_response_with_cursor(
1534 row: &tokio_postgres::Row,
1535) -> Result<ResponseWithCursor, DbErrorRead> {
1536 let id = u32::try_from(get!(row, "id", i64))
1538 .map_err(|_| consistency_db_err("id must not be negative"))?;
1539
1540 let created_at: DateTime<Utc> = get!(row, "created_at");
1541 let join_set_id_str: String = get!(row, "join_set_id");
1542 let join_set_id = JoinSetId::from_str(&join_set_id_str)
1543 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1544
1545 let delay_id: Option<String> = get!(row, "delay_id");
1547 let delay_id = delay_id
1548 .map(|id| DelayId::from_str(&id))
1549 .transpose()
1550 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1551 let delay_success: Option<bool> = get!(row, "delay_success");
1552 let child_execution_id: Option<String> = get!(row, "child_execution_id");
1553 let child_execution_id = child_execution_id
1554 .map(|id| ExecutionIdDerived::from_str(&id))
1555 .transpose()
1556 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1557 let finished_version = get!(row, "finished_version", Option<i64>)
1558 .map(Version::try_from)
1559 .transpose()
1560 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1561 let json_value: Option<Json<ExecutionRequest>> = get!(row, "json_value");
1562 let json_value = json_value.map(|it| it.0);
1563
1564 let event = match (
1565 delay_id,
1566 delay_success,
1567 child_execution_id,
1568 finished_version,
1569 json_value,
1570 ) {
1571 (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1572 delay_id,
1573 result: delay_success.then_some(()).ok_or(()),
1574 },
1575 (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1576 if let ExecutionRequest::Finished { result, .. } = json_val {
1577 JoinSetResponse::ChildExecutionFinished {
1578 child_execution_id,
1579 finished_version,
1580 result,
1581 }
1582 } else {
1583 error!("Joined log entry must be 'Finished'");
1584 return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1585 }
1586 }
1587 (delay, delay_success, child, finished, result) => {
1588 error!(
1589 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1590 );
1591 return Err(consistency_db_err("invalid row in t_join_set_response").into());
1592 }
1593 };
1594
1595 Ok(ResponseWithCursor {
1596 cursor: id,
1597 event: JoinSetResponseEventOuter {
1598 event: JoinSetResponseEvent { join_set_id, event },
1599 created_at,
1600 },
1601 })
1602}
1603
1604#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
1605#[expect(clippy::too_many_arguments)]
1606async fn lock_single_execution(
1607 tx: &Transaction<'_>,
1608 created_at: DateTime<Utc>,
1609 component_id: ComponentId,
1610 execution_id: &ExecutionId,
1611 run_id: RunId,
1612 appending_version: &Version,
1613 executor_id: ExecutorId,
1614 lock_expires_at: DateTime<Utc>,
1615 retry_config: ComponentRetryConfig,
1616) -> Result<LockedExecution, DbErrorWrite> {
1617 debug!("lock_single_execution");
1618
1619 let combined_state = get_combined_state(tx, execution_id).await?;
1621 combined_state.pending_state.can_append_lock(
1622 created_at,
1623 executor_id,
1624 run_id,
1625 lock_expires_at,
1626 )?;
1627 let expected_version = combined_state.get_next_version_assert_not_finished();
1628 check_expected_next_and_appending_version(&expected_version, appending_version)?;
1629
1630 let locked_event = Locked {
1632 component_id,
1633 executor_id,
1634 lock_expires_at,
1635 run_id,
1636 retry_config,
1637 };
1638 let event = ExecutionRequest::Locked(locked_event.clone());
1639
1640 let event = Json(event);
1641
1642 tx.execute(
1644 "INSERT INTO t_execution_log \
1645 (execution_id, created_at, json_value, version, variant) \
1646 VALUES ($1, $2, $3, $4, $5)",
1647 &[
1648 &execution_id.to_string(),
1649 &created_at,
1650 &event,
1651 &i64::from(appending_version.0),
1652 &event.0.variant(),
1653 ],
1654 )
1655 .await
1656 .map_err(|err| {
1657 warn!("Cannot lock execution - {err:?}");
1658 DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState("cannot lock".into()))
1659 })?;
1660
1661 let responses_dto = list_responses(tx, execution_id, None).await?;
1663 let responses = responses_dto.into_iter().map(|resp| resp.event).collect();
1664 trace!("Responses: {responses:?}");
1665
1666 let intermittent_event_count = update_state_locked_get_intermittent_event_count(
1667 tx,
1668 execution_id,
1669 executor_id,
1670 run_id,
1671 lock_expires_at,
1672 appending_version,
1673 retry_config,
1674 )
1675 .await?;
1676
1677 let rows = tx
1680 .query(
1681 "SELECT json_value, version FROM t_execution_log WHERE \
1682 execution_id = $1 AND (variant = $2 OR variant = $3) \
1683 ORDER BY version",
1684 &[
1685 &execution_id.to_string(),
1686 &DUMMY_CREATED.variant(),
1687 &DUMMY_HISTORY_EVENT.variant(),
1688 ],
1689 )
1690 .await
1691 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
1692
1693 let mut events: VecDeque<ExecutionEvent> = VecDeque::new();
1694
1695 for row in rows {
1696 let event: Json<ExecutionRequest> = get!(row, "json_value");
1697 let event = event.0;
1698
1699 let version: i64 = get!(row, "version");
1700 let version = Version::try_from(version)
1701 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1702
1703 events.push_back(ExecutionEvent {
1704 created_at: DateTime::from_timestamp_nanos(0), event,
1706 backtrace_id: None,
1707 version,
1708 });
1709 }
1710
1711 let Some(ExecutionRequest::Created {
1713 ffqn,
1714 params,
1715 parent,
1716 metadata,
1717 ..
1718 }) = events.pop_front().map(|outer| outer.event)
1719 else {
1720 error!("Execution log must contain at least `Created` event");
1721 return Err(consistency_db_err("execution log must contain `Created` event").into());
1722 };
1723
1724 let mut event_history = Vec::new();
1726 for ExecutionEvent { event, version, .. } in events {
1727 if let ExecutionRequest::HistoryEvent { event } = event {
1728 event_history.push((event, version));
1729 } else {
1730 error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
1731 return Err(consistency_db_err(
1732 "rows can only contain `Created` and `HistoryEvent` event kinds",
1733 )
1734 .into());
1735 }
1736 }
1737
1738 Ok(LockedExecution {
1739 execution_id: execution_id.clone(),
1740 metadata,
1741 next_version: appending_version.increment(),
1742 ffqn,
1743 params,
1744 event_history,
1745 responses,
1746 parent,
1747 intermittent_event_count,
1748 locked_event,
1749 })
1750}
1751
1752async fn count_join_next(
1753 tx: &Transaction<'_>,
1754 execution_id: &ExecutionId,
1755 join_set_id: &JoinSetId,
1756) -> Result<u32, DbErrorRead> {
1757 let row = tx
1758 .query_one(
1759 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
1760 AND history_event_type = $3",
1761 &[
1762 &execution_id.to_string(),
1763 &join_set_id.to_string(),
1764 &HISTORY_EVENT_TYPE_JOIN_NEXT,
1765 ],
1766 )
1767 .await
1768 .map_err(DbErrorRead::from)?;
1769
1770 let count = u32::try_from(get!(row, "count", i64)).expect("COUNT cannot be negative");
1771 Ok(count)
1772}
1773
1774async fn nth_response(
1775 tx: &Transaction<'_>,
1776 execution_id: &ExecutionId,
1777 join_set_id: &JoinSetId,
1778 skip_rows: u32,
1779) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
1780 let row = tx
1781 .query_opt(
1782 "SELECT r.id, r.created_at, r.join_set_id, \
1783 r.delay_id, r.delay_success, \
1784 r.child_execution_id, r.finished_version, l.json_value \
1785 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1786 WHERE \
1787 r.execution_id = $1 AND r.join_set_id = $2 AND \
1788 ( \
1789 r.finished_version = l.version \
1790 OR \
1791 r.child_execution_id IS NULL \
1792 ) \
1793 ORDER BY id \
1794 LIMIT 1 OFFSET $3",
1795 &[
1796 &execution_id.to_string(),
1797 &join_set_id.to_string(),
1798 &i64::from(skip_rows),
1799 ]
1800 )
1801 .await
1802 .map_err(DbErrorRead::from)?;
1803
1804 match row {
1805 Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
1806 None => Ok(None),
1807 }
1808}
1809
1810#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1811async fn append(
1812 tx: &Transaction<'_>,
1813 execution_id: &ExecutionId,
1814 req: AppendRequest,
1815 appending_version: Version,
1816) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1817 if matches!(req.event, ExecutionRequest::Created { .. }) {
1818 return Err(DbErrorWrite::NonRetriable(
1819 DbErrorWriteNonRetriable::ValidationFailed(
1820 "cannot append `Created` event - use `create` instead".into(),
1821 ),
1822 ));
1823 }
1824
1825 if let AppendRequest {
1826 event:
1827 ExecutionRequest::Locked(Locked {
1828 component_id,
1829 executor_id,
1830 run_id,
1831 lock_expires_at,
1832 retry_config,
1833 }),
1834 created_at,
1835 } = req
1836 {
1837 return lock_single_execution(
1838 tx,
1839 created_at,
1840 component_id,
1841 execution_id,
1842 run_id,
1843 &appending_version,
1844 executor_id,
1845 lock_expires_at,
1846 retry_config,
1847 )
1848 .await
1849 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1850 }
1851
1852 let combined_state = get_combined_state(tx, execution_id).await?;
1853 if combined_state.pending_state.is_finished() {
1854 debug!("Execution is already finished");
1855 return Err(DbErrorWrite::NonRetriable(
1856 DbErrorWriteNonRetriable::IllegalState("already finished".into()),
1857 ));
1858 }
1859
1860 check_expected_next_and_appending_version(
1861 &combined_state.get_next_version_assert_not_finished(),
1862 &appending_version,
1863 )?;
1864
1865 let event = Json(req.event);
1866
1867 tx.execute(
1869 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
1870 VALUES ($1, $2, $3, $4, $5, $6)",
1871 &[
1872 &execution_id.to_string(),
1873 &req.created_at,
1874 &event,
1875 &i64::from(appending_version.0),
1876 &event.0.variant(),
1877 &event.0.join_set_id().map(std::string::ToString::to_string),
1878 ],
1879 )
1880 .await?;
1881
1882 match &event.0 {
1884 ExecutionRequest::Created { .. } => {
1885 unreachable!("handled in the caller")
1886 }
1887
1888 ExecutionRequest::Locked { .. } => {
1889 unreachable!("handled above")
1890 }
1891
1892 ExecutionRequest::TemporarilyFailed {
1893 backoff_expires_at, ..
1894 }
1895 | ExecutionRequest::TemporarilyTimedOut {
1896 backoff_expires_at, ..
1897 } => {
1898 let (next_version, notifier) = update_state_pending_after_event_appended(
1899 tx,
1900 execution_id,
1901 &appending_version,
1902 *backoff_expires_at,
1903 true, combined_state
1905 .pending_state
1906 .get_component_id_input_digest()
1907 .clone(),
1908 )
1909 .await?;
1910 return Ok((next_version, notifier));
1911 }
1912
1913 ExecutionRequest::Unlocked {
1914 backoff_expires_at, ..
1915 } => {
1916 let (next_version, notifier) = update_state_pending_after_event_appended(
1917 tx,
1918 execution_id,
1919 &appending_version,
1920 *backoff_expires_at,
1921 false, combined_state
1923 .pending_state
1924 .get_component_id_input_digest()
1925 .clone(),
1926 )
1927 .await?;
1928 return Ok((next_version, notifier));
1929 }
1930
1931 ExecutionRequest::Finished { result, .. } => {
1932 update_state_finished(
1933 tx,
1934 execution_id,
1935 &appending_version,
1936 req.created_at,
1937 PendingStateFinishedResultKind::from(result),
1938 )
1939 .await?;
1940 return Ok((
1941 appending_version,
1942 AppendNotifier {
1943 pending_at: None,
1944 execution_finished: Some(NotifierExecutionFinished {
1945 execution_id: execution_id.clone(),
1946 retval: result.clone(),
1947 }),
1948 response: None,
1949 },
1950 ));
1951 }
1952
1953 ExecutionRequest::HistoryEvent {
1954 event:
1955 HistoryEvent::JoinSetCreate { .. }
1956 | HistoryEvent::JoinSetRequest {
1957 request: JoinSetRequest::ChildExecutionRequest { .. },
1958 ..
1959 }
1960 | HistoryEvent::Persist { .. }
1961 | HistoryEvent::Schedule { .. }
1962 | HistoryEvent::Stub { .. }
1963 | HistoryEvent::JoinNextTooMany { .. },
1964 } => {
1965 return Ok((
1966 bump_state_next_version(tx, execution_id, &appending_version, None).await?,
1967 AppendNotifier::default(),
1968 ));
1969 }
1970
1971 ExecutionRequest::HistoryEvent {
1972 event:
1973 HistoryEvent::JoinSetRequest {
1974 join_set_id,
1975 request:
1976 JoinSetRequest::DelayRequest {
1977 delay_id,
1978 expires_at,
1979 ..
1980 },
1981 },
1982 } => {
1983 return Ok((
1984 bump_state_next_version(
1985 tx,
1986 execution_id,
1987 &appending_version,
1988 Some(DelayReq {
1989 join_set_id: join_set_id.clone(),
1990 delay_id: delay_id.clone(),
1991 expires_at: *expires_at,
1992 }),
1993 )
1994 .await?,
1995 AppendNotifier::default(),
1996 ));
1997 }
1998
1999 ExecutionRequest::HistoryEvent {
2000 event:
2001 HistoryEvent::JoinNext {
2002 join_set_id,
2003 run_expires_at,
2004 closing,
2005 requested_ffqn: _,
2006 },
2007 } => {
2008 let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2010
2011 let nth_response =
2013 nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2014
2015 trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2016 assert!(join_next_count > 0);
2017
2018 if let Some(ResponseWithCursor {
2019 event:
2020 JoinSetResponseEventOuter {
2021 created_at: nth_created_at,
2022 ..
2023 },
2024 cursor: _,
2025 }) = nth_response
2026 {
2027 let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2028 let (next_version, notifier) = update_state_pending_after_event_appended(
2029 tx,
2030 execution_id,
2031 &appending_version,
2032 scheduled_at,
2033 false, combined_state
2035 .pending_state
2036 .get_component_id_input_digest()
2037 .clone(),
2038 )
2039 .await?;
2040 return Ok((next_version, notifier));
2041 }
2042
2043 return Ok((
2044 update_state_blocked(
2045 tx,
2046 execution_id,
2047 &appending_version,
2048 join_set_id,
2049 *run_expires_at,
2050 *closing,
2051 )
2052 .await?,
2053 AppendNotifier::default(),
2054 ));
2055 }
2056 }
2057}
2058
2059async fn append_response(
2060 tx: &Transaction<'_>,
2061 execution_id: &ExecutionId,
2062 response_outer: JoinSetResponseEventOuter,
2063) -> Result<AppendNotifier, DbErrorWrite> {
2064 let join_set_id = &response_outer.event.join_set_id;
2065
2066 let (delay_id, delay_success) = match &response_outer.event.event {
2067 JoinSetResponse::DelayFinished { delay_id, result } => {
2068 (Some(delay_id.to_string()), Some(result.is_ok()))
2069 }
2070 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2071 };
2072
2073 let (child_execution_id, finished_version) = match &response_outer.event.event {
2074 JoinSetResponse::ChildExecutionFinished {
2075 child_execution_id,
2076 finished_version,
2077 result: _,
2078 } => (
2079 Some(child_execution_id.to_string()),
2080 Some(i64::from(finished_version.0)),
2081 ),
2082 JoinSetResponse::DelayFinished { .. } => (None, None),
2083 };
2084
2085 tx.execute(
2086 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2087 VALUES ($1, $2, $3, $4, $5, $6, $7)",
2088 &[
2089 &execution_id.to_string(),
2090 &response_outer.created_at,
2091 &join_set_id.to_string(),
2092 &delay_id,
2093 &delay_success,
2094 &child_execution_id,
2095 &finished_version,
2096 ]
2097 ).await?;
2098
2099 let combined_state = get_combined_state(tx, execution_id).await?;
2101 debug!("previous_pending_state: {combined_state:?}");
2102
2103 let mut notifier = if let PendingState::BlockedByJoinSet {
2104 join_set_id: found_join_set_id,
2105 lock_expires_at,
2106 closing: _,
2107 component_id_input_digest,
2108 } = combined_state.pending_state
2109 && *join_set_id == found_join_set_id
2110 {
2111 let scheduled_at = std::cmp::max(lock_expires_at, response_outer.created_at);
2112 update_state_pending_after_response_appended(
2114 tx,
2115 execution_id,
2116 scheduled_at,
2117 &combined_state.corresponding_version,
2118 component_id_input_digest,
2119 )
2120 .await?
2121 } else {
2122 AppendNotifier::default()
2123 };
2124
2125 if let JoinSetResponseEvent {
2126 join_set_id,
2127 event:
2128 JoinSetResponse::DelayFinished {
2129 delay_id,
2130 result: _,
2131 },
2132 } = &response_outer.event
2133 {
2134 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2135 tx.execute(
2136 "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2137 &[
2138 &execution_id.to_string(),
2139 &join_set_id.to_string(),
2140 &delay_id.to_string(),
2141 ],
2142 )
2143 .await?;
2144 }
2145
2146 notifier.response = Some((execution_id.clone(), response_outer));
2147 Ok(notifier)
2148}
2149
2150async fn append_backtrace(
2151 tx: &Transaction<'_>,
2152 backtrace_info: &BacktraceInfo,
2153) -> Result<(), DbErrorWrite> {
2154 let backtrace_json = Json(&backtrace_info.wasm_backtrace);
2155
2156 tx.execute(
2157 "INSERT INTO t_backtrace (execution_id, component_id, version_min_including, version_max_excluding, wasm_backtrace) \
2158 VALUES ($1, $2, $3, $4, $5)",
2159 &[
2160 &backtrace_info.execution_id.to_string(),
2161 &Json(&backtrace_info.component_id),
2162 &i64::from(backtrace_info.version_min_including.0),
2163 &i64::from(backtrace_info.version_max_excluding.0),
2164 &backtrace_json,
2165 ],
2166 )
2167 .await?;
2168
2169 Ok(())
2170}
2171
/// Loads the complete execution log of `execution_id`: all events ordered by version, all
/// join set responses, the next version and the current pending state.
///
/// Only compiled with the `test` feature.
///
/// # Errors
/// * `DbErrorRead::NotFound` when the execution has no rows in `t_execution_log`.
/// * Database and consistency errors from reading events, state or responses.
#[cfg(feature = "test")]
async fn get(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
    let execution_id_param = execution_id.to_string();
    let rows = tx
        .query(
            "SELECT created_at, json_value, version FROM t_execution_log WHERE \
            execution_id = $1 ORDER BY version",
            &[&execution_id_param],
        )
        .await
        .map_err(DbErrorRead::from)?;

    if rows.is_empty() {
        return Err(DbErrorRead::NotFound);
    }

    let mut events = Vec::with_capacity(rows.len());
    for row in rows {
        let created_at: DateTime<Utc> = get!(row, "created_at");
        let Json(event): Json<ExecutionRequest> = get!(row, "json_value");
        let stored_version: i64 = get!(row, "version");
        let version = Version::try_from(stored_version)
            .map_err(|_| consistency_db_err("version must be non-negative"))?;

        events.push(ExecutionEvent {
            created_at,
            event,
            backtrace_id: None,
            version,
        });
    }

    let combined_state = get_combined_state(tx, execution_id).await?;
    let responses = list_responses(tx, execution_id, None)
        .await?
        .into_iter()
        .map(|resp| resp.event)
        .collect();

    Ok(concepts::storage::ExecutionLog {
        execution_id: execution_id.clone(),
        events,
        responses,
        next_version: combined_state.get_next_version_or_finished(),
        pending_state: combined_state.pending_state,
    })
}
2219
2220async fn list_execution_events(
2221 tx: &Transaction<'_>,
2222 execution_id: &ExecutionId,
2223 version_min: VersionType,
2224 version_max_excluding: VersionType,
2225 include_backtrace_id: bool,
2226) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2227 let sql = if include_backtrace_id {
2228 "SELECT
2229 log.created_at,
2230 log.json_value,
2231 log.version,
2232 bt.version_min_including AS backtrace_id
2233 FROM
2234 t_execution_log AS log
2235 LEFT OUTER JOIN
2236 t_backtrace AS bt ON log.execution_id = bt.execution_id
2237 AND log.version >= bt.version_min_including
2238 AND log.version < bt.version_max_excluding
2239 WHERE
2240 log.execution_id = $1
2241 AND log.version >= $2
2242 AND log.version < $3
2243 ORDER BY
2244 log.version"
2245 } else {
2246 "SELECT
2247 created_at, json_value, NULL::BIGINT as backtrace_id, version
2248 FROM t_execution_log WHERE
2249 execution_id = $1 AND version >= $2 AND version < $3
2250 ORDER BY version"
2251 };
2252
2253 let rows = tx
2254 .query(
2255 sql,
2256 &[
2257 &execution_id.to_string(),
2258 &i64::from(version_min),
2259 &i64::from(version_max_excluding),
2260 ],
2261 )
2262 .await
2263 .map_err(DbErrorRead::from)?;
2264
2265 let mut events = Vec::with_capacity(rows.len());
2266 for row in rows {
2267 let created_at: DateTime<Utc> = get!(row, "created_at");
2268 let backtrace_id = get!(row, "backtrace_id", Option<i64>)
2269 .map(Version::try_from)
2270 .transpose()
2271 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2272
2273 let version = get!(row, "version", i64);
2274 let version = Version::new(
2275 VersionType::try_from(version)
2276 .map_err(|_| consistency_db_err("version must be non-negative"))?,
2277 );
2278 let event_req: Json<ExecutionRequest> = get!(row, "json_value");
2279 let event_req = event_req.0;
2280
2281 events.push(ExecutionEvent {
2282 created_at,
2283 event: event_req,
2284 backtrace_id,
2285 version,
2286 });
2287 }
2288 Ok(events)
2289}
2290
2291async fn get_execution_event(
2292 tx: &Transaction<'_>,
2293 execution_id: &ExecutionId,
2294 version: VersionType,
2295) -> Result<ExecutionEvent, DbErrorRead> {
2296 let row = tx
2297 .query_one(
2298 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2299 execution_id = $1 AND version = $2",
2300 &[&execution_id.to_string(), &i64::from(version)],
2301 )
2302 .await
2303 .map_err(DbErrorRead::from)?;
2304
2305 let created_at: DateTime<Utc> = get!(row, "created_at");
2306 let json_val: Json<ExecutionRequest> = get!(row, "json_value");
2307 let version = get!(row, "version", i64);
2308 let version = Version::try_from(version)
2309 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2310 let event = json_val.0;
2311
2312 Ok(ExecutionEvent {
2313 created_at,
2314 event,
2315 backtrace_id: None,
2316 version,
2317 })
2318}
2319
2320async fn get_last_execution_event(
2321 tx: &Transaction<'_>,
2322 execution_id: &ExecutionId,
2323) -> Result<ExecutionEvent, DbErrorRead> {
2324 let row = tx
2325 .query_one(
2326 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2327 execution_id = $1 ORDER BY version DESC LIMIT 1",
2328 &[&execution_id.to_string()],
2329 )
2330 .await
2331 .map_err(DbErrorRead::from)?;
2332
2333 let created_at: DateTime<Utc> = get!(row, "created_at");
2334 let event: Json<ExecutionRequest> = get!(row, "json_value");
2335 let event = event.0;
2336 let version = get!(row, "version", i64);
2337 let version = Version::try_from(version)
2338 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2339
2340 Ok(ExecutionEvent {
2341 created_at,
2342 event,
2343 backtrace_id: None,
2344 version,
2345 })
2346}
2347
2348async fn delay_response(
2349 tx: &Transaction<'_>,
2350 execution_id: &ExecutionId,
2351 delay_id: &DelayId,
2352) -> Result<Option<bool>, DbErrorRead> {
2353 let row = tx
2354 .query_opt(
2355 "SELECT delay_success \
2356 FROM t_join_set_response \
2357 WHERE \
2358 execution_id = $1 AND delay_id = $2",
2359 &[&execution_id.to_string(), &delay_id.to_string()],
2360 )
2361 .await
2362 .map_err(DbErrorRead::from)?;
2363
2364 match row {
2365 Some(r) => Ok(Some(get!(r, "delay_success", bool))),
2366 None => Ok(None),
2367 }
2368}
2369
/// Returns the join set responses of `execution_id` ordered by `id`, skipping the first
/// `skip_rows` rows.
///
/// A child-execution response is joined with the child's `t_execution_log` row at
/// `finished_version`, so `json_value` is available to the row parser. Rows with a NULL
/// `child_execution_id` are kept without a matching log row.
///
/// # Errors
/// Propagates query failures and any error from `parse_response_with_cursor`.
#[instrument(level = Level::TRACE, skip_all)]
async fn get_responses_with_offset(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
    skip_rows: u32,
) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorRead> {
    let rows = tx
        .query(
            "SELECT r.id, r.created_at, r.join_set_id, \
             r.delay_id, r.delay_success, \
             r.child_execution_id, r.finished_version, l.json_value \
             FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
             WHERE \
             r.execution_id = $1 AND \
             ( \
             r.finished_version = l.version \
             OR r.child_execution_id IS NULL \
             ) \
             ORDER BY id \
             OFFSET $2",
            &[
                &execution_id.to_string(),
                &(i64::from(skip_rows)),
            ]
        )
        .await
        .map_err(DbErrorRead::from)?;

    let mut results = Vec::with_capacity(rows.len());
    for row in rows {
        // Only the event is needed here; the cursor part of the parsed row is dropped.
        let resp = parse_response_with_cursor(&row)?;
        results.push(resp.event);
    }
    Ok(results)
}
2405
2406async fn get_pending_of_single_ffqn(
2407 tx: &Transaction<'_>,
2408 batch_size: u32,
2409 pending_at_or_sooner: DateTime<Utc>,
2410 ffqn: &FunctionFqn,
2411 select_strategy: SelectStrategy,
2412) -> Result<Vec<(ExecutionId, Version)>, ()> {
2413 let rows = tx
2414 .query(
2415 &format!(
2416 "SELECT execution_id, corresponding_version FROM t_state \
2417 WHERE \
2418 state = '{STATE_PENDING_AT}' AND \
2419 pending_expires_finished <= $1 AND ffqn = $2 \
2420 ORDER BY pending_expires_finished \
2421 {} \
2422 LIMIT $3",
2423 if select_strategy == SelectStrategy::LockForUpdate {
2424 "FOR UPDATE SKIP LOCKED"
2425 } else {
2426 ""
2427 }
2428 ),
2429 &[
2430 &pending_at_or_sooner,
2431 &ffqn.to_string(),
2432 &(i64::from(batch_size)),
2433 ],
2434 )
2435 .await
2436 .map_err(|err| {
2437 warn!("Ignoring consistency error {err:?}");
2438 })?;
2439
2440 let mut result = Vec::with_capacity(rows.len());
2441 for row in rows {
2442 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2443 let eid_str: String = get!(row, "execution_id");
2444 let corresponding_version: i64 = get!(row, "corresponding_version");
2445 let corresponding_version = Version::try_from(corresponding_version)
2446 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2447
2448 if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2449 return Ok((eid, corresponding_version.increment()));
2450 }
2451 Err(consistency_db_err("invalid execution_id"))
2452 };
2453
2454 match unpack() {
2455 Ok(val) => result.push(val),
2456 Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2457 }
2458 }
2459 Ok(result)
2460}
2461
2462async fn get_pending_by_ffqns(
2464 tx: &Transaction<'_>,
2465 batch_size: u32,
2466 pending_at_or_sooner: DateTime<Utc>,
2467 ffqns: &[FunctionFqn],
2468 select_strategy: SelectStrategy,
2469) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2470 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2471 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2472
2473 for ffqn in ffqns {
2474 let needed = batch_size - execution_ids_versions.len();
2475 if needed == 0 {
2476 break;
2477 }
2478 let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
2479 if let Ok(execs) =
2480 get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2481 .await
2482 {
2483 execution_ids_versions.extend(execs);
2484 }
2485 }
2486
2487 Ok(execution_ids_versions)
2488}
2489
/// Controls whether the pending-execution queries lock the rows they select.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum SelectStrategy {
    /// Plain `SELECT`; no locking clause is added to the query.
    Read,
    /// Adds `FOR UPDATE SKIP LOCKED` to the query.
    LockForUpdate,
}
2495
2496async fn get_pending_by_component_input_digest(
2497 tx: &Transaction<'_>,
2498 batch_size: u32,
2499 pending_at_or_sooner: DateTime<Utc>,
2500 input_digest: &InputContentDigest,
2501 select_strategy: SelectStrategy,
2502) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2503 let rows = tx
2504 .query(
2505 &format!(
2506 "SELECT execution_id, corresponding_version FROM t_state WHERE \
2507 state = '{STATE_PENDING_AT}' AND \
2508 pending_expires_finished <= $1 AND \
2509 component_id_input_digest = $2 \
2510 ORDER BY pending_expires_finished \
2511 {} \
2512 LIMIT $3",
2513 if select_strategy == SelectStrategy::LockForUpdate {
2514 "FOR UPDATE SKIP LOCKED"
2515 } else {
2516 ""
2517 }
2518 ),
2519 &[
2520 &pending_at_or_sooner,
2521 &input_digest.as_slice(), &i64::from(batch_size),
2523 ],
2524 )
2525 .await
2526 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2527
2528 let mut result = Vec::with_capacity(rows.len());
2529 for row in rows {
2530 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2531 let eid_str: String = get!(row, "execution_id");
2532 let corresponding_version: i64 = get!(row, "corresponding_version");
2533 let corresponding_version = Version::try_from(corresponding_version)
2534 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2535
2536 let eid = ExecutionId::from_str(&eid_str)
2537 .map_err(|err| consistency_db_err(err.to_string()))?;
2538 Ok((eid, corresponding_version.increment()))
2539 };
2540
2541 match unpack() {
2542 Ok(val) => result.push(val),
2543 Err(err) => {
2544 warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
2545 }
2546 }
2547 }
2548
2549 Ok(result)
2550}
2551
2552fn notify_pending_locked(
2553 notifier: &NotifierPendingAt,
2554 current_time: DateTime<Utc>,
2555 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2556) {
2557 if notifier.scheduled_at <= current_time {
2558 ffqn_to_pending_subscription.notify(notifier);
2559 }
2560}
2561
2562async fn upgrade_execution_component(
2563 tx: &Transaction<'_>,
2564 execution_id: &ExecutionId,
2565 old: &InputContentDigest,
2566 new: &InputContentDigest,
2567) -> Result<(), DbErrorWrite> {
2568 debug!("Updating t_state to component {new}");
2569
2570 let updated = tx
2571 .execute(
2572 r"
2573 UPDATE t_state
2574 SET
2575 updated_at = CURRENT_TIMESTAMP,
2576 component_id_input_digest = $1
2577 WHERE
2578 execution_id = $2 AND
2579 component_id_input_digest = $3
2580 ",
2581 &[
2582 &new.as_slice(), &execution_id.to_string(), &old.as_slice(), ],
2586 )
2587 .await
2588 .map_err(|err| DbErrorGeneric::Uncategorized(err.to_string().into()))?;
2589
2590 if updated != 1 {
2591 return Err(DbErrorWrite::NotFound);
2592 }
2593 Ok(())
2594}
2595
2596impl PostgresConnection {
2597 #[instrument(level = Level::TRACE, skip_all)]
2599 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2600 let (pending_ats, finished_execs, responses) = {
2601 let (mut pending_ats, mut finished_execs, mut responses) =
2602 (Vec::new(), Vec::new(), Vec::new());
2603 for notifier in notifiers {
2604 if let Some(pending_at) = notifier.pending_at {
2605 pending_ats.push(pending_at);
2606 }
2607 if let Some(finished) = notifier.execution_finished {
2608 finished_execs.push(finished);
2609 }
2610 if let Some(response) = notifier.response {
2611 responses.push(response);
2612 }
2613 }
2614 (pending_ats, finished_execs, responses)
2615 };
2616
2617 if !pending_ats.is_empty() {
2619 let guard = self.pending_subscribers.lock().unwrap();
2620 for pending_at in pending_ats {
2621 notify_pending_locked(&pending_at, current_time, &guard);
2622 }
2623 }
2624 if !finished_execs.is_empty() {
2626 let mut guard = self.execution_finished_subscribers.lock().unwrap();
2627 for finished in finished_execs {
2628 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2629 for (_tag, sender) in listeners_of_exe_id {
2630 let _ = sender.send(finished.retval.clone());
2631 }
2632 }
2633 }
2634 }
2635 if !responses.is_empty() {
2637 let mut guard = self.response_subscribers.lock().unwrap();
2638 for (execution_id, response) in responses {
2639 if let Some((sender, _)) = guard.remove(&execution_id) {
2640 let _ = sender.send(response);
2641 }
2642 }
2643 }
2644 }
2645}
2646
2647#[async_trait]
2648impl DbExecutor for PostgresConnection {
    /// Selects up to `batch_size` pending executions of the given functions with
    /// `FOR UPDATE SKIP LOCKED` and tries to lock each one, all in a single transaction.
    ///
    /// An execution whose lock attempt fails is logged and left out of the result.
    ///
    /// # Errors
    /// Fails when the transaction cannot be started or committed, or when the selection
    /// query returns an error.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorGeneric> {
        let mut client_guard = self.client.lock().await;
        let tx = client_guard
            .transaction()
            .await
            .map_err(DbErrorGeneric::from)?;

        let execution_ids_versions = get_pending_by_ffqns(
            &tx,
            batch_size,
            pending_at_or_sooner,
            &ffqns,
            SelectStrategy::LockForUpdate,
        )
        .await?;

        if execution_ids_versions.is_empty() {
            // Nothing to lock; finish the transaction and report an empty batch.
            tx.commit().await.map_err(DbErrorGeneric::from)?;
            return Ok(vec![]);
        }

        debug!("Locking {execution_ids_versions:?}");

        let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
        for (execution_id, version) in execution_ids_versions {
            // NOTE(review): if `lock_single_execution` can fail with an SQL-level error,
            // PostgreSQL marks the whole transaction as aborted and the final commit would
            // roll back the locks collected so far — confirm that failures here are purely
            // logical, or isolate each attempt with a savepoint.
            match lock_single_execution(
                &tx,
                created_at,
                component_id.clone(),
                &execution_id,
                run_id,
                &version,
                executor_id,
                lock_expires_at,
                retry_config,
            )
            .await
            {
                Ok(locked) => locked_execs.push(locked),
                Err(err) => {
                    warn!("Locking row {execution_id} failed - {err:?}");
                }
            }
        }

        tx.commit().await.map_err(DbErrorGeneric::from)?;

        Ok(locked_execs)
    }
2713
2714 #[instrument(level = Level::TRACE, skip(self))]
2715 async fn lock_pending_by_component_id(
2716 &self,
2717 batch_size: u32,
2718 pending_at_or_sooner: DateTime<Utc>,
2719 component_id: &ComponentId,
2720 created_at: DateTime<Utc>,
2721 executor_id: ExecutorId,
2722 lock_expires_at: DateTime<Utc>,
2723 run_id: RunId,
2724 retry_config: ComponentRetryConfig,
2725 ) -> Result<LockPendingResponse, DbErrorGeneric> {
2726 let mut client_guard = self.client.lock().await;
2727 let tx = client_guard
2728 .transaction()
2729 .await
2730 .map_err(DbErrorGeneric::from)?;
2731
2732 let execution_ids_versions = get_pending_by_component_input_digest(
2733 &tx,
2734 batch_size,
2735 pending_at_or_sooner,
2736 &component_id.input_digest,
2737 SelectStrategy::LockForUpdate,
2738 )
2739 .await?;
2740
2741 if execution_ids_versions.is_empty() {
2742 tx.commit().await.map_err(DbErrorGeneric::from)?;
2743 return Ok(vec![]);
2744 }
2745
2746 debug!("Locking {execution_ids_versions:?}");
2747
2748 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
2749 for (execution_id, version) in execution_ids_versions {
2750 match lock_single_execution(
2751 &tx,
2752 created_at,
2753 component_id.clone(),
2754 &execution_id,
2755 run_id,
2756 &version,
2757 executor_id,
2758 lock_expires_at,
2759 retry_config,
2760 )
2761 .await
2762 {
2763 Ok(locked) => locked_execs.push(locked),
2764 Err(err) => {
2765 warn!("Locking row {execution_id} failed - {err:?}");
2766 }
2767 }
2768 }
2769
2770 tx.commit().await.map_err(DbErrorGeneric::from)?;
2771 Ok(locked_execs)
2772 }
2773
2774 #[instrument(level = Level::DEBUG, skip(self))]
2775 async fn lock_one(
2776 &self,
2777 created_at: DateTime<Utc>,
2778 component_id: ComponentId,
2779 execution_id: &ExecutionId,
2780 run_id: RunId,
2781 version: Version,
2782 executor_id: ExecutorId,
2783 lock_expires_at: DateTime<Utc>,
2784 retry_config: ComponentRetryConfig,
2785 ) -> Result<LockedExecution, DbErrorWrite> {
2786 debug!(%execution_id, "lock_one");
2787 let mut client_guard = self.client.lock().await;
2788 let tx = client_guard
2789 .transaction()
2790 .await
2791 .map_err(DbErrorGeneric::from)?;
2792
2793 let res = lock_single_execution(
2794 &tx,
2795 created_at,
2796 component_id,
2797 execution_id,
2798 run_id,
2799 &version,
2800 executor_id,
2801 lock_expires_at,
2802 retry_config,
2803 )
2804 .await?;
2805
2806 tx.commit().await.map_err(DbErrorGeneric::from)?;
2807 Ok(res)
2808 }
2809
2810 #[instrument(level = Level::DEBUG, skip(self, req))]
2811 async fn append(
2812 &self,
2813 execution_id: ExecutionId,
2814 version: Version,
2815 req: AppendRequest,
2816 ) -> Result<AppendResponse, DbErrorWrite> {
2817 debug!(%req, "append");
2818 trace!(?req, "append");
2819 let created_at = req.created_at;
2820
2821 let mut client_guard = self.client.lock().await;
2822 let tx = client_guard
2823 .transaction()
2824 .await
2825 .map_err(DbErrorGeneric::from)?;
2826
2827 let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
2828
2829 tx.commit().await.map_err(DbErrorGeneric::from)?;
2830
2831 drop(client_guard);
2833
2834 self.notify_all(vec![notifier], created_at);
2835 Ok(new_version)
2836 }
2837
2838 #[instrument(level = Level::DEBUG, skip_all)]
2839 async fn append_batch_respond_to_parent(
2840 &self,
2841 events: AppendEventsToExecution,
2842 response: AppendResponseToExecution,
2843 current_time: DateTime<Utc>,
2844 ) -> Result<AppendBatchResponse, DbErrorWrite> {
2845 debug!("append_batch_respond_to_parent");
2846 if events.execution_id == response.parent_execution_id {
2847 return Err(DbErrorWrite::NonRetriable(
2848 DbErrorWriteNonRetriable::ValidationFailed(
2849 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
2850 ),
2851 ));
2852 }
2853 if events.batch.is_empty() {
2854 return Err(DbErrorWrite::NonRetriable(
2855 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
2856 ));
2857 }
2858
2859 let mut client_guard = self.client.lock().await;
2860 let tx = client_guard
2861 .transaction()
2862 .await
2863 .map_err(DbErrorGeneric::from)?;
2864
2865 let mut version = events.version;
2866 let mut notifiers = Vec::new();
2867
2868 for append_request in events.batch {
2869 let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
2870 version = v;
2871 notifiers.push(n);
2872 }
2873
2874 let pending_at_parent = append_response(
2875 &tx,
2876 &response.parent_execution_id,
2877 JoinSetResponseEventOuter {
2878 created_at: response.created_at,
2879 event: JoinSetResponseEvent {
2880 join_set_id: response.join_set_id,
2881 event: JoinSetResponse::ChildExecutionFinished {
2882 child_execution_id: response.child_execution_id,
2883 finished_version: response.finished_version,
2884 result: response.result,
2885 },
2886 },
2887 },
2888 )
2889 .await?;
2890 notifiers.push(pending_at_parent);
2891
2892 tx.commit().await.map_err(DbErrorGeneric::from)?;
2893 drop(client_guard);
2894
2895 self.notify_all(notifiers, current_time);
2896 Ok(version)
2897 }
2898
    /// Waits until an execution of one of `ffqns` is pending or until `timeout_fut`
    /// completes.
    ///
    /// A subscriber tagged with a random value is registered for every function before the
    /// database is checked. If the database already has a matching pending execution the
    /// method returns without waiting. Errors from the database check are ignored and
    /// treated as "nothing pending".
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1);
        {
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }

        async {
            let mut db_has_pending = false;
            {
                let mut client_guard = self.client.lock().await;
                // Best-effort check: a failure to open the transaction or to query simply
                // leaves `db_has_pending` false.
                if let Ok(tx) = client_guard.transaction().await {
                    if let Ok(res) = get_pending_by_ffqns(
                        &tx,
                        1,
                        pending_at_or_sooner,
                        &ffqns,
                        SelectStrategy::Read,
                    )
                    .await
                        && !res.is_empty()
                    {
                        db_has_pending = true;
                    }
                    let _ = tx.commit().await;
                }
            }

            if db_has_pending {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }

            tokio::select! {
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }
        .await;

        {
            // Remove only our own registration; one carrying a different tag is put back.
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match pending_subscribers.remove_ffqn(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {}
                    Some(other) => {
                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
                    }
                    None => {}
                }
            }
        }
    }
2968
    /// Waits until an execution belonging to the input digest of `component_id` is pending
    /// or until `timeout_fut` completes.
    ///
    /// A subscriber tagged with a random value is registered for the digest before the
    /// database is checked. If the database already has a matching pending execution the
    /// method returns without waiting. Errors from the database check are ignored and
    /// treated as "nothing pending".
    async fn wait_for_pending_by_component_id(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1);
        {
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            pending_subscribers.insert_by_component(
                component_id.input_digest.clone(),
                (sender.clone(), unique_tag),
            );
        }

        async {
            let mut db_has_pending = false;
            {
                let mut client_guard = self.client.lock().await;
                // Best-effort check: a failure to open the transaction or to query simply
                // leaves `db_has_pending` false.
                if let Ok(tx) = client_guard.transaction().await {
                    if let Ok(res) = get_pending_by_component_input_digest(
                        &tx,
                        1,
                        pending_at_or_sooner,
                        &component_id.input_digest,
                        SelectStrategy::Read,
                    )
                    .await
                        && !res.is_empty()
                    {
                        db_has_pending = true;
                    }
                    let _ = tx.commit().await;
                }
            }

            if db_has_pending {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }

            tokio::select! {
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }
        .await;

        {
            // Remove only our own registration; one carrying a different tag is put back.
            let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
            match pending_subscribers.remove_by_component(&component_id.input_digest) {
                Some((_, tag)) if tag == unique_tag => {}
                Some(other) => {
                    pending_subscribers
                        .insert_by_component(component_id.input_digest.clone(), other);
                }
                None => {}
            }
        }
    }
3034
3035 async fn get_last_execution_event(
3036 &self,
3037 execution_id: &ExecutionId,
3038 ) -> Result<ExecutionEvent, DbErrorRead> {
3039 let mut client_guard = self.client.lock().await;
3040 let tx = client_guard
3041 .transaction()
3042 .await
3043 .map_err(DbErrorGeneric::from)?;
3044
3045 let event = get_last_execution_event(&tx, execution_id).await?;
3046
3047 tx.commit().await.map_err(DbErrorGeneric::from)?;
3048 Ok(event)
3049 }
3050}
3051#[async_trait]
3052impl DbConnection for PostgresConnection {
3053 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3054 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3055 debug!("create");
3056 trace!(?req, "create");
3057 let created_at = req.created_at;
3058
3059 let mut client_guard = self.client.lock().await;
3060 let tx = client_guard
3061 .transaction()
3062 .await
3063 .map_err(DbErrorGeneric::from)?;
3064
3065 let (version, notifier) = create_inner(&tx, req.clone()).await?;
3066
3067 tx.commit().await.map_err(DbErrorGeneric::from)?;
3068 drop(client_guard); self.notify_all(vec![notifier], created_at);
3071 Ok(version)
3072 }
3073
3074 #[instrument(level = Level::DEBUG, skip(self, batch))]
3075 async fn append_batch(
3076 &self,
3077 current_time: DateTime<Utc>,
3078 batch: Vec<AppendRequest>,
3079 execution_id: ExecutionId,
3080 version: Version,
3081 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3082 debug!("append_batch");
3083 trace!(?batch, "append_batch");
3084 assert!(!batch.is_empty(), "Empty batch request");
3085
3086 let mut client_guard = self.client.lock().await;
3087 let tx = client_guard
3088 .transaction()
3089 .await
3090 .map_err(DbErrorGeneric::from)?;
3091
3092 let mut version = version;
3093 let mut notifier = None;
3094
3095 for append_request in batch {
3096 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3097 version = v;
3098 notifier = Some(n);
3099 }
3100
3101 tx.commit().await.map_err(DbErrorGeneric::from)?;
3102 drop(client_guard);
3103
3104 self.notify_all(
3105 vec![notifier.expect("checked that the batch is not empty")],
3106 current_time,
3107 );
3108 Ok(version)
3109 }
3110
3111 #[instrument(level = Level::DEBUG, skip(self, batch, child_req))]
3112 async fn append_batch_create_new_execution(
3113 &self,
3114 current_time: DateTime<Utc>,
3115 batch: Vec<AppendRequest>,
3116 execution_id: ExecutionId,
3117 version: Version,
3118 child_req: Vec<CreateRequest>,
3119 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3120 debug!("append_batch_create_new_execution");
3121 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3122 assert!(!batch.is_empty(), "Empty batch request");
3123
3124 let mut client_guard = self.client.lock().await;
3125 let tx = client_guard
3126 .transaction()
3127 .await
3128 .map_err(DbErrorGeneric::from)?;
3129
3130 let mut version = version;
3131 let mut notifier = None;
3132
3133 for append_request in batch {
3134 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3135 version = v;
3136 notifier = Some(n);
3137 }
3138
3139 let mut notifiers = Vec::new();
3140 notifiers.push(notifier.expect("checked that the batch is not empty"));
3141
3142 for req in child_req {
3143 let (_, n) = create_inner(&tx, req).await?;
3144 notifiers.push(n);
3145 }
3146
3147 tx.commit().await.map_err(DbErrorGeneric::from)?;
3148 drop(client_guard);
3149
3150 self.notify_all(notifiers, current_time);
3151 Ok(version)
3152 }
3153
3154 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3155 async fn subscribe_to_next_responses(
3156 &self,
3157 execution_id: &ExecutionId,
3158 start_idx: u32,
3159 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3160 ) -> Result<Vec<JoinSetResponseEventOuter>, DbErrorReadWithTimeout> {
3161 debug!("next_responses");
3162 let unique_tag: u64 = rand::random();
3163 let execution_id_clone = execution_id.clone();
3164
3165 let cleanup = || {
3166 let mut guard = self.response_subscribers.lock().unwrap();
3167 match guard.remove(&execution_id_clone) {
3168 Some((_, tag)) if tag == unique_tag => {}
3169 Some(other) => {
3170 guard.insert(execution_id_clone.clone(), other);
3171 }
3172 None => {}
3173 }
3174 };
3175
3176 let receiver = {
3177 let mut client_guard = self.client.lock().await;
3178 let tx = client_guard
3179 .transaction()
3180 .await
3181 .map_err(DbErrorRead::from)?;
3182
3183 let (sender, receiver) = oneshot::channel();
3189 self.response_subscribers
3190 .lock()
3191 .unwrap()
3192 .insert(execution_id.clone(), (sender, unique_tag));
3193
3194 let responses = get_responses_with_offset(&tx, execution_id, start_idx).await?;
3195
3196 if responses.is_empty() {
3197 tx.commit().await.map_err(|err| {
3199 cleanup(); DbErrorRead::from(err)
3201 })?;
3202 receiver
3203 } else {
3204 cleanup(); tx.commit().await.map_err(DbErrorRead::from)?;
3206 return Ok(responses);
3207 }
3208 };
3209
3210 let res = tokio::select! {
3211 resp = receiver => {
3212 match resp {
3213 Ok(resp) => Ok(vec![resp]),
3214 Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3215 }
3216 }
3217 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3218 };
3219
3220 cleanup();
3221 res
3222 }
3223
3224 async fn wait_for_finished_result(
3225 &self,
3226 execution_id: &ExecutionId,
3227 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3228 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3229 let unique_tag: u64 = rand::random();
3230 let execution_id_clone = execution_id.clone();
3231
3232 let cleanup = || {
3233 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3234 if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3235 subscribers.remove(&unique_tag);
3236 }
3237 };
3238
3239 let receiver = {
3240 let mut client_guard = self.client.lock().await;
3241 let tx = client_guard
3242 .transaction()
3243 .await
3244 .map_err(DbErrorRead::from)?;
3245
3246 let (sender, receiver) = oneshot::channel();
3248 {
3249 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3250 guard
3251 .entry(execution_id.clone())
3252 .or_default()
3253 .insert(unique_tag, sender);
3254 }
3255
3256 let pending_state = get_combined_state(&tx, execution_id).await?.pending_state;
3257
3258 if let PendingState::Finished { finished, .. } = pending_state {
3259 let event = get_execution_event(&tx, execution_id, finished.version).await?;
3260 tx.commit().await.map_err(DbErrorRead::from)?;
3261 cleanup();
3262
3263 if let ExecutionRequest::Finished { result, .. } = event.event {
3264 return Ok(result);
3265 }
3266 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3267 return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3268 "cannot get finished event based on t_state version",
3269 )));
3270 }
3271 tx.commit().await.map_err(DbErrorRead::from)?;
3272 receiver
3273 };
3274
3275 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3276 let res = tokio::select! {
3277 resp = receiver => {
3278 match resp {
3279 Ok(retval) => Ok(retval),
3280 Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3281 }
3282 }
3283 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3284 };
3285
3286 cleanup();
3287 res
3288 }
3289
    /// Records the outcome of a delay as a `DelayFinished` response on `execution_id`.
    ///
    /// On success the transaction is committed, subscribers are notified and
    /// `AppendDelayResponseOutcome::Success` is returned. When the insert reports a
    /// conflict, the transaction is rolled back and the stored `delay_success` flag is read
    /// in a new transaction to report `AlreadyFinished` or `AlreadyCancelled`.
    ///
    /// # Errors
    /// Propagates other append failures and transaction failures. Returns an
    /// `Uncategorized` error when a conflict was reported but no stored response is found.
    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        result: Result<(), ()>,
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
        debug!("append_delay_response");
        let event = JoinSetResponseEventOuter {
            created_at,
            event: JoinSetResponseEvent {
                join_set_id,
                event: JoinSetResponse::DelayFinished {
                    delay_id: delay_id.clone(),
                    result,
                },
            },
        };

        let mut client_guard = self.client.lock().await;
        let tx = client_guard
            .transaction()
            .await
            .map_err(DbErrorGeneric::from)?;

        let res = append_response(&tx, &execution_id, event).await;

        match res {
            Ok(notifier) => {
                tx.commit().await.map_err(DbErrorGeneric::from)?;
                drop(client_guard);
                self.notify_all(vec![notifier], created_at);
                Ok(AppendDelayResponseOutcome::Success)
            }
            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
                // Discard the failed transaction before reading the stored response.
                tx.rollback().await.map_err(DbErrorGeneric::from)?;

                let tx = client_guard
                    .transaction()
                    .await
                    .map_err(DbErrorGeneric::from)?;
                let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
                tx.commit().await.map_err(DbErrorGeneric::from)?;

                match delay_success {
                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
                    None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized(
                        "insert failed yet select did not find the response".into(),
                    ))),
                }
            }
            Err(err) => {
                // Best-effort rollback; the original error is the one reported.
                let _ = tx.rollback().await;
                Err(err)
            }
        }
    }
3353
3354 #[instrument(level = Level::DEBUG, skip_all)]
3355 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3356 debug!("append_backtrace");
3357 let mut client_guard = self.client.lock().await;
3358 let tx = client_guard
3359 .transaction()
3360 .await
3361 .map_err(DbErrorGeneric::from)?;
3362
3363 append_backtrace(&tx, &append).await?;
3364
3365 tx.commit().await.map_err(DbErrorGeneric::from)?;
3366 Ok(())
3367 }
3368
3369 #[instrument(level = Level::DEBUG, skip_all)]
3370 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3371 debug!("append_backtrace_batch");
3372 let mut client_guard = self.client.lock().await;
3373 let tx = client_guard
3374 .transaction()
3375 .await
3376 .map_err(DbErrorGeneric::from)?;
3377
3378 for append in batch {
3379 append_backtrace(&tx, &append).await?;
3380 }
3381
3382 tx.commit().await.map_err(DbErrorGeneric::from)?;
3383 Ok(())
3384 }
3385
3386 #[instrument(level = Level::TRACE, skip(self))]
3388 async fn get_expired_timers(
3389 &self,
3390 at: DateTime<Utc>,
3391 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3392 let mut client_guard = self.client.lock().await;
3393 let tx = client_guard
3394 .transaction()
3395 .await
3396 .map_err(DbErrorGeneric::from)?;
3397
3398 let rows = tx
3400 .query(
3401 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1",
3402 &[&at],
3403 )
3404 .await
3405 .map_err(DbErrorGeneric::from)?;
3406
3407 let mut expired_timers = Vec::with_capacity(rows.len());
3408 for row in rows {
3409 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3410 let execution_id: String = get!(row, "execution_id");
3411 let execution_id = ExecutionId::from_str(&execution_id)
3412 .map_err(|err| consistency_db_err(err.to_string()))?;
3413 let join_set_id: String = get!(row, "join_set_id");
3414 let join_set_id = JoinSetId::from_str(&join_set_id)
3415 .map_err(|err| consistency_db_err(err.to_string()))?;
3416 let delay_id: String = get!(row, "delay_id");
3417 let delay_id = DelayId::from_str(&delay_id)
3418 .map_err(|err| consistency_db_err(err.to_string()))?;
3419
3420 Ok(ExpiredTimer::Delay(ExpiredDelay {
3421 execution_id,
3422 join_set_id,
3423 delay_id,
3424 }))
3425 };
3426
3427 match unpack() {
3428 Ok(timer) => expired_timers.push(timer),
3429 Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3430 }
3431 }
3432
3433 let rows = tx.query(
3435 &format!(
3436 "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id \
3437 FROM t_state \
3438 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3439 ),
3440 &[&at]
3441 ).await.map_err(DbErrorGeneric::from)?;
3442
3443 for row in rows {
3444 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3445 let execution_id: String = get!(row, "execution_id");
3446 let execution_id = ExecutionId::from_str(&execution_id)
3447 .map_err(|err| consistency_db_err(err.to_string()))?;
3448 let last_lock_version: i64 = get!(row, "last_lock_version");
3449 let last_lock_version = Version::try_from(last_lock_version)
3450 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3451
3452 let corresponding_version: i64 = get!(row, "corresponding_version");
3453 let corresponding_version = Version::try_from(corresponding_version)
3454 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3455
3456 let intermittent_event_count =
3457 u32::try_from(get!(row, "intermittent_event_count", i64)).map_err(|_| {
3458 consistency_db_err("`intermittent_event_count` must not be negative")
3459 })?;
3460
3461 let max_retries = get!(row, "max_retries", Option<i64>)
3462 .map(u32::try_from)
3463 .transpose()
3464 .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3465 let retry_exp_backoff_millis =
3466 u32::try_from(get!(row, "retry_exp_backoff_millis", i64)).map_err(|_| {
3467 consistency_db_err("`retry_exp_backoff_millis` must not be negative")
3468 })?;
3469 let executor_id: String = get!(row, "executor_id");
3470 let executor_id = ExecutorId::from_str(&executor_id)
3471 .map_err(|err| consistency_db_err(err.to_string()))?;
3472 let run_id: String = get!(row, "run_id");
3473 let run_id =
3474 RunId::from_str(&run_id).map_err(|err| consistency_db_err(err.to_string()))?;
3475
3476 Ok(ExpiredTimer::Lock(ExpiredLock {
3477 execution_id,
3478 locked_at_version: last_lock_version,
3479 next_version: corresponding_version.increment(),
3480 intermittent_event_count,
3481 max_retries,
3482 retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3483 locked_by: LockedBy {
3484 executor_id,
3485 run_id,
3486 },
3487 }))
3488 };
3489
3490 match unpack() {
3491 Ok(timer) => expired_timers.push(timer),
3492 Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3493 }
3494 }
3495
3496 tx.commit().await.map_err(DbErrorGeneric::from)?;
3497
3498 if !expired_timers.is_empty() {
3499 debug!("get_expired_timers found {expired_timers:?}");
3500 }
3501 Ok(expired_timers)
3502 }
3503
3504 async fn get_execution_event(
3505 &self,
3506 execution_id: &ExecutionId,
3507 version: &Version,
3508 ) -> Result<ExecutionEvent, DbErrorRead> {
3509 let mut client_guard = self.client.lock().await;
3510 let tx = client_guard
3511 .transaction()
3512 .await
3513 .map_err(DbErrorRead::from)?;
3514
3515 let event = get_execution_event(&tx, execution_id, version.0).await?;
3516
3517 tx.commit().await.map_err(DbErrorRead::from)?;
3518 Ok(event)
3519 }
3520
3521 async fn get_pending_state(
3522 &self,
3523 execution_id: &ExecutionId,
3524 ) -> Result<PendingState, DbErrorRead> {
3525 let mut client_guard = self.client.lock().await;
3526 let tx = client_guard
3527 .transaction()
3528 .await
3529 .map_err(DbErrorRead::from)?;
3530
3531 let state = get_combined_state(&tx, execution_id).await?.pending_state;
3532
3533 tx.commit().await.map_err(DbErrorRead::from)?;
3534 Ok(state)
3535 }
3536}
3537
3538#[async_trait]
3539impl DbExternalApi for PostgresConnection {
3540 #[instrument(level = Level::DEBUG, skip_all)]
3541 async fn get_backtrace(
3542 &self,
3543 execution_id: &ExecutionId,
3544 filter: BacktraceFilter,
3545 ) -> Result<BacktraceInfo, DbErrorRead> {
3546 debug!("get_backtrace");
3547
3548 let mut client_guard = self.client.lock().await;
3549 let tx = client_guard
3550 .transaction()
3551 .await
3552 .map_err(DbErrorRead::from)?;
3553
3554 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
3555
3556 params.push(Box::new(execution_id.to_string())); let p_execution_id_idx = format!("${}", params.len()); let mut sql = String::new();
3560 write!(
3561 &mut sql,
3562 "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
3563 FROM t_backtrace WHERE execution_id = {p_execution_id_idx}"
3564 )
3565 .unwrap();
3566
3567 match &filter {
3568 BacktraceFilter::Specific(version) => {
3569 params.push(Box::new(i64::from(version.0))); let p_ver_idx = format!("${}", params.len()); write!(
3572 &mut sql,
3573 " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
3574 )
3575 .unwrap();
3576 }
3577 BacktraceFilter::First => {
3578 sql.push_str(" ORDER BY version_min_including LIMIT 1");
3579 }
3580 BacktraceFilter::Last => {
3581 sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
3582 }
3583 }
3584
3585 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
3586 params.iter().map(|p| p.as_ref() as _).collect();
3587
3588 let row = tx
3589 .query_one(&sql, ¶ms_refs)
3590 .await
3591 .map_err(DbErrorRead::from)?;
3592
3593 let component_id: Json<ComponentId> = get!(row, "component_id");
3594 let component_id = component_id.0;
3595
3596 let version_min_including = Version::try_from(get!(row, "version_min_including", i64))
3597 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3598
3599 let version_max_excluding = Version::try_from(get!(row, "version_max_excluding", i64))
3600 .map_err(|_| consistency_db_err("version must be non-negative"))?;
3601
3602 let wasm_backtrace: Json<WasmBacktrace> = get!(row, "wasm_backtrace");
3604 let wasm_backtrace = wasm_backtrace.0;
3605
3606 tx.commit().await.map_err(DbErrorRead::from)?;
3607
3608 Ok(BacktraceInfo {
3609 execution_id: execution_id.clone(),
3610 component_id,
3611 version_min_including,
3612 version_max_excluding,
3613 wasm_backtrace,
3614 })
3615 }
3616
3617 async fn list_executions(
3618 &self,
3619 ffqn: Option<FunctionFqn>,
3620 top_level_only: bool,
3621 pagination: ExecutionListPagination,
3622 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
3623 let mut client_guard = self.client.lock().await;
3624 let tx = client_guard
3625 .transaction()
3626 .await
3627 .map_err(DbErrorGeneric::from)?;
3628
3629 let result = list_executions(&tx, ffqn.as_ref(), top_level_only, &pagination).await?;
3630
3631 tx.commit().await.map_err(DbErrorGeneric::from)?;
3632 Ok(result)
3633 }
3634
3635 #[instrument(level = Level::DEBUG, skip(self))]
3636 async fn list_execution_events(
3637 &self,
3638 execution_id: &ExecutionId,
3639 since: &Version,
3640 max_length: VersionType,
3641 include_backtrace_id: bool,
3642 ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
3643 let mut client_guard = self.client.lock().await;
3644 let tx = client_guard
3645 .transaction()
3646 .await
3647 .map_err(DbErrorRead::from)?;
3648
3649 let result = list_execution_events(
3650 &tx,
3651 execution_id,
3652 since.0,
3653 since.0 + max_length,
3654 include_backtrace_id,
3655 )
3656 .await?;
3657
3658 tx.commit().await.map_err(DbErrorRead::from)?;
3659 Ok(result)
3660 }
3661
3662 async fn list_responses(
3663 &self,
3664 execution_id: &ExecutionId,
3665 pagination: Pagination<u32>,
3666 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
3667 let mut client_guard = self.client.lock().await;
3668 let tx = client_guard
3669 .transaction()
3670 .await
3671 .map_err(DbErrorRead::from)?;
3672
3673 let result = list_responses(&tx, execution_id, Some(pagination)).await?;
3674
3675 tx.commit().await.map_err(DbErrorRead::from)?;
3676 Ok(result)
3677 }
3678
3679 async fn upgrade_execution_component(
3680 &self,
3681 execution_id: &ExecutionId,
3682 old: &InputContentDigest,
3683 new: &InputContentDigest,
3684 ) -> Result<(), DbErrorWrite> {
3685 let mut client_guard = self.client.lock().await;
3686 let tx = client_guard
3687 .transaction()
3688 .await
3689 .map_err(DbErrorGeneric::from)?;
3690
3691 upgrade_execution_component(&tx, execution_id, old, new).await?;
3692
3693 tx.commit().await.map_err(DbErrorGeneric::from)?;
3694 Ok(())
3695 }
3696}
3697
3698#[async_trait]
3699impl DbPoolCloseable for PostgresPool {
3700 async fn close(&self) {
3701 self.pool.close();
3702 }
3703}
3704
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Appends `response_event`, stamped with `created_at`, to `execution_id` and notifies
    /// subscribers once the transaction has been committed.
    ///
    /// The client lock is released before `notify_all` is invoked.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let outer_event = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };

        // The block limits the lifetime of the client lock to the database work.
        let notifier = {
            let mut conn = self.client.lock().await;
            let txn = conn.transaction().await.map_err(DbErrorGeneric::from)?;
            let notifier = append_response(&txn, &execution_id, outer_event).await?;
            txn.commit().await.map_err(DbErrorGeneric::from)?;
            notifier
        };

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }

    /// Reads the complete execution log of `execution_id` within one transaction.
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn get(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
        trace!("get");
        let mut conn = self.client.lock().await;
        let txn = conn.transaction().await.map_err(DbErrorRead::from)?;

        let execution_log = get(&txn, execution_id).await?;

        txn.commit()
            .await
            .map(|()| execution_log)
            .map_err(DbErrorRead::from)
    }
}
3754
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test helper that drops the database named by `self.config.db_name`.
    ///
    /// A separate pool pointing at the `ADMIN_DB_NAME` database is created with the host,
    /// user and password of this pool's configuration. `DROP DATABASE` is then attempted up
    /// to three times; failed attempts are logged at debug level. When no attempt succeeds a
    /// warning is logged and the function returns normally.
    ///
    /// # Panics
    /// Panics when the admin pool cannot be created or when no connection can be obtained
    /// from it.
    pub async fn drop_database(&self) {
        let admin_cfg = {
            let mut cfg = Config::new();
            cfg.dbname = Some(ADMIN_DB_NAME.into());
            cfg.host = Some(self.config.host.clone());
            cfg.user = Some(self.config.user.clone());
            cfg.password = Some(self.config.password.clone());
            cfg.manager = Some(ManagerConfig {
                recycling_method: RecyclingMethod::Fast,
            });
            cfg
        };

        let admin_pool = admin_cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        let drop_statement = format!("DROP DATABASE {}", self.config.db_name);
        for _attempt in 0..3 {
            let outcome = admin_client.execute(&drop_statement, &[]).await;
            if outcome.is_err() {
                debug!("Dropping db failed - {outcome:?}",);
                continue;
            }
            debug!("Database '{}' dropped.", self.config.db_name);
            return;
        }
        warn!("Did not drop database {}", self.config.db_name);
    }
}