1use crate::{histograms::Histograms, sqlite_dao::conversions::to_generic_error};
2use async_trait::async_trait;
3use chrono::{DateTime, Utc};
4use concepts::{
5 ComponentId, ComponentRetryConfig, ExecutionId, FunctionFqn, JoinSetId, StrVariant,
6 SupportedFunctionReturnValue,
7 component_id::InputContentDigest,
8 prefixed_ulid::{DelayId, DeploymentId, ExecutionIdDerived, ExecutorId, RunId},
9 storage::{
10 AppendBatchResponse, AppendDelayResponseOutcome, AppendEventsToExecution, AppendRequest,
11 AppendResponse, AppendResponseToExecution, BacktraceFilter, BacktraceInfo, CreateRequest,
12 DUMMY_CREATED, DUMMY_HISTORY_EVENT, DbConnection, DbErrorGeneric, DbErrorRead,
13 DbErrorReadWithTimeout, DbErrorWrite, DbErrorWriteNonRetriable, DbExecutor, DbExternalApi,
14 DbPool, DbPoolCloseable, DeploymentState, ExecutionEvent, ExecutionListPagination,
15 ExecutionRequest, ExecutionWithState, ExecutionWithStateRequestsResponses, ExpiredDelay,
16 ExpiredLock, ExpiredTimer, HISTORY_EVENT_TYPE_JOIN_NEXT, HistoryEvent, JoinSetRequest,
17 JoinSetResponse, JoinSetResponseEvent, JoinSetResponseEventOuter,
18 ListExecutionEventsResponse, ListExecutionsFilter, ListLogsResponse, ListResponsesResponse,
19 LockPendingResponse, Locked, LockedBy, LockedExecution, LogEntry, LogEntryRow, LogFilter,
20 LogInfoAppendRow, LogLevel, LogStreamType, Pagination, PendingState,
21 PendingStateBlockedByJoinSet, PendingStateFinishedResultKind, PendingStateMergedPause,
22 ResponseCursor, ResponseWithCursor, STATE_BLOCKED_BY_JOIN_SET, STATE_FINISHED,
23 STATE_LOCKED, STATE_PENDING_AT, TimeoutOutcome, Version, VersionType,
24 },
25};
26use const_format::formatcp;
27use conversions::{JsonWrapper, consistency_db_err, consistency_rusqlite, from_generic_error};
28use db_common::{
29 AppendNotifier, CombinedState, CombinedStateDTO, NotifierExecutionFinished, NotifierPendingAt,
30 PendingFfqnSubscribersHolder,
31};
32use hashbrown::HashMap;
33use rusqlite::{
34 CachedStatement, Connection, OpenFlags, OptionalExtension, Params, Row, ToSql, Transaction,
35 TransactionBehavior, named_params, types::ToSqlOutput,
36};
37use std::{
38 cmp::max,
39 collections::VecDeque,
40 fmt::Debug,
41 ops::DerefMut,
42 panic::Location,
43 path::Path,
44 sync::{
45 Arc, Mutex,
46 atomic::{AtomicBool, Ordering},
47 },
48 time::{Duration, Instant},
49};
50use std::{fmt::Write as _, pin::Pin};
51use strum::IntoEnumIterator as _;
52use tokio::sync::{mpsc, oneshot};
53use tracing::{Level, Span, debug, error, info, instrument, trace, warn};
54use tracing_error::SpanTrace;
55
/// Returned when opening the SQLite connection, applying PRAGMAs, or creating
/// the schema fails; details are logged at the failure site before this
/// opaque error is returned.
#[derive(Debug, thiserror::Error)]
#[error("initialization error")]
pub struct InitializationError;
59
/// A pending delay (timer) belonging to a join set of an execution.
#[derive(Debug, Clone)]
struct DelayReq {
    // Join set that owns the delay.
    join_set_id: JoinSetId,
    // Identifier of the delay itself.
    delay_id: DelayId,
    // Instant at which the delay fires.
    expires_at: DateTime<Utc>,
}
/// Default PRAGMA settings applied when the connection is opened.
/// Each entry is a `[name, value]` pair; an empty value means the pragma is
/// only queried (and its result logged) rather than set — see `pragma_update`
/// in `init_thread`.
const PRAGMA: [[&str; 2]; 10] = [
    ["journal_mode", "wal"],
    ["synchronous", "FULL"],
    ["foreign_keys", "true"],
    ["busy_timeout", "1000"],
    ["cache_size", "10000"],
    ["temp_store", "MEMORY"],
    ["page_size", "8192"],
    ["mmap_size", "134217728"],
    ["journal_size_limit", "67108864"],
    ["integrity_check", ""],
];
89
// Metadata table: records the schema version of this database file.
const CREATE_TABLE_T_METADATA: &str = r"
CREATE TABLE IF NOT EXISTS t_metadata (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    schema_version INTEGER NOT NULL,
    created_at TEXT NOT NULL
) STRICT
";
// Bump whenever the schema below changes incompatibly; databases whose stored
// version differs are rejected during initialization (see `init_thread`).
const T_METADATA_EXPECTED_SCHEMA_VERSION: u32 = 6;

// Append-only event log: one row per execution event, keyed by
// (execution_id, version). `history_event_type` is a generated column
// extracted from the JSON payload so partial indexes can filter on it.
const CREATE_TABLE_T_EXECUTION_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_log (
    execution_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    json_value TEXT NOT NULL,
    version INTEGER NOT NULL,
    variant TEXT NOT NULL,
    join_set_id TEXT,
    history_event_type TEXT GENERATED ALWAYS AS (json_value->>'$.history_event.event.type') STORED,
    PRIMARY KEY (execution_id, version)
) STRICT
";
// NOTE(review): this index has the same columns as the table's PRIMARY KEY
// (execution_id, version) and may be redundant — confirm before removing.
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_version ON t_execution_log (execution_id, version);
";
const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_variant ON t_execution_log (execution_id, variant);
";
121
122const CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET: &str = const_format::formatcp!(
124 "CREATE INDEX IF NOT EXISTS idx_t_execution_log_execution_id_join_set ON t_execution_log (execution_id, join_set_id, history_event_type) WHERE history_event_type=\"{}\";",
125 HISTORY_EVENT_TYPE_JOIN_NEXT
126);
127
// One row per join-set response. Exactly one of the two payload groups is
// populated: a delay outcome (delay_id + delay_success) or a child execution
// result (child_execution_id + finished_version). The autoincrement `id`
// provides a stable cursor for response pagination.
const CREATE_TABLE_T_JOIN_SET_RESPONSE: &str = r"
CREATE TABLE IF NOT EXISTS t_join_set_response (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    created_at TEXT NOT NULL,
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,

    delay_id TEXT,
    delay_success INTEGER,

    child_execution_id TEXT,
    finished_version INTEGER,

    UNIQUE (execution_id, join_set_id, delay_id, child_execution_id)
) STRICT
";
// Supports cursor-based scans of responses per execution.
const CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_join_set_response_execution_id_id ON t_join_set_response (execution_id, id);
";
// A child execution may contribute at most one response row overall.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_child_id
ON t_join_set_response (child_execution_id) WHERE child_execution_id IS NOT NULL;
";
// A delay may contribute at most one response row overall.
const CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID: &str = r"
CREATE UNIQUE INDEX IF NOT EXISTS idx_join_set_response_unique_delay_id
ON t_join_set_response (delay_id) WHERE delay_id IS NOT NULL;
";
162
// Current state of every execution: one row per execution_id, updated in
// place as the execution progresses. The nullable column groups are only
// populated for the matching `state` value (lock info, blocked-by-join-set
// info, finished result kind).
const CREATE_TABLE_T_STATE: &str = r"
CREATE TABLE IF NOT EXISTS t_state (
    execution_id TEXT NOT NULL,
    is_top_level INTEGER NOT NULL,
    corresponding_version INTEGER NOT NULL,
    ffqn TEXT NOT NULL,
    created_at TEXT NOT NULL,
    component_id_input_digest BLOB NOT NULL,
    component_type TEXT NOT NULL,
    first_scheduled_at TEXT NOT NULL,
    deployment_id TEXT NOT NULL,
    is_paused INTEGER NOT NULL,

    pending_expires_finished TEXT NOT NULL,
    state TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    intermittent_event_count INTEGER NOT NULL,

    max_retries INTEGER,
    retry_exp_backoff_millis INTEGER,
    last_lock_version INTEGER,
    executor_id TEXT,
    run_id TEXT,

    join_set_id TEXT,
    join_set_closing INTEGER,

    result_kind TEXT,

    PRIMARY KEY (execution_id)
) STRICT
";

// Partial index for fetching pending executions of a given ffqn.
const IDX_T_STATE_LOCK_PENDING_BY_FFQN: &str = formatcp!(
    r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_ffqn ON t_state (pending_expires_finished, ffqn) WHERE state = '{}';
",
    STATE_PENDING_AT
);
// Partial index for fetching pending executions of a given component digest.
const IDX_T_STATE_LOCK_PENDING_BY_COMPONENT: &str = formatcp!(
    r"
CREATE INDEX IF NOT EXISTS idx_t_state_lock_pending_by_component ON t_state (pending_expires_finished, component_id_input_digest) WHERE state = '{}';
",
    STATE_PENDING_AT
);
// Partial index for scanning locks whose expiry has passed.
const IDX_T_STATE_EXPIRED_LOCKS: &str = formatcp!(
    "CREATE INDEX IF NOT EXISTS idx_t_state_expired_locks ON t_state (pending_expires_finished) WHERE state = '{}';",
    STATE_LOCKED
);
// NOTE(review): index name says `is_root` while the const and column say
// `is_top_level` — harmless, but renaming would orphan the old index in
// existing databases; confirm before changing.
const IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_execution_id_is_root ON t_state (execution_id, is_top_level);
";
const IDX_T_STATE_FFQN: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_ffqn ON t_state (ffqn);
";
const IDX_T_STATE_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_created_at ON t_state (created_at);
";

const IDX_T_STATE_DEPLOYMENT_STATE: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_state_deployment_state ON t_state (deployment_id, state);
";
256
// Active delays (timers); rows are removed once the delay produces a
// response — see `ExpiredDelay` handling elsewhere in this file.
const CREATE_TABLE_T_DELAY: &str = r"
CREATE TABLE IF NOT EXISTS t_delay (
    execution_id TEXT NOT NULL,
    join_set_id TEXT NOT NULL,
    delay_id TEXT NOT NULL,
    expires_at TEXT NOT NULL,
    PRIMARY KEY (execution_id, join_set_id, delay_id)
) STRICT
";

// Maps a version range of an execution to a deduplicated WASM backtrace
// stored in t_wasm_backtrace (referenced by content hash).
const CREATE_TABLE_T_EXECUTION_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_execution_backtrace (
    execution_id TEXT NOT NULL,
    component_id TEXT NOT NULL,
    version_min_including INTEGER NOT NULL,
    version_max_excluding INTEGER NOT NULL,
    backtrace_hash BLOB NOT NULL,

    PRIMARY KEY (
        execution_id,
        version_min_including,
        version_max_excluding
    ),
    FOREIGN KEY (backtrace_hash)
        REFERENCES t_wasm_backtrace(backtrace_hash)
) STRICT
";
const IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_execution_backtrace_execution_id_version
ON t_execution_backtrace (
    execution_id,
    version_min_including,
    version_max_excluding
)
";
// Content-addressed storage of backtraces, deduplicated by hash.
const CREATE_TABLE_T_WASM_BACKTRACE: &str = r"
CREATE TABLE IF NOT EXISTS t_wasm_backtrace (
    backtrace_hash BLOB NOT NULL,
    wasm_backtrace TEXT NOT NULL,

    PRIMARY KEY (backtrace_hash)
) STRICT
";

// Log entries emitted by executions; either a leveled message
// (level + message) or a raw stream payload (stream_type + payload).
const CREATE_TABLE_T_LOG: &str = r"
CREATE TABLE IF NOT EXISTS t_log (
    id INTEGER PRIMARY KEY,
    execution_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    level INTEGER,
    message TEXT,
    stream_type INTEGER,
    payload BLOB
) STRICT
";
const IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_run_id_created_at ON t_log (execution_id, run_id, created_at);
";
const IDX_T_LOG_EXECUTION_ID_CREATED_AT: &str = r"
CREATE INDEX IF NOT EXISTS idx_t_log_execution_id_created_at ON t_log (execution_id, created_at);
";
327
/// Internal error type of the SQLite layer; converted into the public
/// `DbError*` types at the API boundary (see the `conversions` module).
#[derive(Debug, thiserror::Error, Clone)]
enum RusqliteError {
    /// Query returned no rows (mapped from `rusqlite::Error::QueryReturnedNoRows`).
    #[error("not found")]
    NotFound,
    /// Any other SQLite failure, with captured span trace and caller location.
    #[error("generic: {reason}")]
    Generic {
        reason: StrVariant,
        context: SpanTrace,
        source: Option<Arc<dyn std::error::Error + Send + Sync>>,
        loc: &'static Location<'static>,
    },
    /// The pool is shutting down.
    #[error("close")]
    Close,
}
342
343mod conversions {
344
345 use super::RusqliteError;
346 use concepts::{
347 StrVariant,
348 storage::{DbErrorGeneric, DbErrorRead, DbErrorReadWithTimeout, DbErrorWrite},
349 };
350 use rusqlite::{
351 ToSql,
352 types::{FromSql, FromSqlError},
353 };
354 use std::{fmt::Debug, panic::Location, sync::Arc};
355 use tracing::error;
356 use tracing_error::SpanTrace;
357
358 impl From<rusqlite::Error> for RusqliteError {
359 #[track_caller]
361 fn from(err: rusqlite::Error) -> Self {
362 if matches!(err, rusqlite::Error::QueryReturnedNoRows) {
363 RusqliteError::NotFound
364 } else {
365 RusqliteError::Generic {
366 reason: err.to_string().into(),
367 context: SpanTrace::capture(),
368 source: Some(Arc::new(err)),
369 loc: Location::caller(),
370 }
371 }
372 }
373 }
374
375 #[track_caller]
377 pub fn to_generic_error(err: RusqliteError) -> DbErrorGeneric {
378 if let RusqliteError::Close = err {
379 DbErrorGeneric::Close
380 } else {
381 DbErrorGeneric::Uncategorized {
382 reason: err.to_string().into(),
383 context: SpanTrace::capture(),
384 source: Some(Arc::new(err)),
385 loc: Location::caller(),
386 }
387 }
388 }
389
390 #[track_caller]
392 pub fn from_generic_error(err: &DbErrorGeneric) -> rusqlite::Error {
393 FromSqlError::other(OtherError {
394 reason: err.to_string().into(),
395 loc: Location::caller(),
396 })
397 .into()
398 }
399
400 impl From<RusqliteError> for DbErrorRead {
401 fn from(err: RusqliteError) -> Self {
402 if matches!(err, RusqliteError::NotFound) {
403 Self::NotFound
404 } else {
405 to_generic_error(err).into()
406 }
407 }
408 }
409 impl From<RusqliteError> for DbErrorReadWithTimeout {
410 fn from(err: RusqliteError) -> Self {
411 Self::from(DbErrorRead::from(err))
412 }
413 }
414 impl From<RusqliteError> for DbErrorWrite {
415 fn from(err: RusqliteError) -> Self {
416 if matches!(err, RusqliteError::NotFound) {
417 Self::NotFound
418 } else {
419 to_generic_error(err).into()
420 }
421 }
422 }
423
424 pub(crate) struct JsonWrapper<T>(pub(crate) T);
425 impl<T: serde::de::DeserializeOwned + 'static + Debug> FromSql for JsonWrapper<T> {
426 fn column_result(
427 value: rusqlite::types::ValueRef<'_>,
428 ) -> rusqlite::types::FromSqlResult<Self> {
429 let value = match value {
430 rusqlite::types::ValueRef::Text(value) | rusqlite::types::ValueRef::Blob(value) => {
431 Ok(value)
432 }
433 other => {
434 error!(
435 backtrace = %std::backtrace::Backtrace::capture(),
436 "Unexpected type when conveting to JSON - expected Text or Blob, got type `{other:?}`",
437 );
438 Err(FromSqlError::InvalidType)
439 }
440 }?;
441 let value = serde_json::from_slice::<T>(value).map_err(|err| {
442 error!(
443 backtrace = %std::backtrace::Backtrace::capture(),
444 "Cannot convert JSON value `{value:?}` to type:`{type}` - {err:?}",
445 r#type = std::any::type_name::<T>()
446 );
447 FromSqlError::InvalidType
448 })?;
449 Ok(Self(value))
450 }
451 }
452 impl<T: serde::ser::Serialize + Debug> ToSql for JsonWrapper<T> {
453 fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> {
454 let string = serde_json::to_string(&self.0).map_err(|err| {
455 error!(
456 "Cannot serialize {value:?} of type `{type}` - {err:?}",
457 value = self.0,
458 r#type = std::any::type_name::<T>()
459 );
460 rusqlite::Error::ToSqlConversionFailure(Box::new(err))
461 })?;
462 Ok(rusqlite::types::ToSqlOutput::Owned(
463 rusqlite::types::Value::Text(string),
464 ))
465 }
466 }
467
468 #[derive(Debug, thiserror::Error)]
470 #[error("{reason}")]
471 pub(crate) struct OtherError {
472 reason: StrVariant,
473 loc: &'static Location<'static>,
474 }
475
476 #[track_caller]
477 pub(crate) fn consistency_rusqlite(reason: impl Into<StrVariant>) -> rusqlite::Error {
478 FromSqlError::other(OtherError {
479 reason: reason.into(),
480 loc: Location::caller(),
481 })
482 .into()
483 }
484
485 #[track_caller]
486 pub(crate) fn consistency_db_err(reason: impl Into<StrVariant>) -> DbErrorGeneric {
487 DbErrorGeneric::Uncategorized {
488 reason: reason.into(),
489 context: SpanTrace::capture(),
490 source: None,
491 loc: Location::caller(),
492 }
493 }
494}
495
/// Classifies how a logical transaction's own failure affects the shared
/// physical transaction (see `SqlitePool::transaction`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum TxType {
    /// A failed closure requests rollback of the whole physical transaction.
    MultipleWrites,
    /// A failed closure is contained; the physical transaction proceeds.
    Other,
}
501
/// Error produced when committing the shared physical transaction fails;
/// cloned and fanned out to every logical transaction in the batch.
#[derive(Clone)]
struct CommitError(RusqliteError);

/// Marker returned by a logical transaction's closure to request that the
/// enclosing physical transaction be rolled back and retried without it.
#[derive(Debug)]
struct ShouldRollback;

/// A unit of work queued to the single writer thread; several logical
/// transactions may be applied within one physical SQLite transaction.
#[derive(derive_more::Debug)]
struct LogicalTx {
    #[debug(skip)]
    #[expect(clippy::type_complexity)]
    // Closure that applies this logical tx to the current physical tx.
    func: Box<dyn FnMut(&mut Transaction) -> Result<(), ShouldRollback> + Send>,
    // Enqueue time; used to record queue latency in the histograms.
    sent_at: Instant,
    // Name recorded in metrics for this command.
    func_name: &'static str,
    #[debug(skip)]
    // Receives the commit outcome of the physical transaction.
    phytx_flush_sender: oneshot::Sender<Result<(), CommitError>>,
    priority: LtxPriority,
}
/// Batching priority: the writer thread stops accumulating a batch as soon
/// as a `High`-priority transaction arrives (see `SqlitePool::tick`).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum LtxPriority {
    High,
    Low,
}

/// Messages understood by the writer thread.
#[derive(derive_more::Debug)]
enum ThreadCommand {
    LogicalTx(LogicalTx),
    Shutdown,
}
531
/// Cheaply cloneable handle to the single SQLite writer thread.
#[derive(Clone)]
pub struct SqlitePool(SqlitePoolInner);

// Subscribers waiting for a join-set response of an execution.
// NOTE(review): the u64 is presumably a per-subscription token — confirm
// against the subscription code.
type ResponseSubscribers =
    Arc<Mutex<HashMap<ExecutionId, (oneshot::Sender<ResponseWithCursor>, u64)>>>;
type PendingSubscribers = Arc<Mutex<PendingFfqnSubscribersHolder>>;
type ExecutionFinishedSubscribers =
    Mutex<HashMap<ExecutionId, HashMap<u64, oneshot::Sender<SupportedFunctionReturnValue>>>>;

#[derive(Clone)]
struct SqlitePoolInner {
    // Set by `close`/`Drop` to ask the writer thread to stop.
    shutdown_requested: Arc<AtomicBool>,
    // Set by the writer thread once it has fully stopped.
    shutdown_finished: Arc<AtomicBool>,
    // Channel feeding commands to the single writer thread.
    command_tx: tokio::sync::mpsc::Sender<ThreadCommand>,
    response_subscribers: ResponseSubscribers,
    pending_subscribers: PendingSubscribers,
    execution_finished_subscribers: Arc<ExecutionFinishedSubscribers>,
    // Writer thread handle; taken and inspected by the last clone in `Drop`.
    join_handle: Option<Arc<std::thread::JoinHandle<()>>>,
}
551
552#[async_trait]
553impl DbPoolCloseable for SqlitePool {
554 async fn close(&self) {
555 debug!("Sqlite is closing");
556 self.0.shutdown_requested.store(true, Ordering::Release);
557 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
559 while !self.0.shutdown_finished.load(Ordering::Acquire) {
560 tokio::time::sleep(Duration::from_millis(1)).await;
561 }
562 debug!("Sqlite was closed");
563 }
564}
565
566#[async_trait]
567impl DbPool for SqlitePool {
568 async fn db_exec_conn(&self) -> Result<Box<dyn DbExecutor>, DbErrorGeneric> {
569 if self.0.shutdown_requested.load(Ordering::Acquire) {
570 return Err(DbErrorGeneric::Close);
571 }
572 Ok(Box::new(self.clone()))
573 }
574
575 async fn connection(&self) -> Result<Box<dyn DbConnection>, DbErrorGeneric> {
576 if self.0.shutdown_requested.load(Ordering::Acquire) {
577 return Err(DbErrorGeneric::Close);
578 }
579 Ok(Box::new(self.clone()))
580 }
581 async fn external_api_conn(&self) -> Result<Box<dyn DbExternalApi>, DbErrorGeneric> {
582 if self.0.shutdown_requested.load(Ordering::Acquire) {
583 return Err(DbErrorGeneric::Close);
584 }
585 Ok(Box::new(self.clone()))
586 }
587 #[cfg(feature = "test")]
588 async fn connection_test(
589 &self,
590 ) -> Result<Box<dyn concepts::storage::DbConnectionTest>, DbErrorGeneric> {
591 if self.0.shutdown_requested.load(Ordering::Acquire) {
592 return Err(DbErrorGeneric::Close);
593 }
594 Ok(Box::new(self.clone()))
595 }
596}
597
598impl Drop for SqlitePool {
599 fn drop(&mut self) {
600 let arc = self.0.join_handle.take().expect("join_handle was set");
601 if let Ok(join_handle) = Arc::try_unwrap(arc) {
602 if !join_handle.is_finished() {
604 if !self.0.shutdown_finished.load(Ordering::Acquire) {
605 let backtrace = std::backtrace::Backtrace::capture();
607 warn!("SqlitePool was not closed properly - {backtrace}");
608 self.0.shutdown_requested.store(true, Ordering::Release);
609 let _ = self.0.command_tx.try_send(ThreadCommand::Shutdown);
611 } else {
614 }
616 }
617 }
618 }
619}
620
/// Tuning knobs for the SQLite pool.
#[derive(Debug, Clone)]
pub struct SqliteConfig {
    // Capacity of the command channel feeding the writer thread.
    pub queue_capacity: usize,
    // Optional PRAGMA overrides merged over the built-in `PRAGMA` defaults.
    pub pragma_override: Option<HashMap<String, String>>,
    // Threshold passed to `Histograms::new`; exact reporting semantics are
    // defined there.
    pub metrics_threshold: Option<Duration>,
}
impl Default for SqliteConfig {
    fn default() -> Self {
        Self {
            queue_capacity: 100,
            pragma_override: None,
            metrics_threshold: None,
        }
    }
}

/// Marker returned by the writer-thread loop to signal it should stop.
struct ShutdownRequested;
638
639impl SqlitePool {
    /// Opens the SQLite database, applies PRAGMAs (defaults from `PRAGMA`,
    /// overridable via `pragma_override`), verifies the schema version, and
    /// creates all tables and indexes. Runs once during pool construction.
    fn init_thread(
        path: &Path,
        mut pragma_override: HashMap<String, String>,
    ) -> Result<Connection, InitializationError> {
        // Executes a DDL statement, logging and mapping any failure.
        fn conn_execute<P: Params>(
            conn: &Connection,
            sql: &str,
            params: P,
        ) -> Result<(), InitializationError> {
            conn.execute(sql, params).map(|_| ()).map_err(|err| {
                error!("Cannot run `{sql}` - {err:?}");
                InitializationError
            })
        }
        // Sets a PRAGMA; an empty value means "query and log the result"
        // instead of setting (used e.g. for `integrity_check`).
        fn pragma_update(
            conn: &Connection,
            name: &str,
            value: &str,
        ) -> Result<(), InitializationError> {
            if value.is_empty() {
                debug!("Querying PRAGMA {name}");
                conn.pragma_query(None, name, |row| {
                    debug!("{row:?}");
                    Ok(())
                })
                .map_err(|err| {
                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
                    InitializationError
                })
            } else {
                debug!("Setting PRAGMA {name}={value}");
                conn.pragma_update(None, name, value).map_err(|err| {
                    error!("cannot update pragma `{name}`=`{value}` - {err:?}");
                    InitializationError
                })
            }
        }

        let conn = Connection::open_with_flags(path, OpenFlags::default()).map_err(|err| {
            error!("cannot open the connection - {err:?}");
            InitializationError
        })?;

        // Apply defaults, letting the override map replace individual values.
        for [pragma_name, default_value] in PRAGMA {
            let pragma_value = pragma_override
                .remove(pragma_name)
                .unwrap_or_else(|| default_value.to_string());
            pragma_update(&conn, pragma_name, &pragma_value)?;
        }
        // Apply any remaining overrides that are not among the defaults.
        for (pragma_name, pragma_value) in pragma_override.drain() {
            pragma_update(&conn, &pragma_name, &pragma_value)?;
        }

        {
            // Schema version check: a fresh database gets the expected version
            // inserted; an existing database must match exactly (no migrations
            // are attempted here).
            conn_execute(&conn, CREATE_TABLE_T_METADATA, [])?;
            let actual_version = conn
                .prepare("SELECT schema_version FROM t_metadata ORDER BY id DESC LIMIT 1")
                .map_err(|err| {
                    error!("cannot select schema version - {err:?}");
                    InitializationError
                })?
                .query_row([], |row| row.get::<_, u32>("schema_version"))
                .optional()
                .map_err(|err| {
                    error!("Cannot read the schema version - {err:?}");
                    InitializationError
                })?;

            match actual_version {
                None => conn_execute(
                    &conn,
                    &format!(
                        "INSERT INTO t_metadata (schema_version, created_at) VALUES
                        ({T_METADATA_EXPECTED_SCHEMA_VERSION}, ?) ON CONFLICT DO NOTHING"
                    ),
                    [Utc::now()],
                )?,
                Some(actual_version) => {
                    if actual_version != T_METADATA_EXPECTED_SCHEMA_VERSION {
                        error!(
                            "wrong schema version, expected {T_METADATA_EXPECTED_SCHEMA_VERSION}, got {actual_version}"
                        );
                        return Err(InitializationError);
                    }
                }
            }
        }

        // Create all tables and indexes (all DDL is IF NOT EXISTS).
        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_LOG, [])?;
        conn_execute(
            &conn,
            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VERSION,
            [],
        )?;
        conn_execute(
            &conn,
            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_VARIANT,
            [],
        )?;
        conn_execute(
            &conn,
            CREATE_INDEX_IDX_T_EXECUTION_LOG_EXECUTION_ID_JOIN_SET,
            [],
        )?;
        conn_execute(&conn, CREATE_TABLE_T_JOIN_SET_RESPONSE, [])?;
        conn_execute(
            &conn,
            CREATE_INDEX_IDX_T_JOIN_SET_RESPONSE_EXECUTION_ID_ID,
            [],
        )?;
        conn_execute(
            &conn,
            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_CHILD_ID,
            [],
        )?;
        conn_execute(
            &conn,
            CREATE_INDEX_IDX_JOIN_SET_RESPONSE_UNIQUE_DELAY_ID,
            [],
        )?;
        conn_execute(&conn, CREATE_TABLE_T_STATE, [])?;
        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING_BY_FFQN, [])?;
        conn_execute(&conn, IDX_T_STATE_LOCK_PENDING_BY_COMPONENT, [])?;
        conn_execute(&conn, IDX_T_STATE_EXPIRED_LOCKS, [])?;
        conn_execute(&conn, IDX_T_STATE_EXECUTION_ID_IS_TOP_LEVEL, [])?;
        conn_execute(&conn, IDX_T_STATE_FFQN, [])?;
        conn_execute(&conn, IDX_T_STATE_CREATED_AT, [])?;
        conn_execute(&conn, IDX_T_STATE_DEPLOYMENT_STATE, [])?;
        conn_execute(&conn, CREATE_TABLE_T_DELAY, [])?;
        conn_execute(&conn, CREATE_TABLE_T_EXECUTION_BACKTRACE, [])?;
        conn_execute(&conn, IDX_T_EXECUTION_BACKTRACE_EXECUTION_ID_VERSION, [])?;
        conn_execute(&conn, CREATE_TABLE_T_WASM_BACKTRACE, [])?;
        conn_execute(&conn, CREATE_TABLE_T_LOG, [])?;
        conn_execute(&conn, IDX_T_LOG_EXECUTION_ID_RUN_ID_CREATED_AT, [])?;
        conn_execute(&conn, IDX_T_LOG_EXECUTION_ID_CREATED_AT, [])?;
        Ok(conn)
    }
788
789 fn connection_rpc(
790 mut conn: Connection,
791 shutdown_requested: &AtomicBool,
792 shutdown_finished: &AtomicBool,
793 mut command_rx: mpsc::Receiver<ThreadCommand>,
794 metrics_threshold: Option<Duration>,
795 ) {
796 let mut histograms = Histograms::new(metrics_threshold);
797 while Self::tick(
798 &mut conn,
799 shutdown_requested,
800 &mut command_rx,
801 &mut histograms,
802 )
803 .is_ok()
804 {
805 }
807 debug!("Closing command thread");
808 shutdown_finished.store(true, Ordering::Release);
809 }
810
    /// One iteration of the writer thread: collects a batch of logical
    /// transactions, applies them inside a single physical SQLite transaction
    /// (retrying with offenders skipped on rollback), then fans the commit
    /// result out to every participant. Returns `Err(ShutdownRequested)` when
    /// shutdown was signalled or the command channel closed.
    fn tick(
        conn: &mut Connection,
        shutdown_requested: &AtomicBool,
        command_rx: &mut mpsc::Receiver<ThreadCommand>,
        histograms: &mut Histograms,
    ) -> Result<(), ShutdownRequested> {
        // Whether a logical tx is (still) part of the physical transaction.
        #[derive(Clone, Copy, PartialEq, Eq)]
        enum ApplyOrSkip {
            Apply,
            Skip,
        }
        let mut ltx_list: Vec<(LogicalTx, ApplyOrSkip)> = Vec::new();
        // Block until at least one command arrives; keep accumulating until a
        // high-priority logical tx is seen.
        loop {
            let ltx = match command_rx.blocking_recv() {
                Some(ThreadCommand::LogicalTx(ltx)) => ltx,
                Some(ThreadCommand::Shutdown) => {
                    debug!("Shutdown message received");
                    return Err(ShutdownRequested);
                }
                None => {
                    debug!("command_rx was closed");
                    return Err(ShutdownRequested);
                }
            };
            let prio = ltx.priority;
            ltx_list.push((ltx, ApplyOrSkip::Apply));
            if prio == LtxPriority::High {
                break;
            }
        }
        let all_fns_start = std::time::Instant::now();

        // Drain whatever else is already queued, without blocking.
        while let Ok(more) = command_rx.try_recv() {
            let ltx = match more {
                ThreadCommand::Shutdown => {
                    debug!("Shutdown message received");
                    return Err(ShutdownRequested);
                }
                ThreadCommand::LogicalTx(ltx) => ltx,
            };
            ltx_list.push((ltx, ApplyOrSkip::Apply));
        }

        // Returned when a logical tx requested rollback: the physical tx must
        // be restarted without the offending logical tx.
        struct NeedsRestart;
        type CommitResult = Result<(), CommitError>;
        // Applies all still-eligible logical txs to `ptx` and commits.
        // `Err(NeedsRestart)` marks the failing ltx as `Skip` so the caller
        // can retry the batch on a fresh physical transaction.
        fn try_apply_all(
            mut ptx: Transaction<'_>,
            ltx_list: &mut [(LogicalTx, ApplyOrSkip)],
            histograms: &mut Histograms,
            all_fns_start: Instant,
        ) -> Result<CommitResult, NeedsRestart> {
            for (ltx, former_res) in ltx_list
                .iter_mut()
                .filter(|(_, former_res)| *former_res == ApplyOrSkip::Apply)
            {
                if let Ok(()) = SqlitePool::ltx_apply_to_phytx(ltx, &mut ptx, histograms) {
                } else {
                    // This ltx failed; exclude it and restart the batch.
                    *former_res = ApplyOrSkip::Skip;
                    return Err(NeedsRestart);
                }
            }
            histograms.record_all_fns(all_fns_start.elapsed());
            let now = std::time::Instant::now();
            let commit_result = ptx.commit().map_err(|err| {
                warn!("Cannot commit transaction - {err:?}");
                CommitError(RusqliteError::from(err))
            });
            histograms.record_commit(now.elapsed());
            Ok(commit_result)
        }

        // Retries until a physical transaction can be opened and the batch
        // applied; sleeps briefly (checking for shutdown) when BEGIN fails.
        fn apply_all(
            conn: &mut Connection,
            ltx_list: &mut [(LogicalTx, ApplyOrSkip)],
            histograms: &mut Histograms,
            all_fns_start: Instant,
            shutdown_requested: &AtomicBool,
        ) -> Result<CommitResult, ShutdownRequested> {
            loop {
                match conn.transaction_with_behavior(TransactionBehavior::Immediate) {
                    Ok(ptx) => {
                        if let Ok(commit_res) =
                            try_apply_all(ptx, ltx_list, histograms, all_fns_start)
                        {
                            return Ok(commit_res);
                        }
                    }
                    Err(begin_err) => {
                        error!("Cannot open transaction - {begin_err:?}");
                        std::thread::sleep(Duration::from_millis(100));
                        if shutdown_requested.load(Ordering::Acquire) {
                            return Err(ShutdownRequested);
                        }
                    }
                }
            }
        }
        let ok_or_commit_error = apply_all(
            conn,
            &mut ltx_list,
            histograms,
            all_fns_start,
            shutdown_requested,
        )?;

        // Fan the commit outcome out. Skipped ltxs receive Ok(()): their own
        // failure was already recorded by their closure, and the commit that
        // eventually succeeded did not include them.
        for (ltx, apply_or_skip) in ltx_list {
            let to_send = match apply_or_skip {
                ApplyOrSkip::Apply => ok_or_commit_error.clone(),
                ApplyOrSkip::Skip => Ok(()),
            };
            let _ = ltx.phytx_flush_sender.send(to_send);
        }

        histograms.print_if_elapsed();
        Ok(())
    }
939
940 fn ltx_apply_to_phytx(
942 ltx: &mut LogicalTx,
943 physical_tx: &mut Transaction,
944 histograms: &mut Histograms,
945 ) -> Result<(), ShouldRollback> {
946 let sent_latency = ltx.sent_at.elapsed();
947 let started_at = Instant::now();
948 let res = (ltx.func)(physical_tx);
949 histograms.record_command(sent_latency, ltx.func_name, started_at.elapsed());
950 res
951 }
952
    /// Creates the pool: initializes the database on a blocking task, then
    /// spawns the dedicated OS thread that owns the (blocking) connection and
    /// serves commands from the channel.
    #[instrument(skip_all, name = "sqlite_new")]
    pub async fn new<P: AsRef<Path>>(
        path: P,
        config: SqliteConfig,
    ) -> Result<Self, InitializationError> {
        let path = path.as_ref().to_owned();

        let shutdown_requested = Arc::new(AtomicBool::new(false));
        let shutdown_finished = Arc::new(AtomicBool::new(false));

        let (command_tx, command_rx) = tokio::sync::mpsc::channel(config.queue_capacity);
        info!("Sqlite database location: {path:?}");
        let join_handle = {
            // Run schema/PRAGMA initialization off the async runtime.
            let init_task = {
                tokio::task::spawn_blocking(move || {
                    Self::init_thread(&path, config.pragma_override.unwrap_or_default())
                })
                .await
            };
            let conn = match init_task {
                Ok(res) => res?,
                Err(join_err) => {
                    // The blocking task panicked.
                    error!("Initialization panic - {join_err:?}");
                    return Err(InitializationError);
                }
            };
            let shutdown_requested = shutdown_requested.clone();
            let shutdown_finished = shutdown_finished.clone();
            // Plain OS thread: rusqlite is blocking, so all DB work is
            // funneled through the command channel to this thread.
            std::thread::spawn(move || {
                Self::connection_rpc(
                    conn,
                    &shutdown_requested,
                    &shutdown_finished,
                    command_rx,
                    config.metrics_threshold,
                );
            })
        };
        Ok(SqlitePool(SqlitePoolInner {
            shutdown_requested,
            shutdown_finished,
            command_tx,
            response_subscribers: Arc::default(),
            pending_subscribers: Arc::default(),
            join_handle: Some(Arc::new(join_handle)),
            execution_finished_subscribers: Arc::default(),
        }))
    }
1003
    /// Runs `func` as a high-priority logical transaction on the writer thread
    /// and waits for the physical commit. `tx_type` decides whether a failure
    /// of `func` rolls back the whole physical transaction
    /// (`TxType::MultipleWrites`) or is contained to this logical tx
    /// (`TxType::Other`).
    async fn transaction<F, T, E>(
        &self,
        mut func: F,
        tx_type: TxType,
        func_name: &'static str,
    ) -> Result<T, E>
    where
        F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
        T: Send + 'static,
        E: From<RusqliteError> + Send + 'static,
    {
        // `func`'s typed result is smuggled out of the closure via this slot;
        // the channel below only carries the commit outcome.
        let fn_res: Arc<std::sync::Mutex<Option<_>>> = Arc::default();
        let (phytx_flush_sender, phytx_flush_receiver) = oneshot::channel();
        let current_span = Span::current();
        let thread_command_func = {
            let fn_res = fn_res.clone();
            ThreadCommand::LogicalTx(LogicalTx {
                func: Box::new(move |tx| {
                    let _guard = current_span.enter();
                    let func_res = func(tx);
                    let res = if func_res.is_ok() {
                        Ok(())
                    } else {
                        Err(ShouldRollback)
                    };
                    *fn_res.lock().unwrap() = Some(func_res);
                    // Only `MultipleWrites` propagates the failure as a
                    // rollback request of the physical transaction.
                    match tx_type {
                        TxType::MultipleWrites => res,
                        TxType::Other => Ok(()),
                    }
                }),
                sent_at: Instant::now(),
                func_name,
                phytx_flush_sender,
                priority: LtxPriority::High,
            })
        };
        self.0
            .command_tx
            .send(thread_command_func)
            .await
            .map_err(|_send_err| RusqliteError::Close)?;

        match phytx_flush_receiver.await {
            // Commit succeeded: return whatever `func` produced.
            Ok(Ok(())) => {
                let mut guard = fn_res.lock().unwrap();
                std::mem::take(guard.deref_mut()).expect("ltx must have been run at least once")
            }
            // Commit failed: surface the commit error.
            Ok(Err(CommitError(rusqlite_err))) => Err(E::from(rusqlite_err)),
            // Sender dropped: the pool is shutting down.
            Err(_) => Err(E::from(RusqliteError::Close)),
        }
    }
1059
1060 async fn transaction_fire_forget<F, T, E>(&self, mut func: F, func_name: &'static str)
1062 where
1063 F: FnMut(&mut rusqlite::Transaction) -> Result<T, E> + Send + 'static,
1064 T: Send + 'static + Default,
1065 E: From<RusqliteError> + Send + 'static,
1066 {
1067 let (commit_ack_sender, _commit_ack_receiver) = oneshot::channel(); let current_span = Span::current();
1069 let thread_command_func = {
1070 ThreadCommand::LogicalTx(LogicalTx {
1071 func: Box::new(move |tx| {
1072 let _guard = current_span.enter();
1073 let _ = func(tx);
1074 Ok(()) }),
1076 sent_at: Instant::now(),
1077 func_name,
1078 phytx_flush_sender: commit_ack_sender,
1079 priority: LtxPriority::Low,
1080 })
1081 };
1082 let _ = self.0.command_tx.send(thread_command_func).await; }
1084
    /// Loads the `Created` event (version 0) of an execution and rebuilds the
    /// original `CreateRequest` from it.
    ///
    /// Returns `DbErrorRead::NotFound` when the execution does not exist, and
    /// a consistency error when version 0 holds a different event variant.
    fn fetch_created_event(
        conn: &Connection,
        execution_id: &ExecutionId,
    ) -> Result<CreateRequest, DbErrorRead> {
        let mut stmt = conn.prepare(
            "SELECT created_at, json_value FROM t_execution_log WHERE \
            execution_id = :execution_id AND version = 0",
        )?;
        let (created_at, event) = stmt.query_row(
            named_params! {
                ":execution_id": execution_id.to_string(),
            },
            |row| {
                let created_at = row.get("created_at")?;
                let event = row
                    .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                    .map_err(|serde| {
                        error!("cannot deserialize `Created` event: {row:?} - `{serde:?}`");
                        consistency_rusqlite("cannot deserialize `Created` event")
                    })?;
                Ok((created_at, event.0))
            },
        )?;
        if let ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            scheduled_at,
            component_id,
            deployment_id,
            metadata,
            scheduled_by,
        } = event
        {
            Ok(CreateRequest {
                created_at,
                execution_id: execution_id.clone(),
                ffqn,
                params,
                parent,
                scheduled_at,
                component_id,
                deployment_id,
                metadata,
                scheduled_by,
            })
        } else {
            // Version 0 must always hold the `Created` event.
            error!("Row with version=0 must be a `Created` event - {event:?}");
            Err(consistency_db_err("expected `Created` event").into())
        }
    }
1136
1137 fn check_expected_next_and_appending_version(
1138 expected_version: &Version,
1139 appending_version: &Version,
1140 ) -> Result<(), DbErrorWrite> {
1141 if *expected_version != *appending_version {
1142 debug!(
1143 "Version conflict - expected: {expected_version:?}, appending: {appending_version:?}"
1144 );
1145 return Err(DbErrorWrite::NonRetriable(
1146 DbErrorWriteNonRetriable::VersionConflict {
1147 expected: expected_version.clone(),
1148 requested: appending_version.clone(),
1149 },
1150 ));
1151 }
1152 Ok(())
1153 }
1154
1155 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
1156 fn create_inner(
1157 tx: &Transaction,
1158 req: CreateRequest,
1159 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1160 trace!("create_inner");
1161
1162 let version = Version::default();
1163 let execution_id = req.execution_id.clone();
1164 let execution_id_str = execution_id.to_string();
1165 let ffqn = req.ffqn.clone();
1166 let created_at = req.created_at;
1167 let scheduled_at = req.scheduled_at;
1168 let component_id = req.component_id.clone();
1169 let deployment_id = req.deployment_id;
1170 let event = ExecutionRequest::from(req);
1171 let event_ser = serde_json::to_string(&event).map_err(|err| {
1172 error!("Cannot serialize {event:?} - {err:?}");
1173 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
1174 })?;
1175 tx.prepare(
1176 "INSERT INTO t_execution_log (execution_id, created_at, version, json_value, variant, join_set_id ) \
1177 VALUES (:execution_id, :created_at, :version, :json_value, :variant, :join_set_id)")
1178 ?
1179 .execute(named_params! {
1180 ":execution_id": &execution_id_str,
1181 ":created_at": created_at,
1182 ":version": version.0,
1183 ":json_value": event_ser,
1184 ":variant": event.variant(),
1185 ":join_set_id": event.join_set_id().map(std::string::ToString::to_string),
1186 })
1187 ?;
1188 let pending_at = {
1189 debug!("Creating with `Pending(`{scheduled_at:?}`)");
1190 tx.prepare(
1191 r"
1192 INSERT INTO t_state (
1193 execution_id,
1194 is_top_level,
1195 corresponding_version,
1196 pending_expires_finished,
1197 ffqn,
1198 state,
1199 created_at,
1200 component_id_input_digest,
1201 component_type,
1202 deployment_id,
1203 updated_at,
1204 first_scheduled_at,
1205 intermittent_event_count,
1206 is_paused
1207 )
1208 VALUES (
1209 :execution_id,
1210 :is_top_level,
1211 :corresponding_version,
1212 :pending_expires_finished,
1213 :ffqn,
1214 :state,
1215 :created_at,
1216 :component_id_input_digest,
1217 :component_type,
1218 :deployment_id,
1219 CURRENT_TIMESTAMP,
1220 :first_scheduled_at,
1221 0,
1222 false
1223 )
1224 ",
1225 )?
1226 .execute(named_params! {
1227 ":execution_id": execution_id.to_string(),
1228 ":is_top_level": execution_id.is_top_level(),
1229 ":corresponding_version": version.0,
1230 ":pending_expires_finished": scheduled_at,
1231 ":ffqn": ffqn.to_string(),
1232 ":state": STATE_PENDING_AT,
1233 ":created_at": created_at,
1234 ":component_id_input_digest": component_id.input_digest,
1235 ":component_type": component_id.component_type,
1236 ":deployment_id": deployment_id.to_string(),
1237 ":first_scheduled_at": scheduled_at,
1238 })?;
1239 AppendNotifier {
1240 pending_at: Some(NotifierPendingAt {
1241 scheduled_at,
1242 ffqn,
1243 component_input_digest: component_id.input_digest,
1244 }),
1245 execution_finished: None,
1246 response: None,
1247 }
1248 };
1249 let next_version = Version::new(version.0 + 1);
1250 Ok((next_version, pending_at))
1251 }
1252
1253 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at))]
1254 fn update_state_pending_after_response_appended(
1255 tx: &Transaction,
1256 execution_id: &ExecutionId,
1257 scheduled_at: DateTime<Utc>, component_input_digest: InputContentDigest,
1259 ) -> Result<AppendNotifier, DbErrorWrite> {
1260 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after response appended");
1261 let mut stmt = tx
1262 .prepare_cached(
1263 r"
1264 UPDATE t_state
1265 SET
1266 pending_expires_finished = :pending_expires_finished,
1267 state = :state,
1268 updated_at = CURRENT_TIMESTAMP,
1269 max_retries = NULL,
1270 retry_exp_backoff_millis = NULL,
1271 last_lock_version = NULL,
1272
1273 join_set_id = NULL,
1274 join_set_closing = NULL,
1275
1276 result_kind = NULL
1277 WHERE execution_id = :execution_id
1278 ",
1279 )
1280 .map_err(|err| DbErrorGeneric::Uncategorized {
1281 reason: err.to_string().into(),
1282 context: SpanTrace::capture(),
1283 source: Some(Arc::new(err)),
1284 loc: Location::caller(),
1285 })?;
1286 let updated = stmt
1287 .execute(named_params! {
1288 ":execution_id": execution_id,
1289 ":pending_expires_finished": scheduled_at,
1290 ":state": STATE_PENDING_AT,
1291 })
1292 .map_err(|err| DbErrorGeneric::Uncategorized {
1293 reason: err.to_string().into(),
1294 context: SpanTrace::capture(),
1295 source: Some(Arc::new(err)),
1296 loc: Location::caller(),
1297 })?;
1298 if updated != 1 {
1299 return Err(DbErrorWrite::NotFound);
1300 }
1301 Ok(AppendNotifier {
1302 pending_at: Some(NotifierPendingAt {
1303 scheduled_at,
1304 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1305 component_input_digest,
1306 }),
1307 execution_finished: None,
1308 response: None,
1309 })
1310 }
1311
1312 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %scheduled_at, %appending_version))]
1313 fn update_state_pending_after_event_appended(
1314 tx: &Transaction,
1315 execution_id: &ExecutionId,
1316 appending_version: &Version,
1317 scheduled_at: DateTime<Utc>, intermittent_failure: bool,
1319 component_input_digest: InputContentDigest,
1320 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
1321 debug!("Setting t_state to Pending(`{scheduled_at:?}`) after event appended");
1322 let mut stmt = tx.prepare_cached(
1323 r"
1324 UPDATE t_state
1325 SET
1326 corresponding_version = :appending_version,
1327 pending_expires_finished = :pending_expires_finished,
1328 state = :state,
1329 updated_at = CURRENT_TIMESTAMP,
1330 intermittent_event_count = intermittent_event_count + :intermittent_delta,
1331
1332 max_retries = NULL,
1333 retry_exp_backoff_millis = NULL,
1334 last_lock_version = NULL,
1335
1336 join_set_id = NULL,
1337 join_set_closing = NULL,
1338
1339 result_kind = NULL
1340 WHERE execution_id = :execution_id;
1341 ", )?;
1343 let updated = stmt
1344 .execute(named_params! {
1345 ":execution_id": execution_id.to_string(),
1346 ":appending_version": appending_version.0,
1347 ":pending_expires_finished": scheduled_at,
1348 ":state": STATE_PENDING_AT,
1349 ":intermittent_delta": i32::from(intermittent_failure) })
1351 .map_err(DbErrorWrite::from)?;
1352 if updated != 1 {
1353 return Err(DbErrorWrite::NotFound);
1354 }
1355 Ok((
1356 appending_version.increment(),
1357 AppendNotifier {
1358 pending_at: Some(NotifierPendingAt {
1359 scheduled_at,
1360 ffqn: Self::fetch_created_event(tx, execution_id)?.ffqn,
1361 component_input_digest,
1362 }),
1363 execution_finished: None,
1364 response: None,
1365 },
1366 ))
1367 }
1368
    /// Transition the `t_state` row to `Locked(lock_expires_at)` and return
    /// the execution's current `intermittent_event_count`.
    ///
    /// The UPDATE is guarded with `is_paused = false`, so attempting to lock a
    /// paused (or nonexistent) execution updates 0 rows and surfaces as
    /// `DbErrorWrite::NotFound`.
    #[expect(clippy::too_many_arguments)]
    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
    fn update_state_locked_get_intermittent_event_count(
        tx: &Transaction,
        execution_id: &ExecutionId,
        deployment_id: DeploymentId,
        component_digest: &InputContentDigest,
        executor_id: ExecutorId,
        run_id: RunId,
        lock_expires_at: DateTime<Utc>,
        appending_version: &Version,
        retry_config: ComponentRetryConfig,
    ) -> Result<u32, DbErrorWrite> {
        debug!("Setting t_state to Locked(`{lock_expires_at:?}`)");
        // The backoff is persisted as millis in an i64 column; reject configs
        // whose u128 millis overflow i64 rather than truncating silently.
        let backoff_millis =
            i64::try_from(retry_config.retry_exp_backoff.as_millis()).map_err(|err| {
                DbErrorGeneric::Uncategorized {
                    reason: "backoff too big".into(),
                    context: SpanTrace::capture(),
                    source: Some(Arc::new(err)),
                    loc: Location::caller(),
                }
            })?;
        let execution_id_str = execution_id.to_string();
        let mut stmt = tx.prepare_cached(
            r"
            UPDATE t_state
            SET
                corresponding_version = :appending_version,
                pending_expires_finished = :pending_expires_finished,
                state = :state,
                updated_at = CURRENT_TIMESTAMP,
                deployment_id = :deployment_id,
                component_id_input_digest = :component_id_input_digest,

                max_retries = :max_retries,
                retry_exp_backoff_millis = :retry_exp_backoff_millis,
                last_lock_version = :appending_version,
                executor_id = :executor_id,
                run_id = :run_id,

                join_set_id = NULL,
                join_set_closing = NULL,

                result_kind = NULL
            WHERE execution_id = :execution_id
            AND is_paused = false
            ",
        )?;
        let updated = stmt.execute(named_params! {
            ":execution_id": execution_id_str,
            ":appending_version": appending_version.0,
            ":pending_expires_finished": lock_expires_at,
            ":state": STATE_LOCKED,
            ":deployment_id": deployment_id.to_string(),
            ":component_id_input_digest": component_digest,
            ":max_retries": retry_config.max_retries,
            ":retry_exp_backoff_millis": backoff_millis,
            ":executor_id": executor_id.to_string(),
            ":run_id": run_id.to_string(),
        })?;
        if updated != 1 {
            // Row missing or `is_paused = true` filtered it out.
            return Err(DbErrorWrite::NotFound);
        }

        // Read back the counter in a second statement; runs in the same
        // transaction, so it observes the UPDATE above.
        let intermittent_event_count = tx
            .prepare(
                "SELECT intermittent_event_count FROM t_state WHERE execution_id = :execution_id",
            )?
            .query_row(
                named_params! {
                    ":execution_id": execution_id_str,
                },
                |row| {
                    let intermittent_event_count = row.get("intermittent_event_count")?;
                    Ok(intermittent_event_count)
                },
            )?;

        Ok(intermittent_event_count)
    }
1451
1452 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1454 fn update_state_blocked(
1455 tx: &Transaction,
1456 execution_id: &ExecutionId,
1457 appending_version: &Version,
1458 join_set_id: &JoinSetId,
1460 lock_expires_at: DateTime<Utc>,
1461 join_set_closing: bool,
1462 ) -> Result<
1463 AppendResponse, DbErrorWrite,
1465 > {
1466 debug!("Setting t_state to BlockedByJoinSet(`{join_set_id}`)");
1467 let execution_id_str = execution_id.to_string();
1468 let mut stmt = tx.prepare_cached(
1469 r"
1470 UPDATE t_state
1471 SET
1472 corresponding_version = :appending_version,
1473 pending_expires_finished = :pending_expires_finished,
1474 state = :state,
1475 updated_at = CURRENT_TIMESTAMP,
1476
1477 max_retries = NULL,
1478 retry_exp_backoff_millis = NULL,
1479 last_lock_version = NULL,
1480
1481 join_set_id = :join_set_id,
1482 join_set_closing = :join_set_closing,
1483
1484 result_kind = NULL
1485 WHERE execution_id = :execution_id
1486 ",
1487 )?;
1488 let updated = stmt.execute(named_params! {
1489 ":execution_id": execution_id_str,
1490 ":appending_version": appending_version.0,
1491 ":pending_expires_finished": lock_expires_at,
1492 ":state": STATE_BLOCKED_BY_JOIN_SET,
1493 ":join_set_id": join_set_id,
1494 ":join_set_closing": join_set_closing,
1495 })?;
1496 if updated != 1 {
1497 return Err(DbErrorWrite::NotFound);
1498 }
1499 Ok(appending_version.increment())
1500 }
1501
1502 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1503 fn update_state_finished(
1504 tx: &Transaction,
1505 execution_id: &ExecutionId,
1506 appending_version: &Version,
1507 finished_at: DateTime<Utc>,
1509 result_kind: PendingStateFinishedResultKind,
1510 ) -> Result<(), DbErrorWrite> {
1511 debug!("Setting t_state to Finished");
1512 let execution_id_str = execution_id.to_string();
1513 let mut stmt = tx.prepare_cached(
1514 r"
1515 UPDATE t_state
1516 SET
1517 corresponding_version = :appending_version,
1518 pending_expires_finished = :pending_expires_finished,
1519 state = :state,
1520 updated_at = CURRENT_TIMESTAMP,
1521
1522 max_retries = NULL,
1523 retry_exp_backoff_millis = NULL,
1524 last_lock_version = NULL,
1525 executor_id = NULL,
1526 run_id = NULL,
1527
1528 join_set_id = NULL,
1529 join_set_closing = NULL,
1530
1531 result_kind = :result_kind
1532 WHERE execution_id = :execution_id
1533 ",
1534 )?;
1535
1536 let updated = stmt.execute(named_params! {
1537 ":execution_id": execution_id_str,
1538 ":appending_version": appending_version.0,
1539 ":pending_expires_finished": finished_at,
1540 ":state": STATE_FINISHED,
1541 ":result_kind": JsonWrapper(result_kind),
1542 })?;
1543 if updated != 1 {
1544 return Err(DbErrorWrite::NotFound);
1545 }
1546 Ok(())
1547 }
1548
1549 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version, %is_paused))]
1550 fn update_state_paused(
1551 tx: &Transaction,
1552 execution_id: &ExecutionId,
1553 appending_version: &Version,
1554 is_paused: bool,
1555 ) -> Result<AppendResponse, DbErrorWrite> {
1556 debug!(
1557 "Setting t_state to {}",
1558 if is_paused { "paused" } else { "unpaused" }
1559 );
1560 let execution_id_str = execution_id.to_string();
1561 let mut stmt = tx.prepare_cached(
1562 r"
1563 UPDATE t_state
1564 SET
1565 corresponding_version = :appending_version,
1566 is_paused = :is_paused,
1567 updated_at = CURRENT_TIMESTAMP
1568 WHERE execution_id = :execution_id
1569 ",
1570 )?;
1571
1572 let updated = stmt.execute(named_params! {
1573 ":execution_id": execution_id_str,
1574 ":appending_version": appending_version.0,
1575 ":is_paused": is_paused,
1576 })?;
1577 if updated != 1 {
1578 return Err(DbErrorWrite::NotFound);
1579 }
1580 Ok(appending_version.increment())
1581 }
1582
1583 #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %appending_version))]
1585 fn bump_state_next_version(
1586 tx: &Transaction,
1587 execution_id: &ExecutionId,
1588 appending_version: &Version,
1589 delay_req: Option<DelayReq>,
1590 ) -> Result<AppendResponse , DbErrorWrite> {
1591 debug!("update_index_version");
1592 let execution_id_str = execution_id.to_string();
1593 let mut stmt = tx.prepare_cached(
1594 r"
1595 UPDATE t_state
1596 SET
1597 corresponding_version = :appending_version,
1598 updated_at = CURRENT_TIMESTAMP
1599 WHERE execution_id = :execution_id
1600 ",
1601 )?;
1602 let updated = stmt.execute(named_params! {
1603 ":execution_id": execution_id_str,
1604 ":appending_version": appending_version.0,
1605 })?;
1606 if updated != 1 {
1607 return Err(DbErrorWrite::NotFound);
1608 }
1609 if let Some(DelayReq {
1610 join_set_id,
1611 delay_id,
1612 expires_at,
1613 }) = delay_req
1614 {
1615 debug!("Inserting delay to `t_delay`");
1616 let mut stmt = tx.prepare_cached(
1617 "INSERT INTO t_delay (execution_id, join_set_id, delay_id, expires_at) \
1618 VALUES \
1619 (:execution_id, :join_set_id, :delay_id, :expires_at)",
1620 )?;
1621 stmt.execute(named_params! {
1622 ":execution_id": execution_id_str,
1623 ":join_set_id": join_set_id.to_string(),
1624 ":delay_id": delay_id.to_string(),
1625 ":expires_at": expires_at,
1626 })?;
1627 }
1628 Ok(appending_version.increment())
1629 }
1630
1631 fn get_combined_state(
1632 tx: &Transaction,
1633 execution_id: &ExecutionId,
1634 ) -> Result<CombinedState, DbErrorRead> {
1635 let mut stmt = tx.prepare(
1636 r"
1637 SELECT
1638 created_at, first_scheduled_at,
1639 state, ffqn, component_id_input_digest, component_type, deployment_id,
1640 corresponding_version, pending_expires_finished,
1641 last_lock_version, executor_id, run_id,
1642 join_set_id, join_set_closing,
1643 result_kind, is_paused
1644 FROM t_state
1645 WHERE
1646 execution_id = :execution_id
1647 ",
1648 )?;
1649 stmt.query_row(
1650 named_params! {
1651 ":execution_id": execution_id.to_string(),
1652 },
1653 |row| {
1654 CombinedState::new(
1655 CombinedStateDTO {
1656 execution_id: execution_id.clone(),
1657 created_at: row.get("created_at")?,
1658 first_scheduled_at: row.get("first_scheduled_at")?,
1659 component_digest: row.get("component_id_input_digest")?,
1660 component_type: row.get("component_type")?,
1661 deployment_id: row.get("deployment_id")?,
1662 state: row.get("state")?,
1663 ffqn: row.get("ffqn")?,
1664 pending_expires_finished: row
1665 .get::<_, DateTime<Utc>>("pending_expires_finished")?,
1666 last_lock_version: row
1667 .get::<_, Option<VersionType>>("last_lock_version")?
1668 .map(Version::new),
1669 executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,
1670 run_id: row.get::<_, Option<RunId>>("run_id")?,
1671 join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
1672 join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
1673 result_kind: row
1674 .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
1675 "result_kind",
1676 )?
1677 .map(|wrapper| wrapper.0),
1678 is_paused: row.get("is_paused")?,
1679 },
1680 Version::new(row.get("corresponding_version")?),
1681 )
1682 .map_err(|e| from_generic_error(&e))
1683 },
1684 )
1685 .map_err(DbErrorRead::from)
1686 }
1687
    /// List executions from `t_state`, applying `filter` and cursor-based
    /// `pagination`. Rows that fail to decode into a `CombinedState` are
    /// skipped with a warning instead of failing the whole listing.
    fn list_executions(
        read_tx: &Transaction,
        filter: &ListExecutionsFilter,
        pagination: &ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, RusqliteError> {
        // Accumulates dynamically-built WHERE clauses, their bound parameters,
        // and the LIMIT/direction derived from the pagination.
        #[derive(Debug)]
        struct StatementModifier<'a> {
            where_vec: Vec<String>,
            params: Vec<(&'static str, ToSqlOutput<'a>)>,
            limit: u32,
            limit_desc: bool,
        }

        // Translate cursor pagination into a `{column} <rel> :cursor` clause;
        // also applies the top-level-only filter.
        fn paginate<'a, T: Clone + rusqlite::ToSql + 'static>(
            pagination: &'a Pagination<Option<T>>,
            column: &str,
            filter: &ListExecutionsFilter,
        ) -> Result<StatementModifier<'a>, RusqliteError> {
            let mut where_vec: Vec<String> = vec![];
            let mut params: Vec<(&'static str, ToSqlOutput<'a>)> = vec![];
            let limit = pagination.length();
            let limit_desc = pagination.is_desc();
            match pagination {
                Pagination::NewerThan {
                    cursor: Some(cursor),
                    ..
                }
                | Pagination::OlderThan {
                    cursor: Some(cursor),
                    ..
                } => {
                    where_vec.push(format!("{column} {rel} :cursor", rel = pagination.rel()));
                    let cursor = cursor.to_sql().map_err(|err| {
                        error!("Possible program error - cannot convert cursor to sql - {err:?}");
                        RusqliteError::Generic {
                            reason: "cannot convert cursor to sql".into(),
                            context: SpanTrace::capture(),
                            source: Some(Arc::new(err)),
                            loc: Location::caller(),
                        }
                    })?;
                    params.push((":cursor", cursor));
                }
                // No cursor supplied: no windowing clause.
                _ => {}
            }
            if !filter.show_derived {
                // Hide derived (child) executions unless explicitly requested.
                where_vec.push("is_top_level=true".to_string());
            }
            Ok(StatementModifier {
                where_vec,
                params,
                limit: u32::from(limit),
                limit_desc,
            })
        }

        let mut statement_mod = match pagination {
            ExecutionListPagination::CreatedBy(pagination) => {
                paginate(pagination, "created_at", filter)?
            }
            ExecutionListPagination::ExecutionId(pagination) => {
                paginate(pagination, "execution_id", filter)?
            }
        };
        // Prefix match helper for LIKE clauses.
        let like = |str| format!("{str}%");

        // The `*_temporary` locals below extend borrows so the `ToSqlOutput`
        // values stay valid until the query executes at the end of the fn.
        let ffqn_temporary;
        if let Some(ffqn_prefix) = &filter.ffqn_prefix {
            statement_mod.where_vec.push("ffqn LIKE :ffqn".to_string());
            ffqn_temporary = like(ffqn_prefix);
            let ffqn = ffqn_temporary
                .to_sql()
                .expect("string conversion never fails");
            statement_mod.params.push((":ffqn", ffqn));
        }

        if filter.hide_finished {
            statement_mod
                .where_vec
                .push(format!("state != '{STATE_FINISHED}'"));
        }
        let prefix_temporary;
        if let Some(prefix) = &filter.execution_id_prefix {
            statement_mod
                .where_vec
                .push("execution_id LIKE :prefix".to_string());
            prefix_temporary = like(prefix);
            statement_mod.params.push((
                ":prefix",
                prefix_temporary
                    .to_sql()
                    .expect("string conversion never fails"),
            ));
        }

        let component_digest_temporary;
        // NOTE(review): local name typo `componnet_digest` — harmless, rename when convenient.
        if let Some(componnet_digest) = &filter.component_digest {
            statement_mod
                .where_vec
                .push("component_id_input_digest = :component_digest".to_string());
            component_digest_temporary = componnet_digest.clone();
            let component_digest_sql = component_digest_temporary
                .to_sql()
                .expect("InputContentDigest conversion never fails");
            statement_mod
                .params
                .push((":component_digest", component_digest_sql));
        }

        let deployment_id_temporary;
        if let Some(deployment_id) = filter.deployment_id {
            statement_mod
                .where_vec
                .push("deployment_id = :deployment_id".to_string());
            deployment_id_temporary = deployment_id;
            let deployment_id = deployment_id_temporary
                .to_sql()
                .expect("DeploymentId conversion never fails");
            statement_mod.params.push((":deployment_id", deployment_id));
        }

        let where_str = if statement_mod.where_vec.is_empty() {
            String::new()
        } else {
            format!("WHERE {}", statement_mod.where_vec.join(" AND "))
        };

        // Inner ORDER controls which window LIMIT selects; the outer ORDER
        // re-sorts the selected window.
        // NOTE(review): ascending pagination windows are re-sorted DESC by the
        // outer query (newest-first presentation, presumably) — confirm intended.
        let (inner_order, outer_order) = if statement_mod.limit_desc {
            ("DESC", "")
        } else {
            ("", "DESC")
        };

        let inner_sql = format!(
            r"SELECT created_at, first_scheduled_at, component_id_input_digest, component_type, deployment_id,
            state, execution_id, ffqn, corresponding_version, pending_expires_finished,
            last_lock_version, executor_id, run_id,
            join_set_id, join_set_closing,
            result_kind, is_paused
            FROM t_state {where_str} ORDER BY created_at {inner_order} LIMIT {limit}",
            limit = statement_mod.limit,
        );

        let sql = if outer_order.is_empty() {
            inner_sql
        } else {
            format!("SELECT * FROM ({inner_sql}) AS sub ORDER BY created_at {outer_order}")
        };
        let vec: Vec<_> = read_tx
            .prepare(&sql)?
            .query_map::<_, &[(&'static str, ToSqlOutput)], _>(
                statement_mod
                    .params
                    .into_iter()
                    .collect::<Vec<_>>()
                    .as_ref(),
                |row| {
                    let combined_state = CombinedState::new(
                        CombinedStateDTO {
                            execution_id: row.get("execution_id")?,
                            created_at: row.get("created_at")?,
                            first_scheduled_at: row.get("first_scheduled_at")?,
                            component_digest: row.get("component_id_input_digest")?,
                            component_type: row.get("component_type")?,
                            deployment_id: row.get("deployment_id")?,
                            state: row.get("state")?,
                            ffqn: row.get("ffqn")?,
                            pending_expires_finished: row.get("pending_expires_finished")?,
                            executor_id: row.get::<_, Option<ExecutorId>>("executor_id")?,

                            last_lock_version: row
                                .get::<_, Option<VersionType>>("last_lock_version")?
                                .map(Version::new),
                            run_id: row.get::<_, Option<RunId>>("run_id")?,
                            join_set_id: row.get::<_, Option<JoinSetId>>("join_set_id")?,
                            join_set_closing: row.get::<_, Option<bool>>("join_set_closing")?,
                            result_kind: row
                                .get::<_, Option<JsonWrapper<PendingStateFinishedResultKind>>>(
                                    "result_kind",
                                )?
                                .map(|wrapper| wrapper.0),
                            is_paused: row.get("is_paused")?,
                        },
                        Version::new(row.get("corresponding_version")?),
                    )
                    .map_err(|e| from_generic_error(&e))?;
                    Ok(combined_state.execution_with_state)
                },
            )?
            .collect::<Vec<Result<_, _>>>()
            .into_iter()
            // Undecodable rows are logged and dropped rather than aborting.
            .filter_map(|row| match row {
                Ok(row) => Some(row),
                Err(err) => {
                    warn!("Skipping row - {err:?}");
                    None
                }
            })
            .collect();

        Ok(vec)
    }
1892
1893 fn list_responses(
1894 tx: &Transaction,
1895 execution_id: &ExecutionId,
1896 pagination: Option<Pagination<u32>>,
1897 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
1898 let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
1900 let mut sql = "SELECT \
1901 r.id, r.created_at, r.join_set_id, r.delay_id, r.delay_success, r.child_execution_id, r.finished_version, l.json_value \
1902 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
1903 WHERE \
1904 r.execution_id = :execution_id \
1905 AND ( r.finished_version = l.version OR r.child_execution_id IS NULL ) \
1906 "
1907 .to_string();
1908 let limit = match &pagination {
1909 Some(
1910 pagination @ (Pagination::NewerThan { cursor, .. }
1911 | Pagination::OlderThan { cursor, .. }),
1912 ) => {
1913 params.push((":cursor", Box::new(cursor)));
1914 write!(sql, " AND r.id {rel} :cursor", rel = pagination.rel()).unwrap();
1915 Some(pagination.length())
1916 }
1917 None => None,
1918 };
1919 sql.push_str(" ORDER BY id");
1920 let is_desc = pagination.as_ref().is_some_and(Pagination::is_desc);
1921 if is_desc {
1922 sql.push_str(" DESC");
1923 }
1924 if let Some(limit) = limit {
1925 write!(sql, " LIMIT {limit}").unwrap();
1926 }
1927 if is_desc {
1929 sql = format!("SELECT * FROM ({sql}) ORDER BY id ASC");
1930 }
1931 params.push((":execution_id", Box::new(execution_id.to_string())));
1932 tx.prepare(&sql)?
1933 .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
1934 params
1935 .iter()
1936 .map(|(key, value)| (*key, value.as_ref()))
1937 .collect::<Vec<_>>()
1938 .as_ref(),
1939 Self::parse_response_with_cursor,
1940 )?
1941 .collect::<Result<Vec<_>, rusqlite::Error>>()
1942 .map_err(DbErrorRead::from)
1943 }
1944
1945 fn parse_response_with_cursor(
1946 row: &rusqlite::Row<'_>,
1947 ) -> Result<ResponseWithCursor, rusqlite::Error> {
1948 let id = row.get("id")?;
1949 let created_at: DateTime<Utc> = row.get("created_at")?;
1950 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
1951 let event = match (
1952 row.get::<_, Option<DelayId>>("delay_id")?,
1953 row.get::<_, Option<bool>>("delay_success")?,
1954 row.get::<_, Option<ExecutionIdDerived>>("child_execution_id")?,
1955 row.get::<_, Option<VersionType>>("finished_version")?,
1956 row.get::<_, Option<JsonWrapper<ExecutionRequest>>>("json_value")?,
1957 ) {
1958 (Some(delay_id), Some(delay_success), None, None, None) => {
1959 JoinSetResponse::DelayFinished {
1960 delay_id,
1961 result: delay_success.then_some(()).ok_or(()),
1962 }
1963 }
1964 (
1965 None,
1966 None,
1967 Some(child_execution_id),
1968 Some(finished_version),
1969 Some(JsonWrapper(ExecutionRequest::Finished { result, .. })),
1970 ) => JoinSetResponse::ChildExecutionFinished {
1971 child_execution_id,
1972 finished_version: Version(finished_version),
1973 result,
1974 },
1975 (delay, delay_success, child, finished, result) => {
1976 error!(
1977 "Invalid row in t_join_set_response {id} - {delay:?} {delay_success:?} {child:?} {finished:?} {:?}",
1978 result.map(|it| it.0)
1979 );
1980 return Err(consistency_rusqlite("invalid row in t_join_set_response"));
1981 }
1982 };
1983 Ok(ResponseWithCursor {
1984 cursor: ResponseCursor(id),
1985 event: JoinSetResponseEventOuter {
1986 event: JoinSetResponseEvent { join_set_id, event },
1987 created_at,
1988 },
1989 })
1990 }
1991
    /// Lock one execution for `executor_id`/`run_id` until `lock_expires_at`.
    ///
    /// Steps, all in the caller's transaction:
    /// 1. validate the pending state allows locking and the version matches,
    /// 2. append a `Locked` event to `t_execution_log`,
    /// 3. collect all join-set responses,
    /// 4. flip `t_state` to `Locked` (also yields `intermittent_event_count`),
    /// 5. replay `Created` + `HistoryEvent` rows to rebuild the event history.
    ///
    /// Returns the full [`LockedExecution`] an executor needs to run the work.
    #[instrument(level = Level::TRACE, skip(tx))]
    #[expect(clippy::too_many_arguments)]
    fn lock_single_execution(
        tx: &Transaction,
        created_at: DateTime<Utc>,
        component_id: &ComponentId,
        deployment_id: DeploymentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        appending_version: &Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!("lock_single_execution");
        let combined_state = Self::get_combined_state(tx, execution_id)?;
        // Reject locking when the pending state disallows it (e.g. locked by
        // another run).
        combined_state
            .execution_with_state
            .pending_state
            .can_append_lock(created_at, executor_id, run_id, lock_expires_at)?;
        let expected_version = combined_state.get_next_version_assert_not_finished();
        Self::check_expected_next_and_appending_version(&expected_version, appending_version)?;

        let locked_event = Locked {
            component_id: component_id.clone(),
            deployment_id,
            executor_id,
            lock_expires_at,
            run_id,
            retry_config,
        };
        let event = ExecutionRequest::Locked(locked_event.clone());
        let event_ser = serde_json::to_string(&event).map_err(|err| {
            warn!("Cannot serialize {event:?} - {err:?}");
            DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
        })?;
        let mut stmt = tx
            .prepare_cached(
                "INSERT INTO t_execution_log \
                (execution_id, created_at, json_value, version, variant) \
                VALUES \
                (:execution_id, :created_at, :json_value, :version, :variant)",
            )
            .map_err(|err| DbErrorGeneric::Uncategorized {
                reason: err.to_string().into(),
                context: SpanTrace::capture(),
                source: Some(Arc::new(err)),
                loc: Location::caller(),
            })?;
        // A failed INSERT here (e.g. version uniqueness) means the lock raced;
        // reported as IllegalState rather than a generic error.
        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": created_at,
            ":json_value": event_ser,
            ":version": appending_version.0,
            ":variant": event.variant(),
        })
        .map_err(|err| {
            DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState {
                reason: "cannot lock".into(),
                context: SpanTrace::capture(),
                source: Some(Arc::new(err)),
                loc: Location::caller(),
            })
        })?;

        let responses = Self::list_responses(tx, execution_id, None)?;
        trace!("Responses: {responses:?}");

        let intermittent_event_count = Self::update_state_locked_get_intermittent_event_count(
            tx,
            execution_id,
            deployment_id,
            &component_id.input_digest,
            executor_id,
            run_id,
            lock_expires_at,
            appending_version,
            retry_config,
        )?;
        // Replay only `Created` and `HistoryEvent` rows, ordered by version.
        let mut events = tx
            .prepare(
                "SELECT json_value, version FROM t_execution_log WHERE \
                execution_id = :execution_id AND (variant = :variant1 OR variant = :variant2) \
                ORDER BY version",
            )?
            .query_map(
                named_params! {
                    ":execution_id": execution_id.to_string(),
                    ":variant1": DUMMY_CREATED.variant(),
                    ":variant2": DUMMY_HISTORY_EVENT.variant(),
                },
                |row| {
                    // `created_at` is not selected; a fixed placeholder is used
                    // since the event history does not need the real timestamp.
                    let created_at_fake = DateTime::from_timestamp_nanos(0);
                    let event = row
                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            consistency_rusqlite("cannot deserialize event")
                        })?
                        .0;
                    let version = Version(row.get("version")?);

                    Ok(ExecutionEvent {
                        created_at: created_at_fake,
                        event,
                        backtrace_id: None,
                        version,
                    })
                },
            )?
            .collect::<Result<Vec<_>, _>>()?
            .into_iter()
            .collect::<VecDeque<_>>();
        // First row must be the `Created` event (version 0).
        let Some(ExecutionRequest::Created {
            ffqn,
            params,
            parent,
            metadata,
            ..
        }) = events.pop_front().map(|outer| outer.event)
        else {
            return Err(consistency_db_err("execution log must contain `Created` event").into());
        };

        // Remaining rows must all be `HistoryEvent`s.
        let event_history = events
            .into_iter()
            .map(|ExecutionEvent { event, version, .. }| {
                if let ExecutionRequest::HistoryEvent { event } = event {
                    Ok((event, version))
                } else {
                    Err(consistency_db_err(
                        "rows can only contain `Created` and `HistoryEvent` event kinds",
                    ))
                }
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(LockedExecution {
            execution_id: execution_id.clone(),
            metadata,
            next_version: appending_version.increment(),
            ffqn,
            params,
            event_history,
            responses,
            parent,
            intermittent_event_count,
            locked_event,
        })
    }
2145
2146 fn count_join_next(
2147 tx: &Transaction,
2148 execution_id: &ExecutionId,
2149 join_set_id: &JoinSetId,
2150 ) -> Result<u32, DbErrorRead> {
2151 let mut stmt = tx.prepare(
2152 "SELECT COUNT(*) as count FROM t_execution_log WHERE execution_id = :execution_id AND join_set_id = :join_set_id \
2153 AND history_event_type = :join_next",
2154 )?;
2155 Ok(stmt.query_row(
2156 named_params! {
2157 ":execution_id": execution_id.to_string(),
2158 ":join_set_id": join_set_id.to_string(),
2159 ":join_next": HISTORY_EVENT_TYPE_JOIN_NEXT,
2160 },
2161 |row| row.get::<_, u32>("count"),
2162 )?)
2163 }
2164
2165 fn nth_response(
2166 tx: &Transaction,
2167 execution_id: &ExecutionId,
2168 join_set_id: &JoinSetId,
2169 skip_rows: u32,
2170 ) -> Result<Option<ResponseWithCursor>, DbErrorRead> {
2171 tx
2173 .prepare(
2174 "SELECT r.id, r.created_at, r.join_set_id, \
2175 r.delay_id, r.delay_success, \
2176 r.child_execution_id, r.finished_version, l.json_value \
2177 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log l ON r.child_execution_id = l.execution_id \
2178 WHERE \
2179 r.execution_id = :execution_id AND r.join_set_id = :join_set_id AND \
2180 (
2181 r.finished_version = l.version \
2182 OR \
2183 r.child_execution_id IS NULL \
2184 ) \
2185 ORDER BY id \
2186 LIMIT 1 OFFSET :offset",
2187 )
2188 ?
2189 .query_row(
2190 named_params! {
2191 ":execution_id": execution_id.to_string(),
2192 ":join_set_id": join_set_id.to_string(),
2193 ":offset": skip_rows,
2194 },
2195 Self::parse_response_with_cursor,
2196 )
2197 .optional()
2198 .map_err(DbErrorRead::from)
2199 }
2200
2201 #[instrument(level = Level::TRACE, skip_all, fields(%execution_id, %appending_version))]
2202 #[expect(clippy::needless_return)]
2203 fn append(
2204 tx: &Transaction,
2205 execution_id: &ExecutionId,
2206 req: AppendRequest,
2207 appending_version: Version,
2208 ) -> Result<(AppendResponse, AppendNotifier), DbErrorWrite> {
2209 if matches!(req.event, ExecutionRequest::Created { .. }) {
2210 return Err(DbErrorWrite::NonRetriable(
2211 DbErrorWriteNonRetriable::ValidationFailed(
2212 "cannot append `Created` event - use `create` instead".into(),
2213 ),
2214 ));
2215 }
2216 if let AppendRequest {
2217 event:
2218 ExecutionRequest::Locked(Locked {
2219 component_id,
2220 deployment_id,
2221 executor_id,
2222 run_id,
2223 lock_expires_at,
2224 retry_config,
2225 }),
2226 created_at,
2227 } = req
2228 {
2229 return Self::lock_single_execution(
2230 tx,
2231 created_at,
2232 &component_id,
2233 deployment_id,
2234 execution_id,
2235 run_id,
2236 &appending_version,
2237 executor_id,
2238 lock_expires_at,
2239 retry_config,
2240 )
2241 .map(|locked_execution| (locked_execution.next_version, AppendNotifier::default()));
2242 }
2243
2244 let combined_state = Self::get_combined_state(tx, execution_id)?;
2245 if combined_state
2246 .execution_with_state
2247 .pending_state
2248 .is_finished()
2249 {
2250 debug!("Execution is already finished");
2251 return Err(DbErrorWrite::NonRetriable(
2252 DbErrorWriteNonRetriable::IllegalState {
2253 reason: "already finished".into(),
2254 context: SpanTrace::capture(),
2255 source: None,
2256 loc: Location::caller(),
2257 },
2258 ));
2259 }
2260
2261 Self::check_expected_next_and_appending_version(
2262 &combined_state.get_next_version_assert_not_finished(),
2263 &appending_version,
2264 )?;
2265 let event_ser = serde_json::to_string(&req.event).map_err(|err| {
2266 error!("Cannot serialize {:?} - {err:?}", req.event);
2267 DbErrorWriteNonRetriable::ValidationFailed("parameter serialization error".into())
2268 })?;
2269
2270 let mut stmt = tx.prepare(
2271 "INSERT INTO t_execution_log (execution_id, created_at, json_value, version, variant, join_set_id) \
2272 VALUES (:execution_id, :created_at, :json_value, :version, :variant, :join_set_id)")
2273 ?;
2274 stmt.execute(named_params! {
2275 ":execution_id": execution_id.to_string(),
2276 ":created_at": req.created_at,
2277 ":json_value": event_ser,
2278 ":version": appending_version.0,
2279 ":variant": req.event.variant(),
2280 ":join_set_id": req.event.join_set_id().map(std::string::ToString::to_string),
2281 })?;
2282 match &req.event {
2285 ExecutionRequest::Created { .. } => {
2286 unreachable!("handled in the caller")
2287 }
2288
2289 ExecutionRequest::Locked { .. } => {
2290 unreachable!("handled above")
2291 }
2292
2293 ExecutionRequest::TemporarilyFailed {
2294 backoff_expires_at, ..
2295 }
2296 | ExecutionRequest::TemporarilyTimedOut {
2297 backoff_expires_at, ..
2298 } => {
2299 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2300 tx,
2301 execution_id,
2302 &appending_version,
2303 *backoff_expires_at,
2304 true, combined_state.execution_with_state.component_digest,
2306 )?;
2307 return Ok((next_version, notifier));
2308 }
2309
2310 ExecutionRequest::Unlocked {
2311 backoff_expires_at, ..
2312 } => {
2313 let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2314 tx,
2315 execution_id,
2316 &appending_version,
2317 *backoff_expires_at,
2318 false, combined_state.execution_with_state.component_digest,
2320 )?;
2321 return Ok((next_version, notifier));
2322 }
2323
2324 ExecutionRequest::Paused => {
2325 match &combined_state.execution_with_state.pending_state {
2326 PendingState::Finished { .. } => {
2327 unreachable!("handled above");
2328 }
2329 PendingState::Paused(..) => {
2330 return Err(DbErrorWriteNonRetriable::IllegalState {
2331 reason: "cannot pause, execution is already paused".into(),
2332 context: SpanTrace::capture(),
2333 source: None,
2334 loc: Location::caller(),
2335 }
2336 .into());
2337 }
2338 _ => {}
2339 }
2340 let next_version =
2341 Self::update_state_paused(tx, execution_id, &appending_version, true)?;
2342 return Ok((next_version, AppendNotifier::default()));
2343 }
2344
2345 ExecutionRequest::Unpaused => {
2346 if !combined_state
2347 .execution_with_state
2348 .pending_state
2349 .is_paused()
2350 {
2351 return Err(DbErrorWriteNonRetriable::IllegalState {
2352 reason: "cannot unpause, execution is not paused".into(),
2353 context: SpanTrace::capture(),
2354 source: None,
2355 loc: Location::caller(),
2356 }
2357 .into());
2358 }
2359 let next_version =
2360 Self::update_state_paused(tx, execution_id, &appending_version, false)?;
2361 return Ok((next_version, AppendNotifier::default()));
2362 }
2363
2364 ExecutionRequest::Finished { result, .. } => {
2365 Self::update_state_finished(
2366 tx,
2367 execution_id,
2368 &appending_version,
2369 req.created_at,
2370 PendingStateFinishedResultKind::from(result),
2371 )?;
2372 return Ok((
2373 appending_version,
2374 AppendNotifier {
2375 pending_at: None,
2376 execution_finished: Some(NotifierExecutionFinished {
2377 execution_id: execution_id.clone(),
2378 retval: result.clone(),
2379 }),
2380 response: None,
2381 },
2382 ));
2383 }
2384
2385 ExecutionRequest::HistoryEvent {
2386 event:
2387 HistoryEvent::JoinSetCreate { .. }
2388 | HistoryEvent::JoinSetRequest {
2389 request: JoinSetRequest::ChildExecutionRequest { .. },
2390 ..
2391 }
2392 | HistoryEvent::Persist { .. }
2393 | HistoryEvent::Schedule { .. }
2394 | HistoryEvent::Stub { .. }
2395 | HistoryEvent::JoinNextTooMany { .. }
2396 | HistoryEvent::JoinNextTry { .. },
2397 } => {
2398 return Ok((
2399 Self::bump_state_next_version(tx, execution_id, &appending_version, None)?,
2400 AppendNotifier::default(),
2401 ));
2402 }
2403
2404 ExecutionRequest::HistoryEvent {
2405 event:
2406 HistoryEvent::JoinSetRequest {
2407 join_set_id,
2408 request:
2409 JoinSetRequest::DelayRequest {
2410 delay_id,
2411 expires_at,
2412 ..
2413 },
2414 },
2415 } => {
2416 return Ok((
2417 Self::bump_state_next_version(
2418 tx,
2419 execution_id,
2420 &appending_version,
2421 Some(DelayReq {
2422 join_set_id: join_set_id.clone(),
2423 delay_id: delay_id.clone(),
2424 expires_at: *expires_at,
2425 }),
2426 )?,
2427 AppendNotifier::default(),
2428 ));
2429 }
2430
2431 ExecutionRequest::HistoryEvent {
2432 event:
2433 HistoryEvent::JoinNext {
2434 join_set_id,
2435 run_expires_at,
2436 closing,
2437 requested_ffqn: _,
2438 },
2439 } => {
2440 let join_next_count = Self::count_join_next(tx, execution_id, join_set_id)?;
2442 let nth_response =
2443 Self::nth_response(tx, execution_id, join_set_id, join_next_count - 1)?; trace!("join_next_count: {join_next_count}, nth_response: {nth_response:?}");
2445 assert!(join_next_count > 0);
2446 if let Some(ResponseWithCursor {
2447 event:
2448 JoinSetResponseEventOuter {
2449 created_at: nth_created_at,
2450 ..
2451 },
2452 cursor: _,
2453 }) = nth_response
2454 {
2455 let scheduled_at = max(*run_expires_at, nth_created_at); let (next_version, notifier) = Self::update_state_pending_after_event_appended(
2457 tx,
2458 execution_id,
2459 &appending_version,
2460 scheduled_at,
2461 false, combined_state.execution_with_state.component_digest,
2463 )?;
2464 return Ok((next_version, notifier));
2465 }
2466 return Ok((
2467 Self::update_state_blocked(
2468 tx,
2469 execution_id,
2470 &appending_version,
2471 join_set_id,
2472 *run_expires_at,
2473 *closing,
2474 )?,
2475 AppendNotifier::default(),
2476 ));
2477 }
2478 }
2479 }
2480
    /// Persists a join-set response (delay finished or child execution
    /// finished) into `t_join_set_response` and, when the execution is
    /// currently blocked on that join set, transitions it back to pending.
    ///
    /// Returns an [`AppendNotifier`] that always carries the new response
    /// (for response subscribers) and possibly a pending-at notification.
    fn append_response(
        tx: &Transaction,
        execution_id: &ExecutionId,
        event: JoinSetResponseEventOuter,
    ) -> Result<AppendNotifier, DbErrorWrite> {
        let mut stmt = tx.prepare(
            "INSERT INTO t_join_set_response (execution_id, created_at, join_set_id, delay_id, delay_success, child_execution_id, finished_version) \
            VALUES (:execution_id, :created_at, :join_set_id, :delay_id, :delay_success, :child_execution_id, :finished_version)",
        )?;
        let join_set_id = &event.event.join_set_id;
        // Exactly one of the column pairs (delay_id, delay_success) /
        // (child_execution_id, finished_version) is populated, depending on
        // the response kind; the other pair stays NULL.
        let (delay_id, delay_success) = match &event.event.event {
            JoinSetResponse::DelayFinished { delay_id, result } => {
                (Some(delay_id.to_string()), Some(result.is_ok()))
            }
            JoinSetResponse::ChildExecutionFinished { .. } => (None, None),
        };
        let (child_execution_id, finished_version) = match &event.event.event {
            JoinSetResponse::ChildExecutionFinished {
                child_execution_id,
                finished_version,
                result: _,
            } => (
                Some(child_execution_id.to_string()),
                Some(finished_version.0),
            ),
            JoinSetResponse::DelayFinished { .. } => (None, None),
        };

        stmt.execute(named_params! {
            ":execution_id": execution_id.to_string(),
            ":created_at": event.created_at,
            ":join_set_id": join_set_id.to_string(),
            ":delay_id": delay_id,
            ":delay_success": delay_success,
            ":child_execution_id": child_execution_id,
            ":finished_version": finished_version,
        })?;
        // The rowid of the inserted response row doubles as its cursor.
        let cursor = ResponseCursor(
            u32::try_from(tx.last_insert_rowid())
                .map_err(|_| consistency_db_err("t_join_set_response.id must not be negative"))?,
        );

        let combined_state = Self::get_combined_state(tx, execution_id)?;
        debug!("previous_pending_state: {combined_state:?}");
        // If the execution is blocked waiting on this very join set, unblock
        // it: schedule it at the later of its lock expiry and the response
        // arrival time.
        let mut notifier = if let PendingStateMergedPause::BlockedByJoinSet {
            state:
                PendingStateBlockedByJoinSet {
                    join_set_id: found_join_set_id,
                    lock_expires_at,
                    closing: _,
                },
            paused: _,
        } =
            PendingStateMergedPause::from(combined_state.execution_with_state.pending_state)
            && *join_set_id == found_join_set_id
        {
            let scheduled_at = max(lock_expires_at, event.created_at);
            Self::update_state_pending_after_response_appended(
                tx,
                execution_id,
                scheduled_at,
                combined_state.execution_with_state.component_digest,
            )?
        } else {
            AppendNotifier::default()
        };
        // A finished delay no longer needs its expiry row in `t_delay`.
        if let JoinSetResponseEvent {
            join_set_id,
            event:
                JoinSetResponse::DelayFinished {
                    delay_id,
                    result: _,
                },
        } = &event.event
        {
            debug!(%join_set_id, %delay_id, "Deleting from `t_delay`");
            let mut stmt =
                tx.prepare_cached("DELETE FROM t_delay WHERE execution_id = :execution_id AND join_set_id = :join_set_id AND delay_id = :delay_id")
                ?;
            stmt.execute(named_params! {
                ":execution_id": execution_id.to_string(),
                ":join_set_id": join_set_id.to_string(),
                ":delay_id": delay_id.to_string(),
            })?;
        }
        notifier.response = Some((execution_id.clone(), ResponseWithCursor { cursor, event }));
        Ok(notifier)
    }
2574
2575 fn append_backtrace(
2576 tx: &Transaction,
2577 backtrace_info: &BacktraceInfo,
2578 ) -> Result<(), DbErrorWrite> {
2579 let backtrace_hash = backtrace_info.wasm_backtrace.hash();
2580
2581 tx.prepare("INSERT OR IGNORE INTO t_wasm_backtrace (backtrace_hash, wasm_backtrace) VALUES (:backtrace_hash, :wasm_backtrace)")?
2582 .execute(named_params! {
2583 ":backtrace_hash": backtrace_hash,
2584 ":wasm_backtrace": JsonWrapper(&backtrace_info.wasm_backtrace)
2585 })?;
2586
2587 tx.prepare(
2588 "INSERT INTO t_execution_backtrace (execution_id, component_id, version_min_including, version_max_excluding, backtrace_hash) \
2589 VALUES (:execution_id, :component_id, :version_min_including, :version_max_excluding, :backtrace_hash)",
2590 )?
2591 .execute(named_params! {
2592 ":execution_id": backtrace_info.execution_id.to_string(),
2593 ":component_id": JsonWrapper(&backtrace_info.component_id),
2594 ":version_min_including": backtrace_info.version_min_including.0,
2595 ":version_max_excluding": backtrace_info.version_max_excluding.0,
2596 ":backtrace_hash": backtrace_hash,
2597 })?;
2598
2599 Ok(())
2600 }
2601
2602 fn append_log(tx: &Transaction, row: &LogInfoAppendRow) -> Result<(), DbErrorWrite> {
2603 let mut stmt = tx.prepare(
2604 "INSERT INTO t_log (
2605 execution_id,
2606 run_id,
2607 created_at,
2608 level,
2609 message,
2610 stream_type,
2611 payload
2612 ) VALUES (
2613 :execution_id,
2614 :run_id,
2615 :created_at,
2616 :level,
2617 :message,
2618 :stream_type,
2619 :payload
2620 )",
2621 )?;
2622
2623 match &row.log_entry {
2624 LogEntry::Log {
2625 created_at,
2626 level,
2627 message,
2628 } => {
2629 stmt.execute(named_params! {
2630 ":execution_id": row.execution_id,
2631 ":run_id": row.run_id,
2632 ":created_at": created_at,
2633 ":level": *level as u8,
2634 ":message": message,
2635 ":stream_type": Option::<u8>::None,
2636 ":payload": Option::<Vec<u8>>::None,
2637 })?;
2638 }
2639 LogEntry::Stream {
2640 created_at,
2641 payload,
2642 stream_type,
2643 } => {
2644 stmt.execute(named_params! {
2645 ":execution_id": row.execution_id,
2646 ":run_id": row.run_id,
2647 ":created_at": created_at,
2648 ":level": Option::<u8>::None,
2649 ":message": Option::<String>::None,
2650 ":stream_type": *stream_type as u8,
2651 ":payload": payload,
2652 })?;
2653 }
2654 }
2655
2656 Ok(())
2657 }
2658
2659 fn get(
2660 tx: &Transaction,
2661 execution_id: &ExecutionId,
2662 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
2663 let mut stmt = tx.prepare(
2664 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2665 execution_id = :execution_id ORDER BY version",
2666 )?;
2667 let events = stmt
2668 .query_map(
2669 named_params! {
2670 ":execution_id": execution_id.to_string(),
2671 },
2672 |row| {
2673 let created_at = row.get("created_at")?;
2674 let event = row
2675 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2676 .map_err(|serde| {
2677 error!("Cannot deserialize {row:?} - {serde:?}");
2678 consistency_rusqlite("cannot deserialize event")
2679 })?
2680 .0;
2681 let version = Version(row.get("version")?);
2682
2683 Ok(ExecutionEvent {
2684 created_at,
2685 event,
2686 backtrace_id: None,
2687 version,
2688 })
2689 },
2690 )?
2691 .collect::<Result<Vec<_>, _>>()?;
2692 if events.is_empty() {
2693 return Err(DbErrorRead::NotFound);
2694 }
2695 let combined_state = Self::get_combined_state(tx, execution_id)?;
2696 let responses = Self::list_responses(tx, execution_id, None)?;
2697 Ok(concepts::storage::ExecutionLog {
2698 execution_id: execution_id.clone(),
2699 events,
2700 responses,
2701 next_version: combined_state.get_next_version_or_finished(), pending_state: combined_state.execution_with_state.pending_state,
2703 component_digest: combined_state.execution_with_state.component_digest,
2704 component_type: combined_state.execution_with_state.component_type,
2705 deployment_id: combined_state.execution_with_state.deployment_id,
2706 })
2707 }
2708
2709 fn get_max_version(
2710 tx: &Transaction,
2711 execution_id: &ExecutionId,
2712 ) -> Result<Version, DbErrorRead> {
2713 tx.prepare("SELECT MAX(version) FROM t_execution_log WHERE execution_id = :execution_id")?
2714 .query_row(
2715 named_params! { ":execution_id": execution_id.to_string() },
2716 |row| row.get::<_, Option<VersionType>>(0),
2717 )
2718 .map(|v| v.map(Version::new).ok_or(DbErrorRead::NotFound))
2719 .map_err(DbErrorRead::from)
2720 .flatten()
2721 }
2722
2723 fn get_max_response_cursor(
2724 tx: &Transaction,
2725 execution_id: &ExecutionId,
2726 ) -> Result<ResponseCursor, DbErrorRead> {
2727 let max_cursor = tx
2728 .prepare("SELECT MAX(id) FROM t_join_set_response WHERE execution_id = :execution_id")?
2729 .query_row(
2730 named_params! { ":execution_id": execution_id.to_string() },
2731 |row| row.get::<_, Option<u32>>(0),
2732 )?;
2733 let max_cursor = max_cursor.unwrap_or_default();
2735 Ok(ResponseCursor(max_cursor))
2736 }
2737
    /// Lists execution events using cursor pagination over `version`.
    ///
    /// `NewerThan` paginates ascending, `OlderThan` descending; a descending
    /// page is re-sorted ascending by a wrapping SELECT so callers always
    /// receive events in version order. When `include_backtrace_id` is set,
    /// each event is left-joined against `t_execution_backtrace` to find the
    /// backtrace range covering its version.
    fn list_execution_events(
        tx: &Transaction,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<Vec<ExecutionEvent>, DbErrorRead> {
        let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![];
        params.push((":execution_id", Box::new(execution_id.to_string())));

        // Translate the pagination variant into (cursor, page length,
        // comparison operator, sort direction).
        let (cursor, length, rel, is_desc) = match &pagination {
            Pagination::NewerThan {
                cursor,
                length,
                including_cursor,
            } => (
                *cursor,
                *length,
                if *including_cursor { ">=" } else { ">" },
                false,
            ),
            Pagination::OlderThan {
                cursor,
                length,
                including_cursor,
            } => (
                *cursor,
                *length,
                if *including_cursor { "<=" } else { "<" },
                true,
            ),
        };
        params.push((":cursor", Box::new(cursor)));

        let base_select = if include_backtrace_id {
            format!(
                "SELECT
                log.created_at,
                log.json_value,
                log.version as version,
                bt.version_min_including AS backtrace_id
            FROM
                t_execution_log AS log
            LEFT OUTER JOIN
                t_execution_backtrace AS bt ON log.execution_id = bt.execution_id
                AND log.version >= bt.version_min_including
                AND log.version < bt.version_max_excluding
            WHERE
                log.execution_id = :execution_id
                AND log.version {rel} :cursor"
            )
        } else {
            format!(
                "SELECT
                created_at, json_value, NULL as backtrace_id, version
                FROM t_execution_log WHERE
                execution_id = :execution_id AND version {rel} :cursor"
            )
        };

        let order = if is_desc { "DESC" } else { "ASC" };
        let mut sql = format!("{base_select} ORDER BY version {order} LIMIT {length}");

        // Descending pages are flipped back so results are always ascending.
        if is_desc {
            sql = format!("SELECT * FROM ({sql}) ORDER BY version ASC");
        }

        tx.prepare(&sql)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                params
                    .iter()
                    .map(|(key, value)| (*key, value.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                |row| {
                    let created_at = row.get("created_at")?;
                    let backtrace_id = row
                        .get::<_, Option<VersionType>>("backtrace_id")?
                        .map(Version::new);
                    let version = Version(row.get("version")?);

                    let event = row
                        .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
                        .map(|event| ExecutionEvent {
                            created_at,
                            event: event.0,
                            backtrace_id,
                            version,
                        })
                        .map_err(|serde| {
                            error!("Cannot deserialize {row:?} - {serde:?}");
                            consistency_rusqlite("cannot deserialize")
                        })?;
                    Ok(event)
                },
            )?
            .collect::<Result<Vec<_>, _>>()
            .map_err(DbErrorRead::from)
    }
2837
2838 fn map_t_execution_log_row(row: &Row<'_>) -> Result<ExecutionEvent, rusqlite::Error> {
2839 let created_at = row.get("created_at")?;
2840 let event = row
2841 .get::<_, JsonWrapper<ExecutionRequest>>("json_value")
2842 .map_err(|serde| {
2843 error!("Cannot deserialize {row:?} - {serde:?}");
2844 consistency_rusqlite("cannot deserialize event")
2845 })?;
2846 let version = Version(row.get("version")?);
2847
2848 Ok(ExecutionEvent {
2849 created_at,
2850 event: event.0,
2851 backtrace_id: None,
2852 version,
2853 })
2854 }
2855
2856 fn get_execution_event(
2857 tx: &Transaction,
2858 execution_id: &ExecutionId,
2859 version: VersionType,
2860 ) -> Result<ExecutionEvent, DbErrorRead> {
2861 tx.prepare(
2862 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2863 execution_id = :execution_id AND version = :version",
2864 )?
2865 .query_row(
2866 named_params! {
2867 ":execution_id": execution_id.to_string(),
2868 ":version": version,
2869 },
2870 SqlitePool::map_t_execution_log_row,
2871 )
2872 .map_err(DbErrorRead::from)
2873 }
2874
2875 fn get_last_execution_event(
2876 tx: &Transaction,
2877 execution_id: &ExecutionId,
2878 ) -> Result<ExecutionEvent, DbErrorRead> {
2879 tx.prepare(
2880 "SELECT created_at, json_value, version FROM t_execution_log WHERE \
2881 execution_id = :execution_id ORDER BY version DESC LIMIT 1",
2882 )?
2883 .query_row(
2884 named_params! {
2885 ":execution_id": execution_id.to_string(),
2886 },
2887 SqlitePool::map_t_execution_log_row,
2888 )
2889 .map_err(DbErrorRead::from)
2890 }
2891
2892 fn get_delay_response(
2893 tx: &Transaction,
2894 execution_id: &ExecutionId,
2895 delay_id: &DelayId,
2896 ) -> Result<Option<bool>, DbErrorRead> {
2897 tx.prepare(
2899 "SELECT delay_success \
2900 FROM t_join_set_response \
2901 WHERE \
2902 execution_id = :execution_id AND delay_id = :delay_id
2903 ",
2904 )?
2905 .query_row(
2906 named_params! {
2907 ":execution_id": execution_id.to_string(),
2908 ":delay_id": delay_id.to_string(),
2909 },
2910 |row| {
2911 let delay_success = row.get::<_, bool>("delay_success")?;
2912 Ok(delay_success)
2913 },
2914 )
2915 .optional()
2916 .map_err(DbErrorRead::from)
2917 }
2918
2919 #[instrument(level = Level::TRACE, skip_all)]
2920 fn get_responses_after(
2922 tx: &Transaction,
2923 execution_id: &ExecutionId,
2924 last_response: ResponseCursor,
2925 ) -> Result<Vec<ResponseWithCursor>, DbErrorRead> {
2926 tx.prepare(
2928 "SELECT r.id, r.created_at, r.join_set_id, \
2929 r.delay_id, r.delay_success, \
2930 r.child_execution_id, r.finished_version, child.json_value \
2931 FROM t_join_set_response r LEFT OUTER JOIN t_execution_log child ON r.child_execution_id = child.execution_id \
2932 WHERE \
2933 r.id > :last_response_id AND \
2934 r.execution_id = :execution_id AND \
2935 ( \
2936 r.finished_version = child.version \
2937 OR r.child_execution_id IS NULL \
2938 ) \
2939 ORDER BY id",
2940 )
2941 ?
2942 .query_map(
2943 named_params! {
2944 ":last_response_id": last_response.0,
2945 ":execution_id": execution_id.to_string(),
2946 },
2947 Self::parse_response_with_cursor,
2948 )
2949 ?
2950 .collect::<Result<Vec<_>, _>>()
2951 .map_err(DbErrorRead::from)
2952 }
2953
2954 fn get_pending_of_single_ffqn(
2955 mut stmt: CachedStatement,
2956 batch_size: u32,
2957 pending_at_or_sooner: DateTime<Utc>,
2958 ffqn: &FunctionFqn,
2959 ) -> Result<Vec<(ExecutionId, Version)>, ()> {
2960 stmt.query_map(
2961 named_params! {
2962 ":pending_expires_finished": pending_at_or_sooner,
2963 ":ffqn": ffqn.to_string(),
2964 ":batch_size": batch_size,
2965 },
2966 |row| {
2967 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
2968 let next_version =
2969 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
2970 Ok((execution_id, next_version))
2971 },
2972 )
2973 .map_err(|err| {
2974 warn!("Ignoring consistency error {err:?}");
2975 })?
2976 .collect::<Result<Vec<_>, _>>()
2977 .map_err(|err| {
2978 warn!("Ignoring consistency error {err:?}");
2979 })
2980 }
2981
2982 fn get_pending_by_ffqns(
2984 conn: &Connection,
2985 batch_size: u32,
2986 pending_at_or_sooner: DateTime<Utc>,
2987 ffqns: &[FunctionFqn],
2988 ) -> Result<Vec<(ExecutionId, Version)>, RusqliteError> {
2989 let batch_size = usize::try_from(batch_size).expect("16 bit systems are unsupported");
2990 let mut execution_ids_versions = Vec::with_capacity(batch_size);
2991 for ffqn in ffqns {
2992 let stmt = conn.prepare_cached(&format!(
2994 r#"
2995 SELECT execution_id, corresponding_version FROM t_state WHERE
2996 state = "{STATE_PENDING_AT}" AND
2997 pending_expires_finished <= :pending_expires_finished AND ffqn = :ffqn
2998 AND is_paused = false
2999 ORDER BY pending_expires_finished LIMIT :batch_size
3000 "#
3001 ))?;
3002
3003 if let Ok(execs_and_versions) = Self::get_pending_of_single_ffqn(
3004 stmt,
3005 u32::try_from(batch_size - execution_ids_versions.len())
3006 .expect("u32 - anything must fit to u32"),
3007 pending_at_or_sooner,
3008 ffqn,
3009 ) {
3010 execution_ids_versions.extend(execs_and_versions);
3011 if execution_ids_versions.len() == batch_size {
3012 break;
3014 }
3015 }
3017 }
3018 Ok(execution_ids_versions)
3019 }
3020
3021 fn get_pending_by_component_input_digest(
3022 conn: &Connection,
3023 batch_size: u32,
3024 pending_at_or_sooner: DateTime<Utc>,
3025 input_digest: &InputContentDigest,
3026 ) -> Result<Vec<(ExecutionId, Version)>, RusqliteError> {
3027 let mut stmt = conn.prepare_cached(&format!(
3028 r#"
3029 SELECT execution_id, corresponding_version FROM t_state WHERE
3030 state = "{STATE_PENDING_AT}" AND
3031 pending_expires_finished <= :pending_expires_finished AND
3032 component_id_input_digest = :component_id_input_digest
3033 AND is_paused = false
3034 ORDER BY pending_expires_finished LIMIT :batch_size
3035 "#
3036 ))?;
3037
3038 stmt.query_map(
3039 named_params! {
3040 ":pending_expires_finished": pending_at_or_sooner,
3041 ":component_id_input_digest": input_digest,
3042 ":batch_size": batch_size,
3043 },
3044 |row| {
3045 let execution_id = row.get::<_, ExecutionId>("execution_id")?;
3046 let next_version =
3047 Version::new(row.get::<_, VersionType>("corresponding_version")?).increment();
3048 Ok((execution_id, next_version))
3049 },
3050 )?
3051 .collect::<Result<Vec<_>, _>>()
3052 .map_err(RusqliteError::from)
3053 }
3054
3055 #[instrument(level = Level::TRACE, skip_all)]
3057 fn notify_all(&self, notifiers: Vec<AppendNotifier>, current_time: DateTime<Utc>) {
3058 let (pending_ats, finished_execs, responses) = {
3059 let (mut pending_ats, mut finished_execs, mut responses) =
3060 (Vec::new(), Vec::new(), Vec::new());
3061 for notifier in notifiers {
3062 if let Some(pending_at) = notifier.pending_at {
3063 pending_ats.push(pending_at);
3064 }
3065 if let Some(finished) = notifier.execution_finished {
3066 finished_execs.push(finished);
3067 }
3068 if let Some(response) = notifier.response {
3069 responses.push(response);
3070 }
3071 }
3072 (pending_ats, finished_execs, responses)
3073 };
3074
3075 if !pending_ats.is_empty() {
3077 let guard = self.0.pending_subscribers.lock().unwrap();
3078 for pending_at in pending_ats {
3079 Self::notify_pending_locked(&pending_at, current_time, &guard);
3080 }
3081 }
3082 if !finished_execs.is_empty() {
3085 let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
3086 for finished in finished_execs {
3087 if let Some(listeners_of_exe_id) = guard.remove(&finished.execution_id) {
3088 for (_tag, sender) in listeners_of_exe_id {
3089 let _ = sender.send(finished.retval.clone());
3092 }
3093 }
3094 }
3095 }
3096 if !responses.is_empty() {
3098 let mut guard = self.0.response_subscribers.lock().unwrap();
3099 for (execution_id, response) in responses {
3100 if let Some((sender, _)) = guard.remove(&execution_id) {
3101 let _ = sender.send(response);
3102 }
3103 }
3104 }
3105 }
3106
3107 fn notify_pending_locked(
3108 notifier: &NotifierPendingAt,
3109 current_time: DateTime<Utc>,
3110 ffqn_to_pending_subscription: &std::sync::MutexGuard<PendingFfqnSubscribersHolder>,
3111 ) {
3112 if notifier.scheduled_at <= current_time {
3114 ffqn_to_pending_subscription.notify(notifier);
3115 }
3116 }
3117
3118 fn upgrade_execution_component_single_write(
3119 tx: &Transaction,
3120 execution_id: &ExecutionId,
3121 old: &InputContentDigest,
3122 new: &InputContentDigest,
3123 ) -> Result<(), DbErrorWrite> {
3124 debug!("Updating t_state to component {new}");
3125 let mut stmt = tx.prepare_cached(
3126 r"
3127 UPDATE t_state
3128 SET
3129 updated_at = CURRENT_TIMESTAMP,
3130 component_id_input_digest = :new
3131 WHERE
3132 execution_id = :execution_id AND
3133 component_id_input_digest = :old
3134 ",
3135 )?;
3136 let updated = stmt.execute(named_params! {
3137 ":execution_id": execution_id,
3138 ":old": old,
3139 ":new": new,
3140 })?;
3141 if updated != 1 {
3142 return Err(DbErrorWrite::NotFound);
3143 }
3144 Ok(())
3145 }
3146
    /// Lists `t_log` rows for an execution with level/stream filtering and
    /// cursor pagination over the rowid (`id`).
    ///
    /// The SQL is assembled dynamically: the level filter and the stream
    /// filter are each rendered as an `IN (...)` clause (an empty filter list
    /// means "all levels"/"all streams") and combined with OR when both are
    /// active; `LogFilter`'s constructor guarantees at least one is active.
    ///
    /// Returns the page items together with pre-computed `next_page` /
    /// `prev_page` cursors so the caller can continue in either direction.
    fn list_logs_tx(
        tx: &Transaction,
        execution_id: &ExecutionId,
        filter: &LogFilter,
        pagination: &Pagination<u32>,
    ) -> Result<ListLogsResponse, DbErrorRead> {
        let mut query = String::from(
            "SELECT id, run_id, created_at, level, message, stream_type, payload
            FROM t_log
            WHERE execution_id = :execution_id",
        );

        let length = pagination.length();
        // All three parameters are always bound; :cursor and :length are
        // consumed by the clauses appended further below.
        let params = vec![
            (":execution_id", &execution_id as &dyn rusqlite::ToSql),
            (":cursor", pagination.cursor() as &dyn rusqlite::ToSql),
            (":length", &length as &dyn rusqlite::ToSql),
        ];

        let level_filter = if filter.should_show_logs() {
            // Explicit level list filters to those levels; an empty list
            // expands to every known level.
            let levels_str = if !filter.levels().is_empty() {
                filter
                    .levels()
                    .iter()
                    .map(|lvl| (*lvl as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            } else {
                LogLevel::iter()
                    .map(|lvl| (lvl as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            };
            Some(format!(" level IN ({levels_str})"))
        } else {
            None
        };
        let stream_filter = if filter.should_show_streams() {
            let streams_str = if !filter.stream_types().is_empty() {
                filter
                    .stream_types()
                    .iter()
                    .map(|st| (*st as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            } else {
                LogStreamType::iter()
                    .map(|st| (st as u8).to_string())
                    .collect::<Vec<_>>()
                    .join(",")
            };
            Some(format!(" stream_type IN ({streams_str})"))
        } else {
            None
        };
        match (level_filter, stream_filter) {
            (Some(level_filter), Some(stream_filter)) => {
                write!(&mut query, " AND ({level_filter} OR {stream_filter})")
                    .expect("writing to string");
            }
            (Some(level_filter), None) => {
                write!(&mut query, " AND {level_filter}").expect("writing to string");
            }
            (None, Some(stream_filter)) => {
                write!(&mut query, " AND {stream_filter}").expect("writing to string");
            }
            // `LogFilter` guarantees logs and/or streams are requested.
            (None, None) => unreachable!("guarded by constructor"),
        }

        write!(&mut query, " AND id {} :cursor", pagination.rel()).expect("writing to string");

        query.push_str(" ORDER BY id ");
        query.push_str(pagination.asc_or_desc());

        query.push_str(" LIMIT :length");

        let mut stmt = tx.prepare(&query)?;

        let items = stmt
            .query_map(params.as_slice(), |row| {
                let cursor = row.get("id")?;
                let created_at: DateTime<Utc> = row.get("created_at")?;
                let run_id = row.get("run_id")?;
                let level: Option<u8> = row.get("level")?;
                let message: Option<String> = row.get("message")?;
                let stream_type: Option<u8> = row.get("stream_type")?;
                let payload: Option<Vec<u8>> = row.get("payload")?;

                // (level, message) and (stream_type, payload) are mutually
                // exclusive column pairs; any other combination is a
                // corrupted row.
                let log_entry = match (level, message, stream_type, payload) {
                    (Some(lvl), Some(msg), None, None) => LogEntry::Log {
                        created_at,
                        level: LogLevel::try_from(lvl).map_err(|_| {
                            consistency_rusqlite(format!(
                                "cannot convert {lvl} to LogLevel , id: {cursor}"
                            ))
                        })?,
                        message: msg,
                    },
                    (None, None, Some(stype), Some(pl)) => LogEntry::Stream {
                        created_at,
                        stream_type: LogStreamType::try_from(stype).map_err(|_| {
                            consistency_rusqlite(format!(
                                "cannot convert {stype} to LogStreamType , id: {cursor}"
                            ))
                        })?,
                        payload: pl,
                    },
                    _ => {
                        return Err(consistency_rusqlite(format!(
                            "invalid t_log row id:{cursor}"
                        )));
                    }
                };
                Ok(LogEntryRow {
                    cursor,
                    run_id,
                    log_entry,
                })
            })?
            .collect::<Result<Vec<_>, _>>()?;

        Ok(ListLogsResponse {
            // Next page continues forward from the last returned cursor; an
            // empty ascending page keeps the current pagination, an empty
            // descending page restarts ascending from cursor 0.
            next_page: items
                .last()
                .map(|item| Pagination::NewerThan {
                    length: pagination.length(),
                    cursor: item.cursor,
                    including_cursor: false,
                })
                .unwrap_or({
                    if pagination.is_asc() {
                        *pagination
                    } else {
                        Pagination::NewerThan {
                            length: pagination.length(),
                            cursor: 0,
                            including_cursor: false,
                        }
                    }
                }),
            // Previous page goes backward from the first returned cursor; an
            // empty ascending page past cursor 0 flips direction.
            prev_page: match items.first() {
                Some(item) => Some(Pagination::OlderThan {
                    length: pagination.length(),
                    cursor: item.cursor,
                    including_cursor: false,
                }),
                None if pagination.is_asc() && *pagination.cursor() > 0 => {
                    Some(pagination.invert())
                }
                None => None,
            },
            items,
        })
    }
3307
    /// Aggregates per-deployment execution counts (locked / pending /
    /// scheduled / blocked / finished) from `t_state`, paginated by
    /// `deployment_id`.
    ///
    /// "pending" vs "scheduled" splits on whether `pending_expires_finished`
    /// is already due relative to `current_time`. SQLite's `SUM(boolean)`
    /// counts the rows matching each predicate per group.
    fn list_deployment_states(
        tx: &Transaction,
        current_time: DateTime<Utc>,
        pagination: Pagination<Option<DeploymentId>>,
    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
        let mut params: Vec<(&'static str, Box<dyn ToSql>)> = vec![];
        let mut sql = format!(
            r"
            SELECT
                deployment_id,
                SUM(state = '{STATE_LOCKED}') AS locked,
                SUM(state = '{STATE_PENDING_AT}' AND pending_expires_finished <= :now) AS pending,
                SUM(state = '{STATE_PENDING_AT}' AND pending_expires_finished > :now) AS scheduled,
                SUM(state = '{STATE_BLOCKED_BY_JOIN_SET}') AS blocked,
                SUM(state = '{STATE_FINISHED}') AS finished
            FROM t_state"
        );

        params.push((":now", Box::new(current_time)));

        // A cursor of None means "start from the beginning" - no WHERE clause.
        if let Some(cursor) = pagination.cursor() {
            params.push((":cursor", Box::new(*cursor)));
            write!(
                sql,
                " WHERE deployment_id {rel} :cursor",
                rel = pagination.rel()
            )
            .expect("writing to string");
        }

        // Rows are selected in pagination order (inner); the ascending case is
        // wrapped by an outer SELECT so the result is always returned in
        // descending deployment_id order.
        let (inner_order, outer_order) = if pagination.is_desc() {
            ("DESC", "")
        } else {
            ("ASC", "DESC")
        };

        write!(
            sql,
            " GROUP BY deployment_id ORDER BY deployment_id {inner_order} LIMIT {limit}",
            limit = pagination.length()
        )
        .expect("writing to string");

        let final_sql = if outer_order.is_empty() {
            sql
        } else {
            format!("SELECT * FROM ({sql}) AS sub ORDER BY deployment_id {outer_order}")
        };

        let result: Vec<DeploymentState> = tx
            .prepare(&final_sql)?
            .query_map::<_, &[(&'static str, &dyn ToSql)], _>(
                params
                    .iter()
                    .map(|(k, v)| (*k, v.as_ref()))
                    .collect::<Vec<_>>()
                    .as_ref(),
                |row| {
                    Ok(DeploymentState {
                        deployment_id: row.get("deployment_id")?,
                        locked: row.get("locked")?,
                        pending: row.get("pending")?,
                        scheduled: row.get("scheduled")?,
                        blocked: row.get("blocked")?,
                        finished: row.get("finished")?,
                    })
                },
            )?
            .collect::<Result<Vec<_>, rusqlite::Error>>()
            .map_err(DbErrorRead::from)?;

        Ok(result)
    }
3383
3384 fn pause_execution(
3385 tx: &Transaction,
3386 execution_id: &ExecutionId,
3387 paused_at: DateTime<Utc>,
3388 ) -> Result<Version, DbErrorWrite> {
3389 let combined_state = Self::get_combined_state(tx, execution_id)?;
3390 let appending_version = combined_state.get_next_version_fail_if_finished()?;
3391 debug!("Pausing with {appending_version}");
3392 let (next_version, _) = Self::append(
3393 tx,
3394 execution_id,
3395 AppendRequest {
3396 created_at: paused_at,
3397 event: ExecutionRequest::Paused,
3398 },
3399 appending_version,
3400 )?;
3401 Ok(next_version)
3402 }
3403
3404 fn unpause_execution(
3405 tx: &Transaction,
3406 execution_id: &ExecutionId,
3407 paused_at: DateTime<Utc>,
3408 ) -> Result<Version, DbErrorWrite> {
3409 let combined_state = Self::get_combined_state(tx, execution_id)?;
3410 let appending_version = combined_state.get_next_version_fail_if_finished()?;
3411 debug!("Unpausing with {appending_version}");
3412 let (next_version, _) = Self::append(
3413 tx,
3414 execution_id,
3415 AppendRequest {
3416 created_at: paused_at,
3417 event: ExecutionRequest::Unpaused,
3418 },
3419 appending_version,
3420 )?;
3421 Ok(next_version)
3422 }
3423}
3424
#[async_trait]
impl DbExecutor for SqlitePool {
    /// Locks up to `batch_size` executions that are pending for any of `ffqns`.
    ///
    /// Runs in two transactions: a read-only pass collecting candidate
    /// `(execution_id, version)` pairs, followed by a write pass that locks each
    /// candidate via `lock_single_execution`. Returns the locked executions
    /// (empty when nothing is pending).
    #[instrument(level = Level::TRACE, skip(self))]
    async fn lock_pending_by_ffqns(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite> {
        // Read-only pass: find pending executions matching the requested ffqns.
        let execution_ids_versions = self
            .transaction(
                move |conn| {
                    Self::get_pending_by_ffqns(conn, batch_size, pending_at_or_sooner, &ffqns)
                },
                TxType::Other,
                "lock_pending_by_ffqns_get",
            )
            .await
            .map_err(to_generic_error)?;
        if execution_ids_versions.is_empty() {
            Ok(vec![])
        } else {
            debug!("Locking {execution_ids_versions:?}");
            // Write pass: lock every candidate inside a single transaction.
            self.transaction(
                move |tx| {
                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
                    for (execution_id, version) in &execution_ids_versions {
                        locked_execs.push(Self::lock_single_execution(
                            tx,
                            created_at,
                            &component_id,
                            deployment_id,
                            execution_id,
                            run_id,
                            version,
                            executor_id,
                            lock_expires_at,
                            retry_config,
                        )?);
                    }
                    Ok::<_, DbErrorWrite>(locked_execs)
                },
                TxType::MultipleWrites,
                "lock_pending_by_ffqns_one",
            )
            .await
        }
    }

    /// Same two-phase locking as `lock_pending_by_ffqns`, but candidates are
    /// selected by the component's input content digest instead of ffqns.
    #[instrument(level = Level::TRACE, skip(self))]
    async fn lock_pending_by_component_digest(
        &self,
        batch_size: u32,
        pending_at_or_sooner: DateTime<Utc>,
        component_id: &ComponentId,
        deployment_id: DeploymentId,
        created_at: DateTime<Utc>,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        run_id: RunId,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockPendingResponse, DbErrorWrite> {
        let component_id = component_id.clone();
        let execution_ids_versions = self
            .transaction(
                {
                    // Extra clone: the read closure consumes its copy while the
                    // original is still needed for the locking pass below.
                    let component_id = component_id.clone();
                    move |conn| {
                        Self::get_pending_by_component_input_digest(
                            conn,
                            batch_size,
                            pending_at_or_sooner,
                            &component_id.input_digest,
                        )
                    }
                },
                TxType::Other,
                "lock_pending_by_component_id_get",
            )
            .await
            .map_err(to_generic_error)?;
        if execution_ids_versions.is_empty() {
            Ok(vec![])
        } else {
            debug!("Locking {execution_ids_versions:?}");
            self.transaction(
                move |tx| {
                    let mut locked_execs = Vec::with_capacity(execution_ids_versions.len());
                    for (execution_id, version) in &execution_ids_versions {
                        locked_execs.push(Self::lock_single_execution(
                            tx,
                            created_at,
                            &component_id,
                            deployment_id,
                            execution_id,
                            run_id,
                            version,
                            executor_id,
                            lock_expires_at,
                            retry_config,
                        )?);
                    }
                    Ok::<_, DbErrorWrite>(locked_execs)
                },
                TxType::MultipleWrites,
                "lock_pending_by_component_id_one",
            )
            .await
        }
    }

    /// Test-only helper: locks a single execution at a known `version`.
    #[cfg(feature = "test")]
    #[instrument(level = Level::DEBUG, skip(self))]
    async fn lock_one(
        &self,
        created_at: DateTime<Utc>,
        component_id: ComponentId,
        deployment_id: DeploymentId,
        execution_id: &ExecutionId,
        run_id: RunId,
        version: Version,
        executor_id: ExecutorId,
        lock_expires_at: DateTime<Utc>,
        retry_config: ComponentRetryConfig,
    ) -> Result<LockedExecution, DbErrorWrite> {
        debug!(%execution_id, "lock_one");
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| {
                Self::lock_single_execution(
                    tx,
                    created_at,
                    &component_id,
                    deployment_id,
                    &execution_id,
                    run_id,
                    &version,
                    executor_id,
                    lock_expires_at,
                    retry_config,
                )
            },
            TxType::MultipleWrites,
            "lock_inner",
        )
        .await
    }

    /// Appends a single event at `version` and notifies subscribers affected by
    /// the resulting state change.
    #[instrument(level = Level::DEBUG, skip(self, req))]
    async fn append(
        &self,
        execution_id: ExecutionId,
        version: Version,
        req: AppendRequest,
    ) -> Result<AppendResponse, DbErrorWrite> {
        debug!(%req, "append");
        trace!(?req, "append");
        let created_at = req.created_at;
        let (version, notifier) = self
            .transaction(
                move |tx| Self::append(tx, &execution_id, req.clone(), version.clone()),
                TxType::MultipleWrites,
                "append",
            )
            .await?;
        // Notify only after the transaction has committed.
        self.notify_all(vec![notifier], created_at);
        Ok(version)
    }

    /// Atomically appends a batch of events to a child execution and records its
    /// result as a `ChildExecutionFinished` response on the parent's join set.
    ///
    /// Fails fast when child and parent ids are equal or the batch is empty.
    #[instrument(level = Level::DEBUG, skip_all)]
    async fn append_batch_respond_to_parent(
        &self,
        events: AppendEventsToExecution,
        response: AppendResponseToExecution,
        current_time: DateTime<Utc>,
    ) -> Result<AppendBatchResponse, DbErrorWrite> {
        debug!("append_batch_respond_to_parent");
        if events.execution_id == response.parent_execution_id {
            return Err(DbErrorWrite::NonRetriable(
                DbErrorWriteNonRetriable::ValidationFailed(
                    "Parameters `execution_id` and `parent_execution_id` cannot be the same".into(),
                ),
            ));
        }
        if events.batch.is_empty() {
            error!("Batch cannot be empty");
            return Err(DbErrorWrite::NonRetriable(
                DbErrorWriteNonRetriable::ValidationFailed("batch cannot be empty".into()),
            ));
        }
        let (version, notifiers) = {
            self.transaction(
                move |tx| {
                    let mut version = events.version.clone();
                    let mut notifier_of_child = None;
                    // Thread the version through each append; only the notifier
                    // of the last event is kept (it reflects the final state).
                    for append_request in &events.batch {
                        let (v, n) = Self::append(
                            tx,
                            &events.execution_id,
                            append_request.clone(),
                            version,
                        )?;
                        version = v;
                        notifier_of_child = Some(n);
                    }

                    // Record the child's result on the parent's join set within
                    // the same transaction.
                    let pending_at_parent = Self::append_response(
                        tx,
                        &response.parent_execution_id,
                        JoinSetResponseEventOuter {
                            created_at: response.created_at,
                            event: JoinSetResponseEvent {
                                join_set_id: response.join_set_id.clone(),
                                event: JoinSetResponse::ChildExecutionFinished {
                                    child_execution_id: response.child_execution_id.clone(),
                                    finished_version: response.finished_version.clone(),
                                    result: response.result.clone(),
                                },
                            },
                        },
                    )?;
                    Ok::<_, DbErrorWrite>((
                        version,
                        vec![
                            notifier_of_child.expect("checked that the batch is not empty"),
                            pending_at_parent,
                        ],
                    ))
                },
                TxType::MultipleWrites,
                "append_batch_respond_to_parent",
            )
            .await?
        };
        self.notify_all(notifiers, current_time);
        Ok(version)
    }

    /// Waits until an execution matching one of `ffqns` becomes pending, or
    /// `timeout_fut` completes — whichever happens first.
    ///
    /// The subscriber is registered *before* the database check so that a
    /// notification arriving in between is not lost.
    #[instrument(level = Level::TRACE, skip(self, timeout_fut))]
    async fn wait_for_pending_by_ffqn(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        ffqns: Arc<[FunctionFqn]>,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        // Random tag identifies this call's subscriptions during cleanup.
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1);
        {
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                pending_subscribers.insert_ffqn(ffqn.clone(), (sender.clone(), unique_tag));
            }
        }
        async {
            let Ok(execution_ids_versions) = self
                .transaction(
                    {
                        let ffqns = ffqns.clone();
                        move |conn| Self::get_pending_by_ffqns(conn, 1, pending_at_or_sooner, ffqns.as_ref())
                    },
                    TxType::Other,
                    "get_pending_by_ffqns",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            // Wait for a notification or the timeout, whichever fires first.
            tokio::select! {
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        {
            // Cleanup: remove only this call's subscriptions; if another waiter
            // replaced one of ours in the meantime, put theirs back.
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
            for ffqn in ffqns.as_ref() {
                match pending_subscribers.remove_ffqn(ffqn) {
                    Some((_, tag)) if tag == unique_tag => {}
                    Some(other) => {
                        pending_subscribers.insert_ffqn(ffqn.clone(), other);
                    }
                    None => {}
                }
            }
        }
    }

    /// Like `wait_for_pending_by_ffqn`, but keyed by the component's input
    /// content digest.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn wait_for_pending_by_component_digest(
        &self,
        pending_at_or_sooner: DateTime<Utc>,
        component_digest: &InputContentDigest,
        timeout_fut: Pin<Box<dyn Future<Output = ()> + Send>>,
    ) {
        let unique_tag: u64 = rand::random();
        let (sender, mut receiver) = mpsc::channel(1);
        {
            // Register before checking the database to avoid a missed wake-up.
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();
            pending_subscribers
                .insert_by_component(component_digest.clone(), (sender.clone(), unique_tag));
        }
        async {
            let Ok(execution_ids_versions) = self
                .transaction(
                    {
                        let input_digest = component_digest.clone();
                        move |conn| Self::get_pending_by_component_input_digest(conn, 1, pending_at_or_sooner, &input_digest)
                    },
                    TxType::Other,
                    "get_pending_by_component_input_digest",
                )
                .await
            else {
                trace!(
                    "Ignoring get_pending error and waiting in for timeout to avoid executor repolling too soon"
                );
                timeout_fut.await;
                return;
            };
            if !execution_ids_versions.is_empty() {
                trace!("Not waiting, database already contains new pending executions");
                return;
            }
            tokio::select! {
                _ = receiver.recv() => {
                    trace!("Received a notification");
                }
                () = timeout_fut => {
                }
            }
        }.await;
        {
            // Cleanup: drop only our own registration, restoring any newer one.
            let mut pending_subscribers = self.0.pending_subscribers.lock().unwrap();

            match pending_subscribers.remove_by_component(component_digest) {
                Some((_, tag)) if tag == unique_tag => {}
                Some(other) => {
                    pending_subscribers.insert_by_component(component_digest.clone(), other);
                }
                None => {}
            }
        }
    }

    /// Fetches the most recent event of the execution's history.
    async fn get_last_execution_event(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionEvent, DbErrorRead> {
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| Self::get_last_execution_event(tx, &execution_id),
            TxType::Other,
            "get_last_execution_event",
        )
        .await
    }
}
3819
#[async_trait]
impl DbExternalApi for SqlitePool {
    /// Fetches a stored WASM backtrace of `execution_id` selected by `filter`:
    /// a specific version, or the first/last recorded one.
    #[instrument(skip(self))]
    async fn get_backtrace(
        &self,
        execution_id: &ExecutionId,
        filter: BacktraceFilter,
    ) -> Result<BacktraceInfo, DbErrorRead> {
        debug!("get_backtrace");
        let execution_id = execution_id.clone();

        self.transaction(
            move |tx| {
                let select = "SELECT component_id, version_min_including, version_max_excluding, wasm_backtrace FROM t_execution_backtrace e \
                INNER JOIN t_wasm_backtrace w ON e.backtrace_hash = w.backtrace_hash \
                WHERE execution_id = :execution_id";
                let mut params: Vec<(&'static str, Box<dyn rusqlite::ToSql>)> = vec![(":execution_id", Box::new(execution_id.to_string()))];
                // Narrow the base query according to the filter; a specific
                // version must fall into a row's [min, max) version range.
                let select = match &filter {
                    BacktraceFilter::Specific(version) => {
                        params.push((":version", Box::new(version.0)));
                        format!("{select} AND version_min_including <= :version AND version_max_excluding > :version")
                    },
                    BacktraceFilter::First => format!("{select} ORDER BY version_min_including LIMIT 1"),
                    BacktraceFilter::Last => format!("{select} ORDER BY version_min_including DESC LIMIT 1")
                };
                tx.prepare(&select)?
                    .query_row::<_, &[(&'static str, &dyn ToSql)], _>(
                        params
                            .iter()
                            .map(|(key, value)| (*key, value.as_ref()))
                            .collect::<Vec<_>>()
                            .as_ref(),
                        |row| {
                            Ok(BacktraceInfo {
                                execution_id: execution_id.clone(),
                                component_id: row.get::<_, JsonWrapper<_>>("component_id")?.0,
                                version_min_including: Version::new(row.get::<_, VersionType>("version_min_including")?),
                                version_max_excluding: Version::new(row.get::<_, VersionType>("version_max_excluding")?),
                                wasm_backtrace: row.get::<_, JsonWrapper<_>>("wasm_backtrace")?.0,
                            })
                        },
                    ).map_err(DbErrorRead::from)
            },
            TxType::Other,
            "get_last_backtrace",
        ).await
    }

    /// Lists executions matching `filter`, paginated.
    #[instrument(skip(self))]
    async fn list_executions(
        &self,
        filter: ListExecutionsFilter,
        pagination: ExecutionListPagination,
    ) -> Result<Vec<ExecutionWithState>, DbErrorGeneric> {
        self.transaction(
            move |tx| Self::list_executions(tx, &filter, &pagination),
            TxType::Other,
            "list_executions",
        )
        .await
        .map_err(to_generic_error)
    }

    /// Returns a page of an execution's event history together with the current
    /// maximum version (useful for follow-up pagination).
    #[instrument(skip(self))]
    async fn list_execution_events(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<VersionType>,
        include_backtrace_id: bool,
    ) -> Result<ListExecutionEventsResponse, DbErrorRead> {
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| {
                let events = Self::list_execution_events(
                    tx,
                    &execution_id,
                    pagination,
                    include_backtrace_id,
                )?;
                let max_version = Self::get_max_version(tx, &execution_id)?;
                Ok(ListExecutionEventsResponse {
                    events,
                    max_version,
                })
            },
            TxType::Other,
            "get",
        )
        .await
    }

    /// Returns a page of join-set responses together with the maximum response
    /// cursor currently stored for the execution.
    #[instrument(skip(self))]
    async fn list_responses(
        &self,
        execution_id: &ExecutionId,
        pagination: Pagination<u32>,
    ) -> Result<ListResponsesResponse, DbErrorRead> {
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| {
                let responses = Self::list_responses(tx, &execution_id, Some(pagination))?;
                let max_cursor = Self::get_max_response_cursor(tx, &execution_id)?;
                Ok(ListResponsesResponse {
                    responses,
                    max_cursor,
                })
            },
            TxType::Other,
            "list_responses",
        )
        .await
    }

    /// Combined snapshot: execution state, a page of events (starting at
    /// `req_since`, inclusive), a page of responses, plus the current maximum
    /// event version and response cursor — all read in one transaction so the
    /// pieces are mutually consistent.
    #[instrument(skip(self))]
    async fn list_execution_events_responses(
        &self,
        execution_id: &ExecutionId,
        req_since: &Version,
        req_max_length: VersionType,
        req_include_backtrace_id: bool,
        resp_pagination: Pagination<u32>,
    ) -> Result<ExecutionWithStateRequestsResponses, DbErrorRead> {
        let execution_id = execution_id.clone();
        let req_since = req_since.0;
        self.transaction(
            move |tx| {
                let combined_state = Self::get_combined_state(tx, &execution_id)?;
                let events = Self::list_execution_events(
                    tx,
                    &execution_id,
                    Pagination::NewerThan {
                        length: req_max_length
                            .try_into()
                            .expect("req_max_length fits in u16"),
                        cursor: req_since,
                        including_cursor: true,
                    },
                    req_include_backtrace_id,
                )?;
                let responses = Self::list_responses(tx, &execution_id, Some(resp_pagination))?;
                let max_version = Self::get_max_version(tx, &execution_id)?;
                let max_cursor = Self::get_max_response_cursor(tx, &execution_id)?;
                Ok(ExecutionWithStateRequestsResponses {
                    execution_with_state: combined_state.execution_with_state,
                    events,
                    responses,
                    max_version,
                    max_cursor,
                })
            },
            TxType::Other,
            "list_execution_events_responses",
        )
        .await
    }

    /// Switches an execution from component digest `old` to `new`.
    #[instrument(skip(self))]
    async fn upgrade_execution_component(
        &self,
        execution_id: &ExecutionId,
        old: &InputContentDigest,
        new: &InputContentDigest,
    ) -> Result<(), DbErrorWrite> {
        let execution_id = execution_id.clone();
        let old = old.clone();
        let new = new.clone();
        self.transaction(
            move |tx| Self::upgrade_execution_component_single_write(tx, &execution_id, &old, &new),
            TxType::Other,
            "upgrade_execution_component",
        )
        .await
    }

    /// Lists log entries of an execution, filtered and paginated.
    #[instrument(skip(self))]
    async fn list_logs(
        &self,
        execution_id: &ExecutionId,
        filter: LogFilter,
        pagination: Pagination<u32>,
    ) -> Result<ListLogsResponse, DbErrorRead> {
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| Self::list_logs_tx(tx, &execution_id, &filter, &pagination),
            TxType::Other,
            "list_logs",
        )
        .await
    }

    /// Aggregated per-deployment execution counts; see the inherent
    /// `list_deployment_states` for the query details.
    #[instrument(skip(self))]
    async fn list_deployment_states(
        &self,
        current_time: DateTime<Utc>,
        pagination: Pagination<Option<DeploymentId>>,
    ) -> Result<Vec<DeploymentState>, DbErrorRead> {
        self.transaction(
            move |tx| Self::list_deployment_states(tx, current_time, pagination),
            TxType::Other,
            "list_deployment_states",
        )
        .await
    }

    /// Appends a `Paused` event; fails if the execution already finished.
    #[instrument(skip(self))]
    async fn pause_execution(
        &self,
        execution_id: &ExecutionId,
        paused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite> {
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| SqlitePool::pause_execution(tx, &execution_id, paused_at),
            TxType::MultipleWrites,
            "pause_execution",
        )
        .await
    }

    /// Appends an `Unpaused` event; fails if the execution already finished.
    #[instrument(skip(self))]
    async fn unpause_execution(
        &self,
        execution_id: &ExecutionId,
        unpaused_at: DateTime<Utc>,
    ) -> Result<AppendResponse, DbErrorWrite> {
        let execution_id = execution_id.clone();
        self.transaction(
            move |tx| SqlitePool::unpause_execution(tx, &execution_id, unpaused_at),
            TxType::MultipleWrites,
            "unpause_execution",
        )
        .await
    }
}
4056
4057#[async_trait]
4058impl DbConnection for SqlitePool {
4059 #[instrument(level = Level::DEBUG, skip_all, fields(execution_id = %req.execution_id))]
4060 async fn create(&self, req: CreateRequest) -> Result<AppendResponse, DbErrorWrite> {
4061 debug!("create");
4062 trace!(?req, "create");
4063 let created_at = req.created_at;
4064 let (version, notifier) = self
4065 .transaction(
4066 move |tx| Self::create_inner(tx, req.clone()),
4067 TxType::MultipleWrites,
4068 "create",
4069 )
4070 .await?;
4071 self.notify_all(vec![notifier], created_at);
4072 Ok(version)
4073 }
4074
4075 #[instrument(level = Level::DEBUG, skip(self))]
4076 async fn get(
4077 &self,
4078 execution_id: &ExecutionId,
4079 ) -> Result<concepts::storage::ExecutionLog, DbErrorRead> {
4080 trace!("get");
4081 let execution_id = execution_id.clone();
4082 self.transaction(
4083 move |tx| Self::get(tx, &execution_id),
4084 TxType::Other, "get",
4086 )
4087 .await
4088 }
4089
    /// Appends `batch` to `execution_id` in one transaction, then notifies
    /// subscribers using the notifier of the final event.
    ///
    /// # Panics
    /// Panics if `batch` is empty.
    #[instrument(level = Level::DEBUG, skip(self, batch))]
    async fn append_batch(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
    ) -> Result<AppendBatchResponse, DbErrorWrite> {
        debug!("append_batch");
        trace!(?batch, "append_batch");
        assert!(!batch.is_empty(), "Empty batch request");

        let (version, notifier) = self
            .transaction(
                move |tx| {
                    let mut version = version.clone();
                    let mut notifier = None;
                    // Thread the version through each append; only the last
                    // notifier matters since it reflects the final state.
                    for append_request in &batch {
                        let (v, n) =
                            Self::append(tx, &execution_id, append_request.clone(), version)?;
                        version = v;
                        notifier = Some(n);
                    }
                    Ok::<_, DbErrorWrite>((
                        version,
                        notifier.expect("checked that the batch is not empty"),
                    ))
                },
                TxType::MultipleWrites,
                "append_batch",
            )
            .await?;

        // Notify only after the transaction has committed.
        self.notify_all(vec![notifier], current_time);
        Ok(version)
    }
4126
    /// Appends `batch` to `execution_id` and creates the child executions in
    /// `child_req` — all in one transaction. Backtraces are written afterwards
    /// in a separate fire-and-forget transaction (failures are tolerated).
    ///
    /// # Panics
    /// Panics if `batch` is empty.
    #[instrument(level = Level::DEBUG, skip_all, fields(%execution_id, %version))]
    async fn append_batch_create_new_execution(
        &self,
        current_time: DateTime<Utc>,
        batch: Vec<AppendRequest>,
        execution_id: ExecutionId,
        version: Version,
        child_req: Vec<CreateRequest>,
        backtraces: Vec<BacktraceInfo>,
    ) -> Result<AppendBatchResponse, DbErrorWrite> {
        debug!("append_batch_create_new_execution");
        trace!(?batch, ?child_req, "append_batch_create_new_execution");
        assert!(!batch.is_empty(), "Empty batch request");

        let (version, notifiers) = self
            .transaction(
                move |tx| {
                    let mut notifier = None;
                    let mut version = version.clone();
                    // Thread the version through each append; keep only the
                    // last notifier for the parent.
                    for append_request in &batch {
                        let (v, n) =
                            Self::append(tx, &execution_id, append_request.clone(), version)?;
                        version = v;
                        notifier = Some(n);
                    }
                    let mut notifiers = Vec::new();
                    notifiers.push(notifier.expect("checked that the batch is not empty"));

                    // Each created child contributes its own pending notifier.
                    for child_req in &child_req {
                        let (_, notifier) = Self::create_inner(tx, child_req.clone())?;
                        notifiers.push(notifier);
                    }
                    Ok::<_, DbErrorWrite>((version, notifiers))
                },
                TxType::MultipleWrites,
                "append_batch_create_new_execution_inner",
            )
            .await?;
        self.notify_all(notifiers, current_time);
        // Backtraces are non-critical; store them outside the main transaction
        // and ignore failures.
        self.transaction_fire_forget(
            move |tx| {
                for backtrace in &backtraces {
                    Self::append_backtrace(tx, backtrace)?;
                }
                Ok::<_, DbErrorWrite>(())
            },
            "append_batch_create_new_execution_append_backtrace",
        )
        .await;
        Ok(version)
    }
4178
    /// Returns join-set responses of `execution_id` appended after
    /// `last_response`, blocking until at least one arrives or `timeout_fut`
    /// resolves.
    ///
    /// When no responses exist yet, a oneshot subscriber tagged with a random
    /// `unique_tag` is registered inside the read transaction (so no response
    /// can slip by between the check and the registration). `cleanup` removes
    /// only this call's registration; a registration made by a later waiter is
    /// restored.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn subscribe_to_next_responses(
        &self,
        execution_id: &ExecutionId,
        last_response: ResponseCursor,
        timeout_fut: Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>,
    ) -> Result<Vec<ResponseWithCursor>, DbErrorReadWithTimeout> {
        debug!("next_responses");
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();

        let cleanup = || {
            let mut guard = self.0.response_subscribers.lock().unwrap();
            match guard.remove(&execution_id) {
                // Our own registration - drop it.
                Some((_, tag)) if tag == unique_tag => {}
                // Someone else registered after us - put theirs back.
                Some(other) => {
                    guard.insert(execution_id.clone(), other);
                }
                None => {}
            }
        };

        let response_subscribers = self.0.response_subscribers.clone();
        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction(
                move |tx| {
                    let responses = Self::get_responses_after(tx, &execution_id, last_response)?;
                    if responses.is_empty() {
                        // Nothing yet - subscribe before the transaction ends.
                        let (sender, receiver) = oneshot::channel();
                        response_subscribers
                            .lock()
                            .unwrap()
                            .insert(execution_id.clone(), (sender, unique_tag));
                        Ok::<_, DbErrorReadWithTimeout>(itertools::Either::Right(receiver))
                    } else {
                        Ok(itertools::Either::Left(responses))
                    }
                },
                TxType::Other,
                "subscribe_to_next_responses",
            )
            .await
        }
        .inspect_err(|_| {
            cleanup();
        })?;
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp),
            itertools::Either::Right(receiver) => {
                let res = tokio::select! {
                    resp = receiver => {
                        match resp {
                            Ok(resp) => Ok(vec![resp]),
                            // Sender dropped: the pool is shutting down.
                            Err(_) => Err(DbErrorReadWithTimeout::from(DbErrorGeneric::Close)),
                        }
                    }
                    outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
                };
                cleanup();
                res
            }
        }
    }
4248
    /// Waits until `execution_id` reaches the `Finished` state and returns its
    /// result. `timeout_fut` (when provided) bounds the wait; otherwise the
    /// future pends until a result arrives or the pool closes.
    ///
    /// If the execution is already finished, the stored `Finished` event is
    /// returned immediately; otherwise a subscriber keyed by a random
    /// `unique_tag` is registered inside the read transaction so the finish
    /// notification cannot be missed.
    #[instrument(level = Level::DEBUG, skip(self, timeout_fut))]
    async fn wait_for_finished_result(
        &self,
        execution_id: &ExecutionId,
        timeout_fut: Option<Pin<Box<dyn Future<Output = TimeoutOutcome> + Send>>>,
    ) -> Result<SupportedFunctionReturnValue, DbErrorReadWithTimeout> {
        let unique_tag: u64 = rand::random();
        let execution_id = execution_id.clone();
        let execution_finished_subscription = self.0.execution_finished_subscribers.clone();

        // Removes only this call's subscription (identified by `unique_tag`).
        let cleanup = || {
            let mut guard = self.0.execution_finished_subscribers.lock().unwrap();
            if let Some(subscribers) = guard.get_mut(&execution_id) {
                subscribers.remove(&unique_tag);
            }
        };

        let resp_or_receiver = {
            let execution_id = execution_id.clone();
            self.transaction(move |tx| {
                let pending_state =
                    Self::get_combined_state(tx, &execution_id)?.execution_with_state.pending_state;
                if let PendingState::Finished(finished) = pending_state {
                    // Already finished - read the Finished event directly.
                    let event =
                        Self::get_execution_event(tx, &execution_id, finished.version)?;
                    if let ExecutionRequest::Finished { result, ..} = event.event {
                        Ok(itertools::Either::Left(result))
                    } else {
                        // t_state points at a version that is not a Finished
                        // event - the tables disagree.
                        error!("Mismatch, expected Finished row: {event:?} based on t_state {finished}");
                        Err(DbErrorReadWithTimeout::from(consistency_db_err(
                            "cannot get finished event based on t_state version"
                        )))
                    }
                } else {
                    // Not finished yet - subscribe before the transaction ends
                    // so the finish notification cannot be missed.
                    let (sender, receiver) = oneshot::channel();
                    let mut guard = execution_finished_subscription.lock().unwrap();
                    guard.entry(execution_id.clone()).or_default().insert(unique_tag, sender);
                    Ok(itertools::Either::Right(receiver))
                }
            },
            TxType::Other,
            "wait_for_finished_result")
            .await
        }
        .inspect_err(|_| {
            cleanup();
        })?;

        // No caller-supplied timeout means wait forever.
        let timeout_fut = timeout_fut.unwrap_or_else(|| Box::pin(std::future::pending()));
        match resp_or_receiver {
            itertools::Either::Left(resp) => Ok(resp),
            itertools::Either::Right(receiver) => {
                let res = tokio::select! {
                    resp = receiver => {
                        match resp {
                            Ok(retval) => Ok(retval),
                            // Sender dropped: the pool is shutting down.
                            Err(_recv_err) => Err(DbErrorGeneric::Close.into())
                        }
                    }
                    outcome = timeout_fut => Err(DbErrorReadWithTimeout::Timeout(outcome)),
                };
                cleanup();
                res
            }
        }
    }
4322
    /// Records the outcome of a delay timer (`Ok(())` = elapsed, `Err(())` =
    /// cancelled) as a join-set response of `execution_id`.
    ///
    /// On a conflict (the response row already exists) the stored value is read
    /// back to report `AlreadyFinished` or `AlreadyCancelled` instead of
    /// failing.
    #[instrument(level = Level::DEBUG, skip_all, fields(%join_set_id, %execution_id))]
    async fn append_delay_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        join_set_id: JoinSetId,
        delay_id: DelayId,
        result: Result<(), ()>,
    ) -> Result<AppendDelayResponseOutcome, DbErrorWrite> {
        debug!("append_delay_response");
        let event = JoinSetResponseEventOuter {
            created_at,
            event: JoinSetResponseEvent {
                join_set_id,
                event: JoinSetResponse::DelayFinished {
                    delay_id: delay_id.clone(),
                    result,
                },
            },
        };
        let res = self
            .transaction(
                {
                    let execution_id = execution_id.clone();
                    move |tx| Self::append_response(tx, &execution_id, event.clone())
                },
                TxType::MultipleWrites,
                "append_delay_response",
            )
            .await;
        match res {
            Ok(notifier) => {
                self.notify_all(vec![notifier], created_at);
                Ok(AppendDelayResponseOutcome::Success)
            }
            // Conflict: the response was already written; read it back to see
            // which outcome had been stored.
            Err(DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::Conflict)) => {
                let delay_success = self
                    .transaction(
                        move |tx| Self::get_delay_response(tx, &execution_id, &delay_id),
                        TxType::Other,
                        "get_delay_response",
                    )
                    .await?;
                match delay_success {
                    Some(true) => Ok(AppendDelayResponseOutcome::AlreadyFinished),
                    Some(false) => Ok(AppendDelayResponseOutcome::AlreadyCancelled),
                    // The insert conflicted yet no row is visible - inconsistent.
                    None => Err(DbErrorWrite::Generic(DbErrorGeneric::Uncategorized {
                        reason: "insert failed yet select did not find the response".into(),
                        context: SpanTrace::capture(),
                        source: None,
                        loc: Location::caller(),
                    })),
                }
            }
            Err(err) => Err(err),
        }
    }
4380
4381 #[instrument(level = Level::DEBUG, skip_all)]
4382 async fn append_backtrace(&self, append: BacktraceInfo) -> Result<(), DbErrorWrite> {
4383 trace!("append_backtrace");
4384 self.transaction_fire_forget(
4385 move |tx| Self::append_backtrace(tx, &append),
4386 "append_backtrace",
4387 )
4388 .await;
4389 Ok(())
4390 }
4391
4392 #[instrument(level = Level::DEBUG, skip_all)]
4393 async fn append_backtrace_batch(&self, batch: Vec<BacktraceInfo>) -> Result<(), DbErrorWrite> {
4394 trace!("append_backtrace_batch");
4395 self.transaction_fire_forget(
4396 move |tx| {
4397 for append in &batch {
4398 Self::append_backtrace(tx, append)?;
4399 }
4400 Ok::<_, DbErrorWrite>(())
4401 },
4402 "append_backtrace_batch",
4403 )
4404 .await;
4405 Ok(())
4406 }
4407
4408 #[instrument(level = Level::DEBUG, skip_all)]
4409 async fn append_log(&self, row: LogInfoAppendRow) -> Result<(), DbErrorWrite> {
4410 trace!("append_log");
4411 self.transaction_fire_forget(move |tx| Self::append_log(tx, &row), "append_log")
4412 .await;
4413 Ok(())
4414 }
4415
4416 #[instrument(level = Level::DEBUG, skip_all)]
4417 async fn append_log_batch(&self, batch: &[LogInfoAppendRow]) -> Result<(), DbErrorWrite> {
4418 trace!("append_log_batch");
4419 let batch = Vec::from(batch);
4420 self.transaction_fire_forget(
4421 move |tx| {
4422 for row in &batch {
4423 Self::append_log(tx, row)?;
4424 }
4425 Ok::<_, DbErrorWrite>(())
4426 },
4427 "append_log_batch",
4428 )
4429 .await;
4430 Ok(())
4431 }
4432
4433 #[instrument(level = Level::TRACE, skip(self))]
4435 async fn get_expired_timers(
4436 &self,
4437 at: DateTime<Utc>,
4438 ) -> Result<Vec<ExpiredTimer>, DbErrorGeneric> {
4439 self.transaction(
4440 move |conn| {
4441 let mut expired_timers = conn.prepare(
4442 "SELECT execution_id, join_set_id, delay_id FROM t_delay WHERE expires_at <= :at",
4443 )?
4444 .query_map(
4445 named_params! {
4446 ":at": at,
4447 },
4448 |row| {
4449 let execution_id = row.get("execution_id")?;
4450 let join_set_id = row.get::<_, JoinSetId>("join_set_id")?;
4451 let delay_id = row.get::<_, DelayId>("delay_id")?;
4452 let delay = ExpiredDelay { execution_id, join_set_id, delay_id };
4453 Ok(ExpiredTimer::Delay(delay))
4454 },
4455 )?
4456 .collect::<Result<Vec<_>, _>>()?;
4457 let expired = conn.prepare(&format!(r#"
4459 SELECT execution_id, last_lock_version, corresponding_version, intermittent_event_count, max_retries, retry_exp_backoff_millis,
4460 executor_id, run_id
4461 FROM t_state
4462 WHERE pending_expires_finished <= :at AND state = "{STATE_LOCKED}"
4463 "#
4464 )
4465 )?
4466 .query_map(
4467 named_params! {
4468 ":at": at,
4469 },
4470 |row| {
4471 let execution_id = row.get("execution_id")?;
4472 let locked_at_version = Version::new(row.get("last_lock_version")?);
4473 let next_version = Version::new(row.get("corresponding_version")?).increment();
4474 let intermittent_event_count = row.get("intermittent_event_count")?;
4475 let max_retries = row.get("max_retries")?;
4476 let retry_exp_backoff_millis = u64::from(row.get::<_, u32>("retry_exp_backoff_millis")?);
4477 let executor_id = row.get("executor_id")?;
4478 let run_id = row.get("run_id")?;
4479 let lock = ExpiredLock {
4480 execution_id,
4481 locked_at_version,
4482 next_version,
4483 intermittent_event_count,
4484 max_retries,
4485 retry_exp_backoff: Duration::from_millis(retry_exp_backoff_millis),
4486 locked_by: LockedBy { executor_id, run_id },
4487 };
4488 Ok(ExpiredTimer::Lock(lock))
4489 }
4490 )?
4491 .collect::<Result<Vec<_>, _>>()?;
4492 expired_timers.extend(expired);
4493 if !expired_timers.is_empty() {
4494 debug!("get_expired_timers found {expired_timers:?}");
4495 }
4496 Ok(expired_timers)
4497 },
4498 TxType::Other, "get_expired_timers"
4500 )
4501 .await
4502 .map_err(to_generic_error)
4503 }
4504
4505 async fn get_execution_event(
4506 &self,
4507 execution_id: &ExecutionId,
4508 version: &Version,
4509 ) -> Result<ExecutionEvent, DbErrorRead> {
4510 let version = version.0;
4511 let execution_id = execution_id.clone();
4512 self.transaction(
4513 move |tx| Self::get_execution_event(tx, &execution_id, version),
4514 TxType::Other, "get_execution_event",
4516 )
4517 .await
4518 }
4519
4520 async fn get_pending_state(
4521 &self,
4522 execution_id: &ExecutionId,
4523 ) -> Result<ExecutionWithState, DbErrorRead> {
4524 let execution_id = execution_id.clone();
4525 Ok(self
4526 .transaction(
4527 move |tx| Self::get_combined_state(tx, &execution_id),
4528 TxType::Other, "get_pending_state",
4530 )
4531 .await?
4532 .execution_with_state)
4533 }
4534}
4535
#[cfg(feature = "test")]
#[async_trait]
impl concepts::storage::DbConnectionTest for SqlitePool {
    /// Test-only helper: appends `response_event` to the execution's join-set
    /// responses and notifies subscribers afterwards.
    #[instrument(level = Level::DEBUG, skip(self, response_event), fields(join_set_id = %response_event.join_set_id))]
    async fn append_response(
        &self,
        created_at: DateTime<Utc>,
        execution_id: ExecutionId,
        response_event: JoinSetResponseEvent,
    ) -> Result<(), DbErrorWrite> {
        debug!("append_response");
        // Wrap the event with its timestamp outside the closure; the closure
        // clones it because the transaction may run it more than once.
        let outer = JoinSetResponseEventOuter {
            created_at,
            event: response_event,
        };
        let notifier = self
            .transaction(
                move |tx| Self::append_response(tx, &execution_id, outer.clone()),
                TxType::Other,
                "append_response",
            )
            .await?;
        self.notify_all(vec![notifier], created_at);
        Ok(())
    }
}
4562
#[cfg(any(test, feature = "tempfile"))]
pub mod tempfile {
    use super::{SqliteConfig, SqlitePool};
    use tempfile::NamedTempFile;

    /// Opens a [`SqlitePool`] backed by a file for tests.
    ///
    /// When the `SQLITE_FILE` env var is set, that path is used and no guard
    /// is returned; otherwise a fresh temp file backs the pool and its
    /// [`NamedTempFile`] guard is returned so the file outlives the pool.
    pub async fn sqlite_pool() -> (SqlitePool, Option<NamedTempFile>) {
        match std::env::var("SQLITE_FILE") {
            Ok(path) => {
                let pool = SqlitePool::new(path, SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, None)
            }
            Err(_) => {
                let file = NamedTempFile::new().unwrap();
                let pool = SqlitePool::new(file.path(), SqliteConfig::default())
                    .await
                    .unwrap();
                (pool, Some(file))
            }
        }
    }
}
4588
#[cfg(test)]
mod tests {
    use crate::sqlite_dao::{SqlitePool, TxType, tempfile::sqlite_pool};
    use assert_matches::assert_matches;
    use chrono::DateTime;
    use concepts::{
        ComponentId, FunctionFqn, Params,
        prefixed_ulid::{DEPLOYMENT_ID_DUMMY, EXECUTION_ID_DUMMY},
        storage::{CreateRequest, DbErrorWrite, DbErrorWriteNonRetriable, DbPoolCloseable},
    };
    use rusqlite::named_params;

    // Arbitrary function FQN for the execution created below.
    const SOME_FFQN: FunctionFqn = FunctionFqn::new_static("ns:pkg/ifc", "fn");

    /// A failing logical transaction must not persist any of its writes.
    ///
    /// 1. One `MultipleWrites` transaction creates an execution and pauses it;
    ///    this commits.
    /// 2. A second transaction pauses the already-paused execution, which must
    ///    fail with `IllegalState` and be rolled back.
    /// 3. The raw `t_execution_log` rows are counted: only the 2 events from
    ///    the successful transaction (presumably Created + Paused) may remain.
    #[tokio::test]
    async fn failing_ltx_should_be_rolled_back() -> Result<(), DbErrorWrite> {
        let created_at = DateTime::from_timestamp_nanos(0);
        let (pool, _guard) = sqlite_pool().await;
        // Step 1: create + pause in a single committing transaction.
        pool.transaction(
            move |tx| {
                let req = CreateRequest {
                    created_at,
                    execution_id: EXECUTION_ID_DUMMY,
                    ffqn: SOME_FFQN,
                    params: Params::empty(),
                    parent: None,
                    metadata: concepts::ExecutionMetadata::empty(),
                    scheduled_at: created_at,
                    component_id: ComponentId::dummy_activity(),
                    deployment_id: DEPLOYMENT_ID_DUMMY,
                    scheduled_by: None,
                };
                SqlitePool::create_inner(tx, req)?;
                SqlitePool::pause_execution(tx, &EXECUTION_ID_DUMMY, created_at)?;
                Ok::<_, DbErrorWrite>(())
            },
            TxType::MultipleWrites,
            "create_inner + pause_execution",
        )
        .await?;

        // Step 2: pausing again must fail — the execution is already paused.
        let err = pool
            .transaction(
                move |tx| SqlitePool::pause_execution(tx, &EXECUTION_ID_DUMMY, created_at),
                TxType::MultipleWrites,
                "pause_execution",
            )
            .await
            .unwrap_err();
        let reason = assert_matches!(err, DbErrorWrite::NonRetriable(DbErrorWriteNonRetriable::IllegalState { reason, .. }) => reason);
        assert_eq!("cannot pause, execution is already paused", reason.as_ref());

        // Step 3: read the persisted events directly from the table.
        let events = pool.transaction(
            move |tx| {
                let events =
                    tx.prepare(
                        "SELECT created_at, json_value, version FROM t_execution_log WHERE execution_id = :execution_id",
                    )?
                    .query_map(
                        named_params! {
                            ":execution_id": EXECUTION_ID_DUMMY.to_string(),
                        },
                        SqlitePool::map_t_execution_log_row,
                    )
                    .map_err(DbErrorWrite::from)?
                    .collect::<Result<Vec<_>, _>>()?;

                Ok::<_, DbErrorWrite>(events)
            },
            TxType::Other, "get_log",
        )
        .await?;
        // Only the two events from the committed transaction may be present;
        // the rolled-back pause attempt must have added nothing.
        assert_eq!(2, events.len());
        pool.close().await;
        Ok(())
    }
}
4667}