1use crate::postgres_dao::ddl::ADMIN_DB_NAME;
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ComponentType, ExecutionId, FunctionFqn, JoinSetId,
6 StrVariant, SupportedFunctionReturnValue,
7 component_id::{ComponentDigest, Digest},
8 prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo,
12 ComponentMetadataRecord, CreateRequest, DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection,
13 DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorStubResponse, DbErrorWrite,
14 DbErrorWriteNonRetriable, DbExecutor, DbExternalApi, DbPool, DbPoolCloseable,
15 DeploymentComponentDetail, DeploymentComponentRecord, DeploymentRecord, DeploymentState,
16 DeploymentStatus, ExecutionEvent, ExecutionListPagination, ExecutionRequest,
17 ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay, ExpiredLock,
18 ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest, JoinSetResponse,
19 JoinSetResponseEvent, JoinSetResponseEventOuter, ListExecutionEventsResponse,
20 ListExecutionsFilter, ListLogsResponse, ListResponsesResponse, LockPendingResponse, Locked,
21 LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter, LogInfoAppendRow, LogLevel,
22 LogStreamType, Pagination, PendingState, PendingStateBlockedByJoinSet,
23 PendingStateFinishedResultKind, PendingStateMergedPause, ResponseCursor,
24 ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED, STATE_LOCKED,
25 STATE_PENDING_AT, TimeoutOutcome, Version, VersionType, WasmBacktrace,
26 },
27};
28use db_common::{
29 AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
30 PendingFfqnSubscribersHolder,
31};
32use deadpool_postgres::{Client, ManagerConfig, Pool, RecyclingMethod};
33use hashbrown::HashMap;
34use secrecy::{ExposeSecret as _, SecretString};
35use sha2::{Digest as _, Sha256};
36use std::{collections::VecDeque, pin::Pin, str::FromStr as _, sync::Arc, time::Duration};
37use std::{fmt::Write as _, panic::Location};
38use strum::IntoEnumIterator as _;
39use tokio::sync::{mpsc, oneshot};
40use tokio_postgres::{
41 NoTls, Row, Transaction,
42 row::RowIndex,
43 types::{FromSql, Json, ToSql},
44};
45use tracing::{Level, debug, error, info, instrument, trace, warn};
46use tracing_error::SpanTrace;
47
48#[track_caller]
49fn get<'a, T: FromSql<'a>, I: RowIndex + std::fmt::Display + Copy>(
50 row: &'a Row,
51 name: I,
52) -> Result<T, DbErrorGeneric> {
53 match row.try_get(name) {
54 Ok(ok) => Ok(ok),
55 Err(err) => {
56 Err(consistency_db_err(format!(
58 "Failed to retrieve column '{name}': {err:?}"
59 )))
60 }
61 }
62}
63
/// Constants used for server-level (not application-database) operations.
mod ddl {
    /// Maintenance database that `create_database` connects to in order to
    /// look up and create the application database.
    pub const ADMIN_DB_NAME: &str = "postgres";
}
67
/// Schema migrations compiled into the binary by `refinery`.
mod embedded {
    // Embeds the migration files from the `migrations` path and generates the
    // `migrations` module whose `runner()` is invoked by
    // `PostgresPool::new_with_outcome`.
    refinery::embed_migrations!("migrations");
}
71
72#[derive(Debug, Clone)]
73pub struct PostgresConfig {
74 pub host: String,
75 pub user: String,
76 pub password: SecretString,
77 pub db_name: String,
78}
79
80#[derive(Debug, thiserror::Error)]
81#[error("initialization error")]
82pub struct InitializationError;
83
84async fn create_database(
85 config: &PostgresConfig,
86 provision_policy: ProvisionPolicy,
87) -> Result<DbInitialzationOutcome, InitializationError> {
88 let mut admin_cfg = deadpool_postgres::Config::new();
89 admin_cfg.host = Some(config.host.clone());
90 admin_cfg.user = Some(config.user.clone());
91 admin_cfg.password = Some(config.password.expose_secret().to_string());
92 admin_cfg.dbname = Some(ADMIN_DB_NAME.into());
93 admin_cfg.manager = Some(ManagerConfig {
94 recycling_method: RecyclingMethod::Fast,
95 });
96
97 let admin_pool = admin_cfg.create_pool(None, NoTls).map_err(|err| {
98 error!("Cannot create the default pool - {err:?}");
99 InitializationError
100 })?;
101
102 let client = admin_pool.get().await.map_err(|err| {
103 error!("Cannot get a connection from the default pool - {err:?}");
104 InitializationError
105 })?;
106
107 let row = client
108 .query_opt(
109 &format!(
110 "SELECT 1 FROM pg_database WHERE datname = '{}'",
111 config.db_name
112 ),
113 &[],
114 )
115 .await
116 .map_err(|err| {
117 error!("Cannot select from the default database - {err:?}");
118 InitializationError
119 })?;
120
121 match (row, provision_policy) {
122 (None, ProvisionPolicy::MustCreate | ProvisionPolicy::Auto) => {
123 client
124 .execute(&format!("CREATE DATABASE {}", config.db_name), &[])
125 .await
126 .map_err(|err| {
127 error!("Cannot create the database - {err:?}");
128 InitializationError
129 })?;
130 info!("Database '{}' created.", config.db_name);
131 Ok(DbInitialzationOutcome::Created)
132 }
133 (Some(_), ProvisionPolicy::Auto) => {
134 info!("Database '{}' exists.", config.db_name);
135 Ok(DbInitialzationOutcome::Existing)
136 }
137 (Some(_), ProvisionPolicy::MustCreate) => {
138 warn!("Database '{}' already exists.", config.db_name);
139 Err(InitializationError)
140 }
141 (_, ProvisionPolicy::NeverCreate) => unreachable!("checked by the caller"),
142 }
143}
144
/// Per-execution waiters for responses.
///
/// Each entry pairs a one-shot sender of [`ResponseWithCursor`] with a `u64`.
/// NOTE(review): the meaning of the `u64` is not visible here (presumably a
/// subscription/cursor tag); confirm. The `Arc` is cloned into every
/// connection handed out by `PostgresPool`.
type ResponseSubscribers =
    Arc<std::sync::Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
/// Shared [`PendingFfqnSubscribersHolder`] behind a std mutex. The `Arc` is
/// cloned into every connection handed out by `PostgresPool`.
type PendingSubscribers = Arc<std::sync::Mutex<PendingFfqnSubscribersHolder>>;
/// Per-execution waiters for a finished result.
///
/// For each execution there is a map from a `u64` key (presumably a
/// per-subscriber id; confirm) to the one-shot sender that delivers the
/// [`SupportedFunctionReturnValue`]. This alias does not include the `Arc`;
/// it is wrapped in `Arc` where it is stored.
type ExecutionFinishedSubscribers = std::sync::Mutex<
    HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>,
>;
152
/// Connection pool for the application database.
///
/// It also owns the in-memory subscriber registries that are shared with
/// every connection it hands out.
pub struct PostgresPool {
    /// `deadpool_postgres` pool whose `dbname` is `config.db_name`.
    pool: Pool,
    /// Cloned (`Arc`) into each `PostgresConnection` created from this pool.
    response_subscribers: ResponseSubscribers,
    /// Cloned (`Arc`) into each `PostgresConnection` created from this pool.
    pending_subscribers: PendingSubscribers,
    /// Cloned (`Arc`) into each `PostgresConnection` created from this pool.
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    /// Configuration the pool was built from.
    pub config: PostgresConfig,
}
161
162#[async_trait]
163impl DbPool for PostgresPool {
164 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
165 let client = self.pool.get().await?;
166
167 Ok(Box::new(PostgresConnection {
168 client: tokio::sync::Mutex::new(client),
169 response_subscribers: self.response_subscribers.clone(),
170 pending_subscribers: self.pending_subscribers.clone(),
171 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
172 }))
173 }
174
175 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
176 let client = self.pool.get().await?;
177
178 Ok(Box::new(PostgresConnection {
179 client: tokio::sync::Mutex::new(client),
180 response_subscribers: self.response_subscribers.clone(),
181 pending_subscribers: self.pending_subscribers.clone(),
182 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
183 }))
184 }
185
186 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
187 let client = self.pool.get().await?;
188
189 Ok(Box::new(PostgresConnection {
190 client: tokio::sync::Mutex::new(client),
191 response_subscribers: self.response_subscribers.clone(),
192 pending_subscribers: self.pending_subscribers.clone(),
193 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
194 }))
195 }
196
197 #[cfg(feature = "test")]
198 async fn connection_test(
199 &self,
200 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
201 let client = self.pool.get().await?;
202
203 Ok(Box::new(PostgresConnection {
204 client: tokio::sync::Mutex::new(client),
205 response_subscribers: self.response_subscribers.clone(),
206 pending_subscribers: self.pending_subscribers.clone(),
207 execution_finished_subscribers: self.execution_finished_subscribers.clone(),
208 }))
209 }
210}
211
/// A single checked-out pool client together with the subscriber registries
/// of the `PostgresPool` it came from.
pub struct PostgresConnection {
    /// Pooled client guarded by an async (`tokio`) mutex.
    client: tokio::sync::Mutex<Client>,
    /// Shared with the originating pool (cloned `Arc`).
    response_subscribers: ResponseSubscribers,
    /// Shared with the originating pool (cloned `Arc`).
    pending_subscribers: PendingSubscribers,
    /// Shared with the originating pool (cloned `Arc`).
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
}
218
/// How `PostgresPool::new` treats the application database before migrating it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProvisionPolicy {
    /// Never issue `CREATE DATABASE`; the database is assumed to exist.
    NeverCreate,
    /// Create the database when it is missing, otherwise reuse it.
    Auto,
    /// Create the database; an already existing database is an error.
    MustCreate,
}
227
/// What `PostgresPool::new_with_outcome` did with the application database.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DbInitialzationOutcome {
    /// The database was created by this call.
    Created,
    /// The database already existed. This is also reported under
    /// `ProvisionPolicy::NeverCreate`, where existence is not checked.
    Existing,
}
233
234impl PostgresPool {
235 #[instrument(skip_all, name = "postgres_new")]
236 pub async fn new(
237 config: PostgresConfig,
238 provision_policy: ProvisionPolicy,
239 ) -> Result<PostgresPool, InitializationError> {
240 Self::new_with_outcome(config, provision_policy)
241 .await
242 .map(|(db, _)| db)
243 }
244
245 pub async fn new_with_outcome(
246 config: PostgresConfig,
247 provision_policy: ProvisionPolicy,
248 ) -> Result<(PostgresPool, DbInitialzationOutcome), InitializationError> {
249 let outcome = if matches!(
250 provision_policy,
251 ProvisionPolicy::Auto | ProvisionPolicy::MustCreate
252 ) {
253 create_database(&config, provision_policy).await?
254 } else {
255 DbInitialzationOutcome::Existing
256 };
257 let mut cfg = deadpool_postgres::Config::new();
258 cfg.host = Some(config.host.clone());
259 cfg.user = Some(config.user.clone());
260 cfg.password = Some(config.password.expose_secret().to_string());
261 cfg.dbname = Some(config.db_name.clone());
262 cfg.manager = Some(ManagerConfig {
263 recycling_method: RecyclingMethod::Fast,
264 });
265
266 let pool = cfg.create_pool(None, NoTls).map_err(|err| {
267 error!("Cannot create the database pool - {err:?}");
268 InitializationError
269 })?;
270 let mut client = pool.get().await.map_err(|err| {
271 error!("Cannot get a connection from the database pool - {err:?}");
272 InitializationError
273 })?;
274
275 embedded::migrations::runner()
276 .run_async(&mut **client)
277 .await
278 .map_err(|err| {
279 error!("Cannot run migrations - {err:?}");
280 InitializationError
281 })?;
282
283 debug!("Database schema initialized.");
284
285 Ok((
286 PostgresPool {
287 pool,
288 execution_finished_subscribers: Arc::default(),
289 pending_subscribers: Arc::default(),
290 response_subscribers: Arc::default(),
291 config,
292 },
293 outcome,
294 ))
295 }
296}
297
298fn deployment_record_from_pg_row(row: &Row) -> Result<DeploymentRecord, DbErrorRead> {
299 let deployment_id_str: String = get(row, "deployment_id")?;
300 let deployment_id = deployment_id_str.parse::<DeploymentId>().map_err(|e| {
301 DbErrorRead::Generic(consistency_db_err(format!("invalid deployment_id: {e}")))
302 })?;
303 let status_str: String = get(row, "status")?;
304 let status = status_str
305 .parse::<DeploymentStatus>()
306 .map_err(|e| DbErrorRead::Generic(consistency_db_err(format!("invalid status: {e}"))))?;
307 Ok(DeploymentRecord {
308 deployment_id,
309 created_at: get(row, "created_at")?,
310 last_active_at: get(row, "last_active_at")?,
311 status,
312 config_json: get(row, "config_json")?,
313 obelisk_version: get(row, "obelisk_version")?,
314 created_by: get(row, "created_by")?,
315 })
316}
317
318fn deployment_component_detail_from_pg_row(
319 row: &Row,
320) -> Result<DeploymentComponentDetail, DbErrorRead> {
321 let component_name: String = get(row, "component_name")?;
322 let component_type: ComponentType =
323 get::<String, _>(row, "component_type")?
324 .parse()
325 .map_err(|err| {
326 DbErrorRead::Generic(consistency_db_err(format!("invalid component_type: {err}")))
327 })?;
328 let component_digest = ComponentDigest(Digest(
329 get::<Vec<u8>, _>(row, "component_digest")?
330 .try_into()
331 .map_err(|_| {
332 DbErrorRead::Generic(consistency_db_err("invalid component_digest length"))
333 })?,
334 ));
335 let component_id = ComponentId::new(
336 component_type,
337 StrVariant::from(component_name),
338 component_digest,
339 )
340 .map_err(|err| DbErrorRead::Generic(consistency_db_err(err.to_string())))?;
341 let imports =
342 get::<Json<Vec<concepts::storage::PersistedFunctionMetadata>>, _>(row, "imports_json")?.0;
343 let exports =
344 get::<Json<Vec<concepts::storage::PersistedFunctionMetadata>>, _>(row, "exports_json")?.0;
345 let wit: String = get(row, "wit")?;
346 Ok(DeploymentComponentDetail {
347 component_id,
348 imports,
349 exports,
350 wit,
351 })
352}
353
354#[track_caller]
355fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
356 DbErrorGeneric::Uncategorized {
357 reason: reason.into(),
358 context: SpanTrace::capture(),
359 source: None,
360 loc: Location::caller(),
361 }
362}
363#[track_caller]
364fn consistency_db_err_src(
365 reason: impl Into<StrVariant>,
366 source: Arc<dyn std::error::Error + Send + Sync>,
367) -> DbErrorGeneric {
368 DbErrorGeneric::Uncategorized {
369 reason: reason.into(),
370 context: SpanTrace::capture(),
371 source: Some(source),
372 loc: Location::caller(),
373 }
374}
375
376#[derive(Debug, Clone)]
377struct DelayReq {
378 join_set_id: JoinSetId,
379 delay_id: DelayId,
380 expires_at: DateTime<Utc>,
381 paused: bool,
382}
383
384async fn fetch_created_event(
385 tx: &Transaction<'_>,
386 execution_id: &ExecutionId,
387) -> Result<CreateRequest, DbErrorRead> {
388 let stmt = "SELECT created_at, json_value FROM t_execution_log WHERE \
389 execution_id = $1 AND version = 0";
390
391 let row = tx.query_one(stmt, &[&execution_id.to_string()]).await?;
392
393 let created_at = get(&row, "created_at")?;
394 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
395 let event = event.0;
396
397 if let ExecutionRequest::Created {
398 ffqn,
399 params,
400 parent,
401 scheduled_at,
402 component_id,
403 deployment_id,
404 metadata,
405 scheduled_by,
406 } = event
407 {
408 Ok(CreateRequest {
409 created_at,
410 execution_id: execution_id.clone(),
411 ffqn,
412 params,
413 parent,
414 scheduled_at,
415 component_id,
416 deployment_id,
417 metadata,
418 scheduled_by,
419 paused: false,
420 })
421 } else {
422 error!("Row with version=0 must be a `Created` event - {event:?}");
423 Err(consistency_db_err("expected `Created` event").into())
424 }
425}
426
427fn check_expected_next_and_appending_version(
428 expected_version: &Version,
429 appending_version: &Version,
430) -> Result<(), DbErrorWrite> {
431 if *expected_version != *appending_version {
432 debug!(
433 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
434 );
435 return Err(DbErrorWrite::NonRetriable(
436 DbErrorWriteNonRetriable::VersionConflict {
437 expected: expected_version.clone(),
438 requested: appending_version.clone(),
439 },
440 ));
441 }
442 Ok(())
443}
444
/// Inserts a new execution: its `Created` log entry and its `t_state` row.
///
/// The `Created` entry is written at version 0 and the `t_state` row starts
/// in the pending state at `scheduled_at`. When `req.paused` is set, a
/// `Paused` event is appended afterwards at the next version.
///
/// Returns the next free version together with the notifier describing what
/// became pending.
#[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
async fn create_inner(
    tx: &Transaction<'_>,
    req: CreateRequest,
) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
    trace!("create_inner");

    // The `Created` event always occupies version 0 (`fetch_created_event`
    // reads it back with `version = 0`).
    let version = Version::default();
    let execution_id = req.execution_id.clone();
    let execution_id_str = execution_id.to_string();
    let ffqn = req.ffqn.clone();
    let created_at = req.created_at;
    let scheduled_at = req.scheduled_at;
    let component_id = req.component_id.clone();
    let deployment_id = req.deployment_id;
    let paused = req.paused;

    // `req` is consumed here; everything needed later was copied out above.
    let event = ExecutionRequest::from(req);
    let event = Json(event);

    tx.execute(
        "INSERT INTO t_execution_log (
            execution_id, created_at, version, json_value, variant, join_set_id
            ) VALUES ($1, $2, $3, $4, $5, $6)",
        &[
            &execution_id_str,
            &created_at,
            &i64::from(version.0),
            &event,
            &event.0.variant(),
            &event.0.join_set_id().map(std::string::ToString::to_string),
        ],
    )
    .await?;

    let pending_at = {
        debug!("Creating with `Pending(`{scheduled_at:?}`)");

        // `scheduled_at` fills both `pending_expires_finished` ($4) and
        // `first_scheduled_at` ($11). `is_paused` ($12) always starts as
        // false; a requested pause is applied below by appending `Paused`.
        tx.execute(
            r"
            INSERT INTO t_state (
                execution_id,
                is_top_level,
                corresponding_version,
                pending_expires_finished,
                ffqn,
                state,
                created_at,
                component_id_input_digest,
                component_type,
                deployment_id,
                first_scheduled_at,
                updated_at,
                intermittent_event_count,
                is_paused
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, CURRENT_TIMESTAMP, 0, $12
            )",
            &[
                &execution_id_str,
                &execution_id.is_top_level(),
                &i64::from(version.0),
                &scheduled_at,
                &ffqn.to_string(),
                &STATE_PENDING_AT,
                &created_at,
                &component_id.component_digest.as_slice(),
                &component_id.component_type.to_string(),
                &deployment_id.to_string(),
                &scheduled_at,
                &false,
            ],
        )
        .await?;

        // A paused execution produces no pending notification.
        AppendNotifier {
            pending_at: if paused {
                None
            } else {
                Some(NotifierPendingAt {
                    scheduled_at,
                    ffqn: ffqn.clone(),
                    component_input_digest: component_id.component_digest,
                })
            },
            execution_finished: None,
            response: None,
        }
    };

    let mut next_version = Version::new(version.0 + 1);
    if paused {
        // `append` is defined elsewhere in this file; it returns the version
        // following the appended `Paused` event. Its notifier is discarded.
        let (v, _) = append(
            tx,
            &execution_id,
            AppendRequest {
                created_at,
                event: ExecutionRequest::Paused,
            },
            next_version,
        )
        .await?;
        next_version = v;
    }
    Ok((next_version, pending_at))
}
551
552#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
553async fn update_state_pending_after_response_appended(
554 tx: &Transaction<'_>,
555 execution_id: &ExecutionId,
556 scheduled_at: DateTime<Utc>, component_input_digest: ComponentDigest,
558) -> Result<AppendNotifier, DbErrorWrite> {
559 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
560
561 let execution_id_str = execution_id.to_string();
563
564 let updated = tx
565 .execute(
566 r"
567 UPDATE t_state
568 SET
569 pending_expires_finished = $1,
570 state = $2,
571 updated_at = CURRENT_TIMESTAMP,
572
573 max_retries = NULL,
574 retry_exp_backoff_millis = NULL,
575 last_lock_version = NULL,
576
577 join_set_id = NULL,
578 join_set_closing = NULL,
579
580 result_kind = NULL
581 WHERE execution_id = $3
582 ",
583 &[
584 &scheduled_at, &STATE_PENDING_AT, &execution_id_str, ],
588 )
589 .await?;
590
591 if updated == 0 {
592 return Err(DbErrorWrite::NotFound);
593 }
594
595 Ok(AppendNotifier {
596 pending_at: Some(NotifierPendingAt {
597 scheduled_at,
598 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
599 component_input_digest,
600 }),
601 execution_finished: None,
602 response: None,
603 })
604}
605
606#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
607async fn update_state_pending_after_event_appended(
608 tx: &Transaction<'_>,
609 execution_id: &ExecutionId,
610 appending_version: &Version,
611 scheduled_at: DateTime<Utc>,
612 intermittent_failure: bool,
613 component_input_digest: ComponentDigest,
614) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
615 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
616
617 let intermittent_delta = i64::from(intermittent_failure); let updated = tx
620 .execute(
621 r"
622 UPDATE t_state
623 SET
624 corresponding_version = $1,
625 pending_expires_finished = $2,
626 state = $3,
627 updated_at = CURRENT_TIMESTAMP,
628 intermittent_event_count = intermittent_event_count + $4,
629
630 max_retries = NULL,
631 retry_exp_backoff_millis = NULL,
632 last_lock_version = NULL,
633
634 join_set_id = NULL,
635 join_set_closing = NULL,
636
637 result_kind = NULL
638 WHERE execution_id = $5;
639 ",
640 &[
641 &i64::from(appending_version.0), &scheduled_at, &STATE_PENDING_AT, &intermittent_delta, &execution_id.to_string(), ],
647 )
648 .await?;
649
650 if updated != 1 {
651 return Err(DbErrorWrite::NotFound);
652 }
653
654 Ok((
655 appending_version.increment(),
656 AppendNotifier {
657 pending_at: Some(NotifierPendingAt {
658 scheduled_at,
659 ffqn: fetch_created_event(tx, execution_id).await?.ffqn,
660 component_input_digest,
661 }),
662 execution_finished: None,
663 response: None,
664 },
665 ))
666}
667
668#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
669#[expect(clippy::too_many_arguments)]
670async fn update_state_locked_get_intermittent_event_count(
671 tx: &Transaction<'_>,
672 execution_id: &ExecutionId,
673 deployment_id: DeploymentId,
674 component_digest: &ComponentDigest,
675 executor_id: ExecutorId,
676 run_id: RunId,
677 lock_expires_at: DateTime<Utc>,
678 appending_version: &Version,
679 retry_config: ComponentRetryConfig,
680) -> Result<u32, DbErrorWrite> {
681 debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
682 let backoff_millis =
683 i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
684 DbErrorGeneric::Uncategorized {
686 reason: "backoff too big".into(),
687 context: SpanTrace::capture(),
688 source: Some(Arc::new(err)),
689 loc: Location::caller(),
690 }
691 })?;
692
693 let execution_id_str = execution_id.to_string();
694
695 let updated = tx
696 .execute(
697 r"
698 UPDATE t_state
699 SET
700 corresponding_version = $1,
701 pending_expires_finished = $2,
702 state = $3,
703 updated_at = CURRENT_TIMESTAMP,
704 deployment_id = $4,
705 component_id_input_digest = $5,
706
707 max_retries = $6,
708 retry_exp_backoff_millis = $7,
709 last_lock_version = $1,
710 executor_id = $8,
711 run_id = $9,
712
713 join_set_id = NULL,
714 join_set_closing = NULL,
715
716 result_kind = NULL
717 WHERE execution_id = $10
718 AND is_paused = false
719 ",
720 &[
721 &i64::from(appending_version.0),
722 &lock_expires_at,
723 &STATE_LOCKED,
724 &deployment_id.to_string(),
725 &component_digest,
726 &retry_config.max_retries.map(i64::from),
727 &backoff_millis,
728 &executor_id.to_string(),
729 &run_id.to_string(),
730 &execution_id_str,
731 ],
732 )
733 .await?;
734
735 if updated != 1 {
736 return Err(DbErrorWrite::NotFound);
737 }
738
739 let row = tx
741 .query_one(
742 "SELECT intermittent_event_count FROM t_state WHERE execution_id = $1",
743 &[&execution_id_str],
744 )
745 .await
746 .map_err(DbErrorGeneric::from)?;
747
748 let count: i64 = get(&row, "intermittent_event_count")?; let count = u32::try_from(count)
750 .map_err(|_| consistency_db_err("`intermittent_event_count` must not be negative"))?;
751 Ok(count)
752}
753
754#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
755async fn update_state_blocked(
756 tx: &Transaction<'_>,
757 execution_id: &ExecutionId,
758 appending_version: &Version,
759 join_set_id: &JoinSetId,
760 lock_expires_at: DateTime<Utc>,
761 join_set_closing: bool,
762) -> Result<AppendResponse, DbErrorWrite> {
763 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
764
765 let updated = tx
766 .execute(
767 r"
768 UPDATE t_state
769 SET
770 corresponding_version = $1,
771 pending_expires_finished = $2,
772 state = $3,
773 updated_at = CURRENT_TIMESTAMP,
774
775 max_retries = NULL,
776 retry_exp_backoff_millis = NULL,
777 last_lock_version = NULL,
778
779 join_set_id = $4,
780 join_set_closing = $5,
781
782 result_kind = NULL
783 WHERE execution_id = $6
784 ",
785 &[
786 &i64::from(appending_version.0), &lock_expires_at, &STATE_BLOCKED_BY_JOIN_SET, &join_set_id.to_string(), &join_set_closing, &execution_id.to_string(), ],
793 )
794 .await?;
795
796 if updated != 1 {
797 return Err(DbErrorWrite::NotFound);
798 }
799 Ok(appending_version.increment())
800}
801
802#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
803async fn update_state_finished(
804 tx: &Transaction<'_>,
805 execution_id: &ExecutionId,
806 appending_version: &Version,
807 finished_at: DateTime<Utc>,
808 result_kind: PendingStateFinishedResultKind,
809) -> Result<(), DbErrorWrite> {
810 debug!("Setting t_state to Finished");
811
812 let result_kind_json = Json(result_kind);
813
814 let updated = tx
815 .execute(
816 r"
817 UPDATE t_state
818 SET
819 corresponding_version = $1,
820 pending_expires_finished = $2,
821 state = $3,
822 updated_at = CURRENT_TIMESTAMP,
823
824 max_retries = NULL,
825 retry_exp_backoff_millis = NULL,
826 last_lock_version = NULL,
827 executor_id = NULL,
828 run_id = NULL,
829
830 join_set_id = NULL,
831 join_set_closing = NULL,
832
833 is_paused = false,
834 result_kind = $4
835 WHERE execution_id = $5
836 ",
837 &[
838 &i64::from(appending_version.0), &finished_at, &STATE_FINISHED, &result_kind_json, &execution_id.to_string(), ],
844 )
845 .await?;
846
847 if updated != 1 {
848 return Err(DbErrorWrite::NotFound);
849 }
850 Ok(())
851}
852
853#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
854async fn update_state_paused(
855 tx: &Transaction<'_>,
856 execution_id: &ExecutionId,
857 appending_version: &Version,
858 is_paused: bool,
859) -> Result<AppendResponse, DbErrorWrite> {
860 debug!(
861 "Setting t_state to {}",
862 if is_paused { "paused" } else { "unpaused" }
863 );
864
865 let updated = tx
866 .execute(
867 r"
868 UPDATE t_state
869 SET
870 corresponding_version = $1,
871 is_paused = $2,
872 updated_at = CURRENT_TIMESTAMP
873 WHERE execution_id = $3
874 ",
875 &[
876 &i64::from(appending_version.0), &is_paused, &execution_id.to_string(), ],
880 )
881 .await?;
882
883 if updated != 1 {
884 return Err(DbErrorWrite::NotFound);
885 }
886 Ok(appending_version.increment())
887}
888
889#[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
890async fn bump_state_next_version(
891 tx: &Transaction<'_>,
892 execution_id: &ExecutionId,
893 appending_version: &Version,
894 delay_req: Option<DelayReq>,
895) -> Result<AppendResponse, DbErrorWrite> {
896 debug!("update_index_version");
897 let execution_id_str = execution_id.to_string();
898
899 let updated = tx
900 .execute(
901 r"
902 UPDATE t_state
903 SET
904 corresponding_version = $1,
905 updated_at = CURRENT_TIMESTAMP
906 WHERE execution_id = $2
907 ",
908 &[
909 &i64::from(appending_version.0), &execution_id_str, ],
912 )
913 .await?;
914
915 if updated != 1 {
916 return Err(DbErrorWrite::NotFound);
917 }
918
919 if let Some(DelayReq {
920 join_set_id,
921 delay_id,
922 expires_at,
923 paused,
924 }) = delay_req
925 {
926 debug!("Inserting delay to `t_delay`");
927 tx.execute(
928 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at, is_paused) VALUES ($1, $2, $3, $4, $5)",
929 &[
930 &execution_id_str,
931 &join_set_id.to_string(),
932 &delay_id.to_string(),
933 &expires_at,
934 &paused,
935 ],
936 )
937 .await?;
938 }
939 Ok(appending_version.increment())
940}
941
/// Loads the `t_state` row of `execution_id` and converts it into a
/// [`CombinedState`].
///
/// Text and byte columns are parsed into their domain types; malformed values
/// are reported as consistency errors. Columns are decoded in the order
/// below, so the first bad column determines the error returned.
///
/// # Errors
/// Fails when:
/// * the row is missing (`query_one` expects exactly one row),
/// * a column cannot be decoded or parsed, or
/// * `CombinedState::new` rejects the values.
async fn get_combined_state(
    tx: &Transaction<'_>,
    execution_id: &ExecutionId,
) -> Result<CombinedState, DbErrorRead> {
    let row = tx
        .query_one(
            r"
            SELECT
                created_at, first_scheduled_at,
                state, ffqn, component_id_input_digest, component_type, deployment_id, corresponding_version, pending_expires_finished,
                last_lock_version, executor_id, run_id,
                join_set_id, join_set_closing,
                result_kind, is_paused
            FROM t_state
            WHERE execution_id = $1
            ",
            &[&execution_id.to_string()],
        )
        .await
        .map_err(DbErrorRead::from)?;

    let created_at: DateTime<Utc> = get(&row, "created_at")?;
    let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;

    // The component digest is stored as raw bytes.
    let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
    let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
        consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
    })?;
    let component_digest = ComponentDigest(digest);

    let component_type: String = get(&row, "component_type")?;
    let component_type = ComponentType::from_str(&component_type)
        .map_err(|err| consistency_db_err_src("cannot parse `component_type`", Arc::from(err)))?;

    let deployment_id: String = get(&row, "deployment_id")?;
    let deployment_id = DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;

    // `state` stays a raw string here; its interpretation is left to
    // `CombinedState::new`.
    let state: String = get(&row, "state")?;
    let ffqn: String = get(&row, "ffqn")?;
    let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
        consistency_db_err(format!("invalid ffqn value in `t_state` - {parse_err}"))
    })?;

    let pending_expires_finished: DateTime<Utc> = get(&row, "pending_expires_finished")?;

    // Nullable columns: the `update_state_*` helpers set or clear them
    // depending on the state being entered.
    let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
    let last_lock_version = last_lock_version_raw
        .map(Version::try_from)
        .transpose()
        .map_err(|_| consistency_db_err("version must be non-negative"))?;

    let executor_id_raw: Option<String> = get(&row, "executor_id")?;
    let executor_id = executor_id_raw
        .map(|id| ExecutorId::from_str(&id))
        .transpose()
        .map_err(DbErrorGeneric::from)?;

    let run_id_raw: Option<String> = get(&row, "run_id")?;
    let run_id = run_id_raw
        .map(|id| RunId::from_str(&id))
        .transpose()
        .map_err(DbErrorGeneric::from)?;

    let join_set_id_raw: Option<String> = get(&row, "join_set_id")?;
    let join_set_id = join_set_id_raw
        .map(|id| JoinSetId::from_str(&id))
        .transpose()
        .map_err(DbErrorGeneric::from)?;

    let join_set_closing: Option<bool> = get(&row, "join_set_closing")?;

    // `result_kind` is stored as JSON (written by `update_state_finished`).
    let result_kind: Option<Json<PendingStateFinishedResultKind>> = get(&row, "result_kind")?;
    let result_kind = result_kind.map(|it| it.0);

    let is_paused: bool = get(&row, "is_paused")?;

    // Stored as i64; a negative value is a consistency error.
    let corresponding_version: i64 = get(&row, "corresponding_version")?;
    let corresponding_version = Version::new(
        VersionType::try_from(corresponding_version)
            .map_err(|_| consistency_db_err("version must be non-negative"))?,
    );

    let dto = CombinedStateDTO {
        execution_id: execution_id.clone(),
        created_at,
        first_scheduled_at,
        state,
        ffqn,
        component_digest,
        component_type,
        deployment_id,
        pending_expires_finished,
        last_lock_version,
        executor_id,
        run_id,
        join_set_id,
        join_set_closing,
        result_kind,
        is_paused,
    };
    // NOTE(review): `CombinedState::new` presumably validates that the
    // columns form a consistent state for `state`; confirm in db_common.
    CombinedState::new(dto, corresponding_version).map_err(DbErrorRead::from)
}
1046
1047async fn list_executions(
1048 read_tx: &Transaction<'_>,
1049 filter: ListExecutionsFilter,
1050 pagination: &ExecutionListPagination,
1051) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
1052 struct QueryBuilder {
1054 where_clauses: Vec<String>,
1055 params: Vec<Box<dyn ToSql + Send + Sync>>,
1056 }
1057
1058 impl QueryBuilder {
1059 fn new() -> Self {
1060 Self {
1061 where_clauses: Vec::new(),
1062 params: Vec::new(),
1063 }
1064 }
1065
1066 fn add_param<T>(&mut self, param: T) -> String
1067 where
1068 T: ToSql + Sync + Send + 'static,
1069 {
1070 self.params.push(Box::new(param));
1071 format!("${}", self.params.len())
1072 }
1073
1074 fn add_where(&mut self, clause: String) {
1075 self.where_clauses.push(clause);
1076 }
1077 }
1078
1079 let mut qb = QueryBuilder::new();
1080
1081 let (limit, limit_desc) = match pagination {
1083 ExecutionListPagination::CreatedBy(p) => {
1084 let limit = p.length();
1085 let is_desc = p.is_desc();
1086 if let Some(cursor) = p.cursor() {
1087 let placeholder = qb.add_param(*cursor);
1088 qb.add_where(format!("created_at {} {placeholder}", p.rel()));
1089 }
1090 (limit, is_desc)
1091 }
1092 ExecutionListPagination::ExecutionId(p) => {
1093 let limit = p.length();
1094 let is_desc = p.is_desc();
1095 if let Some(cursor) = p.cursor() {
1096 let placeholder = qb.add_param(cursor.to_string());
1097 qb.add_where(format!("execution_id {} {placeholder}", p.rel()));
1098 }
1099 (limit, is_desc)
1100 }
1101 };
1102
1103 if !filter.show_derived {
1104 qb.add_where("is_top_level = true".to_string());
1105 }
1106 if let Some(function_name_filter) = filter.function_name_filter {
1107 let placeholder = qb.add_param(function_name_filter.like_pattern());
1108 qb.add_where(format!("ffqn LIKE {placeholder}"));
1109 }
1110 let like = |value: String| format!("{value}%");
1111 if filter.hide_finished {
1112 qb.add_where(format!("state != '{STATE_FINISHED}'"));
1113 }
1114 if let Some(prefix) = filter.execution_id_prefix {
1115 let placeholder = qb.add_param(like(prefix));
1116 qb.add_where(format!("execution_id LIKE {placeholder}"));
1117 }
1118 if let Some(component_digest) = filter.component_digest {
1119 let placeholder = qb.add_param(component_digest);
1120 qb.add_where(format!("component_id_input_digest = {placeholder}"));
1121 }
1122 if let Some(deployment_id) = filter.deployment_id {
1123 let placeholder = qb.add_param(deployment_id.to_string());
1124 qb.add_where(format!("deployment_id = {placeholder}"));
1125 }
1126
1127 let where_str = if qb.where_clauses.is_empty() {
1128 String::new()
1129 } else {
1130 format!("WHERE {}", qb.where_clauses.join(" AND "))
1131 };
1132
1133 let order_col = match pagination {
1134 ExecutionListPagination::CreatedBy(_) => "created_at",
1135 ExecutionListPagination::ExecutionId(_) => "execution_id",
1136 };
1137
1138 let (inner_order, outer_order) = if limit_desc {
1141 ("DESC", "")
1142 } else {
1143 ("", "DESC")
1144 };
1145
1146 let inner_sql = format!(
1147 r"SELECT created_at, first_scheduled_at, component_id_input_digest, component_type, deployment_id,
1148 state, execution_id, ffqn, corresponding_version, pending_expires_finished,
1149 last_lock_version, executor_id, run_id,
1150 join_set_id, join_set_closing,
1151 result_kind, is_paused
1152 FROM t_state {where_str} ORDER BY {order_col} {inner_order} LIMIT {limit}"
1153 );
1154
1155 let sql = if outer_order.is_empty() {
1156 inner_sql
1157 } else {
1158 format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY {order_col} {outer_order}")
1159 };
1160
1161 let params_refs: Vec<&(dyn ToSql + Sync)> = qb
1162 .params
1163 .iter()
1164 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1165 .collect();
1166
1167 let rows = read_tx.query(&sql, ¶ms_refs).await?;
1168
1169 let mut vec = Vec::with_capacity(rows.len());
1170
1171 for row in rows {
1172 let unpack = || -> Result<ExecutionWithState, DbErrorGeneric> {
1174 let execution_id_str: String = get(&row, "execution_id")?;
1175 let execution_id = ExecutionId::from_str(&execution_id_str)
1176 .map_err(|err| consistency_db_err(err.to_string()))?;
1177
1178 let digest_bytes: Vec<u8> = get(&row, "component_id_input_digest")?;
1179 let digest = Digest::try_from(digest_bytes.as_slice()).map_err(|err| {
1180 consistency_db_err_src("cannot parse `component_id_input_digest`", Arc::from(err))
1181 })?;
1182 let component_digest = ComponentDigest(digest);
1183
1184 let component_type: String = get(&row, "component_type")?;
1185 let component_type = ComponentType::from_str(&component_type).map_err(|err| {
1186 consistency_db_err_src("cannot parse `component_type`", Arc::from(err))
1187 })?;
1188
1189 let deployment_id: String = get(&row, "deployment_id")?;
1190 let deployment_id =
1191 DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?;
1192
1193 let created_at: DateTime<Utc> = get(&row, "created_at")?;
1194 let first_scheduled_at: DateTime<Utc> = get(&row, "first_scheduled_at")?;
1195
1196 let result_kind: Option<Json<PendingStateFinishedResultKind>> =
1197 get(&row, "result_kind")?;
1198 let result_kind = result_kind.map(|it| it.0);
1199
1200 let is_paused: bool = get(&row, "is_paused")?;
1201
1202 let corresponding_version: i64 = get(&row, "corresponding_version")?;
1203 let corresponding_version = Version::try_from(corresponding_version)
1204 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1205
1206 let executor_id_str: Option<String> = get(&row, "executor_id")?;
1207 let executor_id = executor_id_str
1208 .map(|id| ExecutorId::from_str(&id))
1209 .transpose()?;
1210
1211 let last_lock_version_raw: Option<i64> = get(&row, "last_lock_version")?;
1212 let last_lock_version = last_lock_version_raw
1213 .map(Version::try_from)
1214 .transpose()
1215 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1216
1217 let run_id_str: Option<String> = get(&row, "run_id")?;
1218 let run_id = run_id_str.map(|id| RunId::from_str(&id)).transpose()?;
1219
1220 let join_set_id_str: Option<String> = get(&row, "join_set_id")?;
1221 let join_set_id = join_set_id_str
1222 .map(|id| JoinSetId::from_str(&id))
1223 .transpose()?;
1224
1225 let ffqn: String = get(&row, "ffqn")?;
1226 let ffqn = FunctionFqn::from_str(&ffqn).map_err(|parse_err| {
1227 error!("Error parsing ffqn - {parse_err:?}");
1228 consistency_db_err("invalid ffqn value in `t_state`")
1229 })?;
1230
1231 let combined_state_dto = CombinedStateDTO {
1232 execution_id,
1233 created_at,
1234 first_scheduled_at,
1235 component_digest,
1236 component_type,
1237 deployment_id,
1238 state: get(&row, "state")?,
1239 ffqn,
1240 pending_expires_finished: get(&row, "pending_expires_finished")?,
1241 executor_id,
1242 last_lock_version,
1243 run_id,
1244 join_set_id,
1245 join_set_closing: get(&row, "join_set_closing")?,
1246 result_kind,
1247 is_paused,
1248 };
1249
1250 let combined_state = CombinedState::new(combined_state_dto, corresponding_version)?;
1251
1252 Ok(combined_state.execution_with_state)
1253 };
1254
1255 match unpack() {
1256 Ok(execution) => vec.push(execution),
1257 Err(err) => {
1258 warn!("Skipping corrupted row in t_state: {err:?}");
1259 }
1260 }
1261 }
1262
1263 Ok(vec)
1264}
1265
1266async fn list_responses(
1267 tx: &Transaction<'_>,
1268 execution_id: &ExecutionId,
1269 pagination: Option<Pagination<u32>>,
1270) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1271 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1273 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1274 params.push(p);
1275 format!("${}", params.len())
1276 };
1277
1278 let p_execution_id = add_param(Box::new(execution_id.to_string()));
1280
1281 let mut sql = format!(
1282 "SELECT \
1283 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1284 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1285 WHERE \
1286 r.execution_id = {p_execution_id} \
1287 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL )"
1288 );
1289
1290 let limit = match &pagination {
1292 Some(p @ (Pagination::NewerThan { cursor, .. } | Pagination::OlderThan { cursor, .. })) => {
1293 let p_cursor = add_param(Box::new(i64::from(*cursor)));
1295
1296 write!(sql, " AND r.id {} {}", p.rel(), p_cursor).unwrap();
1298
1299 Some(p.length())
1300 }
1301 None => None,
1302 };
1303
1304 sql.push_str(" ORDER BY r.id");
1306 let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1307 if is_desc {
1308 sql.push_str(" DESC");
1309 }
1310
1311 if let Some(limit) = limit {
1313 let p_limit = add_param(Box::new(i64::from(limit)));
1315 write!(sql, " LIMIT {p_limit}").unwrap();
1316 }
1317
1318 if is_desc {
1320 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY id ASC");
1321 }
1322
1323 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1324 .iter()
1325 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1326 .collect();
1327
1328 let rows = tx
1329 .query(&sql, ¶ms_refs)
1330 .await
1331 .map_err(DbErrorRead::from)?;
1332
1333 let mut results = Vec::with_capacity(rows.len());
1334 for row in rows {
1335 results.push(parse_response_with_cursor(&row)?);
1336 }
1337
1338 Ok(results)
1339}
1340
1341async fn list_logs_tx(
1342 tx: &Transaction<'_>,
1343 execution_id: &ExecutionId,
1344 show_derived: bool,
1345 filter: &LogFilter,
1346 pagination: &Pagination<DateTime<Utc>>,
1347) -> Result<ListLogsResponse, DbErrorRead> {
1348 let mut param_index = 1;
1349 let exec_id_str = execution_id.to_string();
1350 let exec_id_filter = if show_derived {
1351 format!("execution_id LIKE ${param_index} || '%'")
1352 } else {
1353 format!("execution_id = ${param_index}")
1354 };
1355 let mut query = format!(
1356 "SELECT id, run_id, created_at, level, message, stream_type, payload, execution_id
1357 FROM t_log
1358 WHERE {exec_id_filter}"
1359 );
1360 let mut params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = vec![&exec_id_str];
1361 param_index += 1;
1362
1363 let level_filter = if filter.should_show_logs() {
1365 let levels_str = if !filter.levels().is_empty() {
1366 filter
1367 .levels()
1368 .iter()
1369 .map(|lvl| (*lvl as u8).to_string())
1370 .collect::<Vec<_>>()
1371 .join(",")
1372 } else {
1373 LogLevel::iter()
1374 .map(|lvl| (lvl as u8).to_string())
1375 .collect::<Vec<_>>()
1376 .join(",")
1377 };
1378 Some(format!(" level IN ({levels_str})"))
1379 } else {
1380 None
1381 };
1382 let stream_filter = if filter.should_show_streams() {
1383 let streams_str = if !filter.stream_types().is_empty() {
1384 filter
1385 .stream_types()
1386 .iter()
1387 .map(|st| (*st as u8).to_string())
1388 .collect::<Vec<_>>()
1389 .join(",")
1390 } else {
1391 LogStreamType::iter()
1392 .map(|st| (st as u8).to_string())
1393 .collect::<Vec<_>>()
1394 .join(",")
1395 };
1396 Some(format!(" stream_type IN ({streams_str})"))
1397 } else {
1398 None
1399 };
1400 match (level_filter, stream_filter) {
1401 (Some(level_filter), Some(stream_filter)) => {
1402 write!(&mut query, " AND ({level_filter} OR {stream_filter})")
1403 .expect("writing to string");
1404 }
1405 (Some(level_filter), None) => {
1406 write!(&mut query, " AND {level_filter}").expect("writing to string");
1407 }
1408 (None, Some(stream_filter)) => {
1409 write!(&mut query, " AND {stream_filter}").expect("writing to string");
1410 }
1411 (None, None) => unreachable!("guarded by constructor"),
1412 }
1413
1414 write!(
1416 &mut query,
1417 " AND created_at {} ${param_index}",
1418 pagination.rel()
1419 )
1420 .expect("writing to string");
1421 let cursor_val = pagination.cursor();
1422 params.push(cursor_val);
1423 param_index += 1;
1424
1425 let dir = if pagination.is_desc() { "DESC" } else { "ASC" };
1427 write!(
1428 &mut query,
1429 " ORDER BY created_at {dir}, id {dir} LIMIT ${param_index}",
1430 )
1431 .expect("writing to string");
1432 let length_val: i64 = i64::from(pagination.length());
1433 params.push(&length_val);
1434
1435 let rows = tx.query(&query, ¶ms[..]).await?;
1436
1437 let mut items = Vec::with_capacity(rows.len());
1438
1439 for row in rows {
1440 let created_at: chrono::DateTime<chrono::Utc> = get(&row, "created_at")?;
1441 let run_id: String = get(&row, "run_id")?;
1442 let run_id = RunId::from_str(&run_id).map_err(|parse_err| {
1443 consistency_db_err_src(
1444 format!("cannot convert RunId {run_id}"),
1445 Arc::from(parse_err),
1446 )
1447 })?;
1448 let execution_id_str: String = get(&row, "execution_id")?;
1449 let execution_id = ExecutionId::from_str(&execution_id_str).map_err(|parse_err| {
1450 consistency_db_err_src(
1451 format!("cannot convert ExecutionId {execution_id_str}"),
1452 Arc::from(parse_err),
1453 )
1454 })?;
1455
1456 let level: Option<i32> = get(&row, "level")?;
1457 let message: Option<String> = get(&row, "message")?;
1458 let stream_type: Option<i32> = get(&row, "stream_type")?;
1459 let payload: Option<Vec<u8>> = get(&row, "payload")?;
1460
1461 let log_entry = match (level, message, stream_type, payload) {
1462 (Some(lvl), Some(msg), None, None) => {
1463 let map_err =
1464 |err| consistency_db_err_src(format!("cannot convert {lvl} to LogLevel"), err);
1465 LogEntry::Log {
1466 created_at,
1467 level: u8::try_from(lvl)
1468 .map(|lvl| LogLevel::try_from(lvl).map_err(|err| map_err(Arc::from(err))))
1469 .map_err(|err| map_err(Arc::from(err)))??,
1470 message: msg,
1471 }
1472 }
1473 (None, None, Some(stype), Some(pl)) => {
1474 let map_err = |err| {
1475 consistency_db_err_src(format!("cannot convert {stype} to LogStreamType"), err)
1476 };
1477 LogEntry::Stream {
1478 created_at,
1479 stream_type: u8::try_from(stype)
1480 .map(|stype| {
1481 LogStreamType::try_from(stype).map_err(|err| map_err(Arc::from(err)))
1482 })
1483 .map_err(|err| map_err(Arc::from(err)))??,
1484 payload: pl,
1485 }
1486 }
1487 _ => {
1488 return Err(consistency_db_err("invalid t_log row".to_string()).into());
1489 }
1490 };
1491
1492 items.push(LogEntryRow {
1493 cursor: created_at,
1494 run_id,
1495 log_entry,
1496 execution_id,
1497 });
1498 }
1499
1500 Ok(ListLogsResponse {
1501 next_page: items
1502 .last()
1503 .map(|item| Pagination::NewerThan {
1504 length: pagination.length(),
1505 cursor: item.cursor,
1506 including_cursor: false,
1507 })
1508 .unwrap_or(if pagination.is_asc() {
1509 *pagination } else {
1511 Pagination::NewerThan {
1513 length: pagination.length(),
1514 cursor: DateTime::<Utc>::UNIX_EPOCH,
1515 including_cursor: false,
1516 }
1517 }),
1518 prev_page: match items.first() {
1519 Some(item) => Some(Pagination::OlderThan {
1520 length: pagination.length(),
1521 cursor: item.cursor,
1522 including_cursor: false,
1523 }),
1524 None if pagination.is_asc() && pagination.cursor() > &DateTime::<Utc>::UNIX_EPOCH => {
1525 Some(pagination.invert())
1527 }
1528 None => None,
1529 },
1530 items,
1531 })
1532}
1533
1534async fn list_deployment_states(
1535 tx: &Transaction<'_>,
1536 current_time: DateTime<Utc>,
1537 pagination: Pagination<Option<DeploymentId>>,
1538 include_config_json: bool,
1539) -> Result<Vec<DeploymentState>, DbErrorRead> {
1540 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
1542 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
1543 params.push(p);
1544 format!("${}", params.len())
1545 };
1546
1547 let p_now = add_param(Box::new(current_time));
1549
1550 let config_json_col = if include_config_json {
1551 "d.config_json"
1552 } else {
1553 "NULL::TEXT AS config_json"
1554 };
1555
1556 let mut sql = format!(
1557 "
1558 SELECT
1559 d.deployment_id,
1560
1561 COUNT(*) FILTER (WHERE s.state = '{STATE_LOCKED}') AS locked,
1562
1563 COUNT(*) FILTER (
1564 WHERE s.state = '{STATE_PENDING_AT}'
1565 AND s.pending_expires_finished <= {p_now}
1566 ) AS pending,
1567
1568 COUNT(*) FILTER (
1569 WHERE s.state = '{STATE_PENDING_AT}'
1570 AND s.pending_expires_finished > {p_now}
1571 ) AS scheduled,
1572
1573 COUNT(*) FILTER (WHERE s.state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
1574
1575 COUNT(*) FILTER (WHERE s.state = '{STATE_FINISHED}') AS finished,
1576
1577 {config_json_col},
1578 d.created_at,
1579 d.last_active_at,
1580 d.status
1581 FROM t_deployment d
1582 LEFT JOIN t_state s ON s.deployment_id = d.deployment_id"
1583 );
1584
1585 if let Some(cursor) = pagination.cursor() {
1587 let p_cursor = add_param(Box::new(cursor.to_string()));
1588 write!(
1589 sql,
1590 " WHERE d.deployment_id {rel} {p_cursor}",
1591 rel = pagination.rel()
1592 )
1593 .expect("writing to string");
1594 }
1595
1596 let (inner_order, outer_order) = if pagination.is_desc() {
1600 ("DESC", "")
1601 } else {
1602 ("ASC", "DESC")
1603 };
1604
1605 write!(
1606 sql,
1607 " GROUP BY d.deployment_id, d.config_json, d.created_at, d.last_active_at, d.status ORDER BY d.deployment_id {inner_order} LIMIT {}",
1608 pagination.length()
1609 )
1610 .expect("writing to string");
1611
1612 let final_sql = if outer_order.is_empty() {
1613 sql
1614 } else {
1615 format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
1616 };
1617
1618 let params_refs: Vec<&(dyn ToSql + Sync)> = params
1619 .iter()
1620 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
1621 .collect();
1622
1623 let rows = tx
1624 .query(&final_sql, ¶ms_refs)
1625 .await
1626 .map_err(DbErrorRead::from)?;
1627
1628 let mut result = Vec::with_capacity(rows.len());
1629 for row in rows {
1630 let deployment_id: String = get(&row, "deployment_id")?;
1631 let status_str: String = get::<String, _>(&row, "status")?;
1632 let status = status_str
1633 .parse::<DeploymentStatus>()
1634 .map_err(|e| consistency_db_err(format!("unknown deployment status: {e}")))?;
1635 result.push(DeploymentState {
1636 deployment_id: DeploymentId::from_str(&deployment_id).map_err(DbErrorGeneric::from)?,
1637 locked: u32::try_from(get::<i64, _>(&row, "locked")?).expect("count is never negative"),
1638 pending: u32::try_from(get::<i64, _>(&row, "pending")?)
1639 .expect("count is never negative"),
1640 scheduled: u32::try_from(get::<i64, _>(&row, "scheduled")?)
1641 .expect("count is never negative"),
1642 blocked: u32::try_from(get::<i64, _>(&row, "blocked")?)
1643 .expect("count is never negative"),
1644 finished: u32::try_from(get::<i64, _>(&row, "finished")?)
1645 .expect("count is never negative"),
1646 config_json: get::<Option<String>, _>(&row, "config_json")?,
1647 created_at: get::<DateTime<Utc>, _>(&row, "created_at")?,
1648 last_active_at: get::<Option<DateTime<Utc>>, _>(&row, "last_active_at")?,
1649 status,
1650 });
1651 }
1652
1653 Ok(result)
1654}
1655
1656fn parse_response_with_cursor(
1657 row: &tokio_postgres::Row,
1658) -> Result<ResponseWithCursor, DbErrorRead> {
1659 let id = u32::try_from(get::<i64, _>(row, "id")?)
1661 .map_err(|_| consistency_db_err("id must not be negative"))?;
1662
1663 let created_at: DateTime<Utc> = get(row, "created_at")?;
1664 let join_set_id_str: String = get(row, "join_set_id")?;
1665 let join_set_id = JoinSetId::from_str(&join_set_id_str).map_err(DbErrorGeneric::from)?;
1666
1667 let delay_id: Option<String> = get(row, "delay_id")?;
1669 let delay_id = delay_id
1670 .map(|id| DelayId::from_str(&id))
1671 .transpose()
1672 .map_err(DbErrorGeneric::from)?;
1673 let delay_success: Option<bool> = get(row, "delay_success")?;
1674 let child_execution_id: Option<String> = get(row, "child_execution_id")?;
1675 let child_execution_id = child_execution_id
1676 .map(|id| ExecutionIdDerived::from_str(&id))
1677 .transpose()
1678 .map_err(DbErrorGeneric::from)?;
1679 let finished_version = get::<Option<i64>, _>(row, "finished_version")?
1680 .map(Version::try_from)
1681 .transpose()
1682 .map_err(|_| consistency_db_err("version must be non-negative"))?;
1683 let json_value: Option<Json<ExecutionRequest>> = get(row, "json_value")?;
1684 let json_value = json_value.map(|it| it.0);
1685
1686 let event = match (
1687 delay_id,
1688 delay_success,
1689 child_execution_id,
1690 finished_version,
1691 json_value,
1692 ) {
1693 (Some(delay_id), Some(delay_success), None, None, None) => JoinSetResponse::DelayFinished {
1694 delay_id,
1695 result: delay_success.then_some(()).ok_or(()),
1696 },
1697 (None, None, Some(child_execution_id), Some(finished_version), Some(json_val)) => {
1698 if let ExecutionRequest::Finished { retval: result, .. } = json_val {
1699 JoinSetResponse::ChildExecutionFinished {
1700 child_execution_id,
1701 finished_version,
1702 result,
1703 }
1704 } else {
1705 error!("Joined log entry must be 'Finished'");
1706 return Err(consistency_db_err("joined log entry must be 'Finished'").into());
1707 }
1708 }
1709 (delay, delay_success, child, finished, result) => {
1710 error!(
1711 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {result:?}",
1712 );
1713 return Err(consistency_db_err("invalid row in t_join_set_response").into());
1714 }
1715 };
1716
1717 Ok(ResponseWithCursor {
1718 cursor: ResponseCursor(id),
1719 event: JoinSetResponseEventOuter {
1720 event: JoinSetResponseEvent { join_set_id, event },
1721 created_at,
1722 },
1723 })
1724}
1725
/// Locks `execution_id` for `executor_id` / `run_id` until `lock_expires_at` and loads
/// what the lock holder needs to run it.
///
/// Steps, in order:
/// 1. Checks that the current pending state accepts the lock (`can_append_lock`) and that
///    `appending_version` is the next expected version.
/// 2. Inserts a `Locked` event into `t_execution_log` at `appending_version`.
/// 3. Loads all join-set responses of the execution (`list_responses`, unpaginated).
/// 4. Updates the state row via `update_state_locked_get_intermittent_event_count` and keeps
///    the returned intermittent event count.
/// 5. Reads the `Created` event and every `HistoryEvent` from the log, ordered by version.
///
/// Returns a [`LockedExecution`] whose `next_version` is `appending_version.increment()`.
///
/// # Errors
/// * Errors from `can_append_lock` and the version check.
/// * `IllegalState` ("cannot lock") if the `Locked` insert fails.
/// * A consistency error if the first selected log row is not `Created`, or if a later
///   selected row is not a `HistoryEvent`.
#[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %run_id, %executor_id))]
#[expect(clippy::too_many_arguments)]
async fn lock_single_execution(
    tx: &Transaction<'_>,
    created_at: DateTime<Utc>,
    component_id: &ComponentId,
    deployment_id: DeploymentId,
    execution_id: &ExecutionId,
    run_id: RunId,
    appending_version: &Version,
    executor_id: ExecutorId,
    lock_expires_at: DateTime<Utc>,
    retry_config: ComponentRetryConfig,
) -> Result<LockedExecution, DbErrorWrite> {
    trace!("lock_single_execution");

    // Validate against the current state before writing anything.
    let combined_state = get_combined_state(tx, execution_id).await?;
    combined_state
        .execution_with_state
        .pending_state
        .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
    let expected_version = combined_state.get_next_version_assert_not_finished();
    check_expected_next_and_appending_version(&expected_version, appending_version)?;

    let locked_event = Locked {
        component_id: component_id.clone(),
        deployment_id,
        executor_id,
        lock_expires_at,
        run_id,
        retry_config,
    };
    let event = ExecutionRequest::Locked(locked_event.clone());

    let event = Json(event);

    tx.execute(
        "INSERT INTO t_execution_log \
        (execution_id, created_at, json_value, version, variant) \
        VALUES ($1, $2, $3, $4, $5)",
        &[
            &execution_id.to_string(),
            &created_at,
            &event,
            &i64::from(appending_version.0),
            &event.0.variant(),
        ],
    )
    .await
    // NOTE(review): every insert failure, including transient connection errors, becomes a
    // non-retriable `IllegalState`. Confirm this is intended, e.g. only for version conflicts.
    .map_err(|err| {
        DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
            reason: "cannot lock".into(),
            context: SpanTrace::capture(),
            source: Some(Arc::new(err)),
            loc: Location::caller(),
        })
    })?;

    let responses = list_responses(tx, execution_id, None).await?;
    trace!("Responses: {responses:?}");

    let intermittent_event_count = update_state_locked_get_intermittent_event_count(
        tx,
        execution_id,
        deployment_id,
        &component_id.component_digest,
        executor_id,
        run_id,
        lock_expires_at,
        appending_version,
        retry_config,
    )
    .await?;

    // Only `Created` and `HistoryEvent` rows are needed to replay the execution.
    let rows = tx
        .query(
            "SELECT json_value, version FROM t_execution_log WHERE \
            execution_id = $1 AND (variant = $2 OR variant = $3) \
            ORDER BY version",
            &[
                &execution_id.to_string(),
                &DUMMY_CREATED.variant(),
                &DUMMY_HISTORY_EVENT.variant(),
            ],
        )
        .await
        .map_err(DbErrorGeneric::from)?;

    let mut events: VecDeque<ExecutionEvent> = VecDeque::new();

    for row in rows {
        let event: Json<ExecutionRequest> = get(&row, "json_value")?;
        let event = event.0;

        let version: i64 = get(&row, "version")?;
        let version = Version::try_from(version)
            .map_err(|_| consistency_db_err("version must be non-negative"))?;

        events.push_back(ExecutionEvent {
            // `created_at` is not selected above; this is a placeholder value.
            created_at: DateTime::from_timestamp_nanos(0),
            event,
            backtrace_id: None,
            version,
        });
    }

    // The lowest selected version must be the `Created` event.
    let Some(ExecutionRequest::Created {
        ffqn,
        params,
        parent,
        metadata,
        ..
    }) = events.pop_front().map(|outer| outer.event)
    else {
        error!("Execution log must contain at least `Created` event");
        return Err(consistency_db_err("execution log must contain `Created` event").into());
    };

    // Everything after `Created` must be a history event.
    let mut event_history = Vec::new();
    for ExecutionEvent { event, version, .. } in events {
        if let ExecutionRequest::HistoryEvent { event } = event {
            event_history.push((event, version));
        } else {
            error!("Rows can only contain `Created` and `HistoryEvent` event kinds");
            return Err(consistency_db_err(
                "rows can only contain `Created` and `HistoryEvent` event kinds",
            )
            .into());
        }
    }

    Ok(LockedExecution {
        execution_id: execution_id.clone(),
        metadata,
        next_version: appending_version.increment(),
        ffqn,
        params,
        event_history,
        responses,
        parent,
        intermittent_event_count,
        locked_event,
    })
}
1878
1879async fn count_join_next(
1880 tx: &Transaction<'_>,
1881 execution_id: &ExecutionId,
1882 join_set_id: &JoinSetId,
1883) -> Result<u32, DbErrorRead> {
1884 let row = tx
1885 .query_one(
1886 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = $1 AND join_set_id = $2 \
1887 AND history_event_type = $3",
1888 &[
1889 &execution_id.to_string(),
1890 &join_set_id.to_string(),
1891 &HISTORY_EVENT_TYPE_JOIN_NEXT,
1892 ],
1893 )
1894 .await
1895 .map_err(DbErrorRead::from)?;
1896
1897 let count = u32::try_from(get::<i64, _>(&row, "count")?).expect("COUNT cannot be negative");
1898 Ok(count)
1899}
1900
1901async fn nth_response(
1902 tx: &Transaction<'_>,
1903 execution_id: &ExecutionId,
1904 join_set_id: &JoinSetId,
1905 skip_rows: u32,
1906) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
1907 let row = tx
1908 .query_opt(
1909 "SELECT r.id, r.created_at, r.join_set_id, \
1910 r.delay_id, r.delay_success, \
1911 r.child_execution_id, r.finished_version, l.json_value \
1912 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1913 WHERE \
1914 r.execution_id = $1 AND r.join_set_id = $2 AND \
1915 ( \
1916 r.finished_version = l.version \
1917 OR \
1918 r.child_execution_id IS NULL \
1919 ) \
1920 ORDER BY id \
1921 LIMIT 1 OFFSET $3",
1922 &[
1923 &execution_id.to_string(),
1924 &join_set_id.to_string(),
1925 &i64::from(skip_rows),
1926 ]
1927 )
1928 .await
1929 .map_err(DbErrorRead::from)?;
1930
1931 match row {
1932 Some(r) => Ok(Some(parse_response_with_cursor(&r)?)),
1933 None => Ok(None),
1934 }
1935}
1936
1937#[instrument(level = Level::TRACE, skip_all, fields(%execution_id))]
1938async fn append(
1939 tx: &Transaction<'_>,
1940 execution_id: &ExecutionId,
1941 req: AppendRequest,
1942 appending_version: Version,
1943) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1944 if matches!(req.event, ExecutionRequest::Created { .. }) {
1945 return Err(DbErrorWrite::NonRetriable(
1946 DbErrorWriteNonRetriable::ValidationFailed(
1947 "cannot append `Created` event - use `create` instead".into(),
1948 ),
1949 ));
1950 }
1951
1952 if let AppendRequest {
1953 event:
1954 ExecutionRequest::Locked(Locked {
1955 component_id,
1956 deployment_id,
1957 executor_id,
1958 run_id,
1959 lock_expires_at,
1960 retry_config,
1961 }),
1962 created_at,
1963 } = req
1964 {
1965 return lock_single_execution(
1966 tx,
1967 created_at,
1968 &component_id,
1969 deployment_id,
1970 execution_id,
1971 run_id,
1972 &appending_version,
1973 executor_id,
1974 lock_expires_at,
1975 retry_config,
1976 )
1977 .await
1978 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
1979 }
1980
1981 let combined_state = get_combined_state(tx, execution_id).await?;
1982 if combined_state
1983 .execution_with_state
1984 .pending_state
1985 .is_finished()
1986 {
1987 debug!("Execution is already finished");
1988 return Err(DbErrorWrite::NonRetriable(
1989 DbErrorWriteNonRetriable::AlreadyFinished,
1990 ));
1991 }
1992
1993 check_expected_next_and_appending_version(
1994 &combined_state.get_next_version_assert_not_finished(),
1995 &appending_version,
1996 )?;
1997
1998 let event = Json(req.event);
1999
2000 tx.execute(
2002 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2003 VALUES ($1, $2, $3, $4, $5, $6)",
2004 &[
2005 &execution_id.to_string(),
2006 &req.created_at,
2007 &event,
2008 &i64::from(appending_version.0),
2009 &event.0.variant(),
2010 &event.0.join_set_id().map(std::string::ToString::to_string),
2011 ],
2012 )
2013 .await?;
2014
2015 match &event.0 {
2017 ExecutionRequest::Created { .. } => {
2018 unreachable!("handled in the caller")
2019 }
2020
2021 ExecutionRequest::Locked { .. } => {
2022 unreachable!("handled above")
2023 }
2024
2025 ExecutionRequest::TemporarilyFailed {
2026 backoff_expires_at, ..
2027 }
2028 | ExecutionRequest::TemporarilyTimedOut {
2029 backoff_expires_at, ..
2030 } => {
2031 let (next_version, notifier) = update_state_pending_after_event_appended(
2032 tx,
2033 execution_id,
2034 &appending_version,
2035 *backoff_expires_at,
2036 true, combined_state.execution_with_state.component_digest,
2038 )
2039 .await?;
2040 return Ok((next_version, notifier));
2041 }
2042
2043 ExecutionRequest::Unlocked {
2044 backoff_expires_at, ..
2045 } => {
2046 let (next_version, notifier) = update_state_pending_after_event_appended(
2047 tx,
2048 execution_id,
2049 &appending_version,
2050 *backoff_expires_at,
2051 false, combined_state.execution_with_state.component_digest,
2053 )
2054 .await?;
2055 return Ok((next_version, notifier));
2056 }
2057
2058 ExecutionRequest::Paused => {
2059 match &combined_state.execution_with_state.pending_state {
2060 PendingState::Finished { .. } => {
2061 unreachable!("handled above");
2062 }
2063 PendingState::Locked(..) => {
2064 return Err(DbErrorWriteNonRetriable::IllegalState {
2065 reason:
2066 "cannot append Paused event when execution is locked; use pause_execution"
2067 .into(),
2068 context: SpanTrace::capture(),
2069 source: None,
2070 loc: Location::caller(),
2071 }
2072 .into());
2073 }
2074 PendingState::Paused(..) => {
2075 return Err(DbErrorWriteNonRetriable::IllegalState {
2076 reason: "cannot pause, execution is already paused".into(),
2077 context: SpanTrace::capture(),
2078 source: None,
2079 loc: Location::caller(),
2080 }
2081 .into());
2082 }
2083 _ => {}
2084 }
2085 let next_version =
2086 update_state_paused(tx, execution_id, &appending_version, true).await?;
2087 return Ok((next_version, AppendNotifier::default()));
2088 }
2089
2090 ExecutionRequest::Unpaused => {
2091 if !combined_state
2092 .execution_with_state
2093 .pending_state
2094 .is_paused()
2095 {
2096 return Err(DbErrorWriteNonRetriable::IllegalState {
2097 reason: "cannot unpause, execution is not paused".into(),
2098 context: SpanTrace::capture(),
2099 source: None,
2100 loc: Location::caller(),
2101 }
2102 .into());
2103 }
2104 let next_version =
2105 update_state_paused(tx, execution_id, &appending_version, false).await?;
2106 return Ok((next_version, AppendNotifier::default()));
2107 }
2108
2109 ExecutionRequest::Finished { retval, .. } => {
2110 update_state_finished(
2111 tx,
2112 execution_id,
2113 &appending_version,
2114 req.created_at,
2115 PendingStateFinishedResultKind::from(retval),
2116 )
2117 .await?;
2118 return Ok((
2119 appending_version,
2120 AppendNotifier {
2121 pending_at: None,
2122 execution_finished: Some(NotifierExecutionFinished {
2123 execution_id: execution_id.clone(),
2124 retval: retval.clone(),
2125 }),
2126 response: None,
2127 },
2128 ));
2129 }
2130
2131 ExecutionRequest::HistoryEvent {
2132 event:
2133 HistoryEvent::JoinSetCreate { .. }
2134 | HistoryEvent::JoinSetRequest {
2135 request: JoinSetRequest::ChildExecutionRequest { .. },
2136 ..
2137 }
2138 | HistoryEvent::Persist { .. }
2139 | HistoryEvent::Schedule { .. }
2140 | HistoryEvent::Stub { .. }
2141 | HistoryEvent::JoinNextTooMany { .. }
2142 | HistoryEvent::JoinNextTry { .. },
2143 } => {
2144 return Ok((
2145 bump_state_next_version(tx, execution_id, &appending_version, None).await?,
2146 AppendNotifier::default(),
2147 ));
2148 }
2149
2150 ExecutionRequest::HistoryEvent {
2151 event:
2152 HistoryEvent::JoinSetRequest {
2153 join_set_id,
2154 request:
2155 JoinSetRequest::DelayRequest {
2156 delay_id,
2157 expires_at,
2158 paused,
2159 ..
2160 },
2161 },
2162 } => {
2163 return Ok((
2164 bump_state_next_version(
2165 tx,
2166 execution_id,
2167 &appending_version,
2168 Some(DelayReq {
2169 join_set_id: join_set_id.clone(),
2170 delay_id: delay_id.clone(),
2171 expires_at: *expires_at,
2172 paused: *paused,
2173 }),
2174 )
2175 .await?,
2176 AppendNotifier::default(),
2177 ));
2178 }
2179
2180 ExecutionRequest::HistoryEvent {
2181 event:
2182 HistoryEvent::JoinNext {
2183 join_set_id,
2184 run_expires_at,
2185 closing,
2186 requested_ffqn: _,
2187 },
2188 } => {
2189 let join_next_count = count_join_next(tx, execution_id, join_set_id).await?;
2191
2192 let nth_response =
2194 nth_response(tx, execution_id, join_set_id, join_next_count - 1).await?;
2195
2196 trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2197 assert!(join_next_count > 0);
2198
2199 if let Some(ResponseWithCursor {
2200 event:
2201 JoinSetResponseEventOuter {
2202 created_at: nth_created_at,
2203 ..
2204 },
2205 cursor: _,
2206 }) = nth_response
2207 {
2208 let scheduled_at = std::cmp::max(*run_expires_at, nth_created_at);
2209 let (next_version, notifier) = update_state_pending_after_event_appended(
2210 tx,
2211 execution_id,
2212 &appending_version,
2213 scheduled_at,
2214 false, combined_state.execution_with_state.component_digest,
2216 )
2217 .await?;
2218 return Ok((next_version, notifier));
2219 }
2220
2221 return Ok((
2222 update_state_blocked(
2223 tx,
2224 execution_id,
2225 &appending_version,
2226 join_set_id,
2227 *run_expires_at,
2228 *closing,
2229 )
2230 .await?,
2231 AppendNotifier::default(),
2232 ));
2233 }
2234 }
2235}
2236
2237async fn append_response(
2238 tx: &Transaction<'_>,
2239 execution_id: &ExecutionId,
2240 event: JoinSetResponseEventOuter,
2241) -> Result<AppendNotifier, DbErrorWrite> {
2242 let join_set_id = &event.event.join_set_id;
2243
2244 let (delay_id, delay_success) = match &event.event.event {
2245 JoinSetResponse::DelayFinished { delay_id, result } => {
2246 (Some(delay_id.to_string()), Some(result.is_ok()))
2247 }
2248 JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
2249 };
2250
2251 let (child_execution_id, finished_version) = match &event.event.event {
2252 JoinSetResponse::ChildExecutionFinished {
2253 child_execution_id,
2254 finished_version,
2255 result: _,
2256 } => (
2257 Some(child_execution_id.to_string()),
2258 Some(i64::from(finished_version.0)),
2259 ),
2260 JoinSetResponse::DelayFinished { .. } => (None, None),
2261 };
2262
2263 let row = tx.query_one(
2264 "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
2265 VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id",
2266 &[
2267 &execution_id.to_string(),
2268 &event.created_at,
2269 &join_set_id.to_string(),
2270 &delay_id,
2271 &delay_success,
2272 &child_execution_id,
2273 &finished_version,
2274 ]
2275 ).await?;
2276 let cursor = ResponseCursor(
2277 u32::try_from(get::<i64, _>(&row, 0)?)
2278 .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
2279 );
2280 let combined_state = get_combined_state(tx, execution_id).await?;
2282 debug!("previous_pending_state: {combined_state:?}");
2283
2284 let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
2285 state:
2286 PendingStateBlockedByJoinSet {
2287 join_set_id: found_join_set_id,
2288 lock_expires_at, closing: _,
2290 },
2291 paused: _,
2292 } =
2293 PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
2294 && *join_set_id == found_join_set_id
2295 {
2296 let scheduled_at = std::cmp::max(lock_expires_at, event.created_at);
2297 update_state_pending_after_response_appended(
2299 tx,
2300 execution_id,
2301 scheduled_at,
2302 combined_state.execution_with_state.component_digest,
2303 )
2304 .await?
2305 } else {
2306 AppendNotifier::default()
2307 };
2308
2309 if let JoinSetResponseEvent {
2310 join_set_id,
2311 event:
2312 JoinSetResponse::DelayFinished {
2313 delay_id,
2314 result: _,
2315 },
2316 } = &event.event
2317 {
2318 debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
2319 tx.execute(
2320 "DELETE FROM t_delay WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
2321 &[
2322 &execution_id.to_string(),
2323 &join_set_id.to_string(),
2324 &delay_id.to_string(),
2325 ],
2326 )
2327 .await?;
2328 }
2329
2330 notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
2331 Ok(notifier)
2332}
2333
2334async fn append_backtrace(
2335 tx: &Transaction<'_>,
2336 backtrace_info: &BacktraceInfo,
2337) -> Result<(), DbErrorWrite> {
2338 let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2340
2341 tx.execute(
2343 "INSERT INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) \
2344 VALUES ($1, $2) \
2345 ON CONFLICT (backtrace_hash) DO NOTHING",
2346 &[
2347 &backtrace_hash.as_slice(),
2348 &Json(&backtrace_info.wasm_backtrace),
2349 ],
2350 )
2351 .await?;
2352
2353 tx.execute(
2355 "INSERT INTO t_execution_backtrace \
2356 (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2357 VALUES ($1, $2, $3, $4, $5)",
2358 &[
2359 &backtrace_info.execution_id.to_string(),
2360 &Json(&backtrace_info.component_id),
2361 &i64::from(backtrace_info.version_min_including.0),
2362 &i64::from(backtrace_info.version_max_excluding.0),
2363 &backtrace_hash.as_slice(),
2364 ],
2365 )
2366 .await?;
2367
2368 Ok(())
2369}
2370
2371async fn append_log(tx: &Transaction<'_>, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2372 let (level, message, stream_type, payload, created_at) = match &row.log_entry {
2373 LogEntry::Log {
2374 created_at,
2375 level,
2376 message,
2377 } => (
2378 Some(*level as i32),
2379 Some(message.as_str()),
2380 None::<i32>,
2381 None::<&[u8]>,
2382 created_at,
2383 ),
2384 LogEntry::Stream {
2385 created_at,
2386 payload,
2387 stream_type,
2388 } => (
2389 None::<i32>,
2390 None::<&str>,
2391 Some(*stream_type as i32),
2392 Some(payload.as_slice()),
2393 created_at,
2394 ),
2395 };
2396
2397 tx.execute(
2398 "INSERT INTO t_log (
2399 execution_id,
2400 run_id,
2401 created_at,
2402 level,
2403 message,
2404 stream_type,
2405 payload
2406 ) VALUES ($1, $2, $3, $4, $5, $6, $7)",
2407 &[
2408 &row.execution_id.to_string(),
2409 &row.run_id.to_string(),
2410 &created_at,
2411 &level,
2412 &message,
2413 &stream_type,
2414 &payload,
2415 ],
2416 )
2417 .await?;
2418
2419 Ok(())
2420}
2421
2422async fn get_execution_log(
2423 tx: &Transaction<'_>,
2424 execution_id: &ExecutionId,
2425) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2426 let rows = tx
2427 .query(
2428 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2429 execution_id = $1 ORDER BY version",
2430 &[&execution_id.to_string()],
2431 )
2432 .await
2433 .map_err(DbErrorRead::from)?;
2434
2435 if rows.is_empty() {
2436 return Err(DbErrorRead::NotFound);
2437 }
2438
2439 let mut events = Vec::with_capacity(rows.len());
2440 for row in rows {
2441 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2442 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2443 let event = event.0;
2444 let version: i64 = get(&row, "version")?;
2445 let version = Version::try_from(version)
2446 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2447
2448 events.push(ExecutionEvent {
2449 created_at,
2450 event,
2451 backtrace_id: None,
2452 version,
2453 });
2454 }
2455
2456 let combined_state = get_combined_state(tx, execution_id).await?;
2457 let responses = list_responses(tx, execution_id, None).await?;
2458
2459 Ok(concepts::storage::ExecutionLog {
2460 execution_id: execution_id.clone(),
2461 events,
2462 responses,
2463 next_version: combined_state.get_next_version_or_finished(),
2464 pending_state: combined_state.execution_with_state.pending_state,
2465 component_digest: combined_state.execution_with_state.component_digest,
2466 component_type: combined_state.execution_with_state.component_type,
2467 deployment_id: combined_state.execution_with_state.deployment_id,
2468 })
2469}
2470
2471async fn get_max_version(
2472 tx: &Transaction<'_>,
2473 execution_id: &ExecutionId,
2474) -> Result<Version, DbErrorRead> {
2475 let row = tx
2476 .query_one(
2477 "SELECT MAX(version) as version FROM t_execution_log WHERE execution_id = $1",
2478 &[&execution_id.to_string()],
2479 )
2480 .await?;
2481 let max_version: i64 = get(&row, "version")?;
2482 let max_version = Version::try_from(max_version)
2483 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2484 Ok(max_version)
2485}
2486
2487async fn get_max_response_cursor(
2488 tx: &Transaction<'_>,
2489 execution_id: &ExecutionId,
2490) -> Result<ResponseCursor, DbErrorRead> {
2491 let row = tx
2492 .query_one(
2493 "SELECT MAX(id) as id FROM t_join_set_response WHERE execution_id = $1",
2494 &[&execution_id.to_string()],
2495 )
2496 .await?;
2497 let max_cursor = get::<Option<i64>, _>(&row, "id")?.unwrap_or_default();
2499 let max_cursor = ResponseCursor(
2500 u32::try_from(max_cursor).map_err(|_| consistency_db_err("id must not be negative"))?,
2501 );
2502 Ok(max_cursor)
2503}
2504
2505async fn list_execution_events(
2506 tx: &Transaction<'_>,
2507 execution_id: &ExecutionId,
2508 pagination: Pagination<VersionType>,
2509 include_backtrace_id: bool,
2510) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
2511 let mut params: Vec<Box<dyn ToSql + Send + Sync>> = Vec::new();
2512 let mut add_param = |p: Box<dyn ToSql + Send + Sync>| {
2513 params.push(p);
2514 format!("${}", params.len())
2515 };
2516
2517 let p_execution_id = add_param(Box::new(execution_id.to_string()));
2518
2519 let (cursor, length, rel, is_desc) = match &pagination {
2520 Pagination::NewerThan {
2521 cursor,
2522 length,
2523 including_cursor,
2524 } => (
2525 *cursor,
2526 *length,
2527 if *including_cursor { ">=" } else { ">" },
2528 false,
2529 ),
2530 Pagination::OlderThan {
2531 cursor,
2532 length,
2533 including_cursor,
2534 } => (
2535 *cursor,
2536 *length,
2537 if *including_cursor { "<=" } else { "<" },
2538 true,
2539 ),
2540 };
2541 let p_cursor = add_param(Box::new(i64::from(cursor)));
2542 let p_limit = add_param(Box::new(i64::from(length)));
2543
2544 let base_select = if include_backtrace_id {
2545 format!(
2546 "SELECT
2547 log.created_at,
2548 log.json_value,
2549 log.version,
2550 bt.version_min_including AS backtrace_id
2551 FROM
2552 t_execution_log AS log
2553 LEFT OUTER JOIN
2554 t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
2555 AND log.version >= bt.version_min_including
2556 AND log.version < bt.version_max_excluding
2557 WHERE
2558 log.execution_id = {p_execution_id}
2559 AND log.version {rel} {p_cursor}"
2560 )
2561 } else {
2562 format!(
2563 "SELECT
2564 created_at, json_value, NULL::BIGINT as backtrace_id, version
2565 FROM t_execution_log WHERE
2566 execution_id = {p_execution_id} AND version {rel} {p_cursor}"
2567 )
2568 };
2569
2570 let order = if is_desc { "DESC" } else { "ASC" };
2571 let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {p_limit}");
2572
2573 if is_desc {
2575 sql = format!("SELECT * FROM ({sql}) AS sub ORDER BY version ASC");
2576 }
2577
2578 let params_refs: Vec<&(dyn ToSql + Sync)> = params
2579 .iter()
2580 .map(|p| p.as_ref() as &(dyn ToSql + Sync))
2581 .collect();
2582
2583 let rows = tx
2584 .query(&sql, ¶ms_refs)
2585 .await
2586 .map_err(DbErrorRead::from)?;
2587
2588 let mut events = Vec::with_capacity(rows.len());
2589 for row in rows {
2590 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2591 let backtrace_id = get::<Option<i64>, _>(&row, "backtrace_id")?
2592 .map(Version::try_from)
2593 .transpose()
2594 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2595
2596 let version = get::<i64, _>(&row, "version")?;
2597 let version = Version::new(
2598 VersionType::try_from(version)
2599 .map_err(|_| consistency_db_err("version must be non-negative"))?,
2600 );
2601 let event_req: Json<ExecutionRequest> = get(&row, "json_value")?;
2602 let event_req = event_req.0;
2603
2604 events.push(ExecutionEvent {
2605 created_at,
2606 event: event_req,
2607 backtrace_id,
2608 version,
2609 });
2610 }
2611 Ok(events)
2612}
2613
2614async fn get_execution_event(
2615 tx: &Transaction<'_>,
2616 execution_id: &ExecutionId,
2617 version: VersionType,
2618) -> Result<ExecutionEvent, DbErrorRead> {
2619 let row = tx
2620 .query_one(
2621 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2622 execution_id = $1 AND version = $2",
2623 &[&execution_id.to_string(), &i64::from(version)],
2624 )
2625 .await?;
2626
2627 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2628 let json_val: Json<ExecutionRequest> = get(&row, "json_value")?;
2629 let version = get::<i64, _>(&row, "version")?;
2630 let version = Version::try_from(version)
2631 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2632 let event = json_val.0;
2633
2634 Ok(ExecutionEvent {
2635 created_at,
2636 event,
2637 backtrace_id: None,
2638 version,
2639 })
2640}
2641
2642async fn get_last_execution_event(
2643 tx: &Transaction<'_>,
2644 execution_id: &ExecutionId,
2645) -> Result<ExecutionEvent, DbErrorRead> {
2646 let row = tx
2647 .query_one(
2648 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2649 execution_id = $1 ORDER BY version DESC LIMIT 1",
2650 &[&execution_id.to_string()],
2651 )
2652 .await?;
2653
2654 let created_at: DateTime<Utc> = get(&row, "created_at")?;
2655 let event: Json<ExecutionRequest> = get(&row, "json_value")?;
2656 let event = event.0;
2657 let version: i64 = get(&row, "version")?;
2658 let version = Version::try_from(version)
2659 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2660
2661 Ok(ExecutionEvent {
2662 created_at,
2663 event,
2664 backtrace_id: None,
2665 version,
2666 })
2667}
2668
2669async fn delay_response(
2670 tx: &Transaction<'_>,
2671 execution_id: &ExecutionId,
2672 delay_id: &DelayId,
2673) -> Result<Option<bool>, DbErrorRead> {
2674 let row = tx
2675 .query_opt(
2676 "SELECT delay_success \
2677 FROM t_join_set_response \
2678 WHERE \
2679 execution_id = $1 AND delay_id = $2",
2680 &[&execution_id.to_string(), &delay_id.to_string()],
2681 )
2682 .await?;
2683
2684 match row {
2685 Some(r) => Ok(Some(get::<bool, _>(&r, "delay_success")?)),
2686 None => Ok(None),
2687 }
2688}
2689
2690#[instrument(level = Level::TRACE, skip_all)]
2691async fn get_responses_after(
2692 tx: &Transaction<'_>,
2693 execution_id: &ExecutionId,
2694 last_response: ResponseCursor,
2695) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2696 let rows = tx
2697 .query(
2698 "SELECT r.id, r.created_at, r.join_set_id, \
2699 r.delay_id, r.delay_success, \
2700 r.child_execution_id, r.finished_version, child.json_value \
2701 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2702 WHERE \
2703 r.id > $1 AND \
2704 r.execution_id = $2 AND \
2705 ( \
2706 r.finished_version = child.version \
2707 OR \
2708 r.child_execution_id IS NULL \
2709 ) \
2710 ORDER BY id \
2711 ",
2712 &[
2713 &i64::from(last_response.0),
2714 &execution_id.to_string(),
2715 ]
2716 )
2717 .await?;
2718
2719 let mut results = Vec::with_capacity(rows.len());
2720 for row in rows {
2721 let resp = parse_response_with_cursor(&row)?;
2722 results.push(resp);
2723 }
2724 Ok(results)
2725}
2726
2727async fn get_pending_of_single_ffqn(
2728 tx: &Transaction<'_>,
2729 batch_size: u32,
2730 pending_at_or_sooner: DateTime<Utc>,
2731 ffqn: &FunctionFqn,
2732 select_strategy: SelectStrategy,
2733) -> Result<Vec<(ExecutionId, Version)>, ()> {
2734 let rows = tx
2735 .query(
2736 &format!(
2737 r"
2738 SELECT execution_id, corresponding_version FROM t_state
2739 WHERE
2740 state = '{STATE_PENDING_AT}' AND
2741 pending_expires_finished <= $1 AND ffqn = $2
2742 AND is_paused = false
2743 ORDER BY pending_expires_finished
2744 {}
2745 LIMIT $3
2746 ",
2747 if select_strategy == SelectStrategy::LockForUpdate {
2748 "FOR UPDATE SKIP LOCKED"
2749 } else {
2750 ""
2751 }
2752 ),
2753 &[
2754 &pending_at_or_sooner,
2755 &ffqn.to_string(),
2756 &(i64::from(batch_size)),
2757 ],
2758 )
2759 .await
2760 .map_err(|err| {
2761 warn!("Ignoring consistency error {err:?}");
2762 })?;
2763
2764 let mut result = Vec::with_capacity(rows.len());
2765 for row in rows {
2766 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2767 let eid_str: String = get(&row, "execution_id")?;
2768 let corresponding_version: i64 = get(&row, "corresponding_version")?;
2769 let corresponding_version = Version::try_from(corresponding_version)
2770 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2771
2772 if let Ok(eid) = ExecutionId::from_str(&eid_str) {
2773 return Ok((eid, corresponding_version.increment()));
2774 }
2775 Err(consistency_db_err("invalid execution_id"))
2776 };
2777
2778 match unpack() {
2779 Ok(val) => result.push(val),
2780 Err(err) => warn!("Ignoring corrupted row in pending check: {err:?}"),
2781 }
2782 }
2783 Ok(result)
2784}
2785
2786async fn get_pending_by_ffqns(
2788 tx: &Transaction<'_>,
2789 batch_size: u32,
2790 pending_at_or_sooner: DateTime<Utc>,
2791 ffqns: &[FunctionFqn],
2792 select_strategy: SelectStrategy,
2793) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2794 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2795 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2796
2797 for ffqn in ffqns {
2798 let needed = batch_size - execution_ids_versions.len();
2799 if needed == 0 {
2800 break;
2801 }
2802 let needed = u32::try_from(needed).expect("u32 - usize cannot overflow an 32");
2803 if let Ok(execs) =
2804 get_pending_of_single_ffqn(tx, needed, pending_at_or_sooner, ffqn, select_strategy)
2805 .await
2806 {
2807 execution_ids_versions.extend(execs);
2808 }
2809 }
2810
2811 Ok(execution_ids_versions)
2812}
2813
/// How the pending-execution queries select rows from `t_state`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SelectStrategy {
    /// Plain `SELECT`; the rows are not locked.
    Read,
    /// `SELECT ... FOR UPDATE SKIP LOCKED`: lock the selected rows and skip rows already
    /// locked by another transaction.
    LockForUpdate,
}
2819
2820async fn get_pending_by_component_input_digest(
2821 tx: &Transaction<'_>,
2822 batch_size: u32,
2823 pending_at_or_sooner: DateTime<Utc>,
2824 input_digest: &ComponentDigest,
2825 select_strategy: SelectStrategy,
2826) -> Result<Vec<(ExecutionId, Version)>, DbErrorGeneric> {
2827 let rows = tx
2828 .query(
2829 &format!(
2830 r"
2831 SELECT execution_id, corresponding_version FROM t_state WHERE
2832 state = '{STATE_PENDING_AT}' AND
2833 pending_expires_finished <= $1 AND
2834 component_id_input_digest = $2
2835 AND is_paused = false
2836 ORDER BY pending_expires_finished
2837 {}
2838 LIMIT $3
2839 ",
2840 if select_strategy == SelectStrategy::LockForUpdate {
2841 "FOR UPDATE SKIP LOCKED"
2842 } else {
2843 ""
2844 }
2845 ),
2846 &[&pending_at_or_sooner, &input_digest, &i64::from(batch_size)],
2847 )
2848 .await?;
2849
2850 let mut result = Vec::with_capacity(rows.len());
2851 for row in rows {
2852 let unpack = || -> Result<(ExecutionId, Version), DbErrorGeneric> {
2853 let eid_str: String = get(&row, "execution_id")?;
2854 let corresponding_version: i64 = get(&row, "corresponding_version")?;
2855 let corresponding_version = Version::try_from(corresponding_version)
2856 .map_err(|_| consistency_db_err("version must be non-negative"))?;
2857
2858 let eid = ExecutionId::from_str(&eid_str)
2859 .map_err(|err| consistency_db_err(err.to_string()))?;
2860 Ok((eid, corresponding_version.increment()))
2861 };
2862
2863 match unpack() {
2864 Ok(val) => result.push(val),
2865 Err(err) => {
2866 warn!("Skipping corrupted row in get_pending_by_component_input_digest: {err:?}");
2867 }
2868 }
2869 }
2870
2871 Ok(result)
2872}
2873
2874fn notify_pending_locked(
2875 notifier: &NotifierPendingAt,
2876 current_time: DateTime<Utc>,
2877 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
2878) {
2879 if notifier.scheduled_at <= current_time {
2880 ffqn_to_pending_subscription.notify(notifier);
2881 }
2882}
2883
2884async fn upgrade_execution_component(
2885 tx: &Transaction<'_>,
2886 execution_id: &ExecutionId,
2887 old: &ComponentDigest,
2888 new: &ComponentDigest,
2889) -> Result<(), DbErrorWrite> {
2890 debug!("Updating t_state to component {new}");
2891
2892 let updated = tx
2893 .execute(
2894 r"
2895 UPDATE t_state
2896 SET
2897 updated_at = CURRENT_TIMESTAMP,
2898 component_id_input_digest = $1
2899 WHERE
2900 execution_id = $2 AND
2901 component_id_input_digest = $3
2902 ",
2903 &[
2904 &new.as_slice(), &execution_id.to_string(), &old.as_slice(), ],
2908 )
2909 .await?;
2910
2911 if updated != 1 {
2912 return Err(DbErrorWrite::NotFound);
2913 }
2914 Ok(())
2915}
2916
2917impl PostgresConnection {
2918 #[instrument(level = Level::TRACE, skip_all)]
2920 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
2921 let (pending_ats, finished_execs, responses) = {
2922 let (mut pending_ats, mut finished_execs, mut responses) =
2923 (Vec::new(), Vec::new(), Vec::new());
2924 for notifier in notifiers {
2925 if let Some(pending_at) = notifier.pending_at {
2926 pending_ats.push(pending_at);
2927 }
2928 if let Some(finished) = notifier.execution_finished {
2929 finished_execs.push(finished);
2930 }
2931 if let Some(response) = notifier.response {
2932 responses.push(response);
2933 }
2934 }
2935 (pending_ats, finished_execs, responses)
2936 };
2937
2938 if !pending_ats.is_empty() {
2940 let guard = self.pending_subscribers.lock().unwrap();
2941 for pending_at in pending_ats {
2942 notify_pending_locked(&pending_at, current_time, &guard);
2943 }
2944 }
2945 if !finished_execs.is_empty() {
2947 let mut guard = self.execution_finished_subscribers.lock().unwrap();
2948 for finished in finished_execs {
2949 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
2950 for (_tag, sender) in listeners_of_exe_id {
2951 let _ = sender.send(finished.retval.clone());
2952 }
2953 }
2954 }
2955 }
2956 if !responses.is_empty() {
2958 let mut guard = self.response_subscribers.lock().unwrap();
2959 for (execution_id, response) in responses {
2960 if let Some((sender, _)) = guard.remove(&execution_id) {
2961 let _ = sender.send(response);
2962 }
2963 }
2964 }
2965 }
2966}
2967
2968#[async_trait]
2969impl DbExecutor for PostgresConnection {
2970 #[instrument(level = Level::TRACE, skip(self))]
2971 async fn lock_pending_by_ffqns(
2972 &self,
2973 batch_size: u32,
2974 pending_at_or_sooner: DateTime<Utc>,
2975 ffqns: Arc<[FunctionFqn]>,
2976 created_at: DateTime<Utc>,
2977 component_id: ComponentId,
2978 deployment_id: DeploymentId,
2979 executor_id: ExecutorId,
2980 lock_expires_at: DateTime<Utc>,
2981 run_id: RunId,
2982 retry_config: ComponentRetryConfig,
2983 ) -> Result<LockPendingResponse, DbErrorWrite> {
2984 let mut client_guard = self.client.lock().await;
2985 let tx = client_guard.transaction().await?;
2986
2987 let execution_ids_versions = get_pending_by_ffqns(
2988 &tx,
2989 batch_size,
2990 pending_at_or_sooner,
2991 &ffqns,
2992 SelectStrategy::LockForUpdate,
2993 )
2994 .await?;
2995
2996 if execution_ids_versions.is_empty() {
2997 tx.commit().await?;
3000 return Ok(vec![]);
3001 }
3002
3003 debug!("Locking {execution_ids_versions:?}");
3004
3005 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3007 for (execution_id, version) in execution_ids_versions {
3008 match lock_single_execution(
3009 &tx,
3010 created_at,
3011 &component_id,
3012 deployment_id,
3013 &execution_id,
3014 run_id,
3015 &version,
3016 executor_id,
3017 lock_expires_at,
3018 retry_config,
3019 )
3020 .await
3021 {
3022 Ok(locked) => locked_execs.push(locked),
3023 Err(err) => {
3024 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3026 return Err(err);
3027 }
3028 }
3029 }
3030
3031 tx.commit().await?;
3032
3033 Ok(locked_execs)
3034 }
3035
3036 #[instrument(level = Level::TRACE, skip(self))]
3037 async fn lock_pending_by_component_digest(
3038 &self,
3039 batch_size: u32,
3040 pending_at_or_sooner: DateTime<Utc>,
3041 component_id: &ComponentId,
3042 deployment_id: DeploymentId,
3043 created_at: DateTime<Utc>,
3044 executor_id: ExecutorId,
3045 lock_expires_at: DateTime<Utc>,
3046 run_id: RunId,
3047 retry_config: ComponentRetryConfig,
3048 ) -> Result<LockPendingResponse, DbErrorWrite> {
3049 let mut client_guard = self.client.lock().await;
3050 let tx = client_guard.transaction().await?;
3051
3052 let execution_ids_versions = get_pending_by_component_input_digest(
3053 &tx,
3054 batch_size,
3055 pending_at_or_sooner,
3056 &component_id.component_digest,
3057 SelectStrategy::LockForUpdate,
3058 )
3059 .await?;
3060
3061 if execution_ids_versions.is_empty() {
3062 tx.commit().await?;
3063 return Ok(vec![]);
3064 }
3065
3066 debug!("Locking {execution_ids_versions:?}");
3067
3068 let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
3069 for (execution_id, version) in execution_ids_versions {
3070 match lock_single_execution(
3071 &tx,
3072 created_at,
3073 component_id,
3074 deployment_id,
3075 &execution_id,
3076 run_id,
3077 &version,
3078 executor_id,
3079 lock_expires_at,
3080 retry_config,
3081 )
3082 .await
3083 {
3084 Ok(locked) => locked_execs.push(locked),
3085 Err(err) => {
3086 tx.rollback().await?; debug!("Locking row {execution_id} failed - {err:?}");
3088 return Err(err);
3089 }
3090 }
3091 }
3092
3093 tx.commit().await?;
3094 Ok(locked_execs)
3095 }
3096
3097 #[cfg(feature = "test")]
3098 #[instrument(level = Level::DEBUG, skip(self))]
3099 async fn lock_one(
3100 &self,
3101 created_at: DateTime<Utc>,
3102 component_id: ComponentId,
3103 deployment_id: DeploymentId,
3104 execution_id: &ExecutionId,
3105 run_id: RunId,
3106 version: Version,
3107 executor_id: ExecutorId,
3108 lock_expires_at: DateTime<Utc>,
3109 retry_config: ComponentRetryConfig,
3110 ) -> Result<LockedExecution, DbErrorWrite> {
3111 debug!(%execution_id, "lock_one");
3112 let mut client_guard = self.client.lock().await;
3113 let tx = client_guard.transaction().await?;
3114
3115 let res = lock_single_execution(
3116 &tx,
3117 created_at,
3118 &component_id,
3119 deployment_id,
3120 execution_id,
3121 run_id,
3122 &version,
3123 executor_id,
3124 lock_expires_at,
3125 retry_config,
3126 )
3127 .await?;
3128
3129 tx.commit().await?;
3130 Ok(res)
3131 }
3132
3133 #[instrument(level = Level::DEBUG, skip(self, req))]
3134 async fn append(
3135 &self,
3136 execution_id: ExecutionId,
3137 version: Version,
3138 req: AppendRequest,
3139 ) -> Result<AppendResponse, DbErrorWrite> {
3140 debug!(%req, "append");
3141 trace!(?req, "append");
3142 let created_at = req.created_at;
3143
3144 let mut client_guard = self.client.lock().await;
3145 let tx = client_guard.transaction().await?;
3146
3147 let (new_version, notifier) = append(&tx, &execution_id, req, version).await?;
3148
3149 tx.commit().await?;
3150
3151 drop(client_guard);
3153
3154 self.notify_all(vec![notifier], created_at);
3155 Ok(new_version)
3156 }
3157
3158 #[instrument(level = Level::DEBUG, skip_all)]
3159 async fn append_batch_respond_to_parent(
3160 &self,
3161 events: AppendEventsToExecution,
3162 response: AppendResponseToExecution,
3163 current_time: DateTime<Utc>,
3164 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3165 debug!("append_batch_respond_to_parent");
3166 if events.execution_id == response.parent_execution_id {
3167 return Err(DbErrorWrite::NonRetriable(
3168 DbErrorWriteNonRetriable::ValidationFailed(
3169 "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
3170 ),
3171 ));
3172 }
3173 if events.batch.is_empty() {
3174 return Err(DbErrorWrite::NonRetriable(
3175 DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
3176 ));
3177 }
3178
3179 let mut client_guard = self.client.lock().await;
3180 let tx = client_guard.transaction().await?;
3181
3182 let mut version = events.version;
3183 let mut notifiers = Vec::new();
3184
3185 for append_request in events.batch {
3186 let (v, n) = append(&tx, &events.execution_id, append_request, version).await?;
3187 version = v;
3188 notifiers.push(n);
3189 }
3190
3191 let pending_at_parent = append_response(
3192 &tx,
3193 &response.parent_execution_id,
3194 JoinSetResponseEventOuter {
3195 created_at: response.created_at,
3196 event: JoinSetResponseEvent {
3197 join_set_id: response.join_set_id,
3198 event: JoinSetResponse::ChildExecutionFinished {
3199 child_execution_id: response.child_execution_id,
3200 finished_version: response.finished_version,
3201 result: response.result,
3202 },
3203 },
3204 },
3205 )
3206 .await?;
3207 notifiers.push(pending_at_parent);
3208
3209 tx.commit().await?;
3210 drop(client_guard);
3211
3212 self.notify_all(notifiers, current_time);
3213 Ok(version)
3214 }
3215
3216 #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
3217 async fn wait_for_pending_by_ffqn(
3218 &self,
3219 pending_at_or_sooner: DateTime<Utc>,
3220 ffqns: Arc<[FunctionFqn]>,
3221 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3222 ) {
3223 let unique_tag: u64 = rand::random();
3224 let (sender, mut receiver) = mpsc::channel(1);
3225 {
3226 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3227 for ffqn in ffqns.as_ref() {
3228 pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
3229 }
3230 }
3231
3232 async {
3233 let mut db_has_pending = false;
3234 {
3235 let mut client_guard = self.client.lock().await;
3237 if let Ok(tx) = client_guard.transaction().await {
3239 if let Ok(res) = get_pending_by_ffqns(
3240 &tx,
3241 1,
3242 pending_at_or_sooner,
3243 &ffqns,
3244 SelectStrategy::Read,
3245 )
3246 .await
3247 && !res.is_empty()
3248 {
3249 db_has_pending = true;
3250 }
3251 let _ = tx.commit().await;
3253 }
3254 }
3255
3256 if db_has_pending {
3257 trace!("Not waiting, database already contains new pending executions");
3258 return;
3259 }
3260
3261 tokio::select! {
3262 _ = receiver.recv() => {
3263 trace!("Received a notification");
3264 }
3265 () = timeout_fut => {
3266 }
3267 }
3268 }
3269 .await;
3270
3271 {
3273 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3274 for ffqn in ffqns.as_ref() {
3275 match pending_subscribers.remove_ffqn(ffqn) {
3276 Some((_, tag)) if tag == unique_tag => {}
3277 Some(other) => {
3278 pending_subscribers.insert_ffqn(ffqn.clone(), other);
3279 }
3280 None => {}
3281 }
3282 }
3283 }
3284 }
3285
3286 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3287 async fn wait_for_pending_by_component_digest(
3288 &self,
3289 pending_at_or_sooner: DateTime<Utc>,
3290 component_digest: &ComponentDigest,
3291 timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
3292 ) {
3293 let unique_tag: u64 = rand::random();
3294 let (sender, mut receiver) = mpsc::channel(1);
3295 {
3296 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3297 pending_subscribers
3298 .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
3299 }
3300
3301 async {
3302 let mut db_has_pending = false;
3303 {
3304 let mut client_guard = self.client.lock().await;
3305 if let Ok(tx) = client_guard.transaction().await {
3306 if let Ok(res) = get_pending_by_component_input_digest(
3307 &tx,
3308 1,
3309 pending_at_or_sooner,
3310 component_digest,
3311 SelectStrategy::Read,
3312 )
3313 .await
3314 && !res.is_empty()
3315 {
3316 db_has_pending = true;
3317 }
3318 let _ = tx.commit().await;
3319 }
3320 }
3321
3322 if db_has_pending {
3323 trace!("Not waiting, database already contains new pending executions");
3324 return;
3325 }
3326
3327 tokio::select! {
3328 _ = receiver.recv() => {
3329 trace!("Received a notification");
3330 }
3331 () = timeout_fut => {
3332 }
3333 }
3334 }
3335 .await;
3336
3337 {
3339 let mut pending_subscribers = self.pending_subscribers.lock().unwrap();
3340 match pending_subscribers.remove_by_component(component_digest) {
3341 Some((_, tag)) if tag == unique_tag => {}
3342 Some(other) => {
3343 pending_subscribers.insert_by_component(component_digest.clone(), other);
3344 }
3345 None => {}
3346 }
3347 }
3348 }
3349
3350 async fn get_last_execution_event(
3351 &self,
3352 execution_id: &ExecutionId,
3353 ) -> Result<ExecutionEvent, DbErrorRead> {
3354 let mut client_guard = self.client.lock().await;
3355 let tx = client_guard.transaction().await?;
3356
3357 let event = get_last_execution_event(&tx, execution_id).await?;
3358
3359 tx.commit().await?;
3360 Ok(event)
3361 }
3362}
3363#[async_trait]
3364impl DbConnection for PostgresConnection {
3365 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
3366 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
3367 debug!("create");
3368 trace!(?req, "create");
3369 let created_at = req.created_at;
3370
3371 let mut client_guard = self.client.lock().await;
3372 let tx = client_guard.transaction().await?;
3373
3374 let (version, notifier) = create_inner(&tx, req.clone()).await?;
3375
3376 tx.commit().await?;
3377 drop(client_guard); self.notify_all(vec![notifier], created_at);
3380 Ok(version)
3381 }
3382
3383 #[instrument(level = Level::DEBUG, skip(self))]
3384 async fn get(
3385 &self,
3386 execution_id: &ExecutionId,
3387 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
3388 trace!("get");
3389 let mut client_guard = self.client.lock().await;
3390 let tx = client_guard.transaction().await?;
3391
3392 let res = get_execution_log(&tx, execution_id).await?;
3393
3394 tx.commit().await?;
3395 Ok(res)
3396 }
3397
3398 #[instrument(level = Level::DEBUG, skip(self, batch))]
3399 async fn append_batch(
3400 &self,
3401 current_time: DateTime<Utc>,
3402 batch: Vec<AppendRequest>,
3403 execution_id: ExecutionId,
3404 version: Version,
3405 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3406 debug!("append_batch");
3407 trace!(?batch, "append_batch");
3408 assert!(!batch.is_empty(), "Empty batch request");
3409
3410 let mut client_guard = self.client.lock().await;
3411 let tx = client_guard.transaction().await?;
3412
3413 let mut version = version;
3414 let mut notifier = None;
3415
3416 for append_request in batch {
3417 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3418 version = v;
3419 notifier = Some(n);
3420 }
3421
3422 tx.commit().await?;
3423 drop(client_guard);
3424
3425 self.notify_all(
3426 vec![notifier.expect("checked that the batch is not empty")],
3427 current_time,
3428 );
3429 Ok(version)
3430 }
3431
3432 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
3433 async fn append_batch_create_new_execution(
3434 &self,
3435 current_time: DateTime<Utc>,
3436 batch: Vec<AppendRequest>,
3437 execution_id: ExecutionId,
3438 version: Version,
3439 child_req: Vec<CreateRequest>,
3440 backtraces: Vec<BacktraceInfo>,
3441 ) -> Result<AppendBatchResponse, DbErrorWrite> {
3442 debug!("append_batch_create_new_execution");
3443 trace!(?batch, ?child_req, "append_batch_create_new_execution");
3444 assert!(!batch.is_empty(), "Empty batch request");
3445
3446 let mut client_guard = self.client.lock().await;
3447 let tx = client_guard.transaction().await?;
3448
3449 let mut version = version;
3450 let mut notifier = None;
3451
3452 for append_request in batch {
3453 let (v, n) = append(&tx, &execution_id, append_request, version).await?;
3454 version = v;
3455 notifier = Some(n);
3456 }
3457
3458 let mut notifiers = Vec::new();
3459 notifiers.push(notifier.expect("checked that the batch is not empty"));
3460
3461 for req in child_req {
3462 let (_, n) = create_inner(&tx, req).await?;
3463 notifiers.push(n);
3464 }
3465 for backtrace in backtraces {
3466 append_backtrace(&tx, &backtrace).await?;
3467 }
3468 tx.commit().await?;
3469 drop(client_guard);
3470
3471 self.notify_all(notifiers, current_time);
3472 Ok(version)
3473 }
3474
3475 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3476 async fn subscribe_to_next_responses(
3477 &self,
3478 execution_id: &ExecutionId,
3479 last_response: ResponseCursor,
3480 timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
3481 ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
3482 debug!("next_responses");
3483 let unique_tag: u64 = rand::random();
3484 let execution_id_clone = execution_id.clone();
3485
3486 let cleanup = || {
3487 let mut guard = self.response_subscribers.lock().unwrap();
3488 match guard.remove(&execution_id_clone) {
3489 Some((_, tag)) if tag == unique_tag => {}
3490 Some(other) => {
3491 guard.insert(execution_id_clone.clone(), other);
3492 }
3493 None => {}
3494 }
3495 };
3496
3497 let receiver = {
3498 let mut client_guard = self.client.lock().await;
3499 let tx = client_guard.transaction().await?;
3500
3501 let (sender, receiver) = oneshot::channel();
3507 self.response_subscribers
3508 .lock()
3509 .unwrap()
3510 .insert(execution_id.clone(), (sender, unique_tag));
3511
3512 let responses = get_responses_after(&tx, execution_id, last_response).await?;
3513
3514 if responses.is_empty() {
3515 tx.commit().await.map_err(|err| {
3517 cleanup(); DbErrorRead::from(err)
3519 })?;
3520 receiver
3521 } else {
3522 cleanup(); tx.commit().await?;
3524 return Ok(responses);
3525 }
3526 };
3527
3528 let res = tokio::select! {
3529 resp = receiver => {
3530 match resp {
3531 Ok(resp) => Ok(vec![resp]),
3532 Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
3533 }
3534 }
3535 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3536 };
3537
3538 cleanup();
3539 res
3540 }
3541
3542 #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
3543 async fn wait_for_finished_result(
3544 &self,
3545 execution_id: &ExecutionId,
3546 timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
3547 ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
3548 let unique_tag: u64 = rand::random();
3549 let execution_id_clone = execution_id.clone();
3550
3551 let cleanup = || {
3552 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3553 if let Some(subscribers) = guard.get_mut(&execution_id_clone) {
3554 subscribers.remove(&unique_tag);
3555 }
3556 };
3557
3558 let receiver = {
3559 let mut client_guard = self.client.lock().await;
3560 let tx = client_guard.transaction().await?;
3561
3562 let (sender, receiver) = oneshot::channel();
3564 {
3565 let mut guard = self.execution_finished_subscribers.lock().unwrap();
3566 guard
3567 .entry(execution_id.clone())
3568 .or_default()
3569 .insert(unique_tag, sender);
3570 }
3571
3572 let pending_state = get_combined_state(&tx, execution_id)
3573 .await?
3574 .execution_with_state
3575 .pending_state;
3576
3577 if let PendingState::Finished(finished) = pending_state {
3578 let event = get_execution_event(&tx, execution_id, finished.version).await?;
3579 tx.commit().await?;
3580 cleanup();
3581
3582 if let ExecutionRequest::Finished { retval, .. } = event.event {
3583 return Ok(retval);
3584 }
3585 error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
3586 return Err(DbErrorReadWithTimeout::from(consistency_db_err(
3587 "cannot get finished event based on t_state version",
3588 )));
3589 }
3590 tx.commit().await?;
3591 receiver
3592 };
3593
3594 let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
3595 let res = tokio::select! {
3596 resp = receiver => {
3597 match resp {
3598 Ok(retval) => Ok(retval),
3599 Err(_recv_err) => Err(DbErrorGeneric::Close.into())
3600 }
3601 }
3602 outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
3603 };
3604
3605 cleanup();
3606 res
3607 }
3608
3609 #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
3610 async fn append_delay_response(
3611 &self,
3612 created_at: DateTime<Utc>,
3613 execution_id: ExecutionId,
3614 join_set_id: JoinSetId,
3615 delay_id: DelayId,
3616 result: Result<(), ()>,
3617 ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
3618 debug!("append_delay_response");
3619 let event = JoinSetResponseEventOuter {
3620 created_at,
3621 event: JoinSetResponseEvent {
3622 join_set_id,
3623 event: JoinSetResponse::DelayFinished {
3624 delay_id: delay_id.clone(),
3625 result,
3626 },
3627 },
3628 };
3629
3630 let mut client_guard = self.client.lock().await;
3631 let tx = client_guard.transaction().await?;
3632
3633 let res = append_response(&tx, &execution_id, event).await;
3634
3635 match res {
3636 Ok(notifier) => {
3637 tx.commit().await?;
3638 drop(client_guard);
3639 self.notify_all(vec![notifier], created_at);
3640 Ok(AppendDelayResponseOutcome::Success)
3641 }
3642 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
3643 tx.rollback().await?;
3646
3647 let tx = client_guard.transaction().await?;
3649 let delay_success = delay_response(&tx, &execution_id, &delay_id).await?;
3650 tx.commit().await?;
3651
3652 match delay_success {
3653 Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
3654 Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
3655 None => Err(DbErrorWrite::Generic(consistency_db_err(
3656 "insert failed yet select did not find the response",
3657 ))),
3658 }
3659 }
3660 Err(err) => {
3661 let _ = tx.rollback().await; Err(err)
3663 }
3664 }
3665 }
3666
3667 #[instrument(level = Level::DEBUG, skip_all)]
3668 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
3669 debug!("append_backtrace");
3670 let mut client_guard = self.client.lock().await;
3671 let tx = client_guard.transaction().await?;
3672
3673 append_backtrace(&tx, &append).await?;
3674
3675 tx.commit().await?;
3676 Ok(())
3677 }
3678
3679 #[instrument(level = Level::DEBUG, skip_all)]
3680 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
3681 debug!("append_backtrace_batch");
3682 let mut client_guard = self.client.lock().await;
3683 let tx = client_guard.transaction().await?;
3684
3685 for append in batch {
3686 append_backtrace(&tx, &append).await?;
3687 }
3688
3689 tx.commit().await?;
3690 Ok(())
3691 }
3692
3693 #[instrument(level = Level::DEBUG, skip_all)]
3694 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
3695 trace!("append_log");
3696 let mut client_guard = self.client.lock().await;
3697 let tx = client_guard.transaction().await?;
3698 append_log(&tx, &row).await?;
3699 tx.commit().await?;
3700
3701 Ok(())
3702 }
3703
3704 #[instrument(level = Level::DEBUG, skip_all)]
3705 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
3706 trace!("append_log_batch");
3707 let mut client_guard = self.client.lock().await;
3708 let tx = client_guard.transaction().await?;
3709 for row in batch {
3710 append_log(&tx, row).await?;
3711 }
3712 tx.commit().await?;
3713 Ok(())
3714 }
3715
3716 #[instrument(level = Level::TRACE, skip(self))]
3718 async fn get_expired_timers(
3719 &self,
3720 at: DateTime<Utc>,
3721 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
3722 let mut client_guard = self.client.lock().await;
3723 let tx = client_guard.transaction().await?;
3724
3725 let rows = tx
3727 .query(
3728 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= $1 AND NOT is_paused",
3729 &[&at],
3730 )
3731 .await?;
3732
3733 let mut expired_timers = Vec::with_capacity(rows.len());
3734 for row in rows {
3735 let unpack = || -> Result<ExpiredTimer, DbErrorGeneric> {
3736 let execution_id: String = get(&row, "execution_id")?;
3737 let execution_id = ExecutionId::from_str(&execution_id)?;
3738 let join_set_id: String = get(&row, "join_set_id")?;
3739 let join_set_id = JoinSetId::from_str(&join_set_id)?;
3740 let delay_id: String = get(&row, "delay_id")?;
3741 let delay_id = DelayId::from_str(&delay_id)?;
3742
3743 Ok(ExpiredTimer::Delay(ExpiredDelay {
3744 execution_id,
3745 join_set_id,
3746 delay_id,
3747 }))
3748 };
3749
3750 match unpack() {
3751 Ok(timer) => expired_timers.push(timer),
3752 Err(err) => warn!("Skipping corrupted row in get_expired_timers (delays): {err:?}"),
3753 }
3754 }
3755
3756 let rows = tx.query(
3758 &format!(
3759 "SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis, executor_id, run_id, is_paused \
3760 FROM t_state \
3761 WHERE pending_expires_finished <= $1 AND state = '{STATE_LOCKED}'"
3762 ),
3763 &[&at]
3764 ).await?;
3765
3766 for row in rows {
3767 let unpack = || -> Result<Option<ExpiredTimer>, DbErrorGeneric> {
3768 let execution_id: String = get(&row, "execution_id")?;
3769 let execution_id = ExecutionId::from_str(&execution_id)?;
3770 let is_paused: bool = get(&row, "is_paused")?;
3771 if is_paused {
3772 error!(%execution_id, "encountered invalid paused locked execution while scanning expired locks");
3773 return Ok(None);
3774 }
3775 let last_lock_version: i64 = get(&row, "last_lock_version")?;
3776 let last_lock_version = Version::try_from(last_lock_version)?;
3777
3778 let corresponding_version: i64 = get(&row, "corresponding_version")?;
3779 let corresponding_version = Version::try_from(corresponding_version)?;
3780
3781 let intermittent_event_count =
3782 u32::try_from(get::<i64, _>(&row, "intermittent_event_count")?).map_err(
3783 |_| consistency_db_err("`intermittent_event_count` must not be negative"),
3784 )?;
3785
3786 let max_retries = get::<Option<i64>, _>(&row, "max_retries")?
3787 .map(u32::try_from)
3788 .transpose()
3789 .map_err(|_| consistency_db_err("`max_retries` must not be negative"))?;
3790 let retry_exp_backoff_millis =
3791 u32::try_from(get::<i64, _>(&row, "retry_exp_backoff_millis")?).map_err(
3792 |_| consistency_db_err("`retry_exp_backoff_millis` must not be negative"),
3793 )?;
3794 let executor_id: String = get(&row, "executor_id")?;
3795 let executor_id = ExecutorId::from_str(&executor_id)?;
3796 let run_id: String = get(&row, "run_id")?;
3797 let run_id = RunId::from_str(&run_id)?;
3798
3799 Ok(Some(ExpiredTimer::Lock(ExpiredLock {
3800 execution_id,
3801 locked_at_version: last_lock_version,
3802 next_version: corresponding_version.increment(),
3803 intermittent_event_count,
3804 max_retries,
3805 retry_exp_backoff: Duration::from_millis(u64::from(retry_exp_backoff_millis)),
3806 locked_by: LockedBy {
3807 executor_id,
3808 run_id,
3809 },
3810 })))
3811 };
3812
3813 match unpack() {
3814 Ok(Some(timer)) => expired_timers.push(timer),
3815 Ok(None) => {}
3816 Err(err) => warn!("Skipping corrupted row in get_expired_timers (locks): {err:?}"),
3817 }
3818 }
3819
3820 tx.commit().await?;
3821
3822 if !expired_timers.is_empty() {
3823 debug!("get_expired_timers found {expired_timers:?}");
3824 }
3825 Ok(expired_timers)
3826 }
3827
3828 async fn get_execution_event(
3829 &self,
3830 execution_id: &ExecutionId,
3831 version: &Version,
3832 ) -> Result<ExecutionEvent, DbErrorRead> {
3833 let mut client_guard = self.client.lock().await;
3834 let tx = client_guard.transaction().await?;
3835
3836 let event = get_execution_event(&tx, execution_id, version.0).await?;
3837
3838 tx.commit().await?;
3839 Ok(event)
3840 }
3841
3842 #[instrument(level = Level::DEBUG, skip_all)]
3843 async fn upsert_stub_response(
3844 &self,
3845 execution_id: ExecutionIdDerived,
3846 version: Version,
3847 req: AppendRequest,
3848 response: AppendResponseToExecution,
3849 current_time: DateTime<Utc>,
3850 ) -> Result<(), DbErrorStubResponse> {
3851 debug!("upsert_stub_response");
3852 #[cfg(debug_assertions)]
3853 {
3854 let (expected_parent, expected_join_set) = execution_id.split_to_parts();
3855 debug_assert_eq!(expected_parent, response.parent_execution_id);
3856 debug_assert_eq!(expected_join_set, response.join_set_id);
3857 debug_assert_eq!(execution_id, response.child_execution_id);
3858 }
3859 let execution_id = ExecutionId::Derived(execution_id);
3860 let expected_retval = response.result.clone();
3861 let version_for_read = version.0;
3862
3863 let mut client_guard = self.client.lock().await;
3864 let tx = client_guard.transaction().await?;
3865
3866 let notifiers = match append(&tx, &execution_id, req, version).await {
3867 Ok((_next_version, notifier_of_child)) => {
3868 let pending_at_parent = append_response(
3869 &tx,
3870 &response.parent_execution_id,
3871 JoinSetResponseEventOuter {
3872 created_at: response.created_at,
3873 event: JoinSetResponseEvent {
3874 join_set_id: response.join_set_id,
3875 event: JoinSetResponse::ChildExecutionFinished {
3876 child_execution_id: response.child_execution_id,
3877 finished_version: response.finished_version,
3878 result: response.result,
3879 },
3880 },
3881 },
3882 )
3883 .await
3884 .map_err(DbErrorStubResponse::Write)?;
3885 Some(vec![notifier_of_child, pending_at_parent])
3886 }
3887 Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::AlreadyFinished)) => {
3888 let found = get_execution_event(&tx, &execution_id, version_for_read)
3890 .await
3891 .map_err(|_| DbErrorStubResponse::StubConflict)?;
3892 match found.event {
3893 ExecutionRequest::Finished { retval, .. } if retval == expected_retval => None,
3894 _ => return Err(DbErrorStubResponse::StubConflict),
3895 }
3896 }
3897 Err(other) => return Err(DbErrorStubResponse::Write(other)),
3898 };
3899 tx.commit().await?;
3900 drop(client_guard);
3901 if let Some(notifiers) = notifiers {
3902 self.notify_all(notifiers, current_time);
3903 }
3904 Ok(())
3905 }
3906
3907 async fn get_pending_state(
3908 &self,
3909 execution_id: &ExecutionId,
3910 ) -> Result<ExecutionWithState, DbErrorRead> {
3911 let mut client_guard = self.client.lock().await;
3912 let tx = client_guard.transaction().await?;
3913
3914 let combined_state = get_combined_state(&tx, execution_id).await?;
3915
3916 tx.commit().await?;
3917 Ok(combined_state.execution_with_state)
3918 }
3919}
3920
3921#[async_trait]
3922impl DbExternalApi for PostgresConnection {
3923 #[instrument(skip(self))]
3924 async fn get_backtrace(
3925 &self,
3926 execution_id: &ExecutionId,
3927 filter: BacktraceFilter,
3928 ) -> Result<BacktraceInfo, DbErrorRead> {
3929 debug!("get_backtrace");
3930
3931 let mut client_guard = self.client.lock().await;
3932 let tx = client_guard.transaction().await?;
3933
3934 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
3935
3936 params.push(Box::new(execution_id.to_string())); let p_execution_id_idx = format!("${}", params.len()); let mut sql = String::new();
3940 write!(
3941 &mut sql,
3942 "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace \
3943 FROM t_execution_backtrace e INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
3944 WHERE execution_id = {p_execution_id_idx}"
3945 )
3946 .unwrap();
3947
3948 match &filter {
3949 BacktraceFilter::Specific(version) => {
3950 params.push(Box::new(i64::from(version.0))); let p_ver_idx = format!("${}", params.len()); write!(
3953 &mut sql,
3954 " AND version_min_including <= {p_ver_idx} AND version_max_excluding > {p_ver_idx}"
3955 )
3956 .unwrap();
3957 }
3958 BacktraceFilter::First => {
3959 sql.push_str(" ORDER BY version_min_including LIMIT 1");
3960 }
3961 BacktraceFilter::Last => {
3962 sql.push_str(" ORDER BY version_min_including DESC LIMIT 1");
3963 }
3964 }
3965
3966 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
3967 params.iter().map(|p| p.as_ref() as _).collect();
3968
3969 let row = tx.query_one(&sql, ¶ms_refs).await?;
3970
3971 let component_id: Json<ComponentId> = get(&row, "component_id")?;
3972 let component_id = component_id.0;
3973
3974 let version_min_including =
3975 Version::try_from(get::<i64, _>(&row, "version_min_including")?)?;
3976
3977 let version_max_excluding =
3978 Version::try_from(get::<i64, _>(&row, "version_max_excluding")?)?;
3979
3980 let wasm_backtrace: Json<WasmBacktrace> = get(&row, "wasm_backtrace")?;
3982 let wasm_backtrace = wasm_backtrace.0;
3983
3984 tx.commit().await?;
3985
3986 Ok(BacktraceInfo {
3987 execution_id: execution_id.clone(),
3988 component_id,
3989 version_min_including,
3990 version_max_excluding,
3991 wasm_backtrace,
3992 })
3993 }
3994
3995 #[instrument(skip_all)]
3996 async fn upsert_source_file(
3997 &self,
3998 component_digest: &ComponentDigest,
3999 frame_key: &str,
4000 is_suffix: bool,
4001 content: &str,
4002 ) -> Result<(), DbErrorWrite> {
4003 let content_hash: [u8; 32] = Sha256::digest(content.as_bytes()).into();
4004 let mut client_guard = self.client.lock().await;
4005 let tx = client_guard.transaction().await?;
4006 tx.execute(
4007 "INSERT INTO t_source_file (content_hash, content) \
4008 VALUES ($1, $2) \
4009 ON CONFLICT (content_hash) DO NOTHING",
4010 &[&content_hash.as_slice(), &content],
4011 )
4012 .await?;
4013 tx.execute(
4014 "INSERT INTO t_component_source \
4015 (component_digest, frame_key, is_suffix, content_hash) \
4016 VALUES ($1, $2, $3, $4) \
4017 ON CONFLICT (component_digest, frame_key, is_suffix) DO NOTHING",
4018 &[
4019 &component_digest.as_slice(),
4020 &frame_key,
4021 &is_suffix,
4022 &content_hash.as_slice(),
4023 ],
4024 )
4025 .await?;
4026 tx.commit().await?;
4027 Ok(())
4028 }
4029
4030 #[instrument(skip_all)]
4031 async fn get_source_file(
4032 &self,
4033 component_digest: &ComponentDigest,
4034 file: &str,
4035 ) -> Result<Option<String>, DbErrorRead> {
4036 let mut client_guard = self.client.lock().await;
4037 let tx = client_guard.transaction().await?;
4038 let rows = tx
4039 .query(
4040 "SELECT s.content \
4041 FROM t_component_source cs \
4042 JOIN t_source_file s ON cs.content_hash = s.content_hash \
4043 WHERE cs.component_digest = $1 \
4044 AND ( \
4045 (NOT cs.is_suffix AND cs.frame_key = $2) \
4046 OR (cs.is_suffix AND right($2, length(cs.frame_key)) = cs.frame_key) \
4047 )",
4048 &[&component_digest.as_slice(), &file],
4049 )
4050 .await?;
4051 tx.commit().await?;
4052 match rows.len() {
4053 0 => Ok(None),
4054 1 => Ok(Some(get::<String, _>(&rows[0], "content")?)),
4055 _ => {
4056 warn!("Multiple suffix matches for '{file}', returning None");
4057 Ok(None)
4058 }
4059 }
4060 }
4061
4062 #[instrument(skip_all)]
4063 async fn upsert_component_metadata(
4064 &self,
4065 records: Vec<ComponentMetadataRecord>,
4066 ) -> Result<(), DbErrorWrite> {
4067 let mut client_guard = self.client.lock().await;
4068 let tx = client_guard.transaction().await?;
4069 for record in records {
4070 tx.execute(
4071 "INSERT INTO t_component_metadata \
4072 (component_digest, imports_json, exports_json, wit, wit_origin) \
4073 VALUES ($1, $2, $3, $4, $5) \
4074 ON CONFLICT (component_digest) DO NOTHING",
4075 &[
4076 &record.component_digest.as_slice(),
4077 &Json(&record.imports),
4078 &Json(&record.exports),
4079 &record.wit,
4080 &record.wit_origin,
4081 ],
4082 )
4083 .await?;
4084 }
4085 tx.commit().await?;
4086 Ok(())
4087 }
4088
4089 #[instrument(skip_all)]
4090 async fn insert_deployment_components(
4091 &self,
4092 deployment_id: DeploymentId,
4093 records: Vec<DeploymentComponentRecord>,
4094 ) -> Result<(), DbErrorWrite> {
4095 let mut client_guard = self.client.lock().await;
4096 let tx = client_guard.transaction().await?;
4097 for record in records {
4098 debug_assert_eq!(record.deployment_id, deployment_id);
4099 tx.execute(
4100 "INSERT INTO t_deployment_component \
4101 (deployment_id, component_name, component_type, component_digest) \
4102 VALUES ($1, $2, $3, $4) \
4103 ON CONFLICT (deployment_id, component_name) DO NOTHING",
4104 &[
4105 &deployment_id.to_string(),
4106 &record.component_name.to_string(),
4107 &record.component_type.to_string(),
4108 &record.component_digest.as_slice(),
4109 ],
4110 )
4111 .await?;
4112 }
4113 tx.commit().await?;
4114 Ok(())
4115 }
4116
4117 #[instrument(skip_all)]
4118 async fn list_deployment_components(
4119 &self,
4120 deployment_id: DeploymentId,
4121 ) -> Result<Vec<DeploymentComponentDetail>, DbErrorRead> {
4122 let mut client_guard = self.client.lock().await;
4123 let tx = client_guard.transaction().await?;
4124 let rows = tx
4125 .query(
4126 "SELECT dc.component_name, dc.component_type, dc.component_digest, \
4127 cm.imports_json, cm.exports_json, cm.wit \
4128 FROM t_deployment_component dc \
4129 JOIN t_component_metadata cm ON dc.component_digest = cm.component_digest \
4130 WHERE dc.deployment_id = $1 \
4131 ORDER BY dc.component_type, dc.component_name",
4132 &[&deployment_id.to_string()],
4133 )
4134 .await?;
4135 tx.commit().await?;
4136 rows.iter()
4137 .map(deployment_component_detail_from_pg_row)
4138 .collect()
4139 }
4140
4141 #[instrument(skip_all)]
4142 async fn get_deployment_component_wit(
4143 &self,
4144 deployment_id: DeploymentId,
4145 component_digest: &ComponentDigest,
4146 ) -> Result<Option<String>, DbErrorRead> {
4147 let mut client_guard = self.client.lock().await;
4148 let tx = client_guard.transaction().await?;
4149 let row = tx
4150 .query_opt(
4151 "SELECT cm.wit \
4152 FROM t_deployment_component dc \
4153 JOIN t_component_metadata cm ON dc.component_digest = cm.component_digest \
4154 WHERE dc.deployment_id = $1 AND dc.component_digest = $2 \
4155 LIMIT 1",
4156 &[&deployment_id.to_string(), &component_digest.as_slice()],
4157 )
4158 .await?;
4159 tx.commit().await?;
4160 row.map(|row| get::<String, _>(&row, "wit").map_err(DbErrorRead::Generic))
4161 .transpose()
4162 }
4163
4164 #[instrument(skip(self))]
4165 async fn list_executions(
4166 &self,
4167 filter: ListExecutionsFilter,
4168 pagination: ExecutionListPagination,
4169 ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
4170 let mut client_guard = self.client.lock().await;
4171 let tx = client_guard.transaction().await?;
4172
4173 let result = list_executions(&tx, filter, &pagination).await?;
4174
4175 tx.commit().await?;
4176 Ok(result)
4177 }
4178
4179 #[instrument(skip(self))]
4180 async fn list_execution_events(
4181 &self,
4182 execution_id: &ExecutionId,
4183 pagination: Pagination<VersionType>,
4184 include_backtrace_id: bool,
4185 ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
4186 let mut client_guard = self.client.lock().await;
4187 let tx = client_guard.transaction().await?;
4188
4189 let events =
4190 list_execution_events(&tx, execution_id, pagination, include_backtrace_id).await?;
4191 let max_version = get_max_version(&tx, execution_id).await?;
4192
4193 tx.commit().await?;
4194 Ok(ListExecutionEventsResponse {
4195 events,
4196 max_version,
4197 })
4198 }
4199
4200 #[instrument(skip(self))]
4201 async fn list_responses(
4202 &self,
4203 execution_id: &ExecutionId,
4204 pagination: Pagination<u32>,
4205 ) -> Result<ListResponsesResponse, DbErrorRead> {
4206 let mut client_guard = self.client.lock().await;
4207 let tx = client_guard.transaction().await?;
4208
4209 let responses = list_responses(&tx, execution_id, Some(pagination)).await?;
4210 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4211
4212 tx.commit().await?;
4213 Ok(ListResponsesResponse {
4214 responses,
4215 max_cursor,
4216 })
4217 }
4218
4219 #[instrument(skip(self))]
4220 async fn list_execution_events_responses(
4221 &self,
4222 execution_id: &ExecutionId,
4223 req_since: &Version,
4224 req_max_length: VersionType,
4225 req_include_backtrace_id: bool,
4226 resp_pagination: Pagination<u32>,
4227 ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
4228 let mut client_guard = self.client.lock().await;
4229 let tx = client_guard.transaction().await?;
4230
4231 let combined_state = get_combined_state(&tx, execution_id).await?;
4232
4233 let events = list_execution_events(
4234 &tx,
4235 execution_id,
4236 Pagination::NewerThan {
4237 length: req_max_length
4238 .try_into()
4239 .expect("req_max_length fits in u16"),
4240 cursor: req_since.0,
4241 including_cursor: true,
4242 },
4243 req_include_backtrace_id,
4244 )
4245 .await?;
4246
4247 let responses = list_responses(&tx, execution_id, Some(resp_pagination)).await?;
4248 let max_version = get_max_version(&tx, execution_id).await?;
4249 let max_cursor = get_max_response_cursor(&tx, execution_id).await?;
4250
4251 tx.commit().await?;
4252
4253 Ok(ExecutionWithStateRequestsResponses {
4254 execution_with_state: combined_state.execution_with_state,
4255 events,
4256 responses,
4257 max_version,
4258 max_cursor,
4259 })
4260 }
4261
4262 #[instrument(skip(self))]
4263 async fn upgrade_execution_component(
4264 &self,
4265 execution_id: &ExecutionId,
4266 old: &ComponentDigest,
4267 new: &ComponentDigest,
4268 ) -> Result<(), DbErrorWrite> {
4269 let mut client_guard = self.client.lock().await;
4270 let tx = client_guard.transaction().await?;
4271
4272 upgrade_execution_component(&tx, execution_id, old, new).await?;
4273
4274 tx.commit().await?;
4275 Ok(())
4276 }
4277
4278 #[instrument(skip(self))]
4279 async fn list_logs(
4280 &self,
4281 execution_id: &ExecutionId,
4282 show_derived: bool,
4283 filter: LogFilter,
4284 pagination: Pagination<DateTime<Utc>>,
4285 ) -> Result<ListLogsResponse, DbErrorRead> {
4286 let mut client_guard = self.client.lock().await;
4287 let tx = client_guard.transaction().await?;
4288 let responses = list_logs_tx(&tx, execution_id, show_derived, &filter, &pagination).await?;
4289 tx.commit().await?;
4290 Ok(responses)
4291 }
4292
4293 #[instrument(skip(self))]
4294 async fn list_deployment_states(
4295 &self,
4296 current_time: DateTime<Utc>,
4297 pagination: Pagination<Option<DeploymentId>>,
4298 include_config_json: bool,
4299 ) -> Result<Vec<DeploymentState>, DbErrorRead> {
4300 let mut client_guard = self.client.lock().await;
4301 let tx = client_guard.transaction().await?;
4302 let deployments =
4303 list_deployment_states(&tx, current_time, pagination, include_config_json).await?;
4304 tx.commit().await?;
4305 Ok(deployments)
4306 }
4307
4308 #[instrument(skip(self))]
4309 async fn insert_deployment(&self, record: DeploymentRecord) -> Result<(), DbErrorWrite> {
4310 assert_eq!(
4311 record.status,
4312 DeploymentStatus::Inactive,
4313 "insert_deployment requires Inactive status"
4314 );
4315 assert!(
4316 record.last_active_at.is_none(),
4317 "insert_deployment requires last_active_at == None"
4318 );
4319 let mut client_guard = self.client.lock().await;
4320 let tx = client_guard.transaction().await?;
4321 tx.execute(
4322 "INSERT INTO t_deployment \
4323 (deployment_id, created_at, status, config_json, obelisk_version, created_by) \
4324 VALUES ($1, $2, $3, $4, $5, $6)",
4325 &[
4326 &record.deployment_id.to_string(), &record.created_at, &record.status.as_str(), &record.config_json, &record.obelisk_version, &record.created_by, ],
4333 )
4334 .await?;
4335 tx.commit().await?;
4336 Ok(())
4337 }
4338
4339 #[instrument(skip(self))]
4340 async fn activate_deployment(
4341 &self,
4342 deployment_id: DeploymentId,
4343 now: DateTime<Utc>,
4344 ) -> Result<(), DbErrorWrite> {
4345 let mut client_guard = self.client.lock().await;
4346 let tx = client_guard.transaction().await?;
4347 tx.execute(
4349 "UPDATE t_deployment SET status = 'inactive' WHERE status IN ('active', 'enqueued')",
4350 &[],
4351 )
4352 .await?;
4353 let rows = tx
4355 .execute(
4356 "UPDATE t_deployment SET status = 'active', last_active_at = $1 WHERE deployment_id = $2",
4357 &[&now, &deployment_id.to_string()],
4358 )
4359 .await?;
4360 tx.commit().await?;
4361 if rows == 0 {
4362 return Err(DbErrorWrite::NotFound);
4363 }
4364 Ok(())
4365 }
4366
4367 async fn enqueue_deployment(&self, deployment_id: DeploymentId) -> Result<(), DbErrorWrite> {
4368 let mut client_guard = self.client.lock().await;
4369 let tx = client_guard.transaction().await?;
4370 let status_opt = tx
4372 .query_opt(
4373 "SELECT status FROM t_deployment WHERE deployment_id = $1",
4374 &[&deployment_id.to_string()],
4375 )
4376 .await?;
4377 match status_opt.as_ref().map(|r| r.get::<_, &str>("status")) {
4378 None => return Err(DbErrorWrite::NotFound),
4379 Some("active") => return Err(DbErrorWriteNonRetriable::Conflict.into()),
4380 _ => {}
4381 }
4382 tx.execute(
4384 "UPDATE t_deployment SET status = 'inactive' WHERE status = 'enqueued'",
4385 &[],
4386 )
4387 .await?;
4388 let rows = tx
4390 .execute(
4391 "UPDATE t_deployment SET status = 'enqueued' WHERE deployment_id = $1",
4392 &[&deployment_id.to_string()],
4393 )
4394 .await?;
4395 tx.commit().await?;
4396 if rows == 0 {
4397 return Err(DbErrorWrite::NotFound);
4398 }
4399 Ok(())
4400 }
4401
4402 #[instrument(skip(self))]
4403 async fn get_deployment(
4404 &self,
4405 deployment_id: DeploymentId,
4406 ) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4407 let mut client_guard = self.client.lock().await;
4408 let tx = client_guard.transaction().await?;
4409 let row = tx
4410 .query_opt(
4411 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4412 FROM t_deployment WHERE deployment_id = $1",
4413 &[&deployment_id.to_string()],
4414 )
4415 .await?;
4416 tx.commit().await?;
4417 match row {
4418 None => Ok(None),
4419 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4420 }
4421 }
4422
4423 #[instrument(skip(self))]
4424 #[cfg(feature = "test")]
4425 async fn get_active_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4426 let mut client_guard = self.client.lock().await;
4427 let tx = client_guard.transaction().await?;
4428 let row = tx
4429 .query_opt(
4430 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4431 FROM t_deployment WHERE status = 'active' LIMIT 1",
4432 &[],
4433 )
4434 .await?;
4435 tx.commit().await?;
4436 match row {
4437 None => Ok(None),
4438 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4439 }
4440 }
4441
4442 async fn get_current_deployment(&self) -> Result<Option<DeploymentRecord>, DbErrorRead> {
4443 let mut client_guard = self.client.lock().await;
4444 let tx = client_guard.transaction().await?;
4445 let row = tx
4446 .query_opt(
4447 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4448 FROM t_deployment WHERE status IN ('enqueued', 'active') \
4449 ORDER BY CASE status WHEN 'enqueued' THEN 0 ELSE 1 END LIMIT 1",
4450 &[],
4451 )
4452 .await?;
4453 tx.commit().await?;
4454 match row {
4455 None => Ok(None),
4456 Some(r) => Ok(Some(deployment_record_from_pg_row(&r)?)),
4457 }
4458 }
4459
4460 #[instrument(skip(self))]
4461 async fn list_deployments(
4462 &self,
4463 pagination: Pagination<Option<DeploymentId>>,
4464 ) -> Result<Vec<DeploymentRecord>, DbErrorRead> {
4465 let mut client_guard = self.client.lock().await;
4466 let tx = client_guard.transaction().await?;
4467 let mut params: Vec<Box<dyn tokio_postgres::types::ToSql + Sync + Send>> = Vec::new();
4468 let mut add_param = |p: Box<dyn tokio_postgres::types::ToSql + Sync + Send>| {
4469 params.push(p);
4470 format!("${}", params.len())
4471 };
4472
4473 let mut sql = String::from(
4474 "SELECT deployment_id, created_at, last_active_at, status, config_json, obelisk_version, created_by \
4475 FROM t_deployment",
4476 );
4477
4478 if let Some(cursor) = pagination.cursor() {
4479 let p_cursor = add_param(Box::new(cursor.to_string()));
4480 write!(
4481 sql,
4482 " WHERE deployment_id {rel} {p_cursor}",
4483 rel = pagination.rel()
4484 )
4485 .expect("writing to string");
4486 }
4487
4488 let (inner_order, outer_order) = if pagination.is_desc() {
4489 ("DESC", "")
4490 } else {
4491 ("ASC", "DESC")
4492 };
4493
4494 write!(
4495 sql,
4496 " ORDER BY deployment_id {inner_order} LIMIT {limit}",
4497 limit = pagination.length()
4498 )
4499 .expect("writing to string");
4500
4501 let final_sql = if outer_order.is_empty() {
4502 sql
4503 } else {
4504 format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
4505 };
4506
4507 let params_refs: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> =
4508 params.iter().map(|p| p.as_ref() as _).collect();
4509
4510 let rows = tx.query(&final_sql, ¶ms_refs).await?;
4511 tx.commit().await?;
4512
4513 rows.iter()
4514 .map(deployment_record_from_pg_row)
4515 .collect::<Result<Vec<_>, _>>()
4516 }
4517
4518 #[instrument(skip(self))]
4519 async fn pause_execution(
4520 &self,
4521 execution_id: &ExecutionId,
4522 paused_at: DateTime<Utc>,
4523 ) -> Result<AppendResponse, DbErrorWrite> {
4524 let mut client_guard = self.client.lock().await;
4525 let tx = client_guard.transaction().await?;
4526
4527 let combined_state = get_combined_state(&tx, execution_id).await?;
4528 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4529 debug!("Pausing with {appending_version}");
4530 let next_version = if matches!(
4531 combined_state.execution_with_state.pending_state,
4532 PendingState::Locked(_)
4533 ) {
4534 let (next_version, _notifier) = append(
4535 &tx,
4536 execution_id,
4537 AppendRequest {
4538 created_at: paused_at,
4539 event: ExecutionRequest::Unlocked {
4540 backoff_expires_at: paused_at,
4541 reason: StrVariant::Static("paused"),
4542 },
4543 },
4544 appending_version,
4545 )
4546 .await?;
4547 next_version
4548 } else {
4549 appending_version
4550 };
4551 let (next_version, _notifier) = append(
4552 &tx,
4553 execution_id,
4554 AppendRequest {
4555 created_at: paused_at,
4556 event: ExecutionRequest::Paused,
4557 },
4558 next_version,
4559 )
4560 .await?;
4561 tx.commit().await?;
4562 Ok(next_version)
4563 }
4564
4565 #[instrument(skip(self))]
4566 async fn unpause_execution(
4567 &self,
4568 execution_id: &ExecutionId,
4569 unpaused_at: DateTime<Utc>,
4570 ) -> Result<AppendResponse, DbErrorWrite> {
4571 let mut client_guard = self.client.lock().await;
4572 let tx = client_guard.transaction().await?;
4573
4574 let combined_state = get_combined_state(&tx, execution_id).await?;
4575 let appending_version = combined_state.get_next_version_fail_if_finished()?;
4576 debug!("Unpausing with {appending_version}");
4577 let (next_version, _) = append(
4578 &tx,
4579 execution_id,
4580 AppendRequest {
4581 created_at: unpaused_at,
4582 event: ExecutionRequest::Unpaused,
4583 },
4584 appending_version,
4585 )
4586 .await?;
4587
4588 tx.commit().await?;
4589 Ok(next_version)
4590 }
4591
4592 #[instrument(skip(self))]
4593 async fn pause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite> {
4594 let (execution_id, join_set_id) = delay_id.split_to_parts();
4595 let client_guard = self.client.lock().await;
4596 let rows_modified = client_guard
4597 .execute(
4598 "UPDATE t_delay SET is_paused = TRUE \
4599 WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
4600 &[
4601 &execution_id.to_string(),
4602 &join_set_id.to_string(),
4603 &delay_id.to_string(),
4604 ],
4605 )
4606 .await?;
4607 if rows_modified == 0 {
4608 return Err(DbErrorWrite::NotFound);
4609 }
4610 Ok(())
4611 }
4612
4613 #[instrument(skip(self))]
4614 async fn unpause_delay(&self, delay_id: &DelayId) -> Result<(), DbErrorWrite> {
4615 let (execution_id, join_set_id) = delay_id.split_to_parts();
4616 let client_guard = self.client.lock().await;
4617 let rows_modified = client_guard
4618 .execute(
4619 "UPDATE t_delay SET is_paused = FALSE \
4620 WHERE execution_id = $1 AND join_set_id = $2 AND delay_id = $3",
4621 &[
4622 &execution_id.to_string(),
4623 &join_set_id.to_string(),
4624 &delay_id.to_string(),
4625 ],
4626 )
4627 .await?;
4628 if rows_modified == 0 {
4629 return Err(DbErrorWrite::NotFound);
4630 }
4631 Ok(())
4632 }
4633}
4634
4635#[async_trait]
4636impl DbPoolCloseable for PostgresPool {
4637 async fn close(&self) {
4638 self.pool.close();
4639 }
4640}
4641
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for PostgresConnection {
    /// Test-only helper: appends `response_event`, stamped with `created_at`, to
    /// `execution_id` in its own transaction.
    ///
    /// The client lock is released right after the commit. Only then is the notifier
    /// returned by the append handed to `notify_all`.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        let outer_event = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };

        // Scoping the guard releases the connection lock as soon as the block ends.
        let notifier = {
            let mut client_guard = self.client.lock().await;
            let tx = client_guard.transaction().await?;
            let notifier = append_response(&tx, &execution_id, outer_event).await?;
            tx.commit().await?;
            notifier
        };

        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4670
#[cfg(feature = "test")]
impl PostgresPool {
    /// Test-only: drops this pool's database via a separate connection to `ADMIN_DB_NAME`.
    ///
    /// The admin connection reuses this pool's host, user and password. The
    /// `DROP DATABASE` statement is tried up to three times back-to-back, with no delay
    /// between attempts. If every attempt fails, a warning is logged and the function
    /// returns normally.
    ///
    /// # Panics
    /// Panics if the admin pool cannot be created or a connection cannot be obtained
    /// from it. The underlying error is logged first.
    pub async fn drop_database(&self) {
        let db_name = &self.config.db_name;

        let mut cfg = deadpool_postgres::Config::new();
        cfg.host = Some(self.config.host.clone());
        cfg.user = Some(self.config.user.clone());
        cfg.password = Some(self.config.password.expose_secret().to_string());
        cfg.dbname = Some(ADMIN_DB_NAME.into());
        cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });

        let admin_pool = cfg
            .create_pool(None, NoTls)
            .map_err(|err| {
                error!("Cannot create the default pool - {err:?}");
                InitializationError
            })
            .unwrap();
        let admin_client = admin_pool
            .get()
            .await
            .map_err(|err| {
                error!("Cannot get a connection from the default pool - {err:?}");
                InitializationError
            })
            .unwrap();

        // The statement text does not change between attempts, so build it once.
        let drop_stmt = format!("DROP DATABASE {db_name}");
        for _attempt in 0..3 {
            let res = admin_client.execute(&drop_stmt, &[]).await;
            if res.is_ok() {
                debug!("Database '{db_name}' dropped.");
                return;
            }
            debug!("Dropping db failed - {res:?}");
        }
        warn!("Did not drop database {db_name}");
    }
}